
Generic destination / sink decorator #1065

Merged
merged 51 commits into devel from d#/generic_destination_decorator on Mar 14, 2024

Conversation

Collaborator

@sh-rp sh-rp commented Mar 7, 2024

Description

Implementation of #752

This is a resubmission of PR #891, which I could not reopen after an accidental merge.

Notes:

  • We also introduce the concept of a load package state here

ToDos for the alpha release:

  • Tests for locked injection context
  • Tests for resolved partial
  • Tests for migration from no load package state to having load package state

Follow up tasks:

  • Nice example of loading to BigQuery or Kafka with the generic destination
  • Update the CLI scaffolding scripts to create a sink function if "destination" is selected as the destination
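For orientation, a minimal sketch of the interface this PR introduces (hedged; the exact parameter names and types follow the tests and review discussion below, and may differ in the final code):

import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema

# a custom destination is just a decorated function receiving a batch of
# items together with the schema of the table they belong to
@dlt.destination(batch_size=10)
def my_sink(items: TDataItems, table: TTableSchema) -> None:
    print(f"received {len(items)} items for table {table['name']}")

# the decorated function is passed to a pipeline like any built-in destination
pipeline = dlt.pipeline("sink_test", destination=my_sink, full_refresh=True)
pipeline.run([{"id": 1}, {"id": 2}], table_name="items")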

Merge commits resolved conflicts in: dlt/destinations/__init__.py, tests/utils.py, dlt/pipeline/current.py, dlt/common/storages/load_package.py, dlt/destinations/impl/athena/athena.py, dlt/destinations/impl/bigquery/bigquery.py, dlt/load/load.py

netlify bot commented Mar 7, 2024

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: 246df72
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/65f2ac5d1bc1930008b15772

@sh-rp
Collaborator Author

sh-rp commented Mar 7, 2024

@rudolfix current questions:

  • I have created a method to create a partial from a function decorated with "with_config". Can you let me know if this looks good? Then I can add some tests.
  • Can you show me an example of how a sink function with Google Cloud credentials should look? Do you want the spec passed as a single argument, or what user-facing interface do you have in mind? That will help me understand what you are after.

Generally you can have a look at my changes; I addressed most of the points from your last review.
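For context, a hedged sketch of the kind of function in question (with_config lives in dlt/common/configuration/inject.py per this PR's diffs; the section name here is illustrative):

import dlt
from dlt.common.configuration.inject import with_config
from dlt.common.configuration.specs import GcpServiceAccountCredentials

# with_config derives a SPEC from the signature; arguments defaulting to
# dlt.secrets.value are injected from secret providers at call time
@with_config(sections=("my_gcp_sink",))
def my_gcp_sink(items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value):
    ...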

@sh-rp sh-rp marked this pull request as ready for review March 7, 2024 11:03
Collaborator

@rudolfix rudolfix left a comment

pls see review. I'll have more comments, but let's fix those first. Overall this is so good.

dlt/common/configuration/inject.py (resolved)
dlt/common/reflection/spec.py (resolved)
dlt/common/storages/load_package.py (outdated, resolved)
@@ -334,8 +408,13 @@ def create_package(self, load_id: str) -> None:
        self.storage.create_folder(os.path.join(load_id, PackageStorage.COMPLETED_JOBS_FOLDER))
        self.storage.create_folder(os.path.join(load_id, PackageStorage.FAILED_JOBS_FOLDER))
        self.storage.create_folder(os.path.join(load_id, PackageStorage.STARTED_JOBS_FOLDER))
        # ensure created timestamp is set in state when load package is created
        state = self.get_load_package_state(load_id)
Collaborator

We must make this change backward compatible or explicitly upgrade all storages that hold load packages internally.

See the test test_pipeline_with_dlt_update and similar: they actually upgrade dlt from 0.3 to 0.4 and test packages in process.

My worry: people will have old completed load packages or packages in process, they upgrade dlt, and it stops working. So you need to test it.
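To illustrate the concern, a hedged sketch of the tolerant behavior being requested (the state file name and the empty-state fallback are assumptions for illustration, not the actual implementation):

import json
from pathlib import Path

def read_load_package_state(package_dir: Path) -> dict:
    """Tolerantly read a package's state: packages created before this PR
    carry no state file, so a missing file means 'empty state', not an error."""
    state_file = package_dir / "load_package_state.json"  # hypothetical file name
    try:
        return json.loads(state_file.read_text())
    except FileNotFoundError:
        return {}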

dlt/common/storages/load_package.py (outdated, resolved)
try:
    state_ctx = container[LoadPackageStateInjectableContext]
except ContextDefaultCannotBeCreated:
    raise Exception("Load package state not available")
Collaborator

Please derive the exception properly; see other similar exceptions.
Provide an explanation of why this happens, i.e. when you access this outside of a dlt.destination decorated function (we inject the package only in the loader; btw, we can start injecting it in normalize and extract as well).
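A hedged sketch of what a properly derived exception could look like (assuming DltException from dlt.common.exceptions as the base, per dlt's conventions; the class name is illustrative):

from dlt.common.exceptions import DltException

class LoadPackageStateNotAvailable(DltException):
    """Raised when load package state is accessed outside of an active load package."""

    def __init__(self) -> None:
        super().__init__(
            "Load package state is not available. It is injected only while a load "
            "package is being processed (currently in the loader), e.g. inside a "
            "dlt.destination decorated function."
        )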

Collaborator Author

done

    naming_convention: str = "direct",
    spec: Type[SinkClientConfiguration] = SinkClientConfiguration,
) -> Any:
    def decorator(destination_callable: TSinkCallable) -> TDestinationReferenceArg:
Collaborator

The decorator returns Destination, but the user should see the signature of the decorated function as the return type. We'll add __call__ to Destination to simulate it being a function; this is the same trick we do with DltResource.

I can do the above if you want.
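A hedged, generic sketch of the trick (not dlt's actual Destination class): the factory implements __call__ and forwards explicit arguments into its config kwargs, exactly as if they were passed to __init__:

from typing import Any

class Destination:
    """Sketch: a factory that is also callable like the function it wraps."""

    def __init__(self, **config_kwargs: Any) -> None:
        self.config_kwargs = config_kwargs

    def __call__(self, **explicit_config: Any) -> "Destination":
        # calling the factory yields a new factory with the explicit arguments
        # merged into its config, mirroring __init__
        return Destination(**{**self.config_kwargs, **explicit_config})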

Collaborator Author

Ah, now I think I get what you want here. Yes, why not; maybe you can add that one small change.

Collaborator Author

Actually, it is not quite clear to me what I should do here. I changed the decorator typings a bit and made it so a wrapped function is returned with the args in place (I think, at least), but maybe you need to make some changes here.


def __init__(
    self,
    destination_callable: t.Union[TSinkCallable, str] = None,  # noqa: A003
Collaborator

we can still configure this, right?

Collaborator Author

As in, give a function name / module path in a config var? Then yes, see the tests.

dlt/destinations/impl/destination/factory.py (resolved)
dlt/load/load.py (resolved)
add test for nested configs
@sh-rp
Collaborator Author

sh-rp commented Mar 7, 2024

@rudolfix thanks for the comments, will continue on this later. There is just one thing I'd need your input on: I added a lock on the method that resolves the partial. This is a solution for the problem that the sink function and the client can run in parallel threads, so the context for the generated spec can be injected simultaneously and produce mangled config context errors. The fixes for this could be:

  • Create a third affinity which is thread based and not pipeline based; this would be quite easy to do.
  • Allow locking when resolving config vars; I had a version before where you could say that it should lock in the with_config provider.
  • Have a setting that somehow prevents stacking of the same context type and retains the first one.

I think the best way would be a true thread-based affinity, but I'm not 100% sure; let me know.
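For illustration, a self-contained sketch of the per-function locking approach discussed here (names are illustrative; the func_id keying matches the snippet in the thread below):

import threading
from typing import Dict

# one reentrant lock per wrapped function: two threads injecting the context
# for the same generated spec serialize instead of mangling each other's config
_config_locks: Dict[int, threading.RLock] = {}
_locks_guard = threading.Lock()

def lock_for(func_id: int) -> threading.RLock:
    # func_id is id(f) of the wrapped function, as in the snippet below
    with _locks_guard:
        return _config_locks.setdefault(func_id, threading.RLock())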

lock injection context for wrapped functions
small pr fixes
    else:
        SPEC = spec

    if SPEC is None:
        return f

    func_id = id(f)
Collaborator Author

We use the id of the wrapped function as part of the key to create the thread lock. If the same function is wrapped multiple times, the keys will collide, so we could also consider a unique id.

Collaborator

It looks like we do not need it; we can just pass a flag to create a lock on a thread context.

@@ -58,6 +65,7 @@ def with_config(
    include_defaults: bool = True,
    accept_partial: bool = False,
    initial_config: Optional[BaseConfiguration] = None,
Collaborator Author

Should we have a "thread_safe" parameter so we only lock on decorated functions where we know there might be issues?

Collaborator

I think we will always lock when resolving configuration, so not optional.

Collaborator

@rudolfix rudolfix left a comment

OK, we are really close here! The changes seem easy to make, and test coverage is good. My take: implement the tests you mention:

  • We should add tests for the inject context with lock (when you fix the code) where we make sure two threads cannot be in the critical section at the same time.
  • Test that we actually resolve the config once in create_resolved_partial (and possibly can pass an external one; see my review).
  • Add a test for the callable destination factory.

After that we can merge and do the alpha release. A dlt migration test for load packages is a must before the stable release.
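A hedged sketch of the first test (a generic lock stands in for the injection-context lock; the real test would exercise with_config directly):

import threading
import time

def test_lock_serializes_injection() -> None:
    in_section = 0
    max_concurrent = 0
    lock = threading.RLock()  # stand-in for the injection context lock

    def resolve_with_lock() -> None:
        nonlocal in_section, max_concurrent
        with lock:
            in_section += 1
            max_concurrent = max(max_concurrent, in_section)
            time.sleep(0.05)  # widen the window so any overlap becomes visible
            in_section -= 1

    threads = [threading.Thread(target=resolve_with_lock) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # with the lock in place, the two threads never overlap in the critical section
    assert max_concurrent == 1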

@@ -2,6 +2,8 @@
import inspect
import contextlib
import dataclasses
import threading
Collaborator

?


    nonlocal config

    # Do we need an exception here?
    if spec_arg and spec_arg.name in kwargs:
Collaborator

I do not think we care at all? spec_arg did its job in resolve_config and created the config from itself, right? If it is present in subsequent calls we can simply ignore it. (Or am I missing something?)

Collaborator Author

Yeah, but in the case of partial resolving, resolve_config is called before the function args are even available, do you know what I mean? It's a bit complicated but also quite clear. We can also remove it if you like; it's just a warning that made sense given the mechanics of the code.

    lock: AbstractContextManager[Any]

    # if there is a lock_id, we need a lock for the lock_id in the scope of the current context
    if lock_id:
Collaborator

We just need a boolean flag to create the lock in the given thread context, as discussed.

        return self._spec

    @property
    def client_class(self) -> t.Type["SinkClient"]:
Collaborator

To make it work with the sink decorator, it should implement __call__ so the factory behaves like a function. The arguments should be passed to config_kwargs exactly like in the init method.

We should also test it. If I have:

@dlt.destination()
def my_gcp_sink(
    file_path,
    table,
    credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] = dlt.secrets.value,
    use_stream_insert: bool = False,
):

I should be able to

sink = my_gcp_sink(credentials=GcpCredentials(...), use_stream_insert=True)

and then pass it to dlt.pipeline as destination.
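Spelled out, the intended end-to-end usage would look something like this (continuing the sketch above; the pipeline name, table name, and data are illustrative):

import dlt

data = [{"id": 1}, {"id": 2}]  # stand-in for any iterable of items
pipeline = dlt.pipeline("gcp_sink_pipeline", destination=sink)
pipeline.run(data, table_name="events")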

Collaborator Author

I did not even need to implement the call function; I just forward the kwargs to the destination and then use the pre-resolved config in the create_resolved_partial function (see the code; I have a test for this too). Very nice!

        super().__init__(schema, config)
        self.config: SinkClientConfiguration = config
        # create pre-resolved callable to avoid multiple config resolutions during execution of the jobs
        self.destination_callable = create_resolved_partial(self.config.destination_callable)
Collaborator

Here you should pass config to create_resolved_partial; config is at this point already resolved via configuration() of the factory. So maybe create_resolved_partial could take an optional resolved-config parameter that is used instead of resolving inside create_resolved_partial (the inner one)?
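A hedged sketch of the suggested shape (generic code, not dlt's actual implementation; resolve_config_for is a hypothetical stand-in for the inner resolution):

from functools import partial
from typing import Any, Callable, Optional

def resolve_config_for(f: Callable[..., Any]) -> Any:
    """Stand-in for dlt's actual config resolution machinery."""
    raise NotImplementedError

def create_resolved_partial(
    f: Callable[..., Any], config: Optional[Any] = None
) -> Callable[..., Any]:
    # resolve once, or reuse a config the factory has already resolved,
    # so running the jobs never triggers a second resolution
    resolved = config if config is not None else resolve_config_for(f)
    return partial(f, config=resolved)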

Collaborator Author

this works, nice!

@@ -197,3 +253,10 @@ def last_config(**kwargs: Any) -> Any:


def get_orig_args(**kwargs: Any) -> Tuple[Tuple[Any], DictStrAny]:
    return kwargs[_ORIGINAL_ARGS]  # type: ignore


def create_resolved_partial(f: AnyFun) -> AnyFun:
Collaborator

Could you rename this or the inner function with the same name? I'm conflating them even while doing the review...

dlt/load/load.py (outdated)
@@ -1,4 +1,4 @@
-import contextlib
+import contextlib, threading
Collaborator

We do not need threading here; unused import.



@pytest.mark.parametrize("loader_file_format", SUPPORTED_LOADER_FORMATS)
def test_all_datatypes(loader_file_format: TLoaderFileFormat) -> None:
Collaborator

Would it be possible to move all tests that do not require BigQuery or special credentials to tests/destinations? Then we can run them in the common tests:

- name: Install pipeline dependencies
  run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline

- run: |
    poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common tests/destinations

and those tests are runnable from forks, which is a big win.

Collaborator Author

I removed the dependency on the Google stuff and moved the tests into the common test area.

)

# we may give the value via the __call__ function
dlt.pipeline("sink_test", destination=my_sink(my_val="something"), full_refresh=True).run(
Collaborator Author

fyi: callable my_sink is tested here

rudolfix previously approved these changes Mar 13, 2024
Collaborator

@rudolfix rudolfix left a comment

LGTM!

I'm not entirely sure that calling the former sinks GenericDestination is what we want; maybe DecoratedDestination? Anyway, let's merge this and write a follow-up ticket.

@sh-rp sh-rp merged commit 7f43e76 into devel Mar 14, 2024
52 of 57 checks passed
@sh-rp sh-rp deleted the d#/generic_destination_decorator branch March 14, 2024 08:43