Test existing schema hash
steinitzu committed May 18, 2024
1 parent f5e94d3 commit d21dae2
Showing 1 changed file with 41 additions and 0 deletions.
tests/pipeline/test_refresh_modes.py

@@ -141,6 +141,47 @@ def test_refresh_drop_dataset():
    assert_state_is_wiped(destination_state["sources"]["refresh_source"])


def test_existing_schema_hash():
    """Test when the new schema is identical to a previously stored schema after dropping and re-creating tables.
    The change should be detected regardless, and the tables should be created again in the destination db.
    """
    pipeline = dlt.pipeline(
        "refresh_full_test",
        destination="duckdb",
        refresh="drop_dataset",
        dataset_name="refresh_full_test",
    )

    info = pipeline.run(refresh_source(first_run=True, drop_dataset=True))
    assert_load_info(info)
    first_schema_hash = pipeline.default_schema.version_hash

    # Second run with all tables dropped and only some tables re-created
    info = pipeline.run(
        refresh_source(first_run=False, drop_dataset=True).with_resources(
            "some_data_1", "some_data_2"
        )
    )

    # Just check the local schema
    new_table_names = set(
        t["name"] for t in pipeline.default_schema.data_tables(include_incomplete=True)
    )
    assert new_table_names == {"some_data_1", "some_data_2", "some_data_4"}

    # Run again with all tables to ensure they are re-created
    # The new schema in this case should match the schema of the first run exactly
    info = pipeline.run(refresh_source(first_run=True, drop_dataset=True))
    # Check table 3 was re-created
    with pipeline.sql_client() as client:
        result = client.execute_sql("SELECT id, name FROM some_data_3 ORDER BY id")
        assert result == [(9, "Jack"), (10, "Jill")]

    # Schema is identical to first schema
    new_schema_hash = pipeline.default_schema.version_hash
    assert new_schema_hash == first_schema_hash


def test_refresh_drop_tables():
    # First run pipeline with load to destination so tables are created
    pipeline = dlt.pipeline(
