Merge pull request #57 from Shopify/staging-verif-shardin
Integration of CompressionVerifier into sharding package
hkdsun committed Aug 10, 2018
2 parents d87db6b + 6f51526 commit 71ca7bf
Showing 3 changed files with 43 additions and 5 deletions.
2 changes: 1 addition & 1 deletion compression_verifier.go
@@ -177,7 +177,7 @@ func (c *CompressionVerifier) IsCompressedTable(table string) bool {
 func (c *CompressionVerifier) verifyConfiguredCompression(tableColumnCompressions TableColumnCompressionConfig) error {
 	for table, columns := range tableColumnCompressions {
 		for column, algorithm := range columns {
-			if _, ok := c.supportedAlgorithms[algorithm]; !ok {
+			if _, ok := c.supportedAlgorithms[strings.ToUpper(algorithm)]; !ok {
 				return &UnsupportedCompressionError{
 					table:  table,
 					column: column,
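The change above makes the algorithm lookup case-insensitive by upper-casing the configured value before checking the supported set. A minimal, self-contained sketch of that pattern follows; the map contents and element type of supportedAlgorithms are not shown in this diff, so the example below is illustrative only.

package main

import (
	"fmt"
	"strings"
)

// Sketch of the case-insensitive lookup introduced above: the supported set is
// keyed by upper-case algorithm names, so a configured value like "snappy" is
// upper-cased before the membership check.
func main() {
	supportedAlgorithms := map[string]struct{}{"SNAPPY": {}}

	for _, algorithm := range []string{"SNAPPY", "snappy", "zstd"} {
		_, ok := supportedAlgorithms[strings.ToUpper(algorithm)]
		fmt.Printf("%s supported: %v\n", algorithm, ok)
	}
}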
2 changes: 1 addition & 1 deletion config.go
@@ -171,7 +171,7 @@ type Config struct {
 	// 1. Snappy (https://google.github.io/snappy/) as "SNAPPY"
 	//
 	// Optional: defaults to empty map/no compression
-	TableColumnCompression map[string]map[string]string
+	TableColumnCompression TableColumnCompressionConfig
 
 	// The maximum number of retries for writes if the writes failed on
 	// the target database.
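For illustration, a hedged sketch of how the retyped field might be populated. The table key "posts" and column "body" are hypothetical, whether table keys must be schema-qualified is not shown in this diff, and the import path is assumed.

package main

import (
	"fmt"

	"github.com/Shopify/ghostferry"
)

func main() {
	// Hypothetical table ("posts") and column ("body") names; "SNAPPY" is the
	// algorithm name given in the config.go comment above.
	config := ghostferry.Config{
		TableColumnCompression: ghostferry.TableColumnCompressionConfig{
			"posts": {
				"body": "SNAPPY",
			},
		},
	}

	fmt.Println(config.TableColumnCompression)
}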
44 changes: 41 additions & 3 deletions sharding/sharding.go
@@ -109,6 +109,15 @@ func (r *ShardingFerry) newIterativeVerifier() (*ghostferry.IterativeVerifier, e
 		}
 	}
 
+	var compressionVerifier *ghostferry.CompressionVerifier
+	if r.config.TableColumnCompression != nil {
+		var err error
+		compressionVerifier, err = ghostferry.NewCompressionVerifier(r.config.TableColumnCompression)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	return &ghostferry.IterativeVerifier{
 		CursorConfig: &ghostferry.CursorConfig{
 			DB: r.Ferry.SourceDB,
@@ -117,7 +126,8 @@ func (r *ShardingFerry) newIterativeVerifier() (*ghostferry.IterativeVerifier, e
 			BuildSelect: r.config.CopyFilter.BuildSelect,
 		},
 
-		BinlogStreamer: r.Ferry.BinlogStreamer,
+		BinlogStreamer:      r.Ferry.BinlogStreamer,
+		CompressionVerifier: compressionVerifier,
 
 		TableSchemaCache: r.Ferry.Tables,
 		Tables: r.Ferry.Tables.AsSlice(),
@@ -189,12 +199,15 @@ func (r *ShardingFerry) Run() {
 	r.Ferry.FlushBinlogAndStopStreaming()
 	copyWG.Wait()
 
+	// Joined tables cannot be easily monitored for changes in the binlog stream
+	// so we copy and verify them for a second time during the cutover phase to
+	// pick up any new changes since DataIterator copied the tables
 	metrics.Measure("deltaCopyJoinedTables", nil, 1.0, func() {
 		err = r.deltaCopyJoinedTables()
 	})
 	if err != nil {
 		r.logger.WithField("error", err).Errorf("failed to delta-copy joined tables after locking")
-		r.Ferry.ErrorHandler.Fatal("sharding", err)
+		r.Ferry.ErrorHandler.Fatal("sharding.delta_copy", err)
 	}
 
 	var verificationResult ghostferry.VerificationResult
@@ -240,7 +253,32 @@ func (r *ShardingFerry) deltaCopyJoinedTables() error {
 		}
 	}
 
-	return r.Ferry.RunStandaloneDataCopy(tables)
+	err := r.Ferry.RunStandaloneDataCopy(tables)
+	if err != nil {
+		return err
+	}
+
+	verifier, err := r.newIterativeVerifier()
+	if err != nil {
+		return err
+	}
+
+	verifier.Tables = tables
+
+	err = verifier.Initialize()
+	if err != nil {
+		return err
+	}
+
+	verificationResult, err := verifier.VerifyOnce()
+	if err != nil {
+		return err
+	}
+	if !verificationResult.DataCorrect {
+		return fmt.Errorf("joined tables verifier detected data discrepancy: %s", verificationResult.Message)
+	}
+
+	return nil
 }
 
 func (r *ShardingFerry) copyPrimaryKeyTables() error {
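The new verification step turns an incorrect result into an error via the DataCorrect and Message fields read above. A standalone sketch of that pattern, using a local stand-in struct rather than ghostferry.VerificationResult (which may carry more fields):

package main

import "fmt"

// Local stand-in mirroring the two fields the hunk above reads from
// ghostferry.VerificationResult.
type verificationResult struct {
	DataCorrect bool
	Message     string
}

// checkVerification returns an error only when the verified data is incorrect,
// matching the pattern added in deltaCopyJoinedTables above.
func checkVerification(res verificationResult) error {
	if !res.DataCorrect {
		return fmt.Errorf("joined tables verifier detected data discrepancy: %s", res.Message)
	}
	return nil
}

func main() {
	res := verificationResult{DataCorrect: false, Message: "hypothetical mismatch on a joined table"}
	if err := checkVerification(res); err != nil {
		fmt.Println(err)
	}
}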
