Skip to content

Commit

Permalink
feat: use faster query_and_wait method from google-cloud-bigquery whe…
Browse files Browse the repository at this point in the history
…n available (#722)

* feat: use faster query_and_wait method from google-cloud-bigquery when available

fix unit tests

fix python 3.7

fix python 3.7

fix python 3.7

fix python 3.7

fix wait_timeout units

boost test coverage

remove dead code

boost a little more coverage

* restore missing test
  • Loading branch information
tswast committed Jan 25, 2024
1 parent e1c384e commit ac3ce3f
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 62 deletions.
10 changes: 9 additions & 1 deletion pandas_gbq/features.py
Expand Up @@ -4,8 +4,9 @@

"""Module for checking dependency versions and supported features."""

# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
# https://github.com/googleapis/python-bigquery/blob/main/CHANGELOG.md
BIGQUERY_MINIMUM_VERSION = "3.3.5"
BIGQUERY_QUERY_AND_WAIT_VERSION = "3.14.0"
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0"
PANDAS_BOOLEAN_DTYPE_VERSION = "1.0.0"
PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0"
Expand Down Expand Up @@ -45,6 +46,13 @@ def bigquery_try_import(self):

return google.cloud.bigquery

@property
def bigquery_has_query_and_wait(self):
import packaging.version

min_version = packaging.version.parse(BIGQUERY_QUERY_AND_WAIT_VERSION)
return self.bigquery_installed_version >= min_version

@property
def pandas_installed_version(self):
import pandas
Expand Down
44 changes: 31 additions & 13 deletions pandas_gbq/gbq.py
Expand Up @@ -351,12 +351,17 @@ def process_http_error(ex):
# See `BigQuery Troubleshooting Errors
# <https://cloud.google.com/bigquery/troubleshooting-errors>`__

if "cancelled" in ex.message:
message = (
ex.message.casefold()
if hasattr(ex, "message") and ex.message is not None
else ""
)
if "cancelled" in message:
raise QueryTimeout("Reason: {0}".format(ex))
elif "Provided Schema does not match" in ex.message:
elif "schema does not match" in message:
error_message = ex.errors[0]["message"]
raise InvalidSchema(f"Reason: {error_message}")
elif "Already Exists: Table" in ex.message:
elif "already exists: table" in message:
error_message = ex.errors[0]["message"]
raise TableCreationError(f"Reason: {error_message}")
else:
Expand Down Expand Up @@ -410,16 +415,29 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):

self._start_timer()
job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
rows_iter = pandas_gbq.query.query_and_wait(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)

if FEATURES.bigquery_has_query_and_wait:
rows_iter = pandas_gbq.query.query_and_wait_via_client_library(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)
else:
rows_iter = pandas_gbq.query.query_and_wait(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)

dtypes = kwargs.get("dtypes")
return self._download_results(
Expand Down
75 changes: 56 additions & 19 deletions pandas_gbq/query.py
Expand Up @@ -5,9 +5,11 @@
from __future__ import annotations

import concurrent.futures
import functools
import logging
from typing import Optional

import google.auth.exceptions
from google.cloud import bigquery

import pandas_gbq.exceptions
Expand Down Expand Up @@ -78,6 +80,26 @@ def _wait_for_query_job(
connector.process_http_error(ex)


def try_query(connector, query_fn):
try:
logger.debug("Requesting query... ")
return query_fn()
except concurrent.futures.TimeoutError as ex:
raise pandas_gbq.exceptions.QueryTimeout("Reason: {0}".format(ex))
except (google.auth.exceptions.RefreshError, ValueError) as ex:
if connector.private_key:
raise pandas_gbq.exceptions.AccessDenied(
f"The service account credentials are not valid: {ex}"
)
else:
raise pandas_gbq.exceptions.AccessDenied(
"The credentials have been revoked or expired, "
f"please re-run the application to re-authorize: {ex}"
)
except connector.http_error as ex:
connector.process_http_error(ex)


def query_and_wait(
connector,
client: bigquery.Client,
Expand Down Expand Up @@ -122,29 +144,17 @@ def query_and_wait(
Result iterator from which we can download the results in the
desired format (pandas.DataFrame).
"""
from google.auth.exceptions import RefreshError

try:
logger.debug("Requesting query... ")
query_reply = client.query(
query_reply = try_query(
connector,
functools.partial(
client.query,
query,
job_config=job_config,
location=location,
project=project_id,
)
logger.debug("Query running...")
except (RefreshError, ValueError) as ex:
if connector.private_key:
raise pandas_gbq.exceptions.AccessDenied(
f"The service account credentials are not valid: {ex}"
)
else:
raise pandas_gbq.exceptions.AccessDenied(
"The credentials have been revoked or expired, "
f"please re-run the application to re-authorize: {ex}"
)
except connector.http_error as ex:
connector.process_http_error(ex)
),
)
logger.debug("Query running...")

job_id = query_reply.job_id
logger.debug("Job ID: %s" % job_id)
Expand Down Expand Up @@ -173,3 +183,30 @@ def query_and_wait(
return query_reply.result(max_results=max_results)
except connector.http_error as ex:
connector.process_http_error(ex)


def query_and_wait_via_client_library(
connector,
client: bigquery.Client,
query: str,
*,
job_config: bigquery.QueryJobConfig,
location: Optional[str],
project_id: Optional[str],
max_results: Optional[int],
timeout_ms: Optional[int],
):
rows_iter = try_query(
connector,
functools.partial(
client.query_and_wait,
query,
job_config=job_config,
location=location,
project=project_id,
max_results=max_results,
wait_timeout=timeout_ms / 1000.0 if timeout_ms else None,
),
)
logger.debug("Query done.\n")
return rows_iter
42 changes: 41 additions & 1 deletion tests/unit/test_context.py
Expand Up @@ -8,6 +8,7 @@

import google.cloud.bigquery
import google.cloud.bigquery.table
import packaging.version
import pytest


Expand Down Expand Up @@ -55,8 +56,15 @@ def test_read_gbq_should_save_credentials(mock_get_credentials):
mock_get_credentials.assert_not_called()


def test_read_gbq_should_use_dialect(mock_bigquery_client):
def test_read_gbq_should_use_dialect_with_query(monkeypatch, mock_bigquery_client):
import pandas_gbq
import pandas_gbq.features

monkeypatch.setattr(
pandas_gbq.features.FEATURES,
"_bigquery_installed_version",
packaging.version.parse(pandas_gbq.features.BIGQUERY_MINIMUM_VERSION),
)

assert pandas_gbq.context.dialect is None
pandas_gbq.context.dialect = "legacy"
Expand All @@ -71,3 +79,35 @@ def test_read_gbq_should_use_dialect(mock_bigquery_client):
_, kwargs = mock_bigquery_client.query.call_args
assert not kwargs["job_config"].use_legacy_sql
pandas_gbq.context.dialect = None # Reset the global state.


def test_read_gbq_should_use_dialect_with_query_and_wait(
monkeypatch, mock_bigquery_client
):
if not hasattr(mock_bigquery_client, "query_and_wait"):
pytest.skip(
f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait"
)

import pandas_gbq
import pandas_gbq.features

monkeypatch.setattr(
pandas_gbq.features.FEATURES,
"_bigquery_installed_version",
packaging.version.parse(pandas_gbq.features.BIGQUERY_QUERY_AND_WAIT_VERSION),
)

assert pandas_gbq.context.dialect is None
pandas_gbq.context.dialect = "legacy"
pandas_gbq.read_gbq("SELECT 1")

_, kwargs = mock_bigquery_client.query_and_wait.call_args
assert kwargs["job_config"].use_legacy_sql

pandas_gbq.context.dialect = "standard"
pandas_gbq.read_gbq("SELECT 1")

_, kwargs = mock_bigquery_client.query_and_wait.call_args
assert not kwargs["job_config"].use_legacy_sql
pandas_gbq.context.dialect = None # Reset the global state.
17 changes: 17 additions & 0 deletions tests/unit/test_features.py
Expand Up @@ -13,6 +13,23 @@ def fresh_bigquery_version(monkeypatch):
monkeypatch.setattr(FEATURES, "_pandas_installed_version", None)


@pytest.mark.parametrize(
["bigquery_version", "expected"],
[
("1.99.100", False),
("2.99.999", False),
("3.13.11", False),
("3.14.0", True),
("4.999.999", True),
],
)
def test_bigquery_has_query_and_wait(monkeypatch, bigquery_version, expected):
import google.cloud.bigquery

monkeypatch.setattr(google.cloud.bigquery, "__version__", bigquery_version)
assert FEATURES.bigquery_has_query_and_wait == expected


@pytest.mark.parametrize(
["pandas_version", "expected"],
[
Expand Down

0 comments on commit ac3ce3f

Please sign in to comment.