Skip to content
Permalink
Browse files
IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)
  • Loading branch information
ololo3000 committed Jan 29, 2021
1 parent 3fb24c6 commit 7c13d5764a7a588f2220bae4bbb3619c43a14146
Showing 2 changed files with 21 additions and 7 deletions.
@@ -34,9 +34,10 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
@@ -71,11 +72,14 @@ class MockPubSubServer {
/** */
public static final int MESSAGES_PER_REQUEST = 10;

/** Time to wait for the message in milliseconds. */
private static final long MSG_WAIT_TIMEOUT = 1_000L;

/** */
private final Map<String, Publisher> publishers = new HashMap<>();

/** */
private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
private final BlockingDeque<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();

public SubscriberStubSettings createSubscriberStub() throws IOException {
CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
@@ -136,8 +140,18 @@ private ClientCall<PullRequest, PullResponse> clientCall() {
private void pullMessages(ClientCall.Listener<PullResponse> listener, Metadata metadata) {
PullResponse.Builder pullResponse = PullResponse.newBuilder();

for(int i = 0; i < MESSAGES_PER_REQUEST; i++) {
pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(blockingQueue.remove()).build());
try {
for (int i = 0; i < MESSAGES_PER_REQUEST; i++) {
PubsubMessage msg = blockingQueue.poll(MSG_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);

if (msg == null)
break;

pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(msg).build());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

listener.onMessage(pullResponse.build());
@@ -180,9 +180,6 @@ private void consumerStream(ProjectTopicName topic, Map<String, String> keyValMa

pubSubStmr.setSingleTupleExtractor(singleTupleExtractor());

// Start Pub/Sub streamer.
pubSubStmr.start();

final CountDownLatch latch = new CountDownLatch(CNT);

IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@@ -210,6 +207,9 @@ private void consumerStream(ProjectTopicName topic, Map<String, String> keyValMa

ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);

// Start Pub/Sub streamer.
pubSubStmr.start();

// Checks all events successfully processed in 10 seconds.
assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
latch.await(10, TimeUnit.SECONDS));

0 comments on commit 7c13d57

Please sign in to comment.