Skip to content

Commit

Permalink
fix _ #230-3: kafka: mysql time type (full-copy only)
Browse files Browse the repository at this point in the history
For incremental copy: waiting for upstream fix.
  • Loading branch information
ffffwh committed Oct 22, 2018
1 parent 3a2361b commit 51595e4
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 5 deletions.
60 changes: 57 additions & 3 deletions internal/client/driver/kafka3/kafka2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"encoding/json"
"fmt"
"math/big"
"strings"

"strconv"

Expand Down Expand Up @@ -322,10 +323,63 @@ func NewTimeField(optional bool, field string) *Schema {
}
}

func timeValueHelper(h, m, s, microsec int64, isNeg bool) int64 {
r := (h * 3600 + m * 60 + s) * 1000000 + microsec
if isNeg {
return -r
} else {
return r
}
}
// precision make no difference
func TimeValue(timestamp int64) int64 {
// TODO
return 0
func TimeValue(value string) int64 {
var err error

if len(value) == 0 {
return 0
}
isNeg := false
if value[0] == '-' {
isNeg = true
value = value[1:]
}

ss := strings.Split(value, ":")
if len(ss) != 3 {
// TODO report err, as well the followings.
return 0
}
var h,m,s,microsec int64
h, err = strconv.ParseInt(ss[0], 10, 64)
if err != nil {
return 0
}
m, err = strconv.ParseInt(ss[1], 10, 64)
if err != nil {
return 0
}
ssms := strings.Split(ss[2], ".")
switch len(ssms) {
case 1:
s, err = strconv.ParseInt(ss[2], 10, 64)
if err != nil {
return 0
}
microsec = 0
case 2:
s, err = strconv.ParseInt(ssms[0], 10, 64)
if err != nil {
return 0
}
microsec, err = strconv.ParseInt(ssms[1], 10, 64)
if err != nil {
return 0
}
default:
return 0
}

return timeValueHelper(h,m,s,microsec, isNeg)
}
func NewDateTimeField(optional bool, field string) *Schema {
return &Schema{
Expand Down
11 changes: 11 additions & 0 deletions internal/client/driver/kafka3/kafka2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,14 @@ func TestDecimalValueFromStringMysql(t *testing.T) {
t.Fail()
}
}


func TestTimeValue(t *testing.T) {
test := func(value string, h,m,s,microsec int64, isNeg bool) {
if TimeValue(value) != timeValueHelper(h,m,s,microsec,isNeg) {
t.Fatalf("failed for %v", value)
}
}
test("01:02:03",1,2,3,0,false)
test("-800:02:03.100000",800,2,3,100000,true)
}
13 changes: 11 additions & 2 deletions internal/client/driver/kafka3/kafka3.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ func (kr *KafkaRunner) kafkaTransformSnapshotData(table *config.Table, value *my
}
case mysql.DecimalColumnType:
value = DecimalValueFromStringMysql(valueStr)
case mysql.TimeColumnType:
value = TimeValue(valueStr)
default:
value = valueStr
}
Expand Down Expand Up @@ -440,6 +442,13 @@ func (kr *KafkaRunner) kafkaTransformDMLEventQuery(dmlEvent *binlog.BinlogEntry)
afterValue = int64(afterValue.(uint64))
}
}
case mysql.TimeColumnType:
//if beforeValue != nil {
// beforeValue = int64(beforeValue.(uint64))
//}
//if afterValue != nil {
// afterValue = int64(afterValue.(uint64))
//}
default:
// do nothing
}
Expand All @@ -455,11 +464,11 @@ func (kr *KafkaRunner) kafkaTransformDMLEventQuery(dmlEvent *binlog.BinlogEntry)
}

if before != nil {
kr.logger.Debugf("kafka. type of beforeValue %T", beforeValue)
kr.logger.Debugf("kafka. beforeValue: type: %T, value: %v", beforeValue, beforeValue)
before.AddField(colName, beforeValue)
}
if after != nil {
kr.logger.Debugf("kafka. type of afterValue %T", afterValue)
kr.logger.Debugf("kafka. afterValue: type: %T, value: %v", afterValue, afterValue)
after.AddField(colName, afterValue)
}
}
Expand Down

0 comments on commit 51595e4

Please sign in to comment.