diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 738a11c069..5422414cf3 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -274,7 +274,6 @@ def drop_storage(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: @@ -284,7 +283,6 @@ def update_stored_schema( destination has single writer and no other processes modify the schema. Args: - load_id (str, optional): Load id during which the schema is updated only_tables (Sequence[str], optional): Updates only listed tables. Defaults to None. expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination Returns: diff --git a/dlt/destinations/impl/destination/destination.py b/dlt/destinations/impl/destination/destination.py index 4a526720d2..69d1d1d98a 100644 --- a/dlt/destinations/impl/destination/destination.py +++ b/dlt/destinations/impl/destination/destination.py @@ -48,11 +48,10 @@ def drop_storage(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - return super().update_stored_schema(load_id, only_tables, expected_update) + return super().update_stored_schema(only_tables, expected_update) def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: # skip internal tables and remove columns from schema if so configured diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index 4650a7651c..16affbc164 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -127,11 +127,10 @@ def drop_storage(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - applied_update = super().update_stored_schema(load_id, only_tables, expected_update) + applied_update = super().update_stored_schema(only_tables, expected_update) if self.config.fail_schema_update: raise DestinationTransientException( "Raise on schema update due to fail_schema_update config flag" diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 8c4c0ca5b1..8edfa7f744 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -10,7 +10,7 @@ import re import dlt -from dlt.common import logger +from dlt.common import logger, time from dlt.common.schema import Schema, TSchemaTables, TTableSchema from dlt.common.storages import FileStorage, fsspec_from_config from dlt.common.destination import DestinationCapabilitiesContext @@ -183,7 +183,6 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> TSchemaTables: @@ -198,7 +197,7 @@ def update_stored_schema( # don't store schema when used as staging if not self.config.as_staging: - self._store_current_schema(load_id or "1") + self._store_current_schema() return expected_update @@ -379,14 +378,15 @@ def _get_stored_schema_by_hash_or_newest( return None - def _store_current_schema(self, load_id: str) -> None: + def _store_current_schema(self) -> None: # check if schema with hash exists current_hash = self.schema.stored_version_hash if self._get_stored_schema_by_hash_or_newest(current_hash): return # get paths - filepath = self._get_schema_file_name(self.schema.stored_version_hash, load_id) + schema_id = str(time.precise_time()) + filepath = self._get_schema_file_name(self.schema.stored_version_hash, schema_id) # TODO: duplicate of weaviate implementation, should be abstracted out version_info = { diff --git a/dlt/destinations/impl/qdrant/qdrant_client.py b/dlt/destinations/impl/qdrant/qdrant_client.py index 970580eb51..9898b28c86 100644 --- a/dlt/destinations/impl/qdrant/qdrant_client.py +++ b/dlt/destinations/impl/qdrant/qdrant_client.py @@ -284,11 +284,10 @@ def _delete_sentinel_collection(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(load_id, only_tables, expected_update) + super().update_stored_schema(only_tables, expected_update) applied_update: TSchemaTables = {} schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash) if schema_info is None: diff --git a/dlt/destinations/impl/weaviate/weaviate_client.py b/dlt/destinations/impl/weaviate/weaviate_client.py index 3bbc2a8c7d..2d75ca0809 100644 --- a/dlt/destinations/impl/weaviate/weaviate_client.py +++ b/dlt/destinations/impl/weaviate/weaviate_client.py @@ -425,11 +425,10 @@ def _delete_sentinel_class(self) -> None: @wrap_weaviate_error def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(load_id, only_tables, expected_update) + super().update_stored_schema(only_tables, expected_update) # Retrieve the schema from Weaviate applied_update: TSchemaTables = {} try: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index cfb95b0af2..5838ab2ab7 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -181,11 +181,10 @@ def is_storage_initialized(self) -> bool: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(load_id, only_tables, expected_update) + super().update_stored_schema(only_tables, expected_update) applied_update: TSchemaTables = {} schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash) if schema_info is None: diff --git a/dlt/load/load.py b/dlt/load/load.py index 7bedb3dfa6..66ddb1c308 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -375,7 +375,6 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: if isinstance(job_client, WithStagingDataset) else None ), - load_id=load_id, ) # init staging client @@ -392,7 +391,6 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: expected_update, job_client.should_truncate_table_before_load_on_staging_destination, job_client.should_load_data_to_staging_dataset_on_staging_destination, - load_id=load_id, ) self.load_storage.commit_schema_update(load_id, applied_update) diff --git a/dlt/load/utils.py b/dlt/load/utils.py index 1635d21efe..067ae33613 100644 --- a/dlt/load/utils.py +++ b/dlt/load/utils.py @@ -66,7 +66,6 @@ def init_client( expected_update: TSchemaTables, truncate_filter: Callable[[TTableSchema], bool], load_staging_filter: Callable[[TTableSchema], bool], - load_id: str = None, ) -> TSchemaTables: """Initializes destination storage including staging dataset if supported @@ -98,7 +97,7 @@ def init_client( ) applied_update = _init_dataset_and_update_schema( - job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables, load_id=load_id + job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables ) # update the staging dataset if client supports this @@ -118,7 +117,6 @@ def init_client( staging_tables | {schema.version_table_name}, # keep only schema version staging_tables, # all eligible tables must be also truncated staging_info=True, - load_id=load_id, ) return applied_update @@ -130,7 +128,6 @@ def _init_dataset_and_update_schema( update_tables: Iterable[str], truncate_tables: Iterable[str] = None, staging_info: bool = False, - load_id: str = None, ) -> TSchemaTables: staging_text = "for staging dataset" if staging_info else "" logger.info( @@ -143,7 +140,7 @@ def _init_dataset_and_update_schema( f" {staging_text}" ) applied_update = job_client.update_stored_schema( - load_id=load_id, only_tables=update_tables, expected_update=expected_update + only_tables=update_tables, expected_update=expected_update ) logger.info( f"Client for {job_client.config.destination_type} will truncate tables {staging_text}"