From a76e7755f2f57cd4f2337f1ef37e24dd7799b1d2 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 18 Sep 2023 16:10:56 +0100 Subject: [PATCH 1/5] Add shuffle scheduler --- scheduler/scheduler.go | 4 ++- scheduler/scheduler_shuffle.go | 64 ++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 scheduler/scheduler_shuffle.go diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 4bb43f2e97..87c93fe839 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -34,6 +34,7 @@ const ( const ( StrategyDFS Strategy = iota StrategyRoundRobin + StrategyShuffle ) type Strategy int @@ -84,10 +85,11 @@ func (s *Strategy) Validate() error { return fmt.Errorf("unknown scheduler strategy: %d", s) } -var AllStrategies = Strategies{StrategyDFS, StrategyRoundRobin} +var AllStrategies = Strategies{StrategyDFS, StrategyRoundRobin, StrategyShuffle} var AllStrategyNames = [...]string{ StrategyDFS: "dfs", StrategyRoundRobin: "round-robin", + StrategyShuffle: "shuffle", } func StrategyForName(s string) (Strategy, error) { diff --git a/scheduler/scheduler_shuffle.go b/scheduler/scheduler_shuffle.go new file mode 100644 index 0000000000..3523528fb1 --- /dev/null +++ b/scheduler/scheduler_shuffle.go @@ -0,0 +1,64 @@ +package scheduler + +import ( + "context" + "math/rand" + "sync" + + "github.com/cloudquery/plugin-sdk/v4/schema" +) + +func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *schema.Resource) { + // we have this because plugins can return sometimes clients in a random way which will cause + // differences between this run and the next one. 
+ preInitialisedClients := make([][]schema.ClientMeta, len(s.tables)) + for i, table := range s.tables { + clients := []schema.ClientMeta{s.client} + if table.Multiplex != nil { + clients = table.Multiplex(s.client) + } + preInitialisedClients[i] = clients + // we do this here to avoid locks so we initial the metrics structure once in the main goroutines + // and then we can just read from it in the other goroutines concurrently given we are not writing to it. + s.metrics.initWithClients(table, clients) + } + + // first interleave the tables like in round-robin + tableClients := roundRobinInterleave(s.tables, preInitialisedClients) + + // then shuffle the tableClients to randomize the order in which they are retrieved + shuffle(tableClients) + + var wg sync.WaitGroup + for _, tc := range tableClients { + table := tc.table + cl := tc.client + if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil { + // This means context was cancelled + wg.Wait() + return + } + wg.Add(1) + go func() { + defer wg.Done() + defer s.scheduler.tableSems[0].Release(1) + // not checking for error here as nothing much to do. + // the error is logged and this happens when context is cancelled + // Round Robin currently uses the DFS algorithm to resolve the tables, but this + // may change in the future. 
+ s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1) + }() + } + + // Wait for all the worker goroutines to finish + wg.Wait() +} + +func shuffle(tableClients []tableClient) { + // use a fixed seed so that runs with the same tables and clients perform similarly across syncs + r := rand.New(rand.NewSource(99)) + for i := range tableClients { + j := r.Intn(i + 1) + tableClients[i], tableClients[j] = tableClients[j], tableClients[i] + } +} From b3eb7e0ac20a60a6d1daa1d6bb1fc8402c07b40b Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 18 Sep 2023 16:13:39 +0100 Subject: [PATCH 2/5] Add switch case --- scheduler/scheduler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 87c93fe839..c5379811f2 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -252,6 +252,8 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s syncClient.syncDfs(ctx, resources) case StrategyRoundRobin: syncClient.syncRoundRobin(ctx, resources) + case StrategyShuffle: + syncClient.syncShuffle(ctx, resources) default: panic(fmt.Errorf("unknown scheduler %s", s.strategy.String())) } From 7c5f03f17e99fcb2845bb8eaac08425e1cff6182 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 18 Sep 2023 16:28:19 +0100 Subject: [PATCH 3/5] Update comment --- scheduler/scheduler_shuffle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/scheduler_shuffle.go b/scheduler/scheduler_shuffle.go index 3523528fb1..17d3460baa 100644 --- a/scheduler/scheduler_shuffle.go +++ b/scheduler/scheduler_shuffle.go @@ -44,7 +44,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * defer s.scheduler.tableSems[0].Release(1) // not checking for error here as nothing much to do. 
// the error is logged and this happens when context is cancelled - // Round Robin currently uses the DFS algorithm to resolve the tables, but this + // This currently uses the DFS algorithm to resolve the tables, but this // may change in the future. s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1) }() From 8643b60ea78687855fadad1af1da8ef20f53fd36 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 18 Sep 2023 16:34:26 +0100 Subject: [PATCH 4/5] Use random shuffle --- scheduler/scheduler_shuffle.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/scheduler/scheduler_shuffle.go b/scheduler/scheduler_shuffle.go index 17d3460baa..cf4c4cf470 100644 --- a/scheduler/scheduler_shuffle.go +++ b/scheduler/scheduler_shuffle.go @@ -57,8 +57,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * func shuffle(tableClients []tableClient) { // use a fixed seed so that runs with the same tables and clients perform similarly across syncs r := rand.New(rand.NewSource(99)) - for i := range tableClients { - j := r.Intn(i + 1) + r.Shuffle(len(tableClients), func(i, j int) { tableClients[i], tableClients[j] = tableClients[j], tableClients[i] - } + }) } From 27c4e22fc4dd2db79b2e7ede35ecd1c135afe28c Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 18 Sep 2023 16:45:53 +0100 Subject: [PATCH 5/5] Hash table names for random seed --- scheduler/scheduler_shuffle.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/scheduler/scheduler_shuffle.go b/scheduler/scheduler_shuffle.go index cf4c4cf470..8e4dfe9c1a 100644 --- a/scheduler/scheduler_shuffle.go +++ b/scheduler/scheduler_shuffle.go @@ -2,17 +2,21 @@ package scheduler import ( "context" + "hash/fnv" "math/rand" + "strings" "sync" "github.com/cloudquery/plugin-sdk/v4/schema" ) func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *schema.Resource) { - // we have this because plugins can return 
sometimes clients in a random way which will cause + // We have this because plugins can sometimes return clients in a random order, which will cause // differences between this run and the next one. preInitialisedClients := make([][]schema.ClientMeta, len(s.tables)) + tableNames := make([]string, len(s.tables)) for i, table := range s.tables { + tableNames[i] = table.Name clients := []schema.ClientMeta{s.client} if table.Multiplex != nil { clients = table.Multiplex(s.client) @@ -23,11 +27,15 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * s.metrics.initWithClients(table, clients) } - // first interleave the tables like in round-robin + // First interleave the tables like in round-robin tableClients := roundRobinInterleave(s.tables, preInitialisedClients) - // then shuffle the tableClients to randomize the order in which they are retrieved - shuffle(tableClients) + // Then shuffle the tableClients to randomize the order in which they are retrieved. + // We use a seed derived from the table names so that runs with the same tables and clients perform similarly across syncs. + // However, if the table order changes, the seed will change and the shuffle order will be different, + // so users have a little bit of control over the randomization. + seed := hashTableNames(tableNames) + shuffle(tableClients, seed) var wg sync.WaitGroup for _, tc := range tableClients { @@ -42,8 +50,8 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * go func() { defer wg.Done() defer s.scheduler.tableSems[0].Release(1) - // not checking for error here as nothing much to do. - // the error is logged and this happens when context is cancelled + // Not checking for error here as nothing much to do. + // The error is logged and this happens when context is cancelled. // This currently uses the DFS algorithm to resolve the tables, but this // may change in the future. 
s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1) @@ -54,9 +62,14 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * wg.Wait() } -func shuffle(tableClients []tableClient) { - // use a fixed seed so that runs with the same tables and clients perform similarly across syncs - r := rand.New(rand.NewSource(99)) +func hashTableNames(tableNames []string) int64 { + h := fnv.New32a() + h.Write([]byte(strings.Join(tableNames, ","))) + return int64(h.Sum32()) +} + +func shuffle(tableClients []tableClient, seed int64) { + r := rand.New(rand.NewSource(seed)) r.Shuffle(len(tableClients), func(i, j int) { tableClients[i], tableClients[j] = tableClients[j], tableClients[i] })