Skip to content
Permalink
Browse files
feat: add manual layer for v1 endpoint (#16)
* feat: add manual layer for v1 endpoint

* migrate the manual client/reader work for v1
* TableReferences went away as a first class message, now just a
  formatted string
* changes to read rows
  * estimated_row_count removed due to differences in reported status
  * you no longer have to deal with a StreamPosition message, instead
    there's just a stream name and an offset as top-level request fields
* session creation changes
  * you now provide a prototypical ReadSession message when requesting
    a new read session, and most options (like selected fields, table
    modifiers, and data format) have moved inside it.
  * requested_streams -> max_stream_count

There are additional changes to the surface, but there wasn't much manual
code in front of it, so they are not reflected here.
  • Loading branch information
shollyman committed Mar 3, 2020
1 parent e522bf8 commit a0fc0af5b4447ce8b50c365d4d081b9443b8490e
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import pkg_resources

# Single source of truth for the package version: read it from the installed
# distribution's metadata rather than hard-coding it in the source tree.
__version__ = pkg_resources.get_distribution(
    "google-cloud-bigquery-storage"
).version  # noqa

from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import client
from google.cloud.bigquery_storage_v1.gapic import enums


class BigQueryReadClient(client.BigQueryReadClient):
    # Re-export the hand-written client under the package's top-level
    # namespace. ``__doc__`` is copied from the base class so the subclass
    # shows the same help text, and the generated ``enums`` module is
    # attached for convenient access as ``BigQueryReadClient.enums``.
    __doc__ = client.BigQueryReadClient.__doc__
    enums = enums


# Explicit public API of this package namespace, grouped by the module each
# name is re-exported from.
__all__ = (
    # google.cloud.bigquery_storage_v1
    "__version__",
    "types",
    # google.cloud.bigquery_storage_v1.client
    "BigQueryReadClient",
    # google.cloud.bigquery_storage_v1.gapic
    "enums",
)
@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Parent client for calling the Cloud BigQuery Storage API.
This is the base from which all interactions with the API occur.
"""

from __future__ import absolute_import

import google.api_core.gapic_v1.method

from google.cloud.bigquery_storage_v1 import reader
from google.cloud.bigquery_storage_v1.gapic import big_query_read_client # noqa


# OAuth 2.0 scopes requested when authenticating against the BigQuery
# Storage API.
_SCOPES = (
    "https://www.googleapis.com/auth/bigquery",
    "https://www.googleapis.com/auth/cloud-platform",
)


class BigQueryReadClient(big_query_read_client.BigQueryReadClient):
    """Client for interacting with BigQuery Storage API.

    The BigQuery storage API can be used to read data stored in BigQuery.
    """

    def read_rows(
        self,
        name,
        offset=0,
        retry=google.api_core.gapic_v1.method.DEFAULT,
        timeout=google.api_core.gapic_v1.method.DEFAULT,
        metadata=None,
    ):
        """
        Reads rows from the table in the format prescribed by the read
        session. Each response contains one or more table rows, up to a
        maximum of 10 MiB per response; read requests which attempt to read
        individual rows larger than this will fail.

        Each request also returns a set of stream statistics reflecting the
        estimated total number of rows in the read stream. This number is
        computed based on the total table size and the number of active
        streams in the read session, and may change as other streams continue
        to read data.

        .. note::
            NOTE(review): the v1 migration removed ``estimated_row_count``
            from the manual surface; confirm the statistics claim above still
            matches what the service reports.

        Example:
            >>> from google.cloud import bigquery_storage_v1
            >>>
            >>> client = bigquery_storage_v1.BigQueryReadClient()
            >>>
            >>> # TODO: Initialize ``table``:
            >>> table = "projects/{}/datasets/{}/tables/{}".format(
            ...     'your-data-project-id',
            ...     'your_dataset_id',
            ...     'your_table_id',
            ... )
            >>>
            >>> # TODO: Initialize `parent`:
            >>> parent = 'projects/your-billing-project-id'
            >>>
            >>> session = client.create_read_session(table, parent)
            >>> stream = session.streams[0]  # TODO: Read the other streams.
            >>>
            >>> for element in client.read_rows(stream.name):
            ...     # process element
            ...     pass

        Args:
            name (str):
                Required. Name of the stream to start
                reading from, of the form
                `projects/{project_id}/locations/{location}/sessions/{session_id}/streams/{stream_id}`
            offset (Optional[int]):
                The starting offset from which to begin reading rows from
                in the stream. The offset requested must be less than the last
                row read from ReadRows. Requesting a larger offset is
                undefined.
            retry (Optional[google.api_core.retry.Retry]): A retry object used
                to retry requests. If ``None`` is specified, requests will not
                be retried.
            timeout (Optional[float]): The amount of time, in seconds, to wait
                for the request to complete. Note that if ``retry`` is
                specified, the timeout applies to each individual attempt.
            metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
                that is provided to the method.

        Returns:
            ~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
                An iterable of
                :class:`~google.cloud.bigquery_storage_v1.types.ReadRowsResponse`.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the request
                failed for any reason.
            google.api_core.exceptions.RetryError: If the request failed due
                to a retryable error and retry attempts failed.
            ValueError: If the parameters are invalid.
        """
        # Issue the raw streaming RPC through the generated GAPIC base class.
        gapic_client = super(BigQueryReadClient, self)
        stream = gapic_client.read_rows(
            read_stream=name,
            offset=offset,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        # Wrap the raw stream; the wrapper also receives the client, stream
        # name, offset, and the original call options — presumably so it can
        # re-open the stream mid-read (see reader.ReadRowsStream; TODO confirm).
        return reader.ReadRowsStream(
            stream,
            gapic_client,
            name,
            offset,
            {"retry": retry, "timeout": timeout, "metadata": metadata},
        )

0 comments on commit a0fc0af

Please sign in to comment.