Skip to content
Permalink
Browse files
feat: add support for more detailed DML stats (#758)
* feat: add support for more detailed DML stats

* Move is None check of DmlStats one level higher
  • Loading branch information
plamut committed Jul 15, 2021
1 parent c45a738 commit 36fe86f
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 0 deletions.
@@ -58,6 +58,7 @@ Job-Related Types
job.Compression
job.CreateDisposition
job.DestinationFormat
job.DmlStats
job.Encoding
job.OperationType
job.QueryPlanEntry
@@ -56,6 +56,7 @@
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import DestinationFormat
from google.cloud.bigquery.job import DmlStats
from google.cloud.bigquery.job import Encoding
from google.cloud.bigquery.job import ExtractJob
from google.cloud.bigquery.job import ExtractJobConfig
@@ -142,6 +143,7 @@
"BigtableOptions",
"BigtableColumnFamily",
"BigtableColumn",
"DmlStats",
"CSVOptions",
"GoogleSheetsOptions",
"ParquetOptions",
@@ -31,6 +31,7 @@
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.load import LoadJobConfig
from google.cloud.bigquery.job.query import _contains_order_by
from google.cloud.bigquery.job.query import DmlStats
from google.cloud.bigquery.job.query import QueryJob
from google.cloud.bigquery.job.query import QueryJobConfig
from google.cloud.bigquery.job.query import QueryPlanEntry
@@ -66,6 +67,7 @@
"LoadJob",
"LoadJobConfig",
"_contains_order_by",
"DmlStats",
"QueryJob",
"QueryJobConfig",
"QueryPlanEntry",
@@ -114,6 +114,35 @@ def _to_api_repr_table_defs(value):
return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}


class DmlStats(typing.NamedTuple):
"""Detailed statistics for DML statements.
https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
"""

inserted_row_count: int = 0
"""Number of inserted rows. Populated by DML INSERT and MERGE statements."""

deleted_row_count: int = 0
"""Number of deleted rows. populated by DML DELETE, MERGE and TRUNCATE statements.
"""

updated_row_count: int = 0
"""Number of updated rows. Populated by DML UPDATE and MERGE statements."""

@classmethod
def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
# NOTE: The field order here must match the order of fields set at the
# class level.
api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")

args = (
int(stats.get(api_field, default_val))
for api_field, default_val in zip(api_fields, cls.__new__.__defaults__)
)
return cls(*args)


class ScriptOptions:
"""Options controlling the execution of scripts.
@@ -1079,6 +1108,14 @@ def estimated_bytes_processed(self):
result = int(result)
return result

@property
def dml_stats(self) -> Optional[DmlStats]:
stats = self._job_statistics().get("dmlStats")
if stats is None:
return None
else:
return DmlStats.from_api_repr(stats)

def _blocking_poll(self, timeout=None, **kwargs):
self._done_timeout = timeout
self._transport_timeout = timeout
@@ -1521,6 +1521,62 @@ def test_query_statistics(self):
self.assertGreater(stages_with_inputs, 0)
self.assertGreater(len(plan), stages_with_inputs)

def test_dml_statistics(self):
table_schema = (
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("bar", "INTEGER"),
)

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.test_dml_statistics".format(Config.CLIENT.project, dataset_id)

# Create the table before loading so that the column order is deterministic.
table = helpers.retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

# Insert a few rows and check the stats.
sql = f"""
INSERT INTO `{table_id}`
VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4);
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

assert query_job.dml_stats is not None
assert query_job.dml_stats.inserted_row_count == 4
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 0

# Update some of the rows.
sql = f"""
UPDATE `{table_id}`
SET bar = bar + 1
WHERE bar > 2;
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

assert query_job.dml_stats is not None
assert query_job.dml_stats.inserted_row_count == 0
assert query_job.dml_stats.updated_row_count == 2
assert query_job.dml_stats.deleted_row_count == 0

# Now delete a few rows and check the stats.
sql = f"""
DELETE FROM `{table_id}`
WHERE foo != "two";
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

assert query_job.dml_stats is not None
assert query_job.dml_stats.inserted_row_count == 0
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 3

def test_dbapi_w_standard_sql_types(self):
for sql, expected in helpers.STANDARD_SQL_EXAMPLES:
Config.CURSOR.execute(sql)
@@ -110,6 +110,24 @@ def _verify_table_definitions(self, job, config):
self.assertIsNotNone(expected_ec)
self.assertEqual(found_ec.to_api_repr(), expected_ec)

def _verify_dml_stats_resource_properties(self, job, resource):
query_stats = resource.get("statistics", {}).get("query", {})

if "dmlStats" in query_stats:
resource_dml_stats = query_stats["dmlStats"]
job_dml_stats = job.dml_stats
assert str(job_dml_stats.inserted_row_count) == resource_dml_stats.get(
"insertedRowCount", "0"
)
assert str(job_dml_stats.updated_row_count) == resource_dml_stats.get(
"updatedRowCount", "0"
)
assert str(job_dml_stats.deleted_row_count) == resource_dml_stats.get(
"deletedRowCount", "0"
)
else:
assert job.dml_stats is None

def _verify_configuration_properties(self, job, configuration):
if "dryRun" in configuration:
self.assertEqual(job.dry_run, configuration["dryRun"])
@@ -118,6 +136,7 @@ def _verify_configuration_properties(self, job, configuration):

def _verifyResourceProperties(self, job, resource):
self._verifyReadonlyResourceProperties(job, resource)
self._verify_dml_stats_resource_properties(job, resource)

configuration = resource.get("configuration", {})
self._verify_configuration_properties(job, configuration)
@@ -130,16 +149,19 @@ def _verifyResourceProperties(self, job, resource):
self._verify_table_definitions(job, query_config)

self.assertEqual(job.query, query_config["query"])

if "createDisposition" in query_config:
self.assertEqual(job.create_disposition, query_config["createDisposition"])
else:
self.assertIsNone(job.create_disposition)

if "defaultDataset" in query_config:
ds_ref = job.default_dataset
ds_ref = {"projectId": ds_ref.project, "datasetId": ds_ref.dataset_id}
self.assertEqual(ds_ref, query_config["defaultDataset"])
else:
self.assertIsNone(job.default_dataset)

if "destinationTable" in query_config:
table = job.destination
tb_ref = {
@@ -150,14 +172,17 @@ def _verifyResourceProperties(self, job, resource):
self.assertEqual(tb_ref, query_config["destinationTable"])
else:
self.assertIsNone(job.destination)

if "priority" in query_config:
self.assertEqual(job.priority, query_config["priority"])
else:
self.assertIsNone(job.priority)

if "writeDisposition" in query_config:
self.assertEqual(job.write_disposition, query_config["writeDisposition"])
else:
self.assertIsNone(job.write_disposition)

if "destinationEncryptionConfiguration" in query_config:
self.assertIsNotNone(job.destination_encryption_configuration)
self.assertEqual(
@@ -166,6 +191,7 @@ def _verifyResourceProperties(self, job, resource):
)
else:
self.assertIsNone(job.destination_encryption_configuration)

if "schemaUpdateOptions" in query_config:
self.assertEqual(
job.schema_update_options, query_config["schemaUpdateOptions"]
@@ -190,6 +216,7 @@ def test_ctor_defaults(self):
self.assertIsNone(job.create_disposition)
self.assertIsNone(job.default_dataset)
self.assertIsNone(job.destination)
self.assertIsNone(job.dml_stats)
self.assertIsNone(job.flatten_results)
self.assertIsNone(job.priority)
self.assertIsNone(job.use_query_cache)
@@ -278,6 +305,26 @@ def test_from_api_repr_with_encryption(self):
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_with_dml_stats(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": self.JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"configuration": {"query": {"query": self.QUERY}},
"statistics": {
"query": {
"dmlStats": {"insertedRowCount": "15", "updatedRowCount": "2"},
},
},
}
klass = self._get_target_class()

job = klass.from_api_repr(RESOURCE, client=client)

self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SchemaUpdateOption
@@ -815,6 +862,23 @@ def test_estimated_bytes_processed(self):
query_stats["estimatedBytesProcessed"] = str(est_bytes)
self.assertEqual(job.estimated_bytes_processed, est_bytes)

def test_dml_stats(self):
from google.cloud.bigquery.job.query import DmlStats

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, self.QUERY, client)
assert job.dml_stats is None

statistics = job._properties["statistics"] = {}
assert job.dml_stats is None

query_stats = statistics["query"] = {}
assert job.dml_stats is None

query_stats["dmlStats"] = {"insertedRowCount": "35"}
assert isinstance(job.dml_stats, DmlStats)
assert job.dml_stats.inserted_row_count == 35

def test_result(self):
from google.cloud.bigquery.table import RowIterator

@@ -15,6 +15,43 @@
from .helpers import _Base


class TestDmlStats:
@staticmethod
def _get_target_class():
from google.cloud.bigquery.job import DmlStats

return DmlStats

def _make_one(self, *args, **kw):
return self._get_target_class()(*args, **kw)

def test_ctor_defaults(self):
dml_stats = self._make_one()
assert dml_stats.inserted_row_count == 0
assert dml_stats.deleted_row_count == 0
assert dml_stats.updated_row_count == 0

def test_from_api_repr_partial_stats(self):
klass = self._get_target_class()
result = klass.from_api_repr({"deletedRowCount": "12"})

assert isinstance(result, klass)
assert result.inserted_row_count == 0
assert result.deleted_row_count == 12
assert result.updated_row_count == 0

def test_from_api_repr_full_stats(self):
klass = self._get_target_class()
result = klass.from_api_repr(
{"updatedRowCount": "4", "insertedRowCount": "7", "deletedRowCount": "25"}
)

assert isinstance(result, klass)
assert result.inserted_row_count == 7
assert result.deleted_row_count == 25
assert result.updated_row_count == 4


class TestQueryPlanEntryStep(_Base):
KIND = "KIND"
SUBSTEPS = ("SUB1", "SUB2")

0 comments on commit 36fe86f

Please sign in to comment.