diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java index b64ebb62df..9728f72a48 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java @@ -152,6 +152,8 @@ private Signal adjustSignalAndStartAckForwarder(final Signal signal, final if (hasSourceDeclaredAcks || liveCommandExpectingResponse) { // start ackregator for source declared acks return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(), + self(), + sender(), entityId, signal, settings.getAcknowledgementConfig(), diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java index 05987f347e..dc3a4fa4b0 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java @@ -325,7 +325,10 @@ private void invalidateLoggers(final ConnectionId connectionId) { mapsKeysToDelete.forEach(loggerKey -> { // flush logs before removing from loggers: try { - LOGGERS.get(loggerKey).close(); + final ConnectionLogger connectionLogger = LOGGERS.get(loggerKey); + if (connectionLogger != null) { + connectionLogger.close(); + } } catch (final IOException e) { LOGGER.warn("Exception during closing logger <{}>: <{}>: {}", loggerKey, e.getClass().getSimpleName(), e.getMessage()); diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java index e490b78437..b27d352242 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java @@ -461,6 +461,8 @@ private Signal startAckForwarder(final Signal signal) { if (entityIdOptional.isPresent()) { final var entityIdWithType = entityIdOptional.get(); return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(), + self(), + sender(), entityIdWithType, signal, acknowledgementConfig, diff --git a/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActor.java b/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActor.java index a3bd499178..f8b7a0cbd1 100644 --- a/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActor.java +++ b/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActor.java @@ -35,6 +35,7 @@ import akka.actor.AbstractActor; import akka.actor.ActorRef; +import akka.actor.ActorRefFactory; import akka.actor.Props; import akka.actor.ReceiveTimeout; @@ -127,13 +128,15 @@ public static String determineActorName(final DittoHeaders dittoHeaders) { return ACTOR_NAME_PREFIX + URLEncoder.encode(correlationId, Charset.defaultCharset()); } - static Optional startAcknowledgementForwarderForTest(final akka.actor.ActorContext context, + static Optional startAcknowledgementForwarderForTest(final ActorRefFactory actorRefFactory, + final ActorRef parent, + final ActorRef ackRequester, final EntityId entityId, final Signal signal, final AcknowledgementConfig acknowledgementConfig) { - final AcknowledgementForwarderActorStarter starter = - AcknowledgementForwarderActorStarter.getInstance(context, entityId, signal, acknowledgementConfig, + final AcknowledgementForwarderActorStarter starter = AcknowledgementForwarderActorStarter + .getInstance(actorRefFactory, parent, ackRequester, entityId, signal, acknowledgementConfig, label -> true); return starter.get(); } @@ -145,7 +148,9 @@ static Optional startAcknowledgementForwarderForTest(final akka.actor. * in case that an Actor with this name already exists, a new correlation ID is generated and the process repeated. * If the signal does not require acknowledgements, no forwarder starts and the signal itself is returned. * - * @param context the context ({@code getContext()} of the Actor to start the AcknowledgementForwarderActor in. + * @param actorRefFactory the factory to start the forwarder actor in. + * @param parent the parent of the forwarder actor. + * @param ackRequester the actor which should receive the forwarded acknowledgements. * @param entityId the entityId of the {@code Signal} which requested the Acknowledgements. * @param signal the signal for which acknowledgements are expected. * @param acknowledgementConfig the AcknowledgementConfig to use for looking up config values. @@ -153,14 +158,16 @@ static Optional startAcknowledgementForwarderForTest(final akka.actor. * @return the signal for which a suitable ack forwarder has started whenever required. * @throws NullPointerException if any argument is {@code null}. */ - public static Signal startAcknowledgementForwarder(final akka.actor.ActorContext context, + public static Signal startAcknowledgementForwarder(final ActorRefFactory actorRefFactory, + final ActorRef parent, + final ActorRef ackRequester, final EntityId entityId, final Signal signal, final AcknowledgementConfig acknowledgementConfig, final Predicate isAckLabelAllowed) { final AcknowledgementForwarderActorStarter starter = - AcknowledgementForwarderActorStarter.getInstance(context, entityId, signal, acknowledgementConfig, - isAckLabelAllowed); + AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, ackRequester, entityId, + signal, acknowledgementConfig, isAckLabelAllowed); final DittoHeadersBuilder dittoHeadersBuilder = signal.getDittoHeaders().toBuilder(); starter.getConflictFree().ifPresent(dittoHeadersBuilder::correlationId); if (!signal.getDittoHeaders().getAcknowledgementRequests().isEmpty()) { diff --git a/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarter.java b/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarter.java index 1a7f3e55a6..687ba2bf50 100644 --- a/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarter.java +++ b/internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarter.java @@ -28,18 +28,18 @@ import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder; -import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig; -import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement; -import org.eclipse.ditto.protocol.TopicPath; +import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException; -import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig; import org.eclipse.ditto.messages.model.signals.commands.MessageCommand; +import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement; +import org.eclipse.ditto.protocol.TopicPath; import org.eclipse.ditto.things.model.signals.commands.ThingCommand; import org.eclipse.ditto.things.model.signals.events.ThingEvent; -import akka.actor.ActorContext; import akka.actor.ActorRef; +import akka.actor.ActorRefFactory; import akka.actor.InvalidActorNameException; import akka.actor.Props; import akka.japi.Pair; @@ -54,20 +54,26 @@ final class AcknowledgementForwarderActorStarter implements Supplier signal; private final DittoHeaders dittoHeaders; private final AcknowledgementConfig acknowledgementConfig; private final Set acknowledgementRequests; - private AcknowledgementForwarderActorStarter(final ActorContext context, + private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFactory, + final ActorRef parent, + final ActorRef ackRequester, final EntityId entityId, final Signal signal, final AcknowledgementConfig acknowledgementConfig, final Predicate isAckLabelAllowed) { - actorContext = checkNotNull(context, "context"); + this.actorRefFactory = checkNotNull(actorRefFactory, "actorRefFactory"); + this.parent = parent; + this.ackRequester = ackRequester; this.entityId = checkNotNull(entityId, "entityId"); this.signal = checkNotNull(signal, "signal"); dittoHeaders = signal.getDittoHeaders(); @@ -81,8 +87,9 @@ private AcknowledgementForwarderActorStarter(final ActorContext context, /** * Returns an instance of {@code ActorStarter}. * - * @param context the context to start the forwarder actor in. Furthermore provides the sender and self - * reference for forwarding. + * @param actorRefFactory the factory to start the forwarder actor in. + * @param parent the parent of the forwarder actor. + * @param ackRequester the actor which should receive the forwarded acknowledgements. * @param entityId is used for the NACKs if the forwarder actor cannot be started. * @param signal the signal for which the forwarder actor is to start. * @param acknowledgementConfig provides configuration setting regarding acknowledgement handling. @@ -91,13 +98,16 @@ private AcknowledgementForwarderActorStarter(final ActorContext context, * @return a means to start an acknowledgement forwarder actor. * @throws NullPointerException if any argument is {@code null}. */ - static AcknowledgementForwarderActorStarter getInstance(final ActorContext context, + static AcknowledgementForwarderActorStarter getInstance(final ActorRefFactory actorRefFactory, + final ActorRef parent, + final ActorRef ackRequester, final EntityId entityId, final Signal signal, final AcknowledgementConfig acknowledgementConfig, final Predicate isAckLabelAllowed) { - return new AcknowledgementForwarderActorStarter(context, entityId, signal, acknowledgementConfig, + return new AcknowledgementForwarderActorStarter(actorRefFactory, parent, ackRequester, entityId, signal, + acknowledgementConfig, // live-response is always allowed isAckLabelAllowed.or(DittoAcknowledgementLabel.LIVE_RESPONSE::equals)); } @@ -181,9 +191,9 @@ private Pair parseCorrelationId(final DittoHeaders dittoHeaders } private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) { - final Props props = AcknowledgementForwarderActor.props(actorContext.sender(), dittoHeaders, + final Props props = AcknowledgementForwarderActor.props(ackRequester, dittoHeaders, acknowledgementConfig.getForwarderFallbackTimeout()); - return actorContext.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders)); + return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders)); } private DittoRuntimeException getDuplicateCorrelationIdException(final Throwable cause) { @@ -195,15 +205,12 @@ private DittoRuntimeException getDuplicateCorrelationIdException(final Throwable } private void declineAllNonDittoAckRequests(final DittoRuntimeException dittoRuntimeException) { - final ActorRef sender = actorContext.sender(); - final ActorRef self = actorContext.self(); - // answer NACKs for all AcknowledgementRequests with labels which were not Ditto-defined acknowledgementRequests.stream() .map(AcknowledgementRequest::getLabel) .filter(Predicate.not(DittoAcknowledgementLabel::contains)) .map(label -> getNack(label, dittoRuntimeException)) - .forEach(nack -> sender.tell(nack, self)); + .forEach(nack -> ackRequester.tell(nack, parent)); } private Acknowledgement getNack(final AcknowledgementLabel label, diff --git a/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarterTest.java b/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarterTest.java index 585d08ccfa..e65ef149ff 100644 --- a/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarterTest.java +++ b/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorStarterTest.java @@ -25,11 +25,11 @@ import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; import org.eclipse.ditto.base.model.common.HttpStatus; import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; +import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException; import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig; import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig; import org.eclipse.ditto.things.model.ThingId; -import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; -import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException; import org.eclipse.ditto.things.model.signals.events.ThingDeleted; import org.junit.AfterClass; import org.junit.Before; @@ -93,7 +93,6 @@ public void setUp() { when(actorContext.actorOf(any(Props.class), anyString())) .thenAnswer((Answer) invocationOnMock -> actorSystem.actorOf(invocationOnMock.getArgument(0), invocationOnMock.getArgument(1))); - when(actorContext.sender()).thenReturn(testProbe.ref()); } @Test @@ -155,7 +154,9 @@ public void startForwarderActorWithDuplicateCorrelationId() { } private AcknowledgementForwarderActorStarter getActorStarter(final DittoHeaders dittoHeaders) { - return AcknowledgementForwarderActorStarter.getInstance(actorContext, KNOWN_ENTITY_ID, + return AcknowledgementForwarderActorStarter.getInstance(actorContext, TestProbe.apply(actorSystem).ref(), + testProbe.ref(), + KNOWN_ENTITY_ID, ThingDeleted.of(KNOWN_ENTITY_ID, 1L, Instant.EPOCH, dittoHeaders, null), acknowledgementConfig, label -> true); } diff --git a/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorTest.java b/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorTest.java index bc2c2ad283..012baaa9f7 100644 --- a/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorTest.java +++ b/internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementForwarderActorTest.java @@ -53,6 +53,7 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.testkit.TestProbe; import akka.testkit.javadsl.TestKit; /** @@ -130,10 +131,10 @@ public void createAcknowledgementForwarderAndThreadAcknowledgementThrough() Acknowledgement.of(acknowledgementLabel, entityId, HttpStatus.ACCEPTED, dittoHeaders); new TestKit(actorSystem) {{ - when(actorContext.sender()).thenReturn(getRef()); final Optional underTest = - AcknowledgementForwarderActor.startAcknowledgementForwarderForTest(actorContext, entityId, signal, + AcknowledgementForwarderActor.startAcknowledgementForwarderForTest(actorContext, + TestProbe.apply(actorSystem).ref(), getRef(), entityId, signal, acknowledgementConfig); softly.assertThat(underTest).isPresent();