From 3810fc1747ebd4f8d5c00649914629cd5b81231c Mon Sep 17 00:00:00 2001 From: bbernays Date: Sun, 10 Dec 2023 17:37:17 -0600 Subject: [PATCH 01/11] Update scheduler_dfs.go --- scheduler/scheduler_dfs.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index 8a42b3e1e8..e1f21363d1 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -13,6 +13,7 @@ import ( "github.com/getsentry/sentry-go" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "golang.org/x/sync/semaphore" ) func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *schema.Resource) { @@ -198,10 +199,19 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl wg.Wait() return } + tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(table.Name+"-"+client.ID(), semaphore.NewWeighted(int64(s.scheduler.singleTableMaxConcurrency))) + tableSem := tableSemVal.(*semaphore.Weighted) + if err := tableSem.Acquire(ctx, 1); err != nil { + // This means context was cancelled + s.scheduler.tableSems[depth].Release(1) + wg.Wait() + return + } wg.Add(1) go func() { defer wg.Done() defer s.scheduler.tableSems[depth].Release(1) + defer tableSem.Release(1) s.resolveTableDfs(ctx, relation, client, resource, resolvedResources, depth+1) }() } From 5d4a124c38c02bbe48117a038af05b59c97b7e63 Mon Sep 17 00:00:00 2001 From: bbernays Date: Sun, 10 Dec 2023 17:37:20 -0600 Subject: [PATCH 02/11] Update scheduler.go --- scheduler/scheduler.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5e564c6492..8ee7be9a01 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "runtime/debug" + "sync" "sync/atomic" "time" @@ -65,6 +66,12 @@ func WithStrategy(strategy Strategy) Option { } } +func WithSingleTableMaxConcurrency(concurrency int) Option { + return func(s *Scheduler) { + s.singleTableMaxConcurrency = concurrency + } +} + type SyncOption func(*syncClient) func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption { @@ -86,8 +93,10 @@ type Scheduler struct { // tableSem is a semaphore that limits the number of concurrent tables being fetched tableSems []*semaphore.Weighted // Logger to call, this logger is passed to the serve.Serve Client, if not defined Serve will create one instead. - logger zerolog.Logger - concurrency int + logger zerolog.Logger + concurrency int + singleTableConcurrency sync.Map + singleTableMaxConcurrency int } type syncClient struct { From e80ede8adb932abc74333ca567ee1fd298c91fdf Mon Sep 17 00:00:00 2001 From: bbernays Date: Sun, 10 Dec 2023 17:40:50 -0600 Subject: [PATCH 03/11] Update scheduler.go --- scheduler/scheduler.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 8ee7be9a01..5884e59803 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -25,11 +25,12 @@ import ( ) const ( - DefaultConcurrency = 50000 - DefaultMaxDepth = 4 - minTableConcurrency = 1 - minResourceConcurrency = 100 - otelName = "schedule" + DefaultConcurrency = 50000 + DefaultMaxDepth = 4 + minTableConcurrency = 1 + minResourceConcurrency = 100 + otelName = "schedule" + defaultSingleTableMaxConcurrency = 10 ) var ErrNoTables = errors.New("no tables specified for syncing, review `tables` and `skip_tables` in your config and specify at least one table to sync") @@ -111,9 +112,10 @@ type syncClient struct { func NewScheduler(opts ...Option) *Scheduler { s := Scheduler{ - caser: caser.New(), - concurrency: DefaultConcurrency, - maxDepth: DefaultMaxDepth, + caser: caser.New(), + concurrency: DefaultConcurrency, + maxDepth: DefaultMaxDepth, + singleTableMaxConcurrency: defaultSingleTableMaxConcurrency, } for _, opt := range opts { opt(&s) From 3f60732870d85568f64e4879c7964319536ef7d9 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 11 Dec 2023 08:23:06 -0600 Subject: [PATCH 04/11] Update scheduler_dfs.go --- scheduler/scheduler_dfs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index e1f21363d1..57a3a8715d 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -199,7 +199,7 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl wg.Wait() return } - tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(table.Name+"-"+client.ID(), semaphore.NewWeighted(int64(s.scheduler.singleTableMaxConcurrency))) + tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(table.Name+"-"+client.ID(), semaphore.NewWeighted(s.scheduler.singleTableMaxConcurrency)) tableSem := tableSemVal.(*semaphore.Weighted) if err := tableSem.Acquire(ctx, 1); err != nil { // This means context was cancelled From 349eb7870c3315435f0a8e3bb01c2b9b95ecc02c Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 11 Dec 2023 08:23:08 -0600 Subject: [PATCH 05/11] Update scheduler.go --- scheduler/scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5884e59803..4b32946a7d 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -69,7 +69,7 @@ func WithStrategy(strategy Strategy) Option { func WithSingleTableMaxConcurrency(concurrency int) Option { return func(s *Scheduler) { - s.singleTableMaxConcurrency = concurrency + s.singleTableMaxConcurrency = int64(concurrency) } } @@ -97,7 +97,7 @@ type Scheduler struct { logger zerolog.Logger concurrency int singleTableConcurrency sync.Map - singleTableMaxConcurrency int + singleTableMaxConcurrency int64 } type syncClient struct { From bc828ad3fc4a27ad3f805671f7c7704028783c29 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 11 Dec 2023 08:30:10 -0600 Subject: [PATCH 06/11] Update scheduler.go --- scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 4b32946a7d..40ef0229d6 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -30,7 +30,7 @@ const ( minTableConcurrency = 1 minResourceConcurrency = 100 otelName = "schedule" - defaultSingleTableMaxConcurrency = 10 + defaultSingleTableMaxConcurrency = 50 ) var ErrNoTables = errors.New("no tables specified for syncing, review `tables` and `skip_tables` in your config and specify at least one table to sync") From 9c1dfae4afbf7b77d043f0208b800e82d2b0abd1 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 11 Dec 2023 08:46:12 -0600 Subject: [PATCH 07/11] Update scheduler.go --- scheduler/scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 40ef0229d6..33ee33dfd8 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -67,9 +67,9 @@ func WithStrategy(strategy Strategy) Option { } } -func WithSingleTableMaxConcurrency(concurrency int) Option { +func WithSingleTableMaxConcurrency(concurrency int64) Option { return func(s *Scheduler) { - s.singleTableMaxConcurrency = int64(concurrency) + s.singleTableMaxConcurrency = concurrency } } From f21d89a8fabcc415ac102da85551e33c8f6d71e9 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 12 Dec 2023 09:38:09 -0600 Subject: [PATCH 08/11] Update scheduler.go --- scheduler/scheduler.go | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 33ee33dfd8..23b97cffc3 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -25,12 +25,11 @@ import ( ) const ( - DefaultConcurrency = 50000 - DefaultMaxDepth = 4 - minTableConcurrency = 1 - minResourceConcurrency = 100 - otelName = "schedule" - defaultSingleTableMaxConcurrency = 50 + DefaultConcurrency = 50000 + DefaultMaxDepth = 4 + minTableConcurrency = 1 + minResourceConcurrency = 100 + otelName = "schedule" ) var ErrNoTables = errors.New("no tables specified for syncing, review `tables` and `skip_tables` in your config and specify at least one table to sync") @@ -73,6 +72,12 @@ func WithSingleTableMaxConcurrency(concurrency int64) Option { } } +func WithGlobalRateLimiting() Option { + return func(s *Scheduler) { + s.globalRateLimiting = true + } +} + type SyncOption func(*syncClient) func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption { @@ -94,10 +99,14 @@ type Scheduler struct { // tableSem is a semaphore that limits the number of concurrent tables being fetched tableSems []*semaphore.Weighted // Logger to call, this logger is passed to the serve.Serve Client, if not defined Serve will create one instead. - logger zerolog.Logger - concurrency int - singleTableConcurrency sync.Map + logger zerolog.Logger + concurrency int + // This Map holds all of the concurrency semaphores for each table+client pair. + singleTableConcurrency sync.Map + // The maximum number of go routines that can be spawned for a single table+client pair. singleTableMaxConcurrency int64 + // A boolean that indicates whether or not to use the global rate limiter. By default rate limiting is assumed to be table+client specific. + globalRateLimiting bool } type syncClient struct { @@ -112,10 +121,9 @@ type syncClient struct { func NewScheduler(opts ...Option) *Scheduler { s := Scheduler{ - caser: caser.New(), - concurrency: DefaultConcurrency, - maxDepth: DefaultMaxDepth, - singleTableMaxConcurrency: defaultSingleTableMaxConcurrency, + caser: caser.New(), + concurrency: DefaultConcurrency, + maxDepth: DefaultMaxDepth, } for _, opt := range opts { opt(&s) @@ -131,6 +139,11 @@ func NewScheduler(opts ...Option) *Scheduler { tableConcurrency = max(tableConcurrency/2, minTableConcurrency) } s.resourceSem = semaphore.NewWeighted(int64(resourceConcurrency)) + + // To preserve backwards compatibility, if singleTableMaxConcurrency is not set, set it to the max concurrency + if s.singleTableMaxConcurrency == 0 { + s.singleTableMaxConcurrency = int64(tableConcurrency) + } return &s } From fb0b152c65e6504e1b5d2f226e8b0f0c84fff6eb Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 12 Dec 2023 09:38:13 -0600 Subject: [PATCH 09/11] Update scheduler_dfs.go --- scheduler/scheduler_dfs.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index 57a3a8715d..c50d51e212 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -194,24 +194,31 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl resolvedResources <- resource for _, relation := range resource.Table.Relations { relation := relation - if err := s.scheduler.tableSems[depth].Acquire(ctx, 1); err != nil { + // Depending on the source, rate limiting might be done on the table basis (GCP) or client + table (AWS) + tableConcurrencyKey := table.Name + "-" + client.ID() + if s.scheduler.globalRateLimiting { + tableConcurrencyKey = table.Name + } + // Acquire the semaphore for the table + tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleTableMaxConcurrency)) + tableSem := tableSemVal.(*semaphore.Weighted) + if err := tableSem.Acquire(ctx, 1); err != nil { // This means context was cancelled wg.Wait() return } - tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(table.Name+"-"+client.ID(), semaphore.NewWeighted(s.scheduler.singleTableMaxConcurrency)) - tableSem := tableSemVal.(*semaphore.Weighted) - if err := tableSem.Acquire(ctx, 1); err != nil { + // Once table semaphore is acquired we can acquire the global semaphore + if err := s.scheduler.tableSems[depth].Acquire(ctx, 1); err != nil { // This means context was cancelled - s.scheduler.tableSems[depth].Release(1) + tableSem.Release(1) wg.Wait() return } wg.Add(1) go func() { defer wg.Done() - defer s.scheduler.tableSems[depth].Release(1) defer tableSem.Release(1) + defer s.scheduler.tableSems[depth].Release(1) s.resolveTableDfs(ctx, relation, client, resource, resolvedResources, depth+1) }() } From 7523a3e6ec9de9a4ae150ab2d1479a590c20e8cc Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 12 Dec 2023 10:45:04 -0600 Subject: [PATCH 10/11] remove `globalRateLimiting` --- scheduler/scheduler.go | 8 -------- scheduler/scheduler_dfs.go | 4 ---- 2 files changed, 12 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 23b97cffc3..21ad6b7f94 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -72,12 +72,6 @@ func WithSingleTableMaxConcurrency(concurrency int64) Option { } } -func WithGlobalRateLimiting() Option { - return func(s *Scheduler) { - s.globalRateLimiting = true - } -} - type SyncOption func(*syncClient) func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption { @@ -105,8 +99,6 @@ type Scheduler struct { singleTableConcurrency sync.Map // The maximum number of go routines that can be spawned for a single table+client pair. singleTableMaxConcurrency int64 - // A boolean that indicates whether or not to use the global rate limiter. By default rate limiting is assumed to be table+client specific. - globalRateLimiting bool } type syncClient struct { diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index c50d51e212..bbfdc03656 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -194,11 +194,7 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl resolvedResources <- resource for _, relation := range resource.Table.Relations { relation := relation - // Depending on the source, rate limiting might be done on the table basis (GCP) or client + table (AWS) tableConcurrencyKey := table.Name + "-" + client.ID() - if s.scheduler.globalRateLimiting { - tableConcurrencyKey = table.Name - } // Acquire the semaphore for the table tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleTableMaxConcurrency)) tableSem := tableSemVal.(*semaphore.Weighted) From b060e10b6d68cd47f2951a79317895504d3a7de5 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 12 Dec 2023 15:19:35 -0600 Subject: [PATCH 11/11] updated name of new attribute --- scheduler/scheduler.go | 12 ++++++------ scheduler/scheduler_dfs.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 21ad6b7f94..9eabf6138b 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -66,9 +66,9 @@ func WithStrategy(strategy Strategy) Option { } } -func WithSingleTableMaxConcurrency(concurrency int64) Option { +func WithSingleNestedTableMaxConcurrency(concurrency int64) Option { return func(s *Scheduler) { - s.singleTableMaxConcurrency = concurrency + s.singleNestedTableMaxConcurrency = concurrency } } @@ -97,8 +97,8 @@ type Scheduler struct { concurrency int // This Map holds all of the concurrency semaphores for each table+client pair. singleTableConcurrency sync.Map - // The maximum number of go routines that can be spawned for a single table+client pair. - singleTableMaxConcurrency int64 + // The maximum number of go routines that can be spawned for a single table+client pair + singleNestedTableMaxConcurrency int64 } type syncClient struct { @@ -133,8 +133,8 @@ func NewScheduler(opts ...Option) *Scheduler { s.resourceSem = semaphore.NewWeighted(int64(resourceConcurrency)) // To preserve backwards compatibility, if singleTableMaxConcurrency is not set, set it to the max concurrency - if s.singleTableMaxConcurrency == 0 { - s.singleTableMaxConcurrency = int64(tableConcurrency) + if s.singleNestedTableMaxConcurrency == 0 { + s.singleNestedTableMaxConcurrency = int64(tableConcurrency) } return &s } diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index bbfdc03656..37f8775b95 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -196,7 +196,7 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl relation := relation tableConcurrencyKey := table.Name + "-" + client.ID() // Acquire the semaphore for the table - tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleTableMaxConcurrency)) + tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleNestedTableMaxConcurrency)) tableSem := tableSemVal.(*semaphore.Weighted) if err := tableSem.Acquire(ctx, 1); err != nil { // This means context was cancelled