Skip to content

Commit

Permalink
Issue #106: Refactored 'AcknowledgementAggregatorActor'.
Browse files Browse the repository at this point in the history
Mainly eliminated unnecessary 'instanceof' checks and formatted code.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Nov 11, 2021
1 parent ef4f8f0 commit 9625bb7
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 258 deletions.
12 changes: 12 additions & 0 deletions internal/models/acks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-akka</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-base-model</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,29 @@

import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.signals.WithOptionalEntity;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayCommandTimeoutException;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.WithThingId;
import org.eclipse.ditto.things.model.signals.acks.ThingAcknowledgementFactory;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
Expand Down Expand Up @@ -86,7 +82,7 @@ private AcknowledgementAggregatorActor(final EntityId entityId,
timeout = getTimeout(requestCommandHeaders, maxTimeout);
getContext().setReceiveTimeout(timeout);

final Set<AcknowledgementRequest> acknowledgementRequests = requestCommandHeaders.getAcknowledgementRequests();
final var acknowledgementRequests = requestCommandHeaders.getAcknowledgementRequests();
ackregator = AcknowledgementAggregator.getInstance(entityId, correlationId, timeout, headerTranslator);
ackregator.addAcknowledgementRequests(acknowledgementRequests);
log.withCorrelationId(correlationId)
Expand All @@ -112,8 +108,13 @@ static Props props(final EntityId entityId,
final AcknowledgementConfig acknowledgementConfig,
final HeaderTranslator headerTranslator,
final Consumer<Object> responseSignalConsumer) {
return Props.create(AcknowledgementAggregatorActor.class, entityId, dittoHeaders,
acknowledgementConfig.getForwarderFallbackTimeout(), headerTranslator, responseSignalConsumer);

return Props.create(AcknowledgementAggregatorActor.class,
entityId,
dittoHeaders,
acknowledgementConfig.getForwarderFallbackTimeout(),
headerTranslator,
responseSignalConsumer);
}

/**
Expand All @@ -134,8 +135,13 @@ static Props props(final EntityId entityId,
final Duration maxTimeout,
final HeaderTranslator headerTranslator,
final Consumer<Object> responseSignalConsumer) {
return Props.create(AcknowledgementAggregatorActor.class, entityId, dittoHeaders, maxTimeout,
headerTranslator, responseSignalConsumer);

return Props.create(AcknowledgementAggregatorActor.class,
entityId,
dittoHeaders,
maxTimeout,
headerTranslator,
responseSignalConsumer);
}

@Override
Expand All @@ -152,70 +158,62 @@ public Receive createReceive() {
}

private void handleThingCommandResponse(final ThingCommandResponse<?> thingCommandResponse) {
final boolean isLiveResponse = thingCommandResponse.getDittoHeaders().getChannel().stream()
.anyMatch(TopicPath.Channel.LIVE.getName()::equals);
addCommandResponse(thingCommandResponse, thingCommandResponse, isLiveResponse);
log.withCorrelationId(correlationId).debug("Received thing command response <{}>.", thingCommandResponse);
final var acknowledgementLabel =
SignalInformationPoint.isChannelLive(thingCommandResponse) ? LIVE_RESPONSE : TWIN_PERSISTED;
addCommandResponse(thingCommandResponse, getAcknowledgement(thingCommandResponse, acknowledgementLabel));
}

private void handleMessageCommandResponse(final MessageCommandResponse<?, ?> messageCommandResponse) {
addCommandResponse(messageCommandResponse, messageCommandResponse, true);
private static Acknowledgement getAcknowledgement(final ThingCommandResponse<?> thingCommandResponse,
final AcknowledgementLabel acknowledgementLabel) {

return ThingAcknowledgementFactory.newAcknowledgement(acknowledgementLabel,
thingCommandResponse.getEntityId(),
thingCommandResponse.getHttpStatus(),
thingCommandResponse.getDittoHeaders(),
getPayload(thingCommandResponse).orElse(null));
}

private void addCommandResponse(final CommandResponse<?> commandResponse, final WithThingId withThingId,
final boolean isLiveResponse) {
log.withCorrelationId(correlationId).debug("Received command response <{}>.", commandResponse);
final Acknowledgement acknowledgement;
if (isLiveResponse) {
acknowledgement = toLiveResponseAcknowledgement(commandResponse, withThingId);
private static Optional<JsonValue> getPayload(final ThingCommandResponse<?> thingCommandResponse) {
final Optional<JsonValue> result;
if (thingCommandResponse instanceof WithOptionalEntity) {
final var withOptionalEntity = (WithOptionalEntity) thingCommandResponse;
result = withOptionalEntity.getEntity(thingCommandResponse.getImplementedSchemaVersion());
} else {
acknowledgement = toTwinPersistedAcknowledgement(commandResponse, withThingId);
result = Optional.empty();
}
return result;
}

private void addCommandResponse(final CommandResponse<?> commandResponse, final Acknowledgement acknowledgement) {
ackregator.addReceivedAcknowledgment(acknowledgement);
potentiallyCompleteAcknowledgements(commandResponse);
}

private static Acknowledgement toLiveResponseAcknowledgement(final CommandResponse<?> commandResponse,
final WithThingId withThingId) {

final DittoHeaders liveResponseAckHeaders;
if (commandResponse instanceof MessageCommandResponse) {
liveResponseAckHeaders = commandResponse.getDittoHeaders().toBuilder()
.putHeaders(((MessageCommandResponse<?, ?>) commandResponse).getMessage().getHeaders())
.build();
} else {
liveResponseAckHeaders = commandResponse.getDittoHeaders();
}

return ThingAcknowledgementFactory.newAcknowledgement(
LIVE_RESPONSE,
withThingId.getEntityId(),
commandResponse.getHttpStatus(),
liveResponseAckHeaders,
getPayload(commandResponse).orElse(null));
private void handleMessageCommandResponse(final MessageCommandResponse<?, ?> messageCommandResponse) {
log.withCorrelationId(correlationId).debug("Received message command response <{}>.", messageCommandResponse);
addCommandResponse(messageCommandResponse, getAcknowledgement(messageCommandResponse));
}

private static Acknowledgement toTwinPersistedAcknowledgement(final CommandResponse<?> commandResponse,
final WithThingId withThingId) {
return ThingAcknowledgementFactory.newAcknowledgement(
TWIN_PERSISTED,
withThingId.getEntityId(),
commandResponse.getHttpStatus(),
commandResponse.getDittoHeaders(),
getPayload(commandResponse).orElse(null)
);
private static Acknowledgement getAcknowledgement(final MessageCommandResponse<?, ?> messageCommandResponse) {
final var responseDittoHeaders = messageCommandResponse.getDittoHeaders();
final var message = messageCommandResponse.getMessage();
final var liveResponseAckHeaders = responseDittoHeaders.toBuilder()
.putHeaders(message.getHeaders())
.build();
return ThingAcknowledgementFactory.newAcknowledgement(LIVE_RESPONSE,
messageCommandResponse.getEntityId(),
messageCommandResponse.getHttpStatus(),
liveResponseAckHeaders,
getPayload(messageCommandResponse).orElse(null));
}

private static Optional<JsonValue> getPayload(final CommandResponse<?> response) {
final Optional<JsonValue> result;
if (response instanceof WithOptionalEntity) {
result = ((WithOptionalEntity) response).getEntity(response.getImplementedSchemaVersion());
} else if (response instanceof MessageCommandResponse) {
result = response.toJson().getValue(MessageCommandResponse.JsonFields.JSON_MESSAGE.getPointer()
.append(MessageCommandResponse.JsonFields.JSON_MESSAGE_PAYLOAD.getPointer()));
} else {
result = Optional.empty();
}
return result;
private static Optional<JsonValue> getPayload(final MessageCommandResponse<?, ?> messageCommandResponse) {
final var jsonMessagePointer = MessageCommandResponse.JsonFields.JSON_MESSAGE.getPointer();
final var jsonMessagePayloadPointer = MessageCommandResponse.JsonFields.JSON_MESSAGE_PAYLOAD.getPointer();
final var messagePayloadPointer = jsonMessagePointer.append(jsonMessagePayloadPointer);
final var messageCommandResponseJsonObject = messageCommandResponse.toJson();
return messageCommandResponseJsonObject.getValue(messagePayloadPointer);
}

private void handleReceiveTimeout(final ReceiveTimeout receiveTimeout) {
Expand Down Expand Up @@ -246,16 +244,14 @@ private void handleDittoRuntimeException(final DittoRuntimeException dittoRuntim
}

private void potentiallyCompleteAcknowledgements(@Nullable final CommandResponse<?> response) {

if (ackregator.receivedAllRequestedAcknowledgements()) {
completeAcknowledgements(response);
}
}

private void completeAcknowledgements(@Nullable final CommandResponse<?> response) {
final Acknowledgements aggregatedAcknowledgements =
ackregator.getAggregatedAcknowledgements(requestCommandHeaders);
final boolean builtInAcknowledgementOnly = containsOnlyTwinPersistedOrLiveResponse(aggregatedAcknowledgements);
final var aggregatedAcknowledgements = ackregator.getAggregatedAcknowledgements(requestCommandHeaders);
final var builtInAcknowledgementOnly = containsOnlyTwinPersistedOrLiveResponse(aggregatedAcknowledgements);
if (null != response && builtInAcknowledgementOnly) {
// in this case, only the implicit "twin-persisted" acknowledgement was asked for, respond with the signal:
handleSignal(response);
Expand All @@ -272,8 +268,9 @@ private void completeAcknowledgements(@Nullable final CommandResponse<?> respons

public static DittoHeadersSettable<?> restoreCommandConnectivityHeaders(final DittoHeadersSettable<?> signal,
final DittoHeaders requestCommandHeaders) {
final DittoHeadersBuilder<?, ?> enhancedHeadersBuilder = signal.getDittoHeaders()
.toBuilder()

final var signalDittoHeaders = signal.getDittoHeaders();
final var enhancedHeadersBuilder = signalDittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES.getKey())
.removeHeader(DittoHeaderDefinition.INBOUND_PAYLOAD_MAPPER.getKey())
.removeHeader(DittoHeaderDefinition.REPLY_TARGET.getKey());
Expand All @@ -297,7 +294,7 @@ private void handleSignal(final DittoHeadersSettable<?> signal) {
* @return the error response.
*/
private ThingErrorResponse asThingErrorResponse(final Acknowledgements aggregatedAcknowledgements) {
final ThingId thingId = ThingId.of(aggregatedAcknowledgements.getEntityId());
final var thingId = ThingId.of(aggregatedAcknowledgements.getEntityId());
final DittoRuntimeException dittoRuntimeException = GatewayCommandTimeoutException.newBuilder(timeout)
.dittoHeaders(aggregatedAcknowledgements.getDittoHeaders())
.build();
Expand All @@ -309,7 +306,7 @@ private static boolean containsOnlyTwinPersistedOrLiveResponse(final Acknowledge
return aggregatedAcknowledgements.getSize() == 1 &&
aggregatedAcknowledgements.stream()
.anyMatch(ack -> {
final AcknowledgementLabel label = ack.getLabel();
final var label = ack.getLabel();
return TWIN_PERSISTED.equals(label) ||
LIVE_RESPONSE.equals(label);
});
Expand Down

0 comments on commit 9625bb7

Please sign in to comment.