Skip to content

Commit

Permalink
Issue #559: Moved SignalInformationPoint to module "ditto-internal-…
Browse files Browse the repository at this point in the history
…models-signal" to make is usable in a broader scope.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Oct 26, 2021
1 parent 05ff1f9 commit 88c40ea
Show file tree
Hide file tree
Showing 11 changed files with 506 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand All @@ -53,6 +54,7 @@
import org.eclipse.ditto.connectivity.service.messaging.SendResult;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.signing.NoOpSigning;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.PreparedTimer;
import org.eclipse.ditto.json.JsonFactory;
Expand Down Expand Up @@ -494,8 +496,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
}
}

final var liveCommandWithEntityId =
SignalInformationPoint.tryToGetAsLiveCommandWithEntityId(sentSignal);
final var liveCommandWithEntityId = tryToGetAsLiveCommandWithEntityId(sentSignal);
if (liveCommandWithEntityId.isPresent()
&& null != result
&& SignalInformationPoint.isLiveCommandResponse(result)) {
Expand Down Expand Up @@ -528,6 +529,16 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
});
}

private static Optional<SignalWithEntityId<?>> tryToGetAsLiveCommandWithEntityId(@Nullable final Signal<?> signal) {
final SignalWithEntityId<?> result;
if (SignalInformationPoint.isLiveCommand(signal)) {
result = (SignalWithEntityId<?>) signal;
} else {
result = null;
}
return Optional.ofNullable(result);
}

@Nullable
private MessageCommandResponse<?, ?> toMessageCommandResponse(final MessageCommand<?, ?> sentMessageCommand,
final DittoHeaders dittoHeaders,
Expand Down

This file was deleted.

5 changes: 5 additions & 0 deletions internal/models/acks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
<artifactId>akka-actor_${scala.version}</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signal</artifactId>
</dependency>

<!-- ### Testing ### -->
<dependency>
<groupId>com.typesafe.akka</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.internal.models.acks;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.internal.models.acks.AcknowledgementForwarderActorStarter.isLiveSignal;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
Expand All @@ -33,14 +32,12 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.modify.ThingModifyCommand;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Props;
import akka.japi.pf.PFBuilder;
import scala.PartialFunction;

Expand All @@ -52,12 +49,11 @@
*/
public final class AcknowledgementAggregatorActorStarter {

protected final ActorRefFactory actorRefFactory;
protected final Duration maxTimeout;
protected final HeaderTranslator headerTranslator;
protected final PartialFunction<Signal<?>, Signal<?>> ackRequestSetter;

private int childCounter = 0;
private final ActorRefFactory actorRefFactory;
private final Duration maxTimeout;
private final HeaderTranslator headerTranslator;
private final PartialFunction<Signal<?>, Signal<?>> ackRequestSetter;
private int childCounter;

private AcknowledgementAggregatorActorStarter(final ActorRefFactory actorRefFactory,
final Duration maxTimeout,
Expand All @@ -68,6 +64,7 @@ private AcknowledgementAggregatorActorStarter(final ActorRefFactory actorRefFact
this.ackRequestSetter = ackRequestSetter;
this.maxTimeout = checkNotNull(maxTimeout, "maxTimeout");
this.headerTranslator = checkNotNull(headerTranslator, "headerTranslator");
childCounter = 0;
}

/**
Expand All @@ -85,7 +82,9 @@ public static AcknowledgementAggregatorActorStarter of(final ActorRefFactory act
final HeaderTranslator headerTranslator,
final AbstractCommandAckRequestSetter<?>... ackRequestSetters) {

return of(actorRefFactory, acknowledgementConfig.getForwarderFallbackTimeout(), headerTranslator,
return of(actorRefFactory,
acknowledgementConfig.getForwarderFallbackTimeout(),
headerTranslator,
ackRequestSetters);
}

Expand All @@ -104,7 +103,9 @@ public static AcknowledgementAggregatorActorStarter of(final ActorRefFactory act
final HeaderTranslator headerTranslator,
final AbstractCommandAckRequestSetter<?>... ackRequestSetters) {

return new AcknowledgementAggregatorActorStarter(actorRefFactory, maxTimeout, headerTranslator,
return new AcknowledgementAggregatorActorStarter(actorRefFactory,
maxTimeout,
headerTranslator,
buildAckRequestSetter(ackRequestSetters));
}

Expand Down Expand Up @@ -151,8 +152,10 @@ public <T> T start(final Signal<?> signal,
public <T> T preprocess(final Signal<?> signal,
final BiFunction<Signal<?>, Boolean, T> preprocessor,
final Function<? super DittoHeaderInvalidException, T> onInvalidHeader) {
final Signal<?> signalToForward = ackRequestSetter.apply(signal);
final Optional<DittoHeaderInvalidException> headerInvalid = getDittoHeaderInvalidException(signalToForward);

final var signalToForward = ackRequestSetter.apply(signal);
final var headerInvalid = getDittoHeaderInvalidException(signalToForward);

return headerInvalid.map(onInvalidHeader)
.orElseGet(() -> preprocessor.apply(signalToForward, shouldStartForIncoming(signalToForward)));
}
Expand All @@ -171,20 +174,24 @@ public <T> T doStart(final EntityId entityId,
final DittoHeaders dittoHeaders,
final Consumer<Object> responseSignalConsumer,
final Function<ActorRef, T> forwarderStartedFunction) {

return forwarderStartedFunction.apply(startAckAggregatorActor(entityId, dittoHeaders, responseSignalConsumer));
}

private ActorRef startAckAggregatorActor(final EntityId entityId, final DittoHeaders dittoHeaders,
private ActorRef startAckAggregatorActor(final EntityId entityId,
final DittoHeaders dittoHeaders,
final Consumer<Object> responseSignalConsumer) {
final Props props = AcknowledgementAggregatorActor.props(entityId, dittoHeaders, maxTimeout, headerTranslator,

final var props = AcknowledgementAggregatorActor.props(entityId,
dittoHeaders,
maxTimeout,
headerTranslator,
responseSignalConsumer);
final String actorName = getNextActorName(dittoHeaders);
return actorRefFactory.actorOf(props, actorName);
return actorRefFactory.actorOf(props, getNextActorName(dittoHeaders));
}

private String getNextActorName(final DittoHeaders dittoHeaders) {
final String correlationId = dittoHeaders
.getCorrelationId()
final var correlationId = dittoHeaders.getCorrelationId()
.map(cid -> URLEncoder.encode(cid, StandardCharsets.UTF_8))
.orElse("_");
return String.format("ackr%x-%s", childCounter++, correlationId);
Expand All @@ -193,6 +200,7 @@ private String getNextActorName(final DittoHeaders dittoHeaders) {
@SuppressWarnings({"unchecked", "rawtypes", "java:S3740"})
private static PartialFunction<Signal<?>, Signal<?>> buildAckRequestSetter(
final AbstractCommandAckRequestSetter<?>... ackRequestSetters) {

PFBuilder<Signal<?>, Signal<?>> pfBuilder = new PFBuilder<>();
// unavoidable raw type due to the lack of existential type
for (final AbstractCommandAckRequestSetter ackRequestSetter : ackRequestSetters) {
Expand All @@ -203,34 +211,59 @@ private static PartialFunction<Signal<?>, Signal<?>> buildAckRequestSetter(
}

private static Optional<DittoHeaderInvalidException> getDittoHeaderInvalidException(final Signal<?> signal) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final boolean isTimeoutZero = dittoHeaders.getTimeout().map(Duration::isZero).orElse(false);
final boolean isTimeoutHeaderInvalid = isTimeoutZero &&
(dittoHeaders.isResponseRequired() || !dittoHeaders.getAcknowledgementRequests().isEmpty());
if (isTimeoutHeaderInvalid) {
final Optional<DittoHeaderInvalidException> result;

final var dittoHeaders = signal.getDittoHeaders();
if (isTimeoutHeaderInvalid(dittoHeaders)) {
final var invalidHeaderKey = DittoHeaderDefinition.TIMEOUT.getKey();
final String message = String.format("The value of the header '%s' must not be zero if " +
final var message = String.format("The value of the header '%s' must not be zero if " +
"response or acknowledgements are requested.", invalidHeaderKey);
return Optional.of(DittoHeaderInvalidException.newBuilder()
result = Optional.of(DittoHeaderInvalidException.newBuilder()
.withInvalidHeaderKey(invalidHeaderKey)
.message(message)
.description("Please provide a positive timeout.")
.dittoHeaders(dittoHeaders)
.build());
} else {
return Optional.empty();
result = Optional.empty();
}

return result;
}

private static boolean isTimeoutHeaderInvalid(final DittoHeaders dittoHeaders) {
final boolean result;

final var isTimeoutZero = dittoHeaders.getTimeout().filter(Duration::isZero).isPresent();
if (isTimeoutZero) {
if (dittoHeaders.isResponseRequired()) {
result = true;
} else {
final var acknowledgementRequests = dittoHeaders.getAcknowledgementRequests();
result = !acknowledgementRequests.isEmpty();
}
} else {
result = false;
}

return result;
}

static boolean shouldStartForIncoming(final Signal<?> signal) {
final boolean isLiveSignal = isLiveSignal(signal);
private static boolean shouldStartForIncoming(final Signal<?> signal) {
final boolean result;

final var isLiveSignal = SignalInformationPoint.isChannelLive(signal);
final Collection<AcknowledgementRequest> ackRequests = signal.getDittoHeaders().getAcknowledgementRequests();
if (signal instanceof ThingModifyCommand && !isLiveSignal) {
return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotLiveResponse);
} else if (signal instanceof MessageCommand || (isLiveSignal && signal instanceof ThingCommand)) {
return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersisted);
result = ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotLiveResponse);
} else if (SignalInformationPoint.isMessageCommand(signal) ||
isLiveSignal && SignalInformationPoint.isThingCommand(signal)) {

result = ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersisted);
} else {
return false;
result = false;
}

return result;
}
}
22 changes: 13 additions & 9 deletions internal/models/signal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,32 @@
<name>Eclipse Ditto :: Internal :: Models :: Signal</name>

<dependencies>
<!-- Testing -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-things-model</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-messages-model</artifactId>
<scope>test</scope>
</dependency>

<!-- Testing -->
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-test</artifactId>
<artifactId>ditto-base-model</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>

<dependency>
Expand Down

0 comments on commit 88c40ea

Please sign in to comment.