From bd919b4445681039193302dedf6a49c28bcb5dde Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Tue, 21 Mar 2023 16:04:42 +0800 Subject: [PATCH] sorter(ticdc): fix dml order (#8598) close pingcap/tiflow#8597 --- cdc/model/mounter.go | 4 +++ cdc/model/mounter_test.go | 30 +++++++++++++++++++ .../engine/pebble/encoding/key.go | 22 ++++++++++++-- .../engine/pebble/encoding/key_test.go | 21 +++++++++++++ 4 files changed, 74 insertions(+), 3 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 69be1f6f620..d38d6c7491c 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -131,6 +131,10 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { if i.RawKV.OpType == OpTypeDelete && j.RawKV.OpType != OpTypeDelete { return true } + // update DML + if i.RawKV.OldValue != nil && j.RawKV.OldValue == nil { + return true + } } return i.CRTs < j.CRTs } diff --git a/cdc/model/mounter_test.go b/cdc/model/mounter_test.go index b92b4c213dc..dd5e29a3f24 100644 --- a/cdc/model/mounter_test.go +++ b/cdc/model/mounter_test.go @@ -98,3 +98,33 @@ func TestResolvedTsEqual(t *testing.T) { t5 := ResolvedTs{Mode: BatchResolvedMode, Ts: 2, BatchID: 1} require.False(t, t1.Equal(t5)) } + +func TestComparePolymorphicEvents(t *testing.T) { + cases := []struct { + a *PolymorphicEvent + b *PolymorphicEvent + }{ + { + a: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypeDelete, + }), + b: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypePut, + }), + }, + { + a: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypePut, + OldValue: []byte{0}, + Value: []byte{0}, + }), + b: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypePut, + Value: []byte{0}, + }), + }, + } + for _, item := range cases { + require.True(t, ComparePolymorphicEvents(item.a, item.b)) + } +} diff --git a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go index 711ebc5389a..6a38a4909b5 100644 --- a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go +++ b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go @@ -21,6 +21,12 @@ import ( "go.uber.org/zap" ) +const ( + typeDelete = iota + 1 + typeUpdate + typeInsert +) + // DecodeKey decodes a key to uniqueID, tableID, startTs, CRTs. func DecodeKey(key []byte) (uniqueID uint32, tableID uint64, startTs, CRTs uint64) { // uniqueID, tableID, CRTs, startTs, Key, Put/Delete @@ -75,7 +81,7 @@ func EncodeTsKey(uniqueID uint32, tableID uint64, CRTs uint64, startTs ...uint64 } // EncodeKey encodes a key according to event. -// Format: uniqueID, tableID, CRTs, startTs, Put/Delete, Key. +// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key. func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) []byte { if event.RawKV == nil { log.Panic("rawkv must not be nil", zap.Any("event", event)) @@ -96,9 +102,19 @@ func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) [ // startTs binary.BigEndian.PutUint64(uint64Buf[:], event.StartTs) buf = append(buf, uint64Buf[:]...) - // Let Delete < Put - binary.BigEndian.PutUint16(uint64Buf[:], ^uint16(event.RawKV.OpType)) + // Let Delete < Update < Insert + binary.BigEndian.PutUint16(uint64Buf[:], getDMLOrder(event.RawKV)) buf = append(buf, uint64Buf[:2]...) // key return append(buf, event.RawKV.Key...) } + +// getDMLOrder returns the order of the dml types: delete