-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mysql] add new tables to existing cdc job #777
Conversation
Current implementation use a Singleton class I'm working with @leonardBang to find an elegant way to exchange data among them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zhjwpku for the great work, I left some comments. And I think we need to consider more cases, and we'd better to split different feature into to separate PRs.
@@ -120,6 +120,77 @@ public void testJobManagerFailoverSingleParallelism() throws Exception { | |||
1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"customers"}); | |||
} | |||
|
|||
@Test | |||
public void testJobManagerFailoverInBinlogPhaseWithNewlyAddTable() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need test more cases, like new adding table in snapshot phase and new adding table in binglog pahse.
And we also can split below support to two PRs. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, definitely, this simple case is for my local test, we should come up with more cases to make it behave right.
@@ -54,6 +54,7 @@ public static BinlogOffset readBinlogPosition(int offsetVersion, DataInputDeseri | |||
return in.readBoolean() ? new BinlogOffset(in.readUTF(), in.readLong()) : null; | |||
case 2: | |||
case 3: | |||
case 4: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we do not need upgrade offsetVersion
, right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I bump the VERSION in MySqlSplitSerializer
, I think this will propagate to all the serializer?
...in/java/com/ververica/cdc/connectors/mysql/source/events/FinishedBinlogSplitReportEvent.java
Outdated
Show resolved
Hide resolved
...java/com/ververica/cdc/connectors/mysql/source/assigners/state/HybridPendingSplitsState.java
Outdated
Show resolved
Hide resolved
.../main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
Outdated
Show resolved
Hide resolved
...ain/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
Outdated
Show resolved
Hide resolved
...ain/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
Outdated
Show resolved
Hide resolved
...ql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
Outdated
Show resolved
Hide resolved
...rc/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
Outdated
Show resolved
Hide resolved
.../src/main/java/com/ververica/cdc/connectors/mysql/source/events/PleaseRequestSplitEvent.java
Outdated
Show resolved
Hide resolved
Signed-off-by: 元组 <zhaojunwang.zjw@alibaba-inc.com>
Thanks @zhjwpku for the great continuous contribution, LGTM. I also help polish the code and test. |
This PR try to achieve the feature mentioned in [0]
1.1 check whether there is newly added table in
MySqlSnapshotSplitAssigner#open
1.1.1 if assigner not finished, add the tables to remainingTables
and continue the normal processure
1.1.2 if assigner has finished, a.k.a the job has enter the incremental phase
1.1.2.1 set the assigner to suspend status
1.1.2.2 send
SuspendBinlogSplitReaderRequestEvent
toSourceReader
2.1 on receiving the
SuspendBinlogSplitReaderRequestEvent
2.1.2 set a flag to tell MySqlSplitReader to suspend
2.1.3
MySqlSplitReader
stop currentReader infetch()
and updatebinlog offset in
MySqlRecordEmitter#emitRecord
2.1.4 response a
SuspendBinlogSplitReaderResponseEvent
event toMySqlSplitEnumerator
inMySqlSourceReader#snapshotState
3.1 on receiving the
SuspendBinlogSplitReaderResponseEvent
3.1.1 save the received offset, and start again the snapshot phase
3.1.2 when snapshot phase finished, use the saved offset as minBinlogOffset
in
MySqlHybridSplitAssigner#createBinlogSplit
[0] #513
Signed-off-by: 元组 zhaojunwang.zjw@alibaba-inc.com