filesystem state sync (#1184)
* clean some stuff

* first messy version of filesystem state sync

* clean up a bit

* fix bug in state sync

* enable state tests for all bucket providers

* do not store state to uninitialized dataset folders

* fix linter errors

* get current pipeline from pipeline context

* fix bug in filesystem table init

* update testing pipe

* move away from "current" file, rather iterate over bucket path contents

* store pipeline state in load package state and send to filesystem destination from there

* fix tests for changed number of files in filesystem destination

* remove dev code

* create init file also to mark datasets

* fix tests to respect new init file
change filesystem to fall back to old state loading when used as staging destination

* update filesystem docs

* fix incoming tests of placeholders

* small fixes

* adds some tests for filesystem state
also fixes table count loading to work for all bucket destinations

* fix test helper

* save schema with timestamp instead of load_id

* pr fixes and move pipeline state saving to committing of extracted packages

* ensure pipeline state is only saved to load package if it has changed

* adds missing state injection into state package

* fix athena iceberg locations

* fix google drive filesystem with missing argument
sh-rp authored and zem360 committed Apr 17, 2024
1 parent 652bbfa commit 981c26b
Showing 19 changed files with 568 additions and 139 deletions.
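
The diffs below implement the flow summarized in the commit message above: at the end of extraction the pipeline state is written into the load package state, and on complete_load the filesystem destination persists it (together with the schema and a load entry) as JSONL files in the dataset. A minimal usage sketch of the write path, assuming a local bucket_url; pipeline, resource and path names are illustrative only:

import dlt

@dlt.resource(name="items")
def items():
    # toy data, just to trigger a load
    yield [{"id": 1}, {"id": 2}]

pipeline = dlt.pipeline(
    pipeline_name="fs_state_demo",
    destination=dlt.destinations.filesystem(bucket_url="file:///tmp/fs_state_demo"),
    dataset_name="demo_dataset",
)
# after this run the dataset folder contains, besides the data files:
# - an "init" marker file in the dataset and dlt table folders
# - a <schema_name>__<load_id>.jsonl entry in the loads table folder
# - <pipeline_name>__<load_id>__<hash>.jsonl in the pipeline state folder
# - <schema_name>__<timestamp>__<hash>.jsonl in the version (schema) folder
print(pipeline.run(items()))
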
5 changes: 4 additions & 1 deletion dlt/common/destination/reference.py
@@ -273,7 +273,9 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
"""Updates storage to the current schema.
@@ -434,6 +436,7 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]:

@abstractmethod
def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo:
"""retrieves the stored schema by hash"""
pass

@abstractmethod
2 changes: 1 addition & 1 deletion dlt/common/storages/fsspecs/google_drive.py
@@ -237,7 +237,7 @@ def export(self, path: str, mime_type: str) -> Any:
fileId=file_id, mimeType=mime_type, supportsAllDrives=True
).execute()

def ls(self, path: str, detail: Optional[bool] = False) -> Any:
def ls(self, path: str, detail: Optional[bool] = False, refresh: Optional[bool] = False) -> Any:
"""List files in a directory.
Args:
14 changes: 14 additions & 0 deletions dlt/common/storages/load_package.py
@@ -54,9 +54,23 @@
"""Loader file formats with internal job types"""


class TPipelineStateDoc(TypedDict, total=False):
"""Corresponds to the StateInfo Tuple"""

version: int
engine_version: int
pipeline_name: str
state: str
version_hash: str
created_at: datetime.datetime
dlt_load_id: NotRequired[str]


class TLoadPackageState(TVersionedState, total=False):
created_at: DateTime
"""Timestamp when the load package was created"""
pipeline_state: NotRequired[TPipelineStateDoc]
"""Pipeline state, added at the end of the extraction phase"""

"""A section of state that does not participate in change merging and version control"""
destination_state: NotRequired[Dict[str, Any]]
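
The new pipeline_state entry is what the filesystem destination later reads back when a load completes. A short sketch of that read path, mirroring _store_current_state in the filesystem client further down; it is only meaningful inside an active load context, so treat it as illustrative:

# mirrors the access pattern used by the filesystem client below;
# load_package() is only available while dlt is processing a package
from dlt.pipeline.current import load_package

state_doc = load_package()["state"].get("pipeline_state")  # TPipelineStateDoc or None
if state_doc is not None:
    # fields follow TPipelineStateDoc above; dlt_load_id is optional
    print(state_doc["pipeline_name"], state_doc["version_hash"])
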
4 changes: 3 additions & 1 deletion dlt/destinations/impl/destination/destination.py
@@ -47,7 +47,9 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
return super().update_stored_schema(only_tables, expected_update)

4 changes: 3 additions & 1 deletion dlt/destinations/impl/dummy/dummy.py
@@ -126,7 +126,9 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
applied_update = super().update_stored_schema(only_tables, expected_update)
if self.config.fail_schema_update:
241 changes: 220 additions & 21 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,12 +1,17 @@
import posixpath
import os
import base64
from types import TracebackType
from typing import ClassVar, List, Type, Iterable, Set, Iterator
from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple, cast
from fsspec import AbstractFileSystem
from contextlib import contextmanager
from dlt.common import json, pendulum
from dlt.common.typing import DictStrAny

import re

import dlt
from dlt.common import logger
from dlt.common import logger, time
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.storages import FileStorage, fsspec_from_config
from dlt.common.destination import DestinationCapabilitiesContext
@@ -17,15 +22,23 @@
JobClientBase,
FollowupJob,
WithStagingDataset,
WithStateSync,
StorageSchemaInfo,
StateInfo,
DoNothingJob,
)

from dlt.common.destination.exceptions import DestinationUndefinedEntity
from dlt.destinations.job_impl import EmptyLoadJob
from dlt.destinations.impl.filesystem import capabilities
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations import path_utils


INIT_FILE_NAME = "init"
FILENAME_SEPARATOR = "__"


class LoadFilesystemJob(LoadJob):
def __init__(
self,
@@ -86,7 +99,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
return jobs


class FilesystemClient(JobClientBase, WithStagingDataset):
class FilesystemClient(JobClientBase, WithStagingDataset, WithStateSync):
"""filesystem client storing jobs in memory"""

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -166,31 +179,56 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
" should be created previously!"
)

# we mark the storage folder as initialized
self.fs_client.makedirs(self.dataset_path, exist_ok=True)
self.fs_client.touch(posixpath.join(self.dataset_path, INIT_FILE_NAME))

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> TSchemaTables:
# create destination dirs for all tables
dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys())
for directory in dirs_to_create:
table_names = only_tables or self.schema.tables.keys()
dirs_to_create = self._get_table_dirs(table_names)
for tables_name, directory in zip(table_names, dirs_to_create):
self.fs_client.makedirs(directory, exist_ok=True)
# we need to mark the folders of the data tables as initialized
if tables_name in self.schema.dlt_table_names():
self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))

# don't store schema when used as staging
if not self.config.as_staging:
self._store_current_schema()

return expected_update

def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
"""Gets unique directories where table data is stored."""
table_dirs: Set[str] = set()
def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]:
"""Gets directories where table data is stored."""
table_dirs: List[str] = []
for table_name in table_names:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table_name
)
# dlt tables do not respect layout (for now)
if table_name in self.schema.dlt_table_names():
table_prefix = posixpath.join(table_name, "")
else:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table_name
)
destination_dir = posixpath.join(self.dataset_path, table_prefix)
# extract the path component
table_dirs.add(os.path.dirname(destination_dir))
table_dirs.append(posixpath.dirname(destination_dir))
return table_dirs

def is_storage_initialized(self) -> bool:
return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return]
return self.fs_client.exists(posixpath.join(self.dataset_path, INIT_FILE_NAME)) # type: ignore[no-any-return]

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip the state table, we create a jsonl file in the complete_load step
# this does not apply to scenarios where we are using filesystem as staging
# where we want to load the state the regular way
if table["name"] == self.schema.state_table_name and not self.config.as_staging:
return DoNothingJob(file_path)

cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob
return cls(
file_path,
@@ -203,12 +241,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
def restore_file_load(self, file_path: str) -> LoadJob:
return EmptyLoadJob.from_file_path(file_path, "completed")

def complete_load(self, load_id: str) -> None:
schema_name = self.schema.name
table_name = self.schema.loads_table_name
file_name = f"{schema_name}.{table_name}.{load_id}"
self.fs_client.touch(posixpath.join(self.dataset_path, file_name))

def __enter__(self) -> "FilesystemClient":
return self

@@ -219,3 +251,170 @@ def __exit__(

def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
return False

#
# state stuff
#

def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
dirname = posixpath.dirname(filepath)
if not self.fs_client.isdir(dirname):
return
self.fs_client.write_text(filepath, json.dumps(data), "utf-8")

def _to_path_safe_string(self, s: str) -> str:
"""for base64 strings"""
return base64.b64decode(s).hex() if s else None

def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.fs_client.ls(dirname, detail=False, refresh=True):
filename = os.path.splitext(os.path.basename(filepath))[0]
fileparts = filename.split(FILENAME_SEPARATOR)
if len(fileparts) != 3:
continue
yield filepath, fileparts

def _store_load(self, load_id: str) -> None:
# write entry to load "table"
# TODO: this is also duplicate across all destinations. DRY this.
load_data = {
"load_id": load_id,
"schema_name": self.schema.name,
"status": 0,
"inserted_at": pendulum.now().isoformat(),
"schema_version_hash": self.schema.version_hash,
}
filepath = posixpath.join(
self.dataset_path,
self.schema.loads_table_name,
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl",
)
self._write_to_json_file(filepath, load_data)

def complete_load(self, load_id: str) -> None:
# store current state
self._store_current_state(load_id)
self._store_load(load_id)

#
# state read/write
#

def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return posixpath.join(
self.dataset_path,
self.schema.state_table_name,
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)

def _store_current_state(self, load_id: str) -> None:
# don't save the state this way when used as staging
if self.config.as_staging:
return
# get state doc from current pipeline
from dlt.pipeline.current import load_package

pipeline_state_doc = load_package()["state"].get("pipeline_state")

if not pipeline_state_doc:
return

# get paths
pipeline_name = pipeline_state_doc["pipeline_name"]
hash_path = self._get_state_file_name(
pipeline_name, self.schema.stored_version_hash, load_id
)

# write
self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))

def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
# get base dir
dirname = posixpath.dirname(self._get_state_file_name(pipeline_name, "", ""))

# search newest state
selected_path = None
newest_load_id = "0"
for filepath, fileparts in self._list_dlt_dir(dirname):
if fileparts[0] == pipeline_name and fileparts[1] > newest_load_id:
newest_load_id = fileparts[1]
selected_path = filepath

# Load compressed state from destination
if selected_path:
state_json = json.loads(self.fs_client.read_text(selected_path))
state_json.pop("version_hash")
return StateInfo(**state_json)

return None

#
# Schema read/write
#

def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return posixpath.join(
self.dataset_path,
self.schema.version_table_name,
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)

def _get_stored_schema_by_hash_or_newest(
self, version_hash: str = None
) -> Optional[StorageSchemaInfo]:
"""Get the schema by supplied hash, falls back to getting the newest version matching the existing schema name"""
version_hash = self._to_path_safe_string(version_hash)
dirname = posixpath.dirname(self._get_schema_file_name("", ""))
# find newest schema for pipeline or by version hash
selected_path = None
newest_load_id = "0"
for filepath, fileparts in self._list_dlt_dir(dirname):
if (
not version_hash
and fileparts[0] == self.schema.name
and fileparts[1] > newest_load_id
):
newest_load_id = fileparts[1]
selected_path = filepath
elif fileparts[2] == version_hash:
selected_path = filepath
break

if selected_path:
return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path)))

return None

def _store_current_schema(self) -> None:
# check if schema with hash exists
current_hash = self.schema.stored_version_hash
if self._get_stored_schema_by_hash_or_newest(current_hash):
return

# get paths
schema_id = str(time.precise_time())
filepath = self._get_schema_file_name(self.schema.stored_version_hash, schema_id)

# TODO: duplicate of weaviate implementation, should be abstracted out
version_info = {
"version_hash": self.schema.stored_version_hash,
"schema_name": self.schema.name,
"version": self.schema.version,
"engine_version": self.schema.ENGINE_VERSION,
"inserted_at": pendulum.now(),
"schema": json.dumps(self.schema.to_dict()),
}

# we always keep tabs on what the current schema is
self._write_to_json_file(filepath, version_info)

def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
"""Retrieves newest schema from destination storage"""
return self._get_stored_schema_by_hash_or_newest()

def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
return self._get_stored_schema_by_hash_or_newest(version_hash)
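
For reference, the state files written by _store_current_state and read by get_stored_state use a three-part name, <pipeline_name>__<load_id>__<hex(version_hash)>.jsonl, where the base64 version hash is hex-encoded to keep it path safe. A self-contained sketch of the naming and of the newest-file selection; helper names are local to this sketch, not part of dlt:

import base64
import os
import posixpath

FILENAME_SEPARATOR = "__"

def to_path_safe_string(version_hash: str) -> str:
    # mirrors _to_path_safe_string: hex-encode the base64 hash so it is path safe
    return base64.b64decode(version_hash).hex() if version_hash else None

def state_file_name(dataset_path: str, state_table: str, pipeline_name: str,
                    load_id: str, version_hash: str) -> str:
    # mirrors _get_state_file_name above
    return posixpath.join(
        dataset_path,
        state_table,
        f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}"
        f"{FILENAME_SEPARATOR}{to_path_safe_string(version_hash)}.jsonl",
    )

def newest_state_file(paths, pipeline_name):
    # mirrors get_stored_state: keep the candidate with the largest load_id;
    # load ids are timestamp-based strings, so string comparison orders them
    selected, newest_load_id = None, "0"
    for path in paths:
        parts = os.path.splitext(os.path.basename(path))[0].split(FILENAME_SEPARATOR)
        if len(parts) == 3 and parts[0] == pipeline_name and parts[1] > newest_load_id:
            newest_load_id, selected = parts[1], path
    return selected
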
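Because FilesystemClient now implements WithStateSync, a pipeline targeting a bucket can restore its state and schemas from the destination. A hedged usage sketch of the read path; it assumes a prior successful run against the same bucket and dataset, and the names are illustrative:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="fs_state_demo",
    destination=dlt.destinations.filesystem(bucket_url="file:///tmp/fs_state_demo"),
    dataset_name="demo_dataset",
)
# pulls the newest stored pipeline state and schema back from the bucket
pipeline.sync_destination()
print(pipeline.default_schema_name, pipeline.state["_state_version"])
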
4 changes: 3 additions & 1 deletion dlt/destinations/impl/qdrant/qdrant_client.py
@@ -283,7 +283,9 @@ def _delete_sentinel_collection(self) -> None:
self.db_client.delete_collection(self.sentinel_collection)

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
applied_update: TSchemaTables = {}
