Skip to content

Commit

Permalink
Match stored schema by version+version_hash
Browse files Browse the repository at this point in the history
Solves detection of cases where dropped tables need to be recreated.
  • Loading branch information
steinitzu committed Apr 19, 2024
1 parent 7120835 commit 8990f11
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
19 changes: 15 additions & 4 deletions dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Iterator,
ContextManager,
cast,
Union,
)
import zlib
import re
Expand Down Expand Up @@ -186,7 +187,10 @@ def update_stored_schema(
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
self.schema.version
schema_info = self.get_stored_schema_by_hash(
self.schema.stored_version_hash, self.schema.version
)
if schema_info is None:
logger.info(
f"Schema with hash {self.schema.stored_version_hash} not found in the storage."
Expand Down Expand Up @@ -375,10 +379,17 @@ def get_stored_state(self, pipeline_name: str) -> StateInfo:
return None
return StateInfo(row[0], row[1], row[2], row[3], pendulum.instance(row[4]))

def get_stored_schema_by_hash(
    self, version_hash: str, version: Optional[int] = None
) -> "StorageSchemaInfo":
    """Fetch the stored schema row whose content hash equals `version_hash`.

    Args:
        version_hash: Content hash of the schema to look up in the version table.
        version: When provided, the stored row must additionally match this
            numeric schema version. This distinguishes stored schemas that
            share a hash but were saved under different versions (per the
            commit intent: detecting when dropped tables must be recreated).
            Defaults to None, which preserves the previous hash-only lookup.

    Returns:
        Whatever `self._row_to_schema_info` produces for the built query
        (a StorageSchemaInfo for a matching row).
    """
    name = self.sql_client.make_qualified_table_name(self.schema.version_table_name)
    query = f"SELECT {self.version_table_schema_columns} FROM {name} WHERE version_hash = %s"
    # Parameters are passed positionally via DB-API placeholders, never
    # interpolated into the SQL string.
    params: List[Union[int, str]] = [version_hash]
    if version is not None:
        # Pin the lookup to an exact (version_hash, version) pair.
        params.append(version)
        query += " AND version = %s"
    query += ";"
    return self._row_to_schema_info(query, *params)

def _execute_schema_update_sql(self, only_tables: Iterable[str]) -> TSchemaTables:
sql_scripts, schema_update = self._build_schema_update_sql(only_tables)
Expand Down
14 changes: 5 additions & 9 deletions dlt/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,20 @@ def _init_dataset_and_update_schema(
f"Client for {job_client.config.destination_type} will start initialize storage"
f" {staging_text}"
)
job_client.initialize_storage()
if drop_tables:
old_schema = job_client.schema
new_schema = job_client.schema.clone()
job_client.schema = new_schema
for table in drop_tables:
new_schema.tables.pop(table["name"], None)
new_schema._bump_version()
drop_table_names = [table["name"] for table in drop_tables]
if hasattr(job_client, "drop_tables"):
logger.info(
f"Client for {job_client.config.destination_type} will drop tables {staging_text}"
)
job_client.drop_tables(*[table["name"] for table in drop_tables], replace_schema=True)
job_client.schema = old_schema
job_client.initialize_storage()
job_client.drop_tables(*drop_table_names, replace_schema=False)

logger.info(
f"Client for {job_client.config.destination_type} will update schema to package schema"
f" {staging_text}"
)

applied_update = job_client.update_stored_schema(
only_tables=update_tables, expected_update=expected_update
)
Expand Down

0 comments on commit 8990f11

Please sign in to comment.