Skip to content

Commit

Permalink
extend BasePublisherActor with reference to proxyActor to be able to …
Browse files Browse the repository at this point in the history
…send commands to concierge;

route LiveQueryCommandResponses to concierge for filtering the response;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Oct 20, 2021
1 parent a9b2d24 commit 0986e91
Show file tree
Hide file tree
Showing 32 changed files with 159 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ private Void dispatchEnforcedMessage(final Contextual<?> enforcementResult) {
// message does not exist; nothing to dispatch
enforcementResult.getLog().debug("Not dispatching due to lack of message: {}", enforcementResult);
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,25 +160,27 @@
public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientState, BaseClientData> implements
ConnectivityConfigModifiedBehavior {

/**
* Common logger for all sub-classes of BaseClientActor as its MDC already contains the connection ID.
*/
protected final ThreadSafeDittoLoggingAdapter logger;
protected static final Status.Success DONE = new Status.Success(Done.getInstance());

protected ConnectionContext connectionContext;
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;

/**
* The name of the dispatcher that will be used for async mapping.
*/
private static final String MESSAGE_MAPPING_PROCESSOR_DISPATCHER = "message-mapping-processor-dispatcher";

private static final Pattern EXCLUDED_ADDRESS_REPORTING_CHILD_NAME_PATTERN = Pattern.compile(
OutboundMappingProcessorActor.ACTOR_NAME + "|" + OutboundDispatchingActor.ACTOR_NAME + "|" +
"StreamSupervisor-.*|subscriptionManager");

protected static final Status.Success DONE = new Status.Success(Done.getInstance());

private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";
private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
private static final String CLOSED_BECAUSE_OF_UNKNOWN_FAILURE_MISCONFIGURATION_STATUS_IN_CLIENT =
"Closed because of unknown/failure/misconfiguration status in client.";
/**
* Common logger for all sub-classes of BaseClientActor as its MDC already contains the connection ID.
*/
protected final ThreadSafeDittoLoggingAdapter logger;

private final Connection connection;
private final ActorRef connectionActor;
Expand All @@ -196,22 +198,18 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private final ConnectivityCounterRegistry connectionCounterRegistry;
private final ConnectionLoggerRegistry connectionLoggerRegistry;
private final Materializer materializer;
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;
private final boolean dryRun;

private final ConnectionContextProvider connectionContextProvider;
protected ConnectionContext connectionContext;

// counter for all child actors ever started to disambiguate between them
private int childActorCount = 0;

private Sink<Object, NotUsed> inboundMappingSink;
private ActorRef outboundDispatchingActor;
private ActorRef outboundMappingProcessorActor;
private ActorRef subscriptionManager;
private ActorRef tunnelActor;

// counter for all child actors ever started to disambiguate between them
private int childActorCount = 0;

protected BaseClientActor(final Connection connection,
final ActorRef proxyActor,
final ActorRef connectionActor,
Expand Down Expand Up @@ -424,6 +422,16 @@ protected String getDefaultClientId() {
return getClientId(connection.getId());
}

/**
* Get the proxyActor reference.
*
* @return the proxyActor ref.
*/

protected ActorRef getProxyActor() {
return proxyActor;
}

private boolean hasInboundMapperConfigChanged(final ConnectivityConfig connectivityConfig) {
final var currentConfig = connectionContext.getConnectivityConfig().getMappingConfig().getMapperLimitsConfig();
final var modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.ditto.base.model.common.Placeholders;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
Expand Down Expand Up @@ -81,8 +82,10 @@
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;

import akka.actor.AbstractActor;
Expand All @@ -105,6 +108,7 @@ public abstract class BasePublisherActor<T extends PublishTarget> extends Abstra
protected final ConnectionConfig connectionConfig;
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;
protected final ExpressionResolver connectionIdResolver;

/**
* Common logger for all sub-classes of BasePublisherActor as its MDC already contains the connection ID.
Expand All @@ -118,13 +122,15 @@ public abstract class BasePublisherActor<T extends PublishTarget> extends Abstra
private final List<Optional<ReplyTarget>> replyTargets;
private final int acknowledgementSizeBudget;
private final String clientId;
protected final ExpressionResolver connectionIdResolver;
private final ActorRef proxyActor;

protected BasePublisherActor(final Connection connection,
final String clientId,
final ActorRef proxyActor,
final ConnectivityStatusResolver connectivityStatusResolver) {
this.connection = checkNotNull(connection, "connection");
this.clientId = checkNotNull(clientId, "clientId");
this.proxyActor = checkNotNull(proxyActor, "proxyActor");
resourceStatusMap = new HashMap<>();
final List<Target> targets = connection.getTargets();
targets.forEach(target -> resourceStatusMap.put(target, getTargetResourceStatus(target)));
Expand Down Expand Up @@ -238,8 +244,27 @@ private void sendBackResponses(final OutboundSignal.MultiMapped multiMapped, @Nu
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(multiMapped.getSource());
if (!nonAcknowledgementsResponses.isEmpty() && sender != null) {
nonAcknowledgementsResponses.forEach(response -> {
l.debug("CommandResponse created from HTTP response. Replying to <{}>: <{}>", sender, response);
sender.tell(response, getSelf());
// TODO remove header merging when mergeWithResponseHeaders in HttpPublisherActor is fixed
// and headers are added to the response
final var sourceDittoHeaders = multiMapped.getSource().getDittoHeaders();
final var responseDittoHeaders = response.getDittoHeaders();
final var combinedHeaders = DittoHeaders.newBuilder(sourceDittoHeaders)
.putHeaders(responseDittoHeaders)
.build();

final var responseWithPreservedHeaders =
response.setDittoHeaders(combinedHeaders);
if (responseWithPreservedHeaders instanceof ThingQueryCommandResponse
&& isLiveResponse(responseWithPreservedHeaders)) {
l.debug("LiveQueryCommandResponse created from HTTP response. " +
"Sending response <{}> to concierge for filtering", responseWithPreservedHeaders);

proxyActor.tell(responseWithPreservedHeaders, sender);
} else {
l.debug("CommandResponse created from HTTP response. Replying to <{}>: <{}>", sender,
responseWithPreservedHeaders);
sender.tell(responseWithPreservedHeaders, getSelf());
}
});
} else if (nonAcknowledgementsResponses.isEmpty()) {
l.debug("No CommandResponse created from HTTP response.");
Expand All @@ -248,6 +273,10 @@ private void sendBackResponses(final OutboundSignal.MultiMapped multiMapped, @Nu
}
}

private boolean isLiveResponse(final WithDittoHeaders response) {
return response.getDittoHeaders().getChannel().filter(TopicPath.Channel.LIVE.getName()::equals).isPresent();
}

/**
* Gets the converter from publisher exceptions to Acknowledgements.
* Override to handle client-specific exceptions.
Expand Down Expand Up @@ -280,7 +309,6 @@ private static Acknowledgements appendConnectionIdToAcknowledgements(final Ackno
acknowledgements.getDittoHeaders());
}


/**
* Appends the ConnectionId to the processed {@code commandResponse} payload.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Props getActorPropsForType(final Connection connection, final ActorRef pr
result = KafkaClientActor.props(connection, proxyActor, connectionActor, dittoHeaders);
break;
case HTTP_PUSH:
result = HttpPushClientActor.props(connection, connectionActor, dittoHeaders);
result = HttpPushClientActor.props(connection, proxyActor, connectionActor, dittoHeaders);
break;
default:
throw new IllegalArgumentException("ConnectionType <" + connectionType + "> is not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ final class OutboundDispatchingActor extends AbstractActor {
@SuppressWarnings("unused")
private OutboundDispatchingActor(final OutboundMappingSettings settings,
final ActorRef outboundMappingProcessorActor) {

this.settings = settings;
this.outboundMappingProcessorActor = outboundMappingProcessorActor;
}
Expand Down Expand Up @@ -182,7 +181,7 @@ private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAc
final Consumer<ActorRef> action = acknowledgementForwarder -> {
if (responseOrAck instanceof ThingQueryCommandResponse && isLiveResponse(responseOrAck)) {
// forward live command responses to concierge to filter response
proxyActor.tell(responseOrAck, getSender());
settings.getProxyActor().tell(responseOrAck, getSender());
} else {
acknowledgementForwarder.forward(responseOrAck, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ protected CompletionStage<Status.Status> startPublisherActor() {
if (null != jmsSession) {
final Props props = AmqpPublisherActor.props(connection(), jmsSession,
connectionContext.getConnectivityConfig().getConnectionConfig(), getDefaultClientId(),
connectivityStatusResolver);
getProxyActor(), connectivityStatusResolver);
amqpPublisherActor = startChildActorConflictFree(AmqpPublisherActor.ACTOR_NAME_PREFIX, props);
Patterns.ask(amqpPublisherActor, AmqpPublisherActor.Control.INITIALIZE, clientAskTimeout)
.whenComplete((result, error) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ private AmqpPublisherActor(final Connection connection,
final Session session,
final ConnectionConfig connectionConfig,
final String clientId,
final ActorRef proxyActor,
final ConnectivityStatusResolver connectivityStatusResolver) {

super(connection, clientId, connectivityStatusResolver);
super(connection, clientId, proxyActor, connectivityStatusResolver);
this.session = checkNotNull(session, "session");

final Executor jmsDispatcher = JMSConnectionHandlingActor.getOwnDispatcher(getContext().system());
Expand Down Expand Up @@ -159,8 +160,7 @@ private AmqpPublisherActor(final Connection connection,
*/
private static CompletableFuture<Object> triggerPublishAsync(
final Pair<ExternalMessage, AmqpMessageContext> messageToPublish,
final Executor jmsDispatcher
) {
final Executor jmsDispatcher) {
final ExternalMessage message = messageToPublish.first();
final AmqpMessageContext context = messageToPublish.second();
return CompletableFuture.supplyAsync(() -> context.onPublishMessage(message), jmsDispatcher)
Expand All @@ -176,6 +176,7 @@ private static CompletableFuture<Object> triggerPublishAsync(
* @param session the jms session
* @param connectionConfig configuration for all connections.
* @param clientId identifier of the client actor.
* @param proxyActor the actor used to send signals into the ditto cluster.
* @param connectivityStatusResolver connectivity status resolver to resolve occurred exceptions to a connectivity
* status.
* @return the Akka configuration Props object.
Expand All @@ -184,13 +185,14 @@ static Props props(final Connection connection,
final Session session,
final ConnectionConfig connectionConfig,
final String clientId,
final ActorRef proxyActor,
final ConnectivityStatusResolver connectivityStatusResolver) {

return Props.create(AmqpPublisherActor.class,
connection,
session,
connectionConfig,
clientId,
clientId, proxyActor,
connectivityStatusResolver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ final class HttpPublisherActor extends BasePublisherActor<HttpPublishTarget> {
private HttpPublisherActor(final Connection connection,
final HttpPushFactory factory,
final String clientId,
final ActorRef proxyActor,
final ConnectivityStatusResolver connectivityStatusResolver) {

super(connection, clientId, connectivityStatusResolver);
super(connection, clientId, proxyActor, connectivityStatusResolver);
this.factory = factory;
materializer = Materializer.createMaterializer(this::getContext);
final var config = connectionConfig.getHttpPushConfig();
Expand Down Expand Up @@ -161,13 +162,15 @@ private HttpPublisherActor(final Connection connection,
* @param connection the connection.
* @param factory the http push factory to use.
* @param clientId the client ID.
* @param proxyActor the actor used to send signals into the ditto cluster.
* @param connectivityStatusResolver connectivity status resolver to resolve occurred exceptions to a connectivity
* status.
* @return the Akka configuration Props object.
*/
static Props props(final Connection connection, final HttpPushFactory factory, final String clientId,
final ConnectivityStatusResolver connectivityStatusResolver) {
return Props.create(HttpPublisherActor.class, connection, factory, clientId, connectivityStatusResolver);
final ActorRef proxyActor, final ConnectivityStatusResolver connectivityStatusResolver) {
return Props.create(HttpPublisherActor.class, connection, factory, clientId, proxyActor,
connectivityStatusResolver);
}

private static Uri setPathAndQuery(final Uri uri, @Nullable final String path, @Nullable final String query) {
Expand Down Expand Up @@ -459,7 +462,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
httpStatus);
} else if (sentSignal instanceof ThingCommand &&
SignalInformationPoint.isChannelLive(sentSignal)) {
result = toLiveCommandResponse(sentSignal, mergedDittoHeaders, body, httpStatus);
result = toLiveCommandResponse(mergedDittoHeaders, body);
} else {
result = null;
}
Expand Down Expand Up @@ -488,7 +491,8 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
}
}

final var liveCommandWithEntityId = SignalInformationPoint.tryToGetAsLiveCommandWithEntityId(sentSignal);
final var liveCommandWithEntityId =
SignalInformationPoint.tryToGetAsLiveCommandWithEntityId(sentSignal);
if (liveCommandWithEntityId.isPresent()
&& null != result
&& SignalInformationPoint.isLiveCommandResponse(result)) {
Expand Down Expand Up @@ -571,10 +575,8 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
}

@Nullable
private CommandResponse<?> toLiveCommandResponse(final Signal<?> sentSignal,
final DittoHeaders dittoHeaders,
final JsonValue jsonValue,
final HttpStatus status) {
private CommandResponse<?> toLiveCommandResponse(final DittoHeaders dittoHeaders,
final JsonValue jsonValue) {

final boolean isDittoProtocolMessage = dittoHeaders.getDittoContentType()
.filter(org.eclipse.ditto.base.model.headers.contenttype.ContentType::isDittoProtocol)
Expand Down Expand Up @@ -608,7 +610,6 @@ private CommandResponse<?> toCommandResponse(final JsonObject jsonObject) {
jsonObject, CommandResponse.class.getSimpleName(), signal.getClass().getSimpleName());
return null;
}

}

private ConnectionFailure toConnectionFailure(@Nullable final Done done, @Nullable final Throwable error) {
Expand All @@ -635,5 +636,6 @@ private boolean matches(final String headerName) {
return name.equalsIgnoreCase(headerName);
}
}

}

0 comments on commit 0986e91

Please sign in to comment.