Skip to content
Merged
11 changes: 5 additions & 6 deletions pkg/applier/sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,8 @@ func (a *ShardedApplier) writeChunklet(ctx context.Context, shard *shardTarget,
// Build the INSERT statement
// Note: We use just the table name, not the fully qualified name, because
// the database connection (shard.writeDB) already determines which database to write to
tableName := fmt.Sprintf("`%s`", chunkletData.chunk.NewTable.TableName)
query := fmt.Sprintf("INSERT IGNORE INTO %s (%s) VALUES %s",
tableName,
chunkletData.chunk.NewTable.QuotedTableName,
columnList,
strings.Join(valuesClauses, ", "),
)
Expand Down Expand Up @@ -582,8 +581,8 @@ func (a *ShardedApplier) DeleteKeys(ctx context.Context, sourceTable, _ *table.T
// Build DELETE statement
// Use just the table name, not the fully qualified name, because
// the database connection (shard.writeDB) already determines which database to write to
deleteStmt := fmt.Sprintf("DELETE FROM `%s` WHERE (%s) IN (%s)",
sourceTable.TableName,
deleteStmt := fmt.Sprintf("DELETE FROM %s WHERE (%s) IN (%s)",
sourceTable.QuotedTableName,
table.QuoteColumns(sourceTable.KeyColumns),
strings.Join(pkValues, ","),
)
Expand Down Expand Up @@ -775,8 +774,8 @@ func (a *ShardedApplier) UpsertRows(ctx context.Context, sourceTable, _ *table.T

// Use just the table name, not the fully qualified name, because
// the database connection (shard.writeDB) already determines which database to write to
upsertStmt := fmt.Sprintf("INSERT INTO `%s` (%s) VALUES %s AS new ON DUPLICATE KEY UPDATE %s",
sourceTable.TableName,
upsertStmt := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s AS new ON DUPLICATE KEY UPDATE %s",
sourceTable.QuotedTableName,
columnList,
strings.Join(valuesClauses, ", "),
strings.Join(updateClauses, ", "),
Expand Down
6 changes: 3 additions & 3 deletions pkg/applier/single_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (a *SingleTargetApplier) writeChunklet(ctx context.Context, chunkletData ch

// Build the INSERT statement
query := fmt.Sprintf("INSERT IGNORE INTO %s (%s) VALUES %s",
chunkletData.chunk.NewTable.TableName,
chunkletData.chunk.NewTable.QuotedTableName,
columnList,
strings.Join(valuesClauses, ", "),
)
Expand Down Expand Up @@ -428,7 +428,7 @@ func (a *SingleTargetApplier) DeleteKeys(ctx context.Context, sourceTable, targe

// Build DELETE statement
deleteStmt := fmt.Sprintf("DELETE FROM %s WHERE (%s) IN (%s)",
targetTable.TableName,
targetTable.QuotedTableName,
table.QuoteColumns(sourceTable.KeyColumns),
strings.Join(pkValues, ","),
)
Expand Down Expand Up @@ -527,7 +527,7 @@ func (a *SingleTargetApplier) UpsertRows(ctx context.Context, sourceTable, targe
}

upsertStmt := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s AS new ON DUPLICATE KEY UPDATE %s",
targetTable.TableName,
targetTable.QuotedTableName,
columnList,
strings.Join(valuesClauses, ", "),
strings.Join(updateClauses, ", "),
Expand Down
20 changes: 13 additions & 7 deletions pkg/checksum/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,15 @@ func NewCheckerDefaultConfig() *CheckerConfig {
}

// NewChecker creates a new checksum object.
func NewChecker(db *sql.DB, chunker table.Chunker, feed *repl.Client, config *CheckerConfig) (Checker, error) {
if feed == nil {
return nil, errors.New("feed must be non-nil")
// sourceDBs contains the source database connections (one for single-source migrations,
// multiple for N:M moves). The distributed checker aggregates checksums across all sources.
// The single checker uses sourceDBs[0].
func NewChecker(sourceDBs []*sql.DB, chunker table.Chunker, feeds []*repl.Client, config *CheckerConfig) (Checker, error) {
if len(sourceDBs) == 0 {
return nil, errors.New("at least one source database must be provided")
}
if len(feeds) == 0 {
return nil, errors.New("at least one feed must be provided")
}
Comment thread
morgo marked this conversation as resolved.
Comment on lines +55 to 61
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewChecker only checks that sourceDBs/feeds slices are non-empty. If a caller passes a slice containing nil entries (e.g. []*sql.DB{nil} or []*repl.Client{nil}), the checker will later panic when calling methods on those elements (feed.Flush, dbconn.NewTableLock, etc.). Add validation that each element in sourceDBs and feeds is non-nil and return a clear error.

Copilot uses AI. Check for mistakes.
if chunker == nil {
return nil, errors.New("chunker must be non-nil")
Expand All @@ -65,8 +71,8 @@ func NewChecker(db *sql.DB, chunker table.Chunker, feed *repl.Client, config *Ch
if config.Applier != nil {
return &DistributedChecker{
concurrency: config.Concurrency,
db: db,
feed: feed,
sourceDBs: sourceDBs,
feeds: feeds,
chunker: chunker,
dbConfig: config.DBConfig,
logger: config.Logger,
Expand All @@ -77,8 +83,8 @@ func NewChecker(db *sql.DB, chunker table.Chunker, feed *repl.Client, config *Ch
}
return &SingleChecker{
concurrency: config.Concurrency,
db: db,
feed: feed,
db: sourceDBs[0],
feed: feeds[0],
chunker: chunker,
dbConfig: config.DBConfig,
logger: config.Logger,
Expand Down
Loading
Loading