Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use faster query_and_wait method from google-cloud-bigquery when available #722

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pandas_gbq/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

"""Module for checking dependency versions and supported features."""

# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
# https://github.com/googleapis/python-bigquery/blob/main/CHANGELOG.md
BIGQUERY_MINIMUM_VERSION = "3.3.5"
BIGQUERY_QUERY_AND_WAIT_VERSION = "3.14.0"
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0"
PANDAS_BOOLEAN_DTYPE_VERSION = "1.0.0"
PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0"
Expand Down Expand Up @@ -45,6 +46,13 @@ def bigquery_try_import(self):

return google.cloud.bigquery

@property
def bigquery_has_query_and_wait(self):
import packaging.version

min_version = packaging.version.parse(BIGQUERY_QUERY_AND_WAIT_VERSION)
return self.bigquery_installed_version >= min_version

@property
def pandas_installed_version(self):
import pandas
Expand Down
44 changes: 31 additions & 13 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,17 @@ def process_http_error(ex):
# See `BigQuery Troubleshooting Errors
# <https://cloud.google.com/bigquery/troubleshooting-errors>`__

if "cancelled" in ex.message:
message = (
ex.message.casefold()
if hasattr(ex, "message") and ex.message is not None
else ""
)
if "cancelled" in message:
raise QueryTimeout("Reason: {0}".format(ex))
elif "Provided Schema does not match" in ex.message:
elif "schema does not match" in message:
error_message = ex.errors[0]["message"]
raise InvalidSchema(f"Reason: {error_message}")
elif "Already Exists: Table" in ex.message:
elif "already exists: table" in message:
error_message = ex.errors[0]["message"]
raise TableCreationError(f"Reason: {error_message}")
else:
Expand Down Expand Up @@ -410,16 +415,29 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):

self._start_timer()
job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
rows_iter = pandas_gbq.query.query_and_wait(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)

if FEATURES.bigquery_has_query_and_wait:
rows_iter = pandas_gbq.query.query_and_wait_via_client_library(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)
else:
rows_iter = pandas_gbq.query.query_and_wait(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)

dtypes = kwargs.get("dtypes")
return self._download_results(
Expand Down
75 changes: 56 additions & 19 deletions pandas_gbq/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from __future__ import annotations

import concurrent.futures
import functools
import logging
from typing import Optional

import google.auth.exceptions
from google.cloud import bigquery

import pandas_gbq.exceptions
Expand Down Expand Up @@ -78,6 +80,26 @@ def _wait_for_query_job(
connector.process_http_error(ex)


def try_query(connector, query_fn):
try:
logger.debug("Requesting query... ")
return query_fn()
except concurrent.futures.TimeoutError as ex:
raise pandas_gbq.exceptions.QueryTimeout("Reason: {0}".format(ex))
except (google.auth.exceptions.RefreshError, ValueError) as ex:
if connector.private_key:
raise pandas_gbq.exceptions.AccessDenied(
f"The service account credentials are not valid: {ex}"
)
else:
raise pandas_gbq.exceptions.AccessDenied(
"The credentials have been revoked or expired, "
f"please re-run the application to re-authorize: {ex}"
)
except connector.http_error as ex:
connector.process_http_error(ex)


def query_and_wait(
connector,
client: bigquery.Client,
Expand Down Expand Up @@ -122,29 +144,17 @@ def query_and_wait(
Result iterator from which we can download the results in the
desired format (pandas.DataFrame).
"""
from google.auth.exceptions import RefreshError

try:
logger.debug("Requesting query... ")
query_reply = client.query(
query_reply = try_query(
connector,
functools.partial(
client.query,
query,
job_config=job_config,
location=location,
project=project_id,
)
logger.debug("Query running...")
except (RefreshError, ValueError) as ex:
if connector.private_key:
raise pandas_gbq.exceptions.AccessDenied(
f"The service account credentials are not valid: {ex}"
)
else:
raise pandas_gbq.exceptions.AccessDenied(
"The credentials have been revoked or expired, "
f"please re-run the application to re-authorize: {ex}"
)
except connector.http_error as ex:
connector.process_http_error(ex)
),
)
logger.debug("Query running...")

job_id = query_reply.job_id
logger.debug("Job ID: %s" % job_id)
Expand Down Expand Up @@ -173,3 +183,30 @@ def query_and_wait(
return query_reply.result(max_results=max_results)
except connector.http_error as ex:
connector.process_http_error(ex)


def query_and_wait_via_client_library(
connector,
client: bigquery.Client,
query: str,
*,
job_config: bigquery.QueryJobConfig,
location: Optional[str],
project_id: Optional[str],
max_results: Optional[int],
timeout_ms: Optional[int],
):
rows_iter = try_query(
connector,
functools.partial(
client.query_and_wait,
query,
job_config=job_config,
location=location,
project=project_id,
max_results=max_results,
wait_timeout=timeout_ms / 1000.0 if timeout_ms else None,
),
)
logger.debug("Query done.\n")
return rows_iter
42 changes: 41 additions & 1 deletion tests/unit/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import google.cloud.bigquery
import google.cloud.bigquery.table
import packaging.version
import pytest


Expand Down Expand Up @@ -55,8 +56,15 @@ def test_read_gbq_should_save_credentials(mock_get_credentials):
mock_get_credentials.assert_not_called()


def test_read_gbq_should_use_dialect(mock_bigquery_client):
def test_read_gbq_should_use_dialect_with_query(monkeypatch, mock_bigquery_client):
import pandas_gbq
import pandas_gbq.features

monkeypatch.setattr(
pandas_gbq.features.FEATURES,
"_bigquery_installed_version",
packaging.version.parse(pandas_gbq.features.BIGQUERY_MINIMUM_VERSION),
)

assert pandas_gbq.context.dialect is None
pandas_gbq.context.dialect = "legacy"
Expand All @@ -71,3 +79,35 @@ def test_read_gbq_should_use_dialect(mock_bigquery_client):
_, kwargs = mock_bigquery_client.query.call_args
assert not kwargs["job_config"].use_legacy_sql
pandas_gbq.context.dialect = None # Reset the global state.


def test_read_gbq_should_use_dialect_with_query_and_wait(
monkeypatch, mock_bigquery_client
):
if not hasattr(mock_bigquery_client, "query_and_wait"):
pytest.skip(
f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait"
)

import pandas_gbq
import pandas_gbq.features

monkeypatch.setattr(
pandas_gbq.features.FEATURES,
"_bigquery_installed_version",
packaging.version.parse(pandas_gbq.features.BIGQUERY_QUERY_AND_WAIT_VERSION),
)

assert pandas_gbq.context.dialect is None
pandas_gbq.context.dialect = "legacy"
pandas_gbq.read_gbq("SELECT 1")

_, kwargs = mock_bigquery_client.query_and_wait.call_args
assert kwargs["job_config"].use_legacy_sql

pandas_gbq.context.dialect = "standard"
pandas_gbq.read_gbq("SELECT 1")

_, kwargs = mock_bigquery_client.query_and_wait.call_args
assert not kwargs["job_config"].use_legacy_sql
pandas_gbq.context.dialect = None # Reset the global state.
17 changes: 17 additions & 0 deletions tests/unit/test_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ def fresh_bigquery_version(monkeypatch):
monkeypatch.setattr(FEATURES, "_pandas_installed_version", None)


@pytest.mark.parametrize(
["bigquery_version", "expected"],
[
("1.99.100", False),
("2.99.999", False),
("3.13.11", False),
("3.14.0", True),
("4.999.999", True),
],
)
def test_bigquery_has_query_and_wait(monkeypatch, bigquery_version, expected):
import google.cloud.bigquery

monkeypatch.setattr(google.cloud.bigquery, "__version__", bigquery_version)
assert FEATURES.bigquery_has_query_and_wait == expected


@pytest.mark.parametrize(
["pandas_version", "expected"],
[
Expand Down
Loading