Skip to content

Commit

Permalink
Merge branch 'master' into feature/graceful-shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Aug 26, 2022
2 parents 0a4e6ec + 1c234d7 commit 0d7d569
Show file tree
Hide file tree
Showing 89 changed files with 1,258 additions and 776 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;

/**
* Provides {@link Acknowledgement}s based on {@link CommandResponse}s abstracting away model-specific dependencies.
*
* @param <R> the type of the command response for which to provide the Acknowledgements.
* @param <C> the type of the command for which to provide the Acknowledgement.
* @since 3.0.0
*/
@Immutable
public interface CommandResponseAcknowledgementProvider<R extends CommandResponse<? extends R>> {
public interface CommandResponseAcknowledgementProvider<C extends Command<?>> {

/**
* Provides an {@link Acknowledgement} based on the provided {@code originalSignal} and {@code commandResponse}.
Expand All @@ -34,7 +34,7 @@ public interface CommandResponseAcknowledgementProvider<R extends CommandRespons
* @param commandResponse the CommandResponse to provide the Acknowledgement for.
* @return the created Acknowledgement.
*/
Acknowledgement provideAcknowledgement(Signal<?> originatingSignal, R commandResponse);
Acknowledgement provideAcknowledgement(C originatingSignal, CommandResponse<?> commandResponse);

/**
* Checks if the passed {@code commandResponse} is applicable by this provider to provide Acknowledgements for.
Expand All @@ -43,13 +43,13 @@ public interface CommandResponseAcknowledgementProvider<R extends CommandRespons
* @return whether the commandResponse is applicable for this provider to provide Acknowledgements for.
* @throws NullPointerException if the passed {@code commandResponse} was {@code null}.
*/
boolean isApplicable(R commandResponse);
boolean isApplicable(CommandResponse<?> commandResponse);

/**
* Get the class of the type of command responses this provider handles.
* Get the class of the type of commands this provider handles.
*
* @return the class of the command response.
* @return the class of the command.
*/
Class<R> getMatchedClass();
Class<?> getCommandClass();

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.models.acks;
package org.eclipse.ditto.base.service.acknowledgements;

import static org.eclipse.ditto.base.model.common.ConditionChecker.argumentNotEmpty;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.models.acks;
package org.eclipse.ditto.base.service.acknowledgements;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
Expand All @@ -27,24 +27,25 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestTimeoutException;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.things.model.ThingId;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
* Unit test for {@link AcknowledgementAggregator}.
* Unit test for {@link org.eclipse.ditto.base.service.acknowledgements.AcknowledgementAggregator}.
*/
public final class AcknowledgementAggregatorTest {

private static final ThingId ENTITY_ID = ThingId.generateRandom();
private static final EntityId ENTITY_ID = randomEntityId();
private static final Duration TIMEOUT = Duration.ofMillis(1337);
private static final HeaderTranslator HEADER_TRANSLATOR = HeaderTranslator.empty();

Expand Down Expand Up @@ -146,7 +147,7 @@ public void receivedAllRequestedAcknowledgements() {
@Test
public void onlyRegardFirstReceivedAcknowledgementForSameLabel() {
final AcknowledgementLabel ackLabel = DittoAcknowledgementLabel.TWIN_PERSISTED;
final ThingId entityId = ThingId.generateRandom();
final EntityId entityId = randomEntityId();
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().correlationId(testName.getMethodName()).build();
final Acknowledgement failedAcknowledgement =
Acknowledgement.of(ackLabel, entityId, HttpStatus.UNAUTHORIZED, dittoHeaders);
Expand Down Expand Up @@ -371,4 +372,8 @@ private List<Acknowledgement> createAcknowledgements(final int amount, final Htt
return result;
}

private static EntityId randomEntityId() {
return EntityId.of(EntityType.of("foo"), UUID.randomUUID().toString());
}

}
5 changes: 0 additions & 5 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-acks</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-streaming</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions connectivity/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@
<artifactId>ditto-edge-service</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-acks</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signal</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.WithSupervisorConfig;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithActivityCheckConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import org.eclipse.ditto.base.service.config.ServiceSpecificConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.health.config.WithHealthCheckConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.eclipse.ditto.base.service.config.supervision.DefaultSupervisorConfig;
import org.eclipse.ditto.base.service.config.supervision.SupervisorConfig;
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultActivityCheckConfig;
Expand Down Expand Up @@ -91,7 +91,8 @@ private DefaultConnectionConfig(final ConfigWithFallback config) {
priorityUpdateInterval =
config.getNonNegativeAndNonZeroDurationOrThrow(ConnectionConfigValue.PRIORITY_UPDATE_INTERVAL);
doubleDecodingEnabled = config.getBoolean(ConnectionConfigValue.DOUBLE_DECODING_ENABLED.getConfigPath());
doubleDecodingMigrationEnabled = config.getBoolean(ConnectionConfigValue.DOUBLE_DECODING_MIGRATION_ENABLED.getConfigPath());
doubleDecodingMigrationEnabled =
config.getBoolean(ConnectionConfigValue.DOUBLE_DECODING_MIGRATION_ENABLED.getConfigPath());

}

Expand Down Expand Up @@ -269,7 +270,8 @@ public int hashCode() {
blockedHostnames, blockedSubnets, blockedHostRegex, supervisorConfig, snapshotConfig,
acknowledgementConfig, cleanupConfig, maxNumberOfTargets, maxNumberOfSources, activityCheckConfig,
amqp10Config, amqp091Config, mqttConfig, kafkaConfig, httpPushConfig, ackLabelDeclareInterval,
priorityUpdateInterval, allClientActorsOnOneNode, doubleDecodingEnabled, doubleDecodingMigrationEnabled);
priorityUpdateInterval, allClientActorsOnOneNode, doubleDecodingEnabled,
doubleDecodingMigrationEnabled);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.CounterKey;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.MetricAlertRegistry;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private ConnectionIdsRetrievalActor(final MongoReadJournal readJournal,

taggedPidSourceFunction =
tag -> readJournal.getJournalPidsWithTag(tag, connectionIdsRetrievalConfig.getReadJournalBatchSize(),
Duration.ofSeconds(1), materializer);
Duration.ofSeconds(1), materializer, true);
}

@Override
Expand Down Expand Up @@ -132,10 +132,10 @@ public Receive createReceive() {
}

private void getConnectionIDsByTag(final SudoRetrieveConnectionIdsByTag sudoRetrieveConnectionIdsByTag) {
final String tag = sudoRetrieveConnectionIdsByTag.getTag();
final DittoHeaders dittoHeaders = sudoRetrieveConnectionIdsByTag.getDittoHeaders();
log.withCorrelationId(dittoHeaders)
.info("Retrieving connection IDs by tag <{}>: {}", tag, sudoRetrieveConnectionIdsByTag);
final DittoDiagnosticLoggingAdapter l = log.withCorrelationId(dittoHeaders);
final String tag = sudoRetrieveConnectionIdsByTag.getTag();
l.info("Retrieving connection IDs by tag <{}>: {}", tag, sudoRetrieveConnectionIdsByTag);
try {
final ActorRef sender = sender();
final CompletionStage<SudoRetrieveConnectionIdsByTagResponse>
Expand All @@ -145,12 +145,16 @@ private void getConnectionIDsByTag(final SudoRetrieveConnectionIdsByTag sudoRetr
.map(pid -> pid.substring(ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX.length()))
.map(ConnectionId::of)
.runWith(Sink.seq(), materializer)
.thenApply(connectionIds -> Set.of(connectionIds.toArray(new ConnectionId[0])))
.thenApply(Set::copyOf)
.thenApply(connectionIds -> {
l.info("Found the following connection IDs for tag <{}>: <{}>", tag, connectionIds);
return connectionIds;
})
.thenApply(connectionIds -> SudoRetrieveConnectionIdsByTagResponse.of(connectionIds,
dittoHeaders));
Patterns.pipe(retrieveConnectionIdsByTagResponseCompletionStage, getContext().dispatcher()).to(sender);
} catch (final Exception e) {
log.error(e, "Failed to load persistence ids from journal/snapshots for connections with tag <{}>.", tag);
l.error(e, "Failed to load persistence ids from journal/snapshots for connections with tag <{}>.", tag);
getSender().tell(buildErrorResponse(e, dittoHeaders), getSelf());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.message.MessageCommandAckRequestSetter;
import org.eclipse.ditto.edge.service.acknowledgements.message.MessageCommandResponseAcknowledgementProvider;
import org.eclipse.ditto.edge.service.acknowledgements.things.ThingCommandResponseAcknowledgementProvider;
import org.eclipse.ditto.edge.service.acknowledgements.things.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.edge.service.acknowledgements.things.ThingModifyCommandAckRequestSetter;
import org.eclipse.ditto.internal.models.signal.CommandHeaderRestoration;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.models.signal.type.SemanticSignalType;
Expand All @@ -79,8 +84,6 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.messages.model.signals.commands.acks.MessageCommandAckRequestSetter;
import org.eclipse.ditto.messages.model.signals.commands.acks.MessageCommandResponseAcknowledgementProvider;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderFilter;
Expand All @@ -92,9 +95,6 @@
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingCommandResponseAcknowledgementProvider;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingModifyCommandAckRequestSetter;
import org.eclipse.ditto.thingsearch.model.signals.commands.WithSubscriptionId;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;

Expand Down Expand Up @@ -539,9 +539,9 @@ private int dispatchIncomingSignal(final IncomingSignal incomingSignal) {
final Signal<?> signal = incomingSignal.signal;
final ActorRef sender = incomingSignal.sender;
final Optional<EntityId> entityIdOptional = WithEntityId.getEntityId(signal);
if (incomingSignal.isAckRequesting && entityIdOptional.isPresent()) {
if (incomingSignal.isAckRequesting && entityIdOptional.isPresent() && signal instanceof Command<?> command) {
try {
startAckregatorAndForwardSignal(entityIdOptional.get(), signal, sender);
startAckregatorAndForwardSignal(entityIdOptional.get(), command, sender);
} catch (final DittoRuntimeException e) {
handleErrorDuringStartingOfAckregator(e, signal.getDittoHeaders(), sender);
}
Expand Down Expand Up @@ -592,14 +592,14 @@ private static boolean isLive(final Signal<?> signal) {
}

private void startAckregatorAndForwardSignal(final EntityId entityId,
final Signal<?> signal,
final Command<?> command,
@Nullable final ActorRef sender) {

ackregatorStarter.doStart(entityId, signal, null,
ackregatorStarter.doStart(entityId, command, null,
responseSignal -> {

// potentially publish response/aggregated acks to reply target
final var signalDittoHeaders = signal.getDittoHeaders();
final var signalDittoHeaders = command.getDittoHeaders();
if (signalDittoHeaders.isResponseRequired()) {
outboundMessageMappingProcessorActor.tell(responseSignal, ActorRef.noSender());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignalFactory;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.internal.models.acks.AcknowledgementForwarderActor;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementForwarderActor;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;
Expand Down Expand Up @@ -215,7 +215,8 @@ private void denyNonSourceDeclaredAck(final Acknowledgement ack) {
.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
if (null != ackregatorAddress) {
final ActorSelection acknowledgementRequester = getContext().actorSelection(ackregatorAddress);
acknowledgementRequester.tell(AcknowledgementLabelNotDeclaredException.of(ack.getLabel(), ack.getDittoHeaders()),
acknowledgementRequester.tell(
AcknowledgementLabelNotDeclaredException.of(ack.getLabel(), ack.getDittoHeaders()),
ActorRef.noSender());
} else {
logger.withCorrelationId(ack)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.eclipse.ditto.connectivity.service.messaging.persistence.SignalFilter;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;

Expand Down
Loading

0 comments on commit 0d7d569

Please sign in to comment.