Skip to content

Commit

Permalink
Issue #559: Introduced new internal module for Signal related funct…
Browse files Browse the repository at this point in the history
…ionality.

* Made `CommandAndCommandResponseMatchingValidator` more generic to make it re-usable throughout several services.
* Introduced `HttpPushRoundTripSignalValidator` to substitute previous version of `CommandAndCommandResponseMatchingValidator` which was specialized for HTTP push scenario.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Oct 22, 2021
1 parent 388e000 commit 1e18aaf
Show file tree
Hide file tree
Showing 20 changed files with 1,293 additions and 343 deletions.
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,11 @@
<artifactId>ditto-thingsearch-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signalenrichment</artifactId>
Expand Down
9 changes: 4 additions & 5 deletions connectivity/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-acks</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signal</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signalenrichment</artifactId>
Expand Down Expand Up @@ -208,11 +212,6 @@ jmh-generator-annprocess). jmh-generator-annprocess overwrites the whole META-IN
<artifactId>akka-stream-kafka-testkit_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ final class HttpPublisherActor extends BasePublisherActor<HttpPublishTarget> {
private final SourceQueue<Pair<HttpRequest, HttpPushContext>> sourceQueue;
private final KillSwitch killSwitch;
private final HttpRequestSigning httpRequestSigning;
private final CommandAndCommandResponseMatchingValidator commandAndCommandResponseMatchingValidator;
private final HttpPushRoundTripSignalsValidator httpPushRoundTripSignalValidator;

@SuppressWarnings("unused")
private HttpPublisherActor(final Connection connection,
Expand Down Expand Up @@ -151,8 +151,7 @@ private HttpPublisherActor(final Connection connection,
httpRequestSigning = connection.getCredentials()
.map(credentials -> credentials.accept(HttpRequestSigningExtension.get(getContext().getSystem())))
.orElse(NoOpSigning.INSTANCE);
commandAndCommandResponseMatchingValidator =
CommandAndCommandResponseMatchingValidator.newInstance(connectionLogger);
httpPushRoundTripSignalValidator = HttpPushRoundTripSignalsValidator.newInstance(connectionLogger);
}

/**
Expand Down Expand Up @@ -501,7 +500,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
&& SignalInformationPoint.isLiveCommandResponse(result)) {

// Do only return command response for live commands with a correct response.
commandAndCommandResponseMatchingValidator.accept(liveCommandWithEntityId.get(), result);
httpPushRoundTripSignalValidator.accept(liveCommandWithEntityId.get(), result);
}
if (result == null) {
connectionLogger.success(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.function.BiConsumer;

import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.model.MessageSendingFailedException;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;

/**
* Validates that a specified {@link org.eclipse.ditto.base.model.signals.SignalWithEntityId} and
* {@link CommandResponse} are associated with each other, i.e. that the command response correlates to a command.
* <p>
* Both signals correlate if
* <ul>
* <li>their correlation IDs match,</li>
* <li>their signal types match and</li>
* <li>their entity IDs match.</li>
* </ul>
* </p>
* <p>
* If any of the above evaluates to {@code false} a {@link MessageSendingFailedException} is thrown with a detail
* message describing the cause.
* Furthermore the exception gets logged for the command response via
* {@link ConnectionLogger#failure(org.eclipse.ditto.base.model.signals.Signal, org.eclipse.ditto.base.model.exceptions.DittoRuntimeException)}.
* </p>
*/
@NotThreadSafe
final class HttpPushRoundTripSignalsValidator implements BiConsumer<SignalWithEntityId<?>, CommandResponse<?>> {

private final ConnectionLogger connectionLogger;
private final CommandAndCommandResponseMatchingValidator validator;

private HttpPushRoundTripSignalsValidator(final ConnectionLogger connectionLogger) {
this.connectionLogger = connectionLogger;
validator = CommandAndCommandResponseMatchingValidator.getInstance();
}

static HttpPushRoundTripSignalsValidator newInstance(final ConnectionLogger connectionLogger) {
return new HttpPushRoundTripSignalsValidator(checkNotNull(connectionLogger, "connectionLogger"));
}

@Override
public void accept(final SignalWithEntityId<?> signalWithEntityId, final CommandResponse<?> commandResponse) {
final var validationResult = validator.apply(signalWithEntityId, commandResponse);
if (!validationResult.isSuccess()) {
final var messageSendingFailedException = MessageSendingFailedException.newBuilder()
.httpStatus(HttpStatus.BAD_REQUEST)
.message(validationResult.getDetailMessageOrThrow())
.dittoHeaders(signalWithEntityId.getDittoHeaders())
.build();
connectionLogger.failure(commandResponse, messageSendingFailedException);
throw messageSendingFailedException;
}
}

}

0 comments on commit 1e18aaf

Please sign in to comment.