diff --git a/scheduler/batchsender/batch_sender.go b/scheduler/batchsender/batch_sender.go
new file mode 100644
index 0000000000..941b6aef83
--- /dev/null
+++ b/scheduler/batchsender/batch_sender.go
@@ -0,0 +1,85 @@
+package batchsender
+
+import (
+	"sync"
+	"time"
+
+	"github.com/cloudquery/plugin-sdk/v4/helpers"
+)
+
+const (
+	batchSize    = 100
+	batchTimeout = 100 * time.Millisecond
+)
+
+// BatchSender is a helper struct that batches items and sends them in batches of batchSize or after batchTimeout.
+//
+// - If item is already a slice, it will be sent directly
+// - Otherwise, it will be added to the current batch
+// - If the current batch has reached the batch size, it will be sent immediately
+// - Otherwise, a timer will be started to send the current batch after the batch timeout
+type BatchSender struct {
+	sendFn    func(any)
+	items     []any
+	timer     *time.Timer
+	itemsLock sync.Mutex
+}
+
+func NewBatchSender(sendFn func(any)) *BatchSender {
+	return &BatchSender{sendFn: sendFn}
+}
+
+func (bs *BatchSender) Send(item any) {
+	if bs.timer != nil {
+		bs.timer.Stop()
+	}
+
+	items := helpers.InterfaceSlice(item)
+
+	// If item is already a slice, send it directly
+	// together with the current batch
+	if len(items) > 1 {
+		bs.flush(items...)
+		return
+	}
+
+	// Otherwise, add item to the current batch
+	bs.appendToBatch(items...)
+
+	// If the current batch has reached the batch size, send it
+	if len(bs.items) >= batchSize {
+		bs.flush()
+		return
+	}
+
+	// Otherwise, start a timer to send the current batch after the batch timeout
+	bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
+}
+
+func (bs *BatchSender) appendToBatch(items ...any) {
+	bs.itemsLock.Lock()
+	defer bs.itemsLock.Unlock()
+
+	bs.items = append(bs.items, items...)
+}
+
+func (bs *BatchSender) flush(items ...any) {
+	bs.itemsLock.Lock()
+	defer bs.itemsLock.Unlock()
+
+	bs.items = append(bs.items, items...)
+
+	if len(bs.items) == 0 {
+		return
+	}
+
+	bs.sendFn(bs.items)
+	bs.items = nil
+}
+
+func (bs *BatchSender) Close() {
+	if bs.timer != nil {
+		bs.timer.Stop()
+	}
+	bs.flush()
+}
diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go
index a3aece7422..bfb027a95b 100644
--- a/scheduler/scheduler_dfs.go
+++ b/scheduler/scheduler_dfs.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/cloudquery/plugin-sdk/v4/helpers"
+	"github.com/cloudquery/plugin-sdk/v4/scheduler/batchsender"
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
 	"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -121,9 +122,13 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
 		}
 	}()
 
+	batchSender := batchsender.NewBatchSender(func(item any) {
+		s.resolveResourcesDfs(ctx, table, client, parent, item, resolvedResources, depth)
+	})
 	for r := range res {
-		s.resolveResourcesDfs(ctx, table, client, parent, r, resolvedResources, depth)
+		batchSender.Send(r)
 	}
+	batchSender.Close()
 
 	// we don't need any waitgroups here because we are waiting for the channel to close
 	endTime := time.Now()
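
For context, here is a minimal usage sketch of the new helper; it is not part of the patch. The `main` package, the printing `sendFn`, and the item values are hypothetical; `batchsender.NewBatchSender`, `Send`, and `Close` are the functions introduced above, with `batchSize = 100` and `batchTimeout = 100ms` as defined in the patch.

```go
package main

import (
	"fmt"

	"github.com/cloudquery/plugin-sdk/v4/scheduler/batchsender"
)

func main() {
	// sendFn receives the accumulated []any batch: either when it reaches
	// batchSize (100), when the batchTimeout timer fires, or on Close.
	bs := batchsender.NewBatchSender(func(batch any) {
		fmt.Printf("flushing %d items\n", len(batch.([]any)))
	})

	// Individual items are buffered and flushed in batches of up to 100.
	for i := 0; i < 250; i++ {
		bs.Send(i)
	}

	// A slice with more than one element is flushed immediately,
	// together with anything already buffered.
	bs.Send([]any{"a", "b", "c"})

	// Close stops any pending timer and flushes the remaining items.
	bs.Close()
}
```

In the `scheduler_dfs.go` change, `sendFn` is the existing `resolveResourcesDfs` call, so resources read from the `res` channel are resolved in batches rather than one at a time, and `Close` after the loop flushes whatever is left once the channel closes.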