diff --git a/tests/system/conftest.py b/tests/system/conftest.py index dd42e736..9d476dcb 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -88,7 +88,7 @@ def dataset(project_id, bq_client): bq_client.delete_dataset(dataset, delete_contents=True) -@pytest.fixture(scope="session") +@pytest.fixture def table(project_id, dataset, bq_client): from google.cloud import bigquery diff --git a/tests/system/reader/test_reader.py b/tests/system/reader/test_reader.py index d0328041..92bfcc54 100644 --- a/tests/system/reader/test_reader.py +++ b/tests/system/reader/test_reader.py @@ -26,6 +26,9 @@ from google.cloud import bigquery +_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}" + + def _to_bq_table_ref(table_name_string, partition_suffix=""): """Converts protobuf table reference to bigquery table reference. @@ -177,25 +180,37 @@ def test_column_selection_read( assert sorted(row.keys()) == ["age", "first_name"] -def test_snapshot(client_and_types, project_id, table_with_data_ref, bq_client): +@pytest.mark.parametrize("data_format", ("AVRO", "ARROW")) +def test_snapshot(client_and_types, project_id, table, bq_client, data_format): client, types = client_and_types - before_new_data = dt.datetime.now(tz=dt.timezone.utc) + + # load original data into the table + original_data = [ + {"first_name": "OGFoo", "last_name": "Smith", "age": 44}, + {"first_name": "OGBar", "last_name": "Jones", "age": 33}, + ] + og_job = bq_client.load_table_from_json(original_data, table).result() + og_time = og_job.ended # load additional data into the table new_data = [ - {"first_name": "NewGuyFoo", "last_name": "Smith", "age": 46}, - {"first_name": "NewGuyBar", "last_name": "Jones", "age": 30}, + {"first_name": "NewFoo", "last_name": "Smiff", "age": 43}, + {"first_name": "NewBar", "last_name": "Jomes", "age": 34}, ] + new_job = bq_client.load_table_from_json(new_data, table).result() + new_time = new_job.ended - destination = _to_bq_table_ref(table_with_data_ref) - bq_client.load_table_from_json(new_data, destination).result() + # Because we want our snapshot to be between when we loaded the original + # data and when the new data was loaded, take the average of the two load + # job completion times. + before_new_data = og_time + ((new_time - og_time) / 2) # read data using the timestamp before the additional data load - + table_path = _TABLE_FORMAT.format(table.project, table.dataset_id, table.table_id) read_session = types.ReadSession() - read_session.table = table_with_data_ref + read_session.table = table_path read_session.table_modifiers.snapshot_time = before_new_data - read_session.data_format = types.DataFormat.AVRO + read_session.data_format = data_format session = client.create_read_session( request={ @@ -209,10 +224,11 @@ def test_snapshot(client_and_types, project_id, table_with_data_ref, bq_client): rows = list(client.read_rows(stream).rows(session)) # verify that only the data before the timestamp was returned - assert len(rows) == 5 # all initial records + assert len(rows) == 2 # all initial records for row in rows: - assert "NewGuy" not in row["first_name"] # no new records + assert "OG" in str(row["first_name"]) + assert "New" not in str(row["first_name"]) def test_column_partitioned_table(