
concurrent cdk: improve resource usage and stop waiting on the main thread #33669

Merged: 57 commits into master from alex/concurrent_no_wait_main on Jan 18, 2024

Conversation

@girarda (Contributor) commented Dec 20, 2023

What

  • A Stripe connection has been experiencing OOM failures.
  • This was caused by the concurrent_read_processor keeping all partitions in memory.

Two more issues were uncovered along the way:

  • While the partition enqueuer and the partition reader should back off, the main thread shouldn't, because it should process records as fast as possible.
    • Instead of backing off on the main thread, we can back off on the task threads. PartitionEnqueuer and PartitionReader now wait before enqueuing elements if there are too many futures in the threadpool's task list.
  • futures.exception() is a blocking call. Since it was called on the main thread, pruning the futures blocked the main thread until the futures completed. This is resolved by checking whether a future raised an exception only once it has completed (see the sketch below).
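
A minimal sketch of that non-blocking check, assuming a plain list of futures (the helper name prune_completed_futures is illustrative, not the CDK's actual API):

from concurrent.futures import Future
from typing import Any, List


def prune_completed_futures(futures: List[Future[Any]]) -> None:
    """Drop completed futures from the list without ever blocking the caller."""
    for future in list(futures):
        if not future.done():
            continue  # skip still-running futures instead of blocking on exception()
        exception = future.exception()  # non-blocking because the future is already done
        if exception:
            raise exception
        futures.remove(future)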

How

  • Instead of keeping all partitions in memory and checking whether they are done, only keep the partitions that are currently running and remove them when they complete.
  • Before putting new items on the queue, check whether we're already at the maximum number of futures; if we are, wait. The queue was wrapped in a ThrottledQueue for ease of use (see the sketch below).
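
A minimal sketch of the throttling idea, assuming the worker threads can see the thread pool's list of pending futures; the class names mirror the ones introduced in this PR, but the bodies are illustrative rather than the CDK's actual implementation:

import time
from concurrent.futures import Future
from queue import Queue
from typing import Any, List


class Throttler:
    def __init__(self, futures_list: List[Future[Any]], sleep_time: float, max_concurrent_tasks: int):
        self._futures_list = futures_list
        self._sleep_time = sleep_time
        self._max_concurrent_tasks = max_concurrent_tasks

    def wait_and_acquire(self) -> None:
        # Block the calling worker thread (never the main thread) while the
        # thread pool already has too many pending tasks.
        while len(self._futures_list) >= self._max_concurrent_tasks:
            time.sleep(self._sleep_time)


class ThrottledQueue:
    def __init__(self, queue: Queue, throttler: Throttler, timeout: float):
        self._queue = queue
        self._throttler = throttler
        self._timeout = timeout

    def put(self, item: Any) -> None:
        # Workers wait here, applying back pressure before enqueuing.
        self._throttler.wait_and_acquire()
        self._queue.put(item)

    def get(self) -> Any:
        # The main thread reads without throttling so it drains the queue quickly.
        return self._queue.get(block=True, timeout=self._timeout)

The key point is that put() runs on the partition enqueuer and partition reader threads while get() runs on the main thread, so only the worker threads ever sleep.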

Recommended reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
  2. airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py
  3. airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/throttler.py
  4. airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partitions/throttled_queue.py


@octavia-squidington-iii added the area/connectors and CDK (Connector Development Kit) labels Dec 20, 2023
Contributor
Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

for record in parent_records:
    self.logger.info(f"Fetching parent stream slices for stream {self.name}.")
    yield {"parent": record}
# def stream_slices(
Contributor Author

The Stripe connector was modified to add log statements. This had a significant impact on the connector's performance because it logs one line per parent record; this should be reverted before updating the connector again.

@@ -32,6 +37,8 @@ def process_partition(self, partition: Partition) -> None:
    """
    try:
        for record in partition.read():
            while self._queue.qsize() > self._max_size:
Contributor Author

This should have a maximum number of attempts.
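
A hedged sketch of what a bounded wait could look like (max_attempts and the TimeoutError are hypothetical additions; the PR itself only notes that the bound is missing):

import time
from typing import Callable


def wait_for_queue_capacity(qsize: Callable[[], int], max_size: int, sleep_time: float, max_attempts: int = 100) -> None:
    # Hypothetical bounded version of the busy-wait above: give up after
    # max_attempts checks instead of potentially spinning forever.
    for _ in range(max_attempts):
        if qsize() <= max_size:
            return
        time.sleep(sleep_time)
    raise TimeoutError("Queue did not drain below its maximum size in time")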

@@ -34,6 +39,8 @@ def generate_partitions(self, stream: AbstractStream) -> None:
    """
    try:
        for partition in stream.generate_partitions():
            while self._queue.qsize() >= self._max_size:
Contributor Author

This should have a maximum number of attempts.

Contributor

Can we explain why this is needed on top of the priority? It's not so obvious why both would be needed

if len(futures) < self._max_concurrent_tasks:
    break
self._logger.info("Main thread is sleeping because the task queue is full...")
time.sleep(self._sleep_time)
Contributor Author

avoid waiting on the main thread
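
A hedged before/after sketch of the change being called out here; the function names are illustrative, not the ThreadPoolManager's actual API:

import time
from concurrent.futures import Future
from typing import Any, List


def prune_with_wait(futures: List[Future[Any]], max_concurrent_tasks: int, sleep_time: float) -> None:
    # Old behaviour: the main thread sleeps until enough futures have completed.
    while True:
        futures[:] = [f for f in futures if not f.done()]
        if len(futures) < max_concurrent_tasks:
            break
        time.sleep(sleep_time)


def prune_without_wait(futures: List[Future[Any]]) -> None:
    # New behaviour: drop completed futures and return immediately; back
    # pressure is applied by the worker threads before they enqueue instead.
    futures[:] = [f for f in futures if not f.done()]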

Contributor

@maxi297 left a comment

In general, the idea of waiting in the child threads instead of the main one makes sense to me. I'll continue thinking about this until our sync

class QueueItemObject:
    def __init__(self, item: QueueItem):
        self.value = item
        self._order = {
Contributor

nit: Could be a constant
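
A hypothetical illustration of that nit, lifting the ordering map into a module-level constant; the concrete types and priorities below are placeholders, not the CDK's real QueueItem types:

from typing import Any, Mapping

# Placeholder priorities; the real mapping would list the CDK's own QueueItem types.
_ORDER_BY_TYPE: Mapping[type, int] = {
    str: 0,
    dict: 1,
}


class QueueItemObject:
    def __init__(self, item: Any):
        self.value = item
        # Looking up a shared constant avoids rebuilding the dict per instance.
        self._order = _ORDER_BY_TYPE.get(type(item), len(_ORDER_BY_TYPE))

    def __lt__(self, other: "QueueItemObject") -> bool:
        # PriorityQueue compares items with <, so a lower order is dequeued first.
        return self._order < other._order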


@@ -34,7 +39,9 @@ def generate_partitions(self, stream: AbstractStream) -> None:
    """
    try:
        for partition in stream.generate_partitions():
            self._queue.put(partition)
Contributor

It feels like there is some logic associated with how we access the queue. I wonder if it would make sense to regroup this in a class. We have similar logic in PartitionReader for example

Contributor Author

for sure! I actually ran into another bottleneck preventing the sync from succeeding in a reasonable amount of time. I'll clean this up before asking for a formal review

actual_partitions.append(partition)

assert actual_partitions == partitions
assert queue.put.has_calls([call(p) for p in partitions] + [call(PartitionGenerationCompletedSentinel(stream))])
Contributor Author

verify that the partitions and the sentinel were put on the queue

@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from queue import Queue
Contributor Author

unused import


def __init__(self, futures_list: List[Future[Any]], sleep_time: float, max_concurrent_tasks: int):
    """
    :param futures_list: The list of futures to monitor
Contributor

Maybe this is obvious, but why do we need to throttle workers enqueuing items based on the number of pending tasks? I'm wondering if we can solve some of these memory issues by setting queue.maxsize, which prevents items from being added until something else is dequeued, creating a sort of back pressure.

Contributor Author

Good question!

The issue I ran into with blocking on the size of the queue is that the main thread will remove elements from the queue before potentially adding them to the list of futures. Since the main thread doesn't wait, it'll be able to remove items from the queue even if the list of futures is large, so the tasks won't wait.
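
A small illustration of that point under toy assumptions (this stand-in loop is not the real concurrent_read_processor): even with queue.maxsize set, the main thread frees queue slots immediately and resubmits the work, so the backlog just moves from the queue to the futures list.

import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

queue: Queue = Queue(maxsize=10)  # bounded queue
pool = ThreadPoolExecutor(max_workers=2)
futures = []


def produce() -> None:
    for i in range(100):
        queue.put(i)  # only blocks while the queue itself is full
    queue.put(None)


pool.submit(produce)
while (item := queue.get()) is not None:
    # The main thread never waits on the futures list, so it keeps freeing
    # queue slots; the pending work piles up in `futures` instead.
    futures.append(pool.submit(time.sleep, 0.01))

print(len(futures))  # roughly 100 tasks despite maxsize=10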


Contributor

Cool, thanks for the clarification! I think we should document the reason that we need this. Mind adding a docstring on this file?
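
One possible shape for such a docstring for throttler.py, based on the explanation above (the wording is only a suggestion):

"""
Throttles the rate at which worker threads enqueue new work.

Bounding the queue size alone is not enough: the main thread dequeues items
immediately and turns them into futures, so memory pressure would simply move
from the queue to the thread pool's task list. Instead, workers call
wait_and_acquire() before enqueuing and sleep while the number of pending
futures exceeds the configured maximum, keeping the main thread free to
process records as fast as possible.
"""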

Contributor

@clnoll left a comment

Looks good @girarda! Just one small documentation request.

Would you also mind doing some memory profiling of this branch versus master to show that it fixes the problem?
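
For reference, one minimal way to capture peak memory on each branch with the standard library (run_sync below is a placeholder for however the read is launched locally; a dedicated profiler such as memray would work just as well):

import tracemalloc


def run_sync() -> None:
    # Placeholder: launch the concurrent source read being profiled here.
    pass


tracemalloc.start()
run_sync()
current, peak = tracemalloc.get_traced_memory()
print(f"current={current / 2**20:.1f} MiB, peak={peak / 2**20:.1f} MiB")
tracemalloc.stop()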

for record in parent_records:
    self.logger.info(f"Fetching parent stream slices for stream {self.name}.")
    yield {"parent": record}
# def stream_slices(
Contributor Author

TODO @girarda: revert this change before merging

    self._sleep_time = sleep_time
    self._max_concurrent_tasks = max_concurrent_tasks

def wait_and_acquire(self) -> None:
Contributor

Should we have unit tests for that?

Contributor Author

yes, done!
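
A sketch of what such tests could look like, assuming a Throttler class lives in the throttler.py module from the reading order, takes the constructor arguments shown above, and calls time.sleep via "import time" (all of which may differ from the final code):

from unittest import mock
from unittest.mock import Mock

from airbyte_cdk.sources.concurrent_source.throttler import Throttler


def test_wait_and_acquire_returns_immediately_when_under_the_limit():
    throttler = Throttler(futures_list=[], sleep_time=0.1, max_concurrent_tasks=5)
    with mock.patch("time.sleep") as sleep_mock:
        throttler.wait_and_acquire()
    sleep_mock.assert_not_called()


def test_wait_and_acquire_sleeps_while_too_many_tasks_are_pending():
    throttler = Throttler(futures_list=[Mock(), Mock()], sleep_time=0.1, max_concurrent_tasks=1)
    # The futures list never shrinks in this test, so interrupt the loop after two sleeps.
    with mock.patch("time.sleep", side_effect=[None, InterruptedError]) as sleep_mock:
        try:
            throttler.wait_and_acquire()
        except InterruptedError:
            pass
    assert sleep_mock.call_count == 2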

    self._throttler = throttler
    self._timeout = timeout

def put(self, item: QueueItem) -> None:
Contributor

Should we have unit tests for that?

Contributor Author

yes, done!
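
And similarly for ThrottledQueue.put, assuming the constructor takes the wrapped queue, the throttler, and a timeout as suggested by the snippet above (module path taken from the recommended reading order; the exact signature is an assumption):

from unittest.mock import Mock

from airbyte_cdk.sources.streams.concurrent.partitions.throttled_queue import ThrottledQueue


def test_put_waits_on_the_throttler_before_enqueuing():
    queue, throttler = Mock(), Mock()
    throttled_queue = ThrottledQueue(queue, throttler, timeout=1)
    throttled_queue.put("an_item")
    throttler.wait_and_acquire.assert_called_once()
    queue.put.assert_called_once_with("an_item")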

@girarda commented Jan 17, 2024

@girarda merged commit 0faa69d into master Jan 18, 2024
25 checks passed
@girarda deleted the alex/concurrent_no_wait_main branch January 18, 2024 07:54
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Labels: CDK (Connector Development Kit), connectors/source/stripe