Skip to content

Commit

Permalink
kafka: adapt to gencode (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Jun 17, 2019
1 parent 66f2e93 commit 65e606b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
16 changes: 12 additions & 4 deletions internal/client/driver/kafka3/kafka3.go
Expand Up @@ -173,8 +173,8 @@ func (kr *KafkaRunner) initiateStreaming() error {

_, err = kr.natsConn.Subscribe(fmt.Sprintf("%s_full", kr.subject), func(m *gonats.Msg) {
kr.logger.Debugf("kafka: recv a msg")
dumpData := &mysqlDriver.DumpEntry{}
if err := Decode(m.Data, dumpData); err != nil {
dumpData, err := mysqlDriver.DecodeDumpEntry(m.Data)
if err != nil {
kr.onError(TaskStateDead, err)
return
}
Expand All @@ -189,11 +189,16 @@ func (kr *KafkaRunner) initiateStreaming() error {
}
return
} else {
// TODO cache table
var tableFromDumpData *config.Table = nil
if len(dumpData.Table) > 0 {
// TODO debode tableFromDumpData
tableFromDumpData = &config.Table{}
err = DecodeGob(dumpData.Table, tableFromDumpData)
if err != nil {
kr.onError(TaskStateDead, err)
return
}
}
// TODO cache table
table, err := kr.getOrSetTable(dumpData.TableSchema, dumpData.TableName, tableFromDumpData)
if err != nil {
kr.onError(TaskStateDead, fmt.Errorf("DTLE_BUG kafka: unknown table structure"))
Expand Down Expand Up @@ -254,6 +259,9 @@ func Decode(data []byte, vPtr interface{}) (err error) {

return gob.NewDecoder(bytes.NewBuffer(msg)).Decode(vPtr)
}
func DecodeGob(data []byte, vPtr interface{}) (err error) {
return gob.NewDecoder(bytes.NewBuffer(data)).Decode(vPtr)
}

func (kr *KafkaRunner) onError(state int, err error) {
if kr.shutdown {
Expand Down
25 changes: 18 additions & 7 deletions internal/client/driver/mysql/applier.go
Expand Up @@ -496,6 +496,23 @@ func (a *Applier) initNatSubClient() (err error) {
return nil
}

func DecodeDumpEntry(data []byte) (entry *DumpEntry, err error) {
msg, err := snappy.Decode(nil, data)
if err != nil {
return nil, err
}

entry = &DumpEntry{}
n, err := entry.Unmarshal(msg)
if err != nil {
return nil, err
}
if n != uint64(len(msg)) {
return nil, fmt.Errorf("DumpEntry.Unmarshal: not all consumed. data: %v, consumed: %v",
len(msg), n)
}
return entry, nil
}
// Decode
func Decode(data []byte, vPtr interface{}) (err error) {
msg, err := snappy.Decode(nil, data)
Expand Down Expand Up @@ -775,17 +792,11 @@ func (a *Applier) initiateStreaming() error {
_, err := a.natsConn.Subscribe(fmt.Sprintf("%s_full", a.subject), func(m *gonats.Msg) {
a.logger.Debugf("mysql.applier: full. recv a msg. copyRowsQueue: %v", len(a.copyRowsQueue))

dumpData := &DumpEntry{}
data2, err := snappy.Decode(nil, m.Data)
dumpData, err := DecodeDumpEntry(m.Data)
if err != nil {
a.onError(TaskStateDead, err)
// TODO return?
}
_, err = dumpData.Unmarshal(data2)
if err != nil {
a.onError(TaskStateDead, err)
//return
}

timer := time.NewTimer(DefaultConnectWait / 2)
atomic.AddInt64(&a.nDumpEntry, 1) // this must be increased before enqueuing
Expand Down
3 changes: 3 additions & 0 deletions internal/client/driver/mysql/dumper.go
Expand Up @@ -41,6 +41,8 @@ type dumper struct {
// 0: don't checksum; 1: checksum once; 2: checksum every time
doChecksum int
oldWayDump bool

sentTableDef bool
}

func NewDumper(db usql.QueryAble, table *config.Table, chunkSize int64,
Expand All @@ -55,6 +57,7 @@ func NewDumper(db usql.QueryAble, table *config.Table, chunkSize int64,
resultsChannel: make(chan *DumpEntry, 24),
chunkSize: chunkSize,
shutdownCh: make(chan struct{}),
sentTableDef: false,
}
switch os.Getenv(g.ENV_DUMP_CHECKSUM) {
case "1":
Expand Down
24 changes: 17 additions & 7 deletions internal/client/driver/mysql/extractor.go
Expand Up @@ -739,6 +739,13 @@ func (e *Extractor) setStatementFor() string {
}

// Encode
func GobEncode(v interface{}) ([]byte, error) {
b := new(bytes.Buffer)
if err := gob.NewEncoder(b).Encode(v); err != nil {
return nil, err
}
return b.Bytes(), nil
}
func Encode(v interface{}) ([]byte, error) {
b := new(bytes.Buffer)
if err := gob.NewEncoder(b).Encode(v); err != nil {
Expand Down Expand Up @@ -1293,9 +1300,16 @@ func (e *Extractor) mysqlDump() error {
if entry.Err != "" {
e.onError(TaskStateDead, fmt.Errorf(entry.Err))
} else {
if e.needToSendTabelDef() {
// TODO encode table
//entry.Table = d.table
if !d.sentTableDef {
tableBs, err := GobEncode(d.table)
if err != nil {
realErr := fmt.Errorf(entry.Err)
e.onError(TaskStateDead, realErr)
return realErr
} else {
entry.Table = tableBs
d.sentTableDef = true
}
}
if err = e.encodeDumpEntry(entry); err != nil {
e.onError(TaskStateRestart, err)
Expand Down Expand Up @@ -1480,7 +1494,3 @@ func (e *Extractor) Shutdown() error {
e.logger.Printf("mysql.extractor: Shutting down")
return nil
}

func (e *Extractor) needToSendTabelDef() bool {
return true
}

0 comments on commit 65e606b

Please sign in to comment.