Generic destination / sink decorator #1065

Merged
51 commits merged on Mar 14, 2024
Changes from 47 commits

Commits:
8b6028b start sink (sh-rp, Jan 11, 2024)
6a924d9 parquet sink prototype (sh-rp, Jan 11, 2024)
83c1af6 some more sink implementations (sh-rp, Jan 12, 2024)
b972f23 finish first batch of helpers (sh-rp, Jan 12, 2024)
9dabeff add missing tests and fix linting (sh-rp, Jan 13, 2024)
af6defd make configuratio more versatile (sh-rp, Jan 15, 2024)
4d730e8 implement sink function progress state (sh-rp, Jan 15, 2024)
3b577b0 move to iterator (sh-rp, Jan 15, 2024)
5527689 persist sink load state in pipeline state (sh-rp, Jan 25, 2024)
0657034 fix unrelated typo (sh-rp, Jan 30, 2024)
b5db5b8 move sink state storage to loadpackage state (sh-rp, Jan 30, 2024)
189d24b additional pr fixes (sh-rp, Jan 30, 2024)
57ed090 disable creating empty state file on loadpackage init (sh-rp, Jan 30, 2024)
4f53bc4 add sink docs page (sh-rp, Jan 31, 2024)
c6c06ba small changes (sh-rp, Feb 1, 2024)
2f6a15a Merge branch 'devel' into d#/data_sink_decorator (sh-rp, Feb 6, 2024)
374b267 make loadstorage state versioned and separate out common base functions (sh-rp, Feb 6, 2024)
daae33e restrict access of destinations to load package state in accessor fun… (sh-rp, Feb 6, 2024)
644e6f3 fix tests (sh-rp, Feb 6, 2024)
9930ad6 add tests for state and new injectable context (sh-rp, Feb 6, 2024)
678d187 fix linter (sh-rp, Feb 6, 2024)
376832d fix linter error (sh-rp, Feb 7, 2024)
79fce9e some pr fixes (sh-rp, Feb 7, 2024)
8eb3e1a Merge commit '17aea98527eab19f3300ee161114541f7eb2b5b5' into d#/data_… (sh-rp, Feb 8, 2024)
105569a more pr fixes (sh-rp, Feb 8, 2024)
27b8b2c small readme changes (sh-rp, Feb 8, 2024)
e60f2f1 Merge branch 'devel' into d#/data_sink_decorator (sh-rp, Mar 4, 2024)
3229745 add load id to loadpackage info in current (sh-rp, Mar 4, 2024)
dbbbe7c add support for directly passing through the naming convention to the… (sh-rp, Mar 4, 2024)
db9b488 add support for batch size zero (filepath passthrouh) (sh-rp, Mar 4, 2024)
3c39f41 use patched version of flak8 encoding (sh-rp, Mar 5, 2024)
3dfcf39 fix tests (sh-rp, Mar 5, 2024)
bc44618 add support for secrets and config in sink (sh-rp, Mar 5, 2024)
db8d0ed update sink docs (sh-rp, Mar 5, 2024)
d8719c1 revert encodings branch (sh-rp, Mar 5, 2024)
d7eb19d fix small linting problem (sh-rp, Mar 5, 2024)
ef35502 add support for config specs (sh-rp, Mar 6, 2024)
2db3430 add possibility to create a resolved partial (sh-rp, Mar 7, 2024)
9582b9f Merge branch 'devel' into d#/data_sink_decorator (sh-rp, Mar 7, 2024)
f488210 add lock for resolving config (sh-rp, Mar 7, 2024)
4d2563b change resolved partial method to dedicated function (sh-rp, Mar 7, 2024)
12b5cbd change signatures in decorator (sh-rp, Mar 12, 2024)
3e5f808 fixes bug in inject wrapper refactor (sh-rp, Mar 12, 2024)
53dc2f9 mark destination decorator as experimental in the docs (sh-rp, Mar 12, 2024)
d1e1e38 change injection context locking strategy (sh-rp, Mar 13, 2024)
943ac3e make tests independent from gcp imports (sh-rp, Mar 13, 2024)
1f47a05 move generic destination tests into common tests section destinations (sh-rp, Mar 13, 2024)
b66c653 fix global instantiation test after file move (sh-rp, Mar 13, 2024)
31b661b add tests for locking injection context (sh-rp, Mar 13, 2024)
97ed409 make inject test a bit better (sh-rp, Mar 13, 2024)
246df72 skip generic destination in init test (sh-rp, Mar 14, 2024)
3 changes: 3 additions & 0 deletions dlt/__init__.py
@@ -29,6 +29,8 @@

from dlt import sources
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.destinations.decorators import destination

from dlt.pipeline import (
pipeline as _pipeline,
run,
@@ -62,6 +64,7 @@
"resource",
"transformer",
"defer",
"destination",
"pipeline",
"run",
"attach",
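For orientation, the export above makes the new decorator available as dlt.destination (marked experimental in the docs added by this PR). Below is a minimal, hedged sketch of a custom destination defined with it; the batch_size parameter and the (items, table) callback signature follow the sink docs added in this PR, but the exact details should be checked against that page:

```py
import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema

# dlt calls this function with batches of items, one table schema per call
@dlt.destination(batch_size=10)
def my_sink(items: TDataItems, table: TTableSchema) -> None:
    # forward the batch to any external system, e.g. an HTTP API or a queue
    print(f"received {len(items)} items for table {table['name']}")

pipeline = dlt.pipeline("sink_demo", destination=my_sink)
pipeline.run([{"id": 1}, {"id": 2}], table_name="items")
```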
2 changes: 1 addition & 1 deletion dlt/common/configuration/__init__.py
@@ -1,7 +1,7 @@
from .specs.base_configuration import configspec, is_valid_hint, is_secret_hint, resolve_type
from .specs import known_sections
from .resolve import resolve_configuration, inject_section
from .inject import with_config, last_config, get_fun_spec
from .inject import with_config, last_config, get_fun_spec, create_resolved_partial

from .exceptions import (
ConfigFieldMissingException,
67 changes: 43 additions & 24 deletions dlt/common/configuration/container.py
@@ -1,7 +1,7 @@
from contextlib import contextmanager
from contextlib import contextmanager, nullcontext, AbstractContextManager
import re
import threading
from typing import ClassVar, Dict, Iterator, Tuple, Type, TypeVar
from typing import ClassVar, Dict, Iterator, Tuple, Type, TypeVar, Any

from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
from dlt.common.configuration.exceptions import (
@@ -34,13 +34,17 @@ class Container:

thread_contexts: Dict[int, Dict[Type[ContainerInjectableContext], ContainerInjectableContext]]
"""A thread aware mapping of injection context """
_context_container_locks: Dict[str, threading.Lock]
"""Locks for container types on threads."""

main_context: Dict[Type[ContainerInjectableContext], ContainerInjectableContext]
"""Injection context for the main thread"""

def __new__(cls: Type["Container"]) -> "Container":
if not cls._INSTANCE:
cls._INSTANCE = super().__new__(cls)
cls._INSTANCE.thread_contexts = {}
cls._INSTANCE._context_container_locks = {}
cls._INSTANCE.main_context = cls._INSTANCE.thread_contexts[
Container._MAIN_THREAD_ID
] = {}
@@ -84,22 +88,22 @@ def _thread_context(
self, spec: Type[TConfiguration]
) -> Dict[Type[ContainerInjectableContext], ContainerInjectableContext]:
if spec.global_affinity:
context = self.main_context
return self.main_context
else:
# thread pool names used in dlt contain originating thread id. use this id over pool id
if m := re.match(r"dlt-pool-(\d+)-", threading.currentThread().getName()):
thread_id = int(m.group(1))
else:
thread_id = threading.get_ident()

# return main context for main thread
if thread_id == Container._MAIN_THREAD_ID:
return self.main_context
# we may add a new empty thread context so lock here
with Container._LOCK:
context = self.thread_contexts.get(thread_id)
if context is None:
if (context := self.thread_contexts.get(thread_id)) is None:
context = self.thread_contexts[thread_id] = {}
return context
return context

def _thread_getitem(
self, spec: Type[TConfiguration]
@@ -127,29 +131,44 @@ def _thread_delitem(
del context[spec]

@contextmanager
def injectable_context(self, config: TConfiguration) -> Iterator[TConfiguration]:
def injectable_context(
self, config: TConfiguration, lock_context: bool = False
) -> Iterator[TConfiguration]:
"""A context manager that will insert `config` into the container and restore the previous value when it gets out of scope."""

config.resolve()
spec = type(config)
previous_config: ContainerInjectableContext = None
context, previous_config = self._thread_getitem(spec)

# set new config and yield context
self._thread_setitem(context, spec, config)
try:
yield config
finally:
# before setting the previous config for given spec, check if there was no overlapping modification
context, current_config = self._thread_getitem(spec)
if current_config is config:
# config is injected for spec so restore previous
if previous_config is None:
self._thread_delitem(context, spec)
context = self._thread_context(spec)
lock: AbstractContextManager[Any]

# if there is a lock_id, we need a lock for the lock_id in the scope of the current context
if lock_context:
lock_key = f"{id(context)}"
if (lock := self._context_container_locks.get(lock_key)) is None:
with Container._LOCK:
self._context_container_locks[lock_key] = lock = threading.Lock()
else:
lock = nullcontext()

with lock:
# remember context and set item
previous_config = context.get(spec)
self._thread_setitem(context, spec, config)
try:
yield config
finally:
# before setting the previous config for given spec, check if there was no overlapping modification
context, current_config = self._thread_getitem(spec)
if current_config is config:
# config is injected for spec so restore previous
if previous_config is None:
self._thread_delitem(context, spec)
else:
self._thread_setitem(context, spec, previous_config)
else:
self._thread_setitem(context, spec, previous_config)
else:
# value was modified in the meantime and not restored
raise ContainerInjectableContextMangled(spec, context[spec], config)
# value was modified in the meantime and not restored
raise ContainerInjectableContextMangled(spec, context[spec], config)

@staticmethod
def thread_pool_prefix() -> str:
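To illustrate the new lock_context flag on Container.injectable_context, here is a rough, hypothetical usage sketch (not part of the diff); the injected spec and the section names are illustrative:

```py
import threading

from dlt.common.configuration.container import Container
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext

def work(section: str) -> None:
    container = Container()  # Container is a process-wide singleton
    ctx = ConfigSectionContext(sections=(section,))
    # lock_context=True acquires a lock keyed to the current thread context,
    # serializing overlapping injections so that restoring the previous value
    # does not trip ContainerInjectableContextMangled
    with container.injectable_context(ctx, lock_context=True):
        assert container[ConfigSectionContext].sections == (section,)

threads = [threading.Thread(target=work, args=(f"section_{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```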
146 changes: 104 additions & 42 deletions dlt/common/configuration/inject.py
@@ -1,12 +1,15 @@
import inspect

from functools import wraps
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload, cast
from inspect import Signature, Parameter
from contextlib import nullcontext

from dlt.common.typing import DictStrAny, StrAny, TFun, AnyFun
from dlt.common.configuration.resolve import resolve_configuration, inject_section
from dlt.common.configuration.specs.base_configuration import BaseConfiguration
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext

from dlt.common.reflection.spec import spec_from_signature


@@ -32,6 +35,8 @@ def with_config(
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: BaseConfiguration = None,
base: Type[BaseConfiguration] = BaseConfiguration,
) -> TFun: ...


@@ -45,6 +50,8 @@ def with_config(
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
base: Type[BaseConfiguration] = BaseConfiguration,
) -> Callable[[TFun], TFun]: ...


@@ -58,6 +65,7 @@ def with_config(
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
Review comment from sh-rp (Collaborator, Author): should we have a "thread_safe" parameter so we only lock on decorated functions where we know that there might be issues?

Reply from a collaborator: I think we will always lock when resolving configuration so not optional

base: Type[BaseConfiguration] = BaseConfiguration,
) -> Callable[[TFun], TFun]:
"""Injects values into decorated function arguments following the specification in `spec` or by deriving one from function's signature.

@@ -71,10 +79,11 @@
prefer_existing_sections: (bool, optional): When joining existing section context, the existing context will be preferred to the one in `sections`. Default: False
auto_pipeline_section (bool, optional): If True, a top level pipeline section will be added if `pipeline_name` argument is present . Defaults to False.
include_defaults (bool, optional): If True then arguments with default values will be included in synthesized spec. If False only the required arguments marked with `dlt.secrets.value` and `dlt.config.value` are included

base (Type[BaseConfiguration], optional): A base class for synthesized spec. Defaults to BaseConfiguration.
Returns:
Callable[[TFun], TFun]: A decorated function
"""

section_f: Callable[[StrAny], str] = None
# section may be a function from function arguments to section
if callable(sections):
@@ -88,9 +97,8 @@ def decorator(f: TFun) -> TFun:
)
spec_arg: Parameter = None
pipeline_name_arg: Parameter = None

if spec is None:
SPEC = spec_from_signature(f, sig, include_defaults)
SPEC = spec_from_signature(f, sig, include_defaults, base=base)
else:
SPEC = spec

@@ -109,49 +117,52 @@ def decorator(f: TFun) -> TFun:
pipeline_name_arg = p
pipeline_name_arg_default = None if p.default == Parameter.empty else p.default

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
def resolve_config(bound_args: inspect.BoundArguments) -> BaseConfiguration:
"""Resolve arguments using the provided spec"""
# bind parameters to signature
bound_args = sig.bind(*args, **kwargs)
# for calls containing resolved spec in the kwargs, we do not need to resolve again
config: BaseConfiguration = None
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)

# if section derivation function was provided then call it
if section_f:
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments),)
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
else:
# if section derivation function was provided then call it
if section_f:
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments),)
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
else:
curr_sections = sections

# if one of arguments is spec the use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
)
else:
curr_pipeline_name = None
section_context = ConfigSectionContext(
pipeline_name=curr_pipeline_name,
sections=curr_sections,
merge_style=sections_merge_style,
curr_sections = sections

# if one of arguments is spec the use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
)
# this may be called from many threads so section_context is thread affine
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(
config or SPEC(),
explicit_value=bound_args.arguments,
accept_partial=accept_partial,
)
else:
curr_pipeline_name = None
section_context = ConfigSectionContext(
pipeline_name=curr_pipeline_name,
sections=curr_sections,
merge_style=sections_merge_style,
)

# this may be called from many threads so section_context is thread affine
with inject_section(section_context, lock_context=True):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
return resolve_configuration(
config or SPEC(),
explicit_value=bound_args.arguments,
accept_partial=accept_partial,
)

def update_bound_args(
bound_args: inspect.BoundArguments, config: BaseConfiguration, args: Any, kwargs: Any
) -> None:
# overwrite or add resolved params
resolved_params = dict(config)
# overwrite or add resolved params
for p in sig.parameters.values():
@@ -167,12 +178,56 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
bound_args.arguments[kwargs_arg.name].update(resolved_params)
bound_args.arguments[kwargs_arg.name][_LAST_DLT_CONFIG] = config
bound_args.arguments[kwargs_arg.name][_ORIGINAL_ARGS] = (args, kwargs)

def with_partially_resolved_config(config: Optional[BaseConfiguration] = None) -> Any:
# creates a pre-resolved partial of the decorated function
empty_bound_args = sig.bind_partial()
if not config:
config = resolve_config(empty_bound_args)

def wrapped(*args: Any, **kwargs: Any) -> Any:
nonlocal config

# Do we need an exception here?
if spec_arg and spec_arg.name in kwargs:
Review comment from a collaborator: I do not think we care at all? spec_arg did its job in resolve_config and created config from itself, right? If it is present in subsequent calls we can simply ignore it (or I do not understand something).

Reply from sh-rp (Collaborator, Author): Yeah, but in the case of partially resolving, resolve_config is called before the function args are even available, do you know what I mean? It's a bit complicated but also quite clear. We can also remove it if you like; it's just a warning that made sense to put in given the mechanics of the code.

from dlt.common import logger

logger.warning(
"Spec argument is provided in kwargs, ignoring it for resolved partial"
" function."
)

# we can still overwrite the config
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)

# call the function with the pre-resolved config
bound_args = sig.bind(*args, **kwargs)
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

return wrapped

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
rudolfix marked this conversation as resolved.
# Resolve config
config: BaseConfiguration = None
bound_args = sig.bind(*args, **kwargs)
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)
else:
config = resolve_config(bound_args)

# call the function with resolved config
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

# register the spec for a wrapped function
_FUNC_SPECS[id(_wrap)] = SPEC

# add a method to create a pre-resolved partial
setattr(_wrap, "__RESOLVED_PARTIAL_FUNC__", with_partially_resolved_config) # noqa: B010

return _wrap # type: ignore

# See if we're being called as @with_config or @with_config().
@@ -197,3 +252,10 @@

def get_orig_args(**kwargs: Any) -> Tuple[Tuple[Any], DictStrAny]:
return kwargs[_ORIGINAL_ARGS] # type: ignore


def create_resolved_partial(f: AnyFun, config: Optional[BaseConfiguration] = None) -> AnyFun:
"""Create a pre-resolved partial of the with_config decorated function"""
if partial_func := getattr(f, "__RESOLVED_PARTIAL_FUNC__", None):
return cast(AnyFun, partial_func(config))
return f
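A short sketch of how the new create_resolved_partial helper might be combined with with_config (the section and argument names are made up; the configuration values would come from the usual dlt providers such as config.toml, secrets.toml, or environment variables):

```py
import dlt
from dlt.common.configuration import with_config, create_resolved_partial

@with_config(sections=("my_module",))
def send_batch(
    rows: list,
    endpoint: str = dlt.config.value,
    api_key: str = dlt.secrets.value,
) -> None:
    # endpoint and api_key are injected from configuration
    print(f"sending {len(rows)} rows to {endpoint}")

# resolve configuration once; subsequent calls reuse the resolved values
send_resolved = create_resolved_partial(send_batch)
send_resolved([{"id": 1}])
send_resolved([{"id": 2}])
```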
5 changes: 3 additions & 2 deletions dlt/common/configuration/resolve.py
@@ -92,13 +92,14 @@ def initialize_credentials(hint: Any, initial_value: Any) -> CredentialsConfigur


def inject_section(
section_context: ConfigSectionContext, merge_existing: bool = True
section_context: ConfigSectionContext, merge_existing: bool = True, lock_context: bool = False
) -> ContextManager[ConfigSectionContext]:
"""Context manager that sets section specified in `section_context` to be used during configuration resolution. Optionally merges the context already in the container with the one provided

Args:
section_context (ConfigSectionContext): Instance providing a pipeline name and section context
merge_existing (bool, optional): Merges existing section context with `section_context` in the arguments by executing `merge_style` function on `section_context`. Defaults to True.
lock_context (bool, optional): Instruct to threadlock the current thread to prevent race conditions in context injection.

Default Merge Style:
Gets `pipeline_name` and `sections` from existing context if they are not provided in `section_context` argument.
@@ -112,7 +113,7 @@ def inject_section(
if merge_existing:
section_context.merge(existing_context)

return container.injectable_context(section_context)
return container.injectable_context(section_context, lock_context=lock_context)


def _maybe_parse_native_value(
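For completeness, a hedged sketch of resolving a spec under an injected section context with the new lock_context flag (the spec and section names are hypothetical; this roughly mirrors what with_config now does internally when resolving configuration):

```py
from typing import Optional

from dlt.common.configuration import configspec, inject_section, resolve_configuration
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext

@configspec
class ApiClientConfiguration(BaseConfiguration):
    endpoint: Optional[str] = None
    timeout: int = 30

# lock_context=True threadlocks the context injection while configuration
# is resolved, preventing races when many threads resolve at the same time
with inject_section(ConfigSectionContext(sections=("api_client",)), lock_context=True):
    cfg = resolve_configuration(ApiClientConfiguration(), accept_partial=True)
    print(cfg.endpoint, cfg.timeout)
```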