Making BigQuery table.fetch_data() into an iterator.
dhermes committed Nov 1, 2016
1 parent 092dd09 commit 0e7b01f
Showing 7 changed files with 113 additions and 59 deletions.
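
In short, ``Table.fetch_data()`` no longer returns a ``(rows, total_rows, page_token)`` tuple; it now returns a ``google.cloud.iterator.Iterator`` that yields one tuple per row. A rough before/after sketch of the calling convention (``process`` is a placeholder for caller code, not part of the library):

    # Before this commit: one page of rows plus pagination bookkeeping.
    rows, total_rows, page_token = table.fetch_data(max_results=100)

    # After this commit: an iterator over row tuples; paging happens
    # internally, and per-page metadata lives on iterator.page.
    for row in table.fetch_data(max_results=100):
        process(row)
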
102 changes: 66 additions & 36 deletions bigquery/google/cloud/bigquery/table.py
@@ -32,7 +32,8 @@
from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
from google.cloud.streaming.transfer import Upload
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery._helpers import _row_from_json
from google.cloud.iterator import Iterator


_TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'"
@@ -653,47 +654,36 @@ def fetch_data(self, max_results=None, page_token=None, client=None):
up-to-date with the schema as defined on the back-end: if the
two schemas are not identical, the values returned may be
incomplete. To ensure that the local copy of the schema is
up-to-date, call the table's ``reload`` method.
up-to-date, call :meth:`reload`.
:type max_results: int
:param max_results: (Optional) maximum number of rows to return.
:param max_results: (Optional) Maximum number of rows to return.
:type page_token: str
:param page_token:
(Optional) token representing a cursor into the table's rows.
:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
:rtype: tuple
:returns: ``(row_data, total_rows, page_token)``, where ``row_data``
is a list of tuples, one per result row, containing only
the values; ``total_rows`` is a count of the total number
of rows in the table; and ``page_token`` is an opaque
string which can be used to fetch the next batch of rows
(``None`` if no further batches can be fetched).
:param page_token: (Optional) Token representing a cursor into the
table's rows.
:type client: :class:`~google.cloud.bigquery.client.Client`
:param client: (Optional) The client to use. If not passed, falls
back to the ``client`` stored on the current dataset.
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of row data :class:`tuple`s. Each page in the
iterator will have the ``total_rows`` attribute set,
which counts the total number of rows **in the table**
(this is distinct from the total number of rows in the
current page: ``iterator.page.num_items``).
"""
client = self._require_client(client)
params = {}

if max_results is not None:
params['maxResults'] = max_results

if page_token is not None:
params['pageToken'] = page_token

response = client.connection.api_request(method='GET',
path='%s/data' % self.path,
query_params=params)
total_rows = response.get('totalRows')
if total_rows is not None:
total_rows = int(total_rows)
page_token = response.get('pageToken')
rows_data = _rows_from_json(response.get('rows', ()), self._schema)

return rows_data, total_rows, page_token
path = '%s/data' % (self.path,)
iterator = Iterator(client=client, path=path,
item_to_value=_item_to_row, items_key='rows',
page_token=page_token, max_results=max_results,
page_start=_rows_page_start)
iterator.schema = self._schema
# Over-ride the key used to retrieve the next page token.
iterator._NEXT_TOKEN = 'pageToken'
return iterator

def insert_data(self,
rows,
@@ -1083,6 +1073,46 @@ def _build_schema_resource(fields):
return infos


def _item_to_row(iterator, resource):
"""Convert a JSON row to the native object.
.. note::
This assumes that the ``schema`` attribute has been
added to the iterator after being created, which
should be done by the caller.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type resource: dict
:param resource: An item to be converted to a row.
:rtype: tuple
:returns: The next row in the page.
"""
return _row_from_json(resource, iterator.schema)


# pylint: disable=unused-argument
def _rows_page_start(iterator, page, response):
"""Grab total rows after a :class:`~google.cloud.iterator.Page` started.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type page: :class:`~google.cloud.iterator.Page`
:param page: The page that was just created.
:type response: dict
:param response: The JSON API response for a page of rows in a table.
"""
total_rows = response.get('totalRows')
if total_rows is not None:
page.total_rows = int(total_rows)
# pylint: enable=unused-argument


class _UploadConfig(object):
"""Faux message FBO apitools' 'configure_request'."""
accept = ['*/*']
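
The unit tests below exercise the page-level API directly. Pieced together from those tests and the new docstring above, single-page consumption looks roughly like this (a sketch, not an excerpt from the library's documentation):

    iterator = table.fetch_data()
    iterator.update_page()                   # pull down one page of results
    rows = list(iterator.page)               # row tuples for this page only
    total_rows = iterator.page.total_rows    # table-wide count, set by _rows_page_start
    token = iterator.next_page_token         # opaque cursor; None when exhausted
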
27 changes: 21 additions & 6 deletions bigquery/unit_tests/test_table.py
@@ -1068,7 +1068,11 @@ def _bigquery_timestamp_float_repr(ts_float):
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
schema=[full_name, age, joined])

rows, total_rows, page_token = table.fetch_data()
iterator = table.fetch_data()
iterator.update_page()
rows = list(iterator.page)
total_rows = iterator.page.total_rows
page_token = iterator.next_page_token

self.assertEqual(len(rows), 4)
self.assertEqual(rows[0], ('Phred Phlyntstone', 32, WHEN))
@@ -1129,9 +1133,12 @@ def test_fetch_data_w_alternate_client(self):
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
schema=[full_name, age, voter, score])

rows, total_rows, page_token = table.fetch_data(client=client2,
max_results=MAX,
page_token=TOKEN)
iterator = table.fetch_data(
client=client2, max_results=MAX, page_token=TOKEN)
iterator.update_page()
rows = list(iterator.page)
total_rows = getattr(iterator.page, 'total_rows', None)
page_token = iterator.next_page_token

self.assertEqual(len(rows), 4)
self.assertEqual(rows[0], ('Phred Phlyntstone', 32, True, 3.1415926))
@@ -1177,7 +1184,11 @@ def test_fetch_data_w_repeated_fields(self):
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
schema=[full_name, struct])

rows, total_rows, page_token = table.fetch_data()
iterator = table.fetch_data()
iterator.update_page()
rows = list(iterator.page)
total_rows = iterator.page.total_rows
page_token = iterator.next_page_token

self.assertEqual(len(rows), 1)
self.assertEqual(rows[0][0], ['red', 'green'])
@@ -1227,7 +1238,11 @@ def test_fetch_data_w_record_schema(self):
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
schema=[full_name, phone])

rows, total_rows, page_token = table.fetch_data()
iterator = table.fetch_data()
iterator.update_page()
rows = list(iterator.page)
total_rows = iterator.page.total_rows
page_token = iterator.next_page_token

self.assertEqual(len(rows), 3)
self.assertEqual(rows[0][0], 'Phred Phlyntstone')
1 change: 1 addition & 0 deletions core/google/cloud/iterator.py
@@ -297,6 +297,7 @@ class HTTPIterator(Iterator):

_PAGE_TOKEN = 'pageToken'
_MAX_RESULTS = 'maxResults'
_NEXT_TOKEN = 'nextPageToken'
_RESERVED_PARAMS = frozenset([_PAGE_TOKEN, _MAX_RESULTS])
_HTTP_METHOD = 'GET'

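
A note on the one-line change above: ``_NEXT_TOKEN`` names the response key that carries the continuation cursor. Most list responses use ``nextPageToken``, but BigQuery's table-data responses use ``pageToken``, which is why ``fetch_data`` overrides the attribute on its iterator instance. A minimal sketch of the idea (``_extract_next_token`` is a hypothetical helper, not the library's real method):

    # Hypothetical sketch of how the configured key might be consulted when a
    # response comes back; the real parsing lives in google.cloud.iterator.
    def _extract_next_token(iterator, response):
        return response.get(iterator._NEXT_TOKEN)

    # Default on the class:              _NEXT_TOKEN = 'nextPageToken'
    # Override for BigQuery table data:  iterator._NEXT_TOKEN = 'pageToken'
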
7 changes: 4 additions & 3 deletions docs/bigquery-usage.rst
@@ -209,16 +209,17 @@ Run a query which can be expected to complete within bounded time:
:start-after: [START client_run_sync_query]
:end-before: [END client_run_sync_query]

If the rows returned by the query do not fit into the inital response,
then we need to fetch the remaining rows via ``fetch_data``:
If the rows returned by the query do not fit into the initial response,
then we need to fetch the remaining rows via
:meth:`~google.cloud.bigquery.query.QueryResults.fetch_data`:

.. literalinclude:: bigquery_snippets.py
:start-after: [START client_run_sync_query_paged]
:end-before: [END client_run_sync_query_paged]

If the query takes longer than the timeout allowed, ``query.complete``
will be ``False``. In that case, we need to poll the associated job until
it is done, and then fetch the reuslts:
it is done, and then fetch the results:

.. literalinclude:: bigquery_snippets.py
:start-after: [START client_run_sync_query_timeout]
19 changes: 10 additions & 9 deletions docs/bigquery_snippets.py
@@ -341,7 +341,9 @@ def _warm_up_inserted_table_data(table):

while len(rows) == 0 and counter > 0:
counter -= 1
rows, _, _ = table.fetch_data()
iterator = table.fetch_data()
iterator.update_page()
rows = list(iterator.page)
if len(rows) == 0:
time.sleep(5)

@@ -376,13 +378,8 @@ def do_something(row):
found_rows.append(row)

# [START table_fetch_data]
rows, _, token = table.fetch_data()
while True:
for row in rows:
do_something(row)
if token is None:
break
rows, _, token = table.fetch_data(page_token=token)
for row in table.fetch_data():
do_something(row)
# [END table_fetch_data]

assert len(found_rows) == len(ROWS_TO_INSERT)
@@ -424,7 +421,11 @@ def table_upload_from_file(client, to_delete):

_warm_up_inserted_table_data(table)

rows, total, token = table.fetch_data()
iterator = table.fetch_data()
iterator.update_page()
rows = list(iterator.page)
total = iterator.page.total_rows
token = iterator.next_page_token

assert len(rows) == total == 2
assert token is None
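
The snippets above stick to whole-table iteration or a single page; when a bounded batch is wanted, the iterator also supports resuming from a saved cursor. A sketch using only the parameters and attributes exercised elsewhere in this commit:

    # Fetch a bounded first batch and remember where we stopped.
    iterator = table.fetch_data(max_results=10)
    iterator.update_page()
    first_batch = list(iterator.page)
    cursor = iterator.next_page_token        # None if everything fit in one page

    # Later, resume from the saved cursor.
    if cursor is not None:
        resumed = table.fetch_data(max_results=10, page_token=cursor)
        resumed.update_page()
        next_batch = list(resumed.page)
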
2 changes: 1 addition & 1 deletion scripts/run_pylint.py
@@ -72,7 +72,7 @@
}
TEST_RC_REPLACEMENTS = {
'FORMAT': {
'max-module-lines': 1960,
'max-module-lines': 2000,
},
}

14 changes: 10 additions & 4 deletions system_tests/bigquery.py
@@ -263,6 +263,12 @@ def test_update_table(self):
self.assertEqual(found.field_type, expected.field_type)
self.assertEqual(found.mode, expected.mode)

@staticmethod
def _fetch_single_page(table):
iterator = table.fetch_data()
iterator.update_page()
return list(iterator.page)

def test_insert_data_then_dump_table(self):
import datetime
from google.cloud._helpers import UTC
@@ -303,11 +309,11 @@ def _job_done(instance):
def _has_rows(result):
return len(result[0]) > 0

# Allow for 90 seconds of "warm up" before rows visible. See:
# Allow for "warm up" before rows visible. See:
# https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
# 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
retry = RetryResult(_has_rows, max_tries=8)
rows, _, _ = retry(table.fetch_data)()
rows = retry(self._fetch_single_page)(table)

by_age = operator.itemgetter(1)
self.assertEqual(sorted(rows, key=by_age),
@@ -361,7 +367,7 @@ def _job_done(instance):

self.assertEqual(job.output_rows, len(ROWS))

rows, _, _ = table.fetch_data()
rows = self._fetch_single_page(table)
by_age = operator.itemgetter(1)
self.assertEqual(sorted(rows, key=by_age),
sorted(ROWS, key=by_age))
@@ -431,7 +437,7 @@ def _job_done(instance):
retry = RetryInstanceState(_job_done, max_tries=8)
retry(job.reload)()

rows, _, _ = table.fetch_data()
rows = self._fetch_single_page(table)
by_age = operator.itemgetter(1)
self.assertEqual(sorted(rows, key=by_age),
sorted(ROWS, key=by_age))
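
For readers unfamiliar with the system-test utilities: ``RetryResult`` retries a call until a predicate accepts its return value, which is how these tests ride out BigQuery's streaming-data "warm up" window. A rough, hypothetical equivalent (not the project's actual helper) might look like:

    import time

    def retry_result(predicate, max_tries=8):
        """Retry a call with exponential backoff until ``predicate`` passes."""
        def decorate(func):
            def wrapped(*args, **kwargs):
                delay = 1
                for attempt in range(max_tries):
                    result = func(*args, **kwargs)
                    if predicate(result) or attempt == max_tries - 1:
                        break
                    time.sleep(delay)
                    delay *= 2               # 1 + 2 + 4 + ... seconds, as noted above
                return result
            return wrapped
        return decorate
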
