NOTE(review): this patch was recovered from a whitespace-collapsed copy. Line
breaks and tab indentation of context lines were reconstructed following gofmt
conventions and could not be checked against the original files; if a hunk is
rejected, apply with `git apply --ignore-whitespace`. The blob hashes on the
`index` lines describe the pre-review patch and were not recomputed.

Review changes relative to the original patch:
  * NewScheduler now treats any non-positive singleNestedTableMaxConcurrency
    as "unset" (was: only 0), so a negative option value can no longer produce
    a semaphore that is never acquirable.
  * WithSingleNestedTableMaxConcurrency gained a doc comment.
  * Comment wording fixed (field name, "goroutines").

diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 5e564c6492..9eabf6138b 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"runtime/debug"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -65,6 +66,15 @@ func WithStrategy(strategy Strategy) Option {
 	}
 }
 
+// WithSingleNestedTableMaxConcurrency sets the maximum number of goroutines
+// that can be spawned for a single table+client pair. A value of 0 or less
+// leaves the limit unset, in which case NewScheduler picks the default.
+func WithSingleNestedTableMaxConcurrency(concurrency int64) Option {
+	return func(s *Scheduler) {
+		s.singleNestedTableMaxConcurrency = concurrency
+	}
+}
+
 type SyncOption func(*syncClient)
 
 func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption {
@@ -88,6 +98,10 @@ type Scheduler struct {
 	// Logger to call, this logger is passed to the serve.Serve Client, if not defined Serve will create one instead.
 	logger      zerolog.Logger
 	concurrency int
+	// singleTableConcurrency holds the concurrency semaphore for each table+client pair.
+	singleTableConcurrency sync.Map
+	// singleNestedTableMaxConcurrency is the maximum number of goroutines that can be spawned for a single table+client pair.
+	singleNestedTableMaxConcurrency int64
 }
 
 type syncClient struct {
@@ -120,6 +134,12 @@ func NewScheduler(opts ...Option) *Scheduler {
 		tableConcurrency = max(tableConcurrency/2, minTableConcurrency)
 	}
 	s.resourceSem = semaphore.NewWeighted(int64(resourceConcurrency))
+
+	// To preserve backwards compatibility, fall back to the table concurrency when singleNestedTableMaxConcurrency is not set.
+	// Non-positive values count as "not set": a semaphore sized <= 0 can never be acquired, so Acquire would block until ctx is cancelled.
+	if s.singleNestedTableMaxConcurrency <= 0 {
+		s.singleNestedTableMaxConcurrency = int64(tableConcurrency)
+	}
 	return &s
 }
 
diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go
index 8a42b3e1e8..37f8775b95 100644
--- a/scheduler/scheduler_dfs.go
+++ b/scheduler/scheduler_dfs.go
@@ -13,6 +13,7 @@ import (
 	"github.com/getsentry/sentry-go"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
+	"golang.org/x/sync/semaphore"
 )
 
 func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *schema.Resource) {
@@ -193,14 +194,26 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
 		resolvedResources <- resource
 		for _, relation := range resource.Table.Relations {
 			relation := relation
+			tableConcurrencyKey := table.Name + "-" + client.ID()
+			// Load (or create on first use) the semaphore for this table+client pair
+			tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleNestedTableMaxConcurrency))
+			tableSem := tableSemVal.(*semaphore.Weighted)
+			if err := tableSem.Acquire(ctx, 1); err != nil {
+				// This means context was cancelled
+				wg.Wait()
+				return
+			}
+			// Once the table semaphore is acquired we can acquire the global semaphore
 			if err := s.scheduler.tableSems[depth].Acquire(ctx, 1); err != nil {
 				// This means context was cancelled
+				tableSem.Release(1)
 				wg.Wait()
 				return
 			}
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
+				defer tableSem.Release(1)
 				defer s.scheduler.tableSems[depth].Release(1)
 				s.resolveTableDfs(ctx, relation, client, resource, resolvedResources, depth+1)
 			}()