Skip to content

Commit

Permalink
replace gob with gencode (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Jun 17, 2019
1 parent 21704fa commit 66f2e93
Show file tree
Hide file tree
Showing 8 changed files with 880 additions and 23 deletions.
10 changes: 7 additions & 3 deletions internal/client/driver/kafka3/kafka3.go
Expand Up @@ -190,7 +190,11 @@ func (kr *KafkaRunner) initiateStreaming() error {
return
} else {
// TODO cache table
table, err := kr.getOrSetTable(dumpData.TableSchema, dumpData.TableName, dumpData.Table)
var tableFromDumpData *config.Table = nil
if len(dumpData.Table) > 0 {
// TODO debode tableFromDumpData
}
table, err := kr.getOrSetTable(dumpData.TableSchema, dumpData.TableName, tableFromDumpData)
if err != nil {
kr.onError(TaskStateDead, fmt.Errorf("DTLE_BUG kafka: unknown table structure"))
return
Expand Down Expand Up @@ -310,8 +314,8 @@ func (kr *KafkaRunner) kafkaTransformSnapshotData(table *config.Table, value *my
for i, _ := range columnList {
var value interface{}

if *rowValues[i] != nil {
valueStr := string((*rowValues[i]).([]byte))
if rowValues[i] != nil {
valueStr := string(*rowValues[i])
switch columnList[i].Type {
case mysql.TinyintColumnType, mysql.SmallintColumnType, mysql.MediumIntColumnType, mysql.IntColumnType:
value, err = strconv.ParseInt(valueStr, 10, 64)
Expand Down
13 changes: 10 additions & 3 deletions internal/client/driver/mysql/applier.go
Expand Up @@ -776,8 +776,15 @@ func (a *Applier) initiateStreaming() error {
a.logger.Debugf("mysql.applier: full. recv a msg. copyRowsQueue: %v", len(a.copyRowsQueue))

dumpData := &DumpEntry{}
if err := Decode(m.Data, dumpData); err != nil {
data2, err := snappy.Decode(nil, m.Data)
if err != nil {
a.onError(TaskStateDead, err)
// TODO return?
}
_, err = dumpData.Unmarshal(data2)
if err != nil {
a.onError(TaskStateDead, err)
//return
}

timer := time.NewTimer(DefaultConnectWait / 2)
Expand Down Expand Up @@ -1427,9 +1434,9 @@ func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *DumpEntry) error {
}

colData := entry.ValuesX[i][j]
if *colData != nil {
if colData != nil {
buf.WriteByte('\'')
buf.WriteString(sql.EscapeValue(string((*colData).([]byte))))
buf.WriteString(sql.EscapeValue(string(*colData)))
buf.WriteByte('\'')
} else {
buf.WriteString("NULL")
Expand Down
15 changes: 5 additions & 10 deletions internal/client/driver/mysql/dumper.go
Expand Up @@ -76,7 +76,7 @@ type dumpStatResult struct {
TotalCount int64
}

type DumpEntry struct {
type DumpEntryOrig struct {
SystemVariablesStatement string
SqlMode string
DbSQL string
Expand Down Expand Up @@ -193,7 +193,9 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
// TODO use PS
// TODO escape schema/table/column name once and save
defer func() {
entry.Err = err
if err != nil {
entry.Err = err.Error()
}
if err == nil && entry.RowsCount == 0 {
return
}
Expand Down Expand Up @@ -259,10 +261,8 @@ func (d *dumper) getChunkData() (nRows int64, err error) {

scanArgs := make([]interface{}, len(columns)) // tmp use, for casting `values` to `[]interface{}`

interfacePtrWithNil := new(interface{})

for rows.Next() {
rowValuesRaw := make([]*interface{}, len(columns))
rowValuesRaw := make([]*[]byte, len(columns))
for i := range rowValuesRaw {
scanArgs[i] = &rowValuesRaw[i]
}
Expand All @@ -272,11 +272,6 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
return 0, err
}

for i := range rowValuesRaw {
if rowValuesRaw[i] == nil {
rowValuesRaw[i] = interfacePtrWithNil
}
}
entry.ValuesX = append(entry.ValuesX, rowValuesRaw)

entry.incrementCounter()
Expand Down
11 changes: 7 additions & 4 deletions internal/client/driver/mysql/extractor.go
Expand Up @@ -1290,11 +1290,12 @@ func (e *Extractor) mysqlDump() error {
e.dumpers = append(e.dumpers, d)
// Scan the rows in the table ...
for entry := range d.resultsChannel {
if entry.Err != nil {
e.onError(TaskStateDead, entry.Err)
if entry.Err != "" {
e.onError(TaskStateDead, fmt.Errorf(entry.Err))
} else {
if e.needToSendTabelDef() {
entry.Table = d.table
// TODO encode table
//entry.Table = d.table
}
if err = e.encodeDumpEntry(entry); err != nil {
e.onError(TaskStateRestart, err)
Expand All @@ -1320,10 +1321,12 @@ func (e *Extractor) mysqlDump() error {
return nil
}
func (e *Extractor) encodeDumpEntry(entry *DumpEntry) error {
txMsg, err := Encode(entry)
bs, err := entry.Marshal(nil)
if err != nil {
return err
}
txMsg := snappy.Encode(nil, bs)

if err := e.publish(fmt.Sprintf("%s_full", e.subject), "", txMsg); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions internal/client/driver/mysql/gen.sh
@@ -0,0 +1,3 @@
#!/bin/bash

gencode go -schema type.schema -package mysql
6 changes: 3 additions & 3 deletions internal/client/driver/mysql/sql/builder.go
Expand Up @@ -37,9 +37,9 @@ func EscapeName(name string) string {
return fmt.Sprintf("`%s`", name)
}

func EscapeColRawToString(col *interface{}) string {
if *col != nil {
return fmt.Sprintf("'%s'", EscapeValue(string((*col).([]byte))))
func EscapeColRawToString(col *[]byte) string {
if col != nil {
return fmt.Sprintf("'%s'", EscapeValue(string(*col)))
} else {
return "NULL"
}
Expand Down
13 changes: 13 additions & 0 deletions internal/client/driver/mysql/type.schema
@@ -0,0 +1,13 @@
struct DumpEntry {
SystemVariablesStatement string
SqlMode string
DbSQL string
TableName string
TableSchema string
TbSQL []string
ValuesX [][]*[]byte
TotalCount int64
RowsCount int64
Err string
Table []byte
}

0 comments on commit 66f2e93

Please sign in to comment.