Skip to content
Permalink
Browse files
feat: detect obsolete BQ Storage extra at runtime (#666)
* feat: detect obsolete BQ Storage extra at runtime

* Cover the changes with unit tests

* Skip BQ Storage version tests if extra missing

* Rename and improve _create_bqstorage_client()

The method is renamed to _ensure_bqstorage_client() and now performs
a check if BQ Storage dependency is recent enough.

* Remove BQ Storage check from dbapi.Cursor

The check is now performed in dbapi.Connection, which is sufficient.

* Remove BQ Storage check in _pandas_helpers

The methods in higher layers already do the same check before
a BQ Storage client instance is passed to
_pandas_helpers._download_table_bqstorage() helper.

* Simplify BQ Storage client factory in magics

Lean more heavily on client._ensure_bqstorage_client() to de-duplicate
logic.

* Cover missing code lines with tests
  • Loading branch information
plamut committed May 20, 2021
1 parent 82f6c32 commit bd7dbdae5c972b16bafc53c67911eeaa3255a880
@@ -39,6 +39,7 @@
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import SqlTypeNames
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
from google.cloud.bigquery.external_config import BigtableColumnFamily
@@ -152,6 +153,8 @@
"WriteDisposition",
# EncryptionConfiguration
"EncryptionConfiguration",
# Custom exceptions
"LegacyBigQueryStorageError",
]


@@ -25,6 +25,10 @@
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes
import pkg_resources

from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError


_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
@@ -36,6 +40,32 @@
re.VERBOSE,
)

# Minimum supported version of the google-cloud-bigquery-storage extra.
# Keep in sync with the constraint declared in setup.py.
_MIN_BQ_STORAGE_VERSION = pkg_resources.parse_version("2.0.0")


def _verify_bq_storage_version():
    """Verify that a recent enough version of BigQuery Storage extra is installed.

    The function assumes that google-cloud-bigquery-storage extra is installed, and
    should thus be used in places where this assumption holds.

    Because `pip` can install an outdated version of this extra despite the
    constraints in setup.py, the calling code can use this helper to verify the
    version compatibility at runtime.

    Raises:
        LegacyBigQueryStorageError:
            If the installed version is older than ``_MIN_BQ_STORAGE_VERSION``.
    """
    from google.cloud import bigquery_storage

    # Releases predating the "__version__" attribute are labeled "legacy", which
    # always fails the minimum-version check below.
    installed_version = pkg_resources.parse_version(
        getattr(bigquery_storage, "__version__", "legacy")
    )

    if installed_version < _MIN_BQ_STORAGE_VERSION:
        # Derive the required version from the constant so the message cannot
        # drift if the minimum is ever bumped.
        msg = (
            "Dependency google-cloud-bigquery-storage is outdated, please upgrade "
            f"it to version >= {_MIN_BQ_STORAGE_VERSION} (version found: {installed_version})."
        )
        raise LegacyBigQueryStorageError(msg)


def _not_null(value, field):
"""Check whether 'value' should be coerced to 'field' type."""
@@ -50,16 +50,25 @@
from google.cloud import exceptions # pytype: disable=import-error
from google.cloud.client import ClientWithProject # pytype: disable=import-error

try:
from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
)
except ImportError:
DEFAULT_BQSTORAGE_CLIENT_INFO = None

from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_bq_storage_version
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.job import (
@@ -445,15 +454,38 @@ def dataset(self, dataset_id: str, project: str = None) -> DatasetReference:
)
return DatasetReference(project, dataset_id)

def _create_bqstorage_client(self):
def _ensure_bqstorage_client(
self,
bqstorage_client: Optional[
"google.cloud.bigquery_storage.BigQueryReadClient"
] = None,
client_options: Optional[google.api_core.client_options.ClientOptions] = None,
client_info: Optional[
"google.api_core.gapic_v1.client_info.ClientInfo"
] = DEFAULT_BQSTORAGE_CLIENT_INFO,
) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
"""Create a BigQuery Storage API client using this client's credentials.
If a client cannot be created due to missing dependencies, raise a
warning and return ``None``.
If a client cannot be created due to a missing or outdated dependency
`google-cloud-bigquery-storage`, raise a warning and return ``None``.
If the `bqstorage_client` argument is not ``None``, still perform the version
check and return the argument back to the caller if the check passes. If it
fails, raise a warning and return ``None``.
Args:
bqstorage_client:
An existing BigQuery Storage client instance to check for version
compatibility. If ``None``, a new instance is created and returned.
client_options:
Custom options used with a new BigQuery Storage client instance if one
is created.
client_info:
The client info used with a new BigQuery Storage client instance if one
is created.
Returns:
Optional[google.cloud.bigquery_storage.BigQueryReadClient]:
A BigQuery Storage API client.
A BigQuery Storage API client.
"""
try:
from google.cloud import bigquery_storage
@@ -464,7 +496,20 @@ def _create_bqstorage_client(self):
)
return None

return bigquery_storage.BigQueryReadClient(credentials=self._credentials)
try:
_verify_bq_storage_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None

if bqstorage_client is None:
bqstorage_client = bigquery_storage.BigQueryReadClient(
credentials=self._credentials,
client_options=client_options,
client_info=client_info,
)

return bqstorage_client

def _dataset_from_arg(self, dataset):
if isinstance(dataset, str):
@@ -47,12 +47,14 @@ def __init__(self, client=None, bqstorage_client=None):
else:
self._owns_client = False

# A warning is already raised by the BQ Storage client factory if
# instantiation fails, or if the given BQ Storage client instance is outdated.
if bqstorage_client is None:
# A warning is already raised by the factory if instantiation fails.
bqstorage_client = client._create_bqstorage_client()
bqstorage_client = client._ensure_bqstorage_client()
self._owns_bqstorage_client = bqstorage_client is not None
else:
self._owns_bqstorage_client = False
bqstorage_client = client._ensure_bqstorage_client(bqstorage_client)

self._client = client
self._bqstorage_client = bqstorage_client
@@ -0,0 +1,21 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class BigQueryError(Exception):
    """Base class for all custom exceptions defined by the BigQuery client.

    Catching this class catches every custom error raised by this library.
    """


class LegacyBigQueryStorageError(BigQueryError):
    """Raised when too old a version of BigQuery Storage extra is detected at runtime.

    The minimum supported version is defined by
    ``google.cloud.bigquery._helpers._MIN_BQ_STORAGE_VERSION``.
    """
@@ -644,7 +644,7 @@ def _cell_magic(line, query):
bqstorage_client_options.api_endpoint = args.bqstorage_api_endpoint

bqstorage_client = _make_bqstorage_client(
use_bqstorage_api, context.credentials, bqstorage_client_options,
client, use_bqstorage_api, bqstorage_client_options,
)

close_transports = functools.partial(_close_transports, client, bqstorage_client)
@@ -762,12 +762,12 @@ def _split_args_line(line):
return params_option_value, rest_of_args


def _make_bqstorage_client(use_bqstorage_api, credentials, client_options):
def _make_bqstorage_client(client, use_bqstorage_api, client_options):
if not use_bqstorage_api:
return None

try:
from google.cloud import bigquery_storage
from google.cloud import bigquery_storage # noqa: F401
except ImportError as err:
customized_error = ImportError(
"The default BigQuery Storage API client cannot be used, install "
@@ -785,10 +785,9 @@ def _make_bqstorage_client(use_bqstorage_api, credentials, client_options):
)
raise customized_error from err

return bigquery_storage.BigQueryReadClient(
credentials=credentials,
client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
return client._ensure_bqstorage_client(
client_options=client_options,
client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
)


@@ -41,6 +41,7 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
@@ -1519,6 +1520,17 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
)
return False

try:
from google.cloud import bigquery_storage # noqa: F401
except ImportError:
return False

try:
_helpers._verify_bq_storage_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return False

return True

def _get_next_page_response(self):
@@ -1655,7 +1667,7 @@ def to_arrow(

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
bqstorage_client = self.client._create_bqstorage_client()
bqstorage_client = self.client._ensure_bqstorage_client()
owns_bqstorage_client = bqstorage_client is not None

try:
@@ -19,6 +19,44 @@

import mock

try:
from google.cloud import bigquery_storage
except ImportError: # pragma: NO COVER
bigquery_storage = None


@unittest.skipIf(bigquery_storage is None, "Requires `google-cloud-bigquery-storage`")
class Test_verify_bq_storage_version(unittest.TestCase):
    """Unit tests for the ``_verify_bq_storage_version`` helper."""

    def _call_fut(self):
        # Imported lazily so the @skipIf decorator can guard a missing extra.
        from google.cloud.bigquery._helpers import _verify_bq_storage_version

        return _verify_bq_storage_version()

    def test_raises_no_error_w_recent_bqstorage(self):
        from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

        # A version equal to the minimum must be accepted without raising.
        with mock.patch("google.cloud.bigquery_storage.__version__", new="2.0.0"):
            try:
                self._call_fut()
            except LegacyBigQueryStorageError:  # pragma: NO COVER
                self.fail("Legacy error raised with a non-legacy dependency version.")

    def test_raises_error_w_legacy_bqstorage(self):
        from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

        # Any pre-2.0 version must be rejected.
        with mock.patch(
            "google.cloud.bigquery_storage.__version__", new="1.9.9"
        ), self.assertRaises(LegacyBigQueryStorageError):
            self._call_fut()

    def test_raises_error_w_unknown_bqstorage_version(self):
        from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

        # A module lacking __version__ entirely is treated as a "legacy" release.
        with mock.patch("google.cloud.bigquery_storage", autospec=True) as mocked_module:
            del mocked_module.__version__
            expected_pattern = r"version found: legacy"
            with self.assertRaisesRegex(LegacyBigQueryStorageError, expected_pattern):
                self._call_fut()


class Test_not_null(unittest.TestCase):
def _call_fut(self, value, field):
Loading

0 comments on commit bd7dbda

Please sign in to comment.