Skip to content
Merged
16 changes: 16 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -65,6 +66,12 @@ func WithStrategy(strategy Strategy) Option {
}
}

func WithSingleNestedTableMaxConcurrency(concurrency int64) Option {
return func(s *Scheduler) {
s.singleNestedTableMaxConcurrency = concurrency
}
}

type SyncOption func(*syncClient)

func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption {
Expand All @@ -88,6 +95,10 @@ type Scheduler struct {
// Logger to call, this logger is passed to the serve.Serve Client, if not defined Serve will create one instead.
logger zerolog.Logger
concurrency int
// This Map holds all of the concurrency semaphores for each table+client pair.
singleTableConcurrency sync.Map
// The maximum number of go routines that can be spawned for a single table+client pair
singleNestedTableMaxConcurrency int64
}

type syncClient struct {
Expand Down Expand Up @@ -120,6 +131,11 @@ func NewScheduler(opts ...Option) *Scheduler {
tableConcurrency = max(tableConcurrency/2, minTableConcurrency)
}
s.resourceSem = semaphore.NewWeighted(int64(resourceConcurrency))

// To preserve backwards compatibility, if singleTableMaxConcurrency is not set, set it to the max concurrency
if s.singleNestedTableMaxConcurrency == 0 {
s.singleNestedTableMaxConcurrency = int64(tableConcurrency)
}
return &s
}

Expand Down
13 changes: 13 additions & 0 deletions scheduler/scheduler_dfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/getsentry/sentry-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/semaphore"
)

func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *schema.Resource) {
Expand Down Expand Up @@ -193,14 +194,26 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
resolvedResources <- resource
for _, relation := range resource.Table.Relations {
relation := relation
tableConcurrencyKey := table.Name + "-" + client.ID()
// Acquire the semaphore for the table
tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleNestedTableMaxConcurrency))
tableSem := tableSemVal.(*semaphore.Weighted)
if err := tableSem.Acquire(ctx, 1); err != nil {
// This means context was cancelled
wg.Wait()
return
}
// Once table semaphore is acquired we can acquire the global semaphore
if err := s.scheduler.tableSems[depth].Acquire(ctx, 1); err != nil {
// This means context was cancelled
tableSem.Release(1)
wg.Wait()
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer tableSem.Release(1)
defer s.scheduler.tableSems[depth].Release(1)
s.resolveTableDfs(ctx, relation, client, resource, resolvedResources, depth+1)
}()
Expand Down