From 7d7620ff76c26c867716dfe8b6079cc4603127c3 Mon Sep 17 00:00:00 2001 From: ffffwh Date: Mon, 3 Sep 2018 20:20:08 +0800 Subject: [PATCH] refactory & renaming --- internal/client/driver/mysql/applier.go | 8 ++-- internal/client/driver/mysql/applier_test.go | 10 ++-- internal/client/driver/mysql/dumper.go | 48 +++++++------------ internal/client/driver/mysql/dumper_test.go | 27 ++--------- internal/client/driver/mysql/extractor.go | 21 ++++---- .../client/driver/mysql/extractor_test.go | 2 +- utils/utils.go | 6 +++ 7 files changed, 44 insertions(+), 78 deletions(-) diff --git a/internal/client/driver/mysql/applier.go b/internal/client/driver/mysql/applier.go index eef404927..9bb3cefae 100644 --- a/internal/client/driver/mysql/applier.go +++ b/internal/client/driver/mysql/applier.go @@ -215,7 +215,7 @@ type Applier struct { rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete - copyRowsQueue chan *dumpEntry + copyRowsQueue chan *DumpEntry applyDataEntryQueue chan *binlog.BinlogEntry applyBinlogTxQueue chan *binlog.BinlogTx applyBinlogGroupTxQueue chan []*binlog.BinlogTx @@ -256,7 +256,7 @@ func NewApplier(subject, tp string, cfg *config.MySQLDriverConfig, logger *log.L currentCoordinates: &models.CurrentCoordinates{}, tableItems: make(mapSchemaTableItems), rowCopyComplete: make(chan bool, 1), - copyRowsQueue: make(chan *dumpEntry, 24), + copyRowsQueue: make(chan *DumpEntry, 24), applyDataEntryQueue: make(chan *binlog.BinlogEntry, cfg.ReplChanBufferSize*2), applyBinlogMtsTxQueue: make(chan *binlog.BinlogEntry, cfg.ReplChanBufferSize*2), applyBinlogTxQueue: make(chan *binlog.BinlogTx, cfg.ReplChanBufferSize*2), @@ -525,7 +525,7 @@ func (a *Applier) initiateStreaming() error { a.logger.Debugf("mysql.applier: nats subscribe") _, err := a.natsConn.Subscribe(fmt.Sprintf("%s_full", a.subject), func(m *gonats.Msg) { a.logger.Debugf("mysql.applier: recv a msg") - dumpData := &dumpEntry{} + dumpData := &DumpEntry{} if err := Decode(m.Data, dumpData); err != nil { a.onError(TaskStateDead, err) } @@ -1173,7 +1173,7 @@ func (a *Applier) ApplyBinlogEvent(workerIdx int, binlogEntry *binlog.BinlogEntr return nil } -func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *dumpEntry) error { +func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *DumpEntry) error { queries := []string{} queries = append(queries, entry.SystemVariablesStatement, entry.SqlMode, entry.DbSQL) queries = append(queries, entry.TbSQL...) diff --git a/internal/client/driver/mysql/applier_test.go b/internal/client/driver/mysql/applier_test.go index 919404406..5e631e90e 100644 --- a/internal/client/driver/mysql/applier_test.go +++ b/internal/client/driver/mysql/applier_test.go @@ -280,7 +280,7 @@ func TestApplier_ApplyBinlogEvent(t *testing.T) { func TestApplier_ApplyEventQueries(t *testing.T) { type args struct { db *gosql.DB - entry *dumpEntry + entry *DumpEntry } tests := []struct { name string @@ -403,7 +403,7 @@ func TestApplier_onApplyTxStructWithSetGtid(t *testing.T) { applyRowCount int rowCopyComplete chan bool rowCopyCompleteFlag int64 - copyRowsQueue chan *dumpEntry + copyRowsQueue chan *DumpEntry applyDataEntryQueue chan *binlog.BinlogEntry applyBinlogTxQueue chan *binlog.BinlogTx applyBinlogGroupTxQueue chan []*binlog.BinlogTx @@ -471,7 +471,7 @@ func TestApplier_validateGrants(t *testing.T) { applyRowCount int rowCopyComplete chan bool rowCopyCompleteFlag int64 - copyRowsQueue chan *dumpEntry + copyRowsQueue chan *DumpEntry applyDataEntryQueue chan *binlog.BinlogEntry applyBinlogTxQueue chan *binlog.BinlogTx applyBinlogGroupTxQueue chan []*binlog.BinlogTx @@ -534,7 +534,7 @@ func TestApplier_createTableGtidExecuted(t *testing.T) { applyRowCount int rowCopyComplete chan bool rowCopyCompleteFlag int64 - copyRowsQueue chan *dumpEntry + copyRowsQueue chan *DumpEntry applyDataEntryQueue chan *binlog.BinlogEntry applyBinlogTxQueue chan *binlog.BinlogTx applyBinlogGroupTxQueue chan []*binlog.BinlogTx @@ -597,7 +597,7 @@ func TestApplier_onDone(t *testing.T) { applyRowCount int rowCopyComplete chan bool rowCopyCompleteFlag int64 - copyRowsQueue chan *dumpEntry + copyRowsQueue chan *DumpEntry applyDataEntryQueue chan *binlog.BinlogEntry applyBinlogTxQueue chan *binlog.BinlogTx applyBinlogGroupTxQueue chan []*binlog.BinlogTx diff --git a/internal/client/driver/mysql/dumper.go b/internal/client/driver/mysql/dumper.go index f80f508d3..950ee8d78 100644 --- a/internal/client/driver/mysql/dumper.go +++ b/internal/client/driver/mysql/dumper.go @@ -30,8 +30,8 @@ type dumper struct { table *config.Table columns string entriesCount int - resultsChannel chan *dumpEntry - entriesChannel chan *dumpEntry + resultsChannel chan *DumpEntry + entriesChannel chan *DumpEntry shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -50,8 +50,8 @@ func NewDumper(db usql.QueryAble, table *config.Table, total, chunkSize int64, TableName: table.TableName, table: table, total: total, - resultsChannel: make(chan *dumpEntry, 24), - entriesChannel: make(chan *dumpEntry), + resultsChannel: make(chan *DumpEntry, 24), + entriesChannel: make(chan *DumpEntry), chunkSize: chunkSize, shutdownCh: make(chan struct{}), } @@ -63,7 +63,7 @@ type dumpStatResult struct { TotalCount int64 } -type dumpEntry struct { +type DumpEntry struct { SystemVariablesStatement string SqlMode string DbSQL string @@ -78,22 +78,22 @@ type dumpEntry struct { err error } -func (e *dumpEntry) incrementCounter() { +func (e *DumpEntry) incrementCounter() { e.RowsCount++ } -func (d *dumper) getDumpEntries() ([]*dumpEntry, error) { +func (d *dumper) getDumpEntries() ([]*DumpEntry, error) { if d.total == 0 { - return []*dumpEntry{}, nil + return []*DumpEntry{}, nil } columnList, err := ubase.GetTableColumns(d.db, d.TableSchema, d.TableName) if err != nil { - return []*dumpEntry{}, err + return []*DumpEntry{}, err } if err := ubase.ApplyColumnTypes(d.db, d.TableSchema, d.TableName, columnList); err != nil { - return []*dumpEntry{}, err + return []*DumpEntry{}, err } needPm := false @@ -119,17 +119,17 @@ func (d *dumper) getDumpEntries() ([]*dumpEntry, error) { if sliceCount == 0 { sliceCount = 1 } - entries := make([]*dumpEntry, sliceCount) + entries := make([]*DumpEntry, sliceCount) for i := 0; i < sliceCount; i++ { offset := uint64(i) * uint64(d.chunkSize) - entries[i] = &dumpEntry{ + entries[i] = &DumpEntry{ Offset: offset, } } return entries, nil } -func (d *dumper) buildQueryOldWay(e *dumpEntry) string { +func (d *dumper) buildQueryOldWay(e *DumpEntry) string { return fmt.Sprintf(`SELECT %s FROM %s.%s where (%s) LIMIT %d OFFSET %d`, d.columns, usql.EscapeName(d.TableSchema), @@ -140,7 +140,7 @@ func (d *dumper) buildQueryOldWay(e *dumpEntry) string { ) } -func (d *dumper) buildQueryOnUniqueKey(e *dumpEntry) string { +func (d *dumper) buildQueryOnUniqueKey(e *DumpEntry) string { nCol := len(d.table.UseUniqueKey.Columns.Columns) uniqueKeyColumnAscending := make([]string, nCol, nCol) for i, col := range d.table.UseUniqueKey.Columns.Columns { @@ -193,8 +193,8 @@ func (d *dumper) buildQueryOnUniqueKey(e *dumpEntry) string { } // dumps a specific chunk, reading chunk info from the channel -func (d *dumper) getChunkData(e *dumpEntry) (err error) { - entry := &dumpEntry{ +func (d *dumper) getChunkData(e *DumpEntry) (err error) { + entry := &DumpEntry{ TableSchema: d.TableSchema, TableName: d.TableName, RowsCount: e.RowsCount, @@ -295,22 +295,6 @@ func (d *dumper) getChunkData(e *dumpEntry) (err error) { return nil } -/*func (e *dumpEntry) escape(colValue string) string { - e.colBuffer = *new(bytes.Buffer) - if !strings.ContainsAny(colValue, stringOfBackslashAndQuoteChars) { - return colValue - } else { - for _, char_c := range colValue { - c := fmt.Sprintf("%c", char_c) - if strings.ContainsAny(c, stringOfBackslashAndQuoteChars) { - e.colBuffer.WriteString("\\") - } - e.colBuffer.WriteString(c) - } - return e.colBuffer.String() - } -}*/ - func (d *dumper) worker() { for e := range d.entriesChannel { select { diff --git a/internal/client/driver/mysql/dumper_test.go b/internal/client/driver/mysql/dumper_test.go index 2c3ef673b..ccc6a78a2 100644 --- a/internal/client/driver/mysql/dumper_test.go +++ b/internal/client/driver/mysql/dumper_test.go @@ -63,7 +63,7 @@ func Test_dumper_getRowsCount(t *testing.T) { func Test_dumpEntry_incrementCounter(t *testing.T) { tests := []struct { name string - e *dumpEntry + e *DumpEntry }{ // TODO: Add test cases. } @@ -78,7 +78,7 @@ func Test_dumper_getDumpEntries(t *testing.T) { tests := []struct { name string d *dumper - want []*dumpEntry + want []*DumpEntry wantErr bool }{ // TODO: Add test cases. @@ -99,7 +99,7 @@ func Test_dumper_getDumpEntries(t *testing.T) { func Test_dumper_getChunkData(t *testing.T) { type args struct { - e *dumpEntry + e *DumpEntry } tests := []struct { name string @@ -118,27 +118,6 @@ func Test_dumper_getChunkData(t *testing.T) { } } -func Test_dumpEntry_escape(t *testing.T) { - type args struct { - colValue string - } - tests := []struct { - name string - e *dumpEntry - args args - want string - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.e.escape(tt.args.colValue); got != tt.want { - t.Errorf("dumpEntry.escape() = %v, want %v", got, tt.want) - } - }) - } -} - func Test_dumper_worker(t *testing.T) { tests := []struct { name string diff --git a/internal/client/driver/mysql/extractor.go b/internal/client/driver/mysql/extractor.go index 6db4d4622..436cccad3 100644 --- a/internal/client/driver/mysql/extractor.go +++ b/internal/client/driver/mysql/extractor.go @@ -34,6 +34,7 @@ import ( "udup/internal/config/mysql" log "udup/internal/logger" "udup/internal/models" + "udup/utils" ) const ( @@ -1119,7 +1120,7 @@ func (e *Extractor) mysqlDump() error { } } } - entry := &dumpEntry{ + entry := &DumpEntry{ SystemVariablesStatement: setSystemVariablesStatement, SqlMode: setSqlMode, DbSQL: dbSQL, @@ -1143,7 +1144,7 @@ func (e *Extractor) mysqlDump() error { dbSQL = fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", db.TableSchema) } } - entry := &dumpEntry{ + entry := &DumpEntry{ SystemVariablesStatement: setSystemVariablesStatement, SqlMode: setSqlMode, DbSQL: dbSQL, @@ -1166,7 +1167,7 @@ func (e *Extractor) mysqlDump() error { // ------ // Dump all of the tables and generate source records ... e.logger.Printf("mysql.extractor: Step %d: scanning contents of %d tables", step, e.tableCount) - startScan := currentTimeMillis() + startScan := utils.CurrentTimeMillis() counter := 0 //pool := models.NewPool(10) for _, db := range e.replicateDoDb { @@ -1215,14 +1216,14 @@ func (e *Extractor) mysqlDump() error { // We've copied all of the tables, but our buffer holds onto the very last record. // First mark the snapshot as complete and then apply the updated offset to the buffered record ... - stop := currentTimeMillis() + stop := utils.CurrentTimeMillis() e.logger.Printf("mysql.extractor: Step %d: scanned %d rows in %d tables in %s", step, e.mysqlContext.TotalRowsCopied, e.tableCount, time.Duration(stop-startScan)) step++ return nil } -func (e *Extractor) encodeDumpEntry(entry *dumpEntry) error { +func (e *Extractor) encodeDumpEntry(entry *DumpEntry) error { txMsg, err := Encode(entry) if err != nil { return err @@ -1234,10 +1235,6 @@ func (e *Extractor) encodeDumpEntry(entry *dumpEntry) error { return nil } -func currentTimeMillis() int64 { - return time.Now().UnixNano() / 1000000 -} - func (e *Extractor) Stats() (*models.TaskStatistics, error) { totalRowsCopied := e.mysqlContext.GetTotalRowsCopied() rowsEstimate := atomic.LoadInt64(&e.mysqlContext.RowsEstimate) @@ -1394,7 +1391,7 @@ func (e *Extractor) Shutdown() error { return nil } -func (e *Extractor) kafkaTransformSnapshotData(table *config.Table, value *dumpEntry) error { +func (e *Extractor) kafkaTransformSnapshotData(table *config.Table, value *DumpEntry) error { var err error tableIdent := fmt.Sprintf("%v.%v.%v", e.kafkaMgr.Cfg.Topic, table.TableSchema, table.TableName) @@ -1416,7 +1413,7 @@ func (e *Extractor) kafkaTransformSnapshotData(table *config.Table, value *dumpE valuePayload.Source.Db = table.TableSchema valuePayload.Source.Table = table.TableName valuePayload.Op = kafka2.RECORD_OP_INSERT - valuePayload.TsMs = currentTimeMillis() + valuePayload.TsMs = utils.CurrentTimeMillis() valuePayload.Before = nil valuePayload.After = kafka2.NewRow() @@ -1583,7 +1580,7 @@ func (e *Extractor) kafkaTransformDMLEventQuery(dmlEvent *binlog.BinlogEntry) (e valuePayload.Source.Db = dataEvent.DatabaseName valuePayload.Source.Table = dataEvent.TableName valuePayload.Op = op - valuePayload.TsMs = currentTimeMillis() + valuePayload.TsMs = utils.CurrentTimeMillis() valueSchema := kafka2.NewEnvelopeSchema(tableIdent, colDefs) diff --git a/internal/client/driver/mysql/extractor_test.go b/internal/client/driver/mysql/extractor_test.go index d80be092f..c7cdca574 100644 --- a/internal/client/driver/mysql/extractor_test.go +++ b/internal/client/driver/mysql/extractor_test.go @@ -414,7 +414,7 @@ func TestExtractor_mysqlDump(t *testing.T) { func TestExtractor_encodeDumpEntry(t *testing.T) { type args struct { - entry *dumpEntry + entry *DumpEntry } tests := []struct { name string diff --git a/utils/utils.go b/utils/utils.go index f9f0502d1..049f3ed1d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -6,6 +6,8 @@ package utils +import "time" + // Return a substring of limited lenth. func StrLim(s string, lim int) string { if lim < len(s) { @@ -23,3 +25,7 @@ func StringElse(s1 string, s2 string) string { return s2 } } + +func CurrentTimeMillis() int64 { + return time.Now().UnixNano() / 1000000 +}