From b2b591303c9a650644b11da4017f1af257a22489 Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 17 Apr 2024 12:03:09 +0200
Subject: [PATCH] pr fixes and move pipeline state saving to committing of
 extracted packages

---
 dlt/common/storages/load_package.py           | 22 ++++++++----
 .../impl/filesystem/filesystem.py             | 43 ++++++++++++-------
 dlt/extract/__init__.py                       |  2 +-
 dlt/extract/extract.py                        |  9 +++-
 dlt/extract/extractors.py                     |  9 +---
 dlt/extract/resource.py                       | 16 -------
 dlt/pipeline/current.py                       |  1 -
 dlt/pipeline/mark.py                          |  1 -
 dlt/pipeline/pipeline.py                      | 11 ++---
 dlt/pipeline/state_sync.py                    | 16 ++++---
 .../load/filesystem/test_filesystem_client.py |  7 +--
 .../load/pipeline/test_filesystem_pipeline.py |  7 ++-
 12 files changed, 75 insertions(+), 69 deletions(-)

diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py
index 6d4f4a0521..1752039775 100644
--- a/dlt/common/storages/load_package.py
+++ b/dlt/common/storages/load_package.py
@@ -54,15 +54,27 @@
 """Loader file formats with internal job types"""
 
 
+class TPipelineStateDoc(TypedDict, total=False):
+    """Corresponds to the StateInfo Tuple"""
+
+    version: int
+    engine_version: int
+    pipeline_name: str
+    state: str
+    version_hash: str
+    created_at: datetime.datetime
+    dlt_load_id: NotRequired[str]
+
+
 class TLoadPackageState(TVersionedState, total=False):
     created_at: DateTime
     """Timestamp when the load package was created"""
+    pipeline_state: NotRequired[TPipelineStateDoc]
+    """Pipeline state, added at the end of the extraction phase"""
 
     """A section of state that does not participate in change merging and version control"""
     destination_state: NotRequired[Dict[str, Any]]
     """private space for destinations to store state relevant only to the load package"""
-    source_state: NotRequired[Dict[str, Any]]
-    """private space for source to store state relevant only to the load package, currently used for storing pipeline state"""
 
 
 class TLoadPackage(TypedDict, total=False):
@@ -691,12 +703,6 @@ def destination_state() -> DictStrAny:
     return lp["state"].setdefault("destination_state", {})
 
 
-def load_package_source_state() -> DictStrAny:
-    """Get segment of load package state that is specific to the sources."""
-    lp = load_package()
-    return lp["state"].setdefault("source_state", {})
-
-
 def clear_destination_state(commit: bool = True) -> None:
     """Clear segment of load package state that is specific to the current destination. Optionally commit to load package."""
     lp = load_package()
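
Note: the new TPipelineStateDoc mirrors the fields of the StateInfo tuple as a TypedDict, so the
pipeline state document can travel inside the load package state instead of a private source_state
section. A minimal sketch of the shape this describes; the literal values are illustrative, not
taken from this patch:

    import datetime
    from dlt.common.storages.load_package import TLoadPackageState, TPipelineStateDoc

    # illustrative values only; real docs are produced by state_doc() further down
    doc: TPipelineStateDoc = {
        "version": 2,
        "engine_version": 4,
        "pipeline_name": "my_pipeline",
        "state": "<compressed state blob>",
        "version_hash": "aGFzaC1vZi10aGUtc3RhdGU=",  # dlt keeps hashes base64 encoded
        "created_at": datetime.datetime.now(datetime.timezone.utc),
    }

    package_state: TLoadPackageState = {}
    # "pipeline_state" is NotRequired: it only appears once extraction attaches it
    package_state["pipeline_state"] = doc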
diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index 8edfa7f744..5dae4bf295 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -1,7 +1,8 @@
 import posixpath
 import os
+import base64
 from types import TracebackType
-from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple
+from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple, cast
 from fsspec import AbstractFileSystem
 from contextlib import contextmanager
 from dlt.common import json, pendulum
@@ -35,6 +36,7 @@
 
 
 INIT_FILE_NAME = "init"
+FILENAME_SEPARATOR = "__"
 
 
 class LoadFilesystemJob(LoadJob):
@@ -261,14 +263,15 @@ def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
         self.fs_client.write_text(filepath, json.dumps(data), "utf-8")
 
     def _to_path_safe_string(self, s: str) -> str:
-        return "".join([c for c in s if re.match(r"\w", c)]) if s else None
+        """Converts a base64 encoded string to a path-safe hex representation"""
+        return base64.b64decode(s).hex() if s else None
 
     def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
         if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
             raise DestinationUndefinedEntity({"dir": dirname})
-        for filepath in self.fs_client.listdir(dirname, detail=False):
+        for filepath in self.fs_client.ls(dirname, detail=False, refresh=True):
             filename = os.path.splitext(os.path.basename(filepath))[0]
-            fileparts = filename.split("__")
+            fileparts = filename.split(FILENAME_SEPARATOR)
             if len(fileparts) != 3:
                 continue
             yield filepath, fileparts
@@ -283,7 +286,11 @@ def _store_load(self, load_id: str) -> None:
             "inserted_at": pendulum.now().isoformat(),
             "schema_version_hash": self.schema.version_hash,
         }
-        filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl"
+        filepath = posixpath.join(
+            self.dataset_path,
+            self.schema.loads_table_name,
+            f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl",
+        )
         self._write_to_json_file(filepath, load_data)
 
     def complete_load(self, load_id: str) -> None:
@@ -297,32 +304,32 @@ def complete_load(self, load_id: str) -> None:
 
     def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
         """gets full path for schema file for a given hash"""
-        return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"
+        return posixpath.join(
+            self.dataset_path,
+            self.schema.state_table_name,
+            f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
+        )
 
     def _store_current_state(self, load_id: str) -> None:
         # don't save the state this way when used as staging
         if self.config.as_staging:
             return
         # get state doc from current pipeline
-        from dlt.pipeline.current import load_package_source_state
-        from dlt.pipeline.state_sync import LOAD_PACKAGE_STATE_KEY
+        from dlt.pipeline.current import load_package
 
-        doc = load_package_source_state().get(LOAD_PACKAGE_STATE_KEY, {})
+        pipeline_state_doc = load_package()["state"].get("pipeline_state")
 
-        if not doc:
+        if not pipeline_state_doc:
             return
 
-        # this is not done in other destinations...
-        doc["dlt_load_id"] = load_id
-
         # get paths
-        pipeline_name = doc["pipeline_name"]
+        pipeline_name = pipeline_state_doc["pipeline_name"]
         hash_path = self._get_state_file_name(
             pipeline_name, self.schema.stored_version_hash, load_id
         )
 
         # write
-        self._write_to_json_file(hash_path, doc)
+        self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))
 
     def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
         # get base dir
@@ -350,7 +357,11 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
 
     def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
         """gets full path for schema file for a given hash"""
-        return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"
+        return posixpath.join(
+            self.dataset_path,
+            self.schema.version_table_name,
+            f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
+        )
 
     def _get_stored_schema_by_hash_or_newest(
         self, version_hash: str = None
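
Note: the rewritten _to_path_safe_string assumes its input is base64 encoded (dlt version hashes
are) and re-encodes it as hex, which uses only [0-9a-f] and is therefore safe inside file names on
any filesystem; the old regex filter silently dropped the +, / and = characters. A standalone
restatement of the helper, with an assumed sample value:

    import base64

    def to_path_safe_string(s: str) -> str:
        # decode the base64 payload, then render its bytes as lowercase hex
        return base64.b64decode(s).hex() if s else None

    print(to_path_safe_string("aGVsbG8="))  # -> "68656c6c6f"

State and schema files then follow the patterns {pipeline_name}__{load_id}__{hex_hash}.jsonl and
{schema_name}__{load_id}__{hex_hash}.jsonl, with the separator centralized in FILENAME_SEPARATOR
so that _list_dlt_dir can split names back into exactly three parts.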
diff --git a/dlt/extract/__init__.py b/dlt/extract/__init__.py
index 7e0dd3d0fc..03b2e59539 100644
--- a/dlt/extract/__init__.py
+++ b/dlt/extract/__init__.py
@@ -1,4 +1,4 @@
-from dlt.extract.resource import DltResource, with_table_name, with_hints, with_package_state
+from dlt.extract.resource import DltResource, with_table_name, with_hints
 from dlt.extract.hints import make_hints
 from dlt.extract.source import DltSource
 from dlt.extract.decorators import source, resource, transformer, defer
diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py
index f07c4ccbc6..159a5e7c23 100644
--- a/dlt/extract/extract.py
+++ b/dlt/extract/extract.py
@@ -17,6 +17,7 @@
     WithStepInfo,
     reset_resource_state,
 )
+from dlt.common.typing import DictStrAny
 from dlt.common.runtime import signals
 from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
 from dlt.common.schema import Schema, utils
@@ -30,6 +31,7 @@
 from dlt.common.storages.load_package import (
     ParsedLoadJobFileName,
     LoadPackageStateInjectableContext,
+    TPipelineStateDoc,
 )
 
 
@@ -400,10 +402,13 @@ def extract(
         )
         return load_id
 
-    def commit_packages(self) -> None:
-        """Commits all extracted packages to normalize storage"""
+    def commit_packages(self, pipeline_state_doc: TPipelineStateDoc = None) -> None:
+        """Commits all extracted packages to normalize storage, and adds the pipeline state to the load package"""
         # commit load packages
         for load_id, metrics in self._load_id_metrics.items():
+            package_state = self.extract_storage.new_packages.get_load_package_state(load_id)
+            package_state["pipeline_state"] = pipeline_state_doc
+            self.extract_storage.new_packages.save_load_package_state(load_id, package_state)
             self.extract_storage.commit_new_load_package(
                 load_id, self.schema_storage[metrics[0]["schema_name"]]
             )
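
Note: the state doc now rides along with every committed package instead of being routed through a
resource item and intercepted by the extractor. A condensed sketch of what commit_packages does per
package; extract stands for the Extract step instance and is an assumed name, the attributes are
the ones used above:

    # sketch only: condensed from commit_packages() above
    for load_id, metrics in extract._load_id_metrics.items():
        packages = extract.extract_storage.new_packages
        package_state = packages.get_load_package_state(load_id)
        package_state["pipeline_state"] = pipeline_state_doc  # may be None
        packages.save_load_package_state(load_id, package_state)
        extract.extract_storage.commit_new_load_package(
            load_id, extract.schema_storage[metrics[0]["schema_name"]]
        )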
diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py
index 86888fed0a..b4afc5b1f8 100644
--- a/dlt/extract/extractors.py
+++ b/dlt/extract/extractors.py
@@ -19,7 +19,7 @@
     TPartialTableSchema,
 )
 from dlt.extract.hints import HintsMeta
-from dlt.extract.resource import DltResource, LoadPackageStateMeta
+from dlt.extract.resource import DltResource
 from dlt.extract.items import TableNameMeta
 from dlt.extract.storage import ExtractorItemStorage
 
@@ -88,13 +88,6 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
                 meta = TableNameMeta(meta.hints["name"])  # type: ignore[arg-type]
             self._reset_contracts_cache()
 
-        # if we have a load package state meta, store to load package
-        if isinstance(meta, LoadPackageStateMeta):
-            from dlt.pipeline.current import load_package_source_state, commit_load_package_state
-
-            load_package_source_state()[meta.state_key_name] = items
-            commit_load_package_state()
-
         if table_name := self._get_static_table_name(resource, meta):
             # write item belonging to table with static name
             self._write_to_static_table(resource, table_name, items, meta)
diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py
index 87e4fd7f76..4776158bbb 100644
--- a/dlt/extract/resource.py
+++ b/dlt/extract/resource.py
@@ -76,22 +76,6 @@ def with_hints(
     return DataItemWithMeta(HintsMeta(hints, create_table_variant), item)
 
 
-class LoadPackageStateMeta:
-    __slots__ = "state_key_name"
-
-    def __init__(self, state_key_name: str) -> None:
-        self.state_key_name = state_key_name
-
-
-def with_package_state(item: TDataItems, state_key_name: str) -> DataItemWithMeta:
-    """Marks `item` to also be inserted into the package state.
-
-    Will create a separate variant of hints for a table if `name` is provided in `hints` and `create_table_variant` is set.
-
-    """
-    return DataItemWithMeta(LoadPackageStateMeta(state_key_name), item)
-
-
 class DltResource(Iterable[TDataItem], DltResourceHints):
     """Implements dlt resource. Contains a data pipe that wraps a generating item and table schema that can be adjusted"""
 
diff --git a/dlt/pipeline/current.py b/dlt/pipeline/current.py
index 4bbe74a123..25fd398623 100644
--- a/dlt/pipeline/current.py
+++ b/dlt/pipeline/current.py
@@ -7,7 +7,6 @@
     load_package,
     commit_load_package_state,
     destination_state,
-    load_package_source_state,
     clear_destination_state,
 )
 from dlt.extract.decorators import get_source_schema, get_source
diff --git a/dlt/pipeline/mark.py b/dlt/pipeline/mark.py
index 0b753539be..3956d9bbe2 100644
--- a/dlt/pipeline/mark.py
+++ b/dlt/pipeline/mark.py
@@ -3,6 +3,5 @@
     with_table_name,
     with_hints,
     make_hints,
-    with_package_state,
     materialize_schema_item as materialize_table_schema,
 )
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index 5d719882a5..2b6d5c7a85 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -138,6 +138,7 @@
     mark_state_extracted,
     migrate_pipeline_state,
     state_resource,
+    state_doc,
     default_pipeline_state,
 )
 from dlt.pipeline.warnings import credentials_argument_deprecated
@@ -427,13 +428,13 @@
                         raise SourceExhausted(source.name)
                     self._extract_source(extract_step, source, max_parallel_items, workers)
                 # extract state
+                state = None
                 if self.config.restore_from_destination:
                     # this will update state version hash so it will not be extracted again by with_state_sync
-                    self._bump_version_and_extract_state(
-                        self._container[StateInjectableContext].state, True, extract_step
-                    )
-                # commit load packages
-                extract_step.commit_packages()
+                    state = self._container[StateInjectableContext].state
+                    self._bump_version_and_extract_state(state, True, extract_step)
+                # commit load packages with state
+                extract_step.commit_packages(state_doc(state) if state else None)
                 return self._get_step_info(extract_step)
         except Exception as exc:
             # emit step info
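
Note: downstream, code running in pipeline context can read the doc straight off the current load
package, which is exactly what _store_current_state in the filesystem destination above does. A
hypothetical consumer:

    from dlt.pipeline.current import load_package

    # absent when restore_from_destination is disabled, so callers must check
    doc = load_package()["state"].get("pipeline_state")
    if doc:
        print(doc["pipeline_name"], doc["version_hash"])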
diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py
index 70a58d1f98..e5d937d05a 100644
--- a/dlt/pipeline/state_sync.py
+++ b/dlt/pipeline/state_sync.py
@@ -5,7 +5,7 @@
 from dlt.common.pendulum import pendulum
 from dlt.common.typing import DictStrAny
 from dlt.common.schema.typing import STATE_TABLE_NAME, TTableSchemaColumns
-from dlt.common.destination.reference import WithStateSync, Destination
+from dlt.common.destination.reference import WithStateSync, Destination, StateInfo
 from dlt.common.versioned_state import (
     generate_state_version_hash,
     bump_state_version_if_modified,
@@ -14,7 +14,7 @@
     decompress_state,
 )
 from dlt.common.pipeline import TPipelineState
-
+from dlt.common.storages.load_package import TPipelineStateDoc
 from dlt.extract import DltResource
 
 from dlt.pipeline.exceptions import (
@@ -94,11 +94,11 @@ def migrate_pipeline_state(
     return cast(TPipelineState, state)
 
 
-def state_doc(state: TPipelineState) -> DictStrAny:
+def state_doc(state: TPipelineState, load_id: str = None) -> TPipelineStateDoc:
     state = copy(state)
     state.pop("_local")
     state_str = compress_state(state)
-    doc = {
+    doc: TPipelineStateDoc = {
         "version": state["_state_version"],
         "engine_version": state["_state_engine_version"],
         "pipeline_name": state["pipeline_name"],
@@ -106,13 +106,17 @@
         "created_at": pendulum.now(),
         "version_hash": state["_version_hash"],
     }
+    if load_id:
+        doc["dlt_load_id"] = load_id
     return doc
 
 
 def state_resource(state: TPipelineState) -> DltResource:
-    doc = dlt.mark.with_package_state(state_doc(state), LOAD_PACKAGE_STATE_KEY)
     return dlt.resource(
-        [doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS
+        [state_doc(state)],
+        name=STATE_TABLE_NAME,
+        write_disposition="append",
+        columns=STATE_TABLE_COLUMNS,
     )
 
 
diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py
index 65dd947367..41b98bdec3 100644
--- a/tests/load/filesystem/test_filesystem_client.py
+++ b/tests/load/filesystem/test_filesystem_client.py
@@ -9,6 +9,7 @@
 
 from dlt.destinations.impl.filesystem.filesystem import (
     FilesystemDestinationClientConfiguration,
+    INIT_FILE_NAME,
 )
 
 from dlt.destinations.path_utils import create_path, prepare_datetime_params
@@ -156,10 +157,10 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non
             client.dataset_path, detail=False, refresh=True
         ):
             # remove internal paths
-            if "_dlt" in basedir or "init" in basedir:
+            if "_dlt" in basedir:
                 continue
             for f in files:
-                if f == "init":
+                if f == INIT_FILE_NAME:
                     continue
                 paths.append(posixpath.join(basedir, f))
 
@@ -222,7 +223,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None
             if "_dlt" in basedir:
                 continue
             for f in files:
-                if f == "init":
+                if f == INIT_FILE_NAME:
                     continue
                 paths.append(posixpath.join(basedir, f))
             assert list(sorted(paths)) == expected_files
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index 37afbcb07a..20f326b160 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -19,8 +19,11 @@
 from tests.common.utils import load_json_case
 from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active
 from dlt.destinations.path_utils import create_path
-from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
-from tests.load.pipeline.utils import load_table_counts
+from tests.load.pipeline.utils import (
+    destinations_configs,
+    DestinationTestConfiguration,
+    load_table_counts,
+)
 
 
 skip_if_not_active("filesystem")
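
Note: hypothetical usage of the reworked state_doc, assuming pipeline_state is a populated
TPipelineState already in scope. dlt_load_id is now present only when a load_id is passed
explicitly, since the filesystem destination no longer patches it into the doc after the fact:

    from dlt.pipeline.state_sync import state_doc

    doc = state_doc(pipeline_state)
    assert "dlt_load_id" not in doc

    doc = state_doc(pipeline_state, load_id="1713348189.4521")  # made-up load id
    assert doc["dlt_load_id"] == "1713348189.4521"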