Made InlineVerifier resumable #127
10eddad to f778043
Previously, RunStandaloneCopy was recording improper LastSuccessfulPK position as the BatchWriter is reused. This is fixed. Furthermore, the IterativeVerifier is removed from deltaCopyJoinedTables and copyPrimaryKeyTables. Instead, RunStandaloneCopy now automatically verifies the data via the InlineVerifier.
f778043 to cab1a21
# contain this row.
#
# Fixing this is somewhat non-trivial and likely require a more
# extensive signal emitting system within Ghostferry.
I'm not quite sure how to solve this problem. Having this test is definitely better than not having it, and the race condition that makes the test fail is less likely than a successful run. Thus, I'm keeping this test here and have added it to #107, where we track all the flaky tests.
How exactly are we handling this now?
Handling which part? The current signal emitting? There is some signal emitting from integrationferry, tied to the event listeners on the DataIterator and the BinlogStreamer. There are no hook-in points in the InlineVerifier, the BatchWriter, etc.
Looks correct but some suggestions as usual 😄
Great tests 👍
inline_verifier.go
Outdated
s.currentRowCount += uint64(count)
}
}
}
Rebase error? Can use func (s BinlogVerifySerializedStore) RowCount()
inline_verifier.go
Outdated
}
}

return BinlogVerifySerializedStore(s.store)
Suggestion:
- Change the type of the field s.store to BinlogVerifySerializedStore (feel free to remove the "Serialized" bit from the type name) so that we don't have to be going between named and unnamed type maps during (de)serialization?
- Define a Copy() method on the BinlogVerifySerializedStore, as it's conventional in the standard library
- Once you've done that, simplify this method to:
func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.store.Copy()
}
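A minimal sketch of what the suggested Copy() method could look like. The nested map shape (database, then table, then primary key, then a counter) is an assumption made for illustration and may not match the actual type in the PR; only the names BinlogVerifyStore, BinlogVerifySerializedStore, Serialize and Copy come from the discussion.

```go
package main

import (
	"fmt"
	"sync"
)

// BinlogVerifySerializedStore maps database -> table -> primary key -> counter.
// ASSUMPTION: this shape is chosen for the sketch only.
type BinlogVerifySerializedStore map[string]map[string]map[uint64]int

// Copy returns a deep copy, so edits to the result never reach the original.
func (s BinlogVerifySerializedStore) Copy() BinlogVerifySerializedStore {
	out := make(BinlogVerifySerializedStore, len(s))
	for db, tables := range s {
		out[db] = make(map[string]map[uint64]int, len(tables))
		for table, pks := range tables {
			out[db][table] = make(map[uint64]int, len(pks))
			for pk, count := range pks {
				out[db][table][pk] = count
			}
		}
	}
	return out
}

// BinlogVerifyStore guards the live store with a mutex.
type BinlogVerifyStore struct {
	mutex sync.Mutex
	store BinlogVerifySerializedStore
}

// Serialize takes the lock and hands back an independent snapshot.
func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.store.Copy()
}

func main() {
	s := &BinlogVerifyStore{store: BinlogVerifySerializedStore{
		"gftest": {"test_table_1": {1: 2}},
	}}
	snapshot := s.Serialize()
	snapshot["gftest"]["test_table_1"][1] = 99
	// The live store keeps its own value because the snapshot is a deep copy.
	fmt.Println(s.store["gftest"]["test_table_1"][1])
}
```

Copying every level matters here: a shallow copy would share the inner maps, so a caller holding the snapshot could still race with the verifier after the mutex is released.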

if f.inlineVerifier != nil {
	serializedState.BinlogVerifyStore = f.inlineVerifier.reverifyStore.Serialize()
}
ahhh i don't like this after-the-fact mutation of serializedState. Everything was so nicely contained before.
maybe you can pass it into the Serialize method, similar to how we build constructors?
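One way this suggestion might look, as a sketch only. The StateTracker and SerializableState types below are stripped-down stand-ins, the LastSuccessfulPrimaryKeys field and the extra Serialize parameter are hypothetical, and the real method also takes the table list (f.Tables), which is omitted here for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ASSUMPTION: map shape chosen for the sketch only.
type BinlogVerifySerializedStore map[string]map[string]map[uint64]int

// SerializableState is a simplified stand-in for the dumped state.
type SerializableState struct {
	LastSuccessfulPrimaryKeys map[string]uint64
	BinlogVerifyStore         BinlogVerifySerializedStore `json:",omitempty"`
}

// StateTracker is a simplified stand-in holding only copy progress.
type StateTracker struct {
	lastSuccessfulPKs map[string]uint64
}

// Serialize builds the whole state in one place. The verifier's store is
// passed in (nil when no InlineVerifier is configured) instead of being
// assigned onto the result afterwards.
func (t *StateTracker) Serialize(verifyStore BinlogVerifySerializedStore) *SerializableState {
	pks := make(map[string]uint64, len(t.lastSuccessfulPKs))
	for table, pk := range t.lastSuccessfulPKs {
		pks[table] = pk
	}
	return &SerializableState{
		LastSuccessfulPrimaryKeys: pks,
		BinlogVerifyStore:         verifyStore,
	}
}

func main() {
	tracker := &StateTracker{lastSuccessfulPKs: map[string]uint64{"gftest.test_table_1": 42}}
	store := BinlogVerifySerializedStore{"gftest": {"test_table_1": {1: 1}}}

	stateBytes, err := json.MarshalIndent(tracker.Serialize(store), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(stateBytes))
}
```

The caller in SerializeStateToJSON would then only decide what to pass (the verifier's serialized store or nil), and the returned value is never mutated after construction.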
batches_written = 0
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
  batches_written += 1
  if batches_written >= 2
great 👍 curious how this is guaranteed though
After every row copy, integrationferry sends an HTTP request (AFTER_ROW_COPY) to the Ruby integration test, and the DataIterator is paused until the Ruby code here returns and a 200 is sent back to integrationferry. Since we're only copying a single table, there's no concurrency.
Is this what you mean?
refute_nil dumped_state["BinlogVerifyStore"]["gftest"]["test_table_1"]

# FLAKY: AFTER_BINLOG_APPLY is emitted after the BinlogStreamer
# finishes processing all of the event listeners. The block below
The block below
which block? 🤔
Meant to say the block above, but changed it to be a bit more specific.
chosen_id = result.first["MIN(id)"]

source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 'data2' WHERE id = #{chosen_id}")
target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 'corrupted' WHERE id = #{chosen_id}")
not sure what's the difference between this test and the previous one.. should the target db be updated here as well? the title makes it sound like only the source should be corrupted/modified..
hmmm.. thinking more about it... is it because we want the target to not accept the binlog copier's attempt to SET data = 'data2'? 🤔
perhaps worth a code comment
hmmm.. thinking more about... is it because we want the target to not accept the binlog copier's attempt to SET data = 'data2' 🤔
This is correct.
This test makes sure that a binlog event that occurred while Ghostferry was interrupted is correctly picked up. Now, given that Ghostferry should correctly copy the row over upon resume, we'd have no way to observe that the InlineVerifier has also verified this row. Thus, if we set the data to 'corrupted' on the target here, the action of the InlineVerifier becomes observable.
Comment has been added.
@@ -250,6 +252,17 @@ def start_ghostferry(resuming_state = nil)
  @logger.debug("stdout: #{line}")
elsif reader == stderr
  @stderr << line
  if line.start_with?("{")
    logline = JSON.parse(line)
maybe move this to a separate method? if json(line) log_json(line)
inline_verifier.go
Outdated
s := NewBinlogVerifyStore()

s.store = serialized
for db, _ := range s.store {
Can we use the RowCount() method above for this?
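A rough sketch of how RowCount() could replace the hand-written loop when rebuilding the store from serialized state. The map shape and the constructor name NewBinlogVerifyStoreFromSerialized are assumptions made for this example; summing the per-row counters follows the s.currentRowCount += uint64(count) line quoted earlier in the review.

```go
package main

import "fmt"

// ASSUMPTION: database -> table -> primary key -> counter, for this sketch only.
type BinlogVerifySerializedStore map[string]map[string]map[uint64]int

// RowCount sums the per-row counters across every database and table.
func (s BinlogVerifySerializedStore) RowCount() uint64 {
	var total uint64
	for _, tables := range s {
		for _, pks := range tables {
			for _, count := range pks {
				total += uint64(count)
			}
		}
	}
	return total
}

// BinlogVerifyStore is a simplified stand-in without locking.
type BinlogVerifyStore struct {
	store           BinlogVerifySerializedStore
	currentRowCount uint64
}

// NewBinlogVerifyStoreFromSerialized (hypothetical name) restores a store and
// derives its row count from the serialized data in a single call.
func NewBinlogVerifyStoreFromSerialized(serialized BinlogVerifySerializedStore) *BinlogVerifyStore {
	return &BinlogVerifyStore{
		store:           serialized,
		currentRowCount: serialized.RowCount(),
	}
}

func main() {
	serialized := BinlogVerifySerializedStore{
		"gftest": {"test_table_1": {1: 2, 2: 1}},
	}
	s := NewBinlogVerifyStoreFromSerialized(serialized)
	fmt.Println(s.currentRowCount)
}
```

Keeping the counting logic in one method means the restored count and any count computed from a serialized snapshot cannot drift apart.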
@@ -708,7 +727,12 @@ func (f *Ferry) SerializeStateToJSON() (string, error) {
		return "", err
	}
	serializedState := f.StateTracker.Serialize(f.Tables)
	stateBytes, err := json.MarshalIndent(serializedState, "", " ")

	if f.inlineVerifier != nil {
Can we move this into f.StateTracker.Serialize?
I made some minor changes and added a unit test on |
Thanks for addressing the comments 💯
This PR makes all of Ghostferry interruptible and resumable when running with the InlineVerifier. Most of the code is Ruby integration tests.
Also note: previously, RunStandaloneCopy was recording an improper LastSuccessfulPK position because the BatchWriter from Ferry is reused in it. This is fixed. Furthermore, the IterativeVerifier is removed from deltaCopyJoinedTables and copyPrimaryKeyTables. Instead, RunStandaloneCopy now automatically verifies the data via the InlineVerifier.