diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 47fa6baf57a..24983945643 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -48,7 +48,7 @@ func (x OnDDLAction) String() string { return proto.EnumName(OnDDLAction_name, int32(x)) } func (OnDDLAction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{0} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{0} } // VEventType enumerates the event types. @@ -119,7 +119,31 @@ func (x VEventType) String() string { return proto.EnumName(VEventType_name, int32(x)) } func (VEventType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{1} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{1} +} + +// MigrationType specifies the type of migration for the Journal. +type MigrationType int32 + +const ( + MigrationType_TABLES MigrationType = 0 + MigrationType_SHARDS MigrationType = 1 +) + +var MigrationType_name = map[int32]string{ + 0: "TABLES", + 1: "SHARDS", +} +var MigrationType_value = map[string]int32{ + "TABLES": 0, + "SHARDS": 1, +} + +func (x MigrationType) String() string { + return proto.EnumName(MigrationType_name, int32(x)) +} +func (MigrationType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{2} } type BinlogTransaction_Statement_Category int32 @@ -167,7 +191,7 @@ func (x BinlogTransaction_Statement_Category) String() string { return proto.EnumName(BinlogTransaction_Statement_Category_name, int32(x)) } func (BinlogTransaction_Statement_Category) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{1, 0, 0} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{1, 0, 0} } // Charset is the per-statement charset info from a QUERY_EVENT binlog entry. @@ -187,7 +211,7 @@ func (m *Charset) Reset() { *m = Charset{} } func (m *Charset) String() string { return proto.CompactTextString(m) } func (*Charset) ProtoMessage() {} func (*Charset) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{0} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{0} } func (m *Charset) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Charset.Unmarshal(m, b) @@ -244,7 +268,7 @@ func (m *BinlogTransaction) Reset() { *m = BinlogTransaction{} } func (m *BinlogTransaction) String() string { return proto.CompactTextString(m) } func (*BinlogTransaction) ProtoMessage() {} func (*BinlogTransaction) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{1} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{1} } func (m *BinlogTransaction) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogTransaction.Unmarshal(m, b) @@ -294,7 +318,7 @@ func (m *BinlogTransaction_Statement) Reset() { *m = BinlogTransaction_S func (m *BinlogTransaction_Statement) String() string { return proto.CompactTextString(m) } func (*BinlogTransaction_Statement) ProtoMessage() {} func (*BinlogTransaction_Statement) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{1, 0} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{1, 0} } func (m *BinlogTransaction_Statement) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogTransaction_Statement.Unmarshal(m, b) @@ -352,7 +376,7 @@ func (m *StreamKeyRangeRequest) Reset() { *m = StreamKeyRangeRequest{} } func (m *StreamKeyRangeRequest) String() string { return proto.CompactTextString(m) } func (*StreamKeyRangeRequest) ProtoMessage() {} func (*StreamKeyRangeRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{2} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{2} } func (m *StreamKeyRangeRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamKeyRangeRequest.Unmarshal(m, b) @@ -405,7 +429,7 @@ func (m *StreamKeyRangeResponse) Reset() { *m = StreamKeyRangeResponse{} func (m *StreamKeyRangeResponse) String() string { return proto.CompactTextString(m) } func (*StreamKeyRangeResponse) ProtoMessage() {} func (*StreamKeyRangeResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{3} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{3} } func (m *StreamKeyRangeResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamKeyRangeResponse.Unmarshal(m, b) @@ -449,7 +473,7 @@ func (m *StreamTablesRequest) Reset() { *m = StreamTablesRequest{} } func (m *StreamTablesRequest) String() string { return proto.CompactTextString(m) } func (*StreamTablesRequest) ProtoMessage() {} func (*StreamTablesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{4} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{4} } func (m *StreamTablesRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamTablesRequest.Unmarshal(m, b) @@ -502,7 +526,7 @@ func (m *StreamTablesResponse) Reset() { *m = StreamTablesResponse{} } func (m *StreamTablesResponse) String() string { return proto.CompactTextString(m) } func (*StreamTablesResponse) ProtoMessage() {} func (*StreamTablesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{5} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{5} } func (m *StreamTablesResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamTablesResponse.Unmarshal(m, b) @@ -547,7 +571,7 @@ func (m *Rule) Reset() { *m = Rule{} } func (m *Rule) String() string { return proto.CompactTextString(m) } func (*Rule) ProtoMessage() {} func (*Rule) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{6} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{6} } func (m *Rule) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Rule.Unmarshal(m, b) @@ -594,7 +618,7 @@ func (m *Filter) Reset() { *m = Filter{} } func (m *Filter) String() string { return proto.CompactTextString(m) } func (*Filter) ProtoMessage() {} func (*Filter) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{7} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{7} } func (m *Filter) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Filter.Unmarshal(m, b) @@ -649,7 +673,7 @@ func (m *BinlogSource) Reset() { *m = BinlogSource{} } func (m *BinlogSource) String() string { return proto.CompactTextString(m) } func (*BinlogSource) ProtoMessage() {} func (*BinlogSource) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{8} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{8} } func (m *BinlogSource) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogSource.Unmarshal(m, b) @@ -731,7 +755,7 @@ func (m *RowChange) Reset() { *m = RowChange{} } func (m *RowChange) String() string { return proto.CompactTextString(m) } func (*RowChange) ProtoMessage() {} func (*RowChange) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{9} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{9} } func (m *RowChange) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RowChange.Unmarshal(m, b) @@ -778,7 +802,7 @@ func (m *RowEvent) Reset() { *m = RowEvent{} } func (m *RowEvent) String() string { return proto.CompactTextString(m) } func (*RowEvent) ProtoMessage() {} func (*RowEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{10} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{10} } func (m *RowEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RowEvent.Unmarshal(m, b) @@ -824,7 +848,7 @@ func (m *FieldEvent) Reset() { *m = FieldEvent{} } func (m *FieldEvent) String() string { return proto.CompactTextString(m) } func (*FieldEvent) ProtoMessage() {} func (*FieldEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{11} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{11} } func (m *FieldEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FieldEvent.Unmarshal(m, b) @@ -871,7 +895,7 @@ func (m *ShardGtid) Reset() { *m = ShardGtid{} } func (m *ShardGtid) String() string { return proto.CompactTextString(m) } func (*ShardGtid) ProtoMessage() {} func (*ShardGtid) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{12} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{12} } func (m *ShardGtid) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ShardGtid.Unmarshal(m, b) @@ -923,7 +947,7 @@ func (m *VGtid) Reset() { *m = VGtid{} } func (m *VGtid) String() string { return proto.CompactTextString(m) } func (*VGtid) ProtoMessage() {} func (*VGtid) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{13} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{13} } func (m *VGtid) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VGtid.Unmarshal(m, b) @@ -962,7 +986,7 @@ func (m *KeyspaceShard) Reset() { *m = KeyspaceShard{} } func (m *KeyspaceShard) String() string { return proto.CompactTextString(m) } func (*KeyspaceShard) ProtoMessage() {} func (*KeyspaceShard) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{14} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{14} } func (m *KeyspaceShard) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_KeyspaceShard.Unmarshal(m, b) @@ -998,10 +1022,12 @@ func (m *KeyspaceShard) GetShard() string { type Journal struct { Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - Tables []string `protobuf:"bytes,2,rep,name=tables,proto3" json:"tables,omitempty"` - LocalPosition string `protobuf:"bytes,3,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"` - ShardGtids []*ShardGtid `protobuf:"bytes,4,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` - Participants []*KeyspaceShard `protobuf:"bytes,5,rep,name=participants,proto3" json:"participants,omitempty"` + MigrationType MigrationType `protobuf:"varint,2,opt,name=migration_type,json=migrationType,proto3,enum=binlogdata.MigrationType" json:"migration_type,omitempty"` + Tables []string `protobuf:"bytes,3,rep,name=tables,proto3" json:"tables,omitempty"` + LocalPosition string `protobuf:"bytes,4,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"` + ShardGtids []*ShardGtid `protobuf:"bytes,5,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` + Participants []*KeyspaceShard `protobuf:"bytes,6,rep,name=participants,proto3" json:"participants,omitempty"` + ReversedIds []int64 `protobuf:"varint,7,rep,packed,name=reversed_ids,json=reversedIds,proto3" json:"reversed_ids,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -1011,7 +1037,7 @@ func (m *Journal) Reset() { *m = Journal{} } func (m *Journal) String() string { return proto.CompactTextString(m) } func (*Journal) ProtoMessage() {} func (*Journal) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{15} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{15} } func (m *Journal) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Journal.Unmarshal(m, b) @@ -1038,6 +1064,13 @@ func (m *Journal) GetId() int64 { return 0 } +func (m *Journal) GetMigrationType() MigrationType { + if m != nil { + return m.MigrationType + } + return MigrationType_TABLES +} + func (m *Journal) GetTables() []string { if m != nil { return m.Tables @@ -1066,6 +1099,13 @@ func (m *Journal) GetParticipants() []*KeyspaceShard { return nil } +func (m *Journal) GetReversedIds() []int64 { + if m != nil { + return m.ReversedIds + } + return nil +} + // VEvent represents a vstream event type VEvent struct { Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"` @@ -1087,7 +1127,7 @@ func (m *VEvent) Reset() { *m = VEvent{} } func (m *VEvent) String() string { return proto.CompactTextString(m) } func (*VEvent) ProtoMessage() {} func (*VEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{16} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{16} } func (m *VEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VEvent.Unmarshal(m, b) @@ -1186,7 +1226,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} } func (m *VStreamRequest) String() string { return proto.CompactTextString(m) } func (*VStreamRequest) ProtoMessage() {} func (*VStreamRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{17} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{17} } func (m *VStreamRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamRequest.Unmarshal(m, b) @@ -1253,7 +1293,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} } func (m *VStreamResponse) String() string { return proto.CompactTextString(m) } func (*VStreamResponse) ProtoMessage() {} func (*VStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{18} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{18} } func (m *VStreamResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamResponse.Unmarshal(m, b) @@ -1296,7 +1336,7 @@ func (m *VStreamRowsRequest) Reset() { *m = VStreamRowsRequest{} } func (m *VStreamRowsRequest) String() string { return proto.CompactTextString(m) } func (*VStreamRowsRequest) ProtoMessage() {} func (*VStreamRowsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{19} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{19} } func (m *VStreamRowsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamRowsRequest.Unmarshal(m, b) @@ -1367,7 +1407,7 @@ func (m *VStreamRowsResponse) Reset() { *m = VStreamRowsResponse{} } func (m *VStreamRowsResponse) String() string { return proto.CompactTextString(m) } func (*VStreamRowsResponse) ProtoMessage() {} func (*VStreamRowsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_1f081d4c0b940318, []int{20} + return fileDescriptor_binlogdata_db2d20dd0016de21, []int{20} } func (m *VStreamRowsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamRowsResponse.Unmarshal(m, b) @@ -1447,104 +1487,110 @@ func init() { proto.RegisterType((*VStreamRowsResponse)(nil), "binlogdata.VStreamRowsResponse") proto.RegisterEnum("binlogdata.OnDDLAction", OnDDLAction_name, OnDDLAction_value) proto.RegisterEnum("binlogdata.VEventType", VEventType_name, VEventType_value) + proto.RegisterEnum("binlogdata.MigrationType", MigrationType_name, MigrationType_value) proto.RegisterEnum("binlogdata.BinlogTransaction_Statement_Category", BinlogTransaction_Statement_Category_name, BinlogTransaction_Statement_Category_value) } -func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_1f081d4c0b940318) } - -var fileDescriptor_binlogdata_1f081d4c0b940318 = []byte{ - // 1484 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x57, 0xcb, 0x72, 0xe3, 0x54, - 0x13, 0x1e, 0xdb, 0x92, 0x2f, 0xad, 0x5c, 0x94, 0x93, 0xcb, 0xef, 0x3f, 0xc5, 0x50, 0x41, 0xc5, - 0x30, 0x21, 0x55, 0x38, 0x83, 0x81, 0x61, 0x35, 0x4c, 0xf9, 0xa2, 0xc9, 0x38, 0x51, 0xec, 0xcc, - 0x89, 0x92, 0xa1, 0x66, 0xa3, 0x52, 0xa4, 0x93, 0x44, 0x44, 0x96, 0x3c, 0xd2, 0x71, 0x42, 0x1e, - 0x80, 0xe2, 0x01, 0xd8, 0xf2, 0x02, 0x3c, 0x04, 0x5b, 0xb6, 0x14, 0x4f, 0xc0, 0x8a, 0xf7, 0xa0, - 0xce, 0x45, 0xb2, 0x9d, 0xb9, 0x65, 0xa8, 0x62, 0xc1, 0xc6, 0xd5, 0xa7, 0x4f, 0x77, 0xab, 0xfb, - 0xeb, 0x3e, 0xdd, 0x6d, 0xd0, 0x4f, 0x82, 0x28, 0x8c, 0xcf, 0x7c, 0x97, 0xba, 0x8d, 0x51, 0x12, - 0xd3, 0x18, 0xc1, 0x84, 0xb3, 0xae, 0x5d, 0xd2, 0x64, 0xe4, 0x89, 0x8b, 0x75, 0xed, 0xe5, 0x98, - 0x24, 0xd7, 0xf2, 0xb0, 0x40, 0xe3, 0x51, 0x3c, 0xd1, 0x32, 0xf6, 0xa1, 0xd2, 0x39, 0x77, 0x93, - 0x94, 0x50, 0xb4, 0x06, 0x65, 0x2f, 0x0c, 0x48, 0x44, 0xeb, 0x85, 0x8d, 0xc2, 0xa6, 0x8a, 0xe5, - 0x09, 0x21, 0x50, 0xbc, 0x38, 0x8a, 0xea, 0x45, 0xce, 0xe5, 0x34, 0x93, 0x4d, 0x49, 0x72, 0x49, - 0x92, 0x7a, 0x49, 0xc8, 0x8a, 0x93, 0xf1, 0x57, 0x09, 0x96, 0xda, 0xdc, 0x0f, 0x3b, 0x71, 0xa3, - 0xd4, 0xf5, 0x68, 0x10, 0x47, 0x68, 0x07, 0x20, 0xa5, 0x2e, 0x25, 0x43, 0x12, 0xd1, 0xb4, 0x5e, - 0xd8, 0x28, 0x6d, 0x6a, 0xcd, 0xfb, 0x8d, 0xa9, 0x08, 0x5e, 0x51, 0x69, 0x1c, 0x66, 0xf2, 0x78, - 0x4a, 0x15, 0x35, 0x41, 0x23, 0x97, 0x24, 0xa2, 0x0e, 0x8d, 0x2f, 0x48, 0x54, 0x57, 0x36, 0x0a, - 0x9b, 0x5a, 0x73, 0xa9, 0x21, 0x02, 0x34, 0xd9, 0x8d, 0xcd, 0x2e, 0x30, 0x90, 0x9c, 0x5e, 0xff, - 0xad, 0x08, 0xb5, 0xdc, 0x1a, 0xb2, 0xa0, 0xea, 0xb9, 0x94, 0x9c, 0xc5, 0xc9, 0x35, 0x0f, 0x73, - 0xa1, 0xf9, 0xe0, 0x96, 0x8e, 0x34, 0x3a, 0x52, 0x0f, 0xe7, 0x16, 0xd0, 0x67, 0x50, 0xf1, 0x04, - 0x7a, 0x1c, 0x1d, 0xad, 0xb9, 0x3c, 0x6d, 0x4c, 0x02, 0x8b, 0x33, 0x19, 0xa4, 0x43, 0x29, 0x7d, - 0x19, 0x72, 0xc8, 0xe6, 0x30, 0x23, 0x8d, 0x5f, 0x0a, 0x50, 0xcd, 0xec, 0xa2, 0x65, 0x58, 0x6c, - 0x5b, 0xce, 0x51, 0x1f, 0x9b, 0x9d, 0xc1, 0x4e, 0xbf, 0xf7, 0xc2, 0xec, 0xea, 0x77, 0xd0, 0x1c, - 0x54, 0xdb, 0x96, 0xd3, 0x36, 0x77, 0x7a, 0x7d, 0xbd, 0x80, 0xe6, 0xa1, 0xd6, 0xb6, 0x9c, 0xce, - 0x60, 0x7f, 0xbf, 0x67, 0xeb, 0x45, 0xb4, 0x08, 0x5a, 0xdb, 0x72, 0xf0, 0xc0, 0xb2, 0xda, 0xad, - 0xce, 0x9e, 0x5e, 0x42, 0xab, 0xb0, 0xd4, 0xb6, 0x9c, 0xee, 0xbe, 0xe5, 0x74, 0xcd, 0x03, 0x6c, - 0x76, 0x5a, 0xb6, 0xd9, 0xd5, 0x15, 0x04, 0x50, 0x66, 0xec, 0xae, 0xa5, 0xab, 0x92, 0x3e, 0x34, - 0x6d, 0xbd, 0x2c, 0xcd, 0xf5, 0xfa, 0x87, 0x26, 0xb6, 0xf5, 0x8a, 0x3c, 0x1e, 0x1d, 0x74, 0x5b, - 0xb6, 0xa9, 0x57, 0xe5, 0xb1, 0x6b, 0x5a, 0xa6, 0x6d, 0xea, 0xb5, 0x5d, 0xa5, 0x5a, 0xd4, 0x4b, - 0xbb, 0x4a, 0xb5, 0xa4, 0x2b, 0xc6, 0x4f, 0x05, 0x58, 0x3d, 0xa4, 0x09, 0x71, 0x87, 0x7b, 0xe4, - 0x1a, 0xbb, 0xd1, 0x19, 0xc1, 0xe4, 0xe5, 0x98, 0xa4, 0x14, 0xad, 0x43, 0x75, 0x14, 0xa7, 0x01, - 0xc3, 0x8e, 0x03, 0x5c, 0xc3, 0xf9, 0x19, 0x6d, 0x43, 0xed, 0x82, 0x5c, 0x3b, 0x09, 0x93, 0x97, - 0x80, 0xa1, 0x46, 0x5e, 0x90, 0xb9, 0xa5, 0xea, 0x85, 0xa4, 0xa6, 0xf1, 0x2d, 0xbd, 0x1b, 0x5f, - 0xe3, 0x14, 0xd6, 0x6e, 0x3a, 0x95, 0x8e, 0xe2, 0x28, 0x25, 0xc8, 0x02, 0x24, 0x14, 0x1d, 0x3a, - 0xc9, 0x2d, 0xf7, 0x4f, 0x6b, 0xde, 0x7d, 0x6b, 0x01, 0xe0, 0xa5, 0x93, 0x9b, 0x2c, 0xe3, 0x7b, - 0x58, 0x16, 0xdf, 0xb1, 0xdd, 0x93, 0x90, 0xa4, 0xb7, 0x09, 0x7d, 0x0d, 0xca, 0x94, 0x0b, 0xd7, - 0x8b, 0x1b, 0xa5, 0xcd, 0x1a, 0x96, 0xa7, 0xf7, 0x8d, 0xd0, 0x87, 0x95, 0xd9, 0x2f, 0xff, 0x2b, - 0xf1, 0x7d, 0x09, 0x0a, 0x1e, 0x87, 0x04, 0xad, 0x80, 0x3a, 0x74, 0xa9, 0x77, 0x2e, 0xa3, 0x11, - 0x07, 0x16, 0xca, 0x69, 0x10, 0x52, 0x92, 0xf0, 0x14, 0xd6, 0xb0, 0x3c, 0x19, 0x0f, 0xa0, 0xfc, - 0x84, 0x53, 0xe8, 0x13, 0x50, 0x93, 0x31, 0x8b, 0x55, 0x3c, 0x75, 0x7d, 0xda, 0x01, 0x66, 0x18, - 0x8b, 0x6b, 0xe3, 0xe7, 0x22, 0xcc, 0x09, 0x87, 0x0e, 0xe3, 0x71, 0xe2, 0x11, 0x86, 0xe0, 0x05, - 0xb9, 0x4e, 0x47, 0xae, 0x47, 0x32, 0x04, 0xb3, 0x33, 0x73, 0x26, 0x3d, 0x77, 0x13, 0x5f, 0x7e, - 0x55, 0x1c, 0xd0, 0x57, 0xa0, 0x71, 0x24, 0xa9, 0x43, 0xaf, 0x47, 0x84, 0x63, 0xb8, 0xd0, 0x5c, - 0x99, 0x14, 0x15, 0xc7, 0x89, 0xda, 0xd7, 0x23, 0x82, 0x81, 0xe6, 0xf4, 0x6c, 0x25, 0x2a, 0xb7, - 0xa8, 0xc4, 0x49, 0xfe, 0xd4, 0x99, 0xfc, 0x6d, 0xe5, 0x60, 0x94, 0xa5, 0x95, 0xa9, 0x58, 0x05, - 0x1c, 0x19, 0x40, 0xa8, 0x01, 0xe5, 0x38, 0x72, 0x7c, 0x3f, 0xac, 0x57, 0xb8, 0x9b, 0xff, 0x9b, - 0x96, 0x1d, 0x44, 0xdd, 0xae, 0xd5, 0x12, 0x29, 0x51, 0xe3, 0xa8, 0xeb, 0x87, 0xc6, 0x33, 0xa8, - 0xe1, 0xf8, 0xaa, 0x73, 0xce, 0x1d, 0x30, 0xa0, 0x7c, 0x42, 0x4e, 0xe3, 0x84, 0xc8, 0xac, 0x82, - 0xec, 0x7a, 0x38, 0xbe, 0xc2, 0xf2, 0x06, 0x6d, 0x80, 0xea, 0x9e, 0x66, 0x89, 0x99, 0x15, 0x11, - 0x17, 0x86, 0x0b, 0x55, 0x1c, 0x5f, 0xf1, 0x4e, 0x89, 0xee, 0x82, 0x40, 0xc4, 0x89, 0xdc, 0x61, - 0x06, 0x77, 0x8d, 0x73, 0xfa, 0xee, 0x90, 0xa0, 0x87, 0xa0, 0x25, 0xf1, 0x95, 0xe3, 0xf1, 0xcf, - 0x8b, 0xb2, 0xd5, 0x9a, 0xab, 0x33, 0xa9, 0xcc, 0x9c, 0xc3, 0x90, 0x64, 0x64, 0x6a, 0x3c, 0x03, - 0x78, 0x12, 0x90, 0xd0, 0xbf, 0xd5, 0x47, 0x3e, 0x66, 0xf0, 0x91, 0xd0, 0xcf, 0xec, 0xcf, 0x49, - 0x97, 0xb9, 0x05, 0x2c, 0xef, 0x18, 0x10, 0x87, 0x2c, 0xdb, 0x3b, 0x34, 0xf0, 0xff, 0x41, 0x8d, - 0x20, 0x50, 0xce, 0x68, 0xe0, 0xf3, 0xe2, 0xa8, 0x61, 0x4e, 0x1b, 0x8f, 0x41, 0x3d, 0xe6, 0xe6, - 0x1e, 0x82, 0xc6, 0xa5, 0x1c, 0xc6, 0xce, 0x2a, 0x76, 0x26, 0xcc, 0xfc, 0xd3, 0x18, 0xd2, 0x8c, - 0x4c, 0x8d, 0x16, 0xcc, 0xef, 0xc9, 0xcf, 0x72, 0x81, 0xf7, 0xf7, 0xcb, 0xf8, 0xbd, 0x00, 0x95, - 0xdd, 0x78, 0x9c, 0x44, 0x6e, 0x88, 0x16, 0xa0, 0x18, 0xf8, 0x5c, 0xaf, 0x84, 0x8b, 0x81, 0xff, - 0xc6, 0x7e, 0x71, 0x0f, 0x16, 0xc2, 0xd8, 0x73, 0x43, 0x27, 0xef, 0x34, 0x22, 0xaa, 0x79, 0xce, - 0x3d, 0xc8, 0xda, 0xcd, 0x8d, 0xa8, 0x94, 0x5b, 0x46, 0x85, 0x1e, 0xc1, 0xdc, 0xc8, 0x4d, 0x68, - 0xe0, 0x05, 0x23, 0x97, 0xcd, 0x6a, 0x95, 0x2b, 0xfe, 0x7f, 0x5a, 0x71, 0x26, 0x6a, 0x3c, 0x23, - 0x6e, 0xfc, 0x59, 0x84, 0xf2, 0xb1, 0x48, 0xfc, 0x16, 0x28, 0xfc, 0x45, 0x8a, 0x21, 0xbb, 0x36, - 0x6d, 0x41, 0x48, 0xf0, 0x37, 0xc9, 0x65, 0xd0, 0x07, 0x50, 0xa3, 0xc1, 0x90, 0xa4, 0xd4, 0x1d, - 0x8e, 0x38, 0x44, 0x25, 0x3c, 0x61, 0xbc, 0x2e, 0x7d, 0x6c, 0x92, 0xb2, 0x77, 0xa4, 0x70, 0x16, - 0x23, 0xd1, 0xe7, 0x50, 0x63, 0xe5, 0xca, 0x07, 0x7f, 0x5d, 0xe5, 0xf5, 0xbf, 0x72, 0xa3, 0x58, - 0xf9, 0x67, 0x71, 0x35, 0xc9, 0x1e, 0xc0, 0xd7, 0xa0, 0xf1, 0x02, 0x93, 0x4a, 0xe2, 0x01, 0xaf, - 0xcd, 0x3e, 0xe0, 0xac, 0x90, 0x31, 0x9c, 0x4e, 0x8a, 0xfa, 0x3e, 0xa8, 0x97, 0xdc, 0xa5, 0x8a, - 0x5c, 0x40, 0xa6, 0x83, 0xe3, 0x98, 0x8a, 0x7b, 0xd6, 0xdd, 0xbf, 0x13, 0x09, 0xae, 0x57, 0x5f, - 0xed, 0xee, 0x32, 0xf7, 0x38, 0x93, 0x41, 0x1f, 0xc1, 0x9c, 0x37, 0x4e, 0x12, 0xbe, 0xe0, 0x04, - 0x43, 0x52, 0x5f, 0xe1, 0x50, 0x68, 0x92, 0x67, 0x07, 0x43, 0x62, 0xfc, 0x58, 0x84, 0x85, 0x63, - 0x31, 0x02, 0xb2, 0xb1, 0xf3, 0x18, 0x96, 0xc9, 0xe9, 0x29, 0xf1, 0x68, 0x70, 0x49, 0x1c, 0xcf, - 0x0d, 0x43, 0x92, 0x38, 0xb2, 0x96, 0xb4, 0xe6, 0x62, 0x43, 0xac, 0x82, 0x1d, 0xce, 0xef, 0x75, - 0xf1, 0x52, 0x2e, 0x2b, 0x59, 0x3e, 0x32, 0x61, 0x39, 0x18, 0x0e, 0x89, 0x1f, 0xb8, 0x74, 0xda, - 0x80, 0x68, 0x22, 0xab, 0xf2, 0x45, 0x1e, 0xdb, 0x3b, 0x2e, 0x25, 0x13, 0x33, 0xb9, 0x46, 0x6e, - 0xe6, 0x1e, 0x2b, 0xd9, 0xe4, 0x2c, 0x9f, 0x64, 0xf3, 0x52, 0xd3, 0xe6, 0x4c, 0x2c, 0x2f, 0x67, - 0xa6, 0xa4, 0x72, 0x63, 0x4a, 0x4e, 0xba, 0xa9, 0xfa, 0xae, 0x6e, 0x6a, 0x3c, 0x82, 0xc5, 0x1c, - 0x08, 0x39, 0x05, 0xb7, 0xa0, 0xcc, 0x53, 0x99, 0x3d, 0x63, 0xf4, 0x6a, 0xd5, 0x61, 0x29, 0x61, - 0xfc, 0x50, 0x04, 0x94, 0xe9, 0xc7, 0x57, 0xe9, 0x7f, 0x14, 0xcc, 0x15, 0x50, 0x39, 0x5f, 0x22, - 0x29, 0x0e, 0x0c, 0x87, 0xd0, 0x4d, 0xe9, 0xe8, 0x22, 0x87, 0x51, 0x28, 0x3f, 0x63, 0xbf, 0x98, - 0xa4, 0xe3, 0x90, 0x62, 0x29, 0x61, 0xfc, 0x5a, 0x80, 0xe5, 0x19, 0x1c, 0x24, 0x96, 0x93, 0xce, - 0x5c, 0x78, 0x73, 0x67, 0x46, 0x9b, 0x50, 0x1d, 0x5d, 0xbc, 0xa5, 0x83, 0xe7, 0xb7, 0xaf, 0x7d, - 0xc5, 0x1f, 0x82, 0x92, 0xc4, 0x57, 0x59, 0x7b, 0x9a, 0x1e, 0x57, 0x9c, 0xcf, 0x66, 0xde, 0x4c, - 0x1c, 0x33, 0x33, 0x4f, 0xdc, 0x6c, 0x7d, 0x03, 0xda, 0xd4, 0xe8, 0x64, 0xdb, 0x6d, 0x6f, 0xa7, - 0x3f, 0xc0, 0xa6, 0x7e, 0x07, 0x55, 0x41, 0x39, 0xb4, 0x07, 0x07, 0x7a, 0x81, 0x51, 0xe6, 0xb7, - 0x66, 0x47, 0x6c, 0xcc, 0x8c, 0x72, 0xa4, 0x50, 0x69, 0xeb, 0x8f, 0x02, 0xc0, 0xa4, 0x21, 0x21, - 0x0d, 0x2a, 0x47, 0xfd, 0xbd, 0xfe, 0xe0, 0x79, 0x5f, 0x18, 0xd8, 0xb1, 0x7b, 0x5d, 0xbd, 0x80, - 0x6a, 0xa0, 0x8a, 0x15, 0xbc, 0xc8, 0xbe, 0x20, 0xf7, 0xef, 0x12, 0x5b, 0xce, 0xf3, 0xe5, 0x5b, - 0x41, 0x15, 0x28, 0xe5, 0x2b, 0xb6, 0xdc, 0xa9, 0xcb, 0xcc, 0x20, 0x36, 0x0f, 0xac, 0x56, 0xc7, - 0xd4, 0x2b, 0xec, 0x22, 0xdf, 0xae, 0x01, 0xca, 0xd9, 0x6a, 0xcd, 0x34, 0xd9, 0x42, 0x0e, 0xec, - 0x3b, 0x03, 0xfb, 0xa9, 0x89, 0x75, 0x8d, 0xf1, 0xf0, 0xe0, 0xb9, 0x3e, 0xc7, 0x78, 0x4f, 0x7a, - 0xa6, 0xd5, 0xd5, 0xe7, 0xd9, 0x46, 0xfe, 0xd4, 0x6c, 0x61, 0xbb, 0x6d, 0xb6, 0x6c, 0x7d, 0x81, - 0xdd, 0x1c, 0x73, 0x07, 0x17, 0xd9, 0x67, 0x76, 0x07, 0x47, 0xb8, 0xdf, 0xb2, 0x74, 0xbd, 0xfd, - 0xe9, 0x8b, 0xfb, 0x97, 0x01, 0x25, 0x69, 0xda, 0x08, 0xe2, 0x6d, 0x41, 0x6d, 0x9f, 0xc5, 0xdb, - 0x97, 0x74, 0x9b, 0xff, 0xe9, 0xdb, 0x9e, 0xbc, 0x8a, 0x93, 0x32, 0xe7, 0x7c, 0xf1, 0x77, 0x00, - 0x00, 0x00, 0xff, 0xff, 0x0c, 0x14, 0xe6, 0xb2, 0x50, 0x0e, 0x00, 0x00, +func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_db2d20dd0016de21) } + +var fileDescriptor_binlogdata_db2d20dd0016de21 = []byte{ + // 1558 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x57, 0xcb, 0x72, 0xdb, 0xca, + 0x11, 0x35, 0x09, 0xf0, 0xd5, 0x90, 0x28, 0x68, 0xf4, 0x08, 0xa3, 0x8a, 0x53, 0x0a, 0x2a, 0x8e, + 0x14, 0x55, 0x85, 0x72, 0x98, 0xc4, 0x59, 0x39, 0x0e, 0x1f, 0xb0, 0x4c, 0x09, 0x22, 0xe5, 0x21, + 0x24, 0xa7, 0xbc, 0x41, 0x41, 0xc4, 0x48, 0x42, 0x04, 0x02, 0x34, 0x30, 0xa4, 0xa2, 0x0f, 0x48, + 0xe5, 0x03, 0xb2, 0xcd, 0x0f, 0x64, 0x9f, 0x6d, 0xb6, 0xd9, 0xe7, 0x0b, 0xb2, 0xca, 0x7f, 0xdc, + 0x9a, 0x07, 0x40, 0x42, 0xf6, 0xb5, 0xe5, 0x5b, 0x75, 0x17, 0x77, 0xc3, 0xea, 0xe9, 0xe9, 0xe7, + 0x41, 0x4f, 0x77, 0x13, 0xf4, 0x4b, 0x3f, 0x0c, 0xa2, 0x6b, 0xcf, 0xa5, 0x6e, 0x73, 0x1a, 0x47, + 0x34, 0x42, 0xb0, 0xe0, 0xec, 0x68, 0x73, 0x1a, 0x4f, 0xc7, 0xe2, 0x62, 0x47, 0xfb, 0x30, 0x23, + 0xf1, 0xbd, 0x3c, 0xd4, 0x69, 0x34, 0x8d, 0x16, 0x5a, 0xc6, 0x29, 0x54, 0xba, 0x37, 0x6e, 0x9c, + 0x10, 0x8a, 0xb6, 0xa1, 0x3c, 0x0e, 0x7c, 0x12, 0xd2, 0x46, 0x61, 0xb7, 0xb0, 0x5f, 0xc2, 0xf2, + 0x84, 0x10, 0xa8, 0xe3, 0x28, 0x0c, 0x1b, 0x45, 0xce, 0xe5, 0x34, 0x93, 0x4d, 0x48, 0x3c, 0x27, + 0x71, 0x43, 0x11, 0xb2, 0xe2, 0x64, 0xfc, 0x5f, 0x81, 0xf5, 0x0e, 0x8f, 0xc3, 0x8e, 0xdd, 0x30, + 0x71, 0xc7, 0xd4, 0x8f, 0x42, 0x74, 0x04, 0x90, 0x50, 0x97, 0x92, 0x09, 0x09, 0x69, 0xd2, 0x28, + 0xec, 0x2a, 0xfb, 0x5a, 0x6b, 0xaf, 0xb9, 0x94, 0xc1, 0x47, 0x2a, 0xcd, 0x51, 0x2a, 0x8f, 0x97, + 0x54, 0x51, 0x0b, 0x34, 0x32, 0x27, 0x21, 0x75, 0x68, 0x74, 0x4b, 0xc2, 0x86, 0xba, 0x5b, 0xd8, + 0xd7, 0x5a, 0xeb, 0x4d, 0x91, 0xa0, 0xc9, 0x6e, 0x6c, 0x76, 0x81, 0x81, 0x64, 0xf4, 0xce, 0x7f, + 0x8a, 0x50, 0xcb, 0xac, 0x21, 0x0b, 0xaa, 0x63, 0x97, 0x92, 0xeb, 0x28, 0xbe, 0xe7, 0x69, 0xd6, + 0x5b, 0xcf, 0x1f, 0x19, 0x48, 0xb3, 0x2b, 0xf5, 0x70, 0x66, 0x01, 0xfd, 0x0a, 0x2a, 0x63, 0x81, + 0x1e, 0x47, 0x47, 0x6b, 0x6d, 0x2c, 0x1b, 0x93, 0xc0, 0xe2, 0x54, 0x06, 0xe9, 0xa0, 0x24, 0x1f, + 0x02, 0x0e, 0xd9, 0x0a, 0x66, 0xa4, 0xf1, 0xcf, 0x02, 0x54, 0x53, 0xbb, 0x68, 0x03, 0xd6, 0x3a, + 0x96, 0x73, 0x3e, 0xc0, 0x66, 0x77, 0x78, 0x34, 0xe8, 0xbf, 0x37, 0x7b, 0xfa, 0x13, 0xb4, 0x02, + 0xd5, 0x8e, 0xe5, 0x74, 0xcc, 0xa3, 0xfe, 0x40, 0x2f, 0xa0, 0x55, 0xa8, 0x75, 0x2c, 0xa7, 0x3b, + 0x3c, 0x3d, 0xed, 0xdb, 0x7a, 0x11, 0xad, 0x81, 0xd6, 0xb1, 0x1c, 0x3c, 0xb4, 0xac, 0x4e, 0xbb, + 0x7b, 0xa2, 0x2b, 0x68, 0x0b, 0xd6, 0x3b, 0x96, 0xd3, 0x3b, 0xb5, 0x9c, 0x9e, 0x79, 0x86, 0xcd, + 0x6e, 0xdb, 0x36, 0x7b, 0xba, 0x8a, 0x00, 0xca, 0x8c, 0xdd, 0xb3, 0xf4, 0x92, 0xa4, 0x47, 0xa6, + 0xad, 0x97, 0xa5, 0xb9, 0xfe, 0x60, 0x64, 0x62, 0x5b, 0xaf, 0xc8, 0xe3, 0xf9, 0x59, 0xaf, 0x6d, + 0x9b, 0x7a, 0x55, 0x1e, 0x7b, 0xa6, 0x65, 0xda, 0xa6, 0x5e, 0x3b, 0x56, 0xab, 0x45, 0x5d, 0x39, + 0x56, 0xab, 0x8a, 0xae, 0x1a, 0x7f, 0x2f, 0xc0, 0xd6, 0x88, 0xc6, 0xc4, 0x9d, 0x9c, 0x90, 0x7b, + 0xec, 0x86, 0xd7, 0x04, 0x93, 0x0f, 0x33, 0x92, 0x50, 0xb4, 0x03, 0xd5, 0x69, 0x94, 0xf8, 0x0c, + 0x3b, 0x0e, 0x70, 0x0d, 0x67, 0x67, 0x74, 0x08, 0xb5, 0x5b, 0x72, 0xef, 0xc4, 0x4c, 0x5e, 0x02, + 0x86, 0x9a, 0x59, 0x41, 0x66, 0x96, 0xaa, 0xb7, 0x92, 0x5a, 0xc6, 0x57, 0xf9, 0x32, 0xbe, 0xc6, + 0x15, 0x6c, 0x3f, 0x0c, 0x2a, 0x99, 0x46, 0x61, 0x42, 0x90, 0x05, 0x48, 0x28, 0x3a, 0x74, 0xf1, + 0x6d, 0x79, 0x7c, 0x5a, 0xeb, 0xe9, 0x67, 0x0b, 0x00, 0xaf, 0x5f, 0x3e, 0x64, 0x19, 0x7f, 0x81, + 0x0d, 0xe1, 0xc7, 0x76, 0x2f, 0x03, 0x92, 0x3c, 0x26, 0xf5, 0x6d, 0x28, 0x53, 0x2e, 0xdc, 0x28, + 0xee, 0x2a, 0xfb, 0x35, 0x2c, 0x4f, 0x5f, 0x9b, 0xa1, 0x07, 0x9b, 0x79, 0xcf, 0xdf, 0x4b, 0x7e, + 0xbf, 0x05, 0x15, 0xcf, 0x02, 0x82, 0x36, 0xa1, 0x34, 0x71, 0xe9, 0xf8, 0x46, 0x66, 0x23, 0x0e, + 0x2c, 0x95, 0x2b, 0x3f, 0xa0, 0x24, 0xe6, 0x9f, 0xb0, 0x86, 0xe5, 0xc9, 0x78, 0x0e, 0xe5, 0xd7, + 0x9c, 0x42, 0xbf, 0x80, 0x52, 0x3c, 0x63, 0xb9, 0x8a, 0xa7, 0xae, 0x2f, 0x07, 0xc0, 0x0c, 0x63, + 0x71, 0x6d, 0xfc, 0xa3, 0x08, 0x2b, 0x22, 0xa0, 0x51, 0x34, 0x8b, 0xc7, 0x84, 0x21, 0x78, 0x4b, + 0xee, 0x93, 0xa9, 0x3b, 0x26, 0x29, 0x82, 0xe9, 0x99, 0x05, 0x93, 0xdc, 0xb8, 0xb1, 0x27, 0xbd, + 0x8a, 0x03, 0xfa, 0x1d, 0x68, 0x1c, 0x49, 0xea, 0xd0, 0xfb, 0x29, 0xe1, 0x18, 0xd6, 0x5b, 0x9b, + 0x8b, 0xa2, 0xe2, 0x38, 0x51, 0xfb, 0x7e, 0x4a, 0x30, 0xd0, 0x8c, 0xce, 0x57, 0xa2, 0xfa, 0x88, + 0x4a, 0x5c, 0x7c, 0xbf, 0x52, 0xee, 0xfb, 0x1d, 0x64, 0x60, 0x94, 0xa5, 0x95, 0xa5, 0x5c, 0x05, + 0x1c, 0x29, 0x40, 0xa8, 0x09, 0xe5, 0x28, 0x74, 0x3c, 0x2f, 0x68, 0x54, 0x78, 0x98, 0x3f, 0x5a, + 0x96, 0x1d, 0x86, 0xbd, 0x9e, 0xd5, 0x16, 0x9f, 0xa4, 0x14, 0x85, 0x3d, 0x2f, 0x30, 0xde, 0x42, + 0x0d, 0x47, 0x77, 0xdd, 0x1b, 0x1e, 0x80, 0x01, 0xe5, 0x4b, 0x72, 0x15, 0xc5, 0x44, 0x7e, 0x55, + 0x90, 0x5d, 0x0f, 0x47, 0x77, 0x58, 0xde, 0xa0, 0x5d, 0x28, 0xb9, 0x57, 0xe9, 0x87, 0xc9, 0x8b, + 0x88, 0x0b, 0xc3, 0x85, 0x2a, 0x8e, 0xee, 0x78, 0xa7, 0x44, 0x4f, 0x41, 0x20, 0xe2, 0x84, 0xee, + 0x24, 0x85, 0xbb, 0xc6, 0x39, 0x03, 0x77, 0x42, 0xd0, 0x0b, 0xd0, 0xe2, 0xe8, 0xce, 0x19, 0x73, + 0xf7, 0xa2, 0x6c, 0xb5, 0xd6, 0x56, 0xee, 0x53, 0xa6, 0xc1, 0x61, 0x88, 0x53, 0x32, 0x31, 0xde, + 0x02, 0xbc, 0xf6, 0x49, 0xe0, 0x3d, 0xca, 0xc9, 0xcf, 0x19, 0x7c, 0x24, 0xf0, 0x52, 0xfb, 0x2b, + 0x32, 0x64, 0x6e, 0x01, 0xcb, 0x3b, 0x06, 0xc4, 0x88, 0x7d, 0xed, 0x23, 0xea, 0x7b, 0xdf, 0xa1, + 0x46, 0x10, 0xa8, 0xd7, 0xd4, 0xf7, 0x78, 0x71, 0xd4, 0x30, 0xa7, 0x8d, 0x57, 0x50, 0xba, 0xe0, + 0xe6, 0x5e, 0x80, 0xc6, 0xa5, 0x1c, 0xc6, 0x4e, 0x2b, 0x36, 0x97, 0x66, 0xe6, 0x1a, 0x43, 0x92, + 0x92, 0x89, 0xd1, 0x86, 0xd5, 0x13, 0xe9, 0x96, 0x0b, 0x7c, 0x7d, 0x5c, 0xc6, 0xbf, 0x8a, 0x50, + 0x39, 0x8e, 0x66, 0x71, 0xe8, 0x06, 0xa8, 0x0e, 0x45, 0xdf, 0xe3, 0x7a, 0x0a, 0x2e, 0xfa, 0x1e, + 0xfa, 0x23, 0xd4, 0x27, 0xfe, 0x75, 0xec, 0xb2, 0x7a, 0x10, 0xa5, 0x5d, 0xe4, 0x35, 0xf3, 0xe3, + 0xe5, 0xc8, 0x4e, 0x53, 0x09, 0x5e, 0xdf, 0xab, 0x93, 0xe5, 0xe3, 0x52, 0xc5, 0x2a, 0xb9, 0x8a, + 0x7d, 0x06, 0xf5, 0x20, 0x1a, 0xbb, 0x81, 0x93, 0xf5, 0x2a, 0x95, 0x07, 0xb5, 0xca, 0xb9, 0x67, + 0x69, 0xc3, 0x7a, 0x80, 0x4b, 0xe9, 0x91, 0xb8, 0xa0, 0x97, 0xb0, 0x32, 0x75, 0x63, 0xea, 0x8f, + 0xfd, 0xa9, 0xcb, 0xa6, 0x7d, 0x99, 0x2b, 0xe6, 0xc2, 0xce, 0xe1, 0x86, 0x73, 0xe2, 0xe8, 0x67, + 0xb0, 0x12, 0x93, 0x39, 0x89, 0x13, 0xe2, 0x39, 0xcc, 0x6f, 0x65, 0x57, 0xd9, 0x57, 0xb0, 0x96, + 0xf2, 0xfa, 0x5e, 0x62, 0xfc, 0xaf, 0x08, 0xe5, 0x0b, 0x51, 0x5d, 0x07, 0xa0, 0x72, 0x6c, 0xc4, + 0x24, 0xdf, 0x5e, 0x76, 0x22, 0x24, 0x38, 0x30, 0x5c, 0x06, 0xfd, 0x04, 0x6a, 0xd4, 0x9f, 0x90, + 0x84, 0xba, 0x93, 0x29, 0x07, 0x53, 0xc1, 0x0b, 0xc6, 0xa7, 0x6a, 0x84, 0x8d, 0x6b, 0xf6, 0x58, + 0x05, 0x3c, 0x8c, 0x44, 0xbf, 0x86, 0x1a, 0x7b, 0x13, 0x7c, 0xbb, 0x68, 0x94, 0xf8, 0x23, 0xdb, + 0x7c, 0xf0, 0x22, 0xb8, 0x5b, 0x5c, 0x8d, 0xd3, 0x57, 0xf6, 0x7b, 0xd0, 0x78, 0x15, 0x4b, 0x25, + 0xd1, 0x25, 0xb6, 0xf3, 0x5d, 0x22, 0x7d, 0x2d, 0x18, 0xae, 0x16, 0x2f, 0x67, 0x0f, 0x4a, 0x73, + 0x1e, 0x52, 0x45, 0x6e, 0x39, 0xcb, 0xc9, 0x71, 0xd8, 0xc5, 0x3d, 0x1b, 0x21, 0x7f, 0x16, 0x55, + 0xd4, 0xa8, 0x7e, 0x3c, 0x42, 0x64, 0x81, 0xe1, 0x54, 0x86, 0x21, 0x3c, 0x9e, 0xc5, 0x31, 0xdf, + 0xa2, 0xfc, 0x09, 0x69, 0x6c, 0x72, 0x28, 0x34, 0xc9, 0xb3, 0xfd, 0x09, 0x31, 0xfe, 0x56, 0x84, + 0xfa, 0x85, 0x98, 0x33, 0xe9, 0x6c, 0x7b, 0x05, 0x1b, 0xe4, 0xea, 0x8a, 0x8c, 0xa9, 0x3f, 0x27, + 0xce, 0xd8, 0x0d, 0x02, 0x12, 0x3b, 0xb2, 0x60, 0xb5, 0xd6, 0x5a, 0x53, 0xec, 0x9b, 0x5d, 0xce, + 0xef, 0xf7, 0xf0, 0x7a, 0x26, 0x2b, 0x59, 0x1e, 0x32, 0x61, 0xc3, 0x9f, 0x4c, 0x88, 0xe7, 0xbb, + 0x74, 0xd9, 0x80, 0xe8, 0x54, 0x5b, 0xf2, 0xd9, 0x5f, 0xd8, 0x47, 0x2e, 0x25, 0x0b, 0x33, 0x99, + 0x46, 0x66, 0xe6, 0x19, 0xab, 0xea, 0xf8, 0x3a, 0x1b, 0x97, 0xab, 0x52, 0xd3, 0xe6, 0x4c, 0x2c, + 0x2f, 0x73, 0xa3, 0x58, 0x7d, 0x30, 0x8a, 0x17, 0x2d, 0xbb, 0xf4, 0xa5, 0x96, 0x6d, 0xbc, 0x84, + 0xb5, 0x0c, 0x08, 0x39, 0x6a, 0x0f, 0xa0, 0xcc, 0x3f, 0x65, 0xda, 0x2b, 0xd0, 0xc7, 0x55, 0x87, + 0xa5, 0x84, 0xf1, 0xd7, 0x22, 0xa0, 0x54, 0x3f, 0xba, 0x4b, 0x7e, 0xa0, 0x60, 0x6e, 0x42, 0x89, + 0xf3, 0x25, 0x92, 0xe2, 0xc0, 0x70, 0x08, 0xdc, 0x84, 0x4e, 0x6f, 0x33, 0x18, 0x85, 0xf2, 0x5b, + 0xf6, 0x8b, 0x49, 0x32, 0x0b, 0x28, 0x96, 0x12, 0xc6, 0xbf, 0x0b, 0xb0, 0x91, 0xc3, 0x41, 0x62, + 0xb9, 0x68, 0xff, 0x85, 0x6f, 0x6f, 0xff, 0x68, 0x1f, 0xaa, 0xd3, 0xdb, 0xcf, 0x8c, 0x89, 0xec, + 0xf6, 0x93, 0xaf, 0xf8, 0xa7, 0xa0, 0xc6, 0xd1, 0x5d, 0xd2, 0x50, 0xb9, 0xe6, 0xf2, 0x4c, 0xe4, + 0x7c, 0x36, 0x58, 0x73, 0x79, 0xe4, 0x06, 0xab, 0xb8, 0x39, 0xf8, 0x03, 0x68, 0x4b, 0xf3, 0x99, + 0xad, 0xd0, 0xfd, 0xa3, 0xc1, 0x10, 0x9b, 0xfa, 0x13, 0x54, 0x05, 0x75, 0x64, 0x0f, 0xcf, 0xf4, + 0x02, 0xa3, 0xcc, 0x3f, 0x99, 0x5d, 0xb1, 0x96, 0x33, 0xca, 0x91, 0x42, 0xca, 0xc1, 0x7f, 0x0b, + 0x00, 0x8b, 0x86, 0x84, 0x34, 0xa8, 0x9c, 0x0f, 0x4e, 0x06, 0xc3, 0x77, 0x03, 0x61, 0xe0, 0xc8, + 0xee, 0xf7, 0xf4, 0x02, 0xaa, 0x41, 0x49, 0xec, 0xf9, 0x45, 0xe6, 0x41, 0x2e, 0xf9, 0x0a, 0xfb, + 0x07, 0x90, 0x6d, 0xf8, 0x2a, 0xaa, 0x80, 0x92, 0xed, 0xf1, 0x72, 0x71, 0x2f, 0x33, 0x83, 0xd8, + 0x3c, 0xb3, 0xda, 0x5d, 0x53, 0xaf, 0xb0, 0x8b, 0x6c, 0x85, 0x07, 0x28, 0xa7, 0xfb, 0x3b, 0xd3, + 0x64, 0x5b, 0x3f, 0x30, 0x3f, 0x43, 0xfb, 0x8d, 0x89, 0x75, 0x8d, 0xf1, 0xf0, 0xf0, 0x9d, 0xbe, + 0xc2, 0x78, 0xaf, 0xfb, 0xa6, 0xd5, 0xd3, 0x57, 0xd9, 0xda, 0xff, 0xc6, 0x6c, 0x63, 0xbb, 0x63, + 0xb6, 0x6d, 0xbd, 0xce, 0x6e, 0x2e, 0x78, 0x80, 0x6b, 0xcc, 0xcd, 0xf1, 0xf0, 0x1c, 0x0f, 0xda, + 0x96, 0xae, 0x1f, 0xec, 0xc1, 0x6a, 0x6e, 0xfe, 0x30, 0x5f, 0x76, 0xbb, 0x63, 0x99, 0x23, 0xfd, + 0x09, 0xa3, 0x47, 0x6f, 0xda, 0xb8, 0x37, 0xd2, 0x0b, 0x9d, 0x5f, 0xbe, 0xdf, 0x9b, 0xfb, 0x94, + 0x24, 0x49, 0xd3, 0x8f, 0x0e, 0x05, 0x75, 0x78, 0x1d, 0x1d, 0xce, 0xe9, 0x21, 0xff, 0x0b, 0x7a, + 0xb8, 0x78, 0x3e, 0x97, 0x65, 0xce, 0xf9, 0xcd, 0x37, 0x01, 0x00, 0x00, 0xff, 0xff, 0x2e, 0xb4, + 0x72, 0xde, 0xde, 0x0e, 0x00, 0x00, } diff --git a/go/vt/wrangler/migrater.go b/go/vt/wrangler/migrater.go index 58759a1bef5..7e4c7a71c2e 100644 --- a/go/vt/wrangler/migrater.go +++ b/go/vt/wrangler/migrater.go @@ -42,15 +42,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" ) -// MigrationType specifies the type of migration. -type MigrationType int - -// The following constants define the migration type. -const ( - MigrateTables = MigrationType(iota) - MigrateShards -) - type migrateDirection int const ( @@ -66,7 +57,7 @@ const ( ) type migrater struct { - migrationType MigrationType + migrationType binlogdatapb.MigrationType wr *Wrangler id int64 sources map[topo.KeyspaceShard]*miSource @@ -91,7 +82,7 @@ type miSource struct { } // MigrateReads is a generic way of migrating read traffic for a resharding workflow. -func (wr *Wrangler) MigrateReads(ctx context.Context, migrationType MigrationType, streams map[topo.KeyspaceShard][]uint32, cells []string, servedType topodatapb.TabletType, direction migrateDirection) error { +func (wr *Wrangler) MigrateReads(ctx context.Context, migrationType binlogdatapb.MigrationType, streams map[topo.KeyspaceShard][]uint32, cells []string, servedType topodatapb.TabletType, direction migrateDirection) error { if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY { return fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType) } @@ -109,14 +100,14 @@ func (wr *Wrangler) MigrateReads(ctx context.Context, migrationType MigrationTyp } defer unlock(&err) - if mi.migrationType == MigrateTables { + if mi.migrationType == binlogdatapb.MigrationType_TABLES { return mi.migrateTableReads(ctx, cells, servedType, direction) } return mi.migrateShardReads(ctx, cells, servedType, direction) } // MigrateWrites is a generic way of migrating write traffic for a resharding workflow. -func (wr *Wrangler) MigrateWrites(ctx context.Context, migrationType MigrationType, streams map[topo.KeyspaceShard][]uint32, filteredReplicationWaitTime time.Duration) error { +func (wr *Wrangler) MigrateWrites(ctx context.Context, migrationType binlogdatapb.MigrationType, streams map[topo.KeyspaceShard][]uint32, filteredReplicationWaitTime time.Duration) error { mi, err := wr.buildMigrater(ctx, migrationType, streams) if err != nil { return err @@ -177,7 +168,7 @@ func (wr *Wrangler) MigrateWrites(ctx context.Context, migrationType MigrationTy return nil } -func (wr *Wrangler) buildMigrater(ctx context.Context, migrationType MigrationType, streams map[topo.KeyspaceShard][]uint32) (*migrater, error) { +func (wr *Wrangler) buildMigrater(ctx context.Context, migrationType binlogdatapb.MigrationType, streams map[topo.KeyspaceShard][]uint32) (*migrater, error) { mi := &migrater{ migrationType: migrationType, wr: wr, @@ -185,6 +176,7 @@ func (wr *Wrangler) buildMigrater(ctx context.Context, migrationType MigrationTy targets: make(map[topo.KeyspaceShard]*miTarget), sources: make(map[topo.KeyspaceShard]*miSource), } + mi.wr.Logger().Infof("Migration ID for streams %v: %d", streams, mi.id) for targetks, uids := range streams { targetShard, err := mi.wr.ts.GetShard(ctx, targetks.Keyspace, targetks.Shard) if err != nil { @@ -293,7 +285,7 @@ func (mi *migrater) validate(ctx context.Context) error { uniqueSources[sourceks] = uid } } - if mi.migrationType == MigrateTables { + if mi.migrationType == binlogdatapb.MigrationType_TABLES { // All shards must be present. if err := mi.compareShards(ctx, mi.sourceKeyspace, mi.sourceShards()); err != nil { return err @@ -307,7 +299,7 @@ func (mi *migrater) validate(ctx context.Context) error { return fmt.Errorf("cannot migrate streams with wild card table names: %v", table) } } - } else { // MigrateShards + } else { // binlogdatapb.MigrationType_SHARDS // Source and target keyspace must match if mi.sourceKeyspace != mi.targetKeyspace { return fmt.Errorf("source and target keyspace must match: %v vs %v", mi.sourceKeyspace, mi.targetKeyspace) @@ -323,7 +315,7 @@ func (mi *migrater) validate(ctx context.Context) error { } func (mi *migrater) validateForWrite(ctx context.Context) error { - if mi.migrationType == MigrateTables { + if mi.migrationType == binlogdatapb.MigrationType_TABLES { return mi.validateTableForWrite(ctx) } return mi.validateShardForWrite(ctx) @@ -456,7 +448,7 @@ func (mi *migrater) checkJournals(ctx context.Context) (journalsExist bool, err func (mi *migrater) stopSourceWrites(ctx context.Context) error { var err error - if mi.migrationType == MigrateTables { + if mi.migrationType == binlogdatapb.MigrationType_TABLES { err = mi.changeTableSourceWrites(ctx, disallowWrites) } else { err = mi.changeShardsAccess(ctx, mi.sourceKeyspace, mi.sourceShards(), disallowWrites) @@ -511,7 +503,7 @@ func (mi *migrater) waitForCatchup(ctx context.Context, filteredReplicationWaitT func (mi *migrater) cancelMigration(ctx context.Context) { var err error - if mi.migrationType == MigrateTables { + if mi.migrationType == binlogdatapb.MigrationType_TABLES { err = mi.changeTableSourceWrites(ctx, allowWrites) } else { err = mi.changeShardsAccess(ctx, mi.sourceKeyspace, mi.sourceShards(), allowWrites) @@ -554,6 +546,7 @@ func (mi *migrater) createJournals(ctx context.Context) error { } journal := &binlogdatapb.Journal{ Id: mi.id, + MigrationType: mi.migrationType, Tables: mi.tables, LocalPosition: source.position, } @@ -644,7 +637,7 @@ func (mi *migrater) createReverseReplication(ctx context.Context) error { } func (mi *migrater) allowTargetWrites(ctx context.Context) error { - if mi.migrationType == MigrateTables { + if mi.migrationType == binlogdatapb.MigrationType_TABLES { return mi.allowTableTargetWrites(ctx) } return mi.changeShardsAccess(ctx, mi.targetKeyspace, mi.targetShards(), allowWrites) @@ -662,7 +655,7 @@ func (mi *migrater) allowTableTargetWrites(ctx context.Context) error { } func (mi *migrater) changeRouting(ctx context.Context) error { - if mi.migrationType == MigrateTables { + if mi.migrationType == binlogdatapb.MigrationType_TABLES { return mi.changeTableRouting(ctx) } return mi.changeShardRouting(ctx) diff --git a/go/vt/wrangler/migrater_test.go b/go/vt/wrangler/migrater_test.go index 891d58bfb1a..f7edc690af1 100644 --- a/go/vt/wrangler/migrater_test.go +++ b/go/vt/wrangler/migrater_test.go @@ -40,7 +40,7 @@ func TestTableMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Single cell RDONLY migration. - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, []string{"cell1"}, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, []string{"cell1"}, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } @@ -69,7 +69,7 @@ func TestTableMigrate(t *testing.T) { // The global routing already contains redirections for rdonly. // So, adding routes for replica and deploying to cell2 will also cause // cell2 to migrat rdonly. This is a quirk that can be fixed later if necessary. - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionForward) if err != nil { t.Fatal(err) } @@ -107,7 +107,7 @@ func TestTableMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Single cell backward REPLICA migration. - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionBackward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionBackward) if err != nil { t.Fatal(err) } @@ -127,7 +127,7 @@ func TestTableMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Migrate all REPLICA. - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) if err != nil { t.Fatal(err) } @@ -153,7 +153,7 @@ func TestTableMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // All cells RDONLY backward migration. - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionBackward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionBackward) if err != nil { t.Fatal(err) } @@ -173,7 +173,7 @@ func TestTableMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Can't migrate master with MigrateReads. - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_MASTER, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_MASTER, directionForward) want := "tablet type must be REPLICA or RDONLY: MASTER" if err == nil || err.Error() != want { t.Errorf("MigrateReads(master) err: %v, want %v", err, want) @@ -182,7 +182,7 @@ func TestTableMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Can't migrate writes if REPLICA and RDONLY have not fully migrated yet. - err = tme.wr.MigrateWrites(ctx, MigrateTables, tme.streams, 1*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, 1*time.Second) want = "missing tablet type specific routing, read-only traffic must be migrated before migrating writes" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -193,7 +193,7 @@ func TestTableMigrate(t *testing.T) { // Test MigrateWrites cancelation on failure. // Migrate all the reads first. - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } @@ -248,7 +248,7 @@ func TestTableMigrate(t *testing.T) { tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) - err = tme.wr.MigrateWrites(ctx, MigrateTables, tme.streams, 0*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, 0*time.Second) want = "DeadlineExceeded" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites(0 timeout) err: %v, must contain %v", err, want) @@ -298,7 +298,7 @@ func TestTableMigrate(t *testing.T) { tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) - err = tme.wr.MigrateWrites(ctx, MigrateTables, tme.streams, 1*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, 1*time.Second) if err != nil { t.Fatal(err) } @@ -332,7 +332,7 @@ func TestShardMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Single cell RDONLY migration. - err := tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, []string{"cell1"}, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, []string{"cell1"}, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } @@ -348,7 +348,7 @@ func TestShardMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Other cell REPLICA migration. - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionForward) if err != nil { t.Fatal(err) } @@ -364,7 +364,7 @@ func TestShardMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Single cell backward REPLICA migration. - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionBackward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, []string{"cell2"}, topodatapb.TabletType_REPLICA, directionBackward) if err != nil { t.Fatal(err) } @@ -383,7 +383,7 @@ func TestShardMigrate(t *testing.T) { // This is an extra step that does not exist in the tables test. // The per-cell migration mechanism is different for tables. So, this // extra step is needed to bring things in sync. - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } @@ -395,7 +395,7 @@ func TestShardMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Migrate all REPLICA. - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) if err != nil { t.Fatal(err) } @@ -407,7 +407,7 @@ func TestShardMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // All cells RDONLY backward migration. - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_RDONLY, directionBackward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_RDONLY, directionBackward) if err != nil { t.Fatal(err) } @@ -419,7 +419,7 @@ func TestShardMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Can't migrate master with MigrateReads. - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_MASTER, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_MASTER, directionForward) want := "tablet type must be REPLICA or RDONLY: MASTER" if err == nil || err.Error() != want { t.Errorf("MigrateReads(master) err: %v, want %v", err, want) @@ -428,7 +428,7 @@ func TestShardMigrate(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Can't migrate writes if REPLICA and RDONLY have not fully migrated yet. - err = tme.wr.MigrateWrites(ctx, MigrateShards, tme.streams, 1*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, 1*time.Second) want = "cannot migrate MASTER away" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -439,7 +439,7 @@ func TestShardMigrate(t *testing.T) { // Test MigrateWrites cancelation on failure. // Migrate all the reads first. - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } @@ -484,7 +484,7 @@ func TestShardMigrate(t *testing.T) { tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) - err = tme.wr.MigrateWrites(ctx, MigrateShards, tme.streams, 0*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, 0*time.Second) want = "DeadlineExceeded" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites(0 timeout) err: %v, must contain %v", err, want) @@ -502,9 +502,9 @@ func TestShardMigrate(t *testing.T) { // Test successful MigrateWrites. // Create journals. - journal1 := "insert into _vt.resharding_journal.*8372031610433464572.*local_position.*MariaDB/5-456-892.*shard_gtids.*-80.*MariaDB/5-456-893.*participants.*40.*40" + journal1 := "insert into _vt.resharding_journal.*8372031610433464572.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*-80.*MariaDB/5-456-893.*participants.*40.*40" tme.dbSource1Client.addQueryRE(journal1, &sqltypes.Result{}, nil) - journal2 := "insert into _vt.resharding_journal.*8372031610433464572.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" + journal2 := "insert into _vt.resharding_journal.*8372031610433464572.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" tme.dbSource2Client.addQueryRE(journal2, &sqltypes.Result{}, nil) // Create backward replicaions. @@ -520,7 +520,7 @@ func TestShardMigrate(t *testing.T) { tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) - err = tme.wr.MigrateWrites(ctx, MigrateShards, tme.streams, 1*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, 1*time.Second) if err != nil { t.Fatal(err) } @@ -545,11 +545,11 @@ func TestMigrateFailJournal(t *testing.T) { tme := newTestTableMigrater(ctx, t) defer tme.stopTablets(t) - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) if err != nil { t.Fatal(err) } @@ -590,7 +590,7 @@ func TestMigrateFailJournal(t *testing.T) { tme.dbSource1Client.addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed")) tme.dbSource2Client.addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed")) - err = tme.wr.MigrateWrites(ctx, MigrateTables, tme.streams, 1*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, 1*time.Second) want := "journaling intentionally failed" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites(0 timeout) err: %v, must contain %v", err, want) @@ -613,11 +613,11 @@ func TestTableMigrateJournalExists(t *testing.T) { tme := newTestTableMigrater(ctx, t) defer tme.stopTablets(t) - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } - err = tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) if err != nil { t.Fatal(err) } @@ -648,7 +648,7 @@ func TestTableMigrateJournalExists(t *testing.T) { tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) - err = tme.wr.MigrateWrites(ctx, MigrateTables, tme.streams, 1*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, 1*time.Second) if err != nil { t.Fatal(err) } @@ -675,11 +675,11 @@ func TestShardMigrateJournalExists(t *testing.T) { tme := newTestShardMigrater(ctx, t) defer tme.stopTablets(t) - err := tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) if err != nil { t.Fatal(err) } - err = tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) + err = tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_REPLICA, directionForward) if err != nil { t.Fatal(err) } @@ -689,7 +689,7 @@ func TestShardMigrateJournalExists(t *testing.T) { tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 8372031610433464572", &sqltypes.Result{}, nil) // Create the missing journal. - journal2 := "insert into _vt.resharding_journal.*8372031610433464572.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" + journal2 := "insert into _vt.resharding_journal.*8372031610433464572.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" tme.dbSource2Client.addQueryRE(journal2, &sqltypes.Result{}, nil) // Create backward replicaions. @@ -710,7 +710,7 @@ func TestShardMigrateJournalExists(t *testing.T) { tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) - err = tme.wr.MigrateWrites(ctx, MigrateShards, tme.streams, 1*time.Second) + err = tme.wr.MigrateWrites(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, 1*time.Second) if err != nil { t.Fatal(err) } @@ -753,7 +753,7 @@ func TestMigrateDistinctTargets(t *testing.T) { ), nil) tme.streams[topo.KeyspaceShard{Keyspace: "ks1", Shard: "-40"}] = []uint32{1} - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "target keyspaces are mismatched across streams" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -784,7 +784,7 @@ func TestMigrateDistinctSources(t *testing.T) { fmt.Sprintf("%v", bls), ), nil) - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "source keyspaces are mismatched across streams" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -798,7 +798,7 @@ func TestMigrateVReplicationStreamNotFound(t *testing.T) { tme.dbDest1Client.addQuery("select source from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "VReplication stream 1 not found for ks2:-80" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -826,7 +826,7 @@ func TestMigrateMismatchedTables(t *testing.T) { fmt.Sprintf("%v", bls), ), nil) - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "table lists are mismatched across streams" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -857,7 +857,7 @@ func TestMigrateDupUidSources(t *testing.T) { fmt.Sprintf("%v", bls), ), nil) - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "duplicate sources for uids" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -873,7 +873,7 @@ func TestTableMigrateAllShardsNotPresent(t *testing.T) { {Keyspace: "ks2", Shard: "-80"}: {1, 2}, } - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "mismatched shards for keyspace" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -931,7 +931,7 @@ func TestMigrateNoTableWildcards(t *testing.T) { fmt.Sprintf("%v", bls3), ), nil) - err := tme.wr.MigrateReads(ctx, MigrateTables, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_TABLES, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "cannot migrate streams with wild card table names" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -943,7 +943,7 @@ func TestShardMigrateSourceTargetMismatch(t *testing.T) { tme := newTestTableMigrater(ctx, t) defer tme.stopTablets(t) - err := tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "source and target keyspace must match" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) @@ -973,7 +973,7 @@ func TestShardMigrateTargetMatchesSource(t *testing.T) { tme.streams[topo.KeyspaceShard{Keyspace: "ks", Shard: "-40"}] = []uint32{1} - err := tme.wr.MigrateReads(ctx, MigrateShards, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) + err := tme.wr.MigrateReads(ctx, binlogdatapb.MigrationType_SHARDS, tme.streams, nil, topodatapb.TabletType_RDONLY, directionForward) want := "target shard matches a source shard" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateReads: %v, must contain %v", err, want) diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index dd4de0f3935..b28dbc5356f 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -222,12 +222,20 @@ message KeyspaceShard { string shard = 2; } +// MigrationType specifies the type of migration for the Journal. +enum MigrationType { + TABLES = 0; + SHARDS = 1; +} + message Journal { int64 id = 1; - repeated string tables = 2; - string local_position = 3; - repeated ShardGtid shard_gtids = 4; - repeated KeyspaceShard participants = 5; + MigrationType migration_type = 2; + repeated string tables = 3; + string local_position = 4; + repeated ShardGtid shard_gtids = 5; + repeated KeyspaceShard participants = 6; + repeated int64 reversed_ids = 7; } // VEvent represents a vstream event diff --git a/py/vtproto/binlogdata_pb2.py b/py/vtproto/binlogdata_pb2.py index dcac54576b0..53eda7da5b1 100644 --- a/py/vtproto/binlogdata_pb2.py +++ b/py/vtproto/binlogdata_pb2.py @@ -23,7 +23,7 @@ package='binlogdata', syntax='proto3', serialized_options=_b('Z\'vitess.io/vitess/go/vt/proto/binlogdata'), - serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bvtrpc.proto\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xde\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\x12\'\n\x06on_ddl\x18\x07 \x01(\x0e\x32\x17.binlogdata.OnDDLAction\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\">\n\nFieldEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x1c\n\x06\x66ields\x18\x02 \x03(\x0b\x32\x0c.query.Field\":\n\tShardGtid\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12\x0c\n\x04gtid\x18\x03 \x01(\t\"3\n\x05VGtid\x12*\n\x0bshard_gtids\x18\x01 \x03(\x0b\x32\x15.binlogdata.ShardGtid\"0\n\rKeyspaceShard\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\"\x9a\x01\n\x07Journal\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12\x16\n\x0elocal_position\x18\x03 \x01(\t\x12*\n\x0bshard_gtids\x18\x04 \x03(\x0b\x32\x15.binlogdata.ShardGtid\x12/\n\x0cparticipants\x18\x05 \x03(\x0b\x32\x19.binlogdata.KeyspaceShard\"\x90\x02\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x04 \x01(\t\x12\'\n\trow_event\x18\x05 \x01(\x0b\x32\x14.binlogdata.RowEvent\x12+\n\x0b\x66ield_event\x18\x06 \x01(\x0b\x32\x16.binlogdata.FieldEvent\x12 \n\x05vgtid\x18\x07 \x01(\x0b\x32\x11.binlogdata.VGtid\x12$\n\x07journal\x18\x08 \x01(\x0b\x32\x13.binlogdata.Journal\x12\x14\n\x0c\x63urrent_time\x18\x14 \x01(\x03\"\xc7\x01\n\x0eVStreamRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\x10\n\x08position\x18\x04 \x01(\t\x12\"\n\x06\x66ilter\x18\x05 \x01(\x0b\x32\x12.binlogdata.Filter\"5\n\x0fVStreamResponse\x12\"\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent\"\xc8\x01\n\x12VStreamRowsRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\r\n\x05query\x18\x04 \x01(\t\x12\"\n\x06lastpk\x18\x05 \x01(\x0b\x32\x12.query.QueryResult\"\x97\x01\n\x13VStreamRowsResponse\x12\x1c\n\x06\x66ields\x18\x01 \x03(\x0b\x32\x0c.query.Field\x12\x1e\n\x08pkfields\x18\x02 \x03(\x0b\x32\x0c.query.Field\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x18\n\x04rows\x18\x04 \x03(\x0b\x32\n.query.Row\x12\x1a\n\x06lastpk\x18\x05 \x01(\x0b\x32\n.query.Row*>\n\x0bOnDDLAction\x12\n\n\x06IGNORE\x10\x00\x12\x08\n\x04STOP\x10\x01\x12\x08\n\x04\x45XEC\x10\x02\x12\x0f\n\x0b\x45XEC_IGNORE\x10\x03*\xd1\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x12\t\n\x05\x46IELD\x10\r\x12\r\n\tHEARTBEAT\x10\x0e\x12\t\n\x05VGTID\x10\x0f\x12\x0b\n\x07JOURNAL\x10\x10\x42)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3') + serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bvtrpc.proto\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xde\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\x12\'\n\x06on_ddl\x18\x07 \x01(\x0e\x32\x17.binlogdata.OnDDLAction\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\">\n\nFieldEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x1c\n\x06\x66ields\x18\x02 \x03(\x0b\x32\x0c.query.Field\":\n\tShardGtid\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12\x0c\n\x04gtid\x18\x03 \x01(\t\"3\n\x05VGtid\x12*\n\x0bshard_gtids\x18\x01 \x03(\x0b\x32\x15.binlogdata.ShardGtid\"0\n\rKeyspaceShard\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\"\xe3\x01\n\x07Journal\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x31\n\x0emigration_type\x18\x02 \x01(\x0e\x32\x19.binlogdata.MigrationType\x12\x0e\n\x06tables\x18\x03 \x03(\t\x12\x16\n\x0elocal_position\x18\x04 \x01(\t\x12*\n\x0bshard_gtids\x18\x05 \x03(\x0b\x32\x15.binlogdata.ShardGtid\x12/\n\x0cparticipants\x18\x06 \x03(\x0b\x32\x19.binlogdata.KeyspaceShard\x12\x14\n\x0creversed_ids\x18\x07 \x03(\x03\"\x90\x02\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x04 \x01(\t\x12\'\n\trow_event\x18\x05 \x01(\x0b\x32\x14.binlogdata.RowEvent\x12+\n\x0b\x66ield_event\x18\x06 \x01(\x0b\x32\x16.binlogdata.FieldEvent\x12 \n\x05vgtid\x18\x07 \x01(\x0b\x32\x11.binlogdata.VGtid\x12$\n\x07journal\x18\x08 \x01(\x0b\x32\x13.binlogdata.Journal\x12\x14\n\x0c\x63urrent_time\x18\x14 \x01(\x03\"\xc7\x01\n\x0eVStreamRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\x10\n\x08position\x18\x04 \x01(\t\x12\"\n\x06\x66ilter\x18\x05 \x01(\x0b\x32\x12.binlogdata.Filter\"5\n\x0fVStreamResponse\x12\"\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent\"\xc8\x01\n\x12VStreamRowsRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\r\n\x05query\x18\x04 \x01(\t\x12\"\n\x06lastpk\x18\x05 \x01(\x0b\x32\x12.query.QueryResult\"\x97\x01\n\x13VStreamRowsResponse\x12\x1c\n\x06\x66ields\x18\x01 \x03(\x0b\x32\x0c.query.Field\x12\x1e\n\x08pkfields\x18\x02 \x03(\x0b\x32\x0c.query.Field\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x18\n\x04rows\x18\x04 \x03(\x0b\x32\n.query.Row\x12\x1a\n\x06lastpk\x18\x05 \x01(\x0b\x32\n.query.Row*>\n\x0bOnDDLAction\x12\n\n\x06IGNORE\x10\x00\x12\x08\n\x04STOP\x10\x01\x12\x08\n\x04\x45XEC\x10\x02\x12\x0f\n\x0b\x45XEC_IGNORE\x10\x03*\xd1\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x12\t\n\x05\x46IELD\x10\r\x12\r\n\tHEARTBEAT\x10\x0e\x12\t\n\x05VGTID\x10\x0f\x12\x0b\n\x07JOURNAL\x10\x10*\'\n\rMigrationType\x12\n\n\x06TABLES\x10\x00\x12\n\n\x06SHARDS\x10\x01\x42)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3') , dependencies=[vtrpc__pb2.DESCRIPTOR,query__pb2.DESCRIPTOR,topodata__pb2.DESCRIPTOR,]) @@ -52,8 +52,8 @@ ], containing_type=None, serialized_options=None, - serialized_start=2678, - serialized_end=2740, + serialized_start=2751, + serialized_end=2813, ) _sym_db.RegisterEnumDescriptor(_ONDDLACTION) @@ -135,12 +135,35 @@ ], containing_type=None, serialized_options=None, - serialized_start=2743, - serialized_end=2952, + serialized_start=2816, + serialized_end=3025, ) _sym_db.RegisterEnumDescriptor(_VEVENTTYPE) VEventType = enum_type_wrapper.EnumTypeWrapper(_VEVENTTYPE) +_MIGRATIONTYPE = _descriptor.EnumDescriptor( + name='MigrationType', + full_name='binlogdata.MigrationType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='TABLES', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SHARDS', index=1, number=1, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=3027, + serialized_end=3066, +) +_sym_db.RegisterEnumDescriptor(_MIGRATIONTYPE) + +MigrationType = enum_type_wrapper.EnumTypeWrapper(_MIGRATIONTYPE) IGNORE = 0 STOP = 1 EXEC = 2 @@ -162,6 +185,8 @@ HEARTBEAT = 14 VGTID = 15 JOURNAL = 16 +TABLES = 0 +SHARDS = 1 _BINLOGTRANSACTION_STATEMENT_CATEGORY = _descriptor.EnumDescriptor( @@ -884,29 +909,43 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='tables', full_name='binlogdata.Journal.tables', index=1, - number=2, type=9, cpp_type=9, label=3, + name='migration_type', full_name='binlogdata.Journal.migration_type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='tables', full_name='binlogdata.Journal.tables', index=2, + number=3, type=9, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='local_position', full_name='binlogdata.Journal.local_position', index=2, - number=3, type=9, cpp_type=9, label=1, + name='local_position', full_name='binlogdata.Journal.local_position', index=3, + number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='shard_gtids', full_name='binlogdata.Journal.shard_gtids', index=3, - number=4, type=11, cpp_type=10, label=3, + name='shard_gtids', full_name='binlogdata.Journal.shard_gtids', index=4, + number=5, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='participants', full_name='binlogdata.Journal.participants', index=4, - number=5, type=11, cpp_type=10, label=3, + name='participants', full_name='binlogdata.Journal.participants', index=5, + number=6, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reversed_ids', full_name='binlogdata.Journal.reversed_ids', index=6, + number=7, type=3, cpp_type=2, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -924,7 +963,7 @@ oneofs=[ ], serialized_start=1633, - serialized_end=1787, + serialized_end=1860, ) @@ -1010,8 +1049,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1790, - serialized_end=2062, + serialized_start=1863, + serialized_end=2135, ) @@ -1069,8 +1108,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2065, - serialized_end=2264, + serialized_start=2138, + serialized_end=2337, ) @@ -1100,8 +1139,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2266, - serialized_end=2319, + serialized_start=2339, + serialized_end=2392, ) @@ -1159,8 +1198,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2322, - serialized_end=2522, + serialized_start=2395, + serialized_end=2595, ) @@ -1218,8 +1257,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2525, - serialized_end=2676, + serialized_start=2598, + serialized_end=2749, ) _BINLOGTRANSACTION_STATEMENT.fields_by_name['category'].enum_type = _BINLOGTRANSACTION_STATEMENT_CATEGORY @@ -1243,6 +1282,7 @@ _ROWEVENT.fields_by_name['row_changes'].message_type = _ROWCHANGE _FIELDEVENT.fields_by_name['fields'].message_type = query__pb2._FIELD _VGTID.fields_by_name['shard_gtids'].message_type = _SHARDGTID +_JOURNAL.fields_by_name['migration_type'].enum_type = _MIGRATIONTYPE _JOURNAL.fields_by_name['shard_gtids'].message_type = _SHARDGTID _JOURNAL.fields_by_name['participants'].message_type = _KEYSPACESHARD _VEVENT.fields_by_name['type'].enum_type = _VEVENTTYPE @@ -1286,6 +1326,7 @@ DESCRIPTOR.message_types_by_name['VStreamRowsResponse'] = _VSTREAMROWSRESPONSE DESCRIPTOR.enum_types_by_name['OnDDLAction'] = _ONDDLACTION DESCRIPTOR.enum_types_by_name['VEventType'] = _VEVENTTYPE +DESCRIPTOR.enum_types_by_name['MigrationType'] = _MIGRATIONTYPE _sym_db.RegisterFileDescriptor(DESCRIPTOR) Charset = _reflection.GeneratedProtocolMessageType('Charset', (_message.Message,), dict(