From 3f73a430953478fc9c6615684965619c65ae5719 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Mon, 27 Apr 2020 18:39:30 +0200
Subject: [PATCH] Use BQ Storage v1 stable in DB API

---
 google/cloud/bigquery/dbapi/connection.py |  4 ++--
 google/cloud/bigquery/dbapi/cursor.py     | 25 +++++++++++++----------
 tests/unit/test_dbapi_connection.py       | 12 +++++------
 tests/unit/test_dbapi_cursor.py           | 20 +++++++++---------
 4 files changed, 32 insertions(+), 29 deletions(-)

diff --git a/google/cloud/bigquery/dbapi/connection.py b/google/cloud/bigquery/dbapi/connection.py
index f8670725e..23e966486 100644
--- a/google/cloud/bigquery/dbapi/connection.py
+++ b/google/cloud/bigquery/dbapi/connection.py
@@ -30,7 +30,7 @@ class Connection(object):
             A REST API client used to connect to BigQuery. If not passed, a
             client is created using default options inferred from the environment.
         bqstorage_client(\
-            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
         ):
             A client that uses the faster BigQuery Storage API to fetch rows from
             BigQuery. If not passed, it is created using the same credentials
@@ -106,7 +106,7 @@ def connect(client=None, bqstorage_client=None):
             A REST API client used to connect to BigQuery. If not passed, a
             client is created using default options inferred from the environment.
         bqstorage_client(\
-            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
         ):
             A client that uses the faster BigQuery Storage API to fetch rows from
             BigQuery. If not passed, it is created using the same credentials
diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py
index bd0bdab86..f1642577d 100644
--- a/google/cloud/bigquery/dbapi/cursor.py
+++ b/google/cloud/bigquery/dbapi/cursor.py
@@ -259,7 +259,7 @@ def _bqstorage_fetch(self, bqstorage_client):
 
         Args:
             bqstorage_client(\
-                google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
+                google.cloud.bigquery_storage_v1.BigQueryReadClient \
             ):
                 A client that knows how to talk to the BigQuery Storage API.
 
@@ -267,26 +267,29 @@ def _bqstorage_fetch(self, bqstorage_client):
         Returns:
             Iterable[Mapping]: A sequence of rows, represented as dictionaries.
         """
-        # NOTE: Given that BQ storage client instance is passed in, it means
-        # that bigquery_storage_v1beta1 library is available (no ImportError).
-        from google.cloud import bigquery_storage_v1beta1
+        # Hitting this code path with a BQ Storage client instance implies that
+        # bigquery_storage_v1 can indeed be imported here without errors.
+        from google.cloud import bigquery_storage_v1
 
         table_reference = self._query_job.destination
 
+        requested_session = bigquery_storage_v1.types.ReadSession(
+            table=table_reference.to_bqstorage(),
+            data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
+        )
+
         read_session = bqstorage_client.create_read_session(
-            table_reference.to_bqstorage(),
-            "projects/{}".format(table_reference.project),
+            parent="projects/{}".format(table_reference.project),
+            read_session=requested_session,
             # a single stream only, as DB API is not well-suited for multithreading
-            requested_streams=1,
+            max_stream_count=1,
         )
 
         if not read_session.streams:
             return iter([])  # empty table, nothing to read
 
-        read_position = bigquery_storage_v1beta1.types.StreamPosition(
-            stream=read_session.streams[0],
-        )
-        read_rows_stream = bqstorage_client.read_rows(read_position)
+        stream_name = read_session.streams[0].name
+        read_rows_stream = bqstorage_client.read_rows(stream_name)
         rows_iterable = read_rows_stream.rows(read_session)
         return rows_iterable
 
diff --git a/tests/unit/test_dbapi_connection.py b/tests/unit/test_dbapi_connection.py
index 15a28e4c7..96ec41c51 100644
--- a/tests/unit/test_dbapi_connection.py
+++ b/tests/unit/test_dbapi_connection.py
@@ -19,9 +19,9 @@
 import six
 
 try:
-    from google.cloud import bigquery_storage_v1beta1
+    from google.cloud import bigquery_storage_v1
 except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
+    bigquery_storage_v1 = None
 
 
 class TestConnection(unittest.TestCase):
@@ -41,9 +41,9 @@ def _mock_client(self):
         return mock_client
 
     def _mock_bqstorage_client(self):
-        from google.cloud.bigquery_storage_v1beta1 import client
+        from google.cloud.bigquery_storage_v1 import client
 
-        mock_client = mock.create_autospec(client.BigQueryStorageClient)
+        mock_client = mock.create_autospec(client.BigQueryReadClient)
         mock_client.transport = mock.Mock(spec=["channel"])
         mock_client.transport.channel = mock.Mock(spec=["close"])
         return mock_client
@@ -61,7 +61,7 @@ def test_ctor_wo_bqstorage_client(self):
         self.assertIs(connection._bqstorage_client, mock_bqstorage_client)
 
     @unittest.skipIf(
-        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+        bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     def test_ctor_w_bqstorage_client(self):
         from google.cloud.bigquery.dbapi import Connection
@@ -99,7 +99,7 @@ def test_connect_w_client(self):
         self.assertIs(connection._bqstorage_client, mock_bqstorage_client)
 
     @unittest.skipIf(
-        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+        bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     def test_connect_w_both_clients(self):
         from google.cloud.bigquery.dbapi import connect
diff --git a/tests/unit/test_dbapi_cursor.py b/tests/unit/test_dbapi_cursor.py
index dafecf9b4..44af12b12 100644
--- a/tests/unit/test_dbapi_cursor.py
+++ b/tests/unit/test_dbapi_cursor.py
@@ -21,9 +21,9 @@
 from google.api_core import exceptions
 
 try:
-    from google.cloud import bigquery_storage_v1beta1
+    from google.cloud import bigquery_storage_v1
 except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
+    bigquery_storage_v1 = None
 
 
 class TestCursor(unittest.TestCase):
@@ -58,17 +58,17 @@ def _mock_client(self, rows=None, schema=None, num_dml_affected_rows=None):
         return mock_client
 
     def _mock_bqstorage_client(self, rows=None, stream_count=0):
-        from google.cloud.bigquery_storage_v1beta1 import client
-        from google.cloud.bigquery_storage_v1beta1 import types
+        from google.cloud.bigquery_storage_v1 import client
+        from google.cloud.bigquery_storage_v1 import types
 
         if rows is None:
             rows = []
 
-        mock_client = mock.create_autospec(client.BigQueryStorageClient)
+        mock_client = mock.create_autospec(client.BigQueryReadClient)
 
         mock_read_session = mock.MagicMock(
             streams=[
-                types.Stream(name="streams/stream_{}".format(i))
+                types.ReadStream(name="streams/stream_{}".format(i))
                 for i in range(stream_count)
             ]
         )
@@ -242,7 +242,7 @@ def test_fetchall_w_row(self):
         self.assertEqual(rows[0], (1,))
 
     @unittest.skipIf(
-        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+        bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     def test_fetchall_w_bqstorage_client_fetch_success(self):
         from google.cloud.bigquery import dbapi
@@ -285,7 +285,7 @@ def test_fetchall_w_bqstorage_client_fetch_success(self):
         self.assertEqual(sorted_row_data, expected_row_data)
 
     @unittest.skipIf(
-        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+        bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     def test_fetchall_w_bqstorage_client_fetch_no_rows(self):
         from google.cloud.bigquery import dbapi
@@ -308,7 +308,7 @@ def test_fetchall_w_bqstorage_client_fetch_no_rows(self):
         self.assertEqual(rows, [])
 
     @unittest.skipIf(
-        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+        bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
         from google.cloud.bigquery import dbapi
@@ -336,7 +336,7 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
         mock_client.list_rows.assert_not_called()
 
     @unittest.skipIf(
-        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+        bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     def test_fetchall_w_bqstorage_client_fetch_error_fallback_on_client(self):
         from google.cloud.bigquery import dbapi
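
For context, the v1 calling convention this patch adopts looks roughly like the
minimal standalone sketch below. It is illustrative only, not code from the
patch itself; the project ID and table path are hypothetical placeholders.

    from google.cloud import bigquery_storage_v1

    client = bigquery_storage_v1.BigQueryReadClient()

    # v1 takes a ReadSession proto plus a parent project path as keyword
    # arguments, replacing v1beta1's positional (table_reference, parent).
    requested_session = bigquery_storage_v1.types.ReadSession(
        table="projects/my-project/datasets/my_dataset/tables/my_table",
        data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
    )
    read_session = client.create_read_session(
        parent="projects/my-project",
        read_session=requested_session,
        max_stream_count=1,  # one stream suffices for a sequential DB API read
    )

    if read_session.streams:
        # v1 addresses a stream by its resource name, whereas v1beta1
        # wrapped the stream in a StreamPosition proto.
        reader = client.read_rows(read_session.streams[0].name)
        for row in reader.rows(read_session):
            print(row)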