From 23793233a046285f569fa172e75304e86e3cd4ad Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Tue, 4 Jul 2023 13:13:07 +0300 Subject: [PATCH 1/5] don't reset manually --- writers/batchwriter/batchwriter.go | 10 +--------- writers/mixedbatchwriter/mixedbatchwriter.go | 10 +--------- writers/mixedbatchwriter/mixedbatchwriter_test.go | 5 +++-- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index 1310b3956d..ae3ef9bcf5 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -122,7 +122,7 @@ func (w *BatchWriter) Close(context.Context) error { func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.WriteInsert, flush <-chan chan bool) { sizeBytes := int64(0) resources := make([]*message.WriteInsert, 0, w.batchSize) - tick := timer(w.batchTimeout) + tick := time.Tick(w.batchTimeout) for { select { case r, ok := <-ch: @@ -145,7 +145,6 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m w.flushTable(ctx, tableName, resources) resources, sizeBytes = resources[:0], 0 } - tick = timer(w.batchTimeout) case done := <-flush: if len(resources) > 0 { w.flushTable(ctx, tableName, resources) @@ -324,10 +323,3 @@ func (w *BatchWriter) startWorker(ctx context.Context, msg *message.WriteInsert) ch <- msg return nil } - -func timer(timeout time.Duration) <-chan time.Time { - if timeout == 0 { - return nil - } - return time.After(timeout) -} diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index 4f76a12fe6..a5d90b0d71 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -76,7 +76,7 @@ func New(client Client, opts ...Option) (*MixedBatchWriter, error) { batchSize: defaultBatchSize, batchSizeBytes: defaultBatchSizeBytes, batchTimeout: defaultBatchTimeout, - timerFn: timer, + timerFn: time.Tick, } for _, opt := range opts { opt(c) @@ -149,7 +149,6 @@ loop: return err } prevMsgType = writers.MsgTypeUnset - tick = w.timerFn(w.batchTimeout) } } return flush(prevMsgType) @@ -215,10 +214,3 @@ func (m *insertBatchManager) flush(ctx context.Context) error { m.batch = m.batch[:0] return nil } - -func timer(timeout time.Duration) <-chan time.Time { - if timeout == 0 { - return nil - } - return time.After(timeout) -} diff --git a/writers/mixedbatchwriter/mixedbatchwriter_test.go b/writers/mixedbatchwriter/mixedbatchwriter_test.go index ade2d52558..0685df079a 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter_test.go +++ b/writers/mixedbatchwriter/mixedbatchwriter_test.go @@ -246,8 +246,9 @@ func TestMixedBatchWriterTimeout(t *testing.T) { withTimerFn(func(_ time.Duration) <-chan time.Time { c := make(chan time.Time) go func() { - <-triggerTimeout - c <- time.Now() + for range triggerTimeout { + c <- time.Now() + } }() return c }), From 519fcaf9ce6149acb7edadb80c1b28bca97a23b2 Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Tue, 4 Jul 2023 13:56:52 +0300 Subject: [PATCH 2/5] // nolint:staticcheck --- writers/batchwriter/batchwriter.go | 2 +- writers/mixedbatchwriter/mixedbatchwriter.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index ae3ef9bcf5..9662a08336 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -122,7 +122,7 @@ func (w *BatchWriter) Close(context.Context) error { func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.WriteInsert, flush <-chan chan bool) { sizeBytes := int64(0) resources := make([]*message.WriteInsert, 0, w.batchSize) - tick := time.Tick(w.batchTimeout) + tick := time.Tick(w.batchTimeout) // nolint:staticcheck for { select { case r, ok := <-ch: diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index a5d90b0d71..98b53a1dba 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -76,7 +76,7 @@ func New(client Client, opts ...Option) (*MixedBatchWriter, error) { batchSize: defaultBatchSize, batchSizeBytes: defaultBatchSizeBytes, batchTimeout: defaultBatchTimeout, - timerFn: time.Tick, + timerFn: time.Tick, // nolint:staticcheck } for _, opt := range opts { opt(c) From e63bc5e260085294691b371f717a164267b451cf Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Tue, 4 Jul 2023 15:57:24 +0300 Subject: [PATCH 3/5] don't leak --- writers/batchwriter/batchwriter.go | 10 +++++++++- writers/mixedbatchwriter/mixedbatchwriter.go | 14 +++++++++++--- writers/mixedbatchwriter/mixedbatchwriter_test.go | 4 ++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index 9662a08336..c37f539744 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -122,7 +122,8 @@ func (w *BatchWriter) Close(context.Context) error { func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.WriteInsert, flush <-chan chan bool) { sizeBytes := int64(0) resources := make([]*message.WriteInsert, 0, w.batchSize) - tick := time.Tick(w.batchTimeout) // nolint:staticcheck + tick, done := ticker(w.batchTimeout) + defer done() for { select { case r, ok := <-ch: @@ -323,3 +324,10 @@ func (w *BatchWriter) startWorker(ctx context.Context, msg *message.WriteInsert) ch <- msg return nil } + +func ticker(interval time.Duration) (ch <-chan time.Time, stop func()) { + if t := time.NewTicker(interval); t != nil { + return t.C, t.Stop + } + return nil, func() {} +} diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index 98b53a1dba..684a77c140 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -17,7 +17,7 @@ type Client interface { DeleteStaleBatch(ctx context.Context, messages message.WriteDeleteStales) error } -type timerFn func(timeout time.Duration) <-chan time.Time +type timerFn func(timeout time.Duration) (<-chan time.Time, func()) type MixedBatchWriter struct { client Client @@ -76,7 +76,7 @@ func New(client Client, opts ...Option) (*MixedBatchWriter, error) { batchSize: defaultBatchSize, batchSizeBytes: defaultBatchSizeBytes, batchTimeout: defaultBatchTimeout, - timerFn: time.Tick, // nolint:staticcheck + timerFn: ticker, } for _, opt := range opts { opt(c) @@ -116,7 +116,8 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri } prevMsgType := writers.MsgTypeUnset var err error - tick := w.timerFn(w.batchTimeout) + tick, done := w.timerFn(w.batchTimeout) + defer done() loop: for { select { @@ -214,3 +215,10 @@ func (m *insertBatchManager) flush(ctx context.Context) error { m.batch = m.batch[:0] return nil } + +func ticker(interval time.Duration) (ch <-chan time.Time, stop func()) { + if t := time.NewTicker(interval); t != nil { + return t.C, t.Stop + } + return nil, func() {} +} diff --git a/writers/mixedbatchwriter/mixedbatchwriter_test.go b/writers/mixedbatchwriter/mixedbatchwriter_test.go index 0685df079a..d6da4cc2c6 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter_test.go +++ b/writers/mixedbatchwriter/mixedbatchwriter_test.go @@ -243,14 +243,14 @@ func TestMixedBatchWriterTimeout(t *testing.T) { wr, err := New(client, WithBatchSize(1000), WithBatchSizeBytes(1000000), - withTimerFn(func(_ time.Duration) <-chan time.Time { + withTimerFn(func(_ time.Duration) (<-chan time.Time, func()) { c := make(chan time.Time) go func() { for range triggerTimeout { c <- time.Now() } }() - return c + return c, func() { close(c) } }), ) if err != nil { From 68015d39fd8c4ab30b82969e283a2651cedcfe8b Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Tue, 4 Jul 2023 16:16:15 +0300 Subject: [PATCH 4/5] move to writers --- writers/batchwriter/batchwriter.go | 9 +-------- writers/mixedbatchwriter/mixedbatchwriter.go | 19 +++++-------------- .../mixedbatchwriter/mixedbatchwriter_test.go | 2 +- writers/ticker.go | 17 +++++++++++++++++ 4 files changed, 24 insertions(+), 23 deletions(-) create mode 100644 writers/ticker.go diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index c37f539744..4a7247a214 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -122,7 +122,7 @@ func (w *BatchWriter) Close(context.Context) error { func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.WriteInsert, flush <-chan chan bool) { sizeBytes := int64(0) resources := make([]*message.WriteInsert, 0, w.batchSize) - tick, done := ticker(w.batchTimeout) + tick, done := writers.NewTicker(w.batchTimeout) defer done() for { select { @@ -324,10 +324,3 @@ func (w *BatchWriter) startWorker(ctx context.Context, msg *message.WriteInsert) ch <- msg return nil } - -func ticker(interval time.Duration) (ch <-chan time.Time, stop func()) { - if t := time.NewTicker(interval); t != nil { - return t.C, t.Stop - } - return nil, func() {} -} diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index 684a77c140..d4c797debc 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -17,15 +17,13 @@ type Client interface { DeleteStaleBatch(ctx context.Context, messages message.WriteDeleteStales) error } -type timerFn func(timeout time.Duration) (<-chan time.Time, func()) - type MixedBatchWriter struct { client Client logger zerolog.Logger batchSize int batchSizeBytes int batchTimeout time.Duration - timerFn timerFn + tickerFn writers.TickerFunc } // Assert at compile-time that MixedBatchWriter implements the Writer interface @@ -57,9 +55,9 @@ func WithBatchTimeout(timeout time.Duration) Option { } } -func withTimerFn(timer timerFn) Option { +func withTickerFn(tickerFn writers.TickerFunc) Option { return func(p *MixedBatchWriter) { - p.timerFn = timer + p.tickerFn = tickerFn } } @@ -76,7 +74,7 @@ func New(client Client, opts ...Option) (*MixedBatchWriter, error) { batchSize: defaultBatchSize, batchSizeBytes: defaultBatchSizeBytes, batchTimeout: defaultBatchTimeout, - timerFn: ticker, + tickerFn: writers.NewTicker, } for _, opt := range opts { opt(c) @@ -116,7 +114,7 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri } prevMsgType := writers.MsgTypeUnset var err error - tick, done := w.timerFn(w.batchTimeout) + tick, done := w.tickerFn(w.batchTimeout) defer done() loop: for { @@ -215,10 +213,3 @@ func (m *insertBatchManager) flush(ctx context.Context) error { m.batch = m.batch[:0] return nil } - -func ticker(interval time.Duration) (ch <-chan time.Time, stop func()) { - if t := time.NewTicker(interval); t != nil { - return t.C, t.Stop - } - return nil, func() {} -} diff --git a/writers/mixedbatchwriter/mixedbatchwriter_test.go b/writers/mixedbatchwriter/mixedbatchwriter_test.go index d6da4cc2c6..a8536f8d4e 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter_test.go +++ b/writers/mixedbatchwriter/mixedbatchwriter_test.go @@ -243,7 +243,7 @@ func TestMixedBatchWriterTimeout(t *testing.T) { wr, err := New(client, WithBatchSize(1000), WithBatchSizeBytes(1000000), - withTimerFn(func(_ time.Duration) (<-chan time.Time, func()) { + withTickerFn(func(_ time.Duration) (<-chan time.Time, func()) { c := make(chan time.Time) go func() { for range triggerTimeout { diff --git a/writers/ticker.go b/writers/ticker.go new file mode 100644 index 0000000000..df657a2fbe --- /dev/null +++ b/writers/ticker.go @@ -0,0 +1,17 @@ +package writers + +import ( + "time" +) + +type TickerFunc func(interval time.Duration) (ch <-chan time.Time, done func()) + +func NewTicker(interval time.Duration) (<-chan time.Time, func()) { + if interval <= 0 { + return nil, nop + } + t := time.NewTicker(interval) + return t.C, t.Stop +} + +func nop() {} From 8f241a0cebc9b706af1c8fc099f993d8a45d741c Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Tue, 4 Jul 2023 16:20:35 +0300 Subject: [PATCH 5/5] streaming --- .../streamingbatchwriter/mocktimer_test.go | 16 ++++++++--- .../streamingbatchwriter.go | 27 +++++++------------ .../streamingbatchwriter_test.go | 4 +-- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/writers/streamingbatchwriter/mocktimer_test.go b/writers/streamingbatchwriter/mocktimer_test.go index a4166de499..044066dfbe 100644 --- a/writers/streamingbatchwriter/mocktimer_test.go +++ b/writers/streamingbatchwriter/mocktimer_test.go @@ -1,16 +1,24 @@ package streamingbatchwriter -import "time" +import ( + "time" + + "github.com/cloudquery/plugin-sdk/v4/writers" +) type mockTimer struct { expire chan time.Time } -func (t *mockTimer) timer(time.Duration) <-chan time.Time { - return t.expire +func (t *mockTimer) timer(time.Duration) (<-chan time.Time, func()) { + return t.expire, t.close +} + +func (t *mockTimer) close() { + close(t.expire) } -func newMockTimer() (timerFn, chan time.Time) { +func newMockTimer() (writers.TickerFunc, chan time.Time) { expire := make(chan time.Time) t := &mockTimer{ expire: expire, diff --git a/writers/streamingbatchwriter/streamingbatchwriter.go b/writers/streamingbatchwriter/streamingbatchwriter.go index 14f1ce3a78..01fb0d083b 100644 --- a/writers/streamingbatchwriter/streamingbatchwriter.go +++ b/writers/streamingbatchwriter/streamingbatchwriter.go @@ -61,11 +61,9 @@ type StreamingBatchWriter struct { batchSizeRows int64 batchSizeBytes int64 - timerFn timerFn + tickerFn writers.TickerFunc } -type timerFn func(timeout time.Duration) <-chan time.Time - // Assert at compile-time that StreamingBatchWriter implements the Writer interface var _ writers.Writer = (*StreamingBatchWriter)(nil) @@ -95,9 +93,9 @@ func WithBatchSizeBytes(size int64) Option { } } -func withTimerFn(timer timerFn) Option { +func withTickerFn(tickerFn writers.TickerFunc) Option { return func(p *StreamingBatchWriter) { - p.timerFn = timer + p.tickerFn = tickerFn } } @@ -115,7 +113,7 @@ func New(client Client, opts ...Option) (*StreamingBatchWriter, error) { batchTimeout: defaultBatchTimeoutSeconds * time.Second, batchSizeRows: defaultBatchSize, batchSizeBytes: defaultBatchSizeBytes, - timerFn: timer, + tickerFn: writers.NewTicker, } for _, opt := range opts { opt(c) @@ -225,7 +223,7 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err batchSizeRows: w.batchSizeRows, batchTimeout: w.batchTimeout, - timerFn: w.timerFn, + tickerFn: w.tickerFn, } w.workersWaitGroup.Add(1) @@ -277,7 +275,7 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err batchSizeRows: w.batchSizeRows, batchSizeBytes: w.batchSizeBytes, batchTimeout: w.batchTimeout, - timerFn: w.timerFn, + tickerFn: w.tickerFn, } w.workersLock.Lock() w.insertWorkers[tableName] = wr @@ -303,7 +301,7 @@ type streamingWorkerManager[T message.WriteMessage] struct { batchSizeRows int64 batchSizeBytes int64 batchTimeout time.Duration - timerFn timerFn + tickerFn writers.TickerFunc } func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup, tableName string) { @@ -345,7 +343,8 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup, } defer closeFlush() - tick := s.timerFn(s.batchTimeout) + tick, done := s.tickerFn(s.batchTimeout) + defer done() for { select { case r, ok := <-s.ch: @@ -370,7 +369,6 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup, if sizeRows > 0 { closeFlush() } - tick = s.timerFn(s.batchTimeout) case done := <-s.flush: if sizeRows > 0 { closeFlush() @@ -379,10 +377,3 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup, } } } - -func timer(timeout time.Duration) <-chan time.Time { - if timeout == 0 { - return nil - } - return time.After(timeout) -} diff --git a/writers/streamingbatchwriter/streamingbatchwriter_test.go b/writers/streamingbatchwriter/streamingbatchwriter_test.go index 12a5ffc043..9c5806747e 100644 --- a/writers/streamingbatchwriter/streamingbatchwriter_test.go +++ b/writers/streamingbatchwriter/streamingbatchwriter_test.go @@ -229,7 +229,7 @@ func TestStreamingBatchTimeout(t *testing.T) { testClient := newClient() timerFn, timerExpire := newMockTimer() - wr, err := New(testClient, withTimerFn(timerFn)) + wr, err := New(testClient, withTickerFn(timerFn)) if err != nil { t.Fatal(err) } @@ -333,7 +333,7 @@ func TestStreamingBatchUpserts(t *testing.T) { testClient := newClient() timerFn, timerExpire := newMockTimer() - wr, err := New(testClient, WithBatchSizeRows(2), withTimerFn(timerFn)) + wr, err := New(testClient, WithBatchSizeRows(2), withTickerFn(timerFn)) if err != nil { t.Fatal(err) }