Skip to content

Commit

Permalink
Wait for VStream started in startConnector
Browse files Browse the repository at this point in the history
  • Loading branch information
shichao-an committed Dec 2, 2021
1 parent cd62b46 commit f0c3364
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private int getNumOfRowEvents(Vtgate.VStreamResponse response) {
.setFlags(vStreamFlags)
.build(),
responseObserver);
LOGGER.info("Started VStream");
}

private VitessGrpc.VitessStub newStub(ManagedChannel channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,6 @@ protected static String incrementGtid(String gtid, int increment) {

protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) throws InterruptedException {
TestConsumer consumer = new TestConsumer(expectedRecordsCount, topicPrefixes);
// Sleep for 1 second before return to avoid race conditions.
Thread.sleep(1000);
return consumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,13 @@ private void waitForShardedGtidAcquiring(final LogInterceptor logInterceptor) {
.until(() -> logInterceptor.containsMessage("Default VGTID '[{\"keyspace\":"));
}

private void waitForVStreamStarted(final LogInterceptor logInterceptor) {
// The inserts must happen only after VStream is started some buffer.
Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
.pollInterval(Duration.ofMillis(500))
.until(() -> logInterceptor.containsMessage("Started VStream"));
}

private void startConnector() throws InterruptedException {
startConnector(false);
}
Expand All @@ -576,9 +583,11 @@ private void startConnector(boolean hasMultipleShards) throws InterruptedExcepti
private void startConnector(Function<Configuration.Builder, Configuration.Builder> customConfig, boolean hasMultipleShards)
throws InterruptedException {
Configuration.Builder configBuilder = customConfig.apply(TestHelper.defaultConfig(hasMultipleShards));
final LogInterceptor logInterceptor = new LogInterceptor();
start(VitessConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning();
waitForVStreamStarted(logInterceptor);
}

private void waitForStreamingRunning() throws InterruptedException {
Expand Down

0 comments on commit f0c3364

Please sign in to comment.