From d21dae27dfb000a1fc85bda2c4fe320c6ff031af Mon Sep 17 00:00:00 2001
From: Steinthor Palsson
Date: Sat, 18 May 2024 15:26:10 -0400
Subject: [PATCH] Test existing schema hash

---
 tests/pipeline/test_refresh_modes.py | 41 ++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/tests/pipeline/test_refresh_modes.py b/tests/pipeline/test_refresh_modes.py
index a7cce42f96..f4864a11eb 100644
--- a/tests/pipeline/test_refresh_modes.py
+++ b/tests/pipeline/test_refresh_modes.py
@@ -141,6 +141,47 @@ def test_refresh_drop_dataset():
     assert_state_is_wiped(destination_state["sources"]["refresh_source"])
 
 
+def test_existing_schema_hash():
+    """Test the case where the new schema is identical to a previously stored schema after dropping and re-creating tables.
+    The change should be detected regardless and the tables created again in the destination db.
+    """
+    pipeline = dlt.pipeline(
+        "refresh_full_test",
+        destination="duckdb",
+        refresh="drop_dataset",
+        dataset_name="refresh_full_test",
+    )
+
+    info = pipeline.run(refresh_source(first_run=True, drop_dataset=True))
+    assert_load_info(info)
+    first_schema_hash = pipeline.default_schema.version_hash
+
+    # Second run with all tables dropped and only some tables re-created
+    info = pipeline.run(
+        refresh_source(first_run=False, drop_dataset=True).with_resources(
+            "some_data_1", "some_data_2"
+        )
+    )
+
+    # Just check the local schema
+    new_table_names = set(
+        t["name"] for t in pipeline.default_schema.data_tables(include_incomplete=True)
+    )
+    assert new_table_names == {"some_data_1", "some_data_2", "some_data_4"}
+
+    # Run again with all tables to ensure they are re-created
+    # The new schema in this case should match the schema of the first run exactly
+    info = pipeline.run(refresh_source(first_run=True, drop_dataset=True))
+    # Check table 3 was re-created
+    with pipeline.sql_client() as client:
+        result = client.execute_sql("SELECT id, name FROM some_data_3 ORDER BY id")
+        assert result == [(9, "Jack"), (10, "Jill")]
+
+    # Schema is identical to first schema
+    new_schema_hash = pipeline.default_schema.version_hash
+    assert new_schema_hash == first_schema_hash
+
+
 def test_refresh_drop_tables():
     # First run pipeline with load to destination so tables are created
     pipeline = dlt.pipeline(
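
Note: the test above depends on a refresh_source fixture defined elsewhere in tests/pipeline/test_refresh_modes.py, which is not part of this diff. The snippet below is only a minimal sketch of what such a dlt source fixture might look like; the resource names and the rows asserted for some_data_3 are taken from the test, but the resource bodies, the other rows, and how first_run / drop_dataset influence the yielded data are assumptions, not the actual fixture.

# Hypothetical sketch only -- not the real fixture from tests/pipeline/test_refresh_modes.py
import dlt


@dlt.source
def refresh_source(first_run: bool = True, drop_dataset: bool = False):
    # Each resource maps to one table in the "refresh_full_test" dataset.
    @dlt.resource
    def some_data_1():
        yield [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}]

    @dlt.resource
    def some_data_2():
        yield [{"id": 5, "name": "Joe"}, {"id": 6, "name": "Jen"}]

    @dlt.resource
    def some_data_3():
        # Rows matching the SELECT assertion in test_existing_schema_hash
        yield [{"id": 9, "name": "Jack"}, {"id": 10, "name": "Jill"}]

    @dlt.resource
    def some_data_4():
        # Placeholder; the real fixture controls what this yields (e.g. based on first_run)
        yield [{"id": 11, "name": "Jim"}]

    return some_data_1, some_data_2, some_data_3, some_data_4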