Checksum N:M support #680
Conversation
Changes to the checksum package to support N:M resharding operations
where data from multiple source databases must be checksummed against
multiple target databases.
Key changes:
- NewChecker signature: first parameter changed from db *sql.DB to
sourceDBs []*sql.DB. Single-source callers pass []*sql.DB{db} (see the call
sketch after this list). The feeds parameter also changed from *repl.Client
to []*repl.Client.
- DistributedChecker.ChecksumChunk: now aggregates BIT_XOR checksums
and sums row counts across ALL sources (not just one), matching the
existing target-side aggregation. This ensures the merged source data
matches the merged target data regardless of how rows are partitioned.
- DistributedChecker.replaceChunk: reads from all source databases when
recopying a mismatched chunk, then delegates to the applier which
handles routing rows to the correct target shard.
- DistributedChecker.initConnPool: creates per-source table locks and
transaction pools using the explicitly provided sourceDBs slice,
rather than inferring sources from chunker tables (which also contain
target tables in the single-source migration case).
- Source pools keyed by *sql.DB pointer (via sourcePool struct slice)
rather than schema name, since multiple sources may share the same
database name in Vitess resharding scenarios.
- TableInfo.DB() accessor added to expose the database connection,
used by the buffered copier and distributed checker to read from
the correct source per chunk.
- New test TestDistributedChecksumNtoM: validates 2-source, 2-target
checksum where sources contain rows 1-4 and 5-8 respectively, and
targets contain even and odd rows respectively.
When tests construct a Migration struct without setting LockWaitTimeout, it defaults to 0s. Previously this would set lock_wait_timeout=0 on the MySQL connection, which MySQL rejects with Error 1231. Now we only override the DBConfig default (30s) when the migration explicitly sets a non-zero value.
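A sketch of the described guard (variable, field, and type names are illustrative, not the actual runner code):

if m.migration.LockWaitTimeout != 0 {
	// Only override the DBConfig default (30s) when the migration sets an
	// explicit non-zero value; lock_wait_timeout=0 is rejected by MySQL with Error 1231.
	dbConfig.LockWaitTimeout = int(m.migration.LockWaitTimeout.Seconds())
}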
Pull request overview
This PR updates the checksum subsystem to support N:M scenarios by allowing the checker to operate over multiple source databases and multiple replication feeds, aggregating per-chunk checksums across sources and targets.
Changes:
- Updated checksum.NewChecker API to accept slices of source DBs and feeds; updated migration/move runners and checksum tests accordingly.
- Extended DistributedChecker to query/aggregate checksums across multiple source DBs, and to recopy chunks by reading rows from all sources.
- Added an N:M distributed checksum test case and introduced a TableInfo.DB() accessor for components that need the originating DB.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pkg/table/tableinfo.go | Adds DB() accessor to expose the TableInfo-associated connection. |
| pkg/move/runner.go | Adapts checksum checker construction to new multi-source/multi-feed API. |
| pkg/migration/runner.go | Avoids invalid lock_wait_timeout=0 and adapts checker construction to new API. |
| pkg/checksum/checksum.go | Changes NewChecker signature to accept multiple source DBs and feeds. |
| pkg/checksum/distributed.go | Implements multi-source aggregation and multi-source recopy logic in distributed checker. |
| pkg/checksum/single_test.go | Updates single-checker tests to use new NewChecker signature and validations. |
| pkg/checksum/distributed_test.go | Updates existing distributed test and adds an N:M distributed checksum test. |
Comments suppressed due to low confidence (1)
pkg/checksum/checksum.go:67
- NewChecker now only validates slice length, but it can still panic if config is nil (config.DBConfig dereference) or if the slice contains nil entries (e.g., feeds[0] == nil / sourceDBs[0] == nil). Consider adding explicit nil checks for config, and for each element in sourceDBs/feeds (and optionally enforce a consistent mapping like len(sourceDBs)==len(feeds) for the distributed case) so this fails with a clear error instead of panicking later.
func NewChecker(sourceDBs []*sql.DB, chunker table.Chunker, feeds []*repl.Client, config *CheckerConfig) (Checker, error) {
	if len(sourceDBs) == 0 {
		return nil, errors.New("at least one source database must be provided")
	}
	if len(feeds) == 0 {
		return nil, errors.New("at least one feed must be provided")
	}
	if chunker == nil {
		return nil, errors.New("chunker must be non-nil")
	}
	if config.DBConfig == nil {
		return nil, errors.New("dbconfig must be non-nil")
	}
- initConnPool: construct per-DB TableInfo objects with the correct SchemaName for each source/target connection, so force-kill's performance_schema queries match the right schema. Added schemaForDB() helper and tableInfosForDB() closure.
- replaceChunk: use table.QuoteColumns() for the column list to handle reserved words like 'values'.
- MultiChunker: key by SchemaName.TableName instead of just TableName, so N:M moves with the same table name from different source schemas don't collide.
Now that QuotedName no longer includes the schema prefix, it is safe to use everywhere SQL references a table. This fixes several places that were manually wrapping TableName in backticks, which would fail for reserved words used as table names. Changed in:
- pkg/checksum/distributed.go: checksum queries, delete, and recopy select
- pkg/applier/sharded.go: upsert statement
- pkg/applier/single_target.go: insert and upsert statements
- pkg/move/runner.go: SHOW CREATE TABLE queries
- pkg/move/cutover.go: RENAME TABLE old name
- pkg/migration/cutover.go: RENAME TABLE old name (both algorithms)
QuotedName included the schema prefix (`schema`.`table`), which breaks when a TableInfo is used with a connection scoped to a different database. Since all connections are already scoped to the correct database via DSN, the schema prefix is unnecessary and harmful for N:M moves. Replace QuotedName with QuotedTableName (`table`) throughout:
- pkg/repl/subscription_map.go, subscription_queue.go
- pkg/table/tableinfo.go, chunker_composite.go, chunker_optimistic.go
- pkg/table/asserty/asserty.go
- pkg/copier/buffered.go, unbuffered.go
- pkg/checksum/single.go, distributed.go
- pkg/migration/cutover.go (oldQuotedName still uses SchemaName for RENAME TABLE, which accepts fully qualified names)
- pkg/move/cutover.go
- pkg/move/runner.go
- All affected test files
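For reference, a hypothetical sketch of the distinction (the real methods live on table.TableInfo; this is not the actual implementation):

// Old QuotedName: "`schema`.`table`" — breaks if the connection's DSN points
// at a different database than SchemaName.
// New QuotedTableName: "`table`" — the connection's DSN selects the schema.
func quotedTableName(tableName string) string {
	// A production version would also escape embedded backticks.
	return "`" + tableName + "`"
}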
# Conflicts:
# pkg/checksum/distributed.go
# pkg/checksum/distributed_test.go
# pkg/migration/runner_resume_test.go
# pkg/migration/runner_test.go
# pkg/table/chunker_optimistic_test.go
# pkg/table/datum_test.go
# pkg/table/tableinfo_test.go
# pkg/testutils/testing.go
Force-pushed from 6a0379b to 8a0b716.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
The multi-chunker map was keyed by positional index, which made checkpoint watermarks fragile — if tables were added or removed between runs, indices would shift and watermarks would map to the wrong chunkers. Instead, key by TableInfo.QualifiedName(), which returns:
- host.schema.table when Host is set (for N:M moves with same-named schemas)
- schema.table otherwise (stable across restarts for single-host operations)
This is both unique (disambiguates the same table name on different servers) and stable (survives table additions/removals between checkpoint and resume). Also sets Host on TableInfo in the move runner so the infrastructure is ready for N:M move operations.
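A hypothetical sketch of the keying behavior described above (a plain function rather than the actual TableInfo.QualifiedName() method; names are assumptions):

func qualifiedName(host, schema, table string) string {
	if host != "" {
		return host + "." + schema + "." + table // disambiguates same-named schemas on different servers
	}
	return schema + "." + table // stable across restarts for single-host operations
}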
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 5 comments.
func NewChecker(sourceDBs []*sql.DB, chunker table.Chunker, feeds []*repl.Client, config *CheckerConfig) (Checker, error) {
	if len(sourceDBs) == 0 {
		return nil, errors.New("at least one source database must be provided")
	}
	if len(feeds) == 0 {
		return nil, errors.New("at least one feed must be provided")
	}
NewChecker only checks that sourceDBs/feeds slices are non-empty. If a caller passes a slice containing nil entries (e.g. []*sql.DB{nil} or []*repl.Client{nil}), the checker will later panic when calling methods on those elements (feed.Flush, dbconn.NewTableLock, etc.). Add validation that each element in sourceDBs and feeds is non-nil and return a clear error.
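One possible shape for the suggested validation, as a sketch only (error wording and placement inside NewChecker are up to the implementation):

for i, db := range sourceDBs {
	if db == nil {
		return nil, fmt.Errorf("sourceDBs[%d] must be non-nil", i)
	}
}
for i, feed := range feeds {
	if feed == nil {
		return nil, fmt.Errorf("feeds[%d] must be non-nil", i)
	}
}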
	for i := range c.sourcePools {
		srcTrx, err := c.sourcePools[i].trxPool.Get()
		if err != nil {
			return fmt.Errorf("failed to get transaction for source %d: %w", i, err)
		}
		defer c.sourcePools[i].trxPool.Put(srcTrx)
ChecksumChunk defers trxPool.Put(srcTrx) inside the per-source loop. This holds one transaction open per source until the entire chunk check finishes (including all target queries), which increases connection/transaction pressure and can prolong metadata locks. Prefer returning the transaction to the pool immediately after the query/Scan completes (avoid defer inside loops).
	for i, targetTrxPool := range c.targetTrxPools {
		targetTrx, err := targetTrxPool.Get()
		if err != nil {
			return fmt.Errorf("failed to get transaction for target %d: %w", i, err)
		}
		defer targetTrxPool.Put(targetTrx)
ChecksumChunk defers targetTrxPool.Put(targetTrx) inside the per-target loop. As with sources, this keeps one transaction checked out per target until the function returns, which can unnecessarily tie up pool connections. Return the trx to the pool immediately after the query/Scan instead of deferring inside the loop.
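A sketch of the suggested pattern for both loops: return each transaction to its pool as soon as its query has been scanned, rather than deferring Put until the whole chunk check returns. The query text and scan variables here (ctx, sourceQuery, crc, count) are illustrative, not the actual checker code:

var sourceChecksum uint64
var sourceRows int64
for i := range c.sourcePools {
	srcTrx, err := c.sourcePools[i].trxPool.Get()
	if err != nil {
		return fmt.Errorf("failed to get transaction for source %d: %w", i, err)
	}
	var crc uint64
	var count int64
	err = srcTrx.QueryRowContext(ctx, sourceQuery).Scan(&crc, &count)
	c.sourcePools[i].trxPool.Put(srcTrx) // returned immediately, not deferred
	if err != nil {
		return fmt.Errorf("failed to checksum source %d: %w", i, err)
	}
	sourceChecksum ^= crc
	sourceRows += count
}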
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Summary
This changes the DistributedChecker (and its supporting interfaces) to support N:M checksums — verifying data consistency when multiple source databases are resharded into multiple target databases.
Related issue: Part of the N:M move operations work. This PR scopes the change mostly to the checksum layer.
This does break the checkpoint format: the multi-chunker now fully qualifies tables by schema.table, or host.schema.table in move contexts. We don't promise checkpoint compatibility between versions, and the default behavior is to resume optimistically (i.e. unless --strict is set, it is not blocking).
Motivation
The existing DistributedChecker assumed a single source database and a single replication feed. For N:M moves (e.g., resharding 2 source databases into 2 target databases), the checksum needs to aggregate across all sources and all targets independently, then compare the aggregates.
Design
Happy path (no mismatches)
The checksum operates at the aggregate level — there is no mapping from individual sources to individual targets. For each chunk range:
- BIT_XOR(CRC32(...)) and COUNT(*) per source
- BIT_XOR is associative and commutative, so XOR-ing per-database checksums produces the same result as checksumming all rows in one table. The count is simply summed. As long as the combined source checksum matches the combined target checksum, no source-to-target mapping is needed.
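As an illustration of why this works, a minimal sketch of the aggregation step (the type and function names are illustrative; the per-database values would come from the per-chunk checksum queries):

type dbChecksum struct {
	crc  uint64
	rows int64
}

func aggregate(results []dbChecksum) (uint64, int64) {
	var crc uint64
	var rows int64
	for _, r := range results {
		crc ^= r.crc   // XOR of per-database BIT_XOR values equals BIT_XOR over all rows
		rows += r.rows // row counts simply add
	}
	return crc, rows
}

// srcCrc, srcRows := aggregate(perSourceResults)
// tgtCrc, tgtRows := aggregate(perTargetResults)
// chunkMatches := srcCrc == tgtCrc && srcRows == tgtRows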
Mismatch path (replaceChunk)
When a chunk fails the checksum, replaceChunk recopies the data:
- DELETE from all targets — iterates applier.GetTargets() and executes DELETE FROM t1 WHERE <chunk range> on each target's DB connection. Each connection is scoped to its target database, so the unqualified table name resolves correctly. This clears the entire chunk range from all shards.
- SELECT from all sources — iterates all source pools, querying each source's DB connection with the same chunk range. Rows from all sources are merged into a single slice. Each source connection is scoped to its database.
- Apply (fan-out via applier) — passes the merged rows to ShardedApplier.Apply(), which extracts the sharding column value from each row, hashes it, routes it to the correct shard, and does INSERT IGNORE on that shard's write connection. The fan-out to the correct target is fully handled by the applier.
Between the DELETE and INSERT IGNORE there is a window where concurrent binlog events could arrive. This is safe because replaceChunk runs during the checksum phase (after copy is complete), a recopyLock serializes recopies, and the checksum retries (up to maxRetries) will catch any remaining drift.
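A condensed sketch of the recopy flow described above (the DB() accessor on targets, the readChunkRows helper, the row type, and the statement variables are assumptions; query construction and error handling are simplified):

// 1. DELETE the chunk range from every target.
for _, target := range c.applier.GetTargets() {
	if _, err := target.DB().ExecContext(ctx, deleteStmt); err != nil {
		return err
	}
}
// 2. SELECT the chunk range from every source, merging rows into one slice.
var merged [][]interface{}
for i := range c.sourcePools {
	rows, err := readChunkRows(ctx, c.sourcePools[i].db, selectStmt) // hypothetical helper
	if err != nil {
		return err
	}
	merged = append(merged, rows...)
}
// 3. Hand the merged rows to the applier, which hashes the sharding column
//    per row and routes it to the correct target shard.
return c.applier.Apply(ctx, merged)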
Locking
During initConnPool, table locks are acquired on all sources and all targets before creating transaction pools with REPEATABLE READ consistent snapshots. The tablesToInList helper uses DATABASE() (not SchemaName) to resolve the schema, so the same TableInfo objects work correctly across different database connections.
Key changes
- NewChecker signature — now accepts []*sql.DB and []*repl.Client (slices) instead of single values. The SingleChecker uses [0]; the DistributedChecker iterates all.
- DistributedChecker struct — db/feed fields replaced with sourceDBs/feeds slices and sourcePools for per-source transaction pools.
- ChecksumChunk — aggregates checksums across all sources and all targets via XOR/sum.
- replaceChunk — DELETE from all targets, SELECT from all sources, fan-out via applier.
- initConnPool — locks tables on all sources and all targets, flushes all feeds, creates per-source and per-target transaction pools.
- MultiChunker — keys internal map by host/schema/table instead of TableName to support multiple sources with the same table name (e.g., t1 on different servers).
- QuotedTableName — both SingleTargetApplier and ShardedApplier now use QuotedTableName instead of manually quoting, for consistency.
Testing
- TestDistributedChecksumNtoM — 2 sources (rows 1-4 and 5-8) resharded by even/odd into 2 targets. Verifies the aggregate checksum passes.
- TestFixCorruptWithApplier — existing test updated for the new NewChecker signature.
- TestDistributedChecksum — existing 1:N test updated for the new signature.