From 5a62b9a73f7c9f68a409b626ffd23352ec363380 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 13 Nov 2025 23:00:30 +0000 Subject: [PATCH 01/12] feat: add pandas_gbq.sample --- pandas_gbq/__init__.py | 3 + pandas_gbq/constants.py | 7 ++ pandas_gbq/core/read.py | 179 ++++++++++++++++++++++++++++++++++ pandas_gbq/core/sample.py | 118 +++++++++++++++++++++++ pandas_gbq/exceptions.py | 21 ++++ pandas_gbq/gbq_connector.py | 185 ++++-------------------------------- setup.py | 1 + 7 files changed, 345 insertions(+), 169 deletions(-) create mode 100644 pandas_gbq/core/read.py create mode 100644 pandas_gbq/core/sample.py diff --git a/pandas_gbq/__init__.py b/pandas_gbq/__init__.py index a842c81f..b0a060bd 100644 --- a/pandas_gbq/__init__.py +++ b/pandas_gbq/__init__.py @@ -2,6 +2,7 @@ # Use of this source code is governed by a BSD-style # license that can be found in the LICENSE file. +import logging import warnings from pandas_gbq import version as pandas_gbq_version @@ -21,6 +22,8 @@ FutureWarning, ) +logger = logging.Logger(__name__) + __version__ = pandas_gbq_version.__version__ __all__ = [ diff --git a/pandas_gbq/constants.py b/pandas_gbq/constants.py index 37266b3c..498b03b5 100644 --- a/pandas_gbq/constants.py +++ b/pandas_gbq/constants.py @@ -2,6 +2,8 @@ # Use of this source code is governed by a BSD-style # license that can be found in the LICENSE file. +import google.api_core.exceptions + # BigQuery uses powers of 2 in calculating data sizes. See: # https://cloud.google.com/bigquery/pricing#data The documentation uses # GiB rather than GB to disambiguate from the alternative base 10 units. @@ -10,3 +12,8 @@ BYTES_IN_MIB = 1024 * BYTES_IN_KIB BYTES_IN_GIB = 1024 * BYTES_IN_MIB BYTES_TO_RECOMMEND_BIGFRAMES = BYTES_IN_GIB + +HTTP_ERRORS = ( + google.api_core.exceptions.ClientError, + google.api_core.exceptions.GoogleAPIError, +) diff --git a/pandas_gbq/core/read.py b/pandas_gbq/core/read.py new file mode 100644 index 00000000..bc089002 --- /dev/null +++ b/pandas_gbq/core/read.py @@ -0,0 +1,179 @@ +# Copyright (c) 2025 pandas-gbq Authors All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +from __future__ import annotations + +import typing +from typing import Any, Dict, Optional, Sequence +import warnings + +import google.cloud.bigquery +import google.cloud.bigquery.table +import numpy as np + +import pandas_gbq +import pandas_gbq.constants +import pandas_gbq.exceptions +import pandas_gbq.features +import pandas_gbq.timestamp + +# Only import at module-level at type checking time to avoid circular +# dependencies in the pandas package, which has an optional dependency on +# pandas-gbq. +if typing.TYPE_CHECKING: # pragma: NO COVER + import pandas + + +def _bqschema_to_nullsafe_dtypes(schema_fields): + """Specify explicit dtypes based on BigQuery schema. + + This function only specifies a dtype when the dtype allows nulls. + Otherwise, use pandas's default dtype choice. + + See: http://pandas.pydata.org/pandas-docs/dev/missing_data.html + #missing-data-casting-rules-and-indexing + """ + import db_dtypes + + # If you update this mapping, also update the table at + # `docs/reading.rst`. + dtype_map = { + "FLOAT": np.dtype(float), + "INTEGER": "Int64", + "TIME": db_dtypes.TimeDtype(), + # Note: Other types such as 'datetime64[ns]' and db_types.DateDtype() + # are not included because the pandas range does not align with the + # BigQuery range. 
We need to attempt a conversion to those types and + # fall back to 'object' when there are out-of-range values. + } + + # Amend dtype_map with newer extension types if pandas version allows. + if pandas_gbq.features.FEATURES.pandas_has_boolean_dtype: + dtype_map["BOOLEAN"] = "boolean" + + dtypes = {} + for field in schema_fields: + name = str(field["name"]) + # Array BigQuery type is represented as an object column containing + # list objects. + if field["mode"].upper() == "REPEATED": + dtypes[name] = "object" + continue + + dtype = dtype_map.get(field["type"].upper()) + if dtype: + dtypes[name] = dtype + + return dtypes + + +def _finalize_dtypes( + df: pandas.DataFrame, schema_fields: Sequence[Dict[str, Any]] +) -> pandas.DataFrame: + """ + Attempt to change the dtypes of those columns that don't map exactly. + + For example db_dtypes.DateDtype() and datetime64[ns] cannot represent + 0001-01-01, but they can represent dates within a couple hundred years of + 1970. See: + https://github.com/googleapis/python-bigquery-pandas/issues/365 + """ + import db_dtypes + import pandas.api.types + + # If you update this mapping, also update the table at + # `docs/reading.rst`. + dtype_map = { + "DATE": db_dtypes.DateDtype(), + "DATETIME": "datetime64[ns]", + "TIMESTAMP": "datetime64[ns]", + } + + for field in schema_fields: + # This method doesn't modify ARRAY/REPEATED columns. + if field["mode"].upper() == "REPEATED": + continue + + name = str(field["name"]) + dtype = dtype_map.get(field["type"].upper()) + + # Avoid deprecated conversion to timezone-naive dtype by only casting + # object dtypes. + if dtype and pandas.api.types.is_object_dtype(df[name]): + df[name] = df[name].astype(dtype, errors="ignore") + + # Ensure any TIMESTAMP columns are tz-aware. + df = pandas_gbq.timestamp.localize_df(df, schema_fields) + + return df + + +def download_results( + results: google.cloud.bigquery.table.RowIterator, + *, + bqclient: google.cloud.bigquery.Client, + progress_bar_type: Optional[str], + warn_on_large_results: bool = True, + max_results: Optional[int], + user_dtypes: Optional[dict], + use_bqstorage_api: bool, +) -> Optional[pandas.DataFrame]: + # No results are desired, so don't bother downloading anything. + if max_results == 0: + return None + + if user_dtypes is None: + user_dtypes = {} + + create_bqstorage_client = use_bqstorage_api + if max_results is not None: + create_bqstorage_client = False + + # If we're downloading a large table, BigQuery DataFrames might be a + # better fit. Not all code paths will populate rows_iter._table, but + # if it's not populated that means we are working with a small result + # set. + if ( + warn_on_large_results + and (table_ref := getattr(results, "_table", None)) is not None + ): + table = bqclient.get_table(table_ref) + if ( + isinstance((num_bytes := table.num_bytes), int) + and num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES + ): + num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB + warnings.warn( + f"Recommendation: Your results are {num_gib:.1f} GiB. " + "Consider using BigQuery DataFrames (https://bit.ly/bigframes-intro)" + "to process large results with pandas compatible APIs with transparent SQL " + "pushdown to BigQuery engine. This provides an opportunity to save on costs " + "and improve performance. " + "Please reach out to bigframes-feedback@google.com with any " + "questions or concerns. 
To disable this message, run " + "warnings.simplefilter('ignore', category=pandas_gbq.exceptions.LargeResultsWarning)", + category=pandas_gbq.exceptions.LargeResultsWarning, + # user's code + # -> read_gbq + # -> run_query + # -> download_results + stacklevel=4, + ) + + try: + schema_fields = [field.to_api_repr() for field in results.schema] + conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields) + conversion_dtypes.update(user_dtypes) + df = results.to_dataframe( + dtypes=conversion_dtypes, + progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, + ) + except pandas_gbq.constants.HTTP_ERRORS as ex: + raise pandas_gbq.exceptions.translate_exception(ex) from ex + + df = _finalize_dtypes(df, schema_fields) + + pandas_gbq.logger.debug("Got {} rows.\n".format(results.total_rows)) + return df diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py new file mode 100644 index 00000000..b56b8798 --- /dev/null +++ b/pandas_gbq/core/sample.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025 pandas-gbq Authors All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +from __future__ import annotations + +import typing +from typing import Optional, Sequence, cast + +import google.cloud.bigquery +import google.oauth2.credentials +import psutil + +import pandas_gbq.constants +import pandas_gbq.core.read +import pandas_gbq.gbq_connector + +# Only import at module-level at type checking time to avoid circular +# dependencies in the pandas package, which has an optional dependency on +# pandas-gbq. +if typing.TYPE_CHECKING: # pragma: NO COVER + import pandas + + +_TABLESAMPLE_ELIGIBLE_TYPES = ("TABLE", "EXTERNAL") + + +def _calculate_target_bytes(target_mb: Optional[int]) -> int: + if target_mb is not None: + return target_mb * pandas_gbq.constants.BYTES_IN_MIB + + mem = psutil.virtual_memory() + return max(100 * pandas_gbq.constants.BYTES_IN_MIB, mem.available // 4) + + +def _estimate_limit(target_bytes : int, num_rows: Optional[int]): + pass + + +def _estimate_row_bytes(fields: Sequence[google.cloud.bigquery.SchemaField]) -> int: + pass + + +def _sample_with_tablesample( + table: google.cloud.bigquery.Table, + bqclient: google.cloud.bigquery.Client, + proportion: float, + row_count: int, + progress_bar_type: Optional[str] = None, + use_bqstorage_api: bool = True, +) -> Optional[pandas.DataFrame]: + query = f""" + SELECT * + FROM `{table.project}.{table.dataset_id}.{table.table_id}` + TABLESAMPLE SYSTEM ({float(proportion) * 100.0} PERCENT) + ORDER BY RAND() DESC + LIMIT {int(row_count)}; + """ + rows = bqclient.query_and_wait(query) + return pandas_gbq.core.read.download_results( + rows, + bqclient=bqclient, + progress_bar_type=progress_bar_type, + warn_on_large_results=False, + max_results=None, + user_dtypes=None, + use_bqstorage_api=use_bqstorage_api, + ) + + +def _sample_with_limit( + bqclient: google.cloud.bigquery.Client, limit: int +) -> google.cloud.bigquery.TableReference: + pass + + +def sample( + table_id: str, + *, + target_mb: Optional[int] = None, + credentials: Optional[google.oauth2.credentials.Credentials] = None, + billing_project_id: Optional[str] = None, + progress_bar_type: Optional[str] = None, + use_bqstorage_api: bool = True, +) -> Optional[pandas.DataFrame]: + target_bytes = _calculate_target_bytes(target_mb) + connector = pandas_gbq.gbq_connector.GbqConnector( + project_id=billing_project_id, credentials=credentials + ) + credentials = 
cast(google.oauth2.credentials.Credentials, connector.credentials) + bqclient = connector.get_client() + table = bqclient.get_table(table_id) + num_rows = table.num_rows + + # Table is small enough to download the whole thing. + if (num_bytes := table.num_bytes) is not None and num_bytes <= target_bytes: + rows_iter = bqclient.list_rows(table) + return pandas_gbq.core.read.download_results( + rows_iter, + bqclient=bqclient, + progress_bar_type=progress_bar_type, + warn_on_large_results=False, + max_results=None, + user_dtypes=None, + use_bqstorage_api=use_bqstorage_api, + ) + + # Table is eligible for TABLESAMPLE. + if num_bytes is not None and table.table_type in _TABLESAMPLE_ELIGIBLE_TYPES: + proportion = target_bytes / num_bytes + row_count = max(1, int(num_rows * proportion)) if num_rows is not None else + return _sample_with_tablesample( + table, bqclient=bqclient, proportion=proportion + ) + table.num_rows + + # TODO: check table type to see if tablesample would be compatible. + table.table_type diff --git a/pandas_gbq/exceptions.py b/pandas_gbq/exceptions.py index 1acec712..ab5f05b4 100644 --- a/pandas_gbq/exceptions.py +++ b/pandas_gbq/exceptions.py @@ -110,3 +110,24 @@ class QueryTimeout(ValueError): Raised when the query request exceeds the timeoutMs value specified in the BigQuery configuration. """ + + +def translate_exception(ex): + # See `BigQuery Troubleshooting Errors + # `__ + + message = ( + ex.message.casefold() + if hasattr(ex, "message") and ex.message is not None + else "" + ) + if "cancelled" in message: + return QueryTimeout("Reason: {0}".format(ex)) + elif "schema does not match" in message: + error_message = ex.errors[0]["message"] + return InvalidSchema(f"Reason: {error_message}") + elif "already exists: table" in message: + error_message = ex.errors[0]["message"] + return TableCreationError(f"Reason: {error_message}") + else: + return GenericGBQException("Reason: {0}".format(ex)) diff --git a/pandas_gbq/gbq_connector.py b/pandas_gbq/gbq_connector.py index 2b3b716e..81f726f6 100644 --- a/pandas_gbq/gbq_connector.py +++ b/pandas_gbq/gbq_connector.py @@ -2,15 +2,14 @@ # Use of this source code is governed by a BSD-style # license that can be found in the LICENSE file. +from __future__ import annotations import logging import time import typing -from typing import Any, Dict, Optional, Sequence, Union +from typing import Any, Dict, Optional, Union import warnings -import numpy as np - # Only import at module-level at type checking time to avoid circular # dependencies in the pandas package, which has an optional dependency on # pandas-gbq. 
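# ---------------------------------------------------------------------------
# Editor's sketch, not part of this patch: how the translate_exception helper
# added to pandas_gbq/exceptions.py above is expected to map BigQuery API
# errors onto pandas-gbq exception types. The error messages below are
# hypothetical examples, not captured API responses.
import google.api_core.exceptions

import pandas_gbq.exceptions

conflict = google.api_core.exceptions.Conflict(
    "Already Exists: Table my-project:my_dataset.my_table",
    errors=[{"message": "Already Exists: Table my-project:my_dataset.my_table"}],
)
# "already exists: table" in the lowercased message maps to TableCreationError.
assert isinstance(
    pandas_gbq.exceptions.translate_exception(conflict),
    pandas_gbq.exceptions.TableCreationError,
)

# Anything that matches none of the known patterns becomes GenericGBQException.
bad_request = google.api_core.exceptions.BadRequest("Syntax error in query")
assert isinstance(
    pandas_gbq.exceptions.translate_exception(bad_request),
    pandas_gbq.exceptions.GenericGBQException,
)
# ---------------------------------------------------------------------------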
@@ -19,17 +18,12 @@ import pandas_gbq.constants from pandas_gbq.contexts import context +import pandas_gbq.core.read import pandas_gbq.environment as environment import pandas_gbq.exceptions -from pandas_gbq.exceptions import ( - GenericGBQException, - InvalidSchema, - QueryTimeout, - TableCreationError, -) +from pandas_gbq.exceptions import QueryTimeout from pandas_gbq.features import FEATURES import pandas_gbq.query -import pandas_gbq.timestamp try: import tqdm # noqa @@ -57,11 +51,9 @@ def __init__( rfc9110_delimiter=False, bigquery_client=None, ): - from google.api_core.exceptions import ClientError, GoogleAPIError - from pandas_gbq import auth - self.http_error = (ClientError, GoogleAPIError) + self.http_error = pandas_gbq.constants.HTTP_ERRORS self.project_id = project_id self.location = location self.reauth = reauth @@ -156,22 +148,7 @@ def get_client(self): def process_http_error(ex): # See `BigQuery Troubleshooting Errors # `__ - - message = ( - ex.message.casefold() - if hasattr(ex, "message") and ex.message is not None - else "" - ) - if "cancelled" in message: - raise QueryTimeout("Reason: {0}".format(ex)) - elif "schema does not match" in message: - error_message = ex.errors[0]["message"] - raise InvalidSchema(f"Reason: {error_message}") - elif "already exists: table" in message: - error_message = ex.errors[0]["message"] - raise TableCreationError(f"Reason: {error_message}") - else: - raise GenericGBQException("Reason: {0}".format(ex)) from ex + raise pandas_gbq.exceptions.translate_exception(ex) from ex def download_table( self, @@ -179,7 +156,7 @@ def download_table( max_results: Optional[int] = None, progress_bar_type: Optional[str] = None, dtypes: Optional[Dict[str, Union[str, Any]]] = None, - ) -> "pandas.DataFrame": + ) -> Optional[pandas.DataFrame]: from google.cloud import bigquery self._start_timer() @@ -274,61 +251,15 @@ def _download_results( progress_bar_type=None, user_dtypes=None, ): - # No results are desired, so don't bother downloading anything. - if max_results == 0: - return None - - if user_dtypes is None: - user_dtypes = {} - - create_bqstorage_client = self.use_bqstorage_api - if max_results is not None: - create_bqstorage_client = False - - # If we're downloading a large table, BigQuery DataFrames might be a - # better fit. Not all code paths will populate rows_iter._table, but - # if it's not populated that means we are working with a small result - # set. - if (table_ref := getattr(rows_iter, "_table", None)) is not None: - table = self.client.get_table(table_ref) - if ( - isinstance((num_bytes := table.num_bytes), int) - and num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES - ): - num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB - warnings.warn( - f"Recommendation: Your results are {num_gib:.1f} GiB. " - "Consider using BigQuery DataFrames (https://bit.ly/bigframes-intro)" - "to process large results with pandas compatible APIs with transparent SQL " - "pushdown to BigQuery engine. This provides an opportunity to save on costs " - "and improve performance. " - "Please reach out to bigframes-feedback@google.com with any " - "questions or concerns. 
To disable this message, run " - "warnings.simplefilter('ignore', category=pandas_gbq.exceptions.LargeResultsWarning)", - category=pandas_gbq.exceptions.LargeResultsWarning, - # user's code - # -> read_gbq - # -> run_query - # -> download_results - stacklevel=4, - ) - - try: - schema_fields = [field.to_api_repr() for field in rows_iter.schema] - conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields) - conversion_dtypes.update(user_dtypes) - df = rows_iter.to_dataframe( - dtypes=conversion_dtypes, - progress_bar_type=progress_bar_type, - create_bqstorage_client=create_bqstorage_client, - ) - except self.http_error as ex: - self.process_http_error(ex) - - df = _finalize_dtypes(df, schema_fields) - - logger.debug("Got {} rows.\n".format(rows_iter.total_rows)) - return df + return pandas_gbq.core.read.download_results( + rows_iter, + bqclient=self.get_client(), + progress_bar_type=progress_bar_type, + warn_on_large_results=True, + max_results=max_results, + user_dtypes=user_dtypes, + use_bqstorage_api=self.use_bqstorage_api, + ) def load_data( self, @@ -369,90 +300,6 @@ def load_data( self.process_http_error(ex) -def _bqschema_to_nullsafe_dtypes(schema_fields): - """Specify explicit dtypes based on BigQuery schema. - - This function only specifies a dtype when the dtype allows nulls. - Otherwise, use pandas's default dtype choice. - - See: http://pandas.pydata.org/pandas-docs/dev/missing_data.html - #missing-data-casting-rules-and-indexing - """ - import db_dtypes - - # If you update this mapping, also update the table at - # `docs/reading.rst`. - dtype_map = { - "FLOAT": np.dtype(float), - "INTEGER": "Int64", - "TIME": db_dtypes.TimeDtype(), - # Note: Other types such as 'datetime64[ns]' and db_types.DateDtype() - # are not included because the pandas range does not align with the - # BigQuery range. We need to attempt a conversion to those types and - # fall back to 'object' when there are out-of-range values. - } - - # Amend dtype_map with newer extension types if pandas version allows. - if FEATURES.pandas_has_boolean_dtype: - dtype_map["BOOLEAN"] = "boolean" - - dtypes = {} - for field in schema_fields: - name = str(field["name"]) - # Array BigQuery type is represented as an object column containing - # list objects. - if field["mode"].upper() == "REPEATED": - dtypes[name] = "object" - continue - - dtype = dtype_map.get(field["type"].upper()) - if dtype: - dtypes[name] = dtype - - return dtypes - - -def _finalize_dtypes( - df: "pandas.DataFrame", schema_fields: Sequence[Dict[str, Any]] -) -> "pandas.DataFrame": - """ - Attempt to change the dtypes of those columns that don't map exactly. - - For example db_dtypes.DateDtype() and datetime64[ns] cannot represent - 0001-01-01, but they can represent dates within a couple hundred years of - 1970. See: - https://github.com/googleapis/python-bigquery-pandas/issues/365 - """ - import db_dtypes - import pandas.api.types - - # If you update this mapping, also update the table at - # `docs/reading.rst`. - dtype_map = { - "DATE": db_dtypes.DateDtype(), - "DATETIME": "datetime64[ns]", - "TIMESTAMP": "datetime64[ns]", - } - - for field in schema_fields: - # This method doesn't modify ARRAY/REPEATED columns. - if field["mode"].upper() == "REPEATED": - continue - - name = str(field["name"]) - dtype = dtype_map.get(field["type"].upper()) - - # Avoid deprecated conversion to timezone-naive dtype by only casting - # object dtypes. 
- if dtype and pandas.api.types.is_object_dtype(df[name]): - df[name] = df[name].astype(dtype, errors="ignore") - - # Ensure any TIMESTAMP columns are tz-aware. - df = pandas_gbq.timestamp.localize_df(df, schema_fields) - - return df - - def _get_client(user_agent, rfc9110_delimiter, project_id, credentials): import google.api_core.client_info diff --git a/setup.py b/setup.py index 893d801b..c52e92f1 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ "pandas >=1.1.4", "pyarrow >=4.0.0", "pydata-google-auth >=1.5.0", + "psutil", # TODO: pick a min version and add to constraints. # Note: google-api-core and google-auth are also included via transitive # dependency on google-cloud-bigquery, but this library also uses them # directly. From 27f0a2ca2a01c5fdcaa7b3a9541af9a70d3f1b29 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 17:39:39 +0000 Subject: [PATCH 02/12] estimate row size --- pandas_gbq/__init__.py | 2 + pandas_gbq/core/sample.py | 132 +++++++++++++++++++++++++++++---- pandas_gbq/gbq.py | 3 - tests/unit/test_core_sample.py | 86 +++++++++++++++++++++ 4 files changed, 205 insertions(+), 18 deletions(-) create mode 100644 tests/unit/test_core_sample.py diff --git a/pandas_gbq/__init__.py b/pandas_gbq/__init__.py index b0a060bd..b18f8ab4 100644 --- a/pandas_gbq/__init__.py +++ b/pandas_gbq/__init__.py @@ -10,6 +10,7 @@ from . import _versions_helpers from .gbq import read_gbq, to_gbq # noqa +from pandas_gbq.core.sample import sample sys_major, sys_minor, sys_micro = _versions_helpers.extract_runtime_version() if sys_major == 3 and sys_minor < 9: @@ -32,4 +33,5 @@ "read_gbq", "Context", "context", + "sample", ] diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py index b56b8798..240a7761 100644 --- a/pandas_gbq/core/sample.py +++ b/pandas_gbq/core/sample.py @@ -23,6 +23,34 @@ _TABLESAMPLE_ELIGIBLE_TYPES = ("TABLE", "EXTERNAL") + +# Base logical sizes for non-complex and non-variable types. +# https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/data-types#data_type_sizes +_TYPE_SIZES = { + # Fixed size types + "BOOL": 1, + "DATE": 8, + "DATETIME": 8, + "FLOAT64": 8, + "INT64": 8, + "TIME": 8, + "TIMESTAMP": 8, + "INTERVAL": 16, + "NUMERIC": 16, + "RANGE": 16, + "BIGNUMERIC": 32, + # Variable types with a fixed-size assumption + "STRING": pandas_gbq.constants.BYTES_IN_KIB, + "JSON": pandas_gbq.constants.BYTES_IN_KIB, + "BYTES": pandas_gbq.constants.BYTES_IN_MIB, + # Formula: 16 logical bytes + 24 logical bytes * num_vertices + # Assuming a small, fixed number of vertices (e.g., 5) for estimation: + "GEOGRAPHY": 16 + (24 * 5), +} +# TODO(tswast): Choose an estimate based on actual BigQuery stats. 
+_ARRAY_LENGTH_ESTIMATE = 5 +_UNKNOWN_TYPE_SIZE_ESTIMATE = 4 +_MAX_ROW_BYTES = 100 * pandas_gbq.constants.BYTES_IN_MIB def _calculate_target_bytes(target_mb: Optional[int]) -> int: @@ -30,22 +58,68 @@ def _calculate_target_bytes(target_mb: Optional[int]) -> int: return target_mb * pandas_gbq.constants.BYTES_IN_MIB mem = psutil.virtual_memory() - return max(100 * pandas_gbq.constants.BYTES_IN_MIB, mem.available // 4) + return max(_MAX_ROW_BYTES, mem.available // 4) + + +def _estimate_limit(*, target_bytes : int, table_bytes: Optional[int], table_rows: Optional[int], fields: Sequence[google.cloud.bigquery.SchemaField]) -> int: + if table_bytes and table_rows: + proportion = target_bytes / table_bytes + return max(1, int(table_rows * proportion)) + + row_bytes_estimate = _estimate_row_bytes(fields) + assert row_bytes_estimate >= 0 + + if row_bytes_estimate == 0: + # Assume there's some overhead per row so we have some kind of limit. + return target_bytes + + return max(1, target_bytes // row_bytes_estimate) + + +def _estimate_field_bytes(field: google.cloud.bigquery.SchemaField) -> int: + """Recursive helper function to calculate the size of a single field.""" + field_type = field.field_type + + # If the field is REPEATED (ARRAY), its size is the sum of its elements. + if field.mode == "REPEATED": + # Create a temporary single-element field for size calculation + temp_field = google.cloud.bigquery.SchemaField(field.name, field.field_type, mode="NULLABLE", fields=field.fields) + element_size = _estimate_field_bytes(temp_field) + return _ARRAY_LENGTH_ESTIMATE * element_size + + if field_type == "STRUCT" or field_type == "RECORD": + # STRUCT has 0 logical bytes + the size of its contained fields. + return _estimate_row_bytes(field.fields) + + return _TYPE_SIZES.get(field_type.upper(), _UNKNOWN_TYPE_SIZE_ESTIMATE) -def _estimate_limit(target_bytes : int, num_rows: Optional[int]): - pass +def _estimate_row_bytes(fields: Sequence[google.cloud.bigquery.SchemaField]) -> int: + """ + Estimates the logical row size in bytes for a list of BigQuery SchemaField objects, + using the provided data type size chart and assuming 1MB for all STRING and BYTES + fields. + Args: + schema_fields: A list of google.cloud.bigquery.SchemaField objects + representing the table schema. -def _estimate_row_bytes(fields: Sequence[google.cloud.bigquery.SchemaField]) -> int: - pass + Returns: + An integer representing the estimated total row size in logical bytes. 
+ """ + total_size = min( + _MAX_ROW_BYTES, + sum(_estimate_field_bytes(field) for field in fields), + ) + return total_size def _sample_with_tablesample( table: google.cloud.bigquery.Table, + *, bqclient: google.cloud.bigquery.Client, proportion: float, - row_count: int, + target_row_count: int, progress_bar_type: Optional[str] = None, use_bqstorage_api: bool = True, ) -> Optional[pandas.DataFrame]: @@ -54,7 +128,7 @@ def _sample_with_tablesample( FROM `{table.project}.{table.dataset_id}.{table.table_id}` TABLESAMPLE SYSTEM ({float(proportion) * 100.0} PERCENT) ORDER BY RAND() DESC - LIMIT {int(row_count)}; + LIMIT {int(target_row_count)}; """ rows = bqclient.query_and_wait(query) return pandas_gbq.core.read.download_results( @@ -69,9 +143,29 @@ def _sample_with_tablesample( def _sample_with_limit( - bqclient: google.cloud.bigquery.Client, limit: int -) -> google.cloud.bigquery.TableReference: - pass + table: google.cloud.bigquery.Table, + *, + bqclient: google.cloud.bigquery.Client, + target_row_count: int, + progress_bar_type: Optional[str] = None, + use_bqstorage_api: bool = True, +) -> Optional[pandas.DataFrame]: + query = f""" + SELECT * + FROM `{table.project}.{table.dataset_id}.{table.table_id}` + ORDER BY RAND() DESC + LIMIT {int(target_row_count)}; + """ + rows = bqclient.query_and_wait(query) + return pandas_gbq.core.read.download_results( + rows, + bqclient=bqclient, + progress_bar_type=progress_bar_type, + warn_on_large_results=False, + max_results=None, + user_dtypes=None, + use_bqstorage_api=use_bqstorage_api, + ) def sample( @@ -105,14 +199,22 @@ def sample( use_bqstorage_api=use_bqstorage_api, ) + target_row_count = _estimate_limit( + target_bytes=target_bytes, + table_bytes=num_bytes, + table_rows=num_rows, + fields=table.schema, + ) + # Table is eligible for TABLESAMPLE. if num_bytes is not None and table.table_type in _TABLESAMPLE_ELIGIBLE_TYPES: proportion = target_bytes / num_bytes - row_count = max(1, int(num_rows * proportion)) if num_rows is not None else return _sample_with_tablesample( - table, bqclient=bqclient, proportion=proportion + table, bqclient=bqclient, proportion=proportion, target_row_count=target_row_count, progress_bar_type=progress_bar_type, use_bqstorage_api=use_bqstorage_api, ) - table.num_rows - # TODO: check table type to see if tablesample would be compatible. - table.table_type + # Not eligible for TABLESAMPLE or reading directly, so take a random sample + # with a full table scan. + return _sample_with_limit( + table, bqclient=bqclient, target_row_count=target_row_count, progress_bar_type=progress_bar_type, use_bqstorage_api=use_bqstorage_api, + ) \ No newline at end of file diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 880dcef9..54c9e845 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -27,9 +27,6 @@ from pandas_gbq.features import FEATURES from pandas_gbq.gbq_connector import ( # noqa - backward compatible export GbqConnector, - _bqschema_to_nullsafe_dtypes, - _finalize_dtypes, - create_user_agent, ) from pandas_gbq.gbq_connector import _get_client # noqa - backward compatible export import pandas_gbq.schema diff --git a/tests/unit/test_core_sample.py b/tests/unit/test_core_sample.py new file mode 100644 index 00000000..4d8de243 --- /dev/null +++ b/tests/unit/test_core_sample.py @@ -0,0 +1,86 @@ +# Copyright (c) 2025 pandas-gbq Authors All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. 
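# ---------------------------------------------------------------------------
# Editor's sketch, not part of this patch: a worked example of the row-size
# heuristics introduced in pandas_gbq/core/sample.py above. The schema and the
# 10 MiB target are hypothetical; the byte counts follow this module's
# fixed-size assumptions (8 bytes per INT64, 1 KiB per STRING, arrays assumed
# to hold 5 elements).
import google.cloud.bigquery

import pandas_gbq.core.sample

schema = [
    google.cloud.bigquery.SchemaField("id", "INT64"),  # 8 bytes
    google.cloud.bigquery.SchemaField("name", "STRING"),  # assumed 1 KiB
    google.cloud.bigquery.SchemaField("tags", "STRING", mode="REPEATED"),  # 5 * 1 KiB
]

row_bytes = pandas_gbq.core.sample._estimate_row_bytes(schema)
assert row_bytes == 8 + 1024 + 5 * 1024

# Without table statistics, the LIMIT is derived from the per-row estimate.
limit = pandas_gbq.core.sample._estimate_limit(
    target_bytes=10 * 1024 * 1024,
    table_bytes=None,
    table_rows=None,
    fields=schema,
)
assert limit == (10 * 1024 * 1024) // row_bytes
# ---------------------------------------------------------------------------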
+ +from typing import Sequence + +import google.cloud.bigquery +import pytest + +import pandas_gbq.core.sample +import pandas_gbq.constants + + +test_cases = [ + pytest.param( + [ + google.cloud.bigquery.SchemaField("id", "INT64"), # 8 + google.cloud.bigquery.SchemaField("is_valid", "BOOL"), # 1 + google.cloud.bigquery.SchemaField("price", "NUMERIC"), # 16 + google.cloud.bigquery.SchemaField("big_value", "BIGNUMERIC"), # 32 + ], + 8 + 1 + 16 + 32, # 57 + id="Fixed_Size_Types", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField("coords", "RECORD", fields=[ + google.cloud.bigquery.SchemaField("lat", "FLOAT64"), # 8 + google.cloud.bigquery.SchemaField("lon", "FLOAT64"), # 8 + ]), + ], + 16, # 8 + 8 + id="Simple_Struct", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField("history", "TIMESTAMP", mode="REPEATED"), # 5 * 8 + ], + pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE * 8, # 40 + id="Simple_Array", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField("addresses", "RECORD", mode="REPEATED", fields=[ + google.cloud.bigquery.SchemaField("street", "STRING"), # 1KIB + google.cloud.bigquery.SchemaField("zip", "INT64"), # 8 + ]), + ], + pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE * (pandas_gbq.constants.BYTES_IN_KIB + 8), + id="Repeated_Struct", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField("empty_struct", "RECORD", fields=[]), # 0 + google.cloud.bigquery.SchemaField("simple_int", "INT64"), # 8 + ], + 8, # 0 + 8 + id="empty-struct", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField("bytes", "BYTES"), + ] * 9_999, + pandas_gbq.core.sample._MAX_ROW_BYTES, + id="many-bytes", + ), + # Case 8: Complex Mix (Combining multiple cases) + pytest.param( + [ + google.cloud.bigquery.SchemaField("key", "INT64"), # 8 + google.cloud.bigquery.SchemaField("notes", "STRING"), # 1KIB + google.cloud.bigquery.SchemaField("history", "TIMESTAMP", mode="REPEATED"), # 40 + google.cloud.bigquery.SchemaField("details", "RECORD", fields=[ + google.cloud.bigquery.SchemaField("d1", "NUMERIC"), # 16 + google.cloud.bigquery.SchemaField("d2", "BYTES"), # 1MB + ]), + ], + 8 + pandas_gbq.constants.BYTES_IN_KIB + 40 + (16 + pandas_gbq.constants.BYTES_IN_MIB), + id="Complex_Mix", + ), +] +@pytest.mark.parametrize("schema, expected_size", test_cases) +def test_estimate_row_size_parametrized(schema: Sequence[google.cloud.bigquery.SchemaField], expected_size: int): + + actual_size = pandas_gbq.core.sample._estimate_row_bytes(schema) + assert actual_size == expected_size From 76cc760cd355bfb348a925b3fd38558bed751019 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 20:03:37 +0000 Subject: [PATCH 03/12] fix lint --- pandas_gbq/__init__.py | 2 +- pandas_gbq/core/sample.py | 47 ++++++++++++------ pandas_gbq/gbq.py | 4 +- tests/unit/test_core_sample.py | 90 +++++++++++++++++++++------------- 4 files changed, 91 insertions(+), 52 deletions(-) diff --git a/pandas_gbq/__init__.py b/pandas_gbq/__init__.py index b18f8ab4..0c243869 100644 --- a/pandas_gbq/__init__.py +++ b/pandas_gbq/__init__.py @@ -7,10 +7,10 @@ from pandas_gbq import version as pandas_gbq_version from pandas_gbq.contexts import Context, context +from pandas_gbq.core.sample import sample from . 
import _versions_helpers from .gbq import read_gbq, to_gbq # noqa -from pandas_gbq.core.sample import sample sys_major, sys_minor, sys_micro = _versions_helpers.extract_runtime_version() if sys_major == 3 and sys_minor < 9: diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py index 240a7761..d82081bf 100644 --- a/pandas_gbq/core/sample.py +++ b/pandas_gbq/core/sample.py @@ -23,7 +23,7 @@ _TABLESAMPLE_ELIGIBLE_TYPES = ("TABLE", "EXTERNAL") - + # Base logical sizes for non-complex and non-variable types. # https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/data-types#data_type_sizes _TYPE_SIZES = { @@ -50,7 +50,7 @@ # TODO(tswast): Choose an estimate based on actual BigQuery stats. _ARRAY_LENGTH_ESTIMATE = 5 _UNKNOWN_TYPE_SIZE_ESTIMATE = 4 -_MAX_ROW_BYTES = 100 * pandas_gbq.constants.BYTES_IN_MIB +_MAX_ROW_BYTES = 100 * pandas_gbq.constants.BYTES_IN_MIB def _calculate_target_bytes(target_mb: Optional[int]) -> int: @@ -61,32 +61,40 @@ def _calculate_target_bytes(target_mb: Optional[int]) -> int: return max(_MAX_ROW_BYTES, mem.available // 4) -def _estimate_limit(*, target_bytes : int, table_bytes: Optional[int], table_rows: Optional[int], fields: Sequence[google.cloud.bigquery.SchemaField]) -> int: +def _estimate_limit( + *, + target_bytes: int, + table_bytes: Optional[int], + table_rows: Optional[int], + fields: Sequence[google.cloud.bigquery.SchemaField], +) -> int: if table_bytes and table_rows: proportion = target_bytes / table_bytes return max(1, int(table_rows * proportion)) - + row_bytes_estimate = _estimate_row_bytes(fields) assert row_bytes_estimate >= 0 if row_bytes_estimate == 0: # Assume there's some overhead per row so we have some kind of limit. return target_bytes - - return max(1, target_bytes // row_bytes_estimate) - - + + return max(1, target_bytes // row_bytes_estimate) + + def _estimate_field_bytes(field: google.cloud.bigquery.SchemaField) -> int: """Recursive helper function to calculate the size of a single field.""" field_type = field.field_type - + # If the field is REPEATED (ARRAY), its size is the sum of its elements. if field.mode == "REPEATED": # Create a temporary single-element field for size calculation - temp_field = google.cloud.bigquery.SchemaField(field.name, field.field_type, mode="NULLABLE", fields=field.fields) + temp_field = google.cloud.bigquery.SchemaField( + field.name, field.field_type, mode="NULLABLE", fields=field.fields + ) element_size = _estimate_field_bytes(temp_field) return _ARRAY_LENGTH_ESTIMATE * element_size - + if field_type == "STRUCT" or field_type == "RECORD": # STRUCT has 0 logical bytes + the size of its contained fields. return _estimate_row_bytes(field.fields) @@ -101,7 +109,7 @@ def _estimate_row_bytes(fields: Sequence[google.cloud.bigquery.SchemaField]) -> fields. Args: - schema_fields: A list of google.cloud.bigquery.SchemaField objects + schema_fields: A list of google.cloud.bigquery.SchemaField objects representing the table schema. 
Returns: @@ -210,11 +218,20 @@ def sample( if num_bytes is not None and table.table_type in _TABLESAMPLE_ELIGIBLE_TYPES: proportion = target_bytes / num_bytes return _sample_with_tablesample( - table, bqclient=bqclient, proportion=proportion, target_row_count=target_row_count, progress_bar_type=progress_bar_type, use_bqstorage_api=use_bqstorage_api, + table, + bqclient=bqclient, + proportion=proportion, + target_row_count=target_row_count, + progress_bar_type=progress_bar_type, + use_bqstorage_api=use_bqstorage_api, ) # Not eligible for TABLESAMPLE or reading directly, so take a random sample # with a full table scan. return _sample_with_limit( - table, bqclient=bqclient, target_row_count=target_row_count, progress_bar_type=progress_bar_type, use_bqstorage_api=use_bqstorage_api, - ) \ No newline at end of file + table, + bqclient=bqclient, + target_row_count=target_row_count, + progress_bar_type=progress_bar_type, + use_bqstorage_api=use_bqstorage_api, + ) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 54c9e845..dcc96d49 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -25,9 +25,7 @@ from pandas_gbq.exceptions import InvalidSchema # noqa - backward compatible export from pandas_gbq.exceptions import QueryTimeout # noqa - backward compatible export from pandas_gbq.features import FEATURES -from pandas_gbq.gbq_connector import ( # noqa - backward compatible export - GbqConnector, -) +from pandas_gbq.gbq_connector import GbqConnector # noqa - backward compatible export from pandas_gbq.gbq_connector import _get_client # noqa - backward compatible export import pandas_gbq.schema import pandas_gbq.schema.pandas_to_bigquery diff --git a/tests/unit/test_core_sample.py b/tests/unit/test_core_sample.py index 4d8de243..c1b571cc 100644 --- a/tests/unit/test_core_sample.py +++ b/tests/unit/test_core_sample.py @@ -7,80 +7,104 @@ import google.cloud.bigquery import pytest -import pandas_gbq.core.sample import pandas_gbq.constants - +import pandas_gbq.core.sample test_cases = [ pytest.param( [ - google.cloud.bigquery.SchemaField("id", "INT64"), # 8 - google.cloud.bigquery.SchemaField("is_valid", "BOOL"), # 1 - google.cloud.bigquery.SchemaField("price", "NUMERIC"), # 16 - google.cloud.bigquery.SchemaField("big_value", "BIGNUMERIC"), # 32 + google.cloud.bigquery.SchemaField("id", "INT64"), # 8 + google.cloud.bigquery.SchemaField("is_valid", "BOOL"), # 1 + google.cloud.bigquery.SchemaField("price", "NUMERIC"), # 16 + google.cloud.bigquery.SchemaField("big_value", "BIGNUMERIC"), # 32 ], - 8 + 1 + 16 + 32, # 57 + 8 + 1 + 16 + 32, # 57 id="Fixed_Size_Types", ), pytest.param( [ - google.cloud.bigquery.SchemaField("coords", "RECORD", fields=[ - google.cloud.bigquery.SchemaField("lat", "FLOAT64"), # 8 - google.cloud.bigquery.SchemaField("lon", "FLOAT64"), # 8 - ]), + google.cloud.bigquery.SchemaField( + "coords", + "RECORD", + fields=[ + google.cloud.bigquery.SchemaField("lat", "FLOAT64"), # 8 + google.cloud.bigquery.SchemaField("lon", "FLOAT64"), # 8 + ], + ), ], - 16, # 8 + 8 + 16, # 8 + 8 id="Simple_Struct", ), pytest.param( [ - google.cloud.bigquery.SchemaField("history", "TIMESTAMP", mode="REPEATED"), # 5 * 8 + google.cloud.bigquery.SchemaField( + "history", "TIMESTAMP", mode="REPEATED" + ), # 5 * 8 ], - pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE * 8, # 40 + pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE * 8, # 40 id="Simple_Array", ), pytest.param( [ - google.cloud.bigquery.SchemaField("addresses", "RECORD", mode="REPEATED", fields=[ - google.cloud.bigquery.SchemaField("street", 
"STRING"), # 1KIB - google.cloud.bigquery.SchemaField("zip", "INT64"), # 8 - ]), + google.cloud.bigquery.SchemaField( + "addresses", + "RECORD", + mode="REPEATED", + fields=[ + google.cloud.bigquery.SchemaField("street", "STRING"), # 1KIB + google.cloud.bigquery.SchemaField("zip", "INT64"), # 8 + ], + ), ], - pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE * (pandas_gbq.constants.BYTES_IN_KIB + 8), + pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE + * (pandas_gbq.constants.BYTES_IN_KIB + 8), id="Repeated_Struct", ), pytest.param( [ - google.cloud.bigquery.SchemaField("empty_struct", "RECORD", fields=[]), # 0 - google.cloud.bigquery.SchemaField("simple_int", "INT64"), # 8 + google.cloud.bigquery.SchemaField("empty_struct", "RECORD", fields=[]), # 0 + google.cloud.bigquery.SchemaField("simple_int", "INT64"), # 8 ], - 8, # 0 + 8 + 8, # 0 + 8 id="empty-struct", ), pytest.param( [ google.cloud.bigquery.SchemaField("bytes", "BYTES"), - ] * 9_999, + ] + * 9_999, pandas_gbq.core.sample._MAX_ROW_BYTES, id="many-bytes", ), # Case 8: Complex Mix (Combining multiple cases) pytest.param( [ - google.cloud.bigquery.SchemaField("key", "INT64"), # 8 - google.cloud.bigquery.SchemaField("notes", "STRING"), # 1KIB - google.cloud.bigquery.SchemaField("history", "TIMESTAMP", mode="REPEATED"), # 40 - google.cloud.bigquery.SchemaField("details", "RECORD", fields=[ - google.cloud.bigquery.SchemaField("d1", "NUMERIC"), # 16 - google.cloud.bigquery.SchemaField("d2", "BYTES"), # 1MB - ]), + google.cloud.bigquery.SchemaField("key", "INT64"), # 8 + google.cloud.bigquery.SchemaField("notes", "STRING"), # 1KIB + google.cloud.bigquery.SchemaField( + "history", "TIMESTAMP", mode="REPEATED" + ), # 40 + google.cloud.bigquery.SchemaField( + "details", + "RECORD", + fields=[ + google.cloud.bigquery.SchemaField("d1", "NUMERIC"), # 16 + google.cloud.bigquery.SchemaField("d2", "BYTES"), # 1MB + ], + ), ], - 8 + pandas_gbq.constants.BYTES_IN_KIB + 40 + (16 + pandas_gbq.constants.BYTES_IN_MIB), + 8 + + pandas_gbq.constants.BYTES_IN_KIB + + 40 + + (16 + pandas_gbq.constants.BYTES_IN_MIB), id="Complex_Mix", ), ] + + @pytest.mark.parametrize("schema, expected_size", test_cases) -def test_estimate_row_size_parametrized(schema: Sequence[google.cloud.bigquery.SchemaField], expected_size: int): - +def test_estimate_row_size_parametrized( + schema: Sequence[google.cloud.bigquery.SchemaField], expected_size: int +): actual_size = pandas_gbq.core.sample._estimate_row_bytes(schema) assert actual_size == expected_size From b43b668ec9b8c5e94c6bdcb3cb81dfb7df78bda1 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 21:30:30 +0000 Subject: [PATCH 04/12] add system tests for various table types --- pandas_gbq/core/sample.py | 15 +- tests/system/conftest.py | 23 ++ tests/system/test_sample.py | 97 ++++++++ tests/unit/test_core_sample.py | 411 +++++++++++++++++++++++++-------- 4 files changed, 451 insertions(+), 95 deletions(-) create mode 100644 tests/system/test_sample.py diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py index d82081bf..52094876 100644 --- a/pandas_gbq/core/sample.py +++ b/pandas_gbq/core/sample.py @@ -22,6 +22,7 @@ import pandas +_READ_API_ELIGIBLE_TYPES = ("TABLE", "MATERIALIZED_VIEW", "EXTERNAL") _TABLESAMPLE_ELIGIBLE_TYPES = ("TABLE", "EXTERNAL") # Base logical sizes for non-complex and non-variable types. 
@@ -193,9 +194,19 @@ def sample( bqclient = connector.get_client() table = bqclient.get_table(table_id) num_rows = table.num_rows + num_bytes = table.num_bytes + table_type = table.table_type + + # Some tables such as views report 0 despite actually having rows. + if num_bytes == 0: + num_bytes = None # Table is small enough to download the whole thing. - if (num_bytes := table.num_bytes) is not None and num_bytes <= target_bytes: + if ( + table_type in _READ_API_ELIGIBLE_TYPES + and num_bytes is not None + and num_bytes <= target_bytes + ): rows_iter = bqclient.list_rows(table) return pandas_gbq.core.read.download_results( rows_iter, @@ -215,7 +226,7 @@ def sample( ) # Table is eligible for TABLESAMPLE. - if num_bytes is not None and table.table_type in _TABLESAMPLE_ELIGIBLE_TYPES: + if num_bytes is not None and table_type in _TABLESAMPLE_ELIGIBLE_TYPES: proportion = target_bytes / num_bytes return _sample_with_tablesample( table, diff --git a/tests/system/conftest.py b/tests/system/conftest.py index cb8aadb9..c761276d 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -45,6 +45,29 @@ def project(project_id): return project_id +@pytest.fixture +def external_table(bigquery_client, random_dataset, project_id): + table_id = prefixer.create_prefix() + full_table_id = str(random_dataset.table(table_id)) + table = bigquery.Table( + full_table_id, + [ + bigquery.SchemaField("name", "STRING"), + bigquery.SchemaField("post_abbr", "STRING"), + ], + ) + external_data_configuration = bigquery.ExternalConfig("CSV") + csv_options = bigquery.CSVOptions() + csv_options.skip_leading_rows = 1 + external_data_configuration.csv_options = csv_options + external_data_configuration.source_uris = [ + "gs://cloud-samples-data/bigquery/us-states/us-states.csv" + ] + table.external_data_configuration = external_data_configuration + bigquery_client.create_table(table) + return full_table_id + + @pytest.fixture def to_gbq(credentials, project_id): import pandas_gbq diff --git a/tests/system/test_sample.py b/tests/system/test_sample.py new file mode 100644 index 00000000..1e27e9a1 --- /dev/null +++ b/tests/system/test_sample.py @@ -0,0 +1,97 @@ +# Copyright (c) 2025 pandas-gbq Authors All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. 
+ +import google.oauth2.credentials +import google.cloud.bigquery + +import pandas_gbq + + +def test_sample_small_table( + project_id: str, + credentials: google.oauth2.credentials.Credentials, + bigquery_client: google.cloud.bigquery.Client, +): + # Arrange + table_id = "bigquery-public-data.ml_datasets.penguins" + table = bigquery_client.get_table(table_id) + num_bytes = table.num_bytes + num_rows = table.num_rows + + # Act + df = pandas_gbq.sample( + table_id, + target_mb=1_000, + credentials=credentials, + billing_project_id=project_id, + ) + + # Assert + assert num_bytes is not None and num_bytes > 0 + assert num_rows is not None and num_rows > 0 + assert df is not None and len(df.index) == num_rows + + +def test_sample_large_table( + project_id: str, + credentials: google.oauth2.credentials.Credentials, + bigquery_client: google.cloud.bigquery.Client, +): + # Arrange + table_id = "bigquery-public-data-staging.chicago.taxi_trips" + table = bigquery_client.get_table(table_id) + num_bytes = table.num_bytes + num_rows = table.num_rows + + # Act + df = pandas_gbq.sample( + table_id, target_mb=10, credentials=credentials, billing_project_id=project_id + ) + + # Assert + assert num_bytes is not None and num_bytes > 0 + assert num_rows is not None and num_rows > 0 + assert df is not None + rows_downloaded = len(df.index) + assert rows_downloaded > 0 + assert rows_downloaded < num_rows + bytes_downloaded = df.memory_usage().sum() + assert bytes_downloaded < num_bytes + + +def test_sample_small_external_table( + project_id: str, + credentials: google.oauth2.credentials.Credentials, + external_table: str, +): + # Act + df = pandas_gbq.sample( + external_table, + target_mb=1_000, + credentials=credentials, + billing_project_id=project_id, + ) + + # Assert + assert df is not None + rows_downloaded = len(df.index) + assert rows_downloaded > 0 + + +def test_sample_view( + project_id: str, + credentials: google.oauth2.credentials.Credentials, +): + # Arrange + table_id = "bigquery-public-data.ethereum_blockchain.live_contracts" + + # Act + df = pandas_gbq.sample( + table_id, target_mb=10, credentials=credentials, billing_project_id=project_id + ) + + # Assert + assert df is not None + rows_downloaded = len(df.index) + assert rows_downloaded > 0 diff --git a/tests/unit/test_core_sample.py b/tests/unit/test_core_sample.py index c1b571cc..2c32c29e 100644 --- a/tests/unit/test_core_sample.py +++ b/tests/unit/test_core_sample.py @@ -3,6 +3,7 @@ # license that can be found in the LICENSE file. 
from typing import Sequence +from unittest import mock import google.cloud.bigquery import pytest @@ -10,101 +11,325 @@ import pandas_gbq.constants import pandas_gbq.core.sample -test_cases = [ - pytest.param( - [ - google.cloud.bigquery.SchemaField("id", "INT64"), # 8 - google.cloud.bigquery.SchemaField("is_valid", "BOOL"), # 1 - google.cloud.bigquery.SchemaField("price", "NUMERIC"), # 16 - google.cloud.bigquery.SchemaField("big_value", "BIGNUMERIC"), # 32 - ], - 8 + 1 + 16 + 32, # 57 - id="Fixed_Size_Types", - ), - pytest.param( - [ - google.cloud.bigquery.SchemaField( - "coords", - "RECORD", - fields=[ - google.cloud.bigquery.SchemaField("lat", "FLOAT64"), # 8 - google.cloud.bigquery.SchemaField("lon", "FLOAT64"), # 8 - ], - ), - ], - 16, # 8 + 8 - id="Simple_Struct", - ), - pytest.param( - [ - google.cloud.bigquery.SchemaField( - "history", "TIMESTAMP", mode="REPEATED" - ), # 5 * 8 - ], - pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE * 8, # 40 - id="Simple_Array", - ), - pytest.param( - [ - google.cloud.bigquery.SchemaField( - "addresses", - "RECORD", - mode="REPEATED", - fields=[ - google.cloud.bigquery.SchemaField("street", "STRING"), # 1KIB - google.cloud.bigquery.SchemaField("zip", "INT64"), # 8 - ], - ), - ], - pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE - * (pandas_gbq.constants.BYTES_IN_KIB + 8), - id="Repeated_Struct", - ), - pytest.param( - [ - google.cloud.bigquery.SchemaField("empty_struct", "RECORD", fields=[]), # 0 - google.cloud.bigquery.SchemaField("simple_int", "INT64"), # 8 - ], - 8, # 0 + 8 - id="empty-struct", - ), - pytest.param( - [ - google.cloud.bigquery.SchemaField("bytes", "BYTES"), - ] - * 9_999, - pandas_gbq.core.sample._MAX_ROW_BYTES, - id="many-bytes", - ), - # Case 8: Complex Mix (Combining multiple cases) - pytest.param( - [ - google.cloud.bigquery.SchemaField("key", "INT64"), # 8 - google.cloud.bigquery.SchemaField("notes", "STRING"), # 1KIB - google.cloud.bigquery.SchemaField( - "history", "TIMESTAMP", mode="REPEATED" - ), # 40 - google.cloud.bigquery.SchemaField( - "details", - "RECORD", - fields=[ - google.cloud.bigquery.SchemaField("d1", "NUMERIC"), # 16 - google.cloud.bigquery.SchemaField("d2", "BYTES"), # 1MB - ], - ), - ], - 8 - + pandas_gbq.constants.BYTES_IN_KIB - + 40 - + (16 + pandas_gbq.constants.BYTES_IN_MIB), - id="Complex_Mix", - ), -] - - -@pytest.mark.parametrize("schema, expected_size", test_cases) + +@pytest.mark.parametrize( + "schema, expected_size", + [ + pytest.param( + [ + google.cloud.bigquery.SchemaField("id", "INT64"), # 8 + google.cloud.bigquery.SchemaField("is_valid", "BOOL"), # 1 + google.cloud.bigquery.SchemaField("price", "NUMERIC"), # 16 + google.cloud.bigquery.SchemaField("big_value", "BIGNUMERIC"), # 32 + ], + 8 + 1 + 16 + 32, # 57 + id="Fixed_Size_Types", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField( + "coords", + "RECORD", + fields=[ + google.cloud.bigquery.SchemaField("lat", "FLOAT64"), # 8 + google.cloud.bigquery.SchemaField("lon", "FLOAT64"), # 8 + ], + ), + ], + 16, # 8 + 8 + id="Simple_Struct", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField( + "history", "TIMESTAMP", mode="REPEATED" + ), # 5 * 8 + ], + pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE * 8, # 40 + id="Simple_Array", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField( + "addresses", + "RECORD", + mode="REPEATED", + fields=[ + google.cloud.bigquery.SchemaField("street", "STRING"), # 1KIB + google.cloud.bigquery.SchemaField("zip", "INT64"), # 8 + ], + ), + ], + pandas_gbq.core.sample._ARRAY_LENGTH_ESTIMATE + * 
(pandas_gbq.constants.BYTES_IN_KIB + 8), + id="Repeated_Struct", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField( + "empty_struct", "RECORD", fields=[] + ), # 0 + google.cloud.bigquery.SchemaField("simple_int", "INT64"), # 8 + ], + 8, # 0 + 8 + id="empty-struct", + ), + pytest.param( + [ + google.cloud.bigquery.SchemaField("bytes", "BYTES"), + ] + * 9_999, + pandas_gbq.core.sample._MAX_ROW_BYTES, + id="many-bytes", + ), + # Case 8: Complex Mix (Combining multiple cases) + pytest.param( + [ + google.cloud.bigquery.SchemaField("key", "INT64"), # 8 + google.cloud.bigquery.SchemaField("notes", "STRING"), # 1KIB + google.cloud.bigquery.SchemaField( + "history", "TIMESTAMP", mode="REPEATED" + ), # 40 + google.cloud.bigquery.SchemaField( + "details", + "RECORD", + fields=[ + google.cloud.bigquery.SchemaField("d1", "NUMERIC"), # 16 + google.cloud.bigquery.SchemaField("d2", "BYTES"), # 1MB + ], + ), + ], + 8 + + pandas_gbq.constants.BYTES_IN_KIB + + 40 + + (16 + pandas_gbq.constants.BYTES_IN_MIB), + id="Complex_Mix", + ), + ], +) def test_estimate_row_size_parametrized( schema: Sequence[google.cloud.bigquery.SchemaField], expected_size: int ): actual_size = pandas_gbq.core.sample._estimate_row_bytes(schema) assert actual_size == expected_size + + +def test_calculate_target_bytes_with_target_mb(): + target_mb = 200 + expected_bytes = target_mb * pandas_gbq.constants.BYTES_IN_MIB + actual_bytes = pandas_gbq.core.sample._calculate_target_bytes(target_mb) + assert actual_bytes == expected_bytes + + +@mock.patch("psutil.virtual_memory") +def test_calculate_target_bytes_with_available_memory(mock_virtual_memory): + # Mock psutil.virtual_memory to return a mock object with an 'available' attribute. + available_memory = 8 * 1024 * 1024 * 1024 # 8 GB + mock_virtual_memory.return_value = mock.Mock(available=available_memory) + + # Expected bytes is available memory / 4 + expected_bytes = available_memory // 4 + actual_bytes = pandas_gbq.core.sample._calculate_target_bytes(None) + assert actual_bytes == expected_bytes + + +@mock.patch("psutil.virtual_memory") +def test_calculate_target_bytes_low_memory_uses_max_row_bytes(mock_virtual_memory): + # Mock psutil.virtual_memory to return a mock object with an 'available' attribute. + # Set available memory to a low value. + available_memory = 100 # 100 bytes + mock_virtual_memory.return_value = mock.Mock(available=available_memory) + + # Expected bytes should be _MAX_ROW_BYTES because available // 4 is less. 
+ expected_bytes = pandas_gbq.core.sample._MAX_ROW_BYTES + actual_bytes = pandas_gbq.core.sample._calculate_target_bytes(None) + assert actual_bytes == expected_bytes + + +@pytest.mark.parametrize( + "target_bytes, table_bytes, table_rows, fields, expected_limit", + [ + # With table_bytes and table_rows, should use proportion + pytest.param( + 1000, 10000, 100, [], 10, id="with-stats-simple" + ), # 100 * (1000/10000) + pytest.param(1, 10000, 100, [], 1, id="with-stats-min-1"), # min is 1 + # Without stats, should estimate from schema + pytest.param( + 1000, + None, + None, + [google.cloud.bigquery.SchemaField("col1", "INT64")], # 8 bytes + 125, # 1000 // 8 + id="no-stats-simple", + ), + pytest.param( + 10, + None, + None, + [google.cloud.bigquery.SchemaField("col1", "NUMERIC")], # 16 bytes + 1, # max(1, 10 // 16) + id="no-stats-min-1", + ), + # Edge case: row_bytes_estimate is 0 + pytest.param( + 1000, + None, + None, + [], + 1000, + id="no-stats-zero-row-size", # empty schema -> 0 bytes + ), + ], +) +def test_estimate_limit(target_bytes, table_bytes, table_rows, fields, expected_limit): + limit = pandas_gbq.core.sample._estimate_limit( + target_bytes=target_bytes, + table_bytes=table_bytes, + table_rows=table_rows, + fields=fields, + ) + assert limit == expected_limit + + +@mock.patch("pandas_gbq.core.read.download_results") +def test_sample_with_tablesample(mock_download_results, mock_bigquery_client): + mock_table = mock.Mock(spec=google.cloud.bigquery.Table) + mock_table.project = "test-project" + mock_table.dataset_id = "test_dataset" + mock_table.table_id = "test_table" + + proportion = 0.1 + target_row_count = 100 + + pandas_gbq.core.sample._sample_with_tablesample( + mock_table, + bqclient=mock_bigquery_client, + proportion=proportion, + target_row_count=target_row_count, + ) + + mock_bigquery_client.query_and_wait.assert_called_once() + query = mock_bigquery_client.query_and_wait.call_args[0][0] + assert "TABLESAMPLE SYSTEM (10.0 PERCENT)" in query + assert "LIMIT 100" in query + assert ( + f"FROM `{mock_table.project}.{mock_table.dataset_id}.{mock_table.table_id}`" + in query + ) + + mock_download_results.assert_called_once() + + +@mock.patch("pandas_gbq.core.read.download_results") +def test_sample_with_limit(mock_download_results, mock_bigquery_client): + mock_table = mock.Mock(spec=google.cloud.bigquery.Table) + mock_table.project = "test-project" + mock_table.dataset_id = "test_dataset" + mock_table.table_id = "test_table" + + target_row_count = 200 + + pandas_gbq.core.sample._sample_with_limit( + mock_table, + bqclient=mock_bigquery_client, + target_row_count=target_row_count, + ) + + mock_bigquery_client.query_and_wait.assert_called_once() + query = mock_bigquery_client.query_and_wait.call_args[0][0] + assert "TABLESAMPLE" not in query + assert "LIMIT 200" in query + assert ( + f"FROM `{mock_table.project}.{mock_table.dataset_id}.{mock_table.table_id}`" + in query + ) + + mock_download_results.assert_called_once() + + +@pytest.fixture +def mock_gbq_connector(mock_bigquery_client): + with mock.patch("pandas_gbq.gbq_connector.GbqConnector") as mock_connector_class: + mock_connector = mock_connector_class.return_value + mock_connector.get_client.return_value = mock_bigquery_client + mock_connector.credentials = mock.Mock() + yield mock_connector + + +@mock.patch("pandas_gbq.core.read.download_results") +def test_sample_small_table_downloads_all( + mock_download_results, mock_gbq_connector, mock_bigquery_client +): + mock_table = mock.Mock(spec=google.cloud.bigquery.Table) + 
mock_table.num_bytes = 1000 + mock_table.num_rows = 10 + mock_table.schema = [] + mock_bigquery_client.get_table.return_value = mock_table + + with mock.patch( + "pandas_gbq.core.sample._calculate_target_bytes", return_value=2000 + ): + pandas_gbq.core.sample.sample("my-project.my_dataset.my_table") + + mock_bigquery_client.list_rows.assert_called_once_with(mock_table) + mock_download_results.assert_called_once() + # Check that we didn't try to run a query for sampling + mock_bigquery_client.query_and_wait.assert_not_called() + + +@mock.patch("pandas_gbq.core.sample._sample_with_tablesample") +def test_sample_uses_tablesample( + mock_sample_with_tablesample, mock_gbq_connector, mock_bigquery_client +): + mock_table = mock.Mock(spec=google.cloud.bigquery.Table) + mock_table.num_bytes = 10000 + mock_table.num_rows = 100 + mock_table.table_type = "TABLE" + mock_table.schema = [google.cloud.bigquery.SchemaField("col1", "INT64")] + mock_bigquery_client.get_table.return_value = mock_table + + with mock.patch( + "pandas_gbq.core.sample._calculate_target_bytes", return_value=1000 + ): + pandas_gbq.core.sample.sample("my-project.my_dataset.my_table") + + mock_sample_with_tablesample.assert_called_once() + + +@mock.patch("pandas_gbq.core.sample._sample_with_limit") +def test_sample_uses_limit_fallback( + mock_sample_with_limit, mock_gbq_connector, mock_bigquery_client +): + mock_table = mock.Mock(spec=google.cloud.bigquery.Table) + mock_table.num_bytes = 10000 + mock_table.num_rows = 100 + mock_table.table_type = "VIEW" # Not eligible for TABLESAMPLE + mock_table.schema = [google.cloud.bigquery.SchemaField("col1", "INT64")] + mock_bigquery_client.get_table.return_value = mock_table + + with mock.patch( + "pandas_gbq.core.sample._calculate_target_bytes", return_value=1000 + ): + pandas_gbq.core.sample.sample("my-project.my_dataset.my_table") + + mock_sample_with_limit.assert_called_once() + + +@mock.patch("pandas_gbq.core.sample._sample_with_limit") +def test_sample_uses_limit_fallback_no_bytes( + mock_sample_with_limit, mock_gbq_connector, mock_bigquery_client +): + mock_table = mock.Mock(spec=google.cloud.bigquery.Table) + mock_table.num_bytes = None # num_bytes can be None + mock_table.num_rows = 100 + mock_table.table_type = "TABLE" + mock_table.schema = [google.cloud.bigquery.SchemaField("col1", "INT64")] + mock_bigquery_client.get_table.return_value = mock_table + + with mock.patch( + "pandas_gbq.core.sample._calculate_target_bytes", return_value=1000 + ): + pandas_gbq.core.sample.sample("my-project.my_dataset.my_table") + + mock_sample_with_limit.assert_called_once() From 9c35e06e8744a8549fb81f2a163897af24da601e Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 21:35:28 +0000 Subject: [PATCH 05/12] fix unit tests --- tests/unit/test_core_sample.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_core_sample.py b/tests/unit/test_core_sample.py index 2c32c29e..93b42a66 100644 --- a/tests/unit/test_core_sample.py +++ b/tests/unit/test_core_sample.py @@ -262,9 +262,10 @@ def test_sample_small_table_downloads_all( mock_download_results, mock_gbq_connector, mock_bigquery_client ): mock_table = mock.Mock(spec=google.cloud.bigquery.Table) - mock_table.num_bytes = 1000 - mock_table.num_rows = 10 - mock_table.schema = [] + type(mock_table).table_type = mock.PropertyMock(return_value="TABLE") + type(mock_table).num_bytes = mock.PropertyMock(return_value=1000) + type(mock_table).num_rows = mock.PropertyMock(return_value=10) + type(mock_table).schema = 
mock.PropertyMock(return_value=[]) mock_bigquery_client.get_table.return_value = mock_table with mock.patch( From 59571b9bac329d50cfe1dffe0120e7428309a80f Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 21:42:04 +0000 Subject: [PATCH 06/12] fix unit tests --- tests/unit/test_core_sample.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/unit/test_core_sample.py b/tests/unit/test_core_sample.py index 93b42a66..ba61dc33 100644 --- a/tests/unit/test_core_sample.py +++ b/tests/unit/test_core_sample.py @@ -284,16 +284,15 @@ def test_sample_uses_tablesample( mock_sample_with_tablesample, mock_gbq_connector, mock_bigquery_client ): mock_table = mock.Mock(spec=google.cloud.bigquery.Table) - mock_table.num_bytes = 10000 - mock_table.num_rows = 100 - mock_table.table_type = "TABLE" - mock_table.schema = [google.cloud.bigquery.SchemaField("col1", "INT64")] + type(mock_table).table_type = mock.PropertyMock(return_value="TABLE") + type(mock_table).num_bytes = mock.PropertyMock(return_value=1_000_000_000_000) + type(mock_table).num_rows = mock.PropertyMock(return_value=1_000) + type(mock_table).schema = mock.PropertyMock( + return_value=[google.cloud.bigquery.SchemaField("col1", "INT64")] + ) mock_bigquery_client.get_table.return_value = mock_table - with mock.patch( - "pandas_gbq.core.sample._calculate_target_bytes", return_value=1000 - ): - pandas_gbq.core.sample.sample("my-project.my_dataset.my_table") + pandas_gbq.core.sample.sample("my-project.my_dataset.my_table", target_mb=1) mock_sample_with_tablesample.assert_called_once() From 0e338fe0944e669c48b7b7f17792bbf62724cea7 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 21:50:34 +0000 Subject: [PATCH 07/12] add docstring --- pandas_gbq/core/sample.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py index 52094876..fdddc89a 100644 --- a/pandas_gbq/core/sample.py +++ b/pandas_gbq/core/sample.py @@ -186,6 +186,43 @@ def sample( progress_bar_type: Optional[str] = None, use_bqstorage_api: bool = True, ) -> Optional[pandas.DataFrame]: + """Sample a BigQuery table, attempting to limit the amount of data read. + + This function attempts to sample a BigQuery table to a target size in + memory. It prioritizes methods that minimize data scanned and downloaded. + + The sampling strategy is as follows: + 1. If the table is small enough (based on `target_mb` or available memory) + and eligible for the BigQuery Storage Read API, the entire table is + downloaded. + 2. If the table is larger than the target size and eligible for + `TABLESAMPLE SYSTEM` (e.g., a regular table), a `TABLESAMPLE` query + is used to retrieve a proportion of rows, followed by `ORDER BY RAND()` + and `LIMIT` to get the `target_row_count`. + 3. If `TABLESAMPLE` is not applicable (e.g., for views) or `num_bytes` is + not available, a full table scan is performed with `ORDER BY RAND()` + and `LIMIT` to retrieve the `target_row_count`. + + Args: + table_id: The BigQuery table ID to sample, in the format + "project.dataset.table" or "dataset.table". + target_mb: Optional. The target size in megabytes for the sampled + DataFrame. If not specified, it defaults to 1/4 of available + system memory, with a minimum of 100MB. + credentials: Optional. The credentials to use for BigQuery access. + If not provided, `pandas_gbq` will attempt to infer them. + billing_project_id: Optional. 
The ID of the Google Cloud project to + bill for the BigQuery job. If not provided, `pandas_gbq` will + attempt to infer it. + progress_bar_type: Optional. Type of progress bar to display. + See `pandas_gbq.core.read.download_results` for options. + use_bqstorage_api: Optional. If `True`, use the BigQuery Storage Read + API for faster downloads. Defaults to `True`. + + Returns: + A `pandas.DataFrame` containing the sampled data, or `None` if no data + could be sampled. + """ target_bytes = _calculate_target_bytes(target_mb) connector = pandas_gbq.gbq_connector.GbqConnector( project_id=billing_project_id, credentials=credentials From 88d52b2f4372891f7d66132e2a5ee0549de7a237 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 22:15:05 +0000 Subject: [PATCH 08/12] fix more unit tests --- setup.py | 8 ++++---- testing/constraints-3.9.txt | 7 ++++--- tests/unit/test_gbq.py | 3 ++- tests/unit/test_to_gbq.py | 10 +++++----- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/setup.py b/setup.py index c52e92f1..8b1e0260 100644 --- a/setup.py +++ b/setup.py @@ -27,16 +27,16 @@ "pandas >=1.1.4", "pyarrow >=4.0.0", "pydata-google-auth >=1.5.0", - "psutil", # TODO: pick a min version and add to constraints. + "psutil >=5.9.8", # Note: google-api-core and google-auth are also included via transitive # dependency on google-cloud-bigquery, but this library also uses them # directly. - "google-api-core >= 2.10.2, <3.0.0", - "google-auth >=2.13.0", + "google-api-core >= 2.15.0, <3.0.0", + "google-auth >=2.14.1", "google-auth-oauthlib >=0.7.0", # Please also update the minimum version in pandas_gbq/features.py to # allow pandas-gbq to detect invalid package versions at runtime. - "google-cloud-bigquery >=3.4.2,<4.0.0", + "google-cloud-bigquery >=3.20.0,<4.0.0", "packaging >=22.0.0", ] extras = { diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index db8a499a..aff46b28 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -8,12 +8,13 @@ db-dtypes==1.0.4 numpy==1.19.4 pandas==1.1.4 +psutil==5.9.8 pyarrow==4.0.0 pydata-google-auth==1.5.0 -google-api-core==2.10.2 -google-auth==2.13.0 +google-api-core==2.15.0 +google-auth==2.14.1 google-auth-oauthlib==0.7.0 -google-cloud-bigquery==3.4.2 +google-cloud-bigquery==3.20.0 packaging==22.0.0 # Extras google-cloud-bigquery-storage==2.16.2 diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py index 75574820..6eafe9e2 100644 --- a/tests/unit/test_gbq.py +++ b/tests/unit/test_gbq.py @@ -21,6 +21,7 @@ from pandas_gbq import gbq import pandas_gbq.constants +import pandas_gbq.core.read import pandas_gbq.exceptions import pandas_gbq.features from pandas_gbq.features import FEATURES @@ -104,7 +105,7 @@ def get_table(table_ref_or_id, **kwargs): ], ) def test__bqschema_to_nullsafe_dtypes(type_, expected): - result = gbq._bqschema_to_nullsafe_dtypes( + result = pandas_gbq.core.read._bqschema_to_nullsafe_dtypes( [dict(name="x", type=type_, mode="NULLABLE")] ) if not expected: diff --git a/tests/unit/test_to_gbq.py b/tests/unit/test_to_gbq.py index 681f18b8..6f00ccf8 100644 --- a/tests/unit/test_to_gbq.py +++ b/tests/unit/test_to_gbq.py @@ -206,7 +206,7 @@ def test_to_gbq_with_if_exists_unknown(): ], ) def test_create_user_agent(user_agent, rfc9110_delimiter, expected): - from pandas_gbq.gbq import create_user_agent + from pandas_gbq.gbq_connector import create_user_agent result = create_user_agent(user_agent, rfc9110_delimiter) assert result == expected @@ -214,14 +214,14 @@ def 
test_create_user_agent(user_agent, rfc9110_delimiter, expected): @mock.patch.dict(os.environ, {"VSCODE_PID": "1234"}, clear=True) def test_create_user_agent_vscode(): - from pandas_gbq.gbq import create_user_agent + from pandas_gbq.gbq_connector import create_user_agent assert create_user_agent() == f"pandas-{pd.__version__} vscode" @mock.patch.dict(os.environ, {"VSCODE_PID": "1234"}, clear=True) def test_create_user_agent_vscode_plugin(): - from pandas_gbq.gbq import create_user_agent + from pandas_gbq.gbq_connector import create_user_agent with tempfile.TemporaryDirectory() as tmpdir: user_home = Path(tmpdir) @@ -247,14 +247,14 @@ def test_create_user_agent_vscode_plugin(): @mock.patch.dict(os.environ, {"JPY_PARENT_PID": "1234"}, clear=True) def test_create_user_agent_jupyter(): - from pandas_gbq.gbq import create_user_agent + from pandas_gbq.gbq_connector import create_user_agent assert create_user_agent() == f"pandas-{pd.__version__} jupyter" @mock.patch.dict(os.environ, {"JPY_PARENT_PID": "1234"}, clear=True) def test_create_user_agent_jupyter_extension(): - from pandas_gbq.gbq import create_user_agent + from pandas_gbq.gbq_connector import create_user_agent def custom_import_module_side_effect(name, package=None): if name == "bigquery_jupyter_plugin": From 32b9a2091232f6aee5028da1334e90aeeb11597c Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Fri, 14 Nov 2025 22:23:15 +0000 Subject: [PATCH 09/12] feat: Add test for _calculate_target_bytes capping at 1 GiB and fix existing test --- pandas_gbq/core/sample.py | 3 ++- tests/unit/test_core_sample.py | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py index fdddc89a..e7258b81 100644 --- a/pandas_gbq/core/sample.py +++ b/pandas_gbq/core/sample.py @@ -52,6 +52,7 @@ _ARRAY_LENGTH_ESTIMATE = 5 _UNKNOWN_TYPE_SIZE_ESTIMATE = 4 _MAX_ROW_BYTES = 100 * pandas_gbq.constants.BYTES_IN_MIB +_MAX_AUTO_TARGET_BYTES = 1 * pandas_gbq.constants.BYTES_IN_GIB def _calculate_target_bytes(target_mb: Optional[int]) -> int: @@ -59,7 +60,7 @@ def _calculate_target_bytes(target_mb: Optional[int]) -> int: return target_mb * pandas_gbq.constants.BYTES_IN_MIB mem = psutil.virtual_memory() - return max(_MAX_ROW_BYTES, mem.available // 4) + return min(_MAX_AUTO_TARGET_BYTES, max(_MAX_ROW_BYTES, mem.available // 4)) def _estimate_limit( diff --git a/tests/unit/test_core_sample.py b/tests/unit/test_core_sample.py index ba61dc33..5e5a15e7 100644 --- a/tests/unit/test_core_sample.py +++ b/tests/unit/test_core_sample.py @@ -124,10 +124,10 @@ def test_calculate_target_bytes_with_target_mb(): @mock.patch("psutil.virtual_memory") def test_calculate_target_bytes_with_available_memory(mock_virtual_memory): # Mock psutil.virtual_memory to return a mock object with an 'available' attribute. 
- available_memory = 8 * 1024 * 1024 * 1024 # 8 GB + available_memory = 2 * pandas_gbq.constants.BYTES_IN_GIB # 2 GB mock_virtual_memory.return_value = mock.Mock(available=available_memory) - # Expected bytes is available memory / 4 + # Expected bytes is available memory / 4, as it falls between _MAX_ROW_BYTES and _MAX_AUTO_TARGET_BYTES expected_bytes = available_memory // 4 actual_bytes = pandas_gbq.core.sample._calculate_target_bytes(None) assert actual_bytes == expected_bytes @@ -146,6 +146,19 @@ def test_calculate_target_bytes_low_memory_uses_max_row_bytes(mock_virtual_memor assert actual_bytes == expected_bytes +@mock.patch("psutil.virtual_memory") +def test_calculate_target_bytes_caps_at_max_auto_target_bytes(mock_virtual_memory): + # Mock psutil.virtual_memory to return a mock object with an 'available' attribute. + # Set available memory to a high value (e.g., 8 GB) so that available // 4 > _MAX_AUTO_TARGET_BYTES. + available_memory = 8 * pandas_gbq.constants.BYTES_IN_GIB # 8 GB + mock_virtual_memory.return_value = mock.Mock(available=available_memory) + + # Expected bytes should be _MAX_AUTO_TARGET_BYTES (1 GiB) because available // 4 (2 GiB) is capped. + expected_bytes = pandas_gbq.core.sample._MAX_AUTO_TARGET_BYTES + actual_bytes = pandas_gbq.core.sample._calculate_target_bytes(None) + assert actual_bytes == expected_bytes + + @pytest.mark.parametrize( "target_bytes, table_bytes, table_rows, fields, expected_limit", [ From d719d9e8e0b5e99ebe1bc6e33a876261b886b4e5 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Mon, 17 Nov 2025 16:57:52 +0000 Subject: [PATCH 10/12] download results in parallel --- pandas_gbq/core/sample.py | 57 ++++++++++++++++++++++++++++++------- tests/system/test_sample.py | 2 +- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py index e7258b81..49eee4b5 100644 --- a/pandas_gbq/core/sample.py +++ b/pandas_gbq/core/sample.py @@ -8,6 +8,7 @@ from typing import Optional, Sequence, cast import google.cloud.bigquery +import google.cloud.bigquery.table import google.oauth2.credentials import psutil @@ -124,6 +125,36 @@ def _estimate_row_bytes(fields: Sequence[google.cloud.bigquery.SchemaField]) -> return total_size +def _download_results_in_parallel( + rows: google.cloud.bigquery.table.RowIterator, + *, + bqclient: google.cloud.bigquery.Client, + progress_bar_type: Optional[str] = None, + use_bqstorage_api: bool = True, +): + table_reference = getattr(rows, "_table", None) + schema = getattr(rows, "_schema", None) + + # If the results are large enough to materialize a table, break the + # connection to the original query that contains an ORDER BY clause to allow + # reading with multiple streams. 
+    if table_reference is not None and schema is not None:
+        rows = bqclient.list_rows(
+            table_reference,
+            selected_fields=schema,
+        )
+
+    return pandas_gbq.core.read.download_results(
+        rows,
+        bqclient=bqclient,
+        progress_bar_type=progress_bar_type,
+        warn_on_large_results=False,
+        max_results=None,
+        user_dtypes=None,
+        use_bqstorage_api=use_bqstorage_api,
+    )
+
+
 def _sample_with_tablesample(
     table: google.cloud.bigquery.Table,
     *,
@@ -141,13 +172,10 @@ def _sample_with_tablesample(
     LIMIT {int(target_row_count)};
     """
     rows = bqclient.query_and_wait(query)
-    return pandas_gbq.core.read.download_results(
+    return _download_results_in_parallel(
         rows,
         bqclient=bqclient,
         progress_bar_type=progress_bar_type,
-        warn_on_large_results=False,
-        max_results=None,
-        user_dtypes=None,
         use_bqstorage_api=use_bqstorage_api,
     )
 
@@ -167,13 +195,10 @@ def _sample_with_limit(
     LIMIT {int(target_row_count)};
     """
     rows = bqclient.query_and_wait(query)
-    return pandas_gbq.core.read.download_results(
+    return _download_results_in_parallel(
         rows,
         bqclient=bqclient,
         progress_bar_type=progress_bar_type,
-        warn_on_large_results=False,
-        max_results=None,
-        user_dtypes=None,
         use_bqstorage_api=use_bqstorage_api,
     )
 
@@ -192,7 +217,19 @@ def sample(
     This function attempts to sample a BigQuery table to a target size in
     memory. It prioritizes methods that minimize data scanned and downloaded.
 
-    The sampling strategy is as follows:
+    The target size is based on an estimate of the row size, so this method
+    may return more or less data than expected. If the table metadata doesn't
+    include a size, such as with views, an estimate based on the table schema
+    is used.
+
+    Sampling is based on the `BigQuery TABLESAMPLE
+    `_ feature,
+    which can provide a biased sample if data is not randomly distributed
+    among file blocks. For more control over sampling, use the BigQuery
+    DataFrames ``read_gbq_table`` and ``DataFrame.sample`` methods.
+
+    Specifically, the sampling strategy is as follows:
+
     1. If the table is small enough (based on `target_mb` or available memory)
        and eligible for the BigQuery Storage Read API, the entire table is
        downloaded.
@@ -209,7 +246,7 @@ def sample(
         "project.dataset.table" or "dataset.table".
         target_mb: Optional. The target size in megabytes for the sampled
            DataFrame. If not specified, it defaults to 1/4 of available
-            system memory, with a minimum of 100MB.
+            system memory, with a minimum of 100 MiB and a maximum of 1 GiB.
         credentials: Optional. The credentials to use for BigQuery access.
             If not provided, `pandas_gbq` will attempt to infer them.
         billing_project_id: Optional. The ID of the Google Cloud project to
diff --git a/tests/system/test_sample.py b/tests/system/test_sample.py
index 1e27e9a1..049adce8 100644
--- a/tests/system/test_sample.py
+++ b/tests/system/test_sample.py
@@ -2,8 +2,8 @@
 # Use of this source code is governed by a BSD-style
 # license that can be found in the LICENSE file.
-import google.oauth2.credentials import google.cloud.bigquery +import google.oauth2.credentials import pandas_gbq From 6fea3eb11ded26ef5c6391049ea0a9691b548ce2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Mon, 17 Nov 2025 11:18:18 -0600 Subject: [PATCH 11/12] use public table --- tests/system/test_sample.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_sample.py b/tests/system/test_sample.py index 049adce8..dd7175a6 100644 --- a/tests/system/test_sample.py +++ b/tests/system/test_sample.py @@ -39,7 +39,7 @@ def test_sample_large_table( bigquery_client: google.cloud.bigquery.Client, ): # Arrange - table_id = "bigquery-public-data-staging.chicago.taxi_trips" + table_id = "bigquery-public-data.chicago.taxi_trips" table = bigquery_client.get_table(table_id) num_bytes = table.num_bytes num_rows = table.num_rows From f2e5df0d1b2a452808c2671be43805d3d295eb5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Mon, 17 Nov 2025 11:32:21 -0600 Subject: [PATCH 12/12] fix dataset name --- tests/system/test_sample.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_sample.py b/tests/system/test_sample.py index dd7175a6..7dbd89a5 100644 --- a/tests/system/test_sample.py +++ b/tests/system/test_sample.py @@ -39,7 +39,7 @@ def test_sample_large_table( bigquery_client: google.cloud.bigquery.Client, ): # Arrange - table_id = "bigquery-public-data.chicago.taxi_trips" + table_id = "bigquery-public-data.chicago_taxi_trips.taxi_trips" table = bigquery_client.get_table(table_id) num_bytes = table.num_bytes num_rows = table.num_rows