Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def dataset(project_id, bq_client):
bq_client.delete_dataset(dataset, delete_contents=True)


@pytest.fixture(scope="session")
@pytest.fixture
def table(project_id, dataset, bq_client):
from google.cloud import bigquery

Expand Down
38 changes: 27 additions & 11 deletions tests/system/reader/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from google.cloud import bigquery


_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}"


def _to_bq_table_ref(table_name_string, partition_suffix=""):
"""Converts protobuf table reference to bigquery table reference.

Expand Down Expand Up @@ -177,25 +180,37 @@ def test_column_selection_read(
assert sorted(row.keys()) == ["age", "first_name"]


def test_snapshot(client_and_types, project_id, table_with_data_ref, bq_client):
@pytest.mark.parametrize("data_format", ("AVRO", "ARROW"))
def test_snapshot(client_and_types, project_id, table, bq_client, data_format):
client, types = client_and_types
before_new_data = dt.datetime.now(tz=dt.timezone.utc)

# load original data into the table
original_data = [
{"first_name": "OGFoo", "last_name": "Smith", "age": 44},
{"first_name": "OGBar", "last_name": "Jones", "age": 33},
]
og_job = bq_client.load_table_from_json(original_data, table).result()
og_time = og_job.ended

# load additional data into the table
new_data = [
{"first_name": "NewGuyFoo", "last_name": "Smith", "age": 46},
{"first_name": "NewGuyBar", "last_name": "Jones", "age": 30},
{"first_name": "NewFoo", "last_name": "Smiff", "age": 43},
{"first_name": "NewBar", "last_name": "Jomes", "age": 34},
]
new_job = bq_client.load_table_from_json(new_data, table).result()
new_time = new_job.ended

destination = _to_bq_table_ref(table_with_data_ref)
bq_client.load_table_from_json(new_data, destination).result()
# Because we want our snapshot to be between when we loaded the original
# data and when the new data was loaded, take the average of the two load
# job completion times.
before_new_data = og_time + ((new_time - og_time) / 2)

# read data using the timestamp before the additional data load

table_path = _TABLE_FORMAT.format(table.project, table.dataset_id, table.table_id)
read_session = types.ReadSession()
read_session.table = table_with_data_ref
read_session.table = table_path
read_session.table_modifiers.snapshot_time = before_new_data
read_session.data_format = types.DataFormat.AVRO
read_session.data_format = data_format

session = client.create_read_session(
request={
Expand All @@ -209,10 +224,11 @@ def test_snapshot(client_and_types, project_id, table_with_data_ref, bq_client):
rows = list(client.read_rows(stream).rows(session))

# verify that only the data before the timestamp was returned
assert len(rows) == 5 # all initial records
assert len(rows) == 2 # all initial records

for row in rows:
assert "NewGuy" not in row["first_name"] # no new records
assert "OG" in str(row["first_name"])
assert "New" not in str(row["first_name"])


def test_column_partitioned_table(
Expand Down