Skip to content

Commit

Permalink
Add test validating KafkaProducerStream restart on error.
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Feb 2, 2022
1 parent 2e1f6aa commit 1cb4765
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
Expand Down Expand Up @@ -72,6 +74,7 @@
import akka.actor.Status;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import scala.concurrent.duration.FiniteDuration;

/**
* Unit test for {@link KafkaPublisherActor}.
Expand All @@ -96,8 +99,9 @@ protected void setupMocks(final TestProbe notUsed) {
mockSendProducerFactory = MockSendProducerFactory.getInstance(TARGET_TOPIC, published);
}

private void setUpMocksToFailWith(final RuntimeException exception) {
mockSendProducerFactory = MockSendProducerFactory.getInstance(TARGET_TOPIC, published, exception);
private void setUpMocksToFailWith(final RuntimeException exception, final boolean shouldThrowException) {
mockSendProducerFactory =
MockSendProducerFactory.getInstance(TARGET_TOPIC, published, exception, shouldThrowException);
}

@Override
Expand Down Expand Up @@ -306,8 +310,19 @@ public void verifyAcknowledgementsWithDebugEnabled() {
@Test
public void retriableExceptionBecomesInternalErrorAcknowledgement() {
testSendFailure(new DisconnectException(), (sender, parent) ->
assertThat(sender.expectMsgClass(Acknowledgements.class).getHttpStatus())
.isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR)
assertThat(sender.expectMsgClass(Acknowledgements.class).getHttpStatus())
.isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR),
false, 1
);
}

@Test
public void sendEnvelopeFailureDoesntKillProducerStream() {
testSendFailure(new KafkaException(), (sender, parent) ->
assertThat(sender.expectMsgClass(FiniteDuration.apply(5, TimeUnit.SECONDS), Acknowledgements.class)
.getHttpStatus())
.isEqualTo(HttpStatus.BAD_REQUEST),
true, 2
);
}

Expand Down Expand Up @@ -336,10 +351,11 @@ private void shouldContainHeader(final List<Header> headers, final String key) {
assertThat(expectedHeader).isPresent();
}

private void testSendFailure(final RuntimeException exception, final BiConsumer<TestProbe, TestKit> assertions) {
private void testSendFailure(final RuntimeException exception, final BiConsumer<TestProbe, TestKit> assertions,
final boolean shouldThrowException, final int sendMessagesAmount) {
new TestKit(actorSystem) {{
// GIVEN
setUpMocksToFailWith(exception);
setUpMocksToFailWith(exception, shouldThrowException);

final TestProbe senderProbe = TestProbe.apply("sender", actorSystem);
final OutboundSignal.MultiMapped multiMapped = OutboundSignalFactory.newMultiMappedOutboundSignal(
Expand All @@ -349,8 +365,9 @@ private void testSendFailure(final RuntimeException exception, final BiConsumer<
final ActorRef publisherActor = childActorOf(getPublisherActorProps());
publisherCreated(this, publisherActor);

// WHEN
publisherActor.tell(multiMapped, senderProbe.ref());
for (int x = 0; x < sendMessagesAmount; x++) {
publisherActor.tell(multiMapped, senderProbe.ref());
}

// THEN
assertions.accept(senderProbe, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;

Expand All @@ -39,42 +40,53 @@ final class MockSendProducerFactory implements SendProducerFactory {
private final String targetTopic;
private final Queue<ProducerRecord<String, ByteBuffer>> published;
@Nullable private final RuntimeException exception;
final boolean shouldThrowException;
private final boolean blocking;
private final AtomicBoolean wait = new AtomicBoolean();
private final AtomicInteger counter = new AtomicInteger();

private MockSendProducerFactory(final String targetTopic,
final Queue<ProducerRecord<String, ByteBuffer>> published,
final boolean blocking,
final boolean slow,
@Nullable final RuntimeException exception,
final boolean shouldThrowException) {

private MockSendProducerFactory(final String targetTopic, final Queue<ProducerRecord<String, ByteBuffer>> published,
final boolean blocking, final boolean slow, @Nullable final RuntimeException exception) {
this.targetTopic = targetTopic;
this.published = published;
this.blocking = blocking;
wait.set(slow);
this.exception = exception;
this.shouldThrowException = shouldThrowException;
}

public static MockSendProducerFactory getInstance(
final String targetTopic, final Queue<ProducerRecord<String, ByteBuffer>> published) {
return new MockSendProducerFactory(targetTopic, published, false, false, null);
return new MockSendProducerFactory(targetTopic, published, false, false, null, false);
}

public static MockSendProducerFactory getInstance(final String targetTopic,
final Queue<ProducerRecord<String, ByteBuffer>> published, final RuntimeException exception) {
return new MockSendProducerFactory(targetTopic, published, false, false, exception);
final Queue<ProducerRecord<String, ByteBuffer>> published,
final RuntimeException exception,
final boolean shouldThrowException) {

return new MockSendProducerFactory(targetTopic, published, false, false, exception, shouldThrowException);
}

/**
* Never publishes any message, returns futures that never complete.
*/
public static MockSendProducerFactory getBlockingInstance(final String targetTopic,
final Queue<ProducerRecord<String, ByteBuffer>> published) {
return new MockSendProducerFactory(targetTopic, published, true, false, null);
return new MockSendProducerFactory(targetTopic, published, true, false, null, false);
}

/**
* Blocks 1 second for the first message published. Continues to operate normally afterwards.
*/
public static MockSendProducerFactory getSlowStartInstance(final String targetTopic,
final Queue<ProducerRecord<String, ByteBuffer>> published) {
return new MockSendProducerFactory(targetTopic, published, false, true, null);
return new MockSendProducerFactory(targetTopic, published, false, true, null, false);
}

@Override
Expand Down Expand Up @@ -106,11 +118,17 @@ public SendProducer<String, ByteBuffer> newSendProducer() {
published.offer(message.record());
return CompletableFuture.completedStage(ProducerResultFactory.result(dummyMetadata, message));
});
} else {
} else if (!shouldThrowException){
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class)))
.thenReturn(CompletableFuture.failedStage(exception));
} else {
if (counter.get() == 0) {
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class))).thenThrow(exception);
counter.getAndIncrement();
} else {
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class))).thenReturn(CompletableFuture.failedStage(exception));
}
}

return producer;
}
}

0 comments on commit 1cb4765

Please sign in to comment.