Interrupt Ghostferry via signals and resume from SerializedState #67
Conversation
No need for the logs on that level.
This allows Ghostferry to be interrupted.
Force-pushed from ccf1caa to 139b7c6
Force-pushed from dc91361 to e0bc75a
ferry.go (Outdated)

    // misleading.
    // TODO: refactor this to make it better, possibly have to refactor
    // newDataIterator.
    dataIterator.StateTracker = NewStateTracker(f.DataIterationConcurrency * 10)
I don't like this line, but to do it properly would involve having to refactor newDataIterator and this method in general. This is a yakshave I don't want to undertake right now.
When we do refactor this, we should make the DataIterator/BinlogStreamer/BinlogWriter/BatchWriter easier to use as they're very difficult to initialize independently of Ferry (which is why we even have newDataIterator...).
EDIT: we might have to perform this refactor when implementing the reconciliation to make the implementation "clean". I'll look into doing this.
EDIT2: I did the refactor in this PR, in a later commit.
@@ -71,6 +71,8 @@ func (t *ShardingUnitTestSuite) SetupTest() {
    t.setupShardingFerry()
    t.dropTestDbs()

    testhelpers.SetupTest()
I added this so we can get some debug logs (by default we only emit INFO lines), which proved to be useful when I was trying to debug this implementation. I could remove it, though.
The library gained the ability to resume the BinlogStreamer and the DataIterator from arbitrary states. We're still missing the reconciliation process to make the interrupt/resume behaviour correct with respect to data safety. We're also missing the ability to interrupt from within the IterativeVerifier. Additionally, this patch exposes the need to refactor how we initialize the DataIterator/IterativeVerifier and the other "public" components that sit below the level of Ferry. It is currently difficult to initialize these structs without Ferry, which causes things like the hacks within RunOnlyDataCopyForTables (previously named RunStandaloneDataCopy). We can get rid of this in a future PR.
Force-pushed from e0bc75a to b2b38cb
Made BinlogStreamer independent of Ferry.Config

Previously, BinlogStreamer was dependent on Ferry.Config. It even went so far as to change Ferry.Config for the automatic MyServerId generation. This will become problematic as we create new BinlogStreamer instances to monitor the target database to detect schema changes. We also exposed methods to create and initialize DataIterator, BinlogStreamer, and all the major components from Ferry, so another object that operates at the same level as the Ferry can be created (for example: the IterativeVerifier, the DeltaCopier, the Reconciler). The StateTracker was also made optional in the previous commit, as this makes things somewhat easier to work with.
Bunch of small nitpicks. I'm not experienced enough in this project to go much deeper in my review, but nothing stands out to me as wrong.
I'm surprised by how few new tests were added.
Also surprised by the lack of changes related to the md5 verification and iterative verification. Do we not serialize and resume that state? Does that mean we will always reverify all the things after a resume?
@@ -35,37 +36,37 @@ type BinlogStreamer struct {
    eventListeners []func([]DMLEvent) error
}

func (s *BinlogStreamer) Initialize() error {
What's the reasoning for removing the Initialize method?
It's a legacy of the original Ghostferry design. This method doesn't do much and just complicates the API. Anything it does can be done directly in the Run method, or within the Ferry.NewBinlogStreamer methods.
config.go (Outdated)

    //
    // Caution: there is no guarantee that setting this variable to true will
    // be safe.
    AllowResumeWithMismatchedGhostferryVersion bool
do we really need this?
cursor.go (Outdated)

        }
    }

      // returns a new Cursor with an embedded copy of itself
    - func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, maxPk uint64) *Cursor {
    + func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, startPk, maxPk uint64) *Cursor {
          return &Cursor{
    cursor := c.NewCursor(table, startPk, maxPk)
    cursor.RowLock = false
    return cursor

seems simpler maybe, and more in line with what you are doing below with the WithoutStateTracker method?
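For context, the suggestion above can be sketched out as a full delegating constructor. The types and fields below are minimal illustrative stand-ins for those in cursor.go, not the real definitions:

```go
package main

import "fmt"

// Minimal stand-ins for the types in cursor.go; not the real definitions.
type Table struct{ Name string }

type Cursor struct {
	Table   *Table
	StartPk uint64
	MaxPk   uint64
	RowLock bool
}

type CursorConfig struct{}

// NewCursor returns a cursor with row locking enabled (the default).
func (c *CursorConfig) NewCursor(table *Table, startPk, maxPk uint64) *Cursor {
	return &Cursor{Table: table, StartPk: startPk, MaxPk: maxPk, RowLock: true}
}

// NewCursorWithoutRowLock delegates to NewCursor and flips one flag, as the
// review comment suggests, instead of duplicating the struct literal.
func (c *CursorConfig) NewCursorWithoutRowLock(table *Table, startPk, maxPk uint64) *Cursor {
	cursor := c.NewCursor(table, startPk, maxPk)
	cursor.RowLock = false
	return cursor
}

func main() {
	c := &CursorConfig{}
	cur := c.NewCursorWithoutRowLock(&Table{Name: "users"}, 1, 1000)
	fmt.Println(cur.RowLock, cur.StartPk, cur.MaxPk) // false 1 1000
}
```

Delegation keeps the two constructors from drifting apart when new fields are added to the struct literal.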
@@ -72,7 +74,6 @@ type Cursor struct {
    }

    func (c *Cursor) Each(f func(*RowBatch) error) error {
        c.lastSuccessfulPrimaryKey = 0
what was this here for to begin with?
It was to ensure that subsequent calls to .Each would start from scratch. It turns out we don't actually use this method in this way; we only ever call it once.
data_iterator.go (Outdated)

    d.logger = logrus.WithField("tag", "data_iterator")
    d.targetPKs = &sync.Map{}

    // If a state tracker is not provided, then the caller doesn't care about
    // tracking state. However, some methods are still useful so we use initialize
s/use// ?
ferry.go (Outdated)

        }
    }

    // Even tho this function is identical to NewBinlogStreamer, it is here
s/tho/though/
    // Suppress siddontang/go-mysql logging as we already log the equivalents.
    // It also by defaults logs to stdout, which is different from Ghostferry
    // logging, which all goes to stderr. stdout in Ghostferry is reserved for
    // dumping states due to an abort.
This seems a bit fragile. What if one of the other dependencies that we use logs to stdout? Or maybe a future one that we want to use does that and doesn't allow the user to disable it? What do we do then?
My current thinking on this is that stdout is the most flexible method to dump the state with the least amount of code within Ghostferry itself.
If in the future we encounter issues with stdout, we can definitely make Ghostferry write to a file/pipe/whatever. We should deal with it when it comes up, especially since we don't quite have a vision on how to consume this state dump just yet.
I'm fine with this for now, but this is definitely not going to age well. This should be communicated over an HTTP callback in case of integrated usage (like sharding / shop mover) and a local file/db in case of standalone usage (like copydb).
    WriteRetries: f.Config.DBWriteRetries,

    if f.StateToResumeFrom == nil {
        f.StateTracker = NewStateTracker(f.DataIterationConcurrency * 10)
What's this magical number 10 here?
There's a ring inside the StateTracker that tracks the speed at which we are copying data. The argument here specifies the ring's size. It should be a multiple of the number of goroutines that could be writing to the ring. 10 is just chosen as a conservative estimate.
    _, found := s.completedTables[table]
    if found {
        return math.MaxUint64
Hmm this feels a bit like a hack to me. Is there no cleaner way to do this?
I could return an error here, but that feels somewhat untruthful. Alternatively, I could also return the last known successful primary key. That is somewhat misleading, as the last known successful PK might be out of date if the table has grown since then. If a table is marked as completed by Ghostferry, it should not be processed again, as it is assumed that any growth is copied over by the BinlogStreamer. I thus decided to return MaxUint64 here.
It's not that big of a deal, as there is currently only one consumer of this method, in the DataIterator. The DataIterator already checks if a table is completed, so this condition should never be hit in real code at the moment.
You could also return a boolean indicating completion of a table. WDYT?
I could, as it doesn't matter that much: it's only useful for some defensive programming. This return should never be hit when used correctly.
        f.logger.Debug("shutting down signal monitoring goroutine")
        return
    case s := <-c:
        f.ErrorHandler.Fatal("user", fmt.Errorf("signal received: %v", s.String()))
Hm where is this being handled? Where's the actual code that does the dumping to stdout?
The ErrorHandler has always dumped the state inside the Fatal method. This code change just dumps it when a signal is received.
Force-pushed from de7a3d3 to 4398c6f
If you want to resume with a mismatched version, just manually change the JSON.
    Host:     s.DBConfig.Host,
    Port:     s.DBConfig.Port,
    User:     s.DBConfig.User,
    Password: s.DBConfig.Pass,
👍
        return err
    }

    s.lastStreamedBinlogPosition = startFromBinlogPosition
Feels like this should only be set after the binlogSyncer has actually streamed the position, i.e. initialize the syncer and let the Run method set this after it has streamed the starting position.
Having it set there would be kind of redundant, as this is an internal tracking variable for the main BinlogStreamer loop. Basically, it would have to look like:

    ConnectBinlogStreamerToMySqlFrom():
        s.startFromBinlogPosition = startFromBinlogPosition

    Run():
        s.lastStreamedBinlogPosition = s.startFromBinlogPosition
        for ... s.lastStreamedBinlogPosition.Compare(s.targetBinlogPosition) < 0:
            ...

Without setting lastStreamedBinlogPosition before we start that loop, the conditional would be more complex.
Note that this variable has nothing to do with the StateTracker binlog position, which is set in BinlogWriter and once in Ferry during initial connection.
@@ -255,6 +265,10 @@ func (c *Config) ValidateConfig() error {
        return fmt.Errorf("Table filter function must be provided")
    }

    if c.StateToResumeFrom != nil && c.StateToResumeFrom.GhostferryVersion != VersionString {
        return fmt.Errorf("StateToResumeFrom version mismatch: resume = %s, current = %s", c.StateToResumeFrom.GhostferryVersion, VersionString)
    }
Is this just being defensive, or is there a legitimate reason?
Defensive for now as there's no guarantee that we won't change things. If we want to force resume, we can just set the version passed to Ghostferry to the same one as the Ghostferry instance that's launched.
@@ -76,7 +74,15 @@ func (d *DataIterator) Run() {
        return
    }

    - cursor := d.CursorConfig.NewCursor(table, targetPKInterface.(uint64))
    + startPk := d.StateTracker.LastSuccessfulPK(table.String())
Does table.String() include the schema? i.e. is it db.table or just table? The former is more correct.
It's db.table. There are some issues if either your schema name or your table name includes a backtick. However, this problem is somewhat widespread.
ferry.go (Outdated)

    }

    // Even though this function is identical to NewBinlogStreamer, it is here
    // for consistency so it will lead to less confusion.
hmmm i find it more confusing tbh
I'll remove this.
        ErrorHandler: f.ErrorHandler,
        Filter:       f.CopyFilter,
    }
}
It's so hard to reason about this backwards data dependency. A constructor should be passed data down, not reach up to its owner, IMO.
I'm totally fine with a very verbose constructor that lives in binlog_streamer.go that takes the required arguments over this pattern.
I'm doing this because nothing external to Ferry is creating its own DataIterator/BinlogStreamer instances. I tried to have the alternative, but ended up initializing DataIterator/BinlogStreamer (for the delta copier and the reconciler) by copying all the attributes needed directly from Ferry.
I figured it may be better to turn the Ferry into a factory. I feel like this line of thinking is more natural anyway as we have a global config object passed to Ferry, and Ferry is already the God object managing all the states.
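The factory approach described above can be sketched in a few lines. Everything here is an illustrative stand-in (the real Ferry, Config, and BinlogStreamer have many more fields); the point is only the direction of the data flow — the Ferry passes configuration down into the component instead of the component reaching back up:

```go
package main

import "fmt"

// Illustrative stand-ins for Ghostferry's types; not the real definitions.
type Config struct {
	MyServerId               uint32
	DataIterationConcurrency int
}

type StateTracker struct{ ringSize int }

type BinlogStreamer struct{ serverId uint32 }

type Ferry struct {
	Config       *Config
	StateTracker *StateTracker
}

// NewBinlogStreamer shows the factory pattern discussed above: the Ferry
// constructs the component and hands it the values it needs, so the
// component never depends on the Ferry itself.
func (f *Ferry) NewBinlogStreamer() *BinlogStreamer {
	return &BinlogStreamer{serverId: f.Config.MyServerId}
}

func main() {
	f := &Ferry{Config: &Config{MyServerId: 99, DataIterationConcurrency: 4}}
	f.StateTracker = &StateTracker{ringSize: f.Config.DataIterationConcurrency * 10}
	s := f.NewBinlogStreamer()
	fmt.Println(s.serverId, f.StateTracker.ringSize) // 99 40
}
```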
    // BUG: if the PanicErrorHandler fires while running the standalone copy, we
    // will get an error dump even though we should not get one, which could be
    // misleading.
do you have a plan for this?
Not yet. I will once I hook up the iterative verifier state dumper as it can only dump states during certain phases and not others, just like this case.
@@ -278,7 +316,12 @@ func (f *Ferry) Start() error {
    // miss some records that are inserted between the time the
    // DataIterator determines the range of IDs to copy and the time that
    // the starting binlog coordinates are determined.
    - err := f.BinlogStreamer.ConnectBinlogStreamerToMysql()
    + var err error
    + if f.StateToResumeFrom != nil {
I don't know how I feel about all these conditionals on f.StateToResumeFrom == nil. Can we somehow boil it down to a single statement at a "higher" level?
Yeah we can. I'll have to cherry-pick some changes from a downstream branch that I have going on. It's not perfect but it's a bit better.
Hmm, I can't easily cherry-pick it back, as I ended up moving the table loading logic from Start to Initialize and used the StateTracker instead of StateToResumeFrom directly in Start after adding in the Reconciler.
I'd prefer if we can leave this code as is, as the PRs coming up will change this section and hopefully make this logic less confusing.
Force-pushed from d789dfe to 3a95b69
Force-pushed from 3a95b69 to c501639
This PR implements the ability to interrupt Ghostferry via SIGTERM/SIGINT. These signals cause the error handler to dump the current state to stdout and panic to terminate the process.
Resume is also implemented: give a SerializedState as the StateToResumeFrom on the Config object, and the DataIterator will resume via LastSuccessfulPrimaryKeys, the BinlogStreamer will resume via LastWrittenBinlogPosition, and the Ferry uses the table schema cache from the SerializedState.
Testing of this will occur in a PR after this, along with the ruby integration test framework, which I'm still iterating on. The code here stands on its own, however.
Note: this PR does not implement the reconciliation process; that will be in a subsequent PR. Interestingly, the implementation is currently already "correct" if no schema change occurred, as we resume streaming from LastWrittenBinlogPosition. This will of course change during the reconciliation work.
In addition, I also did some quick refactoring:
I tried my best to make things incremental, so each commit should stand on its own. It might be best to review in that order.