Skip to content
Permalink
Browse files
perf: don't fetch rows when waiting for query to finish (#400)
When there are large result sets, fetching rows while waiting for the
query to finish can cause the API to hang indefinitely. (This may be due
to an interaction between connection timeout and API timeout.)

This reverts commit 86f6a51 (#374).

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes googleapis/python-bigquery-pandas#343
Fixes #394 🦕
  • Loading branch information
tswast committed Nov 24, 2020
1 parent 673a9cb commit 730df17
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 80 deletions.
@@ -1534,7 +1534,7 @@ def _get_query_results(
A new ``_QueryResults`` instance.
"""

extra_params = {}
extra_params = {"maxResults": 0}

if project is None:
project = self.project
@@ -3187,7 +3187,6 @@ def _list_rows_from_query_results(
page_size=None,
retry=DEFAULT_RETRY,
timeout=None,
first_page_response=None,
):
"""List the rows of a completed query.
See
@@ -3248,7 +3247,6 @@ def _list_rows_from_query_results(
table=destination,
extra_params=params,
total_rows=total_rows,
first_page_response=first_page_response,
)
return row_iterator

@@ -1177,10 +1177,6 @@ def result(
if self._query_results.total_rows is None:
return _EmptyRowIterator()

first_page_response = None
if max_results is None and page_size is None and start_index is None:
first_page_response = self._query_results._properties

rows = self._client._list_rows_from_query_results(
self.job_id,
self.location,
@@ -1193,7 +1189,6 @@ def result(
start_index=start_index,
retry=retry,
timeout=timeout,
first_page_response=first_page_response,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
@@ -787,9 +787,7 @@ def test_result(self):
"location": "EU",
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "3",
"rows": [{"f": [{"v": "abc"}]}],
"pageToken": "next-page",
"totalRows": "2",
}
job_resource = self._make_resource(started=True, location="EU")
job_resource_done = self._make_resource(started=True, ended=True, location="EU")
@@ -801,9 +799,9 @@ def test_result(self):
query_page_resource = {
# Explicitly set totalRows to be different from the initial
# response to test update during iteration.
"totalRows": "2",
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "def"}]}],
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(
query_resource, query_resource_done, job_resource_done, query_page_resource
@@ -814,20 +812,19 @@ def test_result(self):
result = job.result()

self.assertIsInstance(result, RowIterator)
self.assertEqual(result.total_rows, 3)
self.assertEqual(result.total_rows, 2)
rows = list(result)
self.assertEqual(len(rows), 2)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(rows[1].col1, "def")
# Test that the total_rows property has changed during iteration, based
# on the response from tabledata.list.
self.assertEqual(result.total_rows, 2)
self.assertEqual(result.total_rows, 1)

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_results_call = mock.call(
method="GET",
path=query_results_path,
query_params={"location": "EU"},
query_params={"maxResults": 0, "location": "EU"},
timeout=None,
)
reload_call = mock.call(
@@ -842,7 +839,6 @@ def test_result(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"pageToken": "next-page",
},
timeout=None,
)
@@ -855,9 +851,7 @@ def test_result_with_done_job_calls_get_query_results(self):
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
"rows": [{"f": [{"v": "abc"}]}],
"pageToken": "next-page",
"totalRows": "1",
}
job_resource = self._make_resource(started=True, ended=True, location="EU")
job_resource["configuration"]["query"]["destinationTable"] = {
@@ -866,9 +860,9 @@ def test_result_with_done_job_calls_get_query_results(self):
"tableId": "dest_table",
}
results_page_resource = {
"totalRows": "2",
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "def"}]}],
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(query_resource_done, results_page_resource)
client = _make_client(self.PROJECT, connection=conn)
@@ -877,15 +871,14 @@ def test_result_with_done_job_calls_get_query_results(self):
result = job.result()

rows = list(result)
self.assertEqual(len(rows), 2)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(rows[1].col1, "def")

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_results_call = mock.call(
method="GET",
path=query_results_path,
query_params={"location": "EU"},
query_params={"maxResults": 0, "location": "EU"},
timeout=None,
)
query_results_page_call = mock.call(
@@ -894,7 +887,6 @@ def test_result_with_done_job_calls_get_query_results(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"pageToken": "next-page",
},
timeout=None,
)
@@ -908,12 +900,6 @@ def test_result_with_max_results(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "5",
# These rows are discarded because max_results is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
query_page_resource = {
"totalRows": "5",
@@ -939,7 +925,6 @@ def test_result_with_max_results(self):
rows = list(result)

self.assertEqual(len(rows), 3)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(len(connection.api_request.call_args_list), 2)
query_page_request = connection.api_request.call_args_list[1]
self.assertEqual(
@@ -994,7 +979,7 @@ def test_result_w_retry(self):
query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"location": "asia-northeast1"},
query_params={"maxResults": 0, "location": "asia-northeast1"},
timeout=None,
)
reload_call = mock.call(
@@ -1094,12 +1079,6 @@ def test_result_w_page_size(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "4",
# These rows are discarded because page_size is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
job_resource = self._make_resource(started=True, ended=True, location="US")
q_config = job_resource["configuration"]["query"]
@@ -1130,7 +1109,6 @@ def test_result_w_page_size(self):
# Assert
actual_rows = list(result)
self.assertEqual(len(actual_rows), 4)
self.assertEqual(actual_rows[0].col1, "row1")

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_page_1_call = mock.call(
@@ -1164,12 +1142,6 @@ def test_result_with_start_index(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "5",
# These rows are discarded because start_index is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
tabledata_resource = {
"totalRows": "5",
@@ -1196,7 +1168,6 @@ def test_result_with_start_index(self):
rows = list(result)

self.assertEqual(len(rows), 4)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(len(connection.api_request.call_args_list), 2)
tabledata_list_request = connection.api_request.call_args_list[1]
self.assertEqual(
@@ -100,7 +100,6 @@ def test_to_dataframe_bqstorage_preserve_order(query):
]
},
"totalRows": "4",
"pageToken": "next-page",
}
connection = _make_connection(get_query_results_resource, job_resource)
client = _make_client(connection=connection)
@@ -135,16 +134,7 @@ def test_to_dataframe_bqstorage_preserve_order(query):


@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
@pytest.mark.parametrize(
"method_kwargs",
[
{"create_bqstorage_client": False},
# Since all rows are contained in the first page of results, the BigQuery
# Storage API won't actually be used.
{"create_bqstorage_client": True},
],
)
def test_to_arrow(method_kwargs):
def test_to_arrow():
from google.cloud.bigquery.job import QueryJob as target_class

begun_resource = _make_job_resource(job_type="query")
@@ -172,6 +162,8 @@ def test_to_arrow(method_kwargs):
},
]
},
}
tabledata_resource = {
"rows": [
{
"f": [
@@ -185,15 +177,17 @@ def test_to_arrow(method_kwargs):
{"v": {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}},
]
},
],
]
}
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = _make_connection(begun_resource, query_resource, done_resource)
connection = _make_connection(
begun_resource, query_resource, done_resource, tabledata_resource
)
client = _make_client(connection=connection)
job = target_class.from_api_repr(begun_resource, client)

tbl = job.to_arrow(**method_kwargs)
tbl = job.to_arrow(create_bqstorage_client=False)

assert isinstance(tbl, pyarrow.Table)
assert tbl.num_rows == 2
@@ -375,16 +369,7 @@ def test_to_arrow_w_tqdm_wo_query_plan():


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.parametrize(
"method_kwargs",
[
{"create_bqstorage_client": False},
# Since all rows are contained in the first page of results, the BigQuery
# Storage API won't actually be used.
{"create_bqstorage_client": True},
],
)
def test_to_dataframe(method_kwargs):
def test_to_dataframe():
from google.cloud.bigquery.job import QueryJob as target_class

begun_resource = _make_job_resource(job_type="query")
@@ -398,20 +383,24 @@ def test_to_dataframe(method_kwargs):
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
},
}
tabledata_resource = {
"rows": [
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
{"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
{"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
],
]
}
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = _make_connection(begun_resource, query_resource, done_resource)
connection = _make_connection(
begun_resource, query_resource, done_resource, tabledata_resource
)
client = _make_client(connection=connection)
job = target_class.from_api_repr(begun_resource, client)

df = job.to_dataframe(**method_kwargs)
df = job.to_dataframe(create_bqstorage_client=False)

assert isinstance(df, pandas.DataFrame)
assert len(df) == 4 # verify the number of rows
@@ -456,7 +445,6 @@ def test_to_dataframe_bqstorage():
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
},
"pageToken": "next-page",
}
connection = _make_connection(query_resource)
client = _make_client(connection=connection)
@@ -319,7 +319,7 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self):
conn.api_request.assert_called_once_with(
method="GET",
path=path,
query_params={"timeoutMs": 500, "location": self.LOCATION},
query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION},
timeout=42,
)

@@ -336,7 +336,7 @@ def test__get_query_results_miss_w_client_location(self):
conn.api_request.assert_called_once_with(
method="GET",
path="/projects/PROJECT/queries/nothere",
query_params={"location": self.LOCATION},
query_params={"maxResults": 0, "location": self.LOCATION},
timeout=None,
)

0 comments on commit 730df17

Please sign in to comment.