Skip to content
Permalink
Browse files
feat: add support for transaction statistics (#849)
* feat: add support for transaction statistics

* Hoist transaction_info into base job class

* Add versionadded directive to new property and class

* Include new class in docs reference
  • Loading branch information
plamut committed Aug 10, 2021
1 parent 9c6614f commit 7f7b1a8
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 0 deletions.
@@ -68,6 +68,7 @@ Job-Related Types
job.SourceFormat
job.WriteDisposition
job.SchemaUpdateOption
job.TransactionInfo


Dataset
@@ -70,6 +70,7 @@
from google.cloud.bigquery.job import ScriptOptions
from google.cloud.bigquery.job import SourceFormat
from google.cloud.bigquery.job import UnknownJob
from google.cloud.bigquery.job import TransactionInfo
from google.cloud.bigquery.job import WriteDisposition
from google.cloud.bigquery.model import Model
from google.cloud.bigquery.model import ModelReference
@@ -149,6 +150,7 @@
"GoogleSheetsOptions",
"ParquetOptions",
"ScriptOptions",
"TransactionInfo",
"DEFAULT_RETRY",
# Enum Constants
"enums",
@@ -22,6 +22,7 @@
from google.cloud.bigquery.job.base import ReservationUsage
from google.cloud.bigquery.job.base import ScriptStatistics
from google.cloud.bigquery.job.base import ScriptStackFrame
from google.cloud.bigquery.job.base import TransactionInfo
from google.cloud.bigquery.job.base import UnknownJob
from google.cloud.bigquery.job.copy_ import CopyJob
from google.cloud.bigquery.job.copy_ import CopyJobConfig
@@ -81,5 +82,6 @@
"QueryPriority",
"SchemaUpdateOption",
"SourceFormat",
"TransactionInfo",
"WriteDisposition",
]
@@ -19,6 +19,7 @@
import http
import threading
import typing
from typing import Dict, Optional

from google.api_core import exceptions
import google.api_core.future.polling
@@ -88,6 +89,22 @@ def _error_result_to_exception(error_result):
)


class TransactionInfo(typing.NamedTuple):
"""[Alpha] Information of a multi-statement transaction.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo
.. versionadded:: 2.24.0
"""

transaction_id: str
"""Output only. ID of the transaction."""

@classmethod
def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
return cls(transaction_info["transactionId"])


class _JobReference(object):
"""A reference to a job.
@@ -336,6 +353,18 @@ def reservation_usage(self):
for usage in usage_stats_raw
]

@property
def transaction_info(self) -> Optional[TransactionInfo]:
"""Information of the multi-statement transaction if this job is part of one.
.. versionadded:: 2.24.0
"""
info = self._properties.get("statistics", {}).get("transactionInfo")
if info is None:
return None
else:
return TransactionInfo.from_api_repr(info)

@property
def error_result(self):
"""Error information about the job as a whole.
@@ -1557,6 +1557,40 @@ def test_dml_statistics(self):
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 3

def test_transaction_info(self):
table_schema = (
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("bar", "INTEGER"),
)

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = f"{Config.CLIENT.project}.{dataset_id}.test_dml_statistics"

# Create the table before loading so that the column order is deterministic.
table = helpers.retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

# Insert a few rows and check the stats.
sql = f"""
BEGIN TRANSACTION;
INSERT INTO `{table_id}`
VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4);
UPDATE `{table_id}`
SET bar = bar + 1
WHERE bar > 2;
COMMIT TRANSACTION;
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

# Transaction ID set by the server should be accessible
assert query_job.transaction_info is not None
assert query_job.transaction_info.transaction_id != ""

def test_dbapi_w_standard_sql_types(self):
for sql, expected in helpers.STANDARD_SQL_EXAMPLES:
Config.CURSOR.execute(sql)
@@ -162,6 +162,7 @@ def _verifyInitialReadonlyProperties(self, job):
self.assertIsNone(job.created)
self.assertIsNone(job.started)
self.assertIsNone(job.ended)
self.assertIsNone(job.transaction_info)

# derived from resource['status']
self.assertIsNone(job.error_result)
@@ -227,6 +227,20 @@ def test_script_statistics(self):
self.assertEqual(stack_frame.end_column, 14)
self.assertEqual(stack_frame.text, "QUERY TEXT")

def test_transaction_info(self):
from google.cloud.bigquery.job.base import TransactionInfo

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
assert job.transaction_info is None

statistics = job._properties["statistics"] = {}
assert job.transaction_info is None

statistics["transactionInfo"] = {"transactionId": "123-abc-xyz"}
assert isinstance(job.transaction_info, TransactionInfo)
assert job.transaction_info.transaction_id == "123-abc-xyz"

def test_num_child_jobs(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
@@ -128,6 +128,18 @@ def _verify_dml_stats_resource_properties(self, job, resource):
else:
assert job.dml_stats is None

def _verify_transaction_info_resource_properties(self, job, resource):
resource_stats = resource.get("statistics", {})

if "transactionInfo" in resource_stats:
resource_transaction_info = resource_stats["transactionInfo"]
job_transaction_info = job.transaction_info
assert job_transaction_info.transaction_id == resource_transaction_info.get(
"transactionId"
)
else:
assert job.transaction_info is None

def _verify_configuration_properties(self, job, configuration):
if "dryRun" in configuration:
self.assertEqual(job.dry_run, configuration["dryRun"])
@@ -137,6 +149,7 @@ def _verify_configuration_properties(self, job, configuration):
def _verifyResourceProperties(self, job, resource):
self._verifyReadonlyResourceProperties(job, resource)
self._verify_dml_stats_resource_properties(job, resource)
self._verify_transaction_info_resource_properties(job, resource)

configuration = resource.get("configuration", {})
self._verify_configuration_properties(job, configuration)
@@ -325,6 +338,22 @@ def test_from_api_repr_with_dml_stats(self):
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_with_transaction_info(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": self.JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"configuration": {"query": {"query": self.QUERY}},
"statistics": {"transactionInfo": {"transactionId": "1a2b-3c4d"}},
}
klass = self._get_target_class()

job = klass.from_api_repr(RESOURCE, client=client)

self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SchemaUpdateOption

0 comments on commit 7f7b1a8

Please sign in to comment.