Skip to content
Permalink
Browse files
IGNITE-14010 Fixes flaky KafkaIgniteStreamerSelfTest. (#39)
  • Loading branch information
ololo3000 committed Jan 29, 2021
1 parent 7c13d57 commit 1774aaa62eafd5f8820bf8a39d063f0bcd2a2150
Showing 1 changed file with 6 additions and 6 deletions.
@@ -26,7 +26,6 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -46,6 +45,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;

/**
@@ -208,9 +208,6 @@ private void consumerStream(String topic, Map<String, String> keyValMap)
return entries;
});

// Start kafka streamer.
kafkaStmr.start();

final CountDownLatch latch = new CountDownLatch(CNT);

IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@@ -238,9 +235,12 @@ private void consumerStream(String topic, Map<String, String> keyValMap)

ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);

// Checks all events successfully processed in 10 seconds.
// Start kafka streamer.
kafkaStmr.start();

// Checks all events successfully processed in test timeout.
assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
latch.await(10, TimeUnit.SECONDS));
latch.await(getTestTimeout(), MILLISECONDS));

for (Map.Entry<String, String> entry : keyValMap.entrySet())
assertEquals(entry.getValue(), cache.get(entry.getKey()));

0 comments on commit 1774aaa

Please sign in to comment.