Skip to content

Commit

Permalink
kafka: adapt to gencode (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Jun 10, 2019
1 parent ca0845e commit b821b99
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
16 changes: 12 additions & 4 deletions internal/client/driver/kafka3/kafka3.go
Expand Up @@ -173,20 +173,25 @@ func (kr *KafkaRunner) initiateStreaming() error {

_, err = kr.natsConn.Subscribe(fmt.Sprintf("%s_full", kr.subject), func(m *gonats.Msg) {
kr.logger.Debugf("kafka: recv a msg")
dumpData := &mysqlDriver.DumpEntry{}
if err := Decode(m.Data, dumpData); err != nil {
dumpData, err := mysqlDriver.DecodeDumpEntry(m.Data)
if err != nil {
kr.onError(TaskStateDead, err)
return
}

if dumpData.DbSQL != "" || len(dumpData.TbSQL) > 0 {
kr.logger.Debugf("kafka. a sql dumpEntry")
} else {
// TODO cache table
var tableFromDumpData *config.Table = nil
if len(dumpData.Table) > 0 {
// TODO decode tableFromDumpData
tableFromDumpData = &config.Table{}
err = DecodeGob(dumpData.Table, tableFromDumpData)
if err != nil {
kr.onError(TaskStateDead, err)
return
}
}
// TODO cache table
table, err := kr.getOrSetTable(dumpData.TableSchema, dumpData.TableName, tableFromDumpData)
if err != nil {
kr.onError(TaskStateDead, fmt.Errorf("DTLE_BUG kafka: unknown table structure"))
Expand Down Expand Up @@ -247,6 +252,9 @@ func Decode(data []byte, vPtr interface{}) (err error) {

return gob.NewDecoder(bytes.NewBuffer(msg)).Decode(vPtr)
}
func DecodeGob(data []byte, vPtr interface{}) (err error) {
return gob.NewDecoder(bytes.NewBuffer(data)).Decode(vPtr)
}

func (kr *KafkaRunner) onError(state int, err error) {
if kr.shutdown {
Expand Down
25 changes: 18 additions & 7 deletions internal/client/driver/mysql/applier.go
Expand Up @@ -496,6 +496,23 @@ func (a *Applier) initNatSubClient() (err error) {
return nil
}

// DecodeDumpEntry snappy-decompresses data and unmarshals the result into
// a freshly allocated DumpEntry. It returns an error when decompression
// fails, when unmarshalling fails, or when the unmarshaller does not
// consume the entire decompressed payload (a sign of corrupt or
// mismatched gencode data).
func DecodeDumpEntry(data []byte) (entry *DumpEntry, err error) {
	decompressed, err := snappy.Decode(nil, data)
	if err != nil {
		return nil, err
	}

	entry = &DumpEntry{}
	consumed, err := entry.Unmarshal(decompressed)
	if err != nil {
		return nil, err
	}
	// Partial consumption means trailing bytes were ignored; treat as error.
	if consumed != uint64(len(decompressed)) {
		return nil, fmt.Errorf("DumpEntry.Unmarshal: not all consumed. data: %v, consumed: %v",
			len(decompressed), consumed)
	}
	return entry, nil
}
// Decode
func Decode(data []byte, vPtr interface{}) (err error) {
msg, err := snappy.Decode(nil, data)
Expand Down Expand Up @@ -775,17 +792,11 @@ func (a *Applier) initiateStreaming() error {
_, err := a.natsConn.Subscribe(fmt.Sprintf("%s_full", a.subject), func(m *gonats.Msg) {
a.logger.Debugf("mysql.applier: full. recv a msg. copyRowsQueue: %v", len(a.copyRowsQueue))

dumpData := &DumpEntry{}
data2, err := snappy.Decode(nil, m.Data)
dumpData, err := DecodeDumpEntry(m.Data)
if err != nil {
a.onError(TaskStateDead, err)
// TODO return?
}
_, err = dumpData.Unmarshal(data2)
if err != nil {
a.onError(TaskStateDead, err)
//return
}

timer := time.NewTimer(DefaultConnectWait / 2)
atomic.AddInt64(&a.nDumpEntry, 1) // this must be increased before enqueuing
Expand Down
16 changes: 14 additions & 2 deletions internal/client/driver/mysql/extractor.go
Expand Up @@ -739,6 +739,13 @@ func (e *Extractor) setStatementFor() string {
}

// Encode
func GobEncode(v interface{}) ([]byte, error) {
b := new(bytes.Buffer)
if err := gob.NewEncoder(b).Encode(v); err != nil {
return nil, err
}
return b.Bytes(), nil
}
func Encode(v interface{}) ([]byte, error) {
b := new(bytes.Buffer)
if err := gob.NewEncoder(b).Encode(v); err != nil {
Expand Down Expand Up @@ -1294,8 +1301,12 @@ func (e *Extractor) mysqlDump() error {
e.onError(TaskStateDead, fmt.Errorf(entry.Err))
} else {
if e.needToSendTabelDef() {
// TODO encode table
//entry.Table = d.table
tableBs, err := GobEncode(d.table)
if err != nil {
e.onError(TaskStateDead, fmt.Errorf(entry.Err))
} else {
entry.Table = tableBs
}
}
if err = e.encodeDumpEntry(entry); err != nil {
e.onError(TaskStateRestart, err)
Expand Down Expand Up @@ -1482,5 +1493,6 @@ func (e *Extractor) Shutdown() error {
}

// needToSendTabelDef reports whether the extractor should attach the
// encoded table definition to each dump entry. Currently always true;
// the TODO below sketches the intended future condition.
// NOTE(review): name has a typo ("Tabel" -> "Table"); renaming requires
// updating all callers, so it is left as-is here.
func (e *Extractor) needToSendTabelDef() bool {
	// TODO if dst_is_kafka && table_changed
	return true
}

0 comments on commit b821b99

Please sign in to comment.