Skip to content
Permalink
Browse files
feat: add manual wrapper for v1beta2 read client (#117)
* feat: add manual wrapper for v1beta2 read client

* add missing v1beta2

* sort versions so v1 is last

* tests: unify v1 and v1beta2 system tests

* tests: use proto object for DataFormat checks

* blacken

* docs: add handwritten class to docs
  • Loading branch information
tswast committed Jan 22, 2021
1 parent d9691f1 commit 798cd341fbe0734f99b9c2ac3c50ae09886d1c90
@@ -0,0 +1,6 @@
Bigquery Storage v1beta2 API Library
====================================

.. automodule:: google.cloud.bigquery_storage_v1beta2.client
    :members:
    :inherited-members:
@@ -21,6 +21,7 @@ API Reference
bigquery_storage_v1/library
bigquery_storage_v1/services
bigquery_storage_v1/types
bigquery_storage_v1beta2/library
bigquery_storage_v1beta2/services
bigquery_storage_v1beta2/types

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import pkg_resources

# Look up the version of the installed "google-cloud-bigquery-storage"
# distribution at import time and expose it as the package version.
__version__ = pkg_resources.get_distribution(
"google-cloud-bigquery-storage"
).version # noqa

from google.cloud.bigquery_storage_v1beta2 import client
from google.cloud.bigquery_storage_v1beta2 import types


class BigQueryReadClient(client.BigQueryReadClient):
    # Reuse the docstring of the wrapped ``client.BigQueryReadClient`` so this
    # package-level subclass carries the same documentation.
    __doc__ = client.BigQueryReadClient.__doc__


__all__ = (
# google.cloud.bigquery_storage_v1beta2
"__version__",
"types",
# google.cloud.bigquery_storage_v1beta2.client
"BigQueryReadClient",
)
@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Parent client for calling the Cloud BigQuery Storage API.
This is the base from which all interactions with the API occur.
"""

from __future__ import absolute_import

import google.api_core.gapic_v1.method

from google.cloud.bigquery_storage_v1 import reader
from google.cloud.bigquery_storage_v1beta2.services import big_query_read


# OAuth scope URLs for BigQuery and Cloud Platform access.
# NOTE(review): nothing in the visible part of this module reads _SCOPES;
# confirm whether it is still needed before relying on it.
_SCOPES = (
"https://www.googleapis.com/auth/bigquery",
"https://www.googleapis.com/auth/cloud-platform",
)


class BigQueryReadClient(big_query_read.BigQueryReadClient):
    """Client for interacting with BigQuery Storage API.

    The BigQuery storage API can be used to read data stored in BigQuery.
    """

    def read_rows(
        self,
        name,
        offset=0,
        retry=google.api_core.gapic_v1.method.DEFAULT,
        timeout=google.api_core.gapic_v1.method.DEFAULT,
        metadata=(),
    ):
        """
        Reads rows from the table in the format prescribed by the read
        session. Each response contains one or more table rows, up to a
        maximum of 10 MiB per response; read requests which attempt to read
        individual rows larger than this will fail.

        Each request also returns a set of stream statistics reflecting the
        estimated total number of rows in the read stream. This number is
        computed based on the total table size and the number of active
        streams in the read session, and may change as other streams continue
        to read data.

        Example:
            >>> from google.cloud import bigquery_storage_v1beta2
            >>>
            >>> client = bigquery_storage_v1beta2.BigQueryReadClient()
            >>>
            >>> # TODO: Initialize ``table``:
            >>> table = "projects/{}/datasets/{}/tables/{}".format(
            ...     "your-data-project-id",
            ...     "your_dataset_id",
            ...     "your_table_id",
            ... )
            >>>
            >>> # TODO: Initialize ``parent``:
            >>> parent = "projects/your-billing-project-id"
            >>>
            >>> requested_session = bigquery_storage_v1beta2.types.ReadSession(
            ...     table=table,
            ...     data_format=bigquery_storage_v1beta2.types.DataFormat.AVRO,
            ... )
            >>> session = client.create_read_session(
            ...     parent=parent, read_session=requested_session
            ... )
            >>>
            >>> stream = session.streams[0]  # TODO: Also read any other streams.
            >>> read_rows_stream = client.read_rows(stream.name)
            >>>
            >>> for element in read_rows_stream.rows(session):
            ...     # process element
            ...     pass

        Args:
            name (str):
                Required. Name of the stream to start
                reading from, of the form
                `projects/{project_id}/locations/{location}/sessions/{session_id}/streams/{stream_id}`
            offset (Optional[int]):
                The starting offset from which to begin reading rows from
                in the stream. The offset requested must be less than the last
                row read from ReadRows. Requesting a larger offset is
                undefined.
            retry (Optional[google.api_core.retry.Retry]): A retry object used
                to retry requests. If ``None`` is specified, requests will not
                be retried.
            timeout (Optional[float]): The amount of time, in seconds, to wait
                for the request to complete. Note that if ``retry`` is
                specified, the timeout applies to each individual attempt.
            metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
                that is provided to the method.

        Returns:
            ~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
                An iterable of
                :class:`~google.cloud.bigquery_storage_v1beta2.types.ReadRowsResponse`.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the request
                failed for any reason.
            google.api_core.exceptions.RetryError: If the request failed due
                to a retryable error and retry attempts failed.
            ValueError: If the parameters are invalid.
        """
        # Call the generated (parent-class) read_rows rather than this
        # wrapper, so the raw response stream is obtained exactly once.
        gapic_client = super(BigQueryReadClient, self)
        stream = gapic_client.read_rows(
            read_stream=name,
            offset=offset,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        # The parent-class proxy, stream name, starting offset and call
        # options are handed to ReadRowsStream alongside the raw stream.
        # NOTE(review): presumably so the stream can re-issue read_rows with
        # the same options -- confirm against reader.ReadRowsStream.
        return reader.ReadRowsStream(
            stream,
            gapic_client,
            name,
            offset,
            {"retry": retry, "timeout": timeout, "metadata": metadata},
        )
@@ -22,7 +22,7 @@

gapic = gcp.GAPICBazel()
common = gcp.CommonTemplates()
versions = ["v1"]
versions = ["v1beta2", "v1"]

for version in versions:
library = gapic.py_library(
@@ -20,11 +20,9 @@

import pytest

from google.cloud import bigquery_storage

_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}"

_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../assets")
_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets")


@pytest.fixture(scope="session")
@@ -52,11 +50,6 @@ def credentials(use_mtls):
return service_account.Credentials.from_service_account_file(filename)


@pytest.fixture(scope="session")
def client(credentials):
return bigquery_storage.BigQueryReadClient(credentials=credentials)


@pytest.fixture()
def table_reference():
    """Return the fully-qualified path of the public ``usa_1910_2013`` table."""
    project_id = "bigquery-public-data"
    dataset_id = "usa_names"
    table_id = "usa_1910_2013"
    return _TABLE_FORMAT.format(project_id, dataset_id, table_id)
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""System tests for reading rows from tables."""

import pytest

from google.cloud import bigquery_storage
from google.cloud import bigquery_storage_v1beta2


@pytest.fixture(scope="session")
def client_v1(credentials):
    """Build one read client from the default ``bigquery_storage`` package per test session."""
    read_client = bigquery_storage.BigQueryReadClient(credentials=credentials)
    return read_client


@pytest.fixture(scope="session")
def client_v1beta2(credentials):
    """Build one ``bigquery_storage_v1beta2`` read client per test session."""
    read_client = bigquery_storage_v1beta2.BigQueryReadClient(
        credentials=credentials
    )
    return read_client


@pytest.fixture(scope="session", params=["v1", "v1beta2"])
def client_and_types(request, client_v1, client_v1beta2):
    """Yield a ``(client, types module)`` pair for each parametrized API version.

    The ``"v1"`` parameter selects the ``client_v1`` fixture together with
    ``bigquery_storage.types``; any other parameter selects the v1beta2
    client together with ``bigquery_storage_v1beta2.types``.
    """
    if request.param != "v1":
        return client_v1beta2, bigquery_storage_v1beta2.types
    return client_v1, bigquery_storage.types

0 comments on commit 798cd34

Please sign in to comment.