Skip to content

Commit

Permalink
review: minor code formatting changes;
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Feb 7, 2022
1 parent 1cb4765 commit 2383069
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@
import akka.stream.javadsl.SourceQueueWithComplete;

/**
* Responsible for publishing {@link org.eclipse.ditto.connectivity.api.ExternalMessage}s into an Kafka
* broker.
* Responsible for publishing {@link org.eclipse.ditto.connectivity.api.ExternalMessage}s into a Kafka broker.
*/
final class KafkaPublisherActor extends BasePublisherActor<KafkaPublishTarget> {

Expand Down Expand Up @@ -179,11 +178,10 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
final Function<RecordMetadata, SendResult> callback =
new ProducerCallback(signal, autoAckLabel, ackSizeQuota, connection);

final ExternalMessage messageWithConnectionIdHeader = message
.withHeader("ditto-connection-id", connection.getId().toString());
final ExternalMessage messageWithConnectionIdHeader =
message.withHeader("ditto-connection-id", connection.getId().toString());

return producerStream.publish(publishTarget, messageWithConnectionIdHeader)
.thenApply(callback);
return producerStream.publish(publishTarget, messageWithConnectionIdHeader).thenApply(callback);
}

@Override
Expand Down Expand Up @@ -261,6 +259,7 @@ private SendResult buildResponseFromMetadata(@Nullable final RecordMetadata meta
} else {
issuedAck = null;
}

return new SendResult(issuedAck, dittoHeaders);
}

Expand All @@ -277,6 +276,7 @@ private JsonObject toPayload(final RecordMetadata metadata) {
if (metadata.hasOffset()) {
builder.set("offset", metadata.offset(), this::isQuotaSufficient);
}

return builder.build();
}

Expand All @@ -293,8 +293,8 @@ private boolean isQuotaSufficient(final JsonField field) {

private boolean isDebugEnabled() {
final Map<String, String> specificConfig = connection.getSpecificConfig();
return Boolean.parseBoolean(specificConfig.getOrDefault("debugEnabled", Boolean.FALSE.toString()));

return Boolean.parseBoolean(specificConfig.getOrDefault("debugEnabled", Boolean.FALSE.toString()));
}

}
Expand Down Expand Up @@ -367,17 +367,15 @@ private void handleSendResult(
} else {
// should never happen, we provide only ProducerMessage.single to the source
logger.warning("Received multipart result, ignoring: {}", results);
resultFuture
.completeExceptionally(
new IllegalArgumentException("Received unexpected multipart result."));
resultFuture.completeExceptionally(
new IllegalArgumentException("Received unexpected multipart result."));
}
} else {
logger.debug("Failed to send kafka record: [{}] {}", exception.getClass().getName(),
exception.getMessage());
resultFuture.completeExceptionally(exception);
escalate(exception, ConnectionFailure.determineFailureDescription(Instant.now(),
exception,
"Broker may not be available."));
exception, "Broker may not be available."));
}
}

Expand All @@ -395,6 +393,7 @@ private CompletableFuture<RecordMetadata> publish(final KafkaPublishTarget publi
logger.error(ex, ex.getMessage());
resultFuture.completeExceptionally(ex);
}

return resultFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class KafkaPublisherActorTest extends AbstractPublisherActorTest {
.getConnectionConfig()
.getKafkaConfig()
.getProducerConfig();

private MockSendProducerFactory mockSendProducerFactory;

@Override
Expand Down Expand Up @@ -165,7 +166,7 @@ protected void verifyAcknowledgements(final Supplier<Acknowledgements> ackSuppli
assertThat(acks.getSize()).isEqualTo(1);
final Acknowledgement ack = acks.stream().findAny().orElseThrow();
assertThat(ack.getHttpStatus()).isEqualTo(HttpStatus.NO_CONTENT);
assertThat(ack.getLabel().toString()).isEqualTo("please-verify");
assertThat(ack.getLabel().toString()).hasToString("please-verify");
assertThat(ack.getEntity()).isEmpty();
}

Expand Down Expand Up @@ -215,6 +216,7 @@ private boolean containsOverflowError(final Object msg) {
ack.getEntity().map(JsonValue::asObject).map(o -> MessageSendingFailedException.fromJson(o,
ack.getDittoHeaders())).orElseThrow();
assertThat(messageSendingFailedException.getMessage()).contains("There are too many uncommitted messages");

return true;
}

Expand Down Expand Up @@ -293,7 +295,7 @@ public void verifyAcknowledgementsWithDebugEnabled() {
.first()
.satisfies(ack -> {
assertThat(ack.getHttpStatus()).isEqualTo(HttpStatus.OK);
assertThat(ack.getLabel().toString()).isEqualTo("please-verify");
assertThat(ack.getLabel().toString()).hasToString("please-verify");
assertThat(ack.getEntity()).contains(JsonObject.newBuilder()
.set("timestamp", 0)
.set("serializedKeySize", 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private MockSendProducerFactory(final String targetTopic,

public static MockSendProducerFactory getInstance(
final String targetTopic, final Queue<ProducerRecord<String, ByteBuffer>> published) {

return new MockSendProducerFactory(targetTopic, published, false, false, null, false);
}

Expand All @@ -78,6 +79,7 @@ public static MockSendProducerFactory getInstance(final String targetTopic,
*/
public static MockSendProducerFactory getBlockingInstance(final String targetTopic,
final Queue<ProducerRecord<String, ByteBuffer>> published) {

return new MockSendProducerFactory(targetTopic, published, true, false, null, false);
}

Expand Down Expand Up @@ -112,23 +114,28 @@ public SendProducer<String, ByteBuffer> newSendProducer() {
final ProducerMessage.Envelope<String, ByteBuffer, CompletableFuture<RecordMetadata>> envelope =
invocationOnMock.getArgument(0);
final RecordMetadata dummyMetadata =
new RecordMetadata(new TopicPartition(targetTopic, 5), 0L, 0L, 0L, 0L, 0, 0);
new RecordMetadata(new TopicPartition(targetTopic, 5),
0L, 0L, 0L, 0L, 0, 0);
final ProducerMessage.Message<String, ByteBuffer, CompletableFuture<RecordMetadata>> message =
(ProducerMessage.Message<String, ByteBuffer, CompletableFuture<RecordMetadata>>) envelope;
published.offer(message.record());

return CompletableFuture.completedStage(ProducerResultFactory.result(dummyMetadata, message));
});
} else if (!shouldThrowException){
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class)))
.thenReturn(CompletableFuture.failedStage(exception));
} else {
if (counter.get() == 0) {
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class))).thenThrow(exception);
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class)))
.thenThrow(exception);
counter.getAndIncrement();
} else {
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class))).thenReturn(CompletableFuture.failedStage(exception));
when(producer.sendEnvelope(any(ProducerMessage.Envelope.class)))
.thenReturn(CompletableFuture.failedStage(exception));
}
}

return producer;
}
}

0 comments on commit 2383069

Please sign in to comment.