From 1246da86b78b03ca1aa2c45ec71649e294cfb2f1 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 1 Jul 2021 19:17:01 +0200
Subject: [PATCH] feat: make it easier to disable best-effort deduplication with streaming inserts (#734)

* feat: make it easier to disable row insert IDs

* Also accept any iterables for row_ids
---
 google/cloud/bigquery/__init__.py |   2 +
 google/cloud/bigquery/client.py   |  47 +++++++--
 google/cloud/bigquery/enums.py    |   7 ++
 tests/unit/test_client.py         | 153 ++++++++++++++++++++++++++++--
 4 files changed, 195 insertions(+), 14 deletions(-)

diff --git a/google/cloud/bigquery/__init__.py b/google/cloud/bigquery/__init__.py
index 94f87304a..dfe3a6320 100644
--- a/google/cloud/bigquery/__init__.py
+++ b/google/cloud/bigquery/__init__.py
@@ -37,6 +37,7 @@
 from google.cloud.bigquery.dataset import Dataset
 from google.cloud.bigquery.dataset import DatasetReference
 from google.cloud.bigquery import enums
+from google.cloud.bigquery.enums import AutoRowIDs
 from google.cloud.bigquery.enums import KeyResultStatementKind
 from google.cloud.bigquery.enums import SqlTypeNames
 from google.cloud.bigquery.enums import StandardSqlDataTypes
@@ -144,6 +145,7 @@
     "DEFAULT_RETRY",
     # Enum Constants
     "enums",
+    "AutoRowIDs",
     "Compression",
     "CreateDisposition",
     "DestinationFormat",
diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index 2b7a5273e..2a02c7629 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -68,6 +68,7 @@
 from google.cloud.bigquery.dataset import Dataset
 from google.cloud.bigquery.dataset import DatasetListItem
 from google.cloud.bigquery.dataset import DatasetReference
+from google.cloud.bigquery.enums import AutoRowIDs
 from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
 from google.cloud.bigquery.opentelemetry_tracing import create_span
 from google.cloud.bigquery import job
@@ -3349,7 +3350,7 @@ def insert_rows_json(
         self,
         table: Union[Table, TableReference, str],
         json_rows: Sequence[Dict],
-        row_ids: Sequence[str] = None,
+        row_ids: Union[Iterable[str], AutoRowIDs, None] = AutoRowIDs.GENERATE_UUID,
         skip_invalid_rows: bool = None,
         ignore_unknown_values: bool = None,
         template_suffix: str = None,
@@ -3371,11 +3372,20 @@ def insert_rows_json(
             json_rows (Sequence[Dict]):
                 Row data to be inserted. Keys must match the table schema fields
                 and values must be JSON-compatible representations.
-            row_ids (Optional[Sequence[Optional[str]]]):
+            row_ids (Union[Iterable[str], AutoRowIDs, None]):
                 Unique IDs, one per row being inserted. An ID can also be
                 ``None``, indicating that an explicit insert ID should **not**
                 be used for that row. If the argument is omitted altogether,
                 unique IDs are created automatically.
+
+                .. versionchanged:: 2.21.0
+                    Can also be an iterable, not just a sequence, or an
+                    :class:`AutoRowIDs` enum member.
+
+                .. deprecated:: 2.21.0
+                    Passing ``None`` to explicitly request autogenerated insert IDs
+                    is deprecated; use :attr:`AutoRowIDs.GENERATE_UUID` instead.
+
             skip_invalid_rows (Optional[bool]):
                 Insert all valid rows of a request, even if invalid rows exist.
                 The default value is ``False``, which causes the entire request
@@ -3415,12 +3425,37 @@ def insert_rows_json(
         rows_info = []
         data = {"rows": rows_info}
 
-        for index, row in enumerate(json_rows):
+        if row_ids is None:
+            warnings.warn(
+                "Passing None for row_ids is deprecated. To explicitly request "
+                "autogenerated insert IDs, use AutoRowIDs.GENERATE_UUID instead",
+                category=DeprecationWarning,
+            )
+            row_ids = AutoRowIDs.GENERATE_UUID
+
+        if not isinstance(row_ids, AutoRowIDs):
+            try:
+                row_ids_iter = iter(row_ids)
+            except TypeError:
+                msg = "row_ids is neither an iterable nor an AutoRowIDs enum member"
+                raise TypeError(msg)
+
+        for i, row in enumerate(json_rows):
             info = {"json": row}
-            if row_ids is not None:
-                info["insertId"] = row_ids[index]
-            else:
+
+            if row_ids is AutoRowIDs.GENERATE_UUID:
                 info["insertId"] = str(uuid.uuid4())
+            elif row_ids is AutoRowIDs.DISABLED:
+                info["insertId"] = None
+            else:
+                try:
+                    insert_id = next(row_ids_iter)
+                except StopIteration:
+                    msg = f"row_ids did not generate enough IDs, error at index {i}"
+                    raise ValueError(msg)
+                else:
+                    info["insertId"] = insert_id
+
             rows_info.append(info)
 
         if skip_invalid_rows is not None:
diff --git a/google/cloud/bigquery/enums.py b/google/cloud/bigquery/enums.py
index edf991b6f..dbbd02635 100644
--- a/google/cloud/bigquery/enums.py
+++ b/google/cloud/bigquery/enums.py
@@ -21,6 +21,13 @@
 from google.cloud.bigquery.query import ScalarQueryParameterType
 
 
+class AutoRowIDs(enum.Enum):
+    """How to handle automatic insert IDs when inserting rows as a stream."""
+
+    DISABLED = enum.auto()
+    GENERATE_UUID = enum.auto()
+
+
 class Compression(object):
     """The compression type to use for exported files. The default value is
     :attr:`NONE`.
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index f6811e207..dffe7bdba 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -5434,7 +5434,7 @@ def test_insert_rows_from_dataframe_w_explicit_none_insert_ids(self):
             method="POST", path=API_PATH, data=EXPECTED_SENT_DATA, timeout=None
         )
 
-    def test_insert_rows_json(self):
+    def test_insert_rows_json_default_behavior(self):
         from google.cloud.bigquery.dataset import DatasetReference
         from google.cloud.bigquery.schema import SchemaField
         from google.cloud.bigquery.table import Table
@@ -5481,8 +5481,10 @@ def test_insert_rows_json(self):
             method="POST", path="/%s" % PATH, data=SENT, timeout=7.5,
         )
 
-    def test_insert_rows_json_with_string_id(self):
-        rows = [{"col1": "val1"}]
+    def test_insert_rows_json_w_explicitly_requested_autogenerated_insert_ids(self):
+        from google.cloud.bigquery import AutoRowIDs
+
+        rows = [{"col1": "val1"}, {"col2": "val2"}]
         creds = _make_credentials()
         http = object()
         client = self._make_one(
@@ -5490,20 +5492,116 @@ def test_insert_rows_json_with_string_id(self):
         )
         conn = client._connection = make_connection({})
 
-        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows)))):
-            errors = client.insert_rows_json("proj.dset.tbl", rows)
+        uuid_patcher = mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows))))
+        with uuid_patcher:
+            errors = client.insert_rows_json(
+                "proj.dset.tbl", rows, row_ids=AutoRowIDs.GENERATE_UUID
+            )
 
         self.assertEqual(len(errors), 0)
-        expected = {
-            "rows": [{"json": row, "insertId": str(i)} for i, row in enumerate(rows)]
+
+        # Check row data sent to the backend.
+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": "0"},
+                {"json": {"col2": "val2"}, "insertId": "1"},
+            ]
         }
         conn.api_request.assert_called_once_with(
             method="POST",
             path="/projects/proj/datasets/dset/tables/tbl/insertAll",
-            data=expected,
+            data=expected_row_data,
             timeout=None,
         )
 
+    def test_insert_rows_json_w_explicitly_disabled_insert_ids(self):
+        from google.cloud.bigquery import AutoRowIDs
+
+        rows = [{"col1": "val1"}, {"col2": "val2"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        conn = client._connection = make_connection({})
+
+        errors = client.insert_rows_json(
+            "proj.dset.tbl", rows, row_ids=AutoRowIDs.DISABLED,
+        )
+
+        self.assertEqual(len(errors), 0)
+
+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": None},
+                {"json": {"col2": "val2"}, "insertId": None},
+            ]
+        }
+        conn.api_request.assert_called_once_with(
+            method="POST",
+            path="/projects/proj/datasets/dset/tables/tbl/insertAll",
+            data=expected_row_data,
+            timeout=None,
+        )
+
+    def test_insert_rows_json_with_iterator_row_ids(self):
+        rows = [{"col1": "val1"}, {"col2": "val2"}, {"col3": "val3"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        conn = client._connection = make_connection({})
+
+        row_ids_iter = map(str, itertools.count(42))
+        errors = client.insert_rows_json("proj.dset.tbl", rows, row_ids=row_ids_iter)
+
+        self.assertEqual(len(errors), 0)
+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": "42"},
+                {"json": {"col2": "val2"}, "insertId": "43"},
+                {"json": {"col3": "val3"}, "insertId": "44"},
+            ]
+        }
+        conn.api_request.assert_called_once_with(
+            method="POST",
+            path="/projects/proj/datasets/dset/tables/tbl/insertAll",
+            data=expected_row_data,
+            timeout=None,
+        )
+
+    def test_insert_rows_json_with_non_iterable_row_ids(self):
+        rows = [{"col1": "val1"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        client._connection = make_connection({})
+
+        with self.assertRaises(TypeError) as exc:
+            client.insert_rows_json("proj.dset.tbl", rows, row_ids=object())
+
+        err_msg = str(exc.exception)
+        self.assertIn("row_ids", err_msg)
+        self.assertIn("iterable", err_msg)
+
+    def test_insert_rows_json_with_too_few_row_ids(self):
+        rows = [{"col1": "val1"}, {"col2": "val2"}, {"col3": "val3"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        client._connection = make_connection({})
+
+        insert_ids = ["10", "20"]
+
+        error_msg_pattern = "row_ids did not generate enough IDs.*index 2"
+        with self.assertRaisesRegex(ValueError, error_msg_pattern):
+            client.insert_rows_json("proj.dset.tbl", rows, row_ids=insert_ids)
+
     def test_insert_rows_json_w_explicit_none_insert_ids(self):
         rows = [{"col1": "val1"}, {"col2": "val2"}]
         creds = _make_credentials()
@@ -5526,6 +5624,45 @@ def test_insert_rows_json_w_explicit_none_insert_ids(self):
             timeout=None,
         )
 
+    def test_insert_rows_json_w_none_insert_ids_sequence(self):
+        rows = [{"col1": "val1"}, {"col2": "val2"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        conn = client._connection = make_connection({})
+
+        uuid_patcher = mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows))))
+        with warnings.catch_warnings(record=True) as warned, uuid_patcher:
+            errors = client.insert_rows_json("proj.dset.tbl", rows, row_ids=None)
+
+        self.assertEqual(len(errors), 0)
+
+        # Passing row_ids=None should have resulted in a deprecation warning.
+        matches = [
+            warning
+            for warning in warned
+            if issubclass(warning.category, DeprecationWarning)
+            and "row_ids" in str(warning)
+            and "AutoRowIDs.GENERATE_UUID" in str(warning)
+        ]
+        assert matches, "The expected deprecation warning was not raised."
+
+        # Check row data sent to the backend.
+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": "0"},
+                {"json": {"col2": "val2"}, "insertId": "1"},
+            ]
+        }
+        conn.api_request.assert_called_once_with(
+            method="POST",
+            path="/projects/proj/datasets/dset/tables/tbl/insertAll",
+            data=expected_row_data,
+            timeout=None,
+        )
+
     def test_insert_rows_w_wrong_arg(self):
         from google.cloud.bigquery.dataset import DatasetReference
         from google.cloud.bigquery.schema import SchemaField
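
A short usage sketch of the new behavior follows. It is illustrative only, not part of the patch: it assumes a reachable GCP project and an existing table "proj.dset.tbl" (the name borrowed from the tests above) whose schema matches the rows, and it omits error handling.

    from google.cloud import bigquery
    from google.cloud.bigquery import AutoRowIDs

    client = bigquery.Client()
    rows = [{"col1": "val1"}, {"col1": "val2"}]

    # Default behavior, now also selectable explicitly: one autogenerated UUID
    # insert ID per row, enabling best-effort deduplication on the backend.
    client.insert_rows_json("proj.dset.tbl", rows, row_ids=AutoRowIDs.GENERATE_UUID)

    # New: disable insert IDs entirely (each row is sent with insertId=None),
    # trading best-effort deduplication for better streaming insert throughput.
    client.insert_rows_json("proj.dset.tbl", rows, row_ids=AutoRowIDs.DISABLED)

    # row_ids now accepts any iterable of IDs, not just a sequence; it must
    # yield at least one ID per row, otherwise a ValueError is raised.
    client.insert_rows_json("proj.dset.tbl", rows, row_ids=iter(["id-0", "id-1"]))

    # Deprecated: row_ids=None still autogenerates IDs, but now emits a
    # DeprecationWarning; use AutoRowIDs.GENERATE_UUID instead.
    client.insert_rows_json("proj.dset.tbl", rows, row_ids=None)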