Fix filesystem + test
steinitzu committed May 29, 2024
1 parent 87cfd16 commit 879e4e5
Showing 2 changed files with 13 additions and 8 deletions.

dlt/destinations/impl/filesystem/filesystem.py (16 changes: 10 additions, 6 deletions)

@@ -176,8 +176,9 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
         if not delete_schema:
             return
         # Delete all stored schemas
-        for filename, _ in self._iter_stored_schema_files():
-            self._delete_file(filename)
+        for filename, fileparts in self._iter_stored_schema_files():
+            if fileparts[0] == self.schema.name:
+                self._delete_file(filename)
 
     def truncate_tables(self, table_names: List[str]) -> None:
         """Truncate table with given name"""

@@ -414,10 +415,9 @@ def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
         )
 
     def _iter_stored_schema_files(self) -> Iterator[Tuple[str, List[str]]]:
-        """Iterator over all schema files matching the current schema name"""
+        """Iterator over all stored schema files"""
         for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name):
-            if fileparts[0] == self.schema.name:
-                yield filepath, fileparts
+            yield filepath, fileparts
 
     def _get_stored_schema_by_hash_or_newest(
         self, version_hash: str = None

@@ -428,7 +428,11 @@ def _get_stored_schema_by_hash_or_newest(
         selected_path = None
         newest_load_id = "0"
         for filepath, fileparts in self._iter_stored_schema_files():
-            if not version_hash and fileparts[1] > newest_load_id:
+            if (
+                not version_hash
+                and fileparts[0] == self.schema.name
+                and fileparts[1] > newest_load_id
+            ):
                 newest_load_id = fileparts[1]
                 selected_path = filepath
             elif fileparts[2] == version_hash:
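
Taken together, the filesystem changes move the schema-name filter out of _iter_stored_schema_files and into its call sites: drop_tables now deletes only the current schema's stored schema files, while _get_stored_schema_by_hash_or_newest applies the name filter only when no version hash is given, so a lookup by hash still sees every stored schema file. Below is a minimal standalone sketch of that selection logic, assuming fileparts is ordered [schema_name, load_id, version_hash] as the indices above imply; the function name, the example paths, and the early return on a hash match are illustrative, not taken from the repository.

# Sketch only (not the repository's code): pick a stored schema file by
# version hash, or the newest file for the given schema name when no hash
# is provided. fileparts is assumed to be [schema_name, load_id, version_hash].
from typing import Iterable, List, Optional, Tuple


def select_schema_file(
    files: Iterable[Tuple[str, List[str]]],
    schema_name: str,
    version_hash: Optional[str] = None,
) -> Optional[str]:
    selected_path = None
    newest_load_id = "0"
    for filepath, fileparts in files:
        if (
            not version_hash
            and fileparts[0] == schema_name
            and fileparts[1] > newest_load_id
        ):
            newest_load_id = fileparts[1]
            selected_path = filepath
        elif fileparts[2] == version_hash:
            # mirrors the visible elif: a hash match is accepted without
            # re-checking the schema name
            return filepath
    return selected_path


# Example: two schemas share one dataset; the "newest" lookup only sees s1
# files, while a lookup by hash may return a file stored for s2.
files = [
    ("schemas/s1__001__h1", ["s1", "001", "h1"]),
    ("schemas/s1__002__h2", ["s1", "002", "h2"]),
    ("schemas/s2__003__h3", ["s2", "003", "h3"]),
]
assert select_schema_file(files, "s1") == "schemas/s1__002__h2"
assert select_schema_file(files, "s1", version_hash="h3") == "schemas/s2__003__h3"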

tests/load/pipeline/test_filesystem_pipeline.py (5 changes: 3 additions, 2 deletions)

@@ -362,8 +362,9 @@ def _collect_table_counts(p) -> Dict[str, int]:
     )
 
     # generate 4 loads from 2 pipelines, store load ids
-    p1 = destination_config.setup_pipeline("p1", dataset_name="layout_test")
-    p2 = destination_config.setup_pipeline("p2", dataset_name="layout_test")
+    dataset_name = "layout_test_" + uniq_id()
+    p1 = destination_config.setup_pipeline("p1", dataset_name=dataset_name)
+    p2 = destination_config.setup_pipeline("p2", dataset_name=dataset_name)
     c1 = cast(FilesystemClient, p1.destination_client())
     c2 = cast(FilesystemClient, p2.destination_client())
 
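
On the test side, the only change is that the shared dataset name gains a random suffix, so schema files written by earlier runs of the test cannot leak into the load and schema counts. A small sketch of the pattern, assuming uniq_id is dlt's random-suffix helper from dlt.common.utils:

# Sketch only: both pipelines share one dataset, but its name now carries a
# random suffix, so every test run starts from a fresh, collision-free dataset.
from dlt.common.utils import uniq_id  # assumed location of the helper used above

dataset_name = "layout_test_" + uniq_id()
print(dataset_name)  # e.g. "layout_test_1a2b3c..." - different on every run

Both pipelines still point at the same dataset within a run, so assertions that compare state across p1 and p2 keep working; only collisions between separate runs go away.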
