Skip to content

Commit

Permalink
fix(bigquery): address fastpath iteration with iterator.Pager (#2947)
Browse files Browse the repository at this point in the history
Iterating query results from the fastpath with an iterator.Pager
forces a cache miss if the pager size is misaligned with the cached
page size.

Logic to reduce the size of the getQueryResults payload by dictating
the projected fields was yielding an empty pageToken and no rows, which
is equivalent to an empty page response.

This PR corrects the projection logic, and adds an integration test to
exercise the use of misaligned paging.
  • Loading branch information
shollyman committed Oct 1, 2020
1 parent eb34d22 commit a791533
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
38 changes: 38 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -982,6 +982,44 @@ func TestIntegration_SimpleRowResults(t *testing.T) {
}
}

func TestIntegration_QueryIterationPager(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

sql := `
SELECT
num,
num * 2 as double
FROM
UNNEST(GENERATE_ARRAY(1,5)) as num`
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
t.Fatalf("Read: %v", err)
}
pager := iterator.NewPager(it, 2, "")
rowsFetched := 0
for {
var rows [][]Value
nextPageToken, err := pager.NextPage(&rows)
if err != nil {
t.Fatalf("NextPage: %v", err)
}
rowsFetched = rowsFetched + len(rows)

if nextPageToken == "" {
break
}
}

wantRows := 5
if rowsFetched != wantRows {
t.Errorf("Expected %d rows, got %d", wantRows, rowsFetched)
}
}

func TestIntegration_RoutineStoredProcedure(t *testing.T) {
// Verifies we're exhibiting documented behavior, where we're expected
// to return the last resultset in a script as the response from a script
Expand Down
5 changes: 2 additions & 3 deletions bigquery/iterator.go
Expand Up @@ -268,11 +268,11 @@ func fetchJobResultPage(ctx context.Context, src *rowSource, schema Schema, star
// reduce data transfered by leveraging api projections
projectedFields := []googleapi.Field{"rows", "pageToken", "totalRows"}
call := src.j.c.bqs.Jobs.GetQueryResults(src.j.projectID, src.j.jobID).Location(src.j.location)
call = call.Fields(projectedFields...)
if schema == nil {
// only project schema if we weren't supplied one.
call = call.Fields("schema")
projectedFields = append(projectedFields, "schema")
}
call = call.Fields(projectedFields...)
setClientHeader(call.Header())
if pageToken != "" {
call.PageToken(pageToken)
Expand All @@ -294,7 +294,6 @@ func fetchJobResultPage(ctx context.Context, src *rowSource, schema Schema, star
if schema == nil {
schema = bqToSchema(res.Schema)
}

rows, err := convertRows(res.Rows, schema)
if err != nil {
return nil, err
Expand Down

0 comments on commit a791533

Please sign in to comment.