diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 9318dca535..5422414cf3 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -273,7 +273,9 @@ def drop_storage(self) -> None: pass def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: """Updates storage to the current schema. @@ -434,6 +436,7 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]: @abstractmethod def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo: + """retrieves the stored schema by hash""" pass @abstractmethod diff --git a/dlt/common/storages/fsspecs/google_drive.py b/dlt/common/storages/fsspecs/google_drive.py index 1be862668c..258a8622d1 100644 --- a/dlt/common/storages/fsspecs/google_drive.py +++ b/dlt/common/storages/fsspecs/google_drive.py @@ -237,7 +237,7 @@ def export(self, path: str, mime_type: str) -> Any: fileId=file_id, mimeType=mime_type, supportsAllDrives=True ).execute() - def ls(self, path: str, detail: Optional[bool] = False) -> Any: + def ls(self, path: str, detail: Optional[bool] = False, refresh: Optional[bool] = False) -> Any: """List files in a directory. Args: diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 1c76fd39cd..1752039775 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -54,9 +54,23 @@ """Loader file formats with internal job types""" +class TPipelineStateDoc(TypedDict, total=False): + """Corresponds to the StateInfo Tuple""" + + version: int + engine_version: int + pipeline_name: str + state: str + version_hash: str + created_at: datetime.datetime + dlt_load_id: NotRequired[str] + + class TLoadPackageState(TVersionedState, total=False): created_at: DateTime """Timestamp when the load package was created""" + pipeline_state: NotRequired[TPipelineStateDoc] + """Pipeline state, added at the end of the extraction phase""" """A section of state that does not participate in change merging and version control""" destination_state: NotRequired[Dict[str, Any]] diff --git a/dlt/destinations/impl/destination/destination.py b/dlt/destinations/impl/destination/destination.py index 4d0f081aa6..69d1d1d98a 100644 --- a/dlt/destinations/impl/destination/destination.py +++ b/dlt/destinations/impl/destination/destination.py @@ -47,7 +47,9 @@ def drop_storage(self) -> None: pass def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: return super().update_stored_schema(only_tables, expected_update) diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index 47ae25828e..16affbc164 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -126,7 +126,9 @@ def drop_storage(self) -> None: pass def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: applied_update = super().update_stored_schema(only_tables, expected_update) if self.config.fail_schema_update: diff --git a/dlt/destinations/impl/filesystem/filesystem.py 
b/dlt/destinations/impl/filesystem/filesystem.py index f06cf5ae54..5dae4bf295 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -1,12 +1,17 @@ import posixpath import os +import base64 from types import TracebackType -from typing import ClassVar, List, Type, Iterable, Set, Iterator +from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple, cast from fsspec import AbstractFileSystem from contextlib import contextmanager +from dlt.common import json, pendulum +from dlt.common.typing import DictStrAny + +import re import dlt -from dlt.common import logger +from dlt.common import logger, time from dlt.common.schema import Schema, TSchemaTables, TTableSchema from dlt.common.storages import FileStorage, fsspec_from_config from dlt.common.destination import DestinationCapabilitiesContext @@ -17,8 +22,12 @@ JobClientBase, FollowupJob, WithStagingDataset, + WithStateSync, + StorageSchemaInfo, + StateInfo, + DoNothingJob, ) - +from dlt.common.destination.exceptions import DestinationUndefinedEntity from dlt.destinations.job_impl import EmptyLoadJob from dlt.destinations.impl.filesystem import capabilities from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration @@ -26,6 +35,10 @@ from dlt.destinations import path_utils +INIT_FILE_NAME = "init" +FILENAME_SEPARATOR = "__" + + class LoadFilesystemJob(LoadJob): def __init__( self, @@ -86,7 +99,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]: return jobs -class FilesystemClient(JobClientBase, WithStagingDataset): +class FilesystemClient(JobClientBase, WithStagingDataset, WithStateSync): """filesystem client storing jobs in memory""" capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() @@ -166,31 +179,56 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: " should be created previously!" 
                    )

+        # we mark the storage folder as initialized
+        self.fs_client.makedirs(self.dataset_path, exist_ok=True)
+        self.fs_client.touch(posixpath.join(self.dataset_path, INIT_FILE_NAME))
+
     def update_stored_schema(
-        self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
+        self,
+        only_tables: Iterable[str] = None,
+        expected_update: TSchemaTables = None,
     ) -> TSchemaTables:
         # create destination dirs for all tables
-        dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys())
-        for directory in dirs_to_create:
+        table_names = only_tables or self.schema.tables.keys()
+        dirs_to_create = self._get_table_dirs(table_names)
+        for table_name, directory in zip(table_names, dirs_to_create):
             self.fs_client.makedirs(directory, exist_ok=True)
+            # we need to mark the folders of the dlt (internal) tables as initialized
+            if table_name in self.schema.dlt_table_names():
+                self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))
+
+        # don't store schema when used as staging
+        if not self.config.as_staging:
+            self._store_current_schema()
+
         return expected_update

-    def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
-        """Gets unique directories where table data is stored."""
-        table_dirs: Set[str] = set()
+    def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]:
+        """Gets directories where table data is stored."""
+        table_dirs: List[str] = []
         for table_name in table_names:
-            table_prefix = self.table_prefix_layout.format(
-                schema_name=self.schema.name, table_name=table_name
-            )
+            # dlt tables do not respect layout (for now)
+            if table_name in self.schema.dlt_table_names():
+                table_prefix = posixpath.join(table_name, "")
+            else:
+                table_prefix = self.table_prefix_layout.format(
+                    schema_name=self.schema.name, table_name=table_name
+                )
             destination_dir = posixpath.join(self.dataset_path, table_prefix)
             # extract the path component
-            table_dirs.add(os.path.dirname(destination_dir))
+            table_dirs.append(posixpath.dirname(destination_dir))
         return table_dirs

     def is_storage_initialized(self) -> bool:
-        return self.fs_client.isdir(self.dataset_path)  # type: ignore[no-any-return]
+        return self.fs_client.exists(posixpath.join(self.dataset_path, INIT_FILE_NAME))  # type: ignore[no-any-return]

     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
+        # skip the state table, we create a jsonl file in the complete_load step
+        # this does not apply to scenarios where we are using filesystem as staging
+        # where we want to load the state the regular way
+        if table["name"] == self.schema.state_table_name and not self.config.as_staging:
+            return DoNothingJob(file_path)
+
         cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob
         return cls(
             file_path,
@@ -203,12 +241,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
     def restore_file_load(self, file_path: str) -> LoadJob:
         return EmptyLoadJob.from_file_path(file_path, "completed")

-    def complete_load(self, load_id: str) -> None:
-        schema_name = self.schema.name
-        table_name = self.schema.loads_table_name
-        file_name = f"{schema_name}.{table_name}.{load_id}"
-        self.fs_client.touch(posixpath.join(self.dataset_path, file_name))
-
     def __enter__(self) -> "FilesystemClient":
         return self

@@ -219,3 +251,170 @@ def __exit__(
     def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
         return False
+
+    #
+    # state stuff
+    #
+
+    def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
+        dirname = posixpath.dirname(filepath)
+        if not self.fs_client.isdir(dirname):
+            return
+        self.fs_client.write_text(filepath, json.dumps(data), "utf-8")
+
+    def _to_path_safe_string(self, s: str) -> str:
+        """converts a base64 string into a path-safe hex representation"""
+        return base64.b64decode(s).hex() if s else None
+
+    def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
+        if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
+            raise DestinationUndefinedEntity({"dir": dirname})
+        for filepath in self.fs_client.ls(dirname, detail=False, refresh=True):
+            filename = os.path.splitext(os.path.basename(filepath))[0]
+            fileparts = filename.split(FILENAME_SEPARATOR)
+            if len(fileparts) != 3:
+                continue
+            yield filepath, fileparts
+
+    def _store_load(self, load_id: str) -> None:
+        # write entry to load "table"
+        # TODO: this is also duplicated across all destinations. DRY this.
+        load_data = {
+            "load_id": load_id,
+            "schema_name": self.schema.name,
+            "status": 0,
+            "inserted_at": pendulum.now().isoformat(),
+            "schema_version_hash": self.schema.version_hash,
+        }
+        filepath = posixpath.join(
+            self.dataset_path,
+            self.schema.loads_table_name,
+            f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl",
+        )
+        self._write_to_json_file(filepath, load_data)
+
+    def complete_load(self, load_id: str) -> None:
+        # store current state
+        self._store_current_state(load_id)
+        self._store_load(load_id)
+
+    #
+    # state read/write
+    #
+
+    def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
+        """gets full path for state file for a given hash"""
+        return posixpath.join(
+            self.dataset_path,
+            self.schema.state_table_name,
+            f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
+        )
+
+    def _store_current_state(self, load_id: str) -> None:
+        # don't save the state this way when used as staging
+        if self.config.as_staging:
+            return
+        # get state doc from current pipeline
+        from dlt.pipeline.current import load_package
+
+        pipeline_state_doc = load_package()["state"].get("pipeline_state")
+
+        if not pipeline_state_doc:
+            return
+
+        # get paths
+        pipeline_name = pipeline_state_doc["pipeline_name"]
+        hash_path = self._get_state_file_name(
+            pipeline_name, self.schema.stored_version_hash, load_id
+        )
+
+        # write
+        self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))
+
+    def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
+        # get base dir
+        dirname = posixpath.dirname(self._get_state_file_name(pipeline_name, "", ""))
+
+        # search newest state
+        selected_path = None
+        newest_load_id = "0"
+        for filepath, fileparts in self._list_dlt_dir(dirname):
+            if fileparts[0] == pipeline_name and fileparts[1] > newest_load_id:
+                newest_load_id = fileparts[1]
+                selected_path = filepath
+
+        # Load compressed state from destination
+        if selected_path:
+            state_json = json.loads(self.fs_client.read_text(selected_path))
+            state_json.pop("version_hash")
+            return StateInfo(**state_json)
+
+        return None
+
+    #
+    # Schema read/write
+    #
+
+    def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
+        """gets full path for schema file for a given hash"""
+        return posixpath.join(
+            self.dataset_path,
+            self.schema.version_table_name,
+            f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
+        )
+
+    def _get_stored_schema_by_hash_or_newest(
+        self, version_hash: str = None
+    ) -> Optional[StorageSchemaInfo]:
+        """Get the schema by supplied
hash, falls back to getting the newest version matching the existing schema name""" + version_hash = self._to_path_safe_string(version_hash) + dirname = posixpath.dirname(self._get_schema_file_name("", "")) + # find newest schema for pipeline or by version hash + selected_path = None + newest_load_id = "0" + for filepath, fileparts in self._list_dlt_dir(dirname): + if ( + not version_hash + and fileparts[0] == self.schema.name + and fileparts[1] > newest_load_id + ): + newest_load_id = fileparts[1] + selected_path = filepath + elif fileparts[2] == version_hash: + selected_path = filepath + break + + if selected_path: + return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path))) + + return None + + def _store_current_schema(self) -> None: + # check if schema with hash exists + current_hash = self.schema.stored_version_hash + if self._get_stored_schema_by_hash_or_newest(current_hash): + return + + # get paths + schema_id = str(time.precise_time()) + filepath = self._get_schema_file_name(self.schema.stored_version_hash, schema_id) + + # TODO: duplicate of weaviate implementation, should be abstracted out + version_info = { + "version_hash": self.schema.stored_version_hash, + "schema_name": self.schema.name, + "version": self.schema.version, + "engine_version": self.schema.ENGINE_VERSION, + "inserted_at": pendulum.now(), + "schema": json.dumps(self.schema.to_dict()), + } + + # we always keep tabs on what the current schema is + self._write_to_json_file(filepath, version_info) + + def get_stored_schema(self) -> Optional[StorageSchemaInfo]: + """Retrieves newest schema from destination storage""" + return self._get_stored_schema_by_hash_or_newest() + + def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: + return self._get_stored_schema_by_hash_or_newest(version_hash) diff --git a/dlt/destinations/impl/qdrant/qdrant_client.py b/dlt/destinations/impl/qdrant/qdrant_client.py index 5a5e5f8cfd..9898b28c86 100644 --- a/dlt/destinations/impl/qdrant/qdrant_client.py +++ b/dlt/destinations/impl/qdrant/qdrant_client.py @@ -283,7 +283,9 @@ def _delete_sentinel_collection(self) -> None: self.db_client.delete_collection(self.sentinel_collection) def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: super().update_stored_schema(only_tables, expected_update) applied_update: TSchemaTables = {} diff --git a/dlt/destinations/impl/weaviate/weaviate_client.py b/dlt/destinations/impl/weaviate/weaviate_client.py index ab2bea54ef..2d75ca0809 100644 --- a/dlt/destinations/impl/weaviate/weaviate_client.py +++ b/dlt/destinations/impl/weaviate/weaviate_client.py @@ -424,7 +424,9 @@ def _delete_sentinel_class(self) -> None: @wrap_weaviate_error def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: super().update_stored_schema(only_tables, expected_update) # Retrieve the schema from Weaviate @@ -524,17 +526,6 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: state["dlt_load_id"] = state.pop("_dlt_load_id") return StateInfo(**state) - # def get_stored_states(self, state_table: str) -> List[StateInfo]: - # state_records = self.get_records(state_table, - # sort={ - # "path": ["created_at"], - # "order": "desc" - # }, 
properties=self.state_properties) - - # for state in state_records: - # state["dlt_load_id"] = state.pop("_dlt_load_id") - # return [StateInfo(**state) for state in state_records] - def get_stored_schema(self) -> Optional[StorageSchemaInfo]: """Retrieves newest schema from destination storage""" try: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 7f1403eb30..5838ab2ab7 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -180,7 +180,9 @@ def is_storage_initialized(self) -> bool: return self.sql_client.has_dataset() def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: super().update_stored_schema(only_tables, expected_update) applied_update: TSchemaTables = {} @@ -373,15 +375,6 @@ def get_stored_state(self, pipeline_name: str) -> StateInfo: return None return StateInfo(row[0], row[1], row[2], row[3], pendulum.instance(row[4])) - # def get_stored_states(self, state_table: str) -> List[StateInfo]: - # """Loads list of compressed states from destination storage, optionally filtered by pipeline name""" - # query = f"SELECT {self.STATE_TABLE_COLUMNS} FROM {state_table} AS s ORDER BY created_at DESC" - # result: List[StateInfo] = [] - # with self.sql_client.execute_query(query) as cur: - # for row in cur.fetchall(): - # result.append(StateInfo(row[0], row[1], row[2], row[3], pendulum.instance(row[4]))) - # return result - def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo: name = self.sql_client.make_qualified_table_name(self.schema.version_table_name) query = f"SELECT {self.version_table_schema_columns} FROM {name} WHERE version_hash = %s;" diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index cc2b03c50b..d4298f2f6b 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -17,6 +17,7 @@ WithStepInfo, reset_resource_state, ) +from dlt.common.typing import DictStrAny from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema import Schema, utils @@ -27,7 +28,13 @@ TWriteDispositionConfig, ) from dlt.common.storages import NormalizeStorageConfiguration, LoadPackageInfo, SchemaStorage -from dlt.common.storages.load_package import ParsedLoadJobFileName +from dlt.common.storages.load_package import ( + ParsedLoadJobFileName, + LoadPackageStateInjectableContext, + TPipelineStateDoc, +) + + from dlt.common.utils import get_callable_name, get_full_class_name from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext @@ -367,7 +374,13 @@ def extract( load_id = self.extract_storage.create_load_package(source.discover_schema()) with Container().injectable_context( SourceSchemaInjectableContext(source.schema) - ), Container().injectable_context(SourceInjectableContext(source)): + ), Container().injectable_context( + SourceInjectableContext(source) + ), Container().injectable_context( + LoadPackageStateInjectableContext( + storage=self.extract_storage.new_packages, load_id=load_id + ) + ): # inject the config section with the current source name with inject_section( ConfigSectionContext( @@ -389,10 +402,14 @@ def extract( ) return load_id - def commit_packages(self) -> None: - """Commits all extracted packages to normalize storage""" + def commit_packages(self, pipline_state_doc: TPipelineStateDoc = None) -> 
None: + """Commits all extracted packages to normalize storage, and adds the pipeline state to the load package""" # commit load packages for load_id, metrics in self._load_id_metrics.items(): + if pipline_state_doc: + package_state = self.extract_storage.new_packages.get_load_package_state(load_id) + package_state["pipeline_state"] = {**pipline_state_doc, "dlt_load_id": load_id} + self.extract_storage.new_packages.save_load_package_state(load_id, package_state) self.extract_storage.commit_new_load_package( load_id, self.schema_storage[metrics[0]["schema_name"]] ) diff --git a/dlt/load/load.py b/dlt/load/load.py index c5790d467b..66ddb1c308 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -341,7 +341,13 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) # do not commit load id for aborted packages if not aborted: with self.get_destination_client(schema) as job_client: - job_client.complete_load(load_id) + with Container().injectable_context( + LoadPackageStateInjectableContext( + storage=self.load_storage.normalized_packages, + load_id=load_id, + ) + ): + job_client.complete_load(load_id) self.load_storage.complete_load_package(load_id, aborted) # collect package info self._loaded_packages.append(self.load_storage.get_load_package_info(load_id)) @@ -469,10 +475,9 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics: schema = self.load_storage.normalized_packages.load_schema(load_id) logger.info(f"Loaded schema name {schema.name} and version {schema.stored_version}") - container = Container() # get top load id and mark as being processed with self.collector(f"Load {schema.name} in {load_id}"): - with container.injectable_context( + with Container().injectable_context( LoadPackageStateInjectableContext( storage=self.load_storage.normalized_packages, load_id=load_id, diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index ebff4dfa1d..e1821a9ac8 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -63,6 +63,7 @@ LoadJobInfo, LoadPackageInfo, ) +from dlt.common.storages.load_package import TPipelineStateDoc from dlt.common.destination import ( DestinationCapabilitiesContext, merge_caps_file_formats, @@ -138,6 +139,7 @@ mark_state_extracted, migrate_pipeline_state, state_resource, + state_doc, default_pipeline_state, ) from dlt.pipeline.warnings import credentials_argument_deprecated @@ -427,13 +429,14 @@ def extract( raise SourceExhausted(source.name) self._extract_source(extract_step, source, max_parallel_items, workers) # extract state + state: TPipelineStateDoc = None if self.config.restore_from_destination: # this will update state version hash so it will not be extracted again by with_state_sync - self._bump_version_and_extract_state( + state = self._bump_version_and_extract_state( self._container[StateInjectableContext].state, True, extract_step ) - # commit load packages - extract_step.commit_packages() + # commit load packages with state + extract_step.commit_packages(state) return self._get_step_info(extract_step) except Exception as exc: # emit step info @@ -728,7 +731,6 @@ def sync_destination( remote_state["schema_names"], always_download=True ) # TODO: we should probably wipe out pipeline here - # if we didn't full refresh schemas, get only missing schemas if restored_schemas is None: restored_schemas = self._get_schemas_from_destination( @@ -1513,7 +1515,7 @@ def _props_to_state(self, state: TPipelineState) -> TPipelineState: def _bump_version_and_extract_state( self, state: TPipelineState, 
extract_state: bool, extract: Extract = None - ) -> None: + ) -> TPipelineStateDoc: """Merges existing state into `state` and extracts state using `storage` if extract_state is True. Storage will be created on demand. In that case the extracted package will be immediately committed. @@ -1521,7 +1523,7 @@ def _bump_version_and_extract_state( _, hash_, _ = bump_pipeline_state_version_if_modified(self._props_to_state(state)) should_extract = hash_ != state["_local"].get("_last_extracted_hash") if should_extract and extract_state: - data = state_resource(state) + data, doc = state_resource(state) extract_ = extract or Extract( self._schema_storage, self._normalize_storage_config(), original_data=data ) @@ -1532,7 +1534,9 @@ def _bump_version_and_extract_state( mark_state_extracted(state, hash_) # commit only if we created storage if not extract: - extract_.commit_packages() + extract_.commit_packages(doc) + return doc + return None def _list_schemas_sorted(self) -> List[str]: """Lists schema names sorted to have deterministic state""" diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index d38010f842..41009f2909 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -5,7 +5,7 @@ from dlt.common.pendulum import pendulum from dlt.common.typing import DictStrAny from dlt.common.schema.typing import STATE_TABLE_NAME, TTableSchemaColumns -from dlt.common.destination.reference import WithStateSync, Destination +from dlt.common.destination.reference import WithStateSync, Destination, StateInfo from dlt.common.versioned_state import ( generate_state_version_hash, bump_state_version_if_modified, @@ -14,7 +14,7 @@ decompress_state, ) from dlt.common.pipeline import TPipelineState - +from dlt.common.storages.load_package import TPipelineStateDoc from dlt.extract import DltResource from dlt.pipeline.exceptions import ( @@ -22,6 +22,7 @@ ) PIPELINE_STATE_ENGINE_VERSION = 4 +LOAD_PACKAGE_STATE_KEY = "pipeline_state" # state table columns STATE_TABLE_COLUMNS: TTableSchemaColumns = { @@ -93,11 +94,11 @@ def migrate_pipeline_state( return cast(TPipelineState, state) -def state_resource(state: TPipelineState) -> DltResource: +def state_doc(state: TPipelineState, load_id: str = None) -> TPipelineStateDoc: state = copy(state) state.pop("_local") state_str = compress_state(state) - state_doc = { + doc: TPipelineStateDoc = { "version": state["_state_version"], "engine_version": state["_state_engine_version"], "pipeline_name": state["pipeline_name"], @@ -105,8 +106,21 @@ def state_resource(state: TPipelineState) -> DltResource: "created_at": pendulum.now(), "version_hash": state["_version_hash"], } - return dlt.resource( - [state_doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS + if load_id: + doc["dlt_load_id"] = load_id + return doc + + +def state_resource(state: TPipelineState) -> Tuple[DltResource, TPipelineStateDoc]: + doc = state_doc(state) + return ( + dlt.resource( + [doc], + name=STATE_TABLE_NAME, + write_disposition="append", + columns=STATE_TABLE_COLUMNS, + ), + doc, ) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 3124026bd5..76e0108461 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -288,8 +288,7 @@ A few things to know when specifying your filename layout: - have a separator after the table_name placeholder Please note: -- `dlt` will not dump the 
current schema content to the bucket
-- `dlt` will mark complete loads by creating an empty file that corresponds to `_dlt_loads` table. For example, if `chess._dlt_loads.1685299832` file is present in dataset folders, you can be sure that all files for the load package `1685299832` are completely loaded
+- `dlt` will mark complete loads by creating a JSON file in the `./_dlt_loads` folder that corresponds to the `_dlt_loads` table. For example, if the `chess__1685299832.jsonl` file is present in the loads folder, you can be sure that all files for the load package `1685299832` are completely loaded
 
 ### Advanced layout configuration
 
@@ -387,7 +386,14 @@ You can choose the following file formats:
 
 ## Syncing of `dlt` state
 
-This destination does not support restoring the `dlt` state. You can change that by requesting the [feature](https://github.com/dlt-hub/dlt/issues/new/choose) or contributing to the core library 😄
-You can, however, easily [backup and restore the pipeline working folder](https://gist.github.com/rudolfix/ee6e16d8671f26ac4b9ffc915ad24b6e) - reusing the bucket and credentials used to store files.
+This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination). To this end, special folders and files will be created at your destination which hold information about your pipeline state, schemas and completed loads. These folders DO NOT respect your
+settings in the layout section. When using filesystem as a staging destination, not all of these folders are created, as the state and schemas are
+managed in the regular way by the final destination you have configured.
+
+You will also notice `init` files in the root folder and in the special `dlt` folders. Since blob storages and directories have no concept of schemas and tables,
+`dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations.
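To make the new behavior concrete, here is a minimal sketch (not part of the diff above) of how the synced state and schema could be inspected through the filesystem job client added in this PR. It assumes a `filesystem` destination configured with a `bucket_url` (via `config.toml` or environment) and uses only the accessors defined in this change (`get_stored_schema`, `get_stored_state`) together with the existing `Pipeline.destination_client()` helper.

```py
import dlt
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

# assumes the filesystem bucket_url is configured in config.toml or the environment
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="filesystem",
    dataset_name="chess_data",
)
pipeline.run([{"id": 1}], table_name="players")

# the filesystem job client now implements WithStateSync, so the same accessors
# that SQL destinations expose are available here as well
client: FilesystemClient = pipeline.destination_client()  # type: ignore[assignment]
print(client.get_stored_schema())                 # newest StorageSchemaInfo or None
print(client.get_stored_state("chess_pipeline"))  # newest StateInfo or None
```

These accessors read the `_dlt_version` and `_dlt_pipeline_state` folders described above.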
+ + + \ No newline at end of file diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index 5d2404ff48..41b98bdec3 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -9,6 +9,7 @@ from dlt.destinations.impl.filesystem.filesystem import ( FilesystemDestinationClientConfiguration, + INIT_FILE_NAME, ) from dlt.destinations.path_utils import create_path, prepare_datetime_params @@ -155,7 +156,12 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non for basedir, _dirs, files in client.fs_client.walk( client.dataset_path, detail=False, refresh=True ): + # remove internal paths + if "_dlt" in basedir: + continue for f in files: + if f == INIT_FILE_NAME: + continue paths.append(posixpath.join(basedir, f)) ls = set(paths) @@ -213,6 +219,11 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None for basedir, _dirs, files in client.fs_client.walk( client.dataset_path, detail=False, refresh=True ): + # remove internal paths + if "_dlt" in basedir: + continue for f in files: + if f == INIT_FILE_NAME: + continue paths.append(posixpath.join(basedir, f)) assert list(sorted(paths)) == expected_files diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index d24b799349..20f326b160 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -2,7 +2,7 @@ import os import posixpath from pathlib import Path -from typing import Any, Callable +from typing import Any, Callable, List, Dict, cast import dlt import pytest @@ -19,6 +19,11 @@ from tests.common.utils import load_json_case from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active from dlt.destinations.path_utils import create_path +from tests.load.pipeline.utils import ( + destinations_configs, + DestinationTestConfiguration, + load_table_counts, +) skip_if_not_active("filesystem") @@ -98,11 +103,15 @@ def some_source(): for job in pkg.jobs["completed_jobs"]: assert_file_matches(layout, job, pkg.load_id, client) - complete_fn = f"{client.schema.name}.{LOADS_TABLE_NAME}.%s" + complete_fn = f"{client.schema.name}__%s.jsonl" # Test complete_load markers are saved - assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id1)) - assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id2)) + assert client.fs_client.isfile( + posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id1) + ) + assert client.fs_client.isfile( + posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id2) + ) # Force replace pipeline.run(some_source(), write_disposition="replace") @@ -244,13 +253,19 @@ def count(*args, **kwargs) -> Any: expected_files = set() known_files = set() for basedir, _dirs, files in client.fs_client.walk(client.dataset_path): # type: ignore[attr-defined] + # strip out special tables + if "_dlt" in basedir: + continue for file in files: - if file.endswith("jsonl"): + if ".jsonl" in file: expected_files.add(os.path.join(basedir, file)) for load_package in load_info.load_packages: for load_info in load_package.jobs["completed_jobs"]: # type: ignore[assignment] job_info = ParsedLoadJobFileName.parse(load_info.file_path) # type: ignore[attr-defined] + # state file gets loaded a differentn way + if 
job_info.table_name == "_dlt_pipeline_state": + continue path = create_path( layout, file_name=job_info.file_name(), @@ -262,10 +277,156 @@ def count(*args, **kwargs) -> Any: ) full_path = os.path.join(client.dataset_path, path) # type: ignore[attr-defined] assert os.path.exists(full_path) - if full_path.endswith("jsonl"): + if ".jsonl" in full_path: known_files.add(full_path) assert expected_files == known_files + assert known_files # 6 is because simple_row contains two rows # and in this test scenario we have 3 callbacks assert call_count >= 6 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +def test_state_files(destination_config: DestinationTestConfiguration) -> None: + def _collect_files(p) -> List[str]: + client = p.destination_client() + found = [] + for basedir, _dirs, files in client.fs_client.walk(client.dataset_path): + for file in files: + found.append(os.path.join(basedir, file).replace(client.dataset_path, "")) + return found + + def _collect_table_counts(p) -> Dict[str, int]: + return load_table_counts( + p, "items", "items2", "items3", "_dlt_loads", "_dlt_version", "_dlt_pipeline_state" + ) + + # generate 4 loads from 2 pipelines, store load ids + p1 = destination_config.setup_pipeline("p1", dataset_name="layout_test") + p2 = destination_config.setup_pipeline("p2", dataset_name="layout_test") + c1 = cast(FilesystemClient, p1.destination_client()) + c2 = cast(FilesystemClient, p2.destination_client()) + + # first two loads + p1.run([1, 2, 3], table_name="items").loads_ids[0] + load_id_2_1 = p2.run([4, 5, 6], table_name="items").loads_ids[0] + assert _collect_table_counts(p1) == { + "items": 6, + "_dlt_loads": 2, + "_dlt_pipeline_state": 2, + "_dlt_version": 2, + } + sc1_old = c1.get_stored_schema() + sc2_old = c2.get_stored_schema() + s1_old = c1.get_stored_state("p1") + s2_old = c1.get_stored_state("p2") + + created_files = _collect_files(p1) + # 4 init files, 2 item files, 2 load files, 2 state files, 2 version files + assert len(created_files) == 12 + + # second two loads + @dlt.resource(table_name="items2") + def some_data(): + dlt.current.resource_state()["state"] = {"some": "state"} + yield from [1, 2, 3] + + load_id_1_2 = p1.run(some_data(), table_name="items2").loads_ids[ + 0 + ] # force state and migration bump here + p2.run([4, 5, 6], table_name="items").loads_ids[0] # no migration here + + # 4 loads for 2 pipelines, one schema and state change on p2 changes so 3 versions and 3 states + assert _collect_table_counts(p1) == { + "items": 9, + "items2": 3, + "_dlt_loads": 4, + "_dlt_pipeline_state": 3, + "_dlt_version": 3, + } + + # test accessors for state + s1 = c1.get_stored_state("p1") + s2 = c1.get_stored_state("p2") + assert s1.dlt_load_id == load_id_1_2 # second load + assert s2.dlt_load_id == load_id_2_1 # first load + assert s1_old.version != s1.version + assert s2_old.version == s2.version + + # test accessors for schema + sc1 = c1.get_stored_schema() + sc2 = c2.get_stored_schema() + assert sc1.version_hash != sc1_old.version_hash + assert sc2.version_hash == sc2_old.version_hash + assert sc1.version_hash != sc2.version_hash + + assert not c1.get_stored_schema_by_hash("blah") + assert c2.get_stored_schema_by_hash(sc1_old.version_hash) + + created_files = _collect_files(p1) + # 4 init files, 4 item files, 4 load files, 3 state files, 3 version files + assert len(created_files) == 18 + + # drop it + p1.destination_client().drop_storage() + created_files = 
_collect_files(p1) + assert len(created_files) == 0 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +def test_knows_dataset_state(destination_config: DestinationTestConfiguration) -> None: + # check if pipeline knows initializisation state of dataset + p1 = destination_config.setup_pipeline("p1", dataset_name="layout_test") + assert not p1.destination_client().is_storage_initialized() + p1.run([1, 2, 3], table_name="items") + assert p1.destination_client().is_storage_initialized() + p1.destination_client().drop_storage() + assert not p1.destination_client().is_storage_initialized() + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +@pytest.mark.parametrize("restore", [True, False]) +def test_simple_incremental( + destination_config: DestinationTestConfiguration, + restore: bool, +) -> None: + os.environ["RESTORE_FROM_DESTINATION"] = str(restore) + + p = destination_config.setup_pipeline("p1", dataset_name="incremental_test") + + @dlt.resource(name="items") + def my_resource(prim_key=dlt.sources.incremental("id")): + yield from [ + {"id": 1}, + {"id": 2}, + ] + + @dlt.resource(name="items") + def my_resource_inc(prim_key=dlt.sources.incremental("id")): + yield from [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + {"id": 4}, + ] + + p.run(my_resource) + p._wipe_working_folder() + + p = destination_config.setup_pipeline("p1", dataset_name="incremental_test") + p.run(my_resource_inc) + + assert load_table_counts(p, "items") == {"items": 4 if restore else 6} diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index 6efde6e019..09a746433f 100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -41,8 +41,6 @@ def test_replace_disposition( # make duckdb to reuse database in working folder os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = "duckdb:///test_replace_disposition.duckdb" - # TODO: start storing _dlt_loads with right json content - increase_loads = lambda x: x if destination_config.destination == "filesystem" else x + 1 increase_state_loads = lambda info: len( [ job @@ -52,11 +50,9 @@ def test_replace_disposition( ] ) - # filesystem does not have versions and child tables + # filesystem does not have child tables, prepend defaults def norm_table_counts(counts: Dict[str, int], *child_tables: str) -> Dict[str, int]: - if destination_config.destination != "filesystem": - return counts - return {**{"_dlt_version": 0}, **{t: 0 for t in child_tables}, **counts} + return {**{t: 0 for t in child_tables}, **counts} dataset_name = "test_replace_strategies_ds" + uniq_id() pipeline = destination_config.setup_pipeline( @@ -108,8 +104,8 @@ def append_items(): assert_load_info(info) # count state records that got extracted state_records = increase_state_loads(info) - dlt_loads: int = increase_loads(0) - dlt_versions: int = increase_loads(0) + dlt_loads: int = 1 + dlt_versions: int = 1 # second run with higher offset so we can check the results offset = 1000 @@ -118,11 +114,11 @@ def append_items(): ) assert_load_info(info) state_records += increase_state_loads(info) - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # we should have all items loaded table_counts = load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) - assert norm_table_counts(table_counts) == { + assert table_counts == 
{ "append_items": 24, # loaded twice "items": 120, "items__sub_items": 240, @@ -166,7 +162,7 @@ def load_items_none(): ) assert_load_info(info) state_records += increase_state_loads(info) - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # table and child tables should be cleared table_counts = load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) @@ -200,8 +196,8 @@ def load_items_none(): assert_load_info(info) new_state_records = increase_state_loads(info) assert new_state_records == 1 - dlt_loads = increase_loads(dlt_loads) - dlt_versions = increase_loads(dlt_versions) + dlt_loads += 1 + dlt_versions += 1 # check trace assert pipeline_2.last_trace.last_normalize_info.row_counts == { "items_copy": 120, @@ -214,18 +210,18 @@ def load_items_none(): assert_load_info(info) new_state_records = increase_state_loads(info) assert new_state_records == 0 - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # new pipeline table_counts = load_table_counts(pipeline_2, *pipeline_2.default_schema.tables.keys()) - assert norm_table_counts(table_counts) == { + assert table_counts == { "append_items": 48, "items_copy": 120, "items_copy__sub_items": 240, "items_copy__sub_items__sub_sub_items": 120, "_dlt_pipeline_state": state_records + 1, "_dlt_loads": dlt_loads, - "_dlt_version": increase_loads(dlt_versions), + "_dlt_version": dlt_versions + 1, } # check trace assert pipeline_2.last_trace.last_normalize_info.row_counts == { @@ -243,7 +239,7 @@ def load_items_none(): "items__sub_items__sub_sub_items": 0, "_dlt_pipeline_state": state_records + 1, "_dlt_loads": dlt_loads, # next load - "_dlt_version": increase_loads(dlt_versions), # new table name -> new schema + "_dlt_version": dlt_versions + 1, # new table name -> new schema } diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index d421819121..6518ca46ae 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -43,7 +43,10 @@ def duckdb_pipeline_location() -> None: @pytest.mark.parametrize( "destination_config", destinations_configs( - default_staging_configs=True, default_sql_configs=True, default_vector_configs=True + default_staging_configs=True, + default_sql_configs=True, + default_vector_configs=True, + all_buckets_filesystem_configs=True, ), ids=lambda x: x.name, ) @@ -62,8 +65,9 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - load_pipeline_state_from_destination(p.pipeline_name, job_client) # sync the schema p.sync_schema() - exists, _ = job_client.get_storage_table(schema.version_table_name) - assert exists is True + # check if schema exists + stored_schema = job_client.get_stored_schema() + assert stored_schema is not None # dataset exists, still no table with pytest.raises(DestinationUndefinedEntity): load_pipeline_state_from_destination(p.pipeline_name, job_client) @@ -72,7 +76,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - initial_state["_local"]["_last_extracted_at"] = pendulum.now() initial_state["_local"]["_last_extracted_hash"] = initial_state["_version_hash"] # add _dlt_id and _dlt_load_id - resource = state_resource(initial_state) + resource, _ = state_resource(initial_state) resource.apply_hints( columns={ "_dlt_id": {"name": "_dlt_id", "data_type": "text", "nullable": False}, @@ -86,8 +90,8 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - # then in database. 
parquet is created in schema order and in Redshift it must exactly match the order. # schema.bump_version() p.sync_schema() - exists, _ = job_client.get_storage_table(schema.state_table_name) - assert exists is True + stored_schema = job_client.get_stored_schema() + assert stored_schema is not None # table is there but no state assert load_pipeline_state_from_destination(p.pipeline_name, job_client) is None # extract state @@ -180,7 +184,9 @@ def test_silently_skip_on_invalid_credentials( @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) @pytest.mark.parametrize("use_single_dataset", [True, False]) @@ -263,7 +269,9 @@ def _make_dn_name(schema_name: str) -> str: @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_restore_state_pipeline(destination_config: DestinationTestConfiguration) -> None: @@ -387,7 +395,9 @@ def some_data(): @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_ignore_state_unfinished_load(destination_config: DestinationTestConfiguration) -> None: @@ -417,7 +427,9 @@ def complete_package_mock(self, load_id: str, schema: Schema, aborted: bool = Fa @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_restore_schemas_while_import_schemas_exist( @@ -503,7 +515,9 @@ def test_restore_change_dataset_and_destination(destination_name: str) -> None: @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_restore_state_parallel_changes(destination_config: DestinationTestConfiguration) -> None: @@ -609,7 +623,9 @@ def some_data(param: str) -> Any: @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_reset_pipeline_on_deleted_dataset( diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index c4e1f5314b..036154b582 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -3,6 +3,7 @@ import pytest import random from os import environ +import io import dlt from dlt.common import json, sleep @@ -80,7 +81,7 @@ def assert_data_table_counts(p: dlt.Pipeline, expected_counts: DictStrAny) -> No ), f"Table counts do not match, expected {expected_counts}, got {table_counts}" -def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: +def load_file(fs_client, path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: 
""" util function to load a filesystem destination file and return parsed content values may not be cast to the right type, especially for insert_values, please @@ -96,26 +97,22 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: # table name will be last element of path table_name = path.split("/")[-1] - - # skip loads table - if table_name == "_dlt_loads": - return table_name, [] - full_path = posixpath.join(path, file) # load jsonl if ext == "jsonl": - with open(full_path, "rU", encoding="utf-8") as f: - for line in f: + file_text = fs_client.read_text(full_path) + for line in file_text.split("\n"): + if line: result.append(json.loads(line)) # load insert_values (this is a bit volatile if the exact format of the source file changes) elif ext == "insert_values": - with open(full_path, "rU", encoding="utf-8") as f: - lines = f.readlines() - # extract col names - cols = lines[0][15:-2].split(",") - for line in lines[2:]: + file_text = fs_client.read_text(full_path) + lines = file_text.split("\n") + cols = lines[0][15:-2].split(",") + for line in lines[2:]: + if line: values = line[1:-3].split(",") result.append(dict(zip(cols, values))) @@ -123,20 +120,20 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: elif ext == "parquet": import pyarrow.parquet as pq - with open(full_path, "rb") as f: - table = pq.read_table(f) - cols = table.column_names - count = 0 - for column in table: - column_name = cols[count] - item_count = 0 - for item in column.to_pylist(): - if len(result) <= item_count: - result.append({column_name: item}) - else: - result[item_count][column_name] = item - item_count += 1 - count += 1 + file_bytes = fs_client.read_bytes(full_path) + table = pq.read_table(io.BytesIO(file_bytes)) + cols = table.column_names + count = 0 + for column in table: + column_name = cols[count] + item_count = 0 + for item in column.to_pylist(): + if len(result) <= item_count: + result.append({column_name: item}) + else: + result[item_count][column_name] = item + item_count += 1 + count += 1 return table_name, result @@ -149,7 +146,7 @@ def load_files(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, A client.dataset_path, detail=False, refresh=True ): for file in files: - table_name, items = load_file(basedir, file) + table_name, items = load_file(client.fs_client, basedir, file) if table_name not in table_names: continue if table_name in result: @@ -157,10 +154,6 @@ def load_files(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, A else: result[table_name] = items - # loads file is special case - if LOADS_TABLE_NAME in table_names and file.find(".{LOADS_TABLE_NAME}."): - result[LOADS_TABLE_NAME] = [] - return result