diff --git a/cmd/main.go b/cmd/main.go
index 7f43d34..02ac860 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -94,7 +94,7 @@ func main() {
 			panic(err)
 		}
 		// adjust batch size according to source db table
-		cfgCopy.BatchSize = src.AdjustBatchSizeAccordingToSourceDbTable()
+		cfgCopy.BatchSize = int64(src.AdjustBatchSizeAccordingToSourceDbTable())
 		w := worker.NewWorker(&cfgCopy, fmt.Sprintf("%s.%s", db, table), ig, src)
 		w.Run(ctx)
 	}
diff --git a/config/conf_test.json b/config/conf_test.json
index 550f1b0..434eabd 100644
--- a/config/conf_test.json
+++ b/config/conf_test.json
@@ -4,17 +4,17 @@
   "sourceUser": "root",
   "sourcePass": "123456",
   "sourceDB": "mydb",
-  "sourceTable": "t1",
-  "sourceQuery": "select * from mydb.t1",
+  "sourceTable": "test_table",
+  "sourceQuery": "select * from mydb.test_table",
   "sourceWhereCondition": "id > 0",
   "sourceSplitKey": "id",
   "sourceSplitTimeKey": "",
   "timeSplitUnit": "minute",
-  "databendDSN": "http://databend:databend@localhost:8000",
-  "databendTable": "testSync.t1",
-  "batchSize": 2,
+  "databendDSN": "http://databend:databend@localhost:8009",
+  "databendTable": "testSync.test_table",
+  "batchSize": 20000,
   "batchMaxInterval": 30,
   "userStage": "~",
-  "deleteAfterSync": false,
+  "deleteAfterSync": true,
   "maxThread": 10
 }
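Note on the cmd/main.go hunk above: AdjustBatchSizeAccordingToSourceDbTable now returns uint64, and the bare int64(...) conversion wraps to a negative value above math.MaxInt64 (possible when the returned rangeSize on a sparse UNSIGNED BIGINT key is huge). A clamped conversion is safer. This is a sketch only, not part of the patch; clampToInt64 is a hypothetical helper:

package main

import (
	"fmt"
	"math"
)

// clampToInt64 is a hypothetical helper (illustration only): it converts the
// adjusted batch size back to the int64 config field without wrapping negative.
func clampToInt64(u uint64) int64 {
	if u > math.MaxInt64 {
		return math.MaxInt64 // clamp instead of wrapping to a negative batch size
	}
	return int64(u)
}

func main() {
	fmt.Println(clampToInt64(math.MaxUint64)) // 9223372036854775807
}

The call site would then read: cfgCopy.BatchSize = clampToInt64(src.AdjustBatchSizeAccordingToSourceDbTable()).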
diff --git a/source/mysql.go b/source/mysql.go
index a8dd4fd..abe8fe8 100644
--- a/source/mysql.go
+++ b/source/mysql.go
@@ -2,9 +2,11 @@ package source

 import (
 	"database/sql"
+	"database/sql/driver"
 	"fmt"
 	"log"
 	"regexp"
+	"strconv"
 	"strings"
 	"time"
@@ -40,25 +42,25 @@ func NewMysqlSource(cfg *config.Config) (*MysqlSource, error) {

 // AdjustBatchSizeAccordingToSourceDbTable has a concept called s, s = (maxKey - minKey) / sourceTableRowCount
 // if s == 1 it means the data is uniform in the table, if s is much bigger than 1, it means the data is not uniform in the table
-func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() int64 {
+func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 {
 	minSplitKey, maxSplitKey, err := s.GetMinMaxSplitKey()
 	if err != nil {
-		return s.cfg.BatchSize
+		return uint64(s.cfg.BatchSize)
 	}
 	sourceTableRowCount, err := s.GetSourceReadRowsCount()
 	if err != nil {
-		return s.cfg.BatchSize
+		return uint64(s.cfg.BatchSize)
 	}
 	rangeSize := maxSplitKey - minSplitKey + 1
 	switch {
 	case int64(sourceTableRowCount) <= s.cfg.BatchSize:
 		return rangeSize
-	case rangeSize/int64(sourceTableRowCount) >= 10:
-		return s.cfg.BatchSize * 5
-	case rangeSize/int64(sourceTableRowCount) >= 100:
-		return s.cfg.BatchSize * 20
+	case rangeSize/uint64(sourceTableRowCount) >= 100:
+		return uint64(s.cfg.BatchSize * 20)
+	case rangeSize/uint64(sourceTableRowCount) >= 10:
+		return uint64(s.cfg.BatchSize * 5)
 	default:
-		return s.cfg.BatchSize
+		return uint64(s.cfg.BatchSize)
 	}
 }

@@ -74,15 +76,18 @@ func (s *MysqlSource) GetSourceReadRowsCount() (int, error) {
 	return rowCount, nil
 }

-func (s *MysqlSource) GetMinMaxSplitKey() (int64, int64, error) {
-	rows, err := s.db.Query(fmt.Sprintf("select min(%s), max(%s) from %s.%s WHERE %s", s.cfg.SourceSplitKey,
-		s.cfg.SourceSplitKey, s.cfg.SourceDB, s.cfg.SourceTable, s.cfg.SourceWhereCondition))
+func (s *MysqlSource) GetMinMaxSplitKey() (uint64, uint64, error) {
+	query := fmt.Sprintf("SELECT MIN(%s), MAX(%s) FROM %s.%s WHERE %s",
+		s.cfg.SourceSplitKey, s.cfg.SourceSplitKey,
+		s.cfg.SourceDB, s.cfg.SourceTable, s.cfg.SourceWhereCondition)
+
+	rows, err := s.db.Query(query)
 	if err != nil {
 		return 0, 0, err
 	}
 	defer rows.Close()

-	var minSplitKey, maxSplitKey sql.NullInt64
+	var minSplitKey, maxSplitKey interface{}
 	for rows.Next() {
 		err = rows.Scan(&minSplitKey, &maxSplitKey)
 		if err != nil {
@@ -90,12 +95,22 @@ func (s *MysqlSource) GetMinMaxSplitKey() (int64, int64, error) {
 		}
 	}

-	// Check if minSplitKey and maxSplitKey are valid (not NULL)
-	if !minSplitKey.Valid || !maxSplitKey.Valid {
+	// Handle NULL values
+	if minSplitKey == nil || maxSplitKey == nil {
 		return 0, 0, nil
 	}

-	return minSplitKey.Int64, maxSplitKey.Int64, nil
+	min64, err := toUint64(minSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert min value: %w", err)
+	}
+
+	max64, err := toUint64(maxSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert max value: %w", err)
+	}
+
+	return min64, max64, nil
 }

 func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error) {
@@ -117,6 +132,7 @@ func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error) {
 }

 func (s *MysqlSource) DeleteAfterSync() error {
+	logrus.Infof("DeleteAfterSync: %v", s.cfg.DeleteAfterSync)
 	if !s.cfg.DeleteAfterSync {
 		return nil
 	}
@@ -126,6 +142,8 @@ func (s *MysqlSource) DeleteAfterSync() error {
 		return err
 	}

+	logrus.Infof("dbTables: %v", dbTables)
+
 	for db, tables := range dbTables {
 		for _, table := range tables {
 			count, err := s.GetSourceReadRowsCount()
@@ -188,7 +206,9 @@ func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]in
 		switch columnType.DatabaseTypeName() {
 		case "INT", "SMALLINT", "TINYINT", "MEDIUMINT", "BIGINT":
 			scanArgs[i] = new(sql.NullInt64)
-		case "UNSIGNED INT", "UNSIGNED TINYINT", "UNSIGNED MEDIUMINT", "UNSIGNED BIGINT":
+		case "UNSIGNED BIGINT":
+			scanArgs[i] = new(NullUint64)
+		case "UNSIGNED INT", "UNSIGNED TINYINT", "UNSIGNED MEDIUMINT":
 			scanArgs[i] = new(sql.NullInt64)
 		case "FLOAT", "DOUBLE":
 			scanArgs[i] = new(sql.NullFloat64)
@@ -244,6 +264,12 @@ func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]in
 			} else {
 				row[i] = nil
 			}
+		case *NullUint64:
+			if v.Valid {
+				row[i] = v.Uint64
+			} else {
+				row[i] = nil
+			}
 		case *sql.NullBool:
 			if v.Valid {
 				row[i] = v.Bool
@@ -375,5 +401,56 @@ func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]strin
 			allDbTables[db] = append(allDbTables[db], tables...)
 		}
 	}
+	if s.cfg.SourceDB != "" && s.cfg.SourceTable != "" {
+		allDbTables[s.cfg.SourceDB] = append(allDbTables[s.cfg.SourceDB], s.cfg.SourceTable)
+	}
 	return allDbTables, nil
 }
+
+// NullUint64 represents a uint64 that may be null.
+type NullUint64 struct {
+	Uint64 uint64
+	Valid  bool // Valid is true if Uint64 is not NULL
+}
+
+// Scan implements the Scanner interface.
+func (n *NullUint64) Scan(value interface{}) error {
+	if value == nil {
+		n.Uint64, n.Valid = 0, false
+		return nil
+	}
+
+	n.Valid = true
+	switch v := value.(type) {
+	case uint64:
+		n.Uint64 = v
+	case int64:
+		if v < 0 {
+			return fmt.Errorf("cannot scan negative value %d into NullUint64", v)
+		}
+		n.Uint64 = uint64(v)
+	case []byte:
+		var err error
+		n.Uint64, err = strconv.ParseUint(string(v), 10, 64)
+		if err != nil {
+			return err
+		}
+	case string:
+		var err error
+		n.Uint64, err = strconv.ParseUint(v, 10, 64)
+		if err != nil {
+			return err
+		}
+	default:
+		return fmt.Errorf("cannot scan type %T into NullUint64", value)
+	}
+	return nil
+}
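A quick sketch of how the new NullUint64 scans a value that sql.NullInt64 cannot hold. The query is a stand-in, not from this repo; the MySQL driver typically returns such a column as []byte, which Scan parses with strconv.ParseUint:

package source

import "database/sql"

// readHugeUnsigned is illustrative only: 18446744073709551615 is math.MaxUint64,
// which overflows int64 but round-trips cleanly through NullUint64.
func readHugeUnsigned(db *sql.DB) (uint64, bool, error) {
	var v NullUint64
	err := db.QueryRow("SELECT CAST(18446744073709551615 AS UNSIGNED)").Scan(&v)
	if err != nil {
		return 0, false, err
	}
	return v.Uint64, v.Valid, nil
}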
+
+// Value implements the driver Valuer interface.
+func (n NullUint64) Value() (driver.Value, error) {
+	if !n.Valid {
+		return nil, nil
+	}
+	return n.Uint64, nil
+}
diff --git a/source/oracle.go b/source/oracle.go
index a0bc5c6..a94bc82 100644
--- a/source/oracle.go
+++ b/source/oracle.go
@@ -23,25 +23,25 @@ type OracleSource struct {
 	statsRecorder *DatabendSourceStatsRecorder
 }

-func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() int64 {
+func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 {
 	minSplitKey, maxSplitKey, err := p.GetMinMaxSplitKey()
 	if err != nil {
-		return p.cfg.BatchSize
+		return uint64(p.cfg.BatchSize)
 	}
 	sourceTableRowCount, err := p.GetSourceReadRowsCount()
 	if err != nil {
-		return p.cfg.BatchSize
+		return uint64(p.cfg.BatchSize)
 	}
 	rangeSize := maxSplitKey - minSplitKey + 1
 	switch {
 	case int64(sourceTableRowCount) <= p.cfg.BatchSize:
 		return rangeSize
-	case rangeSize/int64(sourceTableRowCount) >= 10:
-		return p.cfg.BatchSize * 5
-	case rangeSize/int64(sourceTableRowCount) >= 100:
-		return p.cfg.BatchSize * 20
+	case rangeSize/uint64(sourceTableRowCount) >= 100:
+		return uint64(p.cfg.BatchSize * 20)
+	case rangeSize/uint64(sourceTableRowCount) >= 10:
+		return uint64(p.cfg.BatchSize * 5)
 	default:
-		return p.cfg.BatchSize
+		return uint64(p.cfg.BatchSize)
 	}
 }

@@ -111,19 +111,23 @@ func (p *OracleSource) GetSourceReadRowsCount() (int, error) {
 	return rowCount, nil
 }

-func (p *OracleSource) GetMinMaxSplitKey() (int64, int64, error) {
+func (p *OracleSource) GetMinMaxSplitKey() (uint64, uint64, error) {
 	err := p.SwitchDatabase()
 	if err != nil {
 		return 0, 0, err
 	}
-	rows, err := p.db.Query(fmt.Sprintf("select COALESCE(min(%s),0), COALESCE(max(%s),0) from %s.%s WHERE %s",
-		p.cfg.SourceSplitKey, p.cfg.SourceSplitKey, p.cfg.SourceDB, p.cfg.SourceTable, p.cfg.SourceWhereCondition))
+
+	query := fmt.Sprintf("SELECT COALESCE(MIN(%s), 0), COALESCE(MAX(%s), 0) FROM %s.%s WHERE %s",
+		p.cfg.SourceSplitKey, p.cfg.SourceSplitKey,
+		p.cfg.SourceDB, p.cfg.SourceTable, p.cfg.SourceWhereCondition)
+
+	rows, err := p.db.Query(query)
 	if err != nil {
 		return 0, 0, err
 	}
 	defer rows.Close()

-	var minSplitKey, maxSplitKey sql.NullInt64
+	var minSplitKey, maxSplitKey interface{}
 	for rows.Next() {
 		err = rows.Scan(&minSplitKey, &maxSplitKey)
 		if err != nil {
@@ -131,12 +135,21 @@ func (p *OracleSource) GetMinMaxSplitKey() (int64, int64, error) {
 		}
 	}

-	// Check if minSplitKey and maxSplitKey are valid (not NULL)
-	if !minSplitKey.Valid || !maxSplitKey.Valid {
+	if minSplitKey == nil || maxSplitKey == nil {
 		return 0, 0, nil
 	}

-	return minSplitKey.Int64, maxSplitKey.Int64, nil
+	min64, err := toUint64(minSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert min value: %w", err)
+	}
+
+	max64, err := toUint64(maxSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert max value: %w", err)
+	}
+
+	return min64, max64, nil
 }

 func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error) {
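The oracle.go and postgres.go paths both funnel through toUint64 (added in source/source.go later in this diff) because drivers surface numeric aggregates as different Go types. A small illustration, assuming the package-level toUint64 shown below:

package source

import "fmt"

// demoToUint64 is illustrative only: the representations toUint64 accepts.
func demoToUint64() {
	for _, raw := range []interface{}{int64(42), float64(42), []byte("42"), "42"} {
		v, err := toUint64(raw)
		fmt.Println(v, err) // 42 <nil> in each case
	}
}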
diff --git a/source/postgres.go b/source/postgres.go
index 3a80a55..5c1ce8d 100644
--- a/source/postgres.go
+++ b/source/postgres.go
@@ -20,25 +20,25 @@ type PostgresSource struct {
 	statsRecorder *DatabendSourceStatsRecorder
 }

-func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() int64 {
+func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 {
 	minSplitKey, maxSplitKey, err := p.GetMinMaxSplitKey()
 	if err != nil {
-		return p.cfg.BatchSize
+		return uint64(p.cfg.BatchSize)
 	}
 	sourceTableRowCount, err := p.GetSourceReadRowsCount()
 	if err != nil {
-		return p.cfg.BatchSize
+		return uint64(p.cfg.BatchSize)
 	}
 	rangeSize := maxSplitKey - minSplitKey + 1
 	switch {
 	case int64(sourceTableRowCount) <= p.cfg.BatchSize:
 		return rangeSize
-	case rangeSize/int64(sourceTableRowCount) >= 10:
-		return p.cfg.BatchSize * 5
-	case rangeSize/int64(sourceTableRowCount) >= 100:
-		return p.cfg.BatchSize * 20
+	case rangeSize/uint64(sourceTableRowCount) >= 100:
+		return uint64(p.cfg.BatchSize * 20)
+	case rangeSize/uint64(sourceTableRowCount) >= 10:
+		return uint64(p.cfg.BatchSize * 5)
 	default:
-		return p.cfg.BatchSize
+		return uint64(p.cfg.BatchSize)
 	}
 }

@@ -107,19 +107,22 @@ func (p *PostgresSource) GetSourceReadRowsCount() (int, error) {
 	return rowCount, nil
 }

-func (p *PostgresSource) GetMinMaxSplitKey() (int64, int64, error) {
+func (p *PostgresSource) GetMinMaxSplitKey() (uint64, uint64, error) {
 	err := p.SwitchDatabase()
 	if err != nil {
 		return 0, 0, err
 	}
-	rows, err := p.db.Query(fmt.Sprintf("select COALESCE(min(%s),0), COALESCE(max(%s),0) from %s WHERE %s",
-		p.cfg.SourceSplitKey, p.cfg.SourceSplitKey, p.cfg.SourceTable, p.cfg.SourceWhereCondition))
+
+	query := fmt.Sprintf("SELECT COALESCE(MIN(%s), 0), COALESCE(MAX(%s), 0) FROM %s WHERE %s",
+		p.cfg.SourceSplitKey, p.cfg.SourceSplitKey, p.cfg.SourceTable, p.cfg.SourceWhereCondition)
+
+	rows, err := p.db.Query(query)
 	if err != nil {
 		return 0, 0, err
 	}
 	defer rows.Close()

-	var minSplitKey, maxSplitKey sql.NullInt64
+	var minSplitKey, maxSplitKey interface{}
 	for rows.Next() {
 		err = rows.Scan(&minSplitKey, &maxSplitKey)
 		if err != nil {
@@ -127,12 +130,23 @@ func (p *PostgresSource) GetMinMaxSplitKey() (int64, int64, error) {
 		}
 	}

-	// Check if minSplitKey and maxSplitKey are valid (not NULL)
-	if !minSplitKey.Valid || !maxSplitKey.Valid {
+	// Handle NULL values (COALESCE should prevent them, but check to be safe)
+	if minSplitKey == nil || maxSplitKey == nil {
 		return 0, 0, nil
 	}

-	return minSplitKey.Int64, maxSplitKey.Int64, nil
+	// Convert to uint64
+	min64, err := toUint64(minSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert min value: %w", err)
+	}
+
+	max64, err := toUint64(maxSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert max value: %w", err)
+	}
+
+	return min64, max64, nil
 }

 func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error) {
diff --git a/source/postgres_test.go b/source/postgres_test.go
index 43b3f77..c0a9d58 100644
--- a/source/postgres_test.go
+++ b/source/postgres_test.go
@@ -139,8 +139,8 @@ func TestPostgresSource_GetMinMaxSplitKey(t *testing.T) {
 	defer tearDownFunc()
 	min, max, err := postgresSourceTest.postgresSource.GetMinMaxSplitKey()
 	assert.NoError(t, err)
-	assert.Equal(t, int64(1), min)
-	assert.Equal(t, int64(2), max)
+	assert.Equal(t, uint64(1), min)
+	assert.Equal(t, uint64(2), max)
 }

 func TestPostgresSource_GetMinMaxTimeSplitKey(t *testing.T) {
@@ -171,5 +171,5 @@ func TestPostgresSource_AdjustBatchSizeAccordingToSourceDbTable(t *testing.T) {
 	postgresSourceTest, tearDownFunc := setupPostgresSourceTest()
 	defer tearDownFunc()
 	batchSize := postgresSourceTest.postgresSource.AdjustBatchSizeAccordingToSourceDbTable()
-	assert.Equal(t, int64(2), batchSize)
+	assert.Equal(t, uint64(2), batchSize)
 }
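Why TestPostgresSource_AdjustBatchSizeAccordingToSourceDbTable expects uint64(2): with the fixture's two rows (ids 1 and 2) and batchSize 2, rowCount <= batchSize, so the method returns rangeSize = 2 - 1 + 1 = 2. A standalone sketch of the heuristic, mirroring the per-source methods above (with the >= 100 branch checked first so it is reachable):

package main

import "fmt"

// adjustBatchSize mirrors the per-source heuristic: s = rangeSize/rowCount
// measures split-key sparsity (s == 1 means densely packed keys).
func adjustBatchSize(minKey, maxKey, rowCount, batchSize uint64) uint64 {
	rangeSize := maxKey - minKey + 1
	switch {
	case rowCount <= batchSize:
		return rangeSize
	case rangeSize/rowCount >= 100:
		return batchSize * 20
	case rangeSize/rowCount >= 10:
		return batchSize * 5
	default:
		return batchSize
	}
}

func main() {
	fmt.Println(adjustBatchSize(1, 2, 2, 2)) // 2, matching the test assertion
}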
diff --git a/source/source.go b/source/source.go
index f15c49d..888d552 100644
--- a/source/source.go
+++ b/source/source.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"strconv"
 	"time"

 	_ "github.com/denisenkom/go-mssqldb"
@@ -15,9 +16,9 @@ import (
 )

 type Sourcer interface {
-	AdjustBatchSizeAccordingToSourceDbTable() int64
+	AdjustBatchSizeAccordingToSourceDbTable() uint64
 	GetSourceReadRowsCount() (int, error)
-	GetMinMaxSplitKey() (int64, int64, error)
+	GetMinMaxSplitKey() (uint64, uint64, error)
 	GetMinMaxTimeSplitKey() (string, string, error)
 	DeleteAfterSync() error
 	QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
@@ -44,25 +45,25 @@ func NewSource(cfg *config.Config) (Sourcer, error) {
 	}
 }

-func SlimCondition(maxThread int, minSplitKey, maxSplitKey int64) [][]int64 {
-	var conditions [][]int64
+func SlimCondition(maxThread int, minSplitKey, maxSplitKey uint64) [][]uint64 {
+	var conditions [][]uint64
 	if minSplitKey > maxSplitKey {
 		return conditions
 	}
-	rangeSize := (maxSplitKey - minSplitKey) / int64(maxThread)
+	rangeSize := (maxSplitKey - minSplitKey) / uint64(maxThread)
 	for i := 0; i < maxThread; i++ {
-		lowerBound := minSplitKey + rangeSize*int64(i)
+		lowerBound := minSplitKey + rangeSize*uint64(i)
 		upperBound := lowerBound + rangeSize
 		if i == maxThread-1 {
 			// Ensure the last condition includes maxSplitKey
 			upperBound = maxSplitKey
 		}
-		conditions = append(conditions, []int64{lowerBound, upperBound})
+		conditions = append(conditions, []uint64{lowerBound, upperBound})
 	}
 	return conditions
 }

-func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey int64) []string {
+func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey uint64) []string {
 	var conditions []string
 	for {
 		if minSplitKey >= maxSplitKey {
@@ -75,7 +76,7 @@ func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey i
 	return conditions
 }

-func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax int64) <-chan string {
+func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax uint64) <-chan string {
 	conditions := make(chan string, 100) // make a buffered channel

 	go func() {
@@ -233,3 +234,26 @@ func parseTimeDynamic(timeStr string) (time.Time, error) {

 	return time.Time{}, fmt.Errorf("failed to parse time: %v", err)
 }
+
+func toUint64(val interface{}) (uint64, error) {
+	switch v := val.(type) {
+	case uint64:
+		return v, nil
+	case int64:
+		if v < 0 {
+			return 0, fmt.Errorf("negative value: %d", v)
+		}
+		return uint64(v), nil
+	case []byte:
+		return strconv.ParseUint(string(v), 10, 64)
+	case string:
+		return strconv.ParseUint(v, 10, 64)
+	case float64:
+		if v < 0 {
+			return 0, fmt.Errorf("negative float value: %f", v)
+		}
+		return uint64(v), nil
+	default:
+		return 0, fmt.Errorf("unexpected type: %T", val)
+	}
+}
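To make the unsigned refactor concrete: the range slicing in SlimCondition now runs entirely in uint64, and its minSplitKey > maxSplitKey guard also keeps the subtraction from underflowing. A runnable sketch with the same logic:

package main

import "fmt"

// slimCondition replicates source.SlimCondition for illustration.
func slimCondition(maxThread int, minSplitKey, maxSplitKey uint64) [][]uint64 {
	var conditions [][]uint64
	if minSplitKey > maxSplitKey {
		return conditions // also prevents uint64 underflow below
	}
	rangeSize := (maxSplitKey - minSplitKey) / uint64(maxThread)
	for i := 0; i < maxThread; i++ {
		lowerBound := minSplitKey + rangeSize*uint64(i)
		upperBound := lowerBound + rangeSize
		if i == maxThread-1 {
			upperBound = maxSplitKey // the last condition must include maxSplitKey
		}
		conditions = append(conditions, []uint64{lowerBound, upperBound})
	}
	return conditions
}

func main() {
	// 4 threads over keys [1, 101]: rangeSize = 25.
	fmt.Println(slimCondition(4, 1, 101))
	// [[1 26] [26 51] [51 76] [76 101]]
}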
diff --git a/source/sql_server.go b/source/sql_server.go
index 3be9e47..02bc6b9 100644
--- a/source/sql_server.go
+++ b/source/sql_server.go
@@ -63,7 +63,7 @@ func (s *SQLServerSource) GetSourceReadRowsCount() (int, error) {
 	return rowCount, nil
 }

-func (s *SQLServerSource) GetMinMaxSplitKey() (int64, int64, error) {
+func (s *SQLServerSource) GetMinMaxSplitKey() (uint64, uint64, error) {
 	tableName := s.cfg.SourceTable
 	if !strings.Contains(tableName, ".") {
 		tableName = "dbo." + tableName
@@ -85,7 +85,7 @@ func (s *SQLServerSource) GetMinMaxSplitKey() (int64, int64, error) {
 	}
 	defer rows.Close()

-	var minSplitKey, maxSplitKey sql.NullInt64
+	var minSplitKey, maxSplitKey interface{}
 	for rows.Next() {
 		err = rows.Scan(&minSplitKey, &maxSplitKey)
 		if err != nil {
@@ -97,32 +97,42 @@ func (s *SQLServerSource) GetMinMaxSplitKey() (int64, int64, error) {
 		return 0, 0, err
 	}

-	if !minSplitKey.Valid || !maxSplitKey.Valid {
+	if minSplitKey == nil || maxSplitKey == nil {
 		return 0, 0, nil
 	}

-	return minSplitKey.Int64, maxSplitKey.Int64, nil
+	min64, err := toUint64(minSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert min value: %w", err)
+	}
+
+	max64, err := toUint64(maxSplitKey)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to convert max value: %w", err)
+	}
+
+	return min64, max64, nil
 }

-func (s *SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable() int64 {
+func (s *SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 {
 	minSplitKey, maxSplitKey, err := s.GetMinMaxSplitKey()
 	if err != nil {
-		return s.cfg.BatchSize
+		return uint64(s.cfg.BatchSize)
 	}
 	sourceTableRowCount, err := s.GetSourceReadRowsCount()
 	if err != nil {
-		return s.cfg.BatchSize
+		return uint64(s.cfg.BatchSize)
 	}
 	rangeSize := maxSplitKey - minSplitKey + 1
 	switch {
 	case int64(sourceTableRowCount) <= s.cfg.BatchSize:
 		return rangeSize
-	case rangeSize/int64(sourceTableRowCount) >= 10:
-		return s.cfg.BatchSize * 5
-	case rangeSize/int64(sourceTableRowCount) >= 100:
-		return s.cfg.BatchSize * 20
+	case rangeSize/uint64(sourceTableRowCount) >= 100:
+		return uint64(s.cfg.BatchSize * 20)
+	case rangeSize/uint64(sourceTableRowCount) >= 10:
+		return uint64(s.cfg.BatchSize * 5)
 	default:
-		return s.cfg.BatchSize
+		return uint64(s.cfg.BatchSize)
 	}
 }
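One behavioral note on sql_server.go: SQL Server has no unsigned integer types, so a split key column containing negatives now surfaces as an error from toUint64 instead of silently wrapping, and AdjustBatchSizeAccordingToSourceDbTable falls back to the configured batch size. Illustration, assuming the package-level toUint64 above:

package source

import "fmt"

// demoNegativeSplitKey is illustrative only: a negative BIGINT split key is
// rejected by toUint64 rather than converted.
func demoNegativeSplitKey() {
	_, err := toUint64(int64(-1))
	fmt.Println(err) // negative value: -1
}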
diff --git a/worker/worker.go b/worker/worker.go
index c726675..fb38717 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -79,8 +79,8 @@ func calculateBytesSize(batch [][]interface{}) int {
 }

 // IsSplitAccordingMaxGoRoutine checks if the split key is according to the max go routine
-func (w *Worker) IsSplitAccordingMaxGoRoutine(minSplitKey, maxSplitKey, batchSize int64) bool {
-	return (maxSplitKey-minSplitKey)/batchSize > int64(w.Cfg.MaxThread)
+func (w *Worker) IsSplitAccordingMaxGoRoutine(minSplitKey, maxSplitKey, batchSize uint64) bool {
+	return (maxSplitKey-minSplitKey)/batchSize > uint64(w.Cfg.MaxThread)
 }

 func (w *Worker) stepBatch() error {
@@ -95,7 +95,7 @@ func (w *Worker) stepBatch() error {
 	}
 	logrus.Infof("db.table is %s.%s, minSplitKey: %d, maxSplitKey : %d", w.Cfg.SourceDB, w.Cfg.SourceTable, minSplitKey, maxSplitKey)

-	if w.IsSplitAccordingMaxGoRoutine(minSplitKey, maxSplitKey, w.Cfg.BatchSize) {
+	if w.IsSplitAccordingMaxGoRoutine(minSplitKey, maxSplitKey, uint64(w.Cfg.BatchSize)) {
 		fmt.Println("split according maxGoRoutine", w.Cfg.MaxThread)
 		slimedRange := source.SlimCondition(w.Cfg.MaxThread, minSplitKey, maxSplitKey)
 		fmt.Println("slimedRange", slimedRange)
@@ -103,7 +103,7 @@ func (w *Worker) stepBatch() error {
 		for i := 0; i < w.Cfg.MaxThread; i++ {
 			go func(idx int) {
 				defer wg.Done()
-				conditions := source.SplitConditionAccordingMaxGoRoutine(w.Cfg.SourceSplitKey, w.Cfg.BatchSize, slimedRange[idx][0], slimedRange[idx][1], maxSplitKey)
+				conditions := source.SplitConditionAccordingMaxGoRoutine(w.Cfg.SourceSplitKey, uint64(w.Cfg.BatchSize), slimedRange[idx][0], slimedRange[idx][1], maxSplitKey)
 				logrus.Infof("conditions in one routine: %v", len(conditions))
 				if err != nil {
 					logrus.Errorf("stepBatchWithCondition failed: %v", err)
@@ -120,7 +120,7 @@ func (w *Worker) stepBatch() error {
 		wg.Wait()
 		return nil
 	}
-	conditions := source.SplitCondition(w.Cfg.SourceSplitKey, w.Cfg.BatchSize, minSplitKey, maxSplitKey)
+	conditions := source.SplitCondition(w.Cfg.SourceSplitKey, uint64(w.Cfg.BatchSize), minSplitKey, maxSplitKey)
 	for _, condition := range conditions {
 		wg.Add(1)
 		go func(condition string) {
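Finally, a caution on the worker.go arithmetic: (maxSplitKey-minSplitKey)/batchSize is now unsigned, so maxSplitKey < minSplitKey would underflow to a huge value and always select the goroutine split path. The sources currently return (0, 0) on NULL keys, which keeps the inputs ordered, but a defensive variant is cheap. A sketch, not part of the patch:

package worker

// isSplitAccordingMaxGoRoutineSafe is a hypothetical defensive variant of
// Worker.IsSplitAccordingMaxGoRoutine: it guards the unsigned subtraction
// against underflow and the division against a zero batch size.
func isSplitAccordingMaxGoRoutineSafe(minSplitKey, maxSplitKey, batchSize uint64, maxThread int) bool {
	if maxSplitKey <= minSplitKey || batchSize == 0 {
		return false
	}
	return (maxSplitKey-minSplitKey)/batchSize > uint64(maxThread)
}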