From d9536cb7984bd98f1083cd4e2a4d72151229204a Mon Sep 17 00:00:00 2001 From: Marcel Tyszkiewicz Date: Mon, 19 May 2025 10:54:34 +0200 Subject: [PATCH 1/2] fix: Flush DeleteRecord messages when batch writer is flushed --- writers/batchwriter/batchwriter.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index af1b0bcb23..8c36df9883 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -107,7 +107,10 @@ func (w *BatchWriter) Flush(ctx context.Context) error { if err := w.flushMigrateTables(ctx); err != nil { return err } - return w.flushDeleteStaleTables(ctx) + if err := w.flushDeleteStaleTables(ctx); err != nil { + return err + } + return w.flushDeleteRecordTables(ctx) } func (w *BatchWriter) Close(context.Context) error { From 023ced968c9bc1a8f8d771a9980eaaec8381a24d Mon Sep 17 00:00:00 2001 From: Marcel Tyszkiewicz Date: Mon, 19 May 2025 11:50:05 +0200 Subject: [PATCH 2/2] flush DeleteRecord batch before WriteInsert --- writers/batchwriter/batchwriter.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index 8c36df9883..ad85b9fa9f 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -313,6 +313,9 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag if err := w.flushDeleteStaleTables(ctx); err != nil { return err } + if err := w.flushDeleteRecordTables(ctx); err != nil { + return err + } if err := w.startWorker(ctx, m); err != nil { return err }