Conversation

@marianogappa
Contributor

Problem

The DFS scheduler implements concurrent resolution of resources, but only when resources are sent in batches to the top-level res channel. As a result, the same plugin syncs much faster simply by batching resources before sending them through the channel.

Solution this PR implements

```go
// BatchSender is a helper struct that batches items and sends them in batches of batchSize or after batchTimeout.
//
// - If item is already a slice, it will be sent directly
// - Otherwise, it will be added to the current batch
// - If the current batch has reached the batch size, it will be sent immediately
// - Otherwise, a timer will be started to send the current batch after the batch timeout
type BatchSender struct {
	// ...
}
```
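
For illustration, a minimal sketch of a sender with the documented behavior could look like the following. The field names, the `any` element type, and the `send` callback are assumptions for the sketch, not the PR's actual implementation:

```go
package batch

import (
	"sync"
	"time"
)

// BatchSender is a minimal sketch (not the PR's exact implementation).
// It buffers single items and flushes them when the batch reaches
// batchSize or when batchTimeout elapses after the first buffered item.
type BatchSender struct {
	mu           sync.Mutex
	batch        []any
	batchSize    int
	batchTimeout time.Duration
	timer        *time.Timer
	send         func(items []any) // hypothetical callback that writes to the res channel
}

// NewBatchSender constructs a sender; the send callback is an assumption.
func NewBatchSender(size int, timeout time.Duration, send func([]any)) *BatchSender {
	return &BatchSender{batchSize: size, batchTimeout: timeout, send: send}
}

// Send mirrors the documented behavior: slices pass through directly,
// single items are buffered until the batch fills or the timer fires.
func (b *BatchSender) Send(item any) {
	if items, ok := item.([]any); ok {
		b.send(items) // already a slice: forward as-is
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.batch = append(b.batch, item)
	if len(b.batch) >= b.batchSize {
		b.flushLocked()
		return
	}
	if b.timer == nil {
		b.timer = time.AfterFunc(b.batchTimeout, func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			b.flushLocked()
		})
	}
}

// flushLocked stops any pending timer and sends the buffered batch.
// Callers must hold b.mu.
func (b *BatchSender) flushLocked() {
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
	if len(b.batch) == 0 {
		return
	}
	b.send(b.batch)
	b.batch = nil
}
```

A scheduler would then construct one sender per result channel, e.g. `NewBatchSender(100, 100*time.Millisecond, func(items []any) { resChan <- items })`, where `resChan` is a hypothetical stand-in for the top-level res channel.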

With a batch timeout of 100ms and a batch size of 100, it achieves a 5x improvement on a sync that reproduces the issue raised by the community (https://github.com/jeromewir/cq-source-concurrency-childtable-example):

[Screenshot, 2024-12-03: sync timing comparison showing the 5x improvement]

The PR touches as little scheduler code as possible and steers clear of tricky language constructs.

@marianogappa marianogappa requested review from a team and erezrokah December 3, 2024 13:25
@github-actions github-actions bot added the feat label Dec 3, 2024
@marianogappa
Contributor Author

marianogappa commented Dec 3, 2024

😱 Looks like the solution doesn't account for the res channel possibly being closed before the last batch is flushed. I potentially cannot use timer.AfterFunc, since there is no way to .Wait on it 🤔

Anyway, it shows promise but we might not need it if we choose to move to shuffle-queue. I can look into the special case, though.

@marianogappa
Contributor Author

I think this can be mitigated with a .Close() method: it will stop the timer if one is set and flush the last batch immediately.
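
Continuing the hypothetical sketch above, such a Close could look like this (again an assumption, not the merged code):

```go
// Close stops any pending timer and flushes the remaining batch
// synchronously, so the caller can safely close the res channel afterwards.
// It relies on flushLocked from the sketch above, which already stops the
// timer if one is set.
func (b *BatchSender) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}
```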

Member

@erezrokah erezrokah left a comment

Looks good 👍

  1. Can we add some tests that cover the new code (if we don't already have them via the scheduler tests)? I mean sending both single items and slices to the result channel.
  2. I think batchSize should default to 5, which is our singleResourceMaxConcurrency. There's no point in sending more than 5 items at a time, since I think they will block on
    resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
    (see the sketch after this list).
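
A standalone illustration of this point, assuming the scheduler's per-resource limit is a weighted semaphore of capacity 5 (this is not scheduler code, just a demonstration with golang.org/x/sync/semaphore): only 5 items of a batch resolve concurrently, so anything beyond that queues behind the semaphore.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	const singleResourceMaxConcurrency = 5 // assumed to mirror the scheduler default
	sem := semaphore.NewWeighted(singleResourceMaxConcurrency)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // a batch of 100 items
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = sem.Acquire(context.Background(), 1) // items beyond the first 5 wait here
			defer sem.Release(1)
			time.Sleep(10 * time.Millisecond) // stand-in for resolving one resource
			fmt.Println("resolved item", i)
		}(i)
	}
	wg.Wait()
}
```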

Member

@erezrokah erezrokah left a comment

We decided to add an e2e test using our source and destination test plugins that will cover the changes in this PR.

@marianogappa
Contributor Author

@erezrokah passing E2E: cloudquery/cloudquery#19875

@marianogappa
Contributor Author

Will merge this based on it passing E2E tests in CI

[Screenshot, 2024-12-09: passing E2E test run]

@kodiakhq kodiakhq bot merged commit 371b20f into main Dec 9, 2024
9 checks passed
@kodiakhq kodiakhq bot deleted the mariano/batch-sender branch December 9, 2024 10:37
@marianogappa
Contributor Author

The testing I did initially was flawed (I upgraded the SDK on the CLI rather than on the source plugin), but since then I ran a separate test that also passed:
cloudquery/cloudquery#19908

kodiakhq bot pushed a commit that referenced this pull request Dec 9, 2024
🤖 I have created a release *beep* *boop*
---


## [4.71.0](v4.70.2...v4.71.0) (2024-12-09)


### Features

* Implement batch sender. ([#1995](#1995)) ([371b20f](371b20f))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).