allows to run parallel pipelines in separate threads #813

Merged
rudolfix merged 15 commits into devel on Dec 14, 2023

Conversation

rudolfix
Collaborator

@rudolfix rudolfix commented Dec 9, 2023

Description

  1. Allows running pipelines in parallel, provided that they run in separate threads.
  2. Makes injection containers thread-affine.
  3. See test_parallel_threads_pipeline for an example.
  4. Also updates the Performance docs to show how to use asyncio to run pipelines in parallel.
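As a rough illustration of item 4, the sketch below runs two blocking calls in a thread pool from asyncio. It is not taken from the PR or the dlt docs: `run_pipeline` is a hypothetical stand-in for a blocking pipeline run, and the pipeline names are made up.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(name: str) -> tuple[str, int]:
    # Hypothetical stand-in for a blocking pipeline run (assumption, not dlt API).
    # Returns the name and the id of the thread that executed the call.
    return name, threading.get_ident()


async def run_all(names: list[str]) -> list[tuple[str, int]]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(names)) as executor:
        # Each blocking call is handed to the pool, so the event loop stays free.
        futures = [loop.run_in_executor(executor, run_pipeline, n) for n in names]
        # gather returns results in the order the futures were passed in.
        return await asyncio.gather(*futures)


results = asyncio.run(run_all(["pipeline_1", "pipeline_2"]))
```

The pool is free to reuse a worker thread if one call finishes before the next is picked up, so the sketch makes no claim about which thread ids appear in `results`.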

Contains content of #807

  1. Adds metrics for closed files in data writers and collects them when creating StepInfo [WIP]
  2. Fixes edge cases with multiple load storages in normalize

netlify bot commented Dec 9, 2023

Deploy Preview for dlt-hub-docs ready!

Name Link
🔨 Latest commit 910bfa0
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/65759b10d7b7fa00087673c4
😎 Deploy Preview https://deploy-preview-813--dlt-hub-docs.netlify.app

@rudolfix rudolfix requested a review from sh-rp December 10, 2023 17:18
@rudolfix rudolfix self-assigned this Dec 10, 2023
@rudolfix rudolfix added the devel label Dec 10, 2023
@rudolfix rudolfix changed the title from "[WIP] allows to run parallel pipelines in separate threads" to "allows to run parallel pipelines in separate threads" Dec 10, 2023
@rudolfix rudolfix marked this pull request as ready for review December 10, 2023 17:21
context = self._thread_context(spec)
return spec in context

def _thread_context(
Collaborator

Can't you use Python's thread-local data to do all this? https://docs.python.org/3/library/threading.html#thread-local-data

Collaborator Author

Yeah, I know it, but when you look at the code, there are exceptions to that behavior.

  1. Some types of context are available globally (I use the main thread id).
  2. There's special treatment of the executor thread pool: I use the context of the thread that started the pool, not the current thread.

So yeah, I could use local(), but because of these exceptions I'd need to keep more dictionaries. Or can you force the thread id for local()?
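The two exceptions described above can be sketched with a registry keyed by thread id. This is a minimal illustration, not the code from this PR: the class `ThreadAffineContexts`, its methods, and the `is_global` flag are all hypothetical names.

```python
import threading
from typing import Any, Dict


class ThreadAffineContexts:
    """Hypothetical registry that keeps one dictionary of contexts per thread id."""

    def __init__(self) -> None:
        self._main_id = threading.main_thread().ident
        self._contexts: Dict[Any, Dict[str, Any]] = {}
        self._pool_owner: Dict[int, int] = {}  # pool worker id -> id of the thread that started the pool
        self._lock = threading.Lock()

    def register_pool_thread(self, worker_id: int, owner_id: int) -> None:
        with self._lock:
            self._pool_owner[worker_id] = owner_id

    def _thread_context(self, is_global: bool) -> Dict[str, Any]:
        with self._lock:
            if is_global:
                # Exception 1: global contexts always live under the main thread id.
                thread_id = self._main_id
            else:
                # Exception 2: pool workers resolve to the thread that started the pool.
                current = threading.get_ident()
                thread_id = self._pool_owner.get(current, current)
            return self._contexts.setdefault(thread_id, {})

    def set(self, key: str, value: Any, is_global: bool = False) -> None:
        self._thread_context(is_global)[key] = value

    def get(self, key: str, is_global: bool = False) -> Any:
        return self._thread_context(is_global).get(key)


registry = ThreadAffineContexts()
registry.set("config", "owner value")
registry.set("plugins", "shared value", is_global=True)
observed: Dict[str, Any] = {}


def plain_worker() -> None:
    observed["plain"] = registry.get("config")
    observed["global"] = registry.get("plugins", is_global=True)


def pool_worker(owner_id: int) -> None:
    registry.register_pool_thread(threading.get_ident(), owner_id)
    observed["pool"] = registry.get("config")


for target, args in ((plain_worker, ()), (pool_worker, (threading.get_ident(),))):
    thread = threading.Thread(target=target, args=args)
    thread.start()
    thread.join()
```

In this sketch the plain worker does not see the owner's `config`, the registered worker does, and both can read the global entry.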

class DataWriterMetrics(NamedTuple):
    file_path: str
    items_count: int
    file_size: int
Collaborator

maybe column count? but that is not really important tbh.

Collaborator Author

I plan to add elapsed time (start/stop). Column count is not known at this moment.

Collaborator Author

Not during extract, that is, but it is known during normalize. You can, however, get the column count from the relevant schema...

Collaborator

yes elapsed would be cool too!
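To make the metrics tuple from the diff concrete, here is a small sketch of how per-file metrics could be totalled. Only the three fields shown in the diff come from the PR; the `summarize` helper and the file names are hypothetical.

```python
from typing import Iterable, NamedTuple


class DataWriterMetrics(NamedTuple):
    file_path: str
    items_count: int
    file_size: int


def summarize(metrics: Iterable[DataWriterMetrics]) -> DataWriterMetrics:
    # Hypothetical helper (not part of the PR): totals items and bytes
    # across closed files. file_path is left empty for the total.
    items = 0
    size = 0
    for m in metrics:
        items += m.items_count
        size += m.file_size
    return DataWriterMetrics("", items, size)


total = summarize(
    [
        DataWriterMetrics("a.jsonl", 10, 100),
        DataWriterMetrics("b.jsonl", 5, 50),
    ]
)
```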

@@ -249,6 +250,17 @@ The default is to not parallelize normalization and to perform it in the main pr
Normalization is CPU bound and can easily saturate all your cores. Never allow `dlt` to use all cores on your local machine.
:::

:::caution
The default method of spawning a process pool on Linux is **fork**. If you are using threads in your code (or libraries that use threads),
Collaborator

should we add a link to some further explanation of this in the python docs maybe?

Collaborator Author

good point! could you propose a link? then I'll add it
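For readers who want to avoid **fork** in their own code, the standard library lets you request the **spawn** start method explicitly. The sketch below uses only `multiprocessing` and `concurrent.futures`; the helper names `spawn_context` and `make_pool` are made up for illustration, and nothing here shows how `dlt` itself configures its pool.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.context import BaseContext


def spawn_context() -> BaseContext:
    # A context object scopes the start method to the pools created from it,
    # without changing the process-wide default.
    return mp.get_context("spawn")


def make_pool(max_workers: int = 2) -> ProcessPoolExecutor:
    # Hypothetical helper: a process pool whose workers are started with spawn.
    return ProcessPoolExecutor(max_workers=max_workers, mp_context=spawn_context())
```

With spawn, worker processes re-import the main module, so code that creates and uses the pool should sit under an `if __name__ == "__main__":` guard.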

@@ -1,6 +1,6 @@
[tool.poetry]
name = "dlt"
-version = "0.4.1a0"
+version = "0.4.1a1"
Collaborator

should we add a test somewhere to check that the version number is in sync everywhere?

Collaborator Author

What do you mean? There's just one source of truth for the version, and this is the toml file. Or do you want to have a test that compares the toml file with the installed package? Or maybe that should be a lint step where we force people to run make dev when versions are not in sync?

Collaborator

@sh-rp sh-rp left a comment

I think my main question is: why not use Python's built-in thread locals? Or do you need to be able to access another thread's locals?
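The behavior behind this question can be seen in a few lines: a value stored on `threading.local()` in one thread is not visible from another. The sketch uses only the standard library; the variable names are illustrative.

```python
import threading

local_state = threading.local()
local_state.value = "set in starting thread"

observed = []


def worker() -> None:
    # Each thread gets its own attribute namespace on a threading.local
    # object, so the attribute set by the starting thread is missing here.
    observed.append(getattr(local_state, "value", None))


thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

After the worker finishes, `observed` holds `None`, while the starting thread still sees its own value. A pool worker that needs the context of the thread that started the pool therefore cannot get it from `threading.local()` alone.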

@rudolfix rudolfix merged commit 00c2725 into devel Dec 14, 2023
44 checks passed
@rudolfix rudolfix deleted the rfix/container-thread-affine branch December 14, 2023 11:34
2 participants