From 6708a6be55d402bfa754f34e40bb4d2b8dab6182 Mon Sep 17 00:00:00 2001 From: Kemal Hadimli Date: Tue, 27 Jun 2023 17:33:55 +0100 Subject: [PATCH 1/3] fix(batchwriter): Allow zero batch size, flush before exceeding batch size instead of after --- writers/batch.go | 14 +++++++------- writers/batch_test.go | 26 +++++++++++--------------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/writers/batch.go b/writers/batch.go index 510418c9bb..b7cc52bac2 100644 --- a/writers/batch.go +++ b/writers/batch.go @@ -132,14 +132,14 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m } return } - resources = append(resources, r) - sizeBytes += util.TotalRecordSize(r.Record) - if len(resources) >= w.batchSize || sizeBytes >= int64(w.batchSizeBytes) { + if (w.batchSize > 0 && len(resources) >= w.batchSize) || (w.batchSizeBytes > 0 && sizeBytes+util.TotalRecordSize(r.Record) >= int64(w.batchSizeBytes)) { w.flushTable(ctx, tableName, resources) - resources = make([]*message.Insert, 0) - sizeBytes = 0 + resources, sizeBytes = resources[:0], 0 } + + resources = append(resources, r) + sizeBytes += util.TotalRecordSize(r.Record) case <-time.After(w.batchTimeout): if len(resources) > 0 { w.flushTable(ctx, tableName, resources) @@ -259,7 +259,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.Message) er w.deleteStaleMessages = append(w.deleteStaleMessages, m) l := len(w.deleteStaleMessages) w.deleteStaleLock.Unlock() - if l > w.batchSize { + if w.batchSize > 0 && l > w.batchSize { if err := w.flushDeleteStaleTables(ctx); err != nil { return err } @@ -283,7 +283,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.Message) er w.migrateTableMessages = append(w.migrateTableMessages, m) l := len(w.migrateTableMessages) w.migrateTableLock.Unlock() - if l > w.batchSize { + if w.batchSize > 0 && l > w.batchSize { if err := w.flushMigrateTables(ctx); err != nil { return err } diff --git a/writers/batch_test.go b/writers/batch_test.go index a6940181d1..7e07a7189f 100644 --- a/writers/batch_test.go +++ b/writers/batch_test.go @@ -68,15 +68,6 @@ var batchTestTables = schema.Tables{ }, }, }, - { - Name: "table2", - Columns: []schema.Column{ - { - Name: "id", - Type: arrow.PrimitiveTypes.Int64, - }, - }, - }, } // TestBatchFlushDifferentMessages tests that if writer receives a message of a new type all other pending @@ -106,7 +97,7 @@ func TestBatchFlushDifferentMessages(t *testing.T) { } if testClient.MigrateTablesLen() != 1 { - t.Fatalf("expected 1 migrate table messages, got %d", testClient.MigrateTablesLen()) + t.Fatalf("expected 1 migrate table message, got %d", testClient.MigrateTablesLen()) } if testClient.InsertsLen() != 0 { @@ -118,7 +109,7 @@ func TestBatchFlushDifferentMessages(t *testing.T) { } if testClient.InsertsLen() != 1 { - t.Fatalf("expected 1 insert messages, got %d", testClient.InsertsLen()) + t.Fatalf("expected 1 insert message, got %d", testClient.InsertsLen()) } } @@ -142,9 +133,14 @@ func TestBatchSize(t *testing.T) { t.Fatalf("expected 0 insert messages, got %d", testClient.InsertsLen()) } - if err := wr.writeAll(ctx, []message.Message{&message.Insert{ - Record: record, - }}); err != nil { + if err := wr.writeAll(ctx, []message.Message{ + &message.Insert{ + Record: record, + }, + &message.Insert{ // third message to exceed the batch size + Record: record, + }, + }); err != nil { t.Fatal(err) } // we need to wait for the batch to be flushed @@ -186,7 +182,7 @@ func TestBatchTimeout(t *testing.T) { time.Sleep(time.Second * 1) if testClient.InsertsLen() != 1 { - t.Fatalf("expected 1 insert messages, got %d", testClient.InsertsLen()) + t.Fatalf("expected 1 insert message, got %d", testClient.InsertsLen()) } } From 7f347fbfceb14ee4b27fc9340ee866cccd66b2a3 Mon Sep 17 00:00:00 2001 From: Kemal Hadimli Date: Tue, 27 Jun 2023 17:37:02 +0100 Subject: [PATCH 2/3] don't keep allocating new slices --- writers/batch.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/writers/batch.go b/writers/batch.go index b7cc52bac2..08b4123563 100644 --- a/writers/batch.go +++ b/writers/batch.go @@ -122,7 +122,7 @@ func (w *BatchWriter) Close(context.Context) error { func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.Insert, flush <-chan chan bool) { sizeBytes := int64(0) - resources := make([]*message.Insert, 0) + resources := make([]*message.Insert, 0, w.batchSize) for { select { case r, ok := <-ch: @@ -143,14 +143,12 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m case <-time.After(w.batchTimeout): if len(resources) > 0 { w.flushTable(ctx, tableName, resources) - resources = make([]*message.Insert, 0) - sizeBytes = 0 + resources, sizeBytes = resources[:0], 0 } case done := <-flush: if len(resources) > 0 { w.flushTable(ctx, tableName, resources) - resources = make([]*message.Insert, 0) - sizeBytes = 0 + resources, sizeBytes = resources[:0], 0 } done <- true case <-ctx.Done(): From 1024fd33b98c64c78d80e4e3a6553afee0544103 Mon Sep 17 00:00:00 2001 From: Kemal Hadimli Date: Wed, 28 Jun 2023 12:35:01 +0100 Subject: [PATCH 3/3] fix merge artifact --- writers/batch_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/writers/batch_test.go b/writers/batch_test.go index 37c585a88a..ac135c69db 100644 --- a/writers/batch_test.go +++ b/writers/batch_test.go @@ -133,7 +133,7 @@ func TestBatchSize(t *testing.T) { t.Fatalf("expected 0 insert messages, got %d", testClient.InsertsLen()) } - if err := wr.writeAll(ctx, []message.Message{ + if err := wr.writeAll(ctx, []message.WriteMessage{ &message.WriteInsert{ Record: record, }, @@ -141,7 +141,7 @@ func TestBatchSize(t *testing.T) { Record: record, }, }); err != nil { - t.Fatal(err) + t.Fatal(err) } // we need to wait for the batch to be flushed time.Sleep(time.Second * 2)