Commit b2b5913

pr fixes and move pipeline state saving to committing of extracted packages

sh-rp committed Apr 17, 2024
1 parent 0d5423c commit b2b5913
Showing 12 changed files with 75 additions and 69 deletions.
22 changes: 14 additions & 8 deletions dlt/common/storages/load_package.py
@@ -54,15 +54,27 @@
"""Loader file formats with internal job types"""


class TPipelineStateDoc(TypedDict, total=False):
"""Corresponds to the StateInfo Tuple"""

version: int
engine_version: int
pipeline_name: str
state: str
version_hash: str
created_at: datetime.datetime
dlt_load_id: NotRequired[str]


class TLoadPackageState(TVersionedState, total=False):
created_at: DateTime
"""Timestamp when the load package was created"""
pipeline_state: NotRequired[TPipelineStateDoc]
"""Pipeline state, added at the end of the extraction phase"""

"""A section of state that does not participate in change merging and version control"""
destination_state: NotRequired[Dict[str, Any]]
"""private space for destinations to store state relevant only to the load package"""
source_state: NotRequired[Dict[str, Any]]
"""private space for source to store state relevant only to the load package, currently used for storing pipeline state"""


class TLoadPackage(TypedDict, total=False):
@@ -691,12 +703,6 @@ def destination_state() -> DictStrAny:
return lp["state"].setdefault("destination_state", {})


def load_package_source_state() -> DictStrAny:
"""Get segment of load package state that is specific to the sources."""
lp = load_package()
return lp["state"].setdefault("source_state", {})


def clear_destination_state(commit: bool = True) -> None:
"""Clear segment of load package state that is specific to the current destination. Optionally commit to load package."""
lp = load_package()
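For orientation, this is roughly the shape the new pipeline_state entry gives a committed load package state. A minimal sketch following the TypedDicts above; every literal value is made up for illustration:

import datetime

# hypothetical TPipelineStateDoc, as produced by state_doc() at extract time
pipeline_state_doc = {
    "version": 2,
    "engine_version": 4,
    "pipeline_name": "my_pipeline",          # hypothetical pipeline name
    "state": "<compressed state blob>",      # output of compress_state()
    "version_hash": "qP0p+1XFl9k=",          # base64 content hash (made up)
    "created_at": datetime.datetime.now(datetime.timezone.utc),
}

# hypothetical TLoadPackageState after commit_packages() has attached the doc
# (versioning keys inherited from TVersionedState omitted for brevity)
load_package_state = {
    "created_at": pipeline_state_doc["created_at"],
    "pipeline_state": pipeline_state_doc,
}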
43 changes: 27 additions & 16 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,7 +1,8 @@
import posixpath
import os
import base64
from types import TracebackType
from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple
from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple, cast
from fsspec import AbstractFileSystem
from contextlib import contextmanager
from dlt.common import json, pendulum
@@ -35,6 +36,7 @@


INIT_FILE_NAME = "init"
FILENAME_SEPARATOR = "__"


class LoadFilesystemJob(LoadJob):
@@ -261,14 +263,15 @@ def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
self.fs_client.write_text(filepath, json.dumps(data), "utf-8")

def _to_path_safe_string(self, s: str) -> str:
return "".join([c for c in s if re.match(r"\w", c)]) if s else None
"""for base64 strings"""
return base64.b64decode(s).hex() if s else None

def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.fs_client.listdir(dirname, detail=False):
for filepath in self.fs_client.ls(dirname, detail=False, refresh=True):
filename = os.path.splitext(os.path.basename(filepath))[0]
fileparts = filename.split("__")
fileparts = filename.split(FILENAME_SEPARATOR)
if len(fileparts) != 3:
continue
yield filepath, fileparts
@@ -283,7 +286,11 @@ def _store_load(self, load_id: str) -> None:
"inserted_at": pendulum.now().isoformat(),
"schema_version_hash": self.schema.version_hash,
}
filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl"
filepath = posixpath.join(
self.dataset_path,
self.schema.loads_table_name,
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl",
)
self._write_to_json_file(filepath, load_data)

def complete_load(self, load_id: str) -> None:
@@ -297,32 +304,32 @@ def complete_load(self, load_id: str) -> None:

def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"
return posixpath.join(
self.dataset_path,
self.schema.state_table_name,
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)

def _store_current_state(self, load_id: str) -> None:
# don't save the state this way when used as staging
if self.config.as_staging:
return
# get state doc from current pipeline
from dlt.pipeline.current import load_package_source_state
from dlt.pipeline.state_sync import LOAD_PACKAGE_STATE_KEY
from dlt.pipeline.current import load_package

doc = load_package_source_state().get(LOAD_PACKAGE_STATE_KEY, {})
pipeline_state_doc = load_package()["state"].get("pipeline_state")

if not doc:
if not pipeline_state_doc:
return

# this is not done in other destinations...
doc["dlt_load_id"] = load_id

# get paths
pipeline_name = doc["pipeline_name"]
pipeline_name = pipeline_state_doc["pipeline_name"]
hash_path = self._get_state_file_name(
pipeline_name, self.schema.stored_version_hash, load_id
)

# write
self._write_to_json_file(hash_path, doc)
self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))

def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
# get base dir
@@ -350,7 +357,11 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:

def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"
return posixpath.join(
self.dataset_path,
self.schema.version_table_name,
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)

def _get_stored_schema_by_hash_or_newest(
self, version_hash: str = None
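A note on the _to_path_safe_string change: dlt version hashes are base64 strings, which can contain characters such as "/" and "+" that are unsafe in file names; decoding the hash and re-encoding its bytes as hex keeps file names path-safe without losing information. A standalone sketch of the new naming scheme, with made-up inputs:

import base64
import posixpath

FILENAME_SEPARATOR = "__"

def to_path_safe_string(s: str) -> str:
    # decode the base64 hash, then render its raw bytes as lowercase hex
    return base64.b64decode(s).hex() if s else None

# hypothetical pipeline name, load id and base64 version hash
filename = (
    f"my_pipeline{FILENAME_SEPARATOR}1713370000.123"
    f"{FILENAME_SEPARATOR}{to_path_safe_string('qP0p+1XFl9k=')}.jsonl"
)
print(posixpath.join("my_dataset", "_dlt_pipeline_state", filename))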
2 changes: 1 addition & 1 deletion dlt/extract/__init__.py
@@ -1,4 +1,4 @@
from dlt.extract.resource import DltResource, with_table_name, with_hints, with_package_state
from dlt.extract.resource import DltResource, with_table_name, with_hints
from dlt.extract.hints import make_hints
from dlt.extract.source import DltSource
from dlt.extract.decorators import source, resource, transformer, defer
9 changes: 7 additions & 2 deletions dlt/extract/extract.py
@@ -17,6 +17,7 @@
WithStepInfo,
reset_resource_state,
)
from dlt.common.typing import DictStrAny
from dlt.common.runtime import signals
from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
from dlt.common.schema import Schema, utils
@@ -30,6 +31,7 @@
from dlt.common.storages.load_package import (
ParsedLoadJobFileName,
LoadPackageStateInjectableContext,
TPipelineStateDoc,
)


@@ -400,10 +402,13 @@ def extract(
)
return load_id

def commit_packages(self) -> None:
"""Commits all extracted packages to normalize storage"""
def commit_packages(self, pipeline_state_doc: TPipelineStateDoc = None) -> None:
"""Commits all extracted packages to normalize storage, and adds the pipeline state to the load package"""
# commit load packages
for load_id, metrics in self._load_id_metrics.items():
package_state = self.extract_storage.new_packages.get_load_package_state(load_id)
package_state["pipeline_state"] = pipline_state_doc
self.extract_storage.new_packages.save_load_package_state(load_id, package_state)
self.extract_storage.commit_new_load_package(
load_id, self.schema_storage[metrics[0]["schema_name"]]
)
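Stripped of storage details, the new commit flow reduces to this hedged sketch; storage stands in for extract_storage.new_packages, and the method names mirror the diff above:

# the pipeline state doc is attached to every extracted package's state
# just before the package is committed to normalize storage
def commit_packages(storage, load_id_metrics, pipeline_state_doc=None):
    for load_id, metrics in load_id_metrics.items():
        package_state = storage.get_load_package_state(load_id)
        package_state["pipeline_state"] = pipeline_state_doc
        storage.save_load_package_state(load_id, package_state)
        # commit_new_load_package(load_id, schema) follows here in the real code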
9 changes: 1 addition & 8 deletions dlt/extract/extractors.py
@@ -19,7 +19,7 @@
TPartialTableSchema,
)
from dlt.extract.hints import HintsMeta
from dlt.extract.resource import DltResource, LoadPackageStateMeta
from dlt.extract.resource import DltResource
from dlt.extract.items import TableNameMeta
from dlt.extract.storage import ExtractorItemStorage

@@ -88,13 +88,6 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> None:
meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type]
self._reset_contracts_cache()

# if we have a load package state meta, store to load package
if isinstance(meta, LoadPackageStateMeta):
from dlt.pipeline.current import load_package_source_state, commit_load_package_state

load_package_source_state()[meta.state_key_name] = items
commit_load_package_state()

if table_name := self._get_static_table_name(resource, meta):
# write item belonging to table with static name
self._write_to_static_table(resource, table_name, items, meta)
16 changes: 0 additions & 16 deletions dlt/extract/resource.py
@@ -76,22 +76,6 @@ def with_hints(
return DataItemWithMeta(HintsMeta(hints, create_table_variant), item)


class LoadPackageStateMeta:
__slots__ = "state_key_name"

def __init__(self, state_key_name: str) -> None:
self.state_key_name = state_key_name


def with_package_state(item: TDataItems, state_key_name: str) -> DataItemWithMeta:
"""Marks `item` to also be inserted into the package state.
Will create a separate variant of hints for a table if `name` is provided in `hints` and `create_table_variant` is set.
"""
return DataItemWithMeta(LoadPackageStateMeta(state_key_name), item)


class DltResource(Iterable[TDataItem], DltResourceHints):
"""Implements dlt resource. Contains a data pipe that wraps a generating item and table schema that can be adjusted"""

1 change: 0 additions & 1 deletion dlt/pipeline/current.py
@@ -7,7 +7,6 @@
load_package,
commit_load_package_state,
destination_state,
load_package_source_state,
clear_destination_state,
)
from dlt.extract.decorators import get_source_schema, get_source
1 change: 0 additions & 1 deletion dlt/pipeline/mark.py
@@ -3,6 +3,5 @@
with_table_name,
with_hints,
make_hints,
with_package_state,
materialize_schema_item as materialize_table_schema,
)
11 changes: 6 additions & 5 deletions dlt/pipeline/pipeline.py
@@ -138,6 +138,7 @@
mark_state_extracted,
migrate_pipeline_state,
state_resource,
state_doc,
default_pipeline_state,
)
from dlt.pipeline.warnings import credentials_argument_deprecated
@@ -427,13 +428,13 @@ def extract(
raise SourceExhausted(source.name)
self._extract_source(extract_step, source, max_parallel_items, workers)
# extract state
state = None
if self.config.restore_from_destination:
# this will update state version hash so it will not be extracted again by with_state_sync
self._bump_version_and_extract_state(
self._container[StateInjectableContext].state, True, extract_step
)
# commit load packages
extract_step.commit_packages()
state = self._container[StateInjectableContext].state
self._bump_version_and_extract_state(state, True, extract_step)
# commit load packages with state
extract_step.commit_packages(state_doc(state) if state else None)
return self._get_step_info(extract_step)
except Exception as exc:
# emit step info
16 changes: 10 additions & 6 deletions dlt/pipeline/state_sync.py
@@ -5,7 +5,7 @@
from dlt.common.pendulum import pendulum
from dlt.common.typing import DictStrAny
from dlt.common.schema.typing import STATE_TABLE_NAME, TTableSchemaColumns
from dlt.common.destination.reference import WithStateSync, Destination
from dlt.common.destination.reference import WithStateSync, Destination, StateInfo
from dlt.common.versioned_state import (
generate_state_version_hash,
bump_state_version_if_modified,
@@ -14,7 +14,7 @@
decompress_state,
)
from dlt.common.pipeline import TPipelineState

from dlt.common.storages.load_package import TPipelineStateDoc
from dlt.extract import DltResource

from dlt.pipeline.exceptions import (
@@ -94,25 +94,29 @@ def migrate_pipeline_state(
return cast(TPipelineState, state)


def state_doc(state: TPipelineState) -> DictStrAny:
def state_doc(state: TPipelineState, load_id: str = None) -> TPipelineStateDoc:
state = copy(state)
state.pop("_local")
state_str = compress_state(state)
doc = {
doc: TPipelineStateDoc = {
"version": state["_state_version"],
"engine_version": state["_state_engine_version"],
"pipeline_name": state["pipeline_name"],
"state": state_str,
"created_at": pendulum.now(),
"version_hash": state["_version_hash"],
}
if load_id:
doc["dlt_load_id"] = load_id
return doc


def state_resource(state: TPipelineState) -> DltResource:
doc = dlt.mark.with_package_state(state_doc(state), LOAD_PACKAGE_STATE_KEY)
return dlt.resource(
[doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS
[state_doc(state)],
name=STATE_TABLE_NAME,
write_disposition="append",
columns=STATE_TABLE_COLUMNS,
)


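For reference, a self-contained re-sketch of the new state_doc() behavior; compress_state is stubbed here with json + zlib + base64, which approximates but does not match dlt's actual implementation:

import base64
import datetime
import json
import zlib
from copy import copy

def state_doc(state: dict, load_id: str = None) -> dict:
    state = copy(state)
    state.pop("_local", None)
    # stand-in for dlt's compress_state()
    blob = base64.b64encode(zlib.compress(json.dumps(state).encode())).decode()
    doc = {
        "version": state["_state_version"],
        "engine_version": state["_state_engine_version"],
        "pipeline_name": state["pipeline_name"],
        "state": blob,
        "created_at": datetime.datetime.now(datetime.timezone.utc),
        "version_hash": state["_version_hash"],
    }
    if load_id:
        # only set when a destination persists the doc itself,
        # e.g. the filesystem destination in this commit
        doc["dlt_load_id"] = load_id
    return doc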
7 changes: 4 additions & 3 deletions tests/load/filesystem/test_filesystem_client.py
@@ -9,6 +9,7 @@

from dlt.destinations.impl.filesystem.filesystem import (
FilesystemDestinationClientConfiguration,
INIT_FILE_NAME,
)

from dlt.destinations.path_utils import create_path, prepare_datetime_params
@@ -156,10 +157,10 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> None:
client.dataset_path, detail=False, refresh=True
):
# remove internal paths
if "_dlt" in basedir or "init" in basedir:
if "_dlt" in basedir:
continue
for f in files:
if f == "init":
if f == INIT_FILE_NAME:
continue
paths.append(posixpath.join(basedir, f))

@@ -222,7 +223,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None:
if "_dlt" in basedir:
continue
for f in files:
if f == "init":
if f == INIT_FILE_NAME:
continue
paths.append(posixpath.join(basedir, f))
assert list(sorted(paths)) == expected_files
7 changes: 5 additions & 2 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -19,8 +19,11 @@
from tests.common.utils import load_json_case
from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active
from dlt.destinations.path_utils import create_path
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
from tests.load.pipeline.utils import load_table_counts
from tests.load.pipeline.utils import (
destinations_configs,
DestinationTestConfiguration,
load_table_counts,
)


skip_if_not_active("filesystem")
