From 29e50aa932647e9e1ddb3cbae85e7ec32e61df32 Mon Sep 17 00:00:00 2001
From: Mariano Gappa
Date: Tue, 3 Dec 2024 17:12:36 +0400
Subject: [PATCH 1/2] Implement batch sender.

---
Review notes (not part of the commit message):
- Send now takes the batch length from appendToBatch, which computes it while
  itemsLock is held. Previously Send read len(bs.items) without the lock, which
  is a data race with a timeout flush already running on the timer goroutine
  (Timer.Stop does not wait for a callback that has started).
- The series was re-wrapped to one diff line per line. Indentation of the
  context lines in scheduler_dfs.go was inferred and should be checked against
  the tree before applying.
- The blob ids on the "index" lines predate this revision; regenerate the
  series with git format-patch before relying on them (e.g. git am -3).

 scheduler/batchsender/batch_sender.go | 90 +++++++++++++++++++++++++++
 scheduler/scheduler_dfs.go            |  6 ++-
 2 files changed, 95 insertions(+), 1 deletion(-)
 create mode 100644 scheduler/batchsender/batch_sender.go

diff --git a/scheduler/batchsender/batch_sender.go b/scheduler/batchsender/batch_sender.go
new file mode 100644
index 0000000000..f4d6fa8fc5
--- /dev/null
+++ b/scheduler/batchsender/batch_sender.go
@@ -0,0 +1,90 @@
+package batchsender
+
+import (
+	"sync"
+	"time"
+
+	"github.com/cloudquery/plugin-sdk/v4/helpers"
+)
+
+const (
+	batchSize    = 100
+	batchTimeout = 100 * time.Millisecond
+)
+
+// BatchSender is a helper struct that batches items and sends them in batches of batchSize or after batchTimeout.
+//
+// - If item is already a slice, it will be sent directly
+// - Otherwise, it will be added to the current batch
+// - If the current batch has reached the batch size, it will be sent immediately
+// - Otherwise, a timer will be started to send the current batch after the batch timeout
+//
+// Send must not be called concurrently with itself: the timer field is not
+// synchronized. The pending batch is guarded by itemsLock because the timeout
+// flush runs on the timer's own goroutine.
+type BatchSender struct {
+	sendFn    func(any)
+	items     []any
+	timer     *time.Timer
+	itemsLock sync.Mutex
+}
+
+// NewBatchSender returns a BatchSender that passes every batch to sendFn as a
+// []any.
+func NewBatchSender(sendFn func(any)) *BatchSender {
+	return &BatchSender{sendFn: sendFn}
+}
+
+// Send queues item for delivery. Any pending timeout is cancelled first, so the
+// timeout is measured from the most recent call to Send.
+func (bs *BatchSender) Send(item any) {
+	if bs.timer != nil {
+		bs.timer.Stop()
+	}
+
+	items := helpers.InterfaceSlice(item)
+
+	// If item is already a slice, send it directly
+	// together with the current batch
+	if len(items) > 1 {
+		bs.flush(items...)
+		return
+	}
+
+	// Otherwise, add item to the current batch. The batch length is taken
+	// while itemsLock is held: Stop does not wait for a timeout flush that has
+	// already started, so reading bs.items here would race with it.
+	batchLen := bs.appendToBatch(items...)
+
+	// If the current batch has reached the batch size, send it
+	if batchLen >= batchSize {
+		bs.flush()
+		return
+	}
+
+	// Otherwise, start a timer to send the current batch after the batch timeout
+	bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
+}
+
+// appendToBatch adds items to the current batch and returns the resulting
+// batch length.
+func (bs *BatchSender) appendToBatch(items ...any) int {
+	bs.itemsLock.Lock()
+	defer bs.itemsLock.Unlock()
+
+	bs.items = append(bs.items, items...)
+	return len(bs.items)
+}
+
+// flush adds items to the current batch, passes the whole batch to sendFn and
+// starts a new, empty batch. itemsLock is held while sendFn runs, so
+// deliveries never overlap.
+func (bs *BatchSender) flush(items ...any) {
+	bs.itemsLock.Lock()
+	defer bs.itemsLock.Unlock()
+
+	bs.items = append(bs.items, items...)
+
+	bs.sendFn(bs.items)
+	bs.items = nil
+}
diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go
index a3aece7422..8a65ff038d 100644
--- a/scheduler/scheduler_dfs.go
+++ b/scheduler/scheduler_dfs.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/cloudquery/plugin-sdk/v4/helpers"
+	"github.com/cloudquery/plugin-sdk/v4/scheduler/batchsender"
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
 	"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -121,8 +122,11 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
 		}
 	}()
 
+	batchSender := batchsender.NewBatchSender(func(item any) {
+		s.resolveResourcesDfs(ctx, table, client, parent, item, resolvedResources, depth)
+	})
 	for r := range res {
-		s.resolveResourcesDfs(ctx, table, client, parent, r, resolvedResources, depth)
+		batchSender.Send(r)
 	}
 
 	// we don't need any waitgroups here because we are waiting for the channel to close

From feb165edf0a9e379beeb85bbe2e87e29c7bd6754 Mon Sep 17 00:00:00 2001
From: Mariano Gappa
Date: Tue, 3 Dec 2024 18:18:24 +0400
Subject: [PATCH 2/2] Closes batchsender by sending the last batch immediately.

---
Review notes (not part of the commit message):
- Hunk offsets follow the revised PATCH 1/2; the "index" blob ids predate this
  revision and need regenerating.

 scheduler/batchsender/batch_sender.go | 14 ++++++++++++++
 scheduler/scheduler_dfs.go            |  1 +
 2 files changed, 15 insertions(+)

diff --git a/scheduler/batchsender/batch_sender.go b/scheduler/batchsender/batch_sender.go
index f4d6fa8fc5..941b6aef83 100644
--- a/scheduler/batchsender/batch_sender.go
+++ b/scheduler/batchsender/batch_sender.go
@@ -85,6 +85,20 @@ func (bs *BatchSender) flush(items ...any) {
 
 	bs.items = append(bs.items, items...)
 
+	// Nothing pending: do not call sendFn with an empty batch.
+	if len(bs.items) == 0 {
+		return
+	}
+
 	bs.sendFn(bs.items)
 	bs.items = nil
 }
+
+// Close cancels any pending timeout and sends the remaining batch immediately.
+// It must not be called concurrently with Send.
+func (bs *BatchSender) Close() {
+	if bs.timer != nil {
+		bs.timer.Stop()
+	}
+	bs.flush()
+}
diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go
index 8a65ff038d..bfb027a95b 100644
--- a/scheduler/scheduler_dfs.go
+++ b/scheduler/scheduler_dfs.go
@@ -128,6 +128,7 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
 	for r := range res {
 		batchSender.Send(r)
 	}
+	batchSender.Close()
 
 	// we don't need any waitgroups here because we are waiting for the channel to close
 	endTime := time.Now()