Skip to content

Commit

Permalink
save schema with timestamp instead of load_id
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Apr 16, 2024
1 parent fce47c6 commit 0d5423c
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 24 deletions.
2 changes: 0 additions & 2 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ def drop_storage(self) -> None:

def update_stored_schema(
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
Expand All @@ -284,7 +283,6 @@ def update_stored_schema(
destination has single writer and no other processes modify the schema.
Args:
load_id (str, optional): Load id during which the schema is updated
only_tables (Sequence[str], optional): Updates only listed tables. Defaults to None.
expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination
Returns:
Expand Down
3 changes: 1 addition & 2 deletions dlt/destinations/impl/destination/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ def drop_storage(self) -> None:

def update_stored_schema(
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
return super().update_stored_schema(load_id, only_tables, expected_update)
return super().update_stored_schema(only_tables, expected_update)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip internal tables and remove columns from schema if so configured
Expand Down
3 changes: 1 addition & 2 deletions dlt/destinations/impl/dummy/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,10 @@ def drop_storage(self) -> None:

def update_stored_schema(
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
applied_update = super().update_stored_schema(load_id, only_tables, expected_update)
applied_update = super().update_stored_schema(only_tables, expected_update)
if self.config.fail_schema_update:
raise DestinationTransientException(
"Raise on schema update due to fail_schema_update config flag"
Expand Down
10 changes: 5 additions & 5 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import re

import dlt
from dlt.common import logger
from dlt.common import logger, time
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.storages import FileStorage, fsspec_from_config
from dlt.common.destination import DestinationCapabilitiesContext
Expand Down Expand Up @@ -183,7 +183,6 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:

def update_stored_schema(
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> TSchemaTables:
Expand All @@ -198,7 +197,7 @@ def update_stored_schema(

# don't store schema when used as staging
if not self.config.as_staging:
self._store_current_schema(load_id or "1")
self._store_current_schema()

return expected_update

Expand Down Expand Up @@ -379,14 +378,15 @@ def _get_stored_schema_by_hash_or_newest(

return None

def _store_current_schema(self, load_id: str) -> None:
def _store_current_schema(self) -> None:
# check if schema with hash exists
current_hash = self.schema.stored_version_hash
if self._get_stored_schema_by_hash_or_newest(current_hash):
return

# get paths
filepath = self._get_schema_file_name(self.schema.stored_version_hash, load_id)
schema_id = str(time.precise_time())
filepath = self._get_schema_file_name(self.schema.stored_version_hash, schema_id)

# TODO: duplicate of weaviate implementation, should be abstracted out
version_info = {
Expand Down
3 changes: 1 addition & 2 deletions dlt/destinations/impl/qdrant/qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,10 @@ def _delete_sentinel_collection(self) -> None:

def update_stored_schema(
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(load_id, only_tables, expected_update)
super().update_stored_schema(only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
if schema_info is None:
Expand Down
3 changes: 1 addition & 2 deletions dlt/destinations/impl/weaviate/weaviate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,10 @@ def _delete_sentinel_class(self) -> None:
@wrap_weaviate_error
def update_stored_schema(
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(load_id, only_tables, expected_update)
super().update_stored_schema(only_tables, expected_update)
# Retrieve the schema from Weaviate
applied_update: TSchemaTables = {}
try:
Expand Down
3 changes: 1 addition & 2 deletions dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,10 @@ def is_storage_initialized(self) -> bool:

def update_stored_schema(
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(load_id, only_tables, expected_update)
super().update_stored_schema(only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
if schema_info is None:
Expand Down
2 changes: 0 additions & 2 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
if isinstance(job_client, WithStagingDataset)
else None
),
load_id=load_id,
)

# init staging client
Expand All @@ -392,7 +391,6 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
expected_update,
job_client.should_truncate_table_before_load_on_staging_destination,
job_client.should_load_data_to_staging_dataset_on_staging_destination,
load_id=load_id,
)

self.load_storage.commit_schema_update(load_id, applied_update)
Expand Down
7 changes: 2 additions & 5 deletions dlt/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def init_client(
expected_update: TSchemaTables,
truncate_filter: Callable[[TTableSchema], bool],
load_staging_filter: Callable[[TTableSchema], bool],
load_id: str = None,
) -> TSchemaTables:
"""Initializes destination storage including staging dataset if supported
Expand Down Expand Up @@ -98,7 +97,7 @@ def init_client(
)

applied_update = _init_dataset_and_update_schema(
job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables, load_id=load_id
job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables
)

# update the staging dataset if client supports this
Expand All @@ -118,7 +117,6 @@ def init_client(
staging_tables | {schema.version_table_name}, # keep only schema version
staging_tables, # all eligible tables must be also truncated
staging_info=True,
load_id=load_id,
)

return applied_update
Expand All @@ -130,7 +128,6 @@ def _init_dataset_and_update_schema(
update_tables: Iterable[str],
truncate_tables: Iterable[str] = None,
staging_info: bool = False,
load_id: str = None,
) -> TSchemaTables:
staging_text = "for staging dataset" if staging_info else ""
logger.info(
Expand All @@ -143,7 +140,7 @@ def _init_dataset_and_update_schema(
f" {staging_text}"
)
applied_update = job_client.update_stored_schema(
load_id=load_id, only_tables=update_tables, expected_update=expected_update
only_tables=update_tables, expected_update=expected_update
)
logger.info(
f"Client for {job_client.config.destination_type} will truncate tables {staging_text}"
Expand Down

0 comments on commit 0d5423c

Please sign in to comment.