From 0ca3d87c84ed6b4e4d2860163558b9f598e743ab Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Tue, 27 Jan 2026 14:23:49 -0500 Subject: [PATCH 01/14] fix: updates timeout/retry code to respect hanging server --- google/cloud/bigquery/_pandas_helpers.py | 10 +++- google/cloud/bigquery/retry.py | 8 ++-- tests/unit/test_download_bqstorage_timeout.py | 48 +++++++++++++++++++ 3 files changed, 61 insertions(+), 5 deletions(-) create mode 100644 tests/unit/test_download_bqstorage_timeout.py diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 5460f7ca7..984a8eb7c 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -33,6 +33,7 @@ from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers +from google.cloud.bigquery import retry as bq_retry from google.cloud.bigquery import schema @@ -928,6 +929,7 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") + start_time = time.time() requested_streams = determine_requested_streams(preserve_order, max_stream_count) requested_session = bigquery_storage.types.stream.ReadSession( @@ -944,10 +946,16 @@ def _download_table_bqstorage( ArrowSerializationOptions.CompressionCodec(1) ) + retry_policy = None + if timeout is not None: + retry_policy = bq_retry.DEFAULT_RETRY.with_deadline(timeout) + session = bqstorage_client.create_read_session( parent="projects/{}".format(project_id), read_session=requested_session, max_stream_count=requested_streams, + retry=retry_policy, + timeout=timeout, ) _LOGGER.debug( @@ -983,8 +991,6 @@ def _download_table_bqstorage( # Manually manage the pool to control shutdown behavior on timeout. pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams)) wait_on_shutdown = True - start_time = time.time() - try: # Manually submit jobs and wait for download to complete rather # than using pool.map because pool.map continues running in the diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 19012efd6..d7f81a02a 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -64,11 +64,13 @@ def _should_retry(exc): We retry if and only if the 'reason' is 'backendError' or 'rateLimitExceeded'. """ - if not hasattr(exc, "errors") or len(exc.errors) == 0: - # Check for unstructured error returns, e.g. from GFE + try: + reason = exc.errors[0]["reason"] + except (AttributeError, IndexError, TypeError, KeyError): + # Fallback for when errors attribute is missing, empty, or not a dict + # or doesn't contain "reason" (e.g. gRPC exceptions). 
return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES) - reason = exc.errors[0]["reason"] return reason in _RETRYABLE_REASONS diff --git a/tests/unit/test_download_bqstorage_timeout.py b/tests/unit/test_download_bqstorage_timeout.py new file mode 100644 index 000000000..35f15cfd0 --- /dev/null +++ b/tests/unit/test_download_bqstorage_timeout.py @@ -0,0 +1,48 @@ +import pytest +from unittest import mock +from google.cloud.bigquery import _pandas_helpers + +try: + from google.cloud import bigquery_storage +except ImportError: + bigquery_storage = None + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`" +) +def test_download_table_bqstorage_passes_timeout_to_create_read_session(): + # Mock dependencies + project_id = "test-project" + table = mock.Mock() + table.table_id = "test_table" + table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test" + + bqstorage_client = mock.Mock(spec=bigquery_storage.BigQueryReadClient) + # Mock create_read_session to return a session with no streams so the function returns early + # (Checking start of loop logic vs empty streams return) + session = mock.Mock() + # If streams is empty, _download_table_bqstorage returns early, which is fine for this test + session.streams = [] + bqstorage_client.create_read_session.return_value = session + + # Call the function + timeout = 123.456 + # download_arrow_bqstorage yields frames, so we need to iterate to trigger execution + list( + _pandas_helpers.download_arrow_bqstorage( + project_id, table, bqstorage_client, timeout=timeout + ) + ) + + # Verify timeout and retry were passed + bqstorage_client.create_read_session.assert_called_once() + _, kwargs = bqstorage_client.create_read_session.call_args + assert "timeout" in kwargs + assert kwargs["timeout"] == timeout + + assert "retry" in kwargs + retry_policy = kwargs["retry"] + assert retry_policy is not None + # Check if deadline is set correctly in the retry policy + assert retry_policy._deadline == timeout From f58d712692e541552db81dd5359d3aafc94d6af0 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Tue, 27 Jan 2026 15:59:02 -0500 Subject: [PATCH 02/14] updates REST interactions to handle timeout --- google/cloud/bigquery/_pandas_helpers.py | 37 ++++++++++++++++++++---- google/cloud/bigquery/table.py | 6 +++- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 984a8eb7c..9a777a20d 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -741,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types): return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) -def download_arrow_row_iterator(pages, bq_schema): +def download_arrow_row_iterator(pages, bq_schema, timeout=None): """Use HTTP JSON RowIterator to construct an iterable of RecordBatches. Args: @@ -752,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema): Mapping[str, Any] \ ]]): A decription of the fields in result pages. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Yields: :class:`pyarrow.RecordBatch` The next page of records as a ``pyarrow`` record batch. 
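
The hunk below is where the new ``timeout`` is actually enforced for the REST path: a per-page deadline check against a monotonic clock that raises ``concurrent.futures.TimeoutError`` once the budget is exhausted. A minimal standalone sketch of the same pattern (the helper name and page source here are illustrative, not part of the library):

    import concurrent.futures
    import time


    def iter_pages_with_deadline(pages, timeout=None):
        """Illustrative helper, not library code: yield pages until the
        elapsed monotonic time exceeds ``timeout`` seconds."""
        if timeout is None:
            yield from pages
            return
        start = time.monotonic()  # monotonic clock is immune to wall-clock jumps
        for page in pages:
            if time.monotonic() - start > timeout:
                raise concurrent.futures.TimeoutError(
                    "Download did not complete within {} seconds.".format(timeout)
                )
            yield page

Note that the check runs between pages: a single page fetch that hangs indefinitely is bounded only by the transport-level timeout, not by this loop.
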
@@ -760,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema): column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema] arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema] - for page in pages: - yield _row_iterator_page_to_arrow(page, column_names, arrow_types) + if timeout is None: + for page in pages: + yield _row_iterator_page_to_arrow(page, column_names, arrow_types) + else: + start_time = time.monotonic() + for page in pages: + if time.monotonic() - start_time > timeout: + raise concurrent.futures.TimeoutError() + + yield _row_iterator_page_to_arrow(page, column_names, arrow_types) def _row_iterator_page_to_dataframe(page, column_names, dtypes): @@ -779,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes): return pandas.DataFrame(columns, columns=column_names) -def download_dataframe_row_iterator(pages, bq_schema, dtypes): +def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None): """Use HTTP JSON RowIterator to construct a DataFrame. Args: @@ -793,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes): dtypes(Mapping[str, numpy.dtype]): The types of columns in result data to hint construction of the resulting DataFrame. Not all column types have to be specified. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Yields: :class:`pandas.DataFrame` The next page of records as a ``pandas.DataFrame`` record batch. """ bq_schema = schema._to_schema_fields(bq_schema) column_names = [field.name for field in bq_schema] - for page in pages: - yield _row_iterator_page_to_dataframe(page, column_names, dtypes) + + if timeout is None: + for page in pages: + yield _row_iterator_page_to_dataframe(page, column_names, dtypes) + else: + start_time = time.monotonic() + for page in pages: + if time.monotonic() - start_time > timeout: + raise concurrent.futures.TimeoutError() + + yield _row_iterator_page_to_dataframe(page, column_names, dtypes) def _bqstorage_page_to_arrow(page): diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 195461006..88b673a8b 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -2152,7 +2152,10 @@ def to_arrow_iterable( timeout=timeout, ) tabledata_list_download = functools.partial( - _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema + _pandas_helpers.download_arrow_row_iterator, + iter(self.pages), + self.schema, + timeout=timeout, ) return self._to_page_iterable( bqstorage_download, @@ -2366,6 +2369,7 @@ def to_dataframe_iterable( iter(self.pages), self.schema, dtypes, + timeout=timeout, ) return self._to_page_iterable( bqstorage_download, From f62c42bd7f8cb2e9fd6c4d95734f5ffa23e62ae7 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Tue, 27 Jan 2026 16:13:14 -0500 Subject: [PATCH 03/14] updates time to monotonic --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 9a777a20d..227cdec24 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -1037,7 +1037,7 @@ def _download_table_bqstorage( while not_done: # Check for timeout if timeout is not None: - elapsed = time.time() - start_time + elapsed = time.monotonic() - start_time if elapsed > timeout: wait_on_shutdown = False raise 
concurrent.futures.TimeoutError( From 291cfe9ecb276f8797353f5b3cf7ddfd1c84db51 Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Wed, 28 Jan 2026 07:28:10 -0500 Subject: [PATCH 04/14] Update retry.py Added debug level logging to respond to comment and cleaned up some comments. --- google/cloud/bigquery/retry.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index d7f81a02a..6fd458df5 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from google.api_core import exceptions from google.api_core import retry import google.api_core.future.polling from google.auth import exceptions as auth_exceptions # type: ignore import requests.exceptions +_LOGGER = logging.getLogger(__name__) _RETRYABLE_REASONS = frozenset( ["rateLimitExceeded", "backendError", "internalError", "badGateway"] @@ -61,14 +64,15 @@ def _should_retry(exc): """Predicate for determining when to retry. - We retry if and only if the 'reason' is 'backendError' - or 'rateLimitExceeded'. + We retry if and only if the 'reason' is in _RETRYABLE_REASONS or is + in _UNSTRUCTURED_RETRYABLE_TYPES. """ try: reason = exc.errors[0]["reason"] except (AttributeError, IndexError, TypeError, KeyError): # Fallback for when errors attribute is missing, empty, or not a dict # or doesn't contain "reason" (e.g. gRPC exceptions). + _LOGGER.debug("Inspecting unstructured error for retry: %r", exc) return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES) return reason in _RETRYABLE_REASONS From 6f0bad03ae49128ba645240c204a81f8cbdd3c86 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 07:58:21 -0500 Subject: [PATCH 05/14] updates conditional statement to idiomatic python --- google/cloud/bigquery/_pandas_helpers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 227cdec24..c4f8f5b32 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -971,9 +971,9 @@ def _download_table_bqstorage( ArrowSerializationOptions.CompressionCodec(1) ) - retry_policy = None - if timeout is not None: - retry_policy = bq_retry.DEFAULT_RETRY.with_deadline(timeout) + retry_policy = ( + bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None + ) session = bqstorage_client.create_read_session( parent="projects/{}".format(project_id), From 2544afbbbea502faf8787963396b2d7788fea547 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 09:56:51 -0500 Subject: [PATCH 06/14] updates test suite to accommodate new timeout parameter --- google/cloud/bigquery/_pandas_helpers.py | 2 +- google/cloud/bigquery/dbapi/cursor.py | 2 ++ tests/unit/job/test_query_pandas.py | 6 ++++++ tests/unit/test_dbapi_cursor.py | 6 +++++- tests/unit/test_table.py | 4 ++++ 5 files changed, 18 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index c4f8f5b32..7bd9f99b6 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -954,7 +954,7 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") - start_time = time.time() + start_time = 
time.monotonic() requested_streams = determine_requested_streams(preserve_order, max_stream_count) requested_session = bigquery_storage.types.stream.ReadSession( diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py index 014a6825e..bffd7678f 100644 --- a/google/cloud/bigquery/dbapi/cursor.py +++ b/google/cloud/bigquery/dbapi/cursor.py @@ -323,6 +323,8 @@ def _bqstorage_fetch(self, bqstorage_client): read_session=requested_session, # a single stream only, as DB API is not well-suited for multithreading max_stream_count=1, + retry=None, + timeout=None, ) if not read_session.streams: diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index 4390309f1..e0e0438f5 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -179,6 +179,8 @@ def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg): parent="projects/test-project", read_session=expected_session, max_stream_count=1, # Use a single stream to preserve row order. + retry=None, + timeout=None, ) @@ -593,6 +595,8 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg): parent="projects/bqstorage-billing-project", read_session=expected_session, max_stream_count=0, # Use default number of streams for best performance. + retry=None, + timeout=None, ) bqstorage_client.read_rows.assert_called_once_with(stream_id) @@ -644,6 +648,8 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression(): parent="projects/bqstorage-billing-project", read_session=expected_session, max_stream_count=0, + retry=None, + timeout=None, ) diff --git a/tests/unit/test_dbapi_cursor.py b/tests/unit/test_dbapi_cursor.py index 6fca4cec0..c5cad8c91 100644 --- a/tests/unit/test_dbapi_cursor.py +++ b/tests/unit/test_dbapi_cursor.py @@ -480,7 +480,11 @@ def fake_ensure_bqstorage_client(bqstorage_client=None, **kwargs): data_format=bigquery_storage.DataFormat.ARROW, ) mock_bqstorage_client.create_read_session.assert_called_once_with( - parent="projects/P", read_session=expected_session, max_stream_count=1 + parent="projects/P", + read_session=expected_session, + max_stream_count=1, + retry=None, + timeout=None, ) # Check the data returned. diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 97a1b4916..9879c3dc2 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -6853,6 +6853,8 @@ def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order): parent=mock.ANY, read_session=mock.ANY, max_stream_count=max_stream_count if not preserve_order else 1, + retry=None, + timeout=None, ) @@ -6888,4 +6890,6 @@ def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order): parent=mock.ANY, read_session=mock.ANY, max_stream_count=max_stream_count if not preserve_order else 1, + retry=None, + timeout=None, ) From 00eb8b547c463e8534aaeafdaaabf0317e816cad Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 10:09:30 -0500 Subject: [PATCH 07/14] revises retry deadline to accommodate the timeout. --- tests/unit/test_client_retry.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_client_retry.py b/tests/unit/test_client_retry.py index 6e49cc464..f0e7ac88f 100644 --- a/tests/unit/test_client_retry.py +++ b/tests/unit/test_client_retry.py @@ -23,6 +23,11 @@ PROJECT = "test-project" +# A deadline > 1.0s is required because the default retry (google.api_core.retry.Retry) +# has an initial delay of 1.0s. 
If the deadline is <= 1.0s, the first retry attempt +# (scheduled for now + 1.0s) will be rejected immediately as exceeding the deadline. +_RETRY_DEADLINE = 10.0 + def _make_credentials(): import google.auth.credentials @@ -83,7 +88,7 @@ def test_call_api_applying_custom_retry_on_timeout(global_time_lock): "api_request", side_effect=[TimeoutError, "result"], ) - retry = DEFAULT_RETRY.with_deadline(1).with_predicate( + retry = DEFAULT_RETRY.with_deadline(_RETRY_DEADLINE).with_predicate( lambda exc: isinstance(exc, TimeoutError) ) From e5973a022ee23c5f7b8d3eab05e01fc9bbd37641 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 10:31:47 -0500 Subject: [PATCH 08/14] adds two tests to ensure proper coverage --- tests/unit/test__pandas_helpers.py | 58 ++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index a1cbb726b..f0bb48d4b 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2252,3 +2252,61 @@ def fast_download_stream( results = list(result_gen) assert results == ["result_page"] + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +def test_download_arrow_row_iterator_timeout(module_under_test): + bq_schema = [schema.SchemaField("name", "STRING")] + + # Mock page with to_arrow method + mock_page = mock.Mock() + mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"name": ["foo"]}) + + def slow_pages(): + # First page yields quickly + yield mock_page + # Sleep to exceed timeout + time.sleep(0.1) + yield mock_page + + # Timeout of 0.05s + timeout = 0.05 + iterator = module_under_test.download_arrow_row_iterator( + slow_pages(), bq_schema, timeout=timeout + ) + + # First item should succeed + next(iterator) + + # Second item should fail with TimeoutError + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +def test_download_dataframe_row_iterator_timeout(module_under_test): + bq_schema = [schema.SchemaField("name", "STRING")] + dtypes = {} + + # Mock page + mock_page = mock.Mock() + # Mock iterator for _row_iterator_page_to_dataframe checking next(iter(page)) + mock_page.__iter__ = lambda self: iter(["row1"]) + mock_page._columns = [["foo"]] + + def slow_pages(): + yield mock_page + time.sleep(0.1) + yield mock_page + + timeout = 0.05 + iterator = module_under_test.download_dataframe_row_iterator( + slow_pages(), bq_schema, dtypes, timeout=timeout + ) + + next(iterator) + + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) From 90ef70fcfa1826830c3733d3acb2c22b616b26a1 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 10:39:38 -0500 Subject: [PATCH 09/14] updates mock --- tests/unit/test__pandas_helpers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index f0bb48d4b..da2e79bd5 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2262,6 +2262,7 @@ def test_download_arrow_row_iterator_timeout(module_under_test): # Mock page with to_arrow method mock_page = mock.Mock() mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"name": ["foo"]}) + mock_page.__iter__ = lambda self: iter(["row1"]) def 
slow_pages(): # First page yields quickly From 8ee106cd261cd4f9dbc9fb0c7fd0b2ad2756c295 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 10:47:27 -0500 Subject: [PATCH 10/14] updates mock with subscriptable columns --- tests/unit/test__pandas_helpers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index da2e79bd5..c1b415336 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2263,6 +2263,7 @@ def test_download_arrow_row_iterator_timeout(module_under_test): mock_page = mock.Mock() mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"name": ["foo"]}) mock_page.__iter__ = lambda self: iter(["row1"]) + mock_page._columns = [["foo"]] def slow_pages(): # First page yields quickly From 60ae827228a6a999653404410e6766bd11c0670b Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 11:19:24 -0500 Subject: [PATCH 11/14] Updates tests to account for transitive dependency and older version of pyarrow --- tests/unit/test__pandas_helpers.py | 5 +- tests/unit/test__pandas_helpers_timeout.py | 79 ++++++++++++++++++++++ tests/unit/test_table.py | 4 ++ 3 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test__pandas_helpers_timeout.py diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index c1b415336..e862830eb 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2261,7 +2261,10 @@ def test_download_arrow_row_iterator_timeout(module_under_test): # Mock page with to_arrow method mock_page = mock.Mock() - mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"name": ["foo"]}) + mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays( + [pyarrow.array(["foo"])], + names=["name"], + ) mock_page.__iter__ = lambda self: iter(["row1"]) mock_page._columns = [["foo"]] diff --git a/tests/unit/test__pandas_helpers_timeout.py b/tests/unit/test__pandas_helpers_timeout.py new file mode 100644 index 000000000..3feb3cb29 --- /dev/null +++ b/tests/unit/test__pandas_helpers_timeout.py @@ -0,0 +1,79 @@ +import concurrent.futures +import time +from unittest import mock +import pytest + +# Try to import necessary modules, but don't fail if they are missing +# as the tests will skip themselves if dependencies are missing. 
+try: + import pandas + import pyarrow +except ImportError: + pandas = None + pyarrow = None + +from google.cloud.bigquery import _pandas_helpers +from google.cloud.bigquery import schema + + +@pytest.fixture +def module_under_test(): + return _pandas_helpers + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_download_arrow_row_iterator_timeout(module_under_test): + bq_schema = [schema.SchemaField("name", "STRING")] + + # Mock page with to_arrow method + mock_page = mock.Mock() + mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"name": ["foo"]}) + + def slow_pages(): + # First page yields quickly + yield mock_page + # Sleep to exceed timeout + time.sleep(0.1) + yield mock_page + + # Timeout of 0.05s + timeout = 0.05 + iterator = module_under_test.download_arrow_row_iterator( + slow_pages(), bq_schema, timeout=timeout + ) + + # First item should succeed + next(iterator) + + # Second item should fail with TimeoutError + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_download_dataframe_row_iterator_timeout(module_under_test): + bq_schema = [schema.SchemaField("name", "STRING")] + dtypes = {} + + # Mock page + mock_page = mock.Mock() + # Mock iterator for _row_iterator_page_to_dataframe checking next(iter(page)) + mock_page.__iter__ = lambda self: iter(["row1"]) + mock_page._columns = [["foo"]] + + def slow_pages(): + yield mock_page + time.sleep(0.1) + yield mock_page + + timeout = 0.05 + iterator = module_under_test.download_dataframe_row_iterator( + slow_pages(), bq_schema, dtypes, timeout=timeout + ) + + next(iterator) + + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 9879c3dc2..a8397247d 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -4125,6 +4125,10 @@ def test_to_dataframe_tqdm_error(self): # Warn that a progress bar was requested, but creating the tqdm # progress bar failed. for warning in warned: # pragma: NO COVER + # Pyparsing warnings appear to be coming from a transitive + # dependency and are unrelated to the code under test. + if "Pyparsing" in warning.category.__name__: + continue self.assertIn( warning.category, [UserWarning, DeprecationWarning, tqdm.TqdmExperimentalWarning], From f263a8db1e7be0df4cd869e120392ce9f7b1a466 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 11:29:45 -0500 Subject: [PATCH 12/14] Removes accidentally added file --- tests/unit/test__pandas_helpers_timeout.py | 79 ---------------------- 1 file changed, 79 deletions(-) delete mode 100644 tests/unit/test__pandas_helpers_timeout.py diff --git a/tests/unit/test__pandas_helpers_timeout.py b/tests/unit/test__pandas_helpers_timeout.py deleted file mode 100644 index 3feb3cb29..000000000 --- a/tests/unit/test__pandas_helpers_timeout.py +++ /dev/null @@ -1,79 +0,0 @@ -import concurrent.futures -import time -from unittest import mock -import pytest - -# Try to import necessary modules, but don't fail if they are missing -# as the tests will skip themselves if dependencies are missing. 
-try: - import pandas - import pyarrow -except ImportError: - pandas = None - pyarrow = None - -from google.cloud.bigquery import _pandas_helpers -from google.cloud.bigquery import schema - - -@pytest.fixture -def module_under_test(): - return _pandas_helpers - - -@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -def test_download_arrow_row_iterator_timeout(module_under_test): - bq_schema = [schema.SchemaField("name", "STRING")] - - # Mock page with to_arrow method - mock_page = mock.Mock() - mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"name": ["foo"]}) - - def slow_pages(): - # First page yields quickly - yield mock_page - # Sleep to exceed timeout - time.sleep(0.1) - yield mock_page - - # Timeout of 0.05s - timeout = 0.05 - iterator = module_under_test.download_arrow_row_iterator( - slow_pages(), bq_schema, timeout=timeout - ) - - # First item should succeed - next(iterator) - - # Second item should fail with TimeoutError - with pytest.raises(concurrent.futures.TimeoutError): - next(iterator) - - -@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -def test_download_dataframe_row_iterator_timeout(module_under_test): - bq_schema = [schema.SchemaField("name", "STRING")] - dtypes = {} - - # Mock page - mock_page = mock.Mock() - # Mock iterator for _row_iterator_page_to_dataframe checking next(iter(page)) - mock_page.__iter__ = lambda self: iter(["row1"]) - mock_page._columns = [["foo"]] - - def slow_pages(): - yield mock_page - time.sleep(0.1) - yield mock_page - - timeout = 0.05 - iterator = module_under_test.download_dataframe_row_iterator( - slow_pages(), bq_schema, dtypes, timeout=timeout - ) - - next(iterator) - - with pytest.raises(concurrent.futures.TimeoutError): - next(iterator) From 7e9b9f9f48e2c94b0d553b5edb3e99ede485f180 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Wed, 28 Jan 2026 12:29:11 -0500 Subject: [PATCH 13/14] Updates tests with success case --- tests/unit/test__pandas_helpers.py | 60 +++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index e862830eb..062805f23 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2256,7 +2256,16 @@ def fast_download_stream( @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") -def test_download_arrow_row_iterator_timeout(module_under_test): +@pytest.mark.parametrize( + "sleep_time, timeout, should_timeout", + [ + (0.1, 0.05, True), # Timeout case + (0, 10.0, False), # Success case + ], +) +def test_download_arrow_row_iterator_with_timeout( + module_under_test, sleep_time, timeout, should_timeout +): bq_schema = [schema.SchemaField("name", "STRING")] # Mock page with to_arrow method @@ -2268,30 +2277,41 @@ def test_download_arrow_row_iterator_timeout(module_under_test): mock_page.__iter__ = lambda self: iter(["row1"]) mock_page._columns = [["foo"]] - def slow_pages(): + def pages_gen(): # First page yields quickly yield mock_page - # Sleep to exceed timeout - time.sleep(0.1) + if sleep_time > 0: + time.sleep(sleep_time) yield mock_page - # Timeout of 0.05s - timeout = 0.05 iterator = module_under_test.download_arrow_row_iterator( - slow_pages(), bq_schema, timeout=timeout + pages_gen(), bq_schema, 
timeout=timeout ) - # First item should succeed + # First item should always succeed next(iterator) - # Second item should fail with TimeoutError - with pytest.raises(concurrent.futures.TimeoutError): - next(iterator) + if should_timeout: + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + else: + # Should succeed and complete + results = list(iterator) + assert len(results) == 1 # 1 remaining item @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") -def test_download_dataframe_row_iterator_timeout(module_under_test): +@pytest.mark.parametrize( + "sleep_time, timeout, should_timeout", + [ + (0.1, 0.05, True), # Timeout case + (0, 10.0, False), # Success case + ], +) +def test_download_dataframe_row_iterator_with_timeout( + module_under_test, sleep_time, timeout, should_timeout +): bq_schema = [schema.SchemaField("name", "STRING")] dtypes = {} @@ -2301,17 +2321,21 @@ def test_download_dataframe_row_iterator_timeout(module_under_test): mock_page.__iter__ = lambda self: iter(["row1"]) mock_page._columns = [["foo"]] - def slow_pages(): + def pages_gen(): yield mock_page - time.sleep(0.1) + if sleep_time > 0: + time.sleep(sleep_time) yield mock_page - timeout = 0.05 iterator = module_under_test.download_dataframe_row_iterator( - slow_pages(), bq_schema, dtypes, timeout=timeout + pages_gen(), bq_schema, dtypes, timeout=timeout ) next(iterator) - with pytest.raises(concurrent.futures.TimeoutError): - next(iterator) + if should_timeout: + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + else: + results = list(iterator) + assert len(results) == 1 From b7ea81af6e5bdaac5f9a4d18e3bb4a922bcc1cbb Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Thu, 29 Jan 2026 07:11:35 -0500 Subject: [PATCH 14/14] merges content from temp file to permanent file, deletes temp file --- tests/unit/test__pandas_helpers.py | 44 +++++++++++++++++ tests/unit/test_download_bqstorage_timeout.py | 48 ------------------- 2 files changed, 44 insertions(+), 48 deletions(-) delete mode 100644 tests/unit/test_download_bqstorage_timeout.py diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index 062805f23..6ec62c0b6 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2339,3 +2339,47 @@ def pages_gen(): else: results = list(iterator) assert len(results) == 1 + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`" +) +def test_download_arrow_bqstorage_passes_timeout_to_create_read_session( + module_under_test, +): + # Mock dependencies + project_id = "test-project" + table = mock.Mock() + table.table_id = "test_table" + table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test" + + bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + # Mock create_read_session to return a session with no streams so the function returns early + # (Checking start of loop logic vs empty streams return) + session = mock.Mock() + # If streams is empty, _download_table_bqstorage returns early, which is fine for this test + session.streams = [] + bqstorage_client.create_read_session.return_value = session + + # Call the function + timeout = 123.456 + # download_arrow_bqstorage yields frames, so we need to iterate to trigger execution + list( + module_under_test.download_arrow_bqstorage( + project_id, table, bqstorage_client, timeout=timeout 
+ ) + ) + + # Verify timeout and retry were passed + bqstorage_client.create_read_session.assert_called_once() + _, kwargs = bqstorage_client.create_read_session.call_args + assert "timeout" in kwargs + assert kwargs["timeout"] == timeout + + assert "retry" in kwargs + retry_policy = kwargs["retry"] + assert retry_policy is not None + # Check if deadline is set correctly in the retry policy + assert retry_policy._deadline == timeout diff --git a/tests/unit/test_download_bqstorage_timeout.py b/tests/unit/test_download_bqstorage_timeout.py deleted file mode 100644 index 35f15cfd0..000000000 --- a/tests/unit/test_download_bqstorage_timeout.py +++ /dev/null @@ -1,48 +0,0 @@ -import pytest -from unittest import mock -from google.cloud.bigquery import _pandas_helpers - -try: - from google.cloud import bigquery_storage -except ImportError: - bigquery_storage = None - - -@pytest.mark.skipif( - bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`" -) -def test_download_table_bqstorage_passes_timeout_to_create_read_session(): - # Mock dependencies - project_id = "test-project" - table = mock.Mock() - table.table_id = "test_table" - table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test" - - bqstorage_client = mock.Mock(spec=bigquery_storage.BigQueryReadClient) - # Mock create_read_session to return a session with no streams so the function returns early - # (Checking start of loop logic vs empty streams return) - session = mock.Mock() - # If streams is empty, _download_table_bqstorage returns early, which is fine for this test - session.streams = [] - bqstorage_client.create_read_session.return_value = session - - # Call the function - timeout = 123.456 - # download_arrow_bqstorage yields frames, so we need to iterate to trigger execution - list( - _pandas_helpers.download_arrow_bqstorage( - project_id, table, bqstorage_client, timeout=timeout - ) - ) - - # Verify timeout and retry were passed - bqstorage_client.create_read_session.assert_called_once() - _, kwargs = bqstorage_client.create_read_session.call_args - assert "timeout" in kwargs - assert kwargs["timeout"] == timeout - - assert "retry" in kwargs - retry_policy = kwargs["retry"] - assert retry_policy is not None - # Check if deadline is set correctly in the retry policy - assert retry_policy._deadline == timeout
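
Taken together, the series bounds both download paths: REST downloads via the per-page deadline in ``_pandas_helpers``, and BigQuery Storage downloads via the ``retry``/``timeout`` arguments on ``create_read_session`` plus the worker-pool deadline. A hedged usage sketch on this branch (the table ID and the ``process`` consumer are placeholders, and it assumes ``to_dataframe_iterable`` accepts ``timeout`` as wired up in patch 02/14):

    import concurrent.futures

    from google.cloud import bigquery

    client = bigquery.Client()
    rows = client.list_rows("my-project.my_dataset.my_table")

    try:
        # Bound the whole result download to 60 seconds; pages stream in
        # as DataFrames.
        for frame in rows.to_dataframe_iterable(timeout=60.0):
            process(frame)  # placeholder for application logic
    except concurrent.futures.TimeoutError:
        # Raised by the per-page deadline (REST path) or the worker-pool
        # deadline (BigQuery Storage path) introduced in this series.
        print("Result download timed out.")
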