Skip to content

Commit

Permalink
Remove the possibility of concurrent modification of global variables…
Browse files Browse the repository at this point in the history
… in a javascript mapper.

Global variables are still persistent for performance reasons,
but each mapping script can expect no concurrent modification during
its execution.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 29, 2021
1 parent 221c039 commit 967904d
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -1719,24 +1720,26 @@ private CompletionStage<Status.Status> tryToConfigureMessageMappingProcessor() {

private Pair<ActorRef, ActorRef> startOutboundActors(final ProtocolAdapter protocolAdapter) {
final OutboundMappingSettings settings;
final OutboundMappingProcessor outboundMappingProcessor;
final List<OutboundMappingProcessor> outboundMappingProcessors;
final int processorPoolSize = connection.getProcessorPoolSize();
try {
// this one throws DittoRuntimeExceptions when the mapper could not be configured
settings = OutboundMappingSettings.of(connection, connectivityConfig, getContext().getSystem(),
proxyActorSelection, protocolAdapter, logger);
outboundMappingProcessor = OutboundMappingProcessor.of(settings);
outboundMappingProcessors = IntStream.range(0, processorPoolSize)
.mapToObj(i -> OutboundMappingProcessor.of(settings))
.collect(Collectors.toList());
} catch (final DittoRuntimeException dre) {
connectionLogger.failure("Failed to start message mapping processor due to: {0}", dre.getMessage());
logger.info("Got DittoRuntimeException during initialization of MessageMappingProcessor: {} {} - desc: {}",
dre.getClass().getSimpleName(), dre.getMessage(), dre.getDescription().orElse(""));
throw dre;
}

final int processorPoolSize = connection.getProcessorPoolSize();
logger.debug("Starting mapping processor actors with pool size of <{}>.", processorPoolSize);
final Props outboundMappingProcessorActorProps =
OutboundMappingProcessorActor.props(getSelf(), outboundMappingProcessor, connection, connectivityConfig,
processorPoolSize);
OutboundMappingProcessorActor.props(getSelf(), outboundMappingProcessors, connection,
connectivityConfig, processorPoolSize);

final ActorRef processorActor =
getContext().actorOf(outboundMappingProcessorActorProps, OutboundMappingProcessorActor.ACTOR_NAME);
Expand Down Expand Up @@ -1786,24 +1789,26 @@ private Consumer<MatchingValidationResult.Failure> getResponseValidationFailureC
private Sink<Object, NotUsed> getInboundMappingSink(final ProtocolAdapter protocolAdapter,
final Sink<Object, NotUsed> inboundDispatchingSink) {

final InboundMappingProcessor inboundMappingProcessor;
final List<InboundMappingProcessor> inboundMappingProcessors;
final var context = getContext();
final var actorSystem = context.getSystem();
final int processorPoolSize = connection.getProcessorPoolSize();
try {
// this one throws DittoRuntimeExceptions when the mapper could not be configured
inboundMappingProcessor =
InboundMappingProcessor.of(connection, connectivityConfig, actorSystem, protocolAdapter, logger);
inboundMappingProcessors = IntStream.range(0, processorPoolSize)
.mapToObj(i -> InboundMappingProcessor.of(connection, connectivityConfig, actorSystem,
protocolAdapter, logger))
.collect(Collectors.toList());
} catch (final DittoRuntimeException dre) {
connectionLogger.failure("Failed to start message mapping processor due to: {0}", dre.getMessage());
logger.info("Got DittoRuntimeException during initialization of MessageMappingProcessor: {} {} - desc: {}",
dre.getClass().getSimpleName(), dre.getMessage(), dre.getDescription().orElse(""));
throw dre;
}

final int processorPoolSize = connection.getProcessorPoolSize();
logger.debug("Starting inbound mapping processor actors with pool size of <{}>.", processorPoolSize);

return InboundMappingSink.createSink(inboundMappingProcessor,
return InboundMappingSink.createSink(inboundMappingProcessors,
connection.getId(),
processorPoolSize,
inboundDispatchingSink,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
*/
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotEmpty;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import javax.annotation.Nullable;
Expand All @@ -35,30 +37,31 @@
import akka.dispatch.MessageDispatcher;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

/**
* This class creates a Sink which is responsible for inbound payload mapping.
* The instance of this class holds the "state" of the sink (see {@link #inboundMappingProcessor}).
* The instance of this class holds the "state" of the sink (see {@link #inboundMappingProcessors}).
*/
public final class InboundMappingSink {

private final ThreadSafeDittoLogger logger;

private final InboundMappingProcessor inboundMappingProcessor;
private final List<InboundMappingProcessor> inboundMappingProcessors;
private final Sink<Object, ?> inboundDispatchingSink;
@Nullable private final ThrottlingConfig throttlingConfig;
private final MessageDispatcher messageMappingProcessorDispatcher;
private final int processorPoolSize;

private InboundMappingSink(final InboundMappingProcessor inboundMappingProcessor,
private InboundMappingSink(final List<InboundMappingProcessor> inboundMappingProcessors,
final ConnectionId connectionId,
final int processorPoolSize,
final Sink<Object, ?> inboundDispatchingSink,
final MappingConfig mappingConfig,
@Nullable final ThrottlingConfig throttlingConfig,
final MessageDispatcher messageMappingProcessorDispatcher) {

this.inboundMappingProcessor = checkNotNull(inboundMappingProcessor, "inboundMappingProcessor");
this.inboundMappingProcessors = checkNotEmpty(inboundMappingProcessors, "inboundMappingProcessors");
this.inboundDispatchingSink = checkNotNull(inboundDispatchingSink, "inboundDispatchingSink");
checkNotNull(mappingConfig, "mappingConfig");
this.throttlingConfig = throttlingConfig;
Expand All @@ -75,7 +78,7 @@ private InboundMappingSink(final InboundMappingProcessor inboundMappingProcessor
/**
* Creates a Sink which is responsible for inbound payload mapping.
*
* @param inboundMappingProcessor the MessageMappingProcessor to use for inbound messages.
* @param inboundMappingProcessors the MessageMappingProcessors to use for inbound messages.
* @param connectionId the connectionId
* @param processorPoolSize how many message processing may happen in parallel per direction (incoming or outgoing).
* @param inboundDispatchingSink used to dispatch inbound signals.
Expand All @@ -87,15 +90,15 @@ private InboundMappingSink(final InboundMappingProcessor inboundMappingProcessor
* {@code null}.
*/
public static Sink<Object, NotUsed> createSink(
final InboundMappingProcessor inboundMappingProcessor,
final List<InboundMappingProcessor> inboundMappingProcessors,
final ConnectionId connectionId,
final int processorPoolSize,
final Sink<Object, ?> inboundDispatchingSink,
final MappingConfig mappingConfig,
@Nullable final ThrottlingConfig throttlingConfig,
final MessageDispatcher messageMappingProcessorDispatcher) {

final var inboundMappingSink = new InboundMappingSink(inboundMappingProcessor,
final var inboundMappingSink = new InboundMappingSink(inboundMappingProcessors,
connectionId,
processorPoolSize,
inboundDispatchingSink,
Expand Down Expand Up @@ -123,9 +126,12 @@ private Sink<Object, NotUsed> getSink() {
private Sink<Object, NotUsed> mapMessage() {
final Flow<Object, InboundMappingOutcomes, NotUsed> mapMessageFlow =
Flow.fromFunction(ExternalMessageWithSender.class::cast)
.zip(getLoopingInboundMappingProcessorSource())
// parallelize potentially CPU-intensive payload mapping on this actor's dispatcher
.mapAsync(processorPoolSize, message -> CompletableFuture.supplyAsync(
.mapAsync(processorPoolSize, pair -> CompletableFuture.supplyAsync(
() -> {
final var message = pair.first();
final var inboundMappingProcessor = pair.second();
logger.debug("Received inbound Message to map: {}", message);
return mapInboundMessage(message, inboundMappingProcessor);
},
Expand All @@ -150,6 +156,11 @@ private Sink<Object, NotUsed> mapMessage() {
.to(inboundDispatchingSink);
}

private Source<InboundMappingProcessor, NotUsed> getLoopingInboundMappingProcessorSource() {
return Source.from(inboundMappingProcessors)
.concatLazy(Source.lazily(this::getLoopingInboundMappingProcessorSource));
}

private int determinePoolSize(final int connectionPoolSize, final int maxPoolSize) {
if (connectionPoolSize > maxPoolSize) {
logger.info("Configured pool size <{}> is greater than the configured max pool size <{}>." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotEmpty;
import static org.eclipse.ditto.connectivity.model.MetricType.DROPPED;
import static org.eclipse.ditto.connectivity.model.MetricType.MAPPED;

Expand Down Expand Up @@ -140,19 +141,19 @@ public final class OutboundMappingProcessorActor
private final SignalEnrichmentFacade signalEnrichmentFacade;
private final int processorPoolSize;
private final DittoRuntimeExceptionToErrorResponseFunction toErrorResponseFunction;
private final OutboundMappingProcessor outboundMappingProcessor;
private final List<OutboundMappingProcessor> outboundMappingProcessors;

@SuppressWarnings("unused")
private OutboundMappingProcessorActor(final ActorRef clientActor,
final OutboundMappingProcessor outboundMappingProcessor,
final List<OutboundMappingProcessor> outboundMappingProcessors,
final Connection connection,
final ConnectivityConfig connectivityConfig,
final int processorPoolSize) {

super(OutboundSignal.class);

this.clientActor = clientActor;
this.outboundMappingProcessor = outboundMappingProcessor;
this.outboundMappingProcessors = checkNotEmpty(outboundMappingProcessors, "outboundMappingProcessors");
this.connection = connection;

dittoLoggingAdapter = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
Expand Down Expand Up @@ -216,21 +217,21 @@ private int determinePoolSize(final int connectionPoolSize, final int maxPoolSiz
* Creates Akka configuration object for this actor.
*
* @param clientActor the client actor that created this mapping actor.
* @param outboundMappingProcessor the MessageMappingProcessor to use for outbound messages.
* @param outboundMappingProcessors the MessageMappingProcessors to use for outbound messages.
* @param connection the connection.
* @param connectivityConfig the config of the connectivity service with potential overwrites.
* @param processorPoolSize how many message processing may happen in parallel per direction (incoming or outgoing).
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef clientActor,
final OutboundMappingProcessor outboundMappingProcessor,
final List<OutboundMappingProcessor> outboundMappingProcessors,
final Connection connection,
final ConnectivityConfig connectivityConfig,
final int processorPoolSize) {

return Props.create(OutboundMappingProcessorActor.class,
clientActor,
outboundMappingProcessor,
outboundMappingProcessors,
connection,
connectivityConfig,
processorPoolSize
Expand All @@ -241,7 +242,9 @@ public static Props props(final ActorRef clientActor,
public Receive createReceive() {
final PartialFunction<Object, Object> wrapAsOutboundSignal = new PFBuilder<>()
.match(Acknowledgement.class, this::handleNotExpectedAcknowledgement)
.match(ErrorResponse.class, errResponse -> handleCommandResponse(errResponse, errResponse.getDittoRuntimeException(), getSender()))
.match(ErrorResponse.class,
errResponse -> handleCommandResponse(errResponse, errResponse.getDittoRuntimeException(),
getSender()))
.match(CommandResponse.class, response -> handleCommandResponse(response, null, getSender()))
.match(Signal.class, signal -> handleSignal(signal, getSender()))
.match(DittoRuntimeException.class, this::mapDittoRuntimeException)
Expand Down Expand Up @@ -295,19 +298,26 @@ protected OutboundSignalWithSender mapMessage(final OutboundSignal message) {
// Targets attached to the OutboundSignal are pre-selected by authorization, topic and filter sans enrichment.
final Flow<OutboundSignalWithSender, OutboundSignal.MultiMapped, ?> flow =
Flow.<OutboundSignalWithSender>create()
.mapAsync(processorPoolSize, outbound -> toMultiMappedOutboundSignal(
outbound,
Source.single(outbound)
.zip(getOutboundMappingProcessorSource())
.mapAsync(processorPoolSize, outboundPair -> toMultiMappedOutboundSignal(
outboundPair.first(),
outboundPair.second(),
Source.single(outboundPair.first())
.via(splitByTargetExtraFieldsFlow())
.mapAsync(mappingConfig.getParallelism(), this::enrichAndFilterSignal)
.mapConcat(x -> x)
.map(this::handleOutboundSignal)
.map(outbound -> handleOutboundSignal(outbound, outboundPair.second()))
.flatMapConcat(x -> x)
))
.mapConcat(x -> x);
return flow.to(Sink.foreach(this::forwardToPublisherActor));
}

private Source<OutboundMappingProcessor, NotUsed> getOutboundMappingProcessorSource() {
return Source.from(outboundMappingProcessors)
.concatLazy(Source.lazily(this::getOutboundMappingProcessorSource));
}

/**
* Create a flow that splits 1 outbound signal into many as follows.
* <ol>
Expand Down Expand Up @@ -493,12 +503,13 @@ private void recordResponse(final CommandResponse<?> response, @Nullable final D
}
}

private Source<OutboundSignalWithSender, ?> handleOutboundSignal(final OutboundSignalWithSender outbound) {
private Source<OutboundSignalWithSender, ?> handleOutboundSignal(final OutboundSignalWithSender outbound,
final OutboundMappingProcessor outboundMappingProcessor) {
final Signal<?> source = outbound.getSource();
if (dittoLoggingAdapter.isDebugEnabled()) {
dittoLoggingAdapter.withCorrelationId(source).debug("Handling outbound signal <{}>.", source);
}
return mapToExternalMessage(outbound);
return mapToExternalMessage(outbound, outboundMappingProcessor);
}

private void forwardToPublisherActor(final OutboundSignal.MultiMapped mappedEnvelop) {
Expand All @@ -517,7 +528,8 @@ private Object handleSignal(final Signal<?> signal, final ActorRef sender) {
return OutboundSignalWithSender.of(signal, sender);
}

private Source<OutboundSignalWithSender, ?> mapToExternalMessage(final OutboundSignalWithSender outbound) {
private Source<OutboundSignalWithSender, ?> mapToExternalMessage(final OutboundSignalWithSender outbound,
final OutboundMappingProcessor outboundMappingProcessor) {
final ConnectionMonitor.InfoProvider infoProvider = InfoProviderFactory.forSignal(outbound.getSource());
final Set<ConnectionMonitor> outboundMapped = getMonitorsForMappedSignal(outbound);
final Set<ConnectionMonitor> outboundDropped = getMonitorsForDroppedSignal(outbound);
Expand Down Expand Up @@ -597,6 +609,7 @@ private Set<ConnectionMonitor> getMonitorsForOutboundSignal(final OutboundSignal

private <T> CompletionStage<Collection<OutboundSignal.MultiMapped>> toMultiMappedOutboundSignal(
final OutboundSignalWithSender outbound,
final OutboundMappingProcessor outboundMappingProcessor,
final Source<OutboundSignalWithSender, T> source) {

return source.runWith(Sink.seq(), materializer)
Expand Down Expand Up @@ -645,8 +658,8 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final Criteria criteria = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
).filterCriteria(filter.get(), dittoHeaders);
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
).filterCriteria(filter.get(), dittoHeaders);
return outboundSignalWithExtra.getExtra()
.flatMap(extra -> ThingEventToThingConverter
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -251,8 +252,8 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
InboundMappingProcessor.of(connection, connectivityConfig, actorSystem, protocolAdapter, logger);
final OutboundMappingProcessor outboundMappingProcessor =
OutboundMappingProcessor.of(connection, connectivityConfig, actorSystem, protocolAdapter, logger);
final Props props = OutboundMappingProcessorActor.props(clientActor, outboundMappingProcessor, CONNECTION,
connectivityConfig, 43);
final Props props = OutboundMappingProcessorActor.props(clientActor, List.of(outboundMappingProcessor),
CONNECTION, connectivityConfig, 43);
final ActorRef outboundProcessorActor = actorSystem.actorOf(props,
OutboundMappingProcessorActor.ACTOR_NAME + "-" + name.getMethodName());

Expand All @@ -266,7 +267,7 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
ConnectivityConfig.of(actorSystem.settings().config()),
null);

return InboundMappingSink.createSink(inboundMappingProcessor,
return InboundMappingSink.createSink(List.of(inboundMappingProcessor),
CONNECTION_ID,
99,
inboundDispatchingSink,
Expand Down

0 comments on commit 967904d

Please sign in to comment.