Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][tidb-connector][#1631] add cdc error event handle function #1632

Merged
merged 3 commits into from
Nov 2, 2022

Conversation

leozlliang
Copy link
Contributor

  1. add cdc error event handler
  2. fix issue when pull cdc event blocked

@leozlliang leozlliang changed the title [bugfix]#1631 fix tikv-java error event handler [fix][tidb-connector][#1631] fix cdc error event handler Oct 20, 2022
@leozlliang leozlliang changed the title [fix][tidb-connector][#1631] fix cdc error event handler [fix][tidb-connector][#1631] add cdc error event handle function Oct 20, 2022
@GOODBOY008
Copy link
Member

@lincoln-lil Thank you for contributions. Please use spotless:apply to format code.

@GOODBOY008 GOODBOY008 self-requested a review October 20, 2022 05:22
@leozlliang
Copy link
Contributor Author

Thanks for your review, done.

@GOODBOY008
Copy link
Member

Thanks for your review, done.

@leozlliang Can u add unit test to confirm this fix is worked.

Copy link
Member

@GOODBOY008 GOODBOY008 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leozlliang I have left some comment.

@@ -0,0 +1,250 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update license.

@@ -0,0 +1,86 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -0,0 +1,262 @@
/*
* Copyright 2021 TiKV Project Authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -0,0 +1,190 @@
/*
* Copyright 2021 TiKV Project Authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@@ -114,6 +119,9 @@ public void open(final Configuration config) throws Exception {
cdcClient = new CDCClient(session, keyRange);
prewrites = new TreeMap<>();
commits = new TreeMap<>();
// if pull cdc event block when region split, cdc event will lose.
// use queue to make sure pull event unblock.
committedEvents = new LinkedBlockingQueue<Cdcpb.Event.Row>(50000000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capacity 50000000 can you give me some explain?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -114,6 +119,9 @@ public void open(final Configuration config) throws Exception {
cdcClient = new CDCClient(session, keyRange);
prewrites = new TreeMap<>();
commits = new TreeMap<>();
// if pull cdc event block when region split, cdc event will lose.
// use queue to make sure pull event unblock.
committedEvents = new LinkedBlockingQueue<Cdcpb.Event.Row>(50000000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
committedEvents = new LinkedBlockingQueue<Cdcpb.Event.Row>(50000000);
committedEvents = new LinkedBlockingQueue<>(50000000);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -198,6 +207,8 @@ protected void readSnapshotEvents() throws Exception {

protected void readChangeEvents() throws Exception {
LOG.info("read change event from resolvedTs:{}", resolvedTs);
// child thread to sink committed rows.
new Thread(this::asyncSink).start();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Executors.newSingleThreadExecutor(threadFactory) will be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, done

@GOODBOY008 GOODBOY008 reopened this Oct 28, 2022

private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if expectSize not expected, is this function will exit? otherwise will block the ci stop.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants