Skip to content

Commit

Permalink
refactory & renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Sep 13, 2018
1 parent 068438e commit 7d7620f
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 78 deletions.
8 changes: 4 additions & 4 deletions internal/client/driver/mysql/applier.go
Expand Up @@ -215,7 +215,7 @@ type Applier struct {
rowCopyCompleteFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan *dumpEntry
copyRowsQueue chan *DumpEntry
applyDataEntryQueue chan *binlog.BinlogEntry
applyBinlogTxQueue chan *binlog.BinlogTx
applyBinlogGroupTxQueue chan []*binlog.BinlogTx
Expand Down Expand Up @@ -256,7 +256,7 @@ func NewApplier(subject, tp string, cfg *config.MySQLDriverConfig, logger *log.L
currentCoordinates: &models.CurrentCoordinates{},
tableItems: make(mapSchemaTableItems),
rowCopyComplete: make(chan bool, 1),
copyRowsQueue: make(chan *dumpEntry, 24),
copyRowsQueue: make(chan *DumpEntry, 24),
applyDataEntryQueue: make(chan *binlog.BinlogEntry, cfg.ReplChanBufferSize*2),
applyBinlogMtsTxQueue: make(chan *binlog.BinlogEntry, cfg.ReplChanBufferSize*2),
applyBinlogTxQueue: make(chan *binlog.BinlogTx, cfg.ReplChanBufferSize*2),
Expand Down Expand Up @@ -525,7 +525,7 @@ func (a *Applier) initiateStreaming() error {
a.logger.Debugf("mysql.applier: nats subscribe")
_, err := a.natsConn.Subscribe(fmt.Sprintf("%s_full", a.subject), func(m *gonats.Msg) {
a.logger.Debugf("mysql.applier: recv a msg")
dumpData := &dumpEntry{}
dumpData := &DumpEntry{}
if err := Decode(m.Data, dumpData); err != nil {
a.onError(TaskStateDead, err)
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func (a *Applier) ApplyBinlogEvent(workerIdx int, binlogEntry *binlog.BinlogEntr
return nil
}

func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *dumpEntry) error {
func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *DumpEntry) error {
queries := []string{}
queries = append(queries, entry.SystemVariablesStatement, entry.SqlMode, entry.DbSQL)
queries = append(queries, entry.TbSQL...)
Expand Down
10 changes: 5 additions & 5 deletions internal/client/driver/mysql/applier_test.go
Expand Up @@ -280,7 +280,7 @@ func TestApplier_ApplyBinlogEvent(t *testing.T) {
func TestApplier_ApplyEventQueries(t *testing.T) {
type args struct {
db *gosql.DB
entry *dumpEntry
entry *DumpEntry
}
tests := []struct {
name string
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestApplier_onApplyTxStructWithSetGtid(t *testing.T) {
applyRowCount int
rowCopyComplete chan bool
rowCopyCompleteFlag int64
copyRowsQueue chan *dumpEntry
copyRowsQueue chan *DumpEntry
applyDataEntryQueue chan *binlog.BinlogEntry
applyBinlogTxQueue chan *binlog.BinlogTx
applyBinlogGroupTxQueue chan []*binlog.BinlogTx
Expand Down Expand Up @@ -471,7 +471,7 @@ func TestApplier_validateGrants(t *testing.T) {
applyRowCount int
rowCopyComplete chan bool
rowCopyCompleteFlag int64
copyRowsQueue chan *dumpEntry
copyRowsQueue chan *DumpEntry
applyDataEntryQueue chan *binlog.BinlogEntry
applyBinlogTxQueue chan *binlog.BinlogTx
applyBinlogGroupTxQueue chan []*binlog.BinlogTx
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestApplier_createTableGtidExecuted(t *testing.T) {
applyRowCount int
rowCopyComplete chan bool
rowCopyCompleteFlag int64
copyRowsQueue chan *dumpEntry
copyRowsQueue chan *DumpEntry
applyDataEntryQueue chan *binlog.BinlogEntry
applyBinlogTxQueue chan *binlog.BinlogTx
applyBinlogGroupTxQueue chan []*binlog.BinlogTx
Expand Down Expand Up @@ -597,7 +597,7 @@ func TestApplier_onDone(t *testing.T) {
applyRowCount int
rowCopyComplete chan bool
rowCopyCompleteFlag int64
copyRowsQueue chan *dumpEntry
copyRowsQueue chan *DumpEntry
applyDataEntryQueue chan *binlog.BinlogEntry
applyBinlogTxQueue chan *binlog.BinlogTx
applyBinlogGroupTxQueue chan []*binlog.BinlogTx
Expand Down
48 changes: 16 additions & 32 deletions internal/client/driver/mysql/dumper.go
Expand Up @@ -30,8 +30,8 @@ type dumper struct {
table *config.Table
columns string
entriesCount int
resultsChannel chan *dumpEntry
entriesChannel chan *dumpEntry
resultsChannel chan *DumpEntry
entriesChannel chan *DumpEntry
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
Expand All @@ -50,8 +50,8 @@ func NewDumper(db usql.QueryAble, table *config.Table, total, chunkSize int64,
TableName: table.TableName,
table: table,
total: total,
resultsChannel: make(chan *dumpEntry, 24),
entriesChannel: make(chan *dumpEntry),
resultsChannel: make(chan *DumpEntry, 24),
entriesChannel: make(chan *DumpEntry),
chunkSize: chunkSize,
shutdownCh: make(chan struct{}),
}
Expand All @@ -63,7 +63,7 @@ type dumpStatResult struct {
TotalCount int64
}

type dumpEntry struct {
type DumpEntry struct {
SystemVariablesStatement string
SqlMode string
DbSQL string
Expand All @@ -78,22 +78,22 @@ type dumpEntry struct {
err error
}

func (e *dumpEntry) incrementCounter() {
func (e *DumpEntry) incrementCounter() {
e.RowsCount++
}

func (d *dumper) getDumpEntries() ([]*dumpEntry, error) {
func (d *dumper) getDumpEntries() ([]*DumpEntry, error) {
if d.total == 0 {
return []*dumpEntry{}, nil
return []*DumpEntry{}, nil
}

columnList, err := ubase.GetTableColumns(d.db, d.TableSchema, d.TableName)
if err != nil {
return []*dumpEntry{}, err
return []*DumpEntry{}, err
}

if err := ubase.ApplyColumnTypes(d.db, d.TableSchema, d.TableName, columnList); err != nil {
return []*dumpEntry{}, err
return []*DumpEntry{}, err
}

needPm := false
Expand All @@ -119,17 +119,17 @@ func (d *dumper) getDumpEntries() ([]*dumpEntry, error) {
if sliceCount == 0 {
sliceCount = 1
}
entries := make([]*dumpEntry, sliceCount)
entries := make([]*DumpEntry, sliceCount)
for i := 0; i < sliceCount; i++ {
offset := uint64(i) * uint64(d.chunkSize)
entries[i] = &dumpEntry{
entries[i] = &DumpEntry{
Offset: offset,
}
}
return entries, nil
}

func (d *dumper) buildQueryOldWay(e *dumpEntry) string {
func (d *dumper) buildQueryOldWay(e *DumpEntry) string {
return fmt.Sprintf(`SELECT %s FROM %s.%s where (%s) LIMIT %d OFFSET %d`,
d.columns,
usql.EscapeName(d.TableSchema),
Expand All @@ -140,7 +140,7 @@ func (d *dumper) buildQueryOldWay(e *dumpEntry) string {
)
}

func (d *dumper) buildQueryOnUniqueKey(e *dumpEntry) string {
func (d *dumper) buildQueryOnUniqueKey(e *DumpEntry) string {
nCol := len(d.table.UseUniqueKey.Columns.Columns)
uniqueKeyColumnAscending := make([]string, nCol, nCol)
for i, col := range d.table.UseUniqueKey.Columns.Columns {
Expand Down Expand Up @@ -193,8 +193,8 @@ func (d *dumper) buildQueryOnUniqueKey(e *dumpEntry) string {
}

// dumps a specific chunk, reading chunk info from the channel
func (d *dumper) getChunkData(e *dumpEntry) (err error) {
entry := &dumpEntry{
func (d *dumper) getChunkData(e *DumpEntry) (err error) {
entry := &DumpEntry{
TableSchema: d.TableSchema,
TableName: d.TableName,
RowsCount: e.RowsCount,
Expand Down Expand Up @@ -295,22 +295,6 @@ func (d *dumper) getChunkData(e *dumpEntry) (err error) {
return nil
}

/*func (e *dumpEntry) escape(colValue string) string {
e.colBuffer = *new(bytes.Buffer)
if !strings.ContainsAny(colValue, stringOfBackslashAndQuoteChars) {
return colValue
} else {
for _, char_c := range colValue {
c := fmt.Sprintf("%c", char_c)
if strings.ContainsAny(c, stringOfBackslashAndQuoteChars) {
e.colBuffer.WriteString("\\")
}
e.colBuffer.WriteString(c)
}
return e.colBuffer.String()
}
}*/

func (d *dumper) worker() {
for e := range d.entriesChannel {
select {
Expand Down
27 changes: 3 additions & 24 deletions internal/client/driver/mysql/dumper_test.go
Expand Up @@ -63,7 +63,7 @@ func Test_dumper_getRowsCount(t *testing.T) {
func Test_dumpEntry_incrementCounter(t *testing.T) {
tests := []struct {
name string
e *dumpEntry
e *DumpEntry
}{
// TODO: Add test cases.
}
Expand All @@ -78,7 +78,7 @@ func Test_dumper_getDumpEntries(t *testing.T) {
tests := []struct {
name string
d *dumper
want []*dumpEntry
want []*DumpEntry
wantErr bool
}{
// TODO: Add test cases.
Expand All @@ -99,7 +99,7 @@ func Test_dumper_getDumpEntries(t *testing.T) {

func Test_dumper_getChunkData(t *testing.T) {
type args struct {
e *dumpEntry
e *DumpEntry
}
tests := []struct {
name string
Expand All @@ -118,27 +118,6 @@ func Test_dumper_getChunkData(t *testing.T) {
}
}

func Test_dumpEntry_escape(t *testing.T) {
type args struct {
colValue string
}
tests := []struct {
name string
e *dumpEntry
args args
want string
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.e.escape(tt.args.colValue); got != tt.want {
t.Errorf("dumpEntry.escape() = %v, want %v", got, tt.want)
}
})
}
}

func Test_dumper_worker(t *testing.T) {
tests := []struct {
name string
Expand Down
21 changes: 9 additions & 12 deletions internal/client/driver/mysql/extractor.go
Expand Up @@ -34,6 +34,7 @@ import (
"udup/internal/config/mysql"
log "udup/internal/logger"
"udup/internal/models"
"udup/utils"
)

const (
Expand Down Expand Up @@ -1119,7 +1120,7 @@ func (e *Extractor) mysqlDump() error {
}
}
}
entry := &dumpEntry{
entry := &DumpEntry{
SystemVariablesStatement: setSystemVariablesStatement,
SqlMode: setSqlMode,
DbSQL: dbSQL,
Expand All @@ -1143,7 +1144,7 @@ func (e *Extractor) mysqlDump() error {
dbSQL = fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", db.TableSchema)
}
}
entry := &dumpEntry{
entry := &DumpEntry{
SystemVariablesStatement: setSystemVariablesStatement,
SqlMode: setSqlMode,
DbSQL: dbSQL,
Expand All @@ -1166,7 +1167,7 @@ func (e *Extractor) mysqlDump() error {
// ------
// Dump all of the tables and generate source records ...
e.logger.Printf("mysql.extractor: Step %d: scanning contents of %d tables", step, e.tableCount)
startScan := currentTimeMillis()
startScan := utils.CurrentTimeMillis()
counter := 0
//pool := models.NewPool(10)
for _, db := range e.replicateDoDb {
Expand Down Expand Up @@ -1215,14 +1216,14 @@ func (e *Extractor) mysqlDump() error {

// We've copied all of the tables, but our buffer holds onto the very last record.
// First mark the snapshot as complete and then apply the updated offset to the buffered record ...
stop := currentTimeMillis()
stop := utils.CurrentTimeMillis()
e.logger.Printf("mysql.extractor: Step %d: scanned %d rows in %d tables in %s",
step, e.mysqlContext.TotalRowsCopied, e.tableCount, time.Duration(stop-startScan))
step++

return nil
}
func (e *Extractor) encodeDumpEntry(entry *dumpEntry) error {
func (e *Extractor) encodeDumpEntry(entry *DumpEntry) error {
txMsg, err := Encode(entry)
if err != nil {
return err
Expand All @@ -1234,10 +1235,6 @@ func (e *Extractor) encodeDumpEntry(entry *dumpEntry) error {
return nil
}

func currentTimeMillis() int64 {
return time.Now().UnixNano() / 1000000
}

func (e *Extractor) Stats() (*models.TaskStatistics, error) {
totalRowsCopied := e.mysqlContext.GetTotalRowsCopied()
rowsEstimate := atomic.LoadInt64(&e.mysqlContext.RowsEstimate)
Expand Down Expand Up @@ -1394,7 +1391,7 @@ func (e *Extractor) Shutdown() error {
return nil
}

func (e *Extractor) kafkaTransformSnapshotData(table *config.Table, value *dumpEntry) error {
func (e *Extractor) kafkaTransformSnapshotData(table *config.Table, value *DumpEntry) error {
var err error

tableIdent := fmt.Sprintf("%v.%v.%v", e.kafkaMgr.Cfg.Topic, table.TableSchema, table.TableName)
Expand All @@ -1416,7 +1413,7 @@ func (e *Extractor) kafkaTransformSnapshotData(table *config.Table, value *dumpE
valuePayload.Source.Db = table.TableSchema
valuePayload.Source.Table = table.TableName
valuePayload.Op = kafka2.RECORD_OP_INSERT
valuePayload.TsMs = currentTimeMillis()
valuePayload.TsMs = utils.CurrentTimeMillis()

valuePayload.Before = nil
valuePayload.After = kafka2.NewRow()
Expand Down Expand Up @@ -1583,7 +1580,7 @@ func (e *Extractor) kafkaTransformDMLEventQuery(dmlEvent *binlog.BinlogEntry) (e
valuePayload.Source.Db = dataEvent.DatabaseName
valuePayload.Source.Table = dataEvent.TableName
valuePayload.Op = op
valuePayload.TsMs = currentTimeMillis()
valuePayload.TsMs = utils.CurrentTimeMillis()

valueSchema := kafka2.NewEnvelopeSchema(tableIdent, colDefs)

Expand Down
2 changes: 1 addition & 1 deletion internal/client/driver/mysql/extractor_test.go
Expand Up @@ -414,7 +414,7 @@ func TestExtractor_mysqlDump(t *testing.T) {

func TestExtractor_encodeDumpEntry(t *testing.T) {
type args struct {
entry *dumpEntry
entry *DumpEntry
}
tests := []struct {
name string
Expand Down
6 changes: 6 additions & 0 deletions utils/utils.go
Expand Up @@ -6,6 +6,8 @@

package utils

import "time"

// Return a substring of limited lenth.
func StrLim(s string, lim int) string {
if lim < len(s) {
Expand All @@ -23,3 +25,7 @@ func StringElse(s1 string, s2 string) string {
return s2
}
}

func CurrentTimeMillis() int64 {
return time.Now().UnixNano() / 1000000
}

0 comments on commit 7d7620f

Please sign in to comment.