Skip to content

Commit

Permalink
Restore distinction between inbound and outbound signals after going …
Browse files Browse the repository at this point in the history
…through ConnectionPubSub.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 20, 2022
1 parent d60551e commit b230e3d
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,39 @@
import java.util.Objects;
import java.util.function.Predicate;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.internal.utils.cluster.MappingStrategies;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.ConnectivityConstants;
import org.eclipse.ditto.internal.utils.cluster.MappingStrategies;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;

/**
* Envelope of an incoming signal for client actors to inform each other.
*
* @since 1.5.0
*/
@Immutable
public final class InboundSignal implements Jsonifiable.WithFieldSelectorAndPredicate<JsonField> {
public final class InboundSignal implements Jsonifiable.WithFieldSelectorAndPredicate<JsonField>,
Signal<InboundSignal> {

private static final String TYPE = InboundSignal.class.getSimpleName();
private final Signal<?> signal;
private final boolean dispatched;

private InboundSignal(final Signal<?> signal) {
private InboundSignal(final Signal<?> signal, final boolean dispatched) {
this.signal = checkNotNull(signal, "signal");
this.dispatched = dispatched;
}

/**
Expand All @@ -53,7 +60,25 @@ private InboundSignal(final Signal<?> signal) {
* @return the envelope.
*/
public static InboundSignal of(final Signal<?> signal) {
return new InboundSignal(signal);
return new InboundSignal(signal, false);
}

/**
* Return a copy of this object setting the {@code dispatched} flag to {@code true}.
*
* @return the copy.
*/
public InboundSignal asDispatched() {
return new InboundSignal(signal, true);
}

/**
* Get the {@code dispatched} flag.
*
* @return The flag.
*/
public boolean isDispatched() {
return dispatched;
}

/**
Expand All @@ -73,8 +98,9 @@ public static InboundSignal fromJson(final JsonObject jsonObject, final MappingS
.orElseThrow(() -> new NoSuchElementException("InboundSignal: No strategy found for signal type " +
signalType))
.parse(signalJson, dittoHeaders);
final boolean dispatched = jsonObject.getValue(JsonFields.DISPATCHED).orElse(true);

return InboundSignal.of((Signal<?>) signal);
return new InboundSignal((Signal<?>) signal, dispatched);
}

/**
Expand All @@ -88,21 +114,24 @@ public Signal<?> getSignal() {

@Override
public boolean equals(final Object other) {
if (other instanceof InboundSignal) {
return Objects.equals(signal, ((InboundSignal) other).signal);
if (other instanceof InboundSignal that) {
return Objects.equals(signal, that.signal) && dispatched == that.dispatched;
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(signal);
return Objects.hash(signal, dispatched);
}

@Override
public String toString() {
return getClass().getSimpleName() + "[signal=" + signal + "]";
return getClass().getSimpleName() +
"[signal=" + signal +
", dispatched=" + dispatched +
"]";
}

@Override
Expand All @@ -118,6 +147,7 @@ public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<
.set(JsonFields.SIGNAL, signalJson)
.set(JsonFields.HEADERS, headers)
.set(JsonFields.SIGNAL_TYPE, signal.getType())
.set(JsonFields.DISPATCHED, dispatched)
.build();
}

Expand All @@ -126,6 +156,37 @@ public JsonObject toJson(final JsonSchemaVersion schemaVersion, final JsonFieldS
return toJson(schemaVersion, field -> fieldSelector.getPointers().contains(field.getKey().asPointer()));
}

@Override
public InboundSignal setDittoHeaders(final DittoHeaders dittoHeaders) {
return new InboundSignal(signal.setDittoHeaders(dittoHeaders), dispatched);
}

@Override
public DittoHeaders getDittoHeaders() {
return signal.getDittoHeaders();
}

@Nonnull
@Override
public String getManifest() {
return TYPE;
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
}

@Override
public String getResourceType() {
return ConnectivityConstants.ENTITY_TYPE.toString();
}

@Override
public String getType() {
return TYPE;
}

private static final class JsonFields {

private static final JsonFieldDefinition<JsonObject> SIGNAL =
Expand All @@ -136,5 +197,8 @@ private static final class JsonFields {

private static final JsonFieldDefinition<String> SIGNAL_TYPE =
JsonFactory.newStringFieldDefinition("signalType");

private static final JsonFieldDefinition<Boolean> DISPATCHED =
JsonFactory.newBooleanFieldDefinition("dispatched");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState()
.event(RetrieveConnectionLogs.class, (command, data) -> retrieveConnectionLogs(command))
.event(ResetConnectionLogs.class, this::resetConnectionLogs)
.event(CheckConnectionLogsActive.class, (command, data) -> checkLoggingActive(command))
.event(InboundSignal.class, (signal, d) -> signal.isDispatched(), this::handleDispatchedInboundSignal)
.event(InboundSignal.class, this::handleInboundSignal)
.event(PublishMappedMessage.class, this::publishMappedMessage)
.event(ConnectivityCommand.class, this::onUnknownEvent) // relevant connectivity commands were handled
Expand Down Expand Up @@ -1628,6 +1629,13 @@ private FSM.State<BaseClientState, BaseClientData> handleSignal(final WithDittoH
return stay();
}

private FSM.State<BaseClientState, BaseClientData> handleDispatchedInboundSignal(final InboundSignal inboundSignal,
final BaseClientData data) {
// another actor dispatched this inbound signal to this actor. forward to outboundDispatchingActor.
outboundDispatchingActor.forward(inboundSignal, getContext());
return stay();
}

private FSM.State<BaseClientState, BaseClientData> handleInboundSignal(final InboundSignal inboundSignal,
final BaseClientData data) {
// dispatch signal to other client actors according to entity ID
Expand All @@ -1636,7 +1644,7 @@ private FSM.State<BaseClientState, BaseClientData> handleInboundSignal(final Inb
dispatchSearchCommand((WithSubscriptionId<?>) signal);
} else {
final var entityId = tryExtractEntityId(signal).orElseThrow();
connectionPubSub.publishSignal(signal, connectionId(), entityId, getSender());
connectionPubSub.publishSignal(inboundSignal.asDispatched(), connectionId(), entityId, getSender());
}
return stay();
}
Expand Down

0 comments on commit b230e3d

Please sign in to comment.