From aa8a1bbebd809dc0f5299281242d5274624b7eba Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Mon, 27 Oct 2025 12:00:02 +0800 Subject: [PATCH 1/2] support setting decoder concurrency for TransactionPayloadEvent binlog --- replication/binlogsyncer.go | 4 ++++ replication/parser.go | 7 +++++++ replication/transaction_payload_event.go | 3 ++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index ab4c7647a..39794e54c 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -134,6 +134,10 @@ type BinlogSyncerConfig struct { // Only works with MariaDB flavor. FillZeroLogPos bool + // PayloadDecoderConcurrency is used to control concurrency for decoding TransactionPayloadEvent. + // Default 0, this will be set to GOMAXPROCS. + PayloadDecoderConcurrency int + // SynchronousEventHandler is used for synchronous event handling. // This should not be used together with StartBackupWithHandler. // If this is not nil, GetEvent does not need to be called. diff --git a/replication/parser.go b/replication/parser.go index d413eef12..ebed120b3 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -40,6 +40,8 @@ type BinlogParser struct { ignoreJSONDecodeErr bool verifyChecksum bool + payloadDecoderConcurrency int + rowsEventDecodeFunc func(*RowsEvent, []byte) error tableMapOptionalMetaDecodeFunc func([]byte) error @@ -215,6 +217,10 @@ func (p *BinlogParser) SetFlavor(flavor string) { p.flavor = flavor } +func (p *BinlogParser) SetPayloadDecoderConcurrency(concurrency int) { + p.payloadDecoderConcurrency = concurrency +} + func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) { p.rowsEventDecodeFunc = rowsEventDecodeFunc } @@ -456,6 +462,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent { e := &TransactionPayloadEvent{} e.format = *p.format + e.concurrency = p.payloadDecoderConcurrency return e } diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index a9f8a9b78..3c8437747 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -28,6 +28,7 @@ const ( type TransactionPayloadEvent struct { format FormatDescriptionEvent + concurrency int Size uint64 UncompressedSize uint64 CompressionType uint64 @@ -103,7 +104,7 @@ func (e *TransactionPayloadEvent) decodePayload() error { e.CompressionType, e.compressionType()) } - decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(e.concurrency)) if err != nil { return err } From 8ea1bf215a9629dc39b4f47f4dad3dd4535b4ae2 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Mon, 27 Oct 2025 18:12:33 +0800 Subject: [PATCH 2/2] add missing set --- replication/binlogsyncer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 39794e54c..160de8676 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -212,6 +212,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetUseFloatWithTrailingZero(b.cfg.UseFloatWithTrailingZero) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) + b.parser.SetPayloadDecoderConcurrency(cfg.PayloadDecoderConcurrency) b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc) b.parser.SetTableMapOptionalMetaDecodeFunc(b.cfg.TableMapOptionalMetaDecodeFunc) b.running = false