Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,28 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.apache.camel.util.StopWatch;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

@Disabled("CAMEL-13629: Flaky test")
/**
* Tests that reconfiguring a Disruptor (adding a consumer) works correctly while a producer is actively sending
* messages, and that the new consumer receives messages sent after reconfiguration.
*/
public class DisruptorReconfigureWithBlockingProducerTest extends CamelTestSupport {

private static final String DISRUPTOR_URI = "disruptor:foo?blockWhenFull=true";

@Test
void testDisruptorReconfigureWithBlockingProducer() throws Exception {
getMockEndpoint("mock:a").expectedMessageCount(20);
getMockEndpoint("mock:b").expectedMinimumMessageCount(10);
getMockEndpoint("mock:b").expectedMinimumMessageCount(12);

StopWatch watch = new StopWatch();
ProducerThread producerThread = new ProducerThread();
CountDownLatch reconfiguredLatch = new CountDownLatch(1);
ProducerThread producerThread = new ProducerThread(reconfiguredLatch);
producerThread.start();

//synchronize with the producer to the point that the buffer is full
assertTrue(producerThread.awaitFullBufferProduced());
assertTrue(producerThread.awaitFirstBatchSent());

context.addRoutes(new RouteBuilder() {
@Override
Expand All @@ -50,16 +52,10 @@ public void configure() {
}
});

// adding the consumer may take place after the current buffer is flushed
// which will take approximately 8*200=1600 ms because of delay on route.
// If the reconfigure does not correctly hold back the producer thread on this request,
// it will take approximately 20*200=4000 ms.
// be on the safe side and check that it was at least faster than 2 seconds.
assertTrue(watch.taken() < 2000, "Reconfigure of Disruptor blocked");
reconfiguredLatch.countDown();

//Wait and check that the producer has produced all messages without throwing an exception
assertTrue(producerThread.checkResult());
MockEndpoint.assertIsSatisfied(context);
MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS);
}

@Override
Expand All @@ -73,42 +69,46 @@ public void configure() {
}

private class ProducerThread extends Thread {
private final CountDownLatch startedLatch = new CountDownLatch(1);
private final CountDownLatch firstBatchSentLatch = new CountDownLatch(1);
private final CountDownLatch reconfiguredLatch;
private final CountDownLatch resultLatch = new CountDownLatch(1);
private Exception exception;
private volatile Exception exception;

ProducerThread(CountDownLatch reconfiguredLatch) {
this.reconfiguredLatch = reconfiguredLatch;
}

@Override
public void run() {
for (int i = 0; i < 8; i++) {
template.sendBody("disruptor:foo", "Message");
}
try {
for (int i = 0; i < 8; i++) {
template.sendBody(DISRUPTOR_URI, "Message");
}

startedLatch.countDown();
firstBatchSentLatch.countDown();

reconfiguredLatch.await(10, TimeUnit.SECONDS);

try {
for (int i = 0; i < 12; i++) {
template.sendBody("disruptor:foo", "Message");
template.sendBody(DISRUPTOR_URI, "Message");
}
} catch (Exception e) {
exception = e;
firstBatchSentLatch.countDown();
} finally {
resultLatch.countDown();
}

resultLatch.countDown();
}

public boolean awaitFullBufferProduced() throws InterruptedException {
return startedLatch.await(5, TimeUnit.SECONDS);
public boolean awaitFirstBatchSent() throws InterruptedException {
return firstBatchSentLatch.await(10, TimeUnit.SECONDS);
}

public boolean checkResult() throws Exception {
boolean result = resultLatch.await(30, TimeUnit.SECONDS);
if (exception != null) {
throw exception;
}
boolean result = resultLatch.await(5, TimeUnit.SECONDS);
if (exception != null) {
throw exception;
}

return result;
}
}
Expand Down
Loading