
Commit

Merge pull request #44 from fjordan/compression-verifier
Add verification of snappy-compressed data
fjordan committed Aug 9, 2018
2 parents 45323cc + 97313e4 commit d87db6b
Showing 26 changed files with 2,987 additions and 108 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Diff not rendered (generated file).

4 changes: 4 additions & 0 deletions Gopkg.toml
@@ -30,6 +30,10 @@
name = "github.com/stretchr/testify"
version = "1.1.4"

[[constraint]]
name = "github.com/golang/snappy"
revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a"

[prune]
go-tests = true
unused-packages = true
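
The new Gopkg.toml constraint pins github.com/golang/snappy at a specific revision. For context, a minimal round-trip sketch (not part of this commit) of the snappy API the verifier relies on; Encode and Decode allocate their own output when passed a nil destination slice:

package main

import (
	"bytes"
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	original := []byte("compressed columns must be decoded before fingerprinting")

	// Compress and then decompress; nil destination slices let snappy allocate.
	compressed := snappy.Encode(nil, original)
	decompressed, err := snappy.Decode(nil, compressed)
	if err != nil {
		panic(err)
	}

	fmt.Println(bytes.Equal(original, decompressed)) // true
}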
248 changes: 248 additions & 0 deletions compression_verifier.go
@@ -0,0 +1,248 @@
package ghostferry

import (
"crypto/md5"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"

sq "github.com/Masterminds/squirrel"
"github.com/golang/snappy"
"github.com/siddontang/go-mysql/schema"
"github.com/sirupsen/logrus"
)

const (
// CompressionSnappy is used to identify Snappy (https://google.github.io/snappy/) compressed column data
CompressionSnappy = "SNAPPY"
)

type (
columnCompressionConfig map[string]string

// TableColumnCompressionConfig represents compression configuration for a
// column in a table as table -> column -> compression-type
// ex: books -> contents -> snappy
TableColumnCompressionConfig map[string]columnCompressionConfig
)

// UnsupportedCompressionError is used to identify errors resulting
// from attempting to decompress unsupported algorithms
type UnsupportedCompressionError struct {
table string
column string
algorithm string
}

func (e UnsupportedCompressionError) Error() string {
return "Compression algorithm: " + e.algorithm +
" not supported on table: " + e.table +
" for column: " + e.column
}

// CompressionVerifier provides support for verifying the payload of compressed columns that
// may have different hashes for the same data by first decompressing the compressed
// data before fingerprinting
type CompressionVerifier struct {
logger *logrus.Entry

supportedAlgorithms map[string]struct{}
tableColumnCompressions TableColumnCompressionConfig
}

// GetCompressedHashes fetches the requested rows and returns a fingerprint for each,
// so the source data can be compared with the target data to ensure the integrity of
// the data being copied.
//
// The GetCompressedHashes method checks if the existing table contains compressed data
// and will apply the decompression algorithm to the applicable columns if necessary.
// After the columns are decompressed, the hashes of the data are used to verify equality
func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error) {
c.logger.WithFields(logrus.Fields{
"tag": "compression_verifier",
"table": table,
}).Info("decompressing table data before verification")

tableCompression := c.tableColumnCompressions[table]

// Extract the raw rows using SQL to be decompressed
rows, err := getRows(db, schema, table, pkColumn, columns, pks)
if err != nil {
return nil, err
}
defer rows.Close()

// Decompress applicable columns and hash the resulting column values for comparison
resultSet := make(map[uint64][]byte)
for rows.Next() {
rowData, err := ScanByteRow(rows, len(columns)+1)
if err != nil {
return nil, err
}

pk, err := strconv.ParseUint(string(rowData[0]), 10, 64)
if err != nil {
return nil, err
}

// Decompress the applicable columns and then hash them together
// to create a fingerprint. decompressedRowData collects every
// column's value in column order, with the configured compressed
// columns decompressed first
decompressedRowData := [][]byte{}
for idx, column := range columns {
if algorithm, ok := tableCompression[column.Name]; ok {
// rowData contains the result of "SELECT pkColumn, * FROM ...", so idx+1 to get each column
decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx+1])
if err != nil {
return nil, err
}
decompressedRowData = append(decompressedRowData, decompressedColData)
} else {
decompressedRowData = append(decompressedRowData, rowData[idx+1])
}
}

// Hash the data of the row to be added to the result set
decompressedRowHash, err := c.HashRow(decompressedRowData)
if err != nil {
return nil, err
}

resultSet[pk] = decompressedRowHash
}

metrics.Gauge(
"compression_verifier_decompress_rows",
float64(len(resultSet)),
[]MetricTag{{"table", table}},
1.0,
)

logrus.WithFields(logrus.Fields{
"tag": "compression_verifier",
"rows": len(resultSet),
"table": table,
}).Debug("decompressed rows will be compared")

return resultSet, nil
}

// Decompress will apply the configured decompression algorithm to the configured column's data
func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error) {
var decompressed []byte
switch strings.ToUpper(algorithm) {
case CompressionSnappy:
return snappy.Decode(decompressed, compressed)
default:
return nil, UnsupportedCompressionError{
table: table,
column: column,
algorithm: algorithm,
}
}

}

// HashRow will fingerprint the non-primary columns of the row to verify data equality
func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error) {
if len(decompressedRowData) == 0 {
return nil, errors.New("Row data to fingerprint must not be empty")
}

hash := md5.New()
var rowFingerprint []byte
for _, colData := range decompressedRowData {
rowFingerprint = append(rowFingerprint, colData...)
}

_, err := hash.Write(rowFingerprint)
if err != nil {
return nil, err
}

return []byte(hex.EncodeToString(hash.Sum(nil))), nil
}

// IsCompressedTable will identify whether or not a table is compressed
func (c *CompressionVerifier) IsCompressedTable(table string) bool {
if _, ok := c.tableColumnCompressions[table]; ok {
return true
}
return false
}

func (c *CompressionVerifier) verifyConfiguredCompression(tableColumnCompressions TableColumnCompressionConfig) error {
for table, columns := range tableColumnCompressions {
for column, algorithm := range columns {
if _, ok := c.supportedAlgorithms[algorithm]; !ok {
return &UnsupportedCompressionError{
table: table,
column: column,
algorithm: algorithm,
}
}
}
}

return nil
}

// NewCompressionVerifier verifies that every configured compression algorithm is
// supported before returning the initialized instance.
func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error) {
supportedAlgorithms := make(map[string]struct{})
supportedAlgorithms[CompressionSnappy] = struct{}{}

compressionVerifier := &CompressionVerifier{
logger: logrus.WithField("tag", "compression_verifier"),
supportedAlgorithms: supportedAlgorithms,
tableColumnCompressions: tableColumnCompressions,
}

if err := compressionVerifier.verifyConfiguredCompression(tableColumnCompressions); err != nil {
return nil, err
}

return compressionVerifier, nil
}

func getRows(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (*sql.Rows, error) {
quotedPK := quoteField(pkColumn)
sql, args, err := rowSelector(columns, pkColumn).
From(QuotedTableNameFromString(schema, table)).
Where(sq.Eq{quotedPK: pks}).
OrderBy(quotedPK).
ToSql()

if err != nil {
return nil, err
}

// This query must be a prepared query. If it is not, querying will use
// MySQL's plain text interface, which will scan all values into []uint8
// if we give it []interface{}.
stmt, err := db.Prepare(sql)
if err != nil {
return nil, err
}

defer stmt.Close()
rows, err := stmt.Query(args...)
if err != nil {
return nil, err
}

return rows, nil
}

func rowSelector(columns []schema.TableColumn, pkColumn string) sq.SelectBuilder {
columnStrs := make([]string, len(columns))
for idx, column := range columns {
columnStrs[idx] = column.Name
}

return sq.Select(fmt.Sprintf("%s, %s", quoteField(pkColumn), strings.Join(columnStrs, ",")))
}
16 changes: 15 additions & 1 deletion config.go
@@ -153,12 +153,26 @@ type Config struct {
// Optional: defaults to empty map/no rewrites
DatabaseRewrites map[string]string

- // Map the table name on the source dataabase to a different name on
+ // Map the table name on the source database to a different name on
// the target database. See DatabaseRewrite.
//
// Optional: defaults to empty map/no rewrites
TableRewrites map[string]string

// Map of table name to column name to the compression type (if any)
// of that column. This is used during verification: compressed columns
// can produce different bytes for identical data, so they must be
// decompressed before their contents are compared.
//
// Note that the IterativeVerifier must be used; a CompressionVerifier
// will be instantiated for this configuration to handle decompression
// before verification.
//
// Currently supported compression algorithms are:
// 1. Snappy (https://google.github.io/snappy/) as "SNAPPY"
//
// Optional: defaults to empty map/no compression
TableColumnCompression map[string]map[string]string

// The maximum number of retries for writes if the writes failed on
// the target database.
//
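
A hypothetical example (not from this commit) of populating the new field in Go; "books" and "contents" mirror the example in compression_verifier.go, and "SNAPPY" is currently the only supported value:

var config = Config{
	// Other required fields (source/target settings, etc.) omitted for brevity.
	TableColumnCompression: map[string]map[string]string{
		"books": {"contents": "SNAPPY"},
	},
}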
12 changes: 12 additions & 0 deletions cursor.go
@@ -258,6 +258,18 @@ func ScanGenericRow(rows *sql.Rows, columnCount int) (RowData, error) {
return values, err
}

func ScanByteRow(rows *sql.Rows, columnCount int) ([][]byte, error) {
values := make([][]byte, columnCount)
valuePtrs := make(RowData, columnCount)

for i, _ := range values {
valuePtrs[i] = &values[i]
}

err := rows.Scan(valuePtrs...)
return values, err
}

func DefaultBuildSelect(columns []string, table *schema.Table, lastPk, batchSize uint64) squirrel.SelectBuilder {
quotedPK := quoteField(table.GetPKColumn(0).Name)

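
ScanByteRow scans every selected column into raw []byte, which is what the CompressionVerifier needs before decompressing. A small usage sketch (not part of this commit); the query and table are hypothetical, and it assumes database/sql is in scope:

func scanRawRows(db *sql.DB) error {
	rows, err := db.Query("SELECT id, contents FROM books WHERE id IN (1, 2, 3)")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		// The column count matches the SELECT list: the primary key plus one data column.
		rowData, err := ScanByteRow(rows, 2)
		if err != nil {
			return err
		}
		pk, contents := rowData[0], rowData[1] // both raw []byte
		_, _ = pk, contents
	}
	return rows.Err()
}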