Skip to content
Permalink
Browse files
feat: add support for table snapshots (#740)
* feat: add support for table snapshots

* Add system test for table snapshots

* Make test taxonomy resource name unique

* Store timezone aware snapshot time on snapshots

* Make copy config tests more detailed

* Use unique resource ID differently for display name

* Add new classes to docs
  • Loading branch information
plamut committed Jul 10, 2021
1 parent 7d2d3e9 commit ba86b2a
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 2 deletions.
@@ -59,6 +59,7 @@ Job-Related Types
job.CreateDisposition
job.DestinationFormat
job.Encoding
job.OperationType
job.QueryPlanEntry
job.QueryPlanEntryStep
job.QueryPriority
@@ -90,6 +91,7 @@ Table
table.RangePartitioning
table.Row
table.RowIterator
table.SnapshotDefinition
table.Table
table.TableListItem
table.TableReference
@@ -61,6 +61,7 @@
from google.cloud.bigquery.job import ExtractJobConfig
from google.cloud.bigquery.job import LoadJob
from google.cloud.bigquery.job import LoadJobConfig
from google.cloud.bigquery.job import OperationType
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.job import QueryJobConfig
from google.cloud.bigquery.job import QueryPriority
@@ -87,6 +88,7 @@
from google.cloud.bigquery.table import PartitionRange
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import Row
from google.cloud.bigquery.table import SnapshotDefinition
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioningType
@@ -115,6 +117,7 @@
"PartitionRange",
"RangePartitioning",
"Row",
"SnapshotDefinition",
"TimePartitioning",
"TimePartitioningType",
# Jobs
@@ -155,6 +158,7 @@
"ExternalSourceFormat",
"Encoding",
"KeyResultStatementKind",
"OperationType",
"QueryPriority",
"SchemaUpdateOption",
"SourceFormat",
@@ -25,6 +25,7 @@
from google.cloud.bigquery.job.base import UnknownJob
from google.cloud.bigquery.job.copy_ import CopyJob
from google.cloud.bigquery.job.copy_ import CopyJobConfig
from google.cloud.bigquery.job.copy_ import OperationType
from google.cloud.bigquery.job.extract import ExtractJob
from google.cloud.bigquery.job.extract import ExtractJobConfig
from google.cloud.bigquery.job.load import LoadJob
@@ -59,6 +60,7 @@
"UnknownJob",
"CopyJob",
"CopyJobConfig",
"OperationType",
"ExtractJob",
"ExtractJobConfig",
"LoadJob",
@@ -14,6 +14,8 @@

"""Classes for copy jobs."""

from typing import Optional

from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.table import TableReference
@@ -23,6 +25,25 @@
from google.cloud.bigquery.job.base import _JobReference


class OperationType:
"""Different operation types supported in table copy job.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#operationtype
"""

OPERATION_TYPE_UNSPECIFIED = "OPERATION_TYPE_UNSPECIFIED"
"""Unspecified operation type."""

COPY = "COPY"
"""The source and destination table have the same table type."""

SNAPSHOT = "SNAPSHOT"
"""The source table type is TABLE and the destination table type is SNAPSHOT."""

RESTORE = "RESTORE"
"""The source table type is SNAPSHOT and the destination table type is TABLE."""


class CopyJobConfig(_JobConfig):
"""Configuration options for copy jobs.
@@ -85,6 +106,23 @@ def destination_encryption_configuration(self, value):
api_repr = value.to_api_repr()
self._set_sub_prop("destinationEncryptionConfiguration", api_repr)

@property
def operation_type(self) -> str:
"""The operation to perform with this copy job.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationTableCopy.FIELDS.operation_type
"""
return self._get_sub_prop(
"operationType", OperationType.OPERATION_TYPE_UNSPECIFIED
)

@operation_type.setter
def operation_type(self, value: Optional[str]):
if value is None:
value = OperationType.OPERATION_TYPE_UNSPECIFIED
self._set_sub_prop("operationType", value)


class CopyJob(_AsyncJob):
"""Asynchronous job: copy data into a table from other tables.
@@ -321,6 +321,7 @@ class Table(object):
"range_partitioning": "rangePartitioning",
"time_partitioning": "timePartitioning",
"schema": "schema",
"snapshot_definition": "snapshotDefinition",
"streaming_buffer": "streamingBuffer",
"self_link": "selfLink",
"table_id": ["tableReference", "tableId"],
@@ -910,6 +911,19 @@ def external_data_configuration(self, value):
self._PROPERTY_TO_API_FIELD["external_data_configuration"]
] = api_repr

@property
def snapshot_definition(self) -> Optional["SnapshotDefinition"]:
"""Information about the snapshot. This value is set via snapshot creation.
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table.FIELDS.snapshot_definition
"""
snapshot_info = self._properties.get(
self._PROPERTY_TO_API_FIELD["snapshot_definition"]
)
if snapshot_info is not None:
snapshot_info = SnapshotDefinition(snapshot_info)
return snapshot_info

@classmethod
def from_string(cls, full_table_id: str) -> "Table":
"""Construct a table from fully-qualified table ID.
@@ -1274,6 +1288,29 @@ def __init__(self, resource):
)


class SnapshotDefinition:
"""Information about base table and snapshot time of the snapshot.
See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#snapshotdefinition
Args:
resource: Snapshot definition representation returned from the API.
"""

def __init__(self, resource: Dict[str, Any]):
self.base_table_reference = None
if "baseTableReference" in resource:
self.base_table_reference = TableReference.from_api_repr(
resource["baseTableReference"]
)

self.snapshot_time = None
if "snapshotTime" in resource:
self.snapshot_time = google.cloud._helpers._rfc3339_to_datetime(
resource["snapshotTime"]
)


class Row(object):
"""A BigQuery row.
@@ -394,7 +394,7 @@ def test_create_table_with_real_custom_policy(self):
taxonomy_parent = f"projects/{Config.CLIENT.project}/locations/us"

new_taxonomy = datacatalog_types.Taxonomy(
display_name="Custom test taxonomy",
display_name="Custom test taxonomy" + unique_resource_id(),
description="This taxonomy is ony used for a test.",
activated_policy_types=[
datacatalog_types.Taxonomy.PolicyType.FINE_GRAINED_ACCESS_CONTROL
@@ -2370,6 +2370,75 @@ def test_parameterized_types_round_trip(self):

self.assertEqual(tuple(s._key()[:2] for s in table2.schema), fields)

def test_table_snapshots(self):
from google.cloud.bigquery import CopyJobConfig
from google.cloud.bigquery import OperationType

client = Config.CLIENT

source_table_path = f"{client.project}.{Config.DATASET}.test_table"
snapshot_table_path = f"{source_table_path}_snapshot"

# Create the table before loading so that the column order is predictable.
schema = [
bigquery.SchemaField("foo", "INTEGER"),
bigquery.SchemaField("bar", "STRING"),
]
source_table = helpers.retry_403(Config.CLIENT.create_table)(
Table(source_table_path, schema=schema)
)
self.to_delete.insert(0, source_table)

# Populate the table with initial data.
rows = [{"foo": 1, "bar": "one"}, {"foo": 2, "bar": "two"}]
load_job = Config.CLIENT.load_table_from_json(rows, source_table)
load_job.result()

# Now create a snapshot before modifying the original table data.
copy_config = CopyJobConfig()
copy_config.operation_type = OperationType.SNAPSHOT

copy_job = client.copy_table(
sources=source_table_path,
destination=snapshot_table_path,
job_config=copy_config,
)
copy_job.result()

snapshot_table = client.get_table(snapshot_table_path)
self.to_delete.insert(0, snapshot_table)

# Modify data in original table.
sql = f'INSERT INTO `{source_table_path}`(foo, bar) VALUES (3, "three")'
query_job = client.query(sql)
query_job.result()

# List rows from the source table and compare them to rows from the snapshot.
rows_iter = client.list_rows(source_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two"), (3, "three")]

rows_iter = client.list_rows(snapshot_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two")]

# Now restore the table from the snapshot and it should again contain the old
# set of rows.
copy_config = CopyJobConfig()
copy_config.operation_type = OperationType.RESTORE
copy_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

copy_job = client.copy_table(
sources=snapshot_table_path,
destination=source_table_path,
job_config=copy_config,
)
copy_job.result()

rows_iter = client.list_rows(source_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two")]

def temp_dataset(self, dataset_id, location=None):
project = Config.CLIENT.project
dataset_ref = bigquery.DatasetReference(project, dataset_id)
@@ -28,18 +28,34 @@ def _get_target_class():

return CopyJobConfig

def test_ctor_defaults(self):
from google.cloud.bigquery.job import OperationType

config = self._make_one()

assert config.create_disposition is None
assert config.write_disposition is None
assert config.destination_encryption_configuration is None
assert config.operation_type == OperationType.OPERATION_TYPE_UNSPECIFIED

def test_ctor_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import OperationType
from google.cloud.bigquery.job import WriteDisposition

create_disposition = CreateDisposition.CREATE_NEVER
write_disposition = WriteDisposition.WRITE_TRUNCATE
snapshot_operation = OperationType.SNAPSHOT

config = self._get_target_class()(
create_disposition=create_disposition, write_disposition=write_disposition
create_disposition=create_disposition,
write_disposition=write_disposition,
operation_type=snapshot_operation,
)

self.assertEqual(config.create_disposition, create_disposition)
self.assertEqual(config.write_disposition, write_disposition)
self.assertEqual(config.operation_type, snapshot_operation)

def test_to_api_repr_with_encryption(self):
from google.cloud.bigquery.encryption_configuration import (
@@ -70,6 +86,22 @@ def test_to_api_repr_with_encryption_none(self):
resource, {"copy": {"destinationEncryptionConfiguration": None}}
)

def test_operation_type_setting_none(self):
from google.cloud.bigquery.job import OperationType

config = self._make_one(operation_type=OperationType.SNAPSHOT)

# Setting it to None is the same as setting it to OPERATION_TYPE_UNSPECIFIED.
config.operation_type = None
assert config.operation_type == OperationType.OPERATION_TYPE_UNSPECIFIED

def test_operation_type_setting_non_none(self):
from google.cloud.bigquery.job import OperationType

config = self._make_one(operation_type=None)
config.operation_type = OperationType.RESTORE
assert config.operation_type == OperationType.RESTORE


class TestCopyJob(_Base):
JOB_TYPE = "copy"

0 comments on commit ba86b2a

Please sign in to comment.