feat: Implement RandomQueue scheduler strategy #1914
Code looks great, I'm going to test it a bit with a few plugins, also with OTEL enabled
//
// - If the queue is empty, check `IsIdle()` to check if no workers are active.
// - If workers are still active, call `Wait()` to block until state changes.
type activeWorkSignal struct {
Nice 🚀 Much better than what I did before 🙈
scheduler/strategy.go
Outdated
StrategyDFS: "dfs",
StrategyRoundRobin: "round-robin",
StrategyShuffle: "shuffle",
StrategyRandomQueue: "random-queue",
Maybe also call it `shuffle-queue`, or just `queue`, since we already have `shuffle`, which means random.
Renamed to `shuffle-queue`.
tableMetrics.Duration.Store(&duration)
tableMetrics.OtelEndTime(ctx, endTime)
if parent == nil {
logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished")
OK, now I remember what I removed here. We used to log a message here with the metrics of all relations after a parent finished.
Now we don't do that, since reaching this code doesn't mean all relations finished syncing; it only means the parent finished pushing all the new work units into the queue.
See plugin-sdk/scheduler/scheduler_dfs.go, line 125 in 2977ae3:
if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table.
I don't think it's a blocker, and we can add logs later on. But it's still something to consider.
Seems there's a flaky test which can be seen in a neighboring PR: https://github.com/cloudquery/plugin-sdk/actions/runs/11166226858/job/31039731830?pr=1920
🤖 I have created a release *beep* *boop*

---

## [4.65.0](v4.64.1...v4.65.0) (2024-10-04)

### Features

* Implement RandomQueue scheduler strategy ([#1914](#1914)) ([af8ac87](af8ac87))

### Bug Fixes

* Revert "fix: Error handling in StreamingBatchWriter" ([#1918](#1918)) ([38b4bfd](38b4bfd))
* **tests:** WriterTestSuite.handleNulls should not overwrite columns ([#1920](#1920)) ([08e18e2](08e18e2))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This PR implements a new Scheduler Strategy based on a Concurrent Random Queue. It is based on @erezrokah's Priority Queue Scheduler Strategy.
How does it work
This is hopefully a much simpler scheduling strategy. It doesn't have any semaphores; it just uses the existing concurrency setting.
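To make the idea concrete, here is a minimal sketch of a queue whose consumers remove a random element instead of the head, drained by a fixed number of workers. It is not the code in this PR: `randomQueue`, `newRandomQueue` and `drain` are hypothetical names, items are plain strings, and all work is pushed up front. In the real scheduler, workers also push child work while running, which is why an empty queue alone does not mean the sync is done (see the `activeWorkSignal` comment in the diff).

```go
package main

import (
	"math/rand"
	"sync"
)

// randomQueue is a hypothetical, simplified concurrent queue.
// Pull removes a uniformly random element rather than the oldest one.
type randomQueue struct {
	mu    sync.Mutex
	rng   *rand.Rand // guarded by mu; *rand.Rand is not goroutine-safe
	items []string
}

func newRandomQueue(seed int64) *randomQueue {
	return &randomQueue{rng: rand.New(rand.NewSource(seed))}
}

// Push appends an item to the queue.
func (q *randomQueue) Push(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
}

// Pull removes and returns a random item. ok is false when the queue is empty.
func (q *randomQueue) Pull() (item string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return "", false
	}
	i := q.rng.Intn(len(q.items))
	item = q.items[i]
	// Swap-remove: move the last element into slot i, then shrink the slice.
	last := len(q.items) - 1
	q.items[i] = q.items[last]
	q.items = q.items[:last]
	return item, true
}

// drain starts `concurrency` workers that pull until the queue is empty
// and returns the items in the order they were processed.
// Assumption: nothing is pushed while draining, so "empty" means "done".
func drain(q *randomQueue, concurrency int) []string {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed []string
	)
	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				item, ok := q.Pull()
				if !ok {
					return
				}
				mu.Lock()
				processed = append(processed, item)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return processed
}
```

Pushing a handful of table names and calling `drain(q, 3)` returns every name exactly once, in an order that depends on the seed and on goroutine scheduling.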
Table resolvers (and their relations) get `Push`ed into a work queue, and `concurrency` workers `Pull` from this queue, but they pull a random element from it.
Why it should work better
The key benefit of this strategy is this:
`int32` range, all relation API calls are evenly spread throughout the sync, thus optimally minimising rate limits!
Does it work better?
Still working on results. Notably AWS & Azure yield mixed results; still have to look into why.
GCP
Before
UPDATE: GCP is moving to Round Robin strategy, and it's much faster with this strategy:
After
Result: 76.22% reduction in time, or 3.21 times faster.
Result against Round Robin: 15% reduction in time, or 0.18 times faster (probably within margin of error)
BigQuery
Before
After
Result: 32.28% reduction in time, or 0.48 times faster
SentinelOne
Before (it was already quite fast due to previous merged improvement)
After
Result: 46.67% reduction in time, or 0.875 times faster
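For anyone checking the numbers: the "times faster" figures above are consistent with reading them as `before/after - 1`, derived from the percentage reduction in time. A small sketch of that conversion (the function name `timesFaster` is only for illustration, and this reading of the figures is my assumption):

```go
package main

// timesFaster converts a percentage reduction in sync time into a
// "times faster" figure, assuming that figure means (before/after) - 1.
func timesFaster(reductionPercent float64) float64 {
	remaining := 1 - reductionPercent/100 // after/before
	return 1/remaining - 1
}
```

Under that assumption, a 76.22% reduction gives roughly 3.21, 32.28% gives roughly 0.48, 46.67% gives roughly 0.875, and 15% gives roughly 0.18, matching the results listed above.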
How to test
`go.mod` replace for sdk: `replace github.com/cloudquery/plugin-sdk/v4 => github.com/cloudquery/plugin-sdk/v4 v4.63.1-0.20241002131015-243705c940c6` (check last commit on this PR)
`scheduler.StrategyRandomQueue`.
How scary is it to merge