
DBZ-4389: Add keepalive pings and handle retriable exceptions #55

Merged
merged 1 commit into debezium:main on Dec 2, 2021

Conversation

shichao-an
Member

  • Add a new config "vitess.keepalive.interval.ms" which configures the gRPC keepalive interval for VStream (see the sketch after this description).
  • Add VitessErrorHandler and mark the UNAVAILABLE gRPC status error as retriable.
  • Fix a race condition issue in the integration tests.

This fixes https://issues.redhat.com/browse/DBZ-4389
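
For reference, a minimal sketch of enabling the new option, assuming Debezium's Configuration builder and an illustrative 60-second interval (the value is only an example, not the default):

    import io.debezium.config.Configuration;

    // Hypothetical usage sketch: configure the gRPC keepalive ping interval for VStream.
    Configuration config = Configuration.create()
            .with("vitess.keepalive.interval.ms", "60000")   // example value; 60s between keepalive pings
            .build();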

Member

@gunnarmorling gunnarmorling left a comment


The change itself LGTM. One comment on that sleep call inline though. And the option should be added to the connector docs. Thanks, @shichao-an!

protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) throws InterruptedException {
    TestConsumer consumer = new TestConsumer(expectedRecordsCount, topicPrefixes);
    // Sleep for 1 second before return to avoid race conditions.
    Thread.sleep(1000);
Member

Can we avoid such an unconditional thread sleep call? It's typically not reliable and it increases the execution time of the test suite. What's the race condition about? Typically, the better fix is to use Awaitility to make sure some specific condition has been met in the test.

Member

+1

could you also comment a bit on where the race condition comes from? Would stopConnector help to handle that?

Member

Others have chimed in, but just wanted to add a thought. The general things to look out for with sleeps are (1) making unit tests longer than they need to be and (2), more importantly, sleeps having the potential to make unit tests flaky.

Depending on the code you could do something with Awaitility, as Gunnar said, like await().until(<something>) or some other variant of the API. Another option is to inject a Clock so we can control/advance it as needed in our tests -- although this could be a larger refactor, for another time (I haven't looked as deeply into this part of the codebase yet).
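
For illustration, a minimal sketch of what the Awaitility-based wait could look like in place of the Thread.sleep(1000); the condition connectorIsStreaming() is a hypothetical stand-in for whatever the test actually needs to observe:

    import static org.awaitility.Awaitility.await;

    import java.time.Duration;

    // Wait for the real precondition instead of sleeping for a fixed second.
    await().atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofMillis(100))
            .until(() -> connectorIsStreaming());   // hypothetical condition supplier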

Member Author

@shichao-an shichao-an Dec 2, 2021


All of the Vitess connector's IT tests consistently failed on my laptop for a reason similar to https://issues.redhat.com/browse/DBZ-4391: the expected data change events could not be observed.

After troubleshooting for a while, I think there is a race condition. A typical example starts from this line: https://github.com/debezium/debezium-connector-vitess/blob/main/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java#L116

When the VStream is started, it uses "current" as the starting GTID position. The test insert seems to happen too early relative to when the connector starts the VStream, so by the time VTGate receives the connection the insert has already completed; this makes VTGate emit only changes that occur after the insert. As a result the insert change event is "lost" in the test cases, and waiting for the record times out.

Member Author

@shichao-an shichao-an Dec 2, 2021


If I intentionally set the GTID to the last GTID in the current Vitess MySQL by passing it in via configBuilder in the tests, like this:

        builder = builder
                .with(RelationalDatabaseConnectorConfig.SERVER_NAME, TEST_SERVER)
                .with(VitessConnectorConfig.VTGATE_HOST, VTGATE_HOST)
                .with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT)
                .with(VitessConnectorConfig.VTGATE_USER, USERNAME)
                .with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD)
                .with(VitessConnectorConfig.GTID, "MySQL56/0588557d-5357-11ec-84d5-0242ac110002:1-1264")
                .with(VitessConnectorConfig.POLL_INTERVAL_MS, 100);

the tests will work. This means it's definitely due to the race condition between VStream starting at "current" and the inserts, as mentioned above.

Member

I see, that makes sense. I was using a slightly different approach to tackle this exact problem - it took advantage of the fact that VStream will pass along an event with the "OTHER" type when there are no writes happening.

But I think the logging approach you have is more reliable. Could you also change it in VitessReplicationConnectionIT so that we are consistent on that front?

Member

@5antelope 5antelope left a comment


Thanks for working on this @shichao-an

The change makes sense to me; I had a few questions inline.

Related to this fix: I think we observed that the VStream endpoint would be "broken" if there are no events for net_read_timeout (the MySQL setting). Do we observe the same behavior here? I know you verified that after this change no error is raised after some idle time; it might be good to also confirm that if we do another write after 10 minutes, we still see that event go through.

@@ -194,6 +194,7 @@ private int getNumOfRowEvents(Vtgate.VStreamResponse response) {
    private ManagedChannel newChannel(String vtgateHost, int vtgatePort) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress(vtgateHost, vtgatePort)
                .usePlaintext()
                .keepAliveTime(config.getKeepaliveInterval().toMillis(), TimeUnit.MILLISECONDS)
Member

It is not super clear to me why the channel gets closed before this change - for my own understanding, is the channel closed by gRPC because it is idle? I think the default idle timeout is 30 min: https://grpc.github.io/grpc-java/javadoc/io/grpc/ManagedChannelBuilder.html#idleTimeout-long-java.util.concurrent.TimeUnit-

Member Author

@shichao-an shichao-an Dec 2, 2021


After troubleshooting for quite a bit I couldn't reach a conclusion. I tried setting idleTimeout to different values and there was no effect at all. I added debug output and found that the channel itself is mostly in the IDLE state even before the timeout, so this idle setting might be irrelevant, because there is only one gRPC activity (the VStream call) and no new RPCs. I also debugged the server side.

The general assumption is that the connection/transport is closed after there have been no active streams for a while. To gRPC this means no pings between peers for a period, so the transport is disconnected. I have searched and found these references:

Member

The links seem to suggest using idleTimeout over keepAliveTime 🤷

I think it's fine to add this option here; the only concern I have is that it requires a corresponding setting on the server side as well (e.g., grpc.keepalive_permit_without_calls).

I'm not sure in this case if it's worth adding a TODO for it; I will defer to you and others here.

Member Author

Yeah, idleTimeout doesn't work in my verification, so I concluded that the issue is not related to idleTimeout: idleTimeout is about the "channel", but our problem is with the "transport/connection", which we need to tackle with keepalive pings.

As for keepalive_permit_without_calls, I tested without this option and it works. According to this and this:

  • allows keepalive pings to be sent even if there are no calls in flight
  • Sets whether keepalive will be performed when there are no outstanding RPC on a connection

I think we do have a "call", or outstanding RPC: the VStream call itself. Therefore, leaving out this option shouldn't affect our keepalive pings. I will need to test it in prod for several passes once I have the snapshots ready. We can add it if we do need this option.
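
For reference, a sketch of the client-side keepalive knobs discussed in this thread; the 60s/10s values are illustrative, and keepAliveWithoutCalls(true) is the optional setting debated above rather than something this PR necessarily ships:

    import java.util.concurrent.TimeUnit;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    class KeepaliveChannelSketch {
        static ManagedChannel newChannel(String vtgateHost, int vtgatePort) {
            return ManagedChannelBuilder.forAddress(vtgateHost, vtgatePort)
                    .usePlaintext()
                    .keepAliveTime(60, TimeUnit.SECONDS)      // send an HTTP/2 ping after this much transport inactivity
                    .keepAliveTimeout(10, TimeUnit.SECONDS)   // close the transport if the ping is not acknowledged in time
                    .keepAliveWithoutCalls(true)              // optional: allow pings even with no outstanding RPC
                    .build();
        }
    }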


@jeffchao
Member

jeffchao commented Dec 2, 2021

@shichao-an LGTM. Could you also elaborate with a comment (in that same test is fine) on the details of the race condition?

Otherwise, just some minor questions/comments. Thanks!

@shichao-an
Member Author

I know you verified that after this change no error is raised after some idle time; it might be good to also confirm that if we do another write after 10 minutes, we still see that event go through.

Yes, I tested these cases. If there are writes, the break is delayed until the connection has been idle for the same period of time again, at which point the transport is disconnected.

@shichao-an
Member Author

shichao-an commented Dec 2, 2021

I added waitForVStreamStarted to wait for the VStream to start. I still need to add some buffer time using pollInterval (because VTGate needs to receive the request and start processing from the "current" position). I tried 500ms and it worked on my laptop, but shouldSchemaUpdatedAfterOnlineDdl still failed in the GitHub workflow CI, so I changed it to 1s. Let me know if this works for you. @gunnarmorling @sonne5 @jeffchao
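
For illustration only, a hypothetical shape of such a helper, assuming a flag that is flipped once the VStream is observed as started; the flag, the timeout, and the 1s buffer mirror the description above but are not the PR's actual code:

    import static org.awaitility.Awaitility.await;

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicBoolean;

    class WaitForVStreamSketch {
        static void waitForVStreamStarted(AtomicBoolean vstreamStarted) {
            // Block until the stream is reported as started.
            await().atMost(Duration.ofSeconds(30))
                    .pollInterval(Duration.ofMillis(100))
                    .untilTrue(vstreamStarted);
            // Small buffer so VTGate can start processing from the "current" position
            // before the test performs its inserts.
            await().pollDelay(Duration.ofSeconds(1)).until(() -> true);
        }
    }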

@5antelope
Member

If there are writes, the break is delayed until the connection has been idle for the same period of time again, at which point the transport is disconnected.

Just to clarify, when you say "disconnected", do you mean that UNAVAILABLE was being raised, or do you mean new writes won't be streamed out? I'm asking because if, with this gRPC config change, we don't get UNAVAILABLE errors but new writes after a certain period of time still won't be streamed out, then we have something more to dig into.

Regardless, this is a good fix 💯; I just want to see if there is more we need to investigate. And since most of the time we should expect continuous events, the net_read_timeout issue should be low priority even if it's real.

@shichao-an
Member Author

shichao-an commented Dec 2, 2021

Just to clarify, when you say "disconnected", do you mean that UNAVAILABLE was being raised, or do you mean new writes won't be streamed out?

I mean UNAVAILABLE was being raised. After that, new writes won't be streamed out because the connector has crashed.

I'm asking because if, with this gRPC config change, we don't get UNAVAILABLE errors but new writes after a certain period of time still won't be streamed out, then we have something more to dig into.

Right, there's no issue with new writes as long as the transport is kept open with keepalive pings. I also added the retry behavior to handle the UNAVAILABLE error, so if the transport is somehow disconnected again when keepalive fails, the connector will be restarted automatically.
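
A minimal sketch of the retriable check described here: treat a gRPC UNAVAILABLE status as retriable so the connector can be restarted instead of crashing. The class and method below are illustrative only and mirror the idea rather than the PR's exact VitessErrorHandler code:

    import io.grpc.Status;

    class UnavailableRetrySketch {
        // True when the failure carries the gRPC UNAVAILABLE status code.
        static boolean isRetriable(Throwable throwable) {
            return throwable != null
                    && Status.fromThrowable(throwable).getCode() == Status.Code.UNAVAILABLE;
        }
    }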

Member

@5antelope 5antelope left a comment

This LGTM, you will also need to update the doc here: https://github.com/debezium/debezium/blob/main/documentation/modules/ROOT/pages/connectors/vitess.adoc

Don't want to block you on the clean up with VitessReplicationConnectionIT, feel free to do that separately

@shichao-an
Member Author

This LGTM, you will also need to update the doc here: https://github.com/debezium/debezium/blob/main/documentation/modules/ROOT/pages/connectors/vitess.adoc

Will do in the Debezium PR.

Don't want to block you on the clean up with VitessReplicationConnectionIT, feel free to do that separately

Sure. We can create a maintenance PR to refactor the IT tests a bit. There is a lot of duplicate code here and there.

@shichao-an shichao-an merged commit 63cb0b7 into debezium:main Dec 2, 2021
@keweishang
Member

keweishang commented Dec 3, 2021

Looks like I'm late for the party.

I remember that my local docker setup has this issue. Here's the connector error:

[2020-06-12 07:45:53,984] ERROR WorkerSourceTask{id=sample_connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
	at eu.bolt.data.connector.vitess.VitessStreamingChangeEventSource.execute(VitessStreamingChangeEventSource.java:88)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:105)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Network closed for unknown reason
	at io.grpc.Status.asRuntimeException(Status.java:533)
	at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:610)
	at eu.bolt.data.connector.vitess.connection.VitessReplicationConnection$1.read(VitessReplicationConnection.java:67)
	at eu.bolt.data.connector.vitess.VitessStreamingChangeEventSource.execute(VitessStreamingChangeEventSource.java:55)
	... 6 more

And my original internal note at Bolt for this issue was:

VStream gRPC server uses the default [keepalive](https://godoc.org/google.golang.org/grpc/keepalive) configuration, which means that the gRPC server doesn't close the connection and keeps it open indefinitely.

We use the gRPC client default keepalive configuration too, so it should keep the connection open indefinitely as well.

The "Network closed for unknown reason" error happens only when Kafka Connect runs within docker-compose. When Kafka Connect is started locally via confluent local start, there is no such problem.

Therefore, it is more of a docker network problem, not a Vitess connector problem.

@5antelope
Member

Thanks @keweishang, that's a helpful insight! I think we do run docker compose in production, but locally we are just doing docker run ... debezium/connect (shichao can keep me honest here).

I'm curious if you observed this only locally or in prod as well, and how did you solve the problem? Did we just move away from docker compose or did we do something differently?

@keweishang
Member

In the local environment, I tried to deploy both Vitess and the Connect worker with the same docker-compose and I had this issue. However, if I ran the Connect worker on bare metal without docker locally (I don't remember how I ran Vitess in this case, probably in docker or on bare metal), I didn't observe the issue.

In production, we've never had this issue, but that's probably because there are always events streamed from VTGate and the VStream gRPC is never idle.

@keweishang
Member

Our production Vitess and Connect workers are also deployed on bare metal, without docker.
