Add verification of snappy-compressed data #44
Conversation
Force-pushed c804a31 to b1f092f
Lots of nitpicks but the general approach seems good to me.
One thing I think we should consider is that it's actually pretty rare for these "collisions" (same payload but different compressed data) to happen, but with your changes here, we would decompress every single row of a table like that (while we only really have to decompress the ones for which the fingerprinting approach gives us mismatches). I think defaulting to MD5 and only decompressing on mismatches might be a big performance win compared to the approach in this PR (always decompress).
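The "fingerprint first, decompress only on mismatch" flow suggested above can be sketched roughly as follows. This is an illustration only, not Ghostferry code: `fingerprint`, `decompressAndCompare`, and `rowsEqual` are hypothetical names standing in for the server-side MD5 fingerprint and the decompression path.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
)

// fingerprint stands in for the cheap server-side MD5 over the raw
// (still-compressed) column bytes.
func fingerprint(raw []byte) [16]byte {
	return md5.Sum(raw)
}

// decompressAndCompare stands in for the expensive path: decompress both
// sides and compare payloads. Here it is stubbed with a byte comparison.
func decompressAndCompare(source, target []byte) bool {
	return bytes.Equal(source, target)
}

// rowsEqual only pays the decompression cost when the cheap fingerprints
// disagree, which is the rare "collision" case described above.
func rowsEqual(source, target []byte) bool {
	if fingerprint(source) == fingerprint(target) {
		return true // common case: identical compressed bytes
	}
	return decompressAndCompare(source, target)
}

func main() {
	fmt.Println(rowsEqual([]byte("row-a"), []byte("row-a")))
	fmt.Println(rowsEqual([]byte("row-a"), []byte("row-b")))
}
```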
@pushrax, if you have half an hour, I'd love your eyes on this too so that Forrest can get some more feedback.
compression_verifier.go (outdated)
	CompressionSnappy = "SNAPPY"
)

// UnsupportedCompressionError is used to identify a
identify a ... what?
Fixed!
compression_verifier.go (outdated)
}

// CompressionVerifier provides the same functionality as the iterative verifier but also
// provides support for manually verifying the payload of compressed columns that may
I think calling it "manual verification" was a bad choice of words on our part. I think we should consider a different terminology (or just omitting it and call it "verifying").
Good feedback. Changed to just "verifying" as that makes more sense. 👍
compression_verifier.go (outdated)

// CompressionVerifier provides the same functionality as the iterative verifier but also
// provides support for manually verifying the payload of compressed columns that may
// have different hashes for the same data
Super nitpick: I think this is a tiny bit confusing. The "root problem" is that there are multiple compressed versions of the same data. The fact that there are also (as a result of this) multiple hashes for the same (decompressed) data is a side-effect of that, not the actual problem (and it's confusing since hash functions themselves can't have this property, it's the combination with compression functions that causes this problem).
Nice call out. Updated the comment to be a bit more clear and not confuse the two. What do you think?
compression_verifier.go (outdated)
type CompressionVerifier struct {
	logger *logrus.Entry

	// supportedAlgorithms provide O(1) lookup to check if a configured algorithm is supported
s/provide/provides/ ?
oops 🤕
compression_verifier.go (outdated)
	// supportedAlgorithms provide O(1) lookup to check if a configured algorithm is supported
	supportedAlgorithms map[string]struct{}

	// tableCmpressions is represented as table[column][compression_algorithm]
s/Cmp/Comp/
Nitpick: Took me a minute to understand what you mean by table[column][compression_algorithm]. Is that a common notation? How about something like tableName -> columnName -> compressionAlgorithmName?
Yeah, the name tableColumnCompressions is a bit more clear and helps the reader understand the data structure. Updated 😄
compression_verifier.go (outdated)
}

// Decode will apply the configured decompression algorithm to the configured columns data
func (c *CompressionVerifier) Decode(table, column, algorithm string, compressed []byte) ([]byte, error) {
Same here, for clarity's sake, can we call it Decompress instead of Decode?
Changed, but yeah it was definitely the Snappy library that threw me off and caused me to start naming things using decode instead of just decompress.
compression_verifier.go (outdated)
if algorithm, ok := tableCompression[column.Name]; ok {
	decodedColData, err := c.Decode(table, column.Name, algorithm, rowData[idx].([]byte))
	if err != nil {
		return nil, err
How do you feel about ignoring errors here? Imagine we have corrupted data in a compressed column, i.e. data that can't be decompressed (for example because it was truncated). With your code here, we will never ever be able to copy that data. But (at this layer) we actually don't care about the data being decompressable, we only care about it being equal to the data in the other database. So as long as both the source and the target are the same (even if both are corrupted, as long as they are corrupted in the exact same way), we should not treat that as an error (if you agree, can you add a test for that?). I'd suggest treating decompression errors as "this column wasn't meant to be decompressed anyway", storing the raw compressed data in decodedRowData (same as the else branch below), and then giving it a second chance by comparing using the regular md5 approach.
Not sure if this actually ever happens in practice, but (if I remember correctly) our current Ruby implementation of this algorithm doesn't have that problem since it only decompresses if the fingerprints are not equal already.
Great call out. If there is a decompression error then surely it will cause problems in other places so it should definitely be logged (and possibly make some noise?).
We can continue execution and yes just confirm equality, however, if we're only going down this path when the fingerprints of the SQL don't match (as you point out in your other comment), and now the data cannot be decompressed, then we won't be able to confirm the equality of the data, right? 🤔
@hkdsun, how do you feel about this comment? i.e. ALWAYS decompressing makes it impossible to move data that is corrupted (while that was previously possible).
Yes, that's the current behaviour in the Ruby implementation.
compression_verifier.go (outdated)
case CompressionSnappy:
	return snappy.Decode(decoded, compressed)
default:
	return nil, UnsupportedCompressionError{
Can this ever happen? Don't you check for this in the initializer already?
It could happen if the method is called outside of the workflow of the CompressionVerifier and in an ad hoc sense. Should we not export this method?
compression_verifier.go (outdated)
}

// HashRow will fingerprint the non-primary columns of the row to be verify data equality
s/to be/to/
compression_verifier.go (outdated)

// NewCompressionVerifier first checks the map for supported compression algorithms before
// initializing and returning the initialized instance.
func NewCompressionVerifier(tableCompressions map[string]map[string]string) (*CompressionVerifier, error) {
This method isn't used anywhere (only in the tests). That seems a bit odd. Am I missing something?
It may or may not be used depending on the user's use case. If it were to be used, this function is provided as a convenience and safeguard to ensure the configured compression algorithms are supported.
@fw42 since Forrest and Hormoz get back at the same time, we can probably wait for Hormoz :blobheart: to get back? 👂
I'd like Forrest to have some feedback ready to act on as soon as he gets back. Not super urgent, but if anyone has some time to take a look, I think that would help make sure that Forrest isn't still blocked on this when he returns from vacation.
Force-pushed e5a917a to 2889981
In my opinion, there won't be a huge performance win overall since this configuration is a very special case that I imagine will be used infrequently by the average user. What you suggested, in my opinion, will make the verification logic more complex to follow, and since we can avoid that without a huge performance hit, I prefer the current approach.
Not sure what you mean by "average user", but Shopify's use-case will run into this for every single shop, potentially tens of thousands of times, depending on how many rows the table has.
My justification was that the majority of tables will not go through this codepath, so the overall ferry performance will not be affected by these changes. So even if this is used for every ferry run, for every shop, all the other tables (which are much larger) dwarf the time spent in this codepath.
I didn't go into a lot of detail in my review since Flo has already raised a lot of good points and we discussed a larger refactor in Slack before our vacations based on the following feedback:
Having the data structures (or in general having the logic) flow directly down in a clean line is a principle Justin advocates for and would simplify this PR a bunch. This principle usually makes the solution much easier to follow and modify down the line.
What reminded me of this principle was the cyclic data flow from the user's configuration to CompressionVerifier, to IterativeVerifier, back to the user's config (if v.CompressionVerifier.columnCompression[table] { ... }) and finally back to the CompressionVerifier's GetHashes method.
In pseudo-code this is how it'd go:
- user passes the configuration to the iterative verifier:
type IterativeVerifier struct {
CompressionConfig map[string]TableCompressionConfig
...
}
- iterative verifier uses the config to decide how to get the hashes for a table:
if i.CompressionConfig[table] != nil {
return GetCompressedTableHash(i.db, i.CompressionConfig[table], table)
}
return GetMd5Hashes(table)
- and GetCompressedHash does the right thing, given the table's config:
GetCompressedTableHash(db, compressionConfig, table) {
// core logic that decompresses columns when necessary
}
compression_verifier.go (outdated)
supportedAlgorithms map[string]struct{}

// tableColumnCompressions is represented as table -> column -> compression_algorithm
tableColumnCompressions map[string]map[string]string
I think using a simple type annotation here would greatly improve readability. Something like:
type ColumnCompressionConfig map[string]string // map of columns to decompressors. e.g. { "body": "SNAPPY" }
you'd then have:
tableColumnCompressions map[string]ColumnCompressionConfig
which is a hash of table -> ColumnCompressionConfig, which is readily comprehensible.
Added two types, one being exported (TableColumnCompressionConfig). Do you think this is more clear?
Force-pushed 5be68f0 to 0ee4ec5
Force-pushed f510b79 to 5b96948
This PR now only uses the
Unless I missed it, I think the most important test of this PR is still missing, i.e. a test with two rows that have the same decompressed value but different compressed values. Can you add that test?
Besides my comment about the client-side MD5 being a waste of resources, I'd say this looks pretty good and basically ready to go. I imagine having the client-side MD5 stuff makes it easier to integrate your new code with the existing iterative verifier, so I'm ok with keeping it. It's unnecessary and a bit of a waste of CPU resources but probably negligible? So unless you have an idea how to cleanly remove it, I'd say I'm ok with keeping it.
compression_verifier.go (outdated)
// supportedAlgorithms provide O(1) lookup to check if a configured algorithm is supported
supportedAlgorithms map[string]struct{}

// tableColumnCompressions is represented as table -> column -> compression-type
nitpick this comment is unnecessary since you already explain it above
//
// The GetCompressedHashes method checks if the existing table contains compressed data
// and will apply the decompression algorithm to the applicable columns if necessary.
// After the columns are decompressed, the hashes of the data are used to verify equality
If we have already loaded the data from the database and already decompressed it, then why still do the hashing? The whole point of hashing is to have to load less data into the client (the hashes are computed server-side). I feel like the extra hashing here is wasted effort. Why not just compare the decompressed data (without hashing it first)?
The client side hashing here is to be compatible with the IterativeVerifier (it expects a single byte slice per primary key to be returned by this method)
compression_verifier.go (outdated)
decompressedRowData := make(map[uint64][]byte)
for idx, column := range columns {
	if column.Name == pkColumn {
		continue
Is this because the primary key can never be a compressed column? (nitpick, but is there a check for that anywhere? should we add one to verifyConfiguredCompression maybe?)
Mistake on my understanding of how the IterativeVerifier was generating the row_fingerprint (also see comment below: #44 (comment)). Will update 👍
compression_verifier.go (outdated)
// Check if column is configured as compressed and decompress if necessary
if algorithm, ok := tableCompression[column.Name]; ok {
	decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx].([]byte))
	if err != nil {
Does this return here mean that we fail hard if data can't be decompressed, even if the fingerprints already match? Or do you catch that case elsewhere so it should never get to this point here?
If the fingerprints already match, we never get here due to the changes in iterative_verifier.go.
}

// Hash the data of the row to be added to the result set
decompressedRowHash, err := c.HashRow(decompressedRowData)
As mentioned above, I think this is unnecessary and we should remove the HashRow method and instead just compare the unhashed data. If we already have the data in the client, there's no point in hashing it client-side (that was a server-side optimization).
🤔 Would it really be faster? It takes the m * n iterations down to just m (one value to compare versus a value for each column), plus the cost to calculate the hash. Unless you're referring to just comparing the compressed column and allowing the IterativeVerifier to hash/fingerprint the other columns, which adds some complexity. What do you think?
EDIT: Also, as @hkdsun pointed out above, this is what the IterativeVerifier expects. We could change it, but at the cost of complexity.
I'm fine with this 👍
wfm
compression_verifier.go (outdated)
}

quotedCol := normalizeAndQuoteColumn(column)
columnStrs[idx] = fmt.Sprintf("COALESCE(%s, 'NULL')", quotedCol)
I'm not sure I understand why this line is here. Can you explain? This looks like an artefact from the server-side MD5 fingerprinting algorithm (which we aren't using here, are we?).
This line just writes NULL instead of having an empty value for the column. Turns out we can remove it and just use NULLs. Why is this needed for the IterativeVerifier?
Basically because you can't MD5(NULL) in MySQL, if I remember correctly.
Hmm that doesn't appear to be the case.
mysql> select md5(null)\G
*************************** 1. row ***************************
md5(null): NULL
1 row in set (0.00 sec)
mysql>
but this does throw an error:
mysql> select md5()\G
ERROR 1582 (42000): Incorrect parameter count in the call to native function 'md5'
Maybe that's what is happening.
Hm, then it was probably the CONCAT that needed this.
compression_verifier.go (outdated)
	continue
}

quotedCol := normalizeAndQuoteColumn(column)
Hm, where is this defined? I only see this method in iterative_verifier.go. Why is it in scope here??
Looks like we can remove it :)
config.go (outdated)
// the target database. See DatabaseRewrite.
//
// Optional: defaults to empty map/no rewrites
TableRewrites map[string]string

// Map of the table and column identifying the compression type
// (if any) of the column. This is used during verification  to ensure
extra space here before "to"
👀
iterative_verifier.go (outdated)
@@ -573,7 +574,27 @@ func (v *IterativeVerifier) compareFingerprints(pks []uint64, table *schema.Tabl
	return nil, targetErr
}

-	return compareHashes(sourceHashes, targetHashes), nil
+	mismatches := compareHashes(sourceHashes, targetHashes)
+	if len(mismatches) > 0 && v.CompressionVerifier != nil {
I feel like this block of code here could be its own function to make things more readable.
mismatches := compareHashes(sourceHashes, targetHashes)
if mismatchInCompressedColumns(mismatches) {
return compareCompressedColumns(...)
} else {
return mismatches, nil
}
wdyt?
That does look simpler; however, we would need to introduce some compression-specific logic to the IterativeVerifier to prevent duplication of code, or flowing back and forth between the CompressionVerifier and IterativeVerifier. That would clean up this block of execution inside the conditional, but would blur the line of responsibility between these two components. We could do something like:
if v.compressionMismatch(mismatches) {
sourceHashes, targetHashes, err = GetCompressedHashes(...)
if err != nil {
return nil, err
}
return compareHashes(sourceHashes, targetHashes), nil
}
func (v *IterativeVerifier) compressionMismatch(mismatches) {
if len(mismatches) > 0 && v.CompressionVerifier != nil {
return true
}
return false
}
and rename the current GetCompressedHashes to GetCompressedTableHashes and wrap the former to provide a clean interface.
Thoughts?
Great work! This is definitely not an easy task to get introduced to Ghostferry with but kudos to you for jumping right in 🙂
Besides my review comments, I didn't see any integration of the CompressionVerifier with the sharding package or the IterativeVerifier. Was that intentional? What's the plan there?
glide.yaml (outdated)
- package: github.com/go-sql-driver/mysql
  version: ^1.3.0
- package: github.com/Shopify/go-dogstatsd
- package: github.com/golang/snappy
This must have been a strange conflict to resolve 😛 We moved away from glide to dep in #45, so we'll have to put this dependency in Gopkg.toml instead.
Oops 🤕 Will add!
config.go (outdated)
// (if any) of the column. This is used during verification to ensure
// the data was successfully copied as it must be manually verified.
//
// Note that the VerifierType must be set to the IterativeVerifier
VerifierType is a copydb concept. I'd just say IterativeVerifier must be used.
Updated 👍
// initializing and returning the initialized instance.
func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error) {
	supportedAlgorithms := make(map[string]struct{})
	supportedAlgorithms[CompressionSnappy] = struct{}{}
Any reason to not go with a global here? I know it can't be a constant but it could be a var:
var supportedAlgorithms = map[string]struct{}{
CompressionSnappy: {},
}
Seems strange to create it for every instance.. and what if somebody doesn't use this constructor?
Do we create multiple IterativeVerifiers or do we reuse a single one? If just a single instance, then this will only be instantiated once.
@fw42 mentioned this same thing, and I considered it, but didn't want to make a package-level global var. We obviously can if we intentionally want to do that, but it's generally frowned upon.
compression_verifier.go (outdated)
type CompressionVerifier struct {
	logger *logrus.Entry

	// supportedAlgorithms provide O(1) lookup to check if a configured algorithm is supported
nit: I think the code speaks for itself here
🔪 removed
	table     string
	column    string
	algorithm string
}
Why do we need a new type of error? We're not handling it any differently in any part of the code anyway.
We aren't handling it now, but once we integrate into copydb or sharding we may. It allows the type of error to be checked against if we do want any logical conditions based on the error. If there will never be a need we should just remove it, but I was thinking we may want it.
Sure, I'm fine with keeping it since it's not a huge amount of complexity.
Though, as a general principle, I like to avoid complexity if it's not being used immediately (and I don't see us handling this error any differently than any other verifier/initialization error when integrating into sharding package) – while keeping the future maintenance/changes in mind of course.
compression_verifier.go (outdated)
supportedAlgorithms map[string]struct{}

// tableColumnCompressions is represented as table -> column -> compression-type
tableColumnCompressions TableColumnCompressionConfig
I don't understand why tableColumnCompressions is a field on CompressionVerifier. It's never accessed from within the methods here. It's in the constructor, but that's just to populate the field, not consume it.
The only other time I see it being accessed is from within the IterativeVerifier. Is that a sign that it should not be here? Perhaps it should just be a config on the IterativeVerifier or Ferry directly?
It's currently part of the CompressionVerifier because it's the configuration for it. Given the CompressionVerifier is also a component of the IterativeVerifier, it's essentially part of the IterativeVerifier already, but contained within the CompressionVerifier, as the CompressionVerifier contains the logic and info for all things compressed. It's also not accessed within the CompressionVerifier, but referenced at runtime and then passed to the GetCompressedHashes method, as that's how I understood your earlier comment about the direct logic flow: #44 (review).
Does that make sense? If you feel we should still move it, let me know so we can discuss, because I want this to make sense 👍
compression_verifier.go (outdated)
for idx, column := range columns {
	if column.Name == pkColumn {
		continue
	}
Why are we excluding the primary key column from hashing/comparison?
I was under the impression that what we're trying to do is SELECT pk_col, * and then massage the result into a map[pk]rowData, where rowData is all the row columns after decompression and must contain the pk column as well.
I mistakenly thought the `IterativeVerifier` wasn't including the primary key in the fingerprint because it is selected in addition to the `row_fingerprint`. Looking at it again, I see that it is selected separately in addition to being included.

I'll make adjustments to include the PK in the compression verifier's row hash 👍
compression_verifier.go
Outdated
}
// Check if column is configured as compressed and decompress if necessary
if algorithm, ok := tableCompression[column.Name]; ok {
	decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx].([]byte))
I think `rowData[idx]` doesn't always map one-to-one to the ordering of the `columns` slice. Do you agree?

Imagine the following example table:

| col_1 | col_2 | pk_col | col_3 |

In this scenario, with the rowSelector that we've defined below, you'd have:

rowData = [pk_col, col_1, col_2, col_3]

whereas

columns = [col_1, col_2, pk_col, col_3]
compression_verifier.go
Outdated
// Check if column is configured as compressed and decompress if necessary
if algorithm, ok := tableCompression[column.Name]; ok {
	decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx].([]byte))
	if err != nil {
If the fingerprints already match, we never get here due to the changes in iterative_verifier.go
compression_verifier.go
Outdated
c.logger.Info("decompressing table data before verification")

// Extract the raw rows using SQL to be decompressed
rows, err := c.getRows(db, schema, table, pkColumn, columns, pks)
I think a lot of confusion would be cleared up if the `getRows` function did the hard work of massaging `SELECT pk_col, *` into a `map[pk]rowData` where `rowData`'s indexes map one-to-one with the `columns` slice.
@fw42 thanks for the comments and review! 😄 I've just pushed up a test that I believe satisfies your concerns. I was working on it last week (as we were discussing getting this fixture data for the tests over Slack) but ran into some encoding issues with the snappy-compressed data that I didn't resolve until earlier this morning. I'm working to address the rest of your questions/comments/concerns.

EDIT: @fw42 the client-side MD5 only happens now on mismatch.

@hkdsun thanks 😄 ! We can certainly integrate this into the sharding package and/or

Thanks so much guys, looking forward to getting this merged in and used once it's all ready to ship 🚢
933f5c8 to a709c8e (Compare)
Almost there! I'll follow up with a review of the test files
Gopkg.toml
Outdated
@@ -33,3 +33,6 @@
[prune]
  go-tests = true
  unused-packages = true

[[constraint]]
nit: can we move these up to where the other `[[constraint]]` blocks are?
// Decompress applicable columns and hash the resulting column values for comparison
resultSet := make(map[uint64][]byte)
for rows.Next() {
	rowData, err := ScanByteRow(rows, len(columns)+1)
Really like this idea 👍

With some effort, we could simplify the rest of the codebase too (definitely not in this PR, but this was something we were discussing back in the day we were writing the uint parsing code and discovered the driver's weird behaviour):

Lines 17 to 20 in b0eb328:

// The mysql driver never actually gives you a uint64 from Scan, instead you
// get an int64 for values that fit in int64 or a byte slice decimal string
// with the uint64 value in it.
func (r RowData) GetUint64(colIdx int) (res uint64, err error) {
Why the `+1`? Is that because `columns` doesn't include the primary key column?
}

// Hash the data of the row to be added to the result set
decompressedRowHash, err := c.HashRow(decompressedRowData)
I'm fine with this 👍
iterative_verifier.go
Outdated
@@ -573,6 +614,29 @@ func (v *IterativeVerifier) compareFingerprints(pks []uint64, table *schema.Tabl
	return nil, targetErr
}

mismatches := compareHashes(sourceHashes, targetHashes)
if len(mismatches) < 0 {
Should be `<=`, right? Otherwise this is basically dead code and we'd be decompressing all compressed tables.
👀 good catch
@@ -329,6 +297,79 @@ func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error) {
	return v.verificationResultAndStatus, v.verificationErr
}

func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error) {
assuming these previously defined methods were only moved around and not modified
Yes, I can undo and just let another PR re-order them.
It's fine, just double checking
compression_verifier.go
Outdated
}

hash.Write(rowFingerprint)
return []byte(hex.EncodeToString(hash.Sum(nil))), nil
Note to other reviewers: this `Sum()` method is not the same as `md5.Sum()`. We're actually working with a `hash.Hash` implementation. See the example: https://golang.org/pkg/crypto/md5/#example_New
test/iterative_verifier_test.go
Outdated
}

func (t *IterativeVerifierTestSuite) TestVerifyCompressedMismatchOncePass() {
	t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData3, t.Ferry.SourceDB)
Let's assert our test's assumption right before this with a helpful comment (after all it's the whole point of the PR):
// Two fixtures that have different compressed values but have equal decompressed values
t.Require().NotEqual(testhelpers.TestCompressedData3, testhelpers.TestCompressedData4)
t.Require().Nil(err)
t.Require().False(result.DataCorrect)
t.Require().Equal(fmt.Sprintf("verification failed on table: %s.%s for pks: %s", "gftest", testhelpers.TestCompressedTable1Name, "42"), result.Message)
}
Can we possibly have a symmetrical test for the Data3 and Data4 case?
Yep! Will check for positive case here after a compressed mismatch
bab3ac2 to 6de8e71 (Compare)
	return nil, err
}

pk, err := strconv.ParseUint(string(rowData[0]), 10, 64)
Why do we have to parse the `pk` out of `rowData`? Don't we already have all the pks in `pks`?
for idx, column := range columns {
	if algorithm, ok := tableCompression[column.Name]; ok {
		// rowData contains the result of "SELECT pkColumn, * FROM ...", so idx+1 to get each column
		decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx+1])
Same question here: why the `+1`? Is that because `rowData` contains the primary key column but `columns` doesn't?
nvm I just saw the comment 🤦♂️
// to create a fingerprint. decompressedRowData contains a map of all
// the non-compressed columns and associated decompressed values by the
// index of the column
decompressedRowData := [][]byte{}
Seems like you already know exactly how long this array will be. Isn't it more idiomatic to `make` it here rather than to `append` below? Seems more memory-efficient (fewer allocations). But probably no big deal, and I honestly don't know which one is preferable. Just curious.
We don't actually know the number of rows until iterating through all of them. That would be more efficient though if we did.
compression_verifier.go
Outdated
	resultSet[pk] = decompressedRowHash
}

metrics.Gauge("compression_verifier_decompress_rows", float64(len(resultSet)), []MetricTag{}, 1.0)
Maybe add a tag for the table name?
compression_verifier.go
Outdated
	rowFingerprint = append(rowFingerprint, colData...)
}

hash.Write(rowFingerprint)
`hash.Write` can return an error. We probably want to check that here?
// 1. Snappy (https://google.github.io/snappy/) as "SNAPPY"
//
// Optional: defaults to empty map/no compression
TableColumnCompression map[string]map[string]string
Is this actually used anywhere? I don't see it. Or is that a follow-up PR?
yep @hkdsun will be following up with a PR to integrate into sharding and use this
iterative_verifier.go
Outdated
@@ -573,6 +614,29 @@ func (v *IterativeVerifier) compareFingerprints(pks []uint64, table *schema.Tabl
	return nil, targetErr
}

mismatches := compareHashes(sourceHashes, targetHashes)
if len(mismatches) <= 0 {
Just curious, can this actually ever be negative? Why not `== 0`?
iterative_verifier.go
Outdated
return mismatches, nil
}

if v.CompressionVerifier != nil && v.CompressionVerifier.IsCompressedTable(table.Name) {
Nitpick: why not `if len(mismatches) > 0 && ...` here? Then you can get rid of line 618.
this.di.Initialize()
this.di.AddBatchListener(func(ev *ghostferry.RowBatch) error {
	this.receivedRows = append(this.receivedRows, ev.Values()...)
	this.receivedRows[ev.TableSchema().Name] = append(this.receivedRows[ev.TableSchema().Name], ev.Values()...)
	return nil
})
}

func (this *DataIteratorTestSuite) TestNoEventsForEmptyTable() {
	_, err := this.Ferry.SourceDB.Query(fmt.Sprintf("DELETE FROM `%s`.`%s`", testhelpers.TestSchemaName, testhelpers.TestTable1Name))
This line needs its own `this.Require().Nil(err)`, otherwise it's useless.
test/iterative_verifier_test.go
Outdated
t.Require().Equal("", result.Message)
}

func (t *IterativeVerifierTestSuite) TestVerifyCompressedMismatchOncePass() {
Nitpick, but `Mismatch` seems a bit ambiguous. How about something like `SameDecompressedDataButDifferentHash` to be more explicit?
@@ -222,6 +297,25 @@ func (t *IterativeVerifierTestSuite) InsertRowInDb(id int, data string, db *sql.
	t.Require().Nil(err)
}

func (t *IterativeVerifierTestSuite) InsertCompressedRowInDb(id int, data string, db *sql.DB) {
	t.SetColumnType(testhelpers.TestSchemaName, testhelpers.TestCompressedTable1Name, testhelpers.TestCompressedColumn1Name, "MEDIUMBLOB", db)
This seems weird. Why do we change the schema during the test (rather than to just always have it be a MEDIUMBLOB and set it during test setup or whenever we create the db)?
We change to `MEDIUMBLOB` because of the snappy-compressed data. `TEXT` expects utf8 data and throws an error when we try to insert the compressed data. I didn't want to change any tests other than the ones added in this PR. If we know later that it won't negatively impact the other tests, then I'm sure we can change it and just use `MEDIUMBLOB` everywhere.
testhelpers/unit_test_suite.go
Outdated
}

decompressed := [][]byte{}
for _, path := range filePaths {
Nitpick, but this whole loop thing seems a bit unnecessary to me. I'd create some kind of `LoadFixtureFromFile` helper function and then just do:
func init() {
TestCompressedData3 = LoadFixtureFromFile("urls1.snappy")
TestCompressedData4 = LoadFixtureFromFile("urls2.snappy")
}
Will update. I like this more 👍
Few small comments, but overall this looks good to me.
👍 🚢 🇮🇹
💯
6de8e71 to e5316c4 (Compare)
e5316c4 to 97313e4 (Compare)
This is an initial commit adding verification of snappy-compressed data. This approach adds a `CompressionVerifier` that will allow `Snappy` and other compression algorithms to be added to decompress data to be fingerprinted and verified. A component is added to the configuration asking the user to identify the compression algorithms used, if any, in the corresponding tables and columns.

The process to decompress the data is done during the call to the `IterativeVerifier`'s `GetHashes` method here. Is there a better approach?

Questions
Q: Why do we need this?

A: Certain compression algorithms may create different hashes of data depending on the version, hardware, or simply because they are not deterministic (Snappy being one of them). Because of this, we cannot blindly rely on the `md5` hash of the compressed data in the tables to verify equality. We must first decompress the data, and then we can fingerprint for equality.

Q: Does the `CompressionVerifier` warrant its own test suite? Or should the functionality be included in the existing `IterativeVerifier`'s test suite?

A: Nope. Not right now.
To be completed
@Shopify/pods