implement destination sinks #752

Closed
rudolfix opened this issue Nov 10, 2023 · 3 comments · Fixed by #891
Labels: community (this issue came from the Slack community workspace), support (this issue is monitored by a Solution Engineer)

Comments

rudolfix (Collaborator) commented Nov 10, 2023

Background
Sinks are the reverse of a source. A Python function decorated with @dlt.sink should generate a destination factory (#746) that may be passed to a pipeline via the destination argument. The sink will consume data sent to it during the load stage.
The sink will be fed data in configurable chunks and will keep state of what has already been consumed, so it will be "transactional" when the chunk size == 1.
Sink state is separate from source state but will initially be handled together with it and preserved in a file. For "transactional" sinks there will be tons of state updates, so the general mechanism we use now (storing state at the destination) does not make sense. We may support bucket storage for load.py and load storage for fully resumable, transactional sinks.
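
For illustration, a minimal sketch of the proposed interface, assuming the @dlt.sink decorator name and signature described in this issue (a proposal, not a final API):

```python
import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema

# hypothetical decorator name from this proposal
@dlt.sink(batch_size=10)
def my_sink(items: TDataItems, table: TTableSchema) -> None:
    # called with chunks of up to `batch_size` rows for the given table
    for item in items:
        print(f"{table['name']}: {item}")

# the decorated function acts as the destination factory for the pipeline
pipeline = dlt.pipeline("sink_demo", destination=my_sink)
pipeline.run([{"id": 1}, {"id": 2}], table_name="events")
```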

Tasks
This is a rather big ticket; the tasks below are just an outline:

    • implement the sink destination (like any other destination) with a proper destination factory
    • implement the @dlt.sink decorator, working similarly to @dlt.transformer; it returns a configured destination factory
    • the sink decorator should accept a sink function whose first argument is TDataItems and whose second is a table schema (and maybe something else), plus optional parameters (same as a parametrized transformer)
    • the sink decorator should allow specifying the batch size and file format (json, parquet): batch_size=1 sends single rows, >1 sends lists of objects (or arrow tables if parquet), and batch_size=0 sends the FileDict as the data item so the user can handle file processing as they want
    • the sink should accept the naming convention to be used (a lot of people will use direct)
    • we should decorate the function properly and allow for additional parameters, which become the SPEC of this destination and are used as such (ideally we would derive this spec from DestinationClientConfiguration; a trivial change in with_config may be needed) (see the sketch after this list)
    • expose the current load package context via dlt.current.load_package, which should at least contain the load id and a state dict (I think we have it)
    • (optional) allow for 3 callbacks in the decorator: on_init (called on schema update), on_completed (called on completion; mind the abort flag), and on_garbage_collected (called when the factory is garbage collected, allowing for some kind of cleanup)
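
A rough sketch of how these decorator parameters might combine; the parameter names (loader_file_format, naming_convention) and the config/secrets injection pattern are assumptions based on this outline, not a confirmed API:

```python
import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema

# parameter names below are assumptions based on this outline
@dlt.sink(batch_size=100, loader_file_format="parquet", naming_convention="direct")
def stream_to_api(
    items: TDataItems,
    table: TTableSchema,
    api_url: str = dlt.config.value,   # resolved from config, like a parametrized transformer
    api_key: str = dlt.secrets.value,  # resolved from secrets; both would become part of the SPEC
) -> None:
    # with parquet and batch_size > 1, `items` would arrive as arrow tables
    ...
```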

Implementation
Basic implementation may be quite simple. The sink destination implements a copy job that opens a job file and streams it into the sink function in the configured chunks. After each chunk it emits the sink state.

On restart, the state must be retrieved and already-processed rows must be skipped (see the sketch below).

State handling could be part of LoadStorage, and we may make LoadStorage compatible with fsspec (maybe in separate tickets; it depends a lot on "file move" being fast and atomic).
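
A minimal sketch of the resumable copy job described above; the state dict and helpers are hypothetical (the state would be persisted between runs, e.g. by LoadStorage), and only the skip-on-restart logic is the point:

```python
from typing import Callable, Iterable, Iterator, List

def chunked(rows: Iterable[dict], size: int) -> Iterator[List[dict]]:
    # yield lists of up to `size` rows
    buf: List[dict] = []
    for row in rows:
        buf.append(row)
        if len(buf) == size:
            yield buf
            buf = []
    if buf:
        yield buf

def run_copy_job(
    rows: Iterable[dict],
    sink_fn: Callable[[List[dict]], None],
    state: dict,  # hypothetical: persisted between runs by the load storage
    batch_size: int = 10,
) -> None:
    done = state.get("chunks_completed", 0)
    for idx, chunk in enumerate(chunked(rows, batch_size)):
        if idx < done:
            continue  # already consumed in a previous run, skip on restart
        sink_fn(chunk)
        state["chunks_completed"] = idx + 1  # emit state after each chunk
```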

rudolfix (Collaborator, Author) commented:

Optionally, sinks could be chained after the main destination consumes the data, rather than just replacing it. There is surely a clever way of doing that without complicating the user interface.

sh-rp (Collaborator) commented Jan 12, 2024

Questions:

  • Should we support awaitable sink functions?
  • Should we add a flag to guarantee sink function execution on the main thread for each item, probably using multiprocessing queues?
  • I don't get what the DltSource interaction should be; we should talk about it :)
  • Where should we store the pipeline and the schemas, plus the execution state of the current load package (if at all)?

Other ideas:

  • Should we send sentinel values as an extra function parameter on special occasions, for example at the start and end of the load (or the start and end of a file)? This would be useful for creating and destroying database or queue connections.
  • Should we allow the user to return certain sentinel values to indicate that a batch should be retried, that the load should fail, or something like that?
  • Should we provide a context object (a simple dictionary) that can carry over state between sink function calls? It would be useful for storing open database or queue connections. It could also be one context object per thread, for example, so we can enable multithreaded processing (see the sketch below).
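
A sketch of the per-thread context idea using threading.local; open_connection and its Conn class are hypothetical stand-ins for a real database or queue client:

```python
import threading

_local = threading.local()

def open_connection():
    # hypothetical stand-in for opening a real database or queue connection
    class Conn:
        def write(self, table_name, items):
            print(f"writing {len(items)} items to {table_name}")
    return Conn()

def get_context() -> dict:
    # lazily create one context dict per worker thread
    if not hasattr(_local, "ctx"):
        _local.ctx = {}
    return _local.ctx

def sink_fn(items, table):
    ctx = get_context()
    if "conn" not in ctx:
        ctx["conn"] = open_connection()  # opened once per thread, then reused
    ctx["conn"].write(table["name"], items)
```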

sh-rp (Collaborator) commented Mar 4, 2024

@rudolfix what do you mean by "FileDict"? In my current PR I am simply passing the full file path as a string into the sink function.

sh-rp closed this as completed in #891 on Mar 7, 2024.