feat: read_gbq suggests using BigQuery DataFrames with large results (#769)

* feat: `read_gbq` suggests using BigQuery DataFrames with large results

* update docs

* guard against non-int bytes

* tweak message

* remove unnecessary also

* remove dead code

* remove directory that doesn't exist

* comment about GiB vs GB
tswast committed May 20, 2024
1 parent 12a8db7 commit f937edf
Showing 8 changed files with 153 additions and 51 deletions.
6 changes: 6 additions & 0 deletions docs/index.rst
@@ -23,6 +23,12 @@ Note: The canonical version of this documentation can always be found on the
`BigQuery sandbox <https://cloud.google.com/bigquery/docs/sandbox>`__ to
try the service for free.

Also, consider using `BigQuery DataFrames
<https://cloud.google.com/bigquery/docs/bigquery-dataframes-introduction>`__
to process large results with pandas-compatible APIs and transparent SQL
pushdown to the BigQuery engine. This provides an opportunity to save on costs
and improve performance.

While BigQuery uses standard SQL syntax, it has some important differences
from traditional databases both in functionality, API limitations (size and
quantity of queries or uploads), and how Google charges for use of the
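
As a rough illustration of the BigQuery DataFrames recommendation added to docs/index.rst above, the sketch below reads a large public table through bigframes instead of pandas-gbq. It is a minimal sketch, assuming the bigframes package is installed and application default credentials are configured; the project and table names are only examples.

import bigframes.pandas as bpd

# Point bigframes at a billing project (placeholder value).
bpd.options.bigquery.project = "my-project"

# The query runs in BigQuery; rows are only downloaded when materialized.
df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
print(df.head())  # downloads a small preview, not the full table
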
9 changes: 9 additions & 0 deletions noxfile.py
@@ -375,6 +375,15 @@ def cover(session):
session.install("coverage", "pytest-cov")
session.run("coverage", "report", "--show-missing", "--fail-under=96")

# Make sure there is no dead code in our test directories.
session.run(
"coverage",
"report",
"--show-missing",
"--include=tests/unit/*",
"--fail-under=100",
)

session.run("coverage", "erase")


12 changes: 12 additions & 0 deletions pandas_gbq/constants.py
@@ -0,0 +1,12 @@
# Copyright (c) 2024 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

# BigQuery uses powers of 2 in calculating data sizes. See:
# https://cloud.google.com/bigquery/pricing#data The documentation uses
# GiB rather than GB to disambiguate from the alternative base 10 units.
# https://en.wikipedia.org/wiki/Byte#Multiple-byte_units
BYTES_IN_KIB = 1024
BYTES_IN_MIB = 1024 * BYTES_IN_KIB
BYTES_IN_GIB = 1024 * BYTES_IN_MIB
BYTES_TO_RECOMMEND_BIGFRAMES = BYTES_IN_GIB
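
To make the new threshold concrete, here is a small sketch of the same arithmetic the warning in pandas_gbq/gbq.py performs with these constants; num_bytes is an example value standing in for the size reported by the BigQuery API.

import pandas_gbq.constants

num_bytes = 3 * pandas_gbq.constants.BYTES_IN_GIB // 2  # example: 1.5 GiB
num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB
if num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES:
    print(f"Results are {num_gib:.1f} GiB, above the 1 GiB recommendation threshold.")
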
4 changes: 4 additions & 0 deletions pandas_gbq/exceptions.py
@@ -28,6 +28,10 @@ class InvalidPrivateKeyFormat(ValueError):
"""


class LargeResultsWarning(UserWarning):
"""Raise when results are beyond that recommended for pandas DataFrame."""


class PerformanceWarning(RuntimeWarning):
"""
Raised when a performance-related feature is requested, but unsupported.
10 changes: 0 additions & 10 deletions pandas_gbq/features.py
@@ -9,7 +9,6 @@
BIGQUERY_QUERY_AND_WAIT_VERSION = "3.14.0"
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0"
PANDAS_BOOLEAN_DTYPE_VERSION = "1.0.0"
PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0"


class Features:
@@ -82,14 +81,5 @@ def pandas_has_boolean_dtype(self):
desired_version = packaging.version.parse(PANDAS_BOOLEAN_DTYPE_VERSION)
return self.pandas_installed_version >= desired_version

@property
def pandas_has_parquet_with_lossless_timestamp(self):
import packaging.version

desired_version = packaging.version.parse(
PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION
)
return self.pandas_installed_version >= desired_version


FEATURES = Features()
59 changes: 46 additions & 13 deletions pandas_gbq/gbq.py
@@ -19,6 +19,8 @@
if typing.TYPE_CHECKING: # pragma: NO COVER
import pandas

import pandas_gbq.constants
import pandas_gbq.exceptions
from pandas_gbq.exceptions import GenericGBQException, QueryTimeout
from pandas_gbq.features import FEATURES
import pandas_gbq.query
@@ -478,6 +480,35 @@ def _download_results(
if max_results is not None:
create_bqstorage_client = False

# If we're downloading a large table, BigQuery DataFrames might be a
# better fit. Not all code paths will populate rows_iter._table, but
# if it's not populated that means we are working with a small result
# set.
if (table_ref := getattr(rows_iter, "_table", None)) is not None:
table = self.client.get_table(table_ref)
if (
isinstance((num_bytes := table.num_bytes), int)
and num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES
):
num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB
warnings.warn(
f"Recommendation: Your results are {num_gib:.1f} GiB. "
"Consider using BigQuery DataFrames "
"(https://cloud.google.com/bigquery/docs/bigquery-dataframes-introduction) "
"to process large results with pandas compatible APIs with transparent SQL "
"pushdown to BigQuery engine. This provides an opportunity to save on costs "
"and improve performance. "
"Please reach out to bigframes-feedback@google.com with any "
"questions or concerns. To disable this message, run "
"warnings.simplefilter('ignore', category=pandas_gbq.exceptions.LargeResultsWarning)",
category=pandas_gbq.exceptions.LargeResultsWarning,
# user's code
# -> read_gbq
# -> run_query
# -> download_results
stacklevel=4,
)

try:
schema_fields = [field.to_api_repr() for field in rows_iter.schema]
conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
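
The warning text above tells users how to opt out. A minimal sketch of what that looks like from calling code, with placeholder query and project names:

import warnings

import pandas_gbq
import pandas_gbq.exceptions

# Silence the size recommendation for this process.
warnings.simplefilter("ignore", category=pandas_gbq.exceptions.LargeResultsWarning)

df = pandas_gbq.read_gbq(
    "SELECT * FROM `my_dataset.big_table`",  # placeholder query
    project_id="my-project",  # placeholder billing project
)
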
@@ -663,18 +694,25 @@ def read_gbq(
*,
col_order=None,
):
r"""Load data from Google BigQuery using google-cloud-python
The main method a user calls to execute a Query in Google BigQuery
and read results into a pandas DataFrame.
r"""Read data from Google BigQuery to a pandas DataFrame.
This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
<https://googleapis.dev/python/bigquery/latest/index.html>`__.
Run a SQL query in BigQuery or read directly from a table using
the `Python client library for BigQuery
<https://cloud.google.com/python/docs/reference/bigquery/latest/index.html>`__
and for `BigQuery Storage
<https://cloud.google.com/python/docs/reference/bigquerystorage/latest>`__
to make API requests.
See the :ref:`How to authenticate with Google BigQuery <authentication>`
guide for authentication instructions.
.. note::
Consider using `BigQuery DataFrames
<https://cloud.google.com/bigquery/docs/dataframes-quickstart>`__ to
process large results with pandas-compatible APIs that run in the
BigQuery SQL query engine. This provides an opportunity to save on
costs and improve performance.
Parameters
----------
query_or_table : str
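
For reference alongside the reworked docstring, a minimal call is sketched below; query_or_table accepts either SQL or a fully-qualified table ID, and the project ID is a placeholder.

import pandas_gbq

df = pandas_gbq.read_gbq(
    "bigquery-public-data.usa_names.usa_1910_2013",  # table ID instead of SQL
    project_id="my-project",  # placeholder billing project
    max_results=1000,
)
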
@@ -1050,12 +1088,7 @@ def to_gbq(
)

if api_method == "default":
# Avoid using parquet if pandas doesn't support lossless conversions to
# parquet timestamp. See: https://stackoverflow.com/a/69758676/101923
if FEATURES.pandas_has_parquet_with_lossless_timestamp:
api_method = "load_parquet"
else:
api_method = "load_csv"
api_method = "load_parquet"

if chunksize is not None:
if api_method == "load_parquet":
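
With the pandas-version check removed, the default api_method is now always a parquet load job. A caller that still needs the CSV path can request it explicitly; a sketch with placeholder table and project names:

import pandas
import pandas_gbq

df = pandas.DataFrame({"num": [1, 2, 3]})

pandas_gbq.to_gbq(
    df,
    "my_dataset.my_table",  # placeholder destination table
    project_id="my-project",  # placeholder billing project
    api_method="load_csv",  # override the "load_parquet" default
)
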
99 changes: 75 additions & 24 deletions tests/unit/test_gbq.py
@@ -6,17 +6,22 @@

import copy
import datetime
import re
from unittest import mock
import warnings

import google.api_core.exceptions
import google.cloud.bigquery
import google.cloud.bigquery.table
import numpy
import packaging.version
import pandas
from pandas import DataFrame
import pytest

from pandas_gbq import gbq
import pandas_gbq.constants
import pandas_gbq.exceptions
import pandas_gbq.features
from pandas_gbq.features import FEATURES

@@ -147,6 +152,62 @@ def test__transform_read_gbq_configuration_makes_copy(original, expected):
assert did_change == should_change


def test_GbqConnector_download_results_warns_for_large_tables(default_bigquery_client):
gbq._test_google_api_imports()
connector = _make_connector()
rows_iter = mock.create_autospec(
google.cloud.bigquery.table.RowIterator, instance=True
)
table = google.cloud.bigquery.Table.from_api_repr(
{
"tableReference": {
"projectId": "my-proj",
"datasetId": "my-dset",
"tableId": "my_tbl",
},
"numBytes": 2 * pandas_gbq.constants.BYTES_IN_GIB,
},
)
rows_iter._table = table
default_bigquery_client.get_table.reset_mock(side_effect=True)
default_bigquery_client.get_table.return_value = table

with pytest.warns(
pandas_gbq.exceptions.LargeResultsWarning,
match=re.escape("Your results are 2.0 GiB. Consider using BigQuery DataFrames"),
):
connector._download_results(rows_iter)


def test_GbqConnector_download_results_doesnt_warn_for_small_tables(
default_bigquery_client,
):
gbq._test_google_api_imports()
connector = _make_connector()
rows_iter = mock.create_autospec(
google.cloud.bigquery.table.RowIterator, instance=True
)
table = google.cloud.bigquery.Table.from_api_repr(
{
"tableReference": {
"projectId": "my-proj",
"datasetId": "my-dset",
"tableId": "my_tbl",
},
"numBytes": 999 * pandas_gbq.constants.BYTES_IN_MIB,
},
)
rows_iter._table = table
default_bigquery_client.get_table.reset_mock(side_effect=True)
default_bigquery_client.get_table.return_value = table

with warnings.catch_warnings():
warnings.simplefilter(
"error", category=pandas_gbq.exceptions.LargeResultsWarning
)
connector._download_results(rows_iter)


def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
gbq._test_google_api_imports()
pytest.importorskip("google.api_core.client_info")
@@ -191,16 +252,13 @@ def test_to_gbq_with_chunksize_warns_deprecation(
api_method, warning_message, warning_type
):
with pytest.warns(warning_type, match=warning_message):
try:
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
api_method=api_method,
chunksize=100,
)
except gbq.TableCreationError:
pass
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
api_method=api_method,
chunksize=100,
)


@pytest.mark.parametrize(["verbose"], [(True,), (False,)])
@@ -211,15 +269,12 @@ def test_to_gbq_with_verbose_new_pandas_warns_deprecation(monkeypatch, verbose):
mock.PropertyMock(return_value=True),
)
with pytest.warns(FutureWarning, match="verbose is deprecated"):
try:
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
verbose=verbose,
)
except gbq.TableCreationError:
pass
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
verbose=verbose,
)


def test_to_gbq_with_private_key_raises_notimplementederror():
Expand All @@ -233,11 +288,7 @@ def test_to_gbq_with_private_key_raises_notimplementederror():


def test_to_gbq_doesnt_run_query(mock_bigquery_client):
try:
gbq.to_gbq(DataFrame([[1]]), "dataset.tablename", project_id="my-project")
except gbq.TableCreationError:
pass

gbq.to_gbq(DataFrame([[1]]), "dataset.tablename", project_id="my-project")
mock_bigquery_client.query.assert_not_called()


5 changes: 1 addition & 4 deletions tests/unit/test_to_gbq.py
@@ -8,14 +8,11 @@
import pytest

from pandas_gbq import gbq
from pandas_gbq.features import FEATURES


@pytest.fixture
def expected_load_method(mock_bigquery_client):
if FEATURES.pandas_has_parquet_with_lossless_timestamp:
return mock_bigquery_client.load_table_from_dataframe
return mock_bigquery_client.load_table_from_file
return mock_bigquery_client.load_table_from_dataframe


def test_to_gbq_create_dataset_with_location(mock_bigquery_client):
