Skip to content

Commit

Permalink
Add ditto protocol sub access and use it in gateawy+connectivity
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Aug 20, 2019
1 parent 5f25971 commit c27f8b2
Show file tree
Hide file tree
Showing 20 changed files with 462 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
Expand Down Expand Up @@ -64,6 +67,8 @@
import org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.services.connectivity.messaging.validation.DittoConnectivityCommandValidator;
import org.eclipse.ditto.services.connectivity.util.ConnectionLogUtil;
import org.eclipse.ditto.services.models.concierge.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
import org.eclipse.ditto.services.models.connectivity.OutboundSignalFactory;
import org.eclipse.ditto.services.utils.akka.LogUtil;
Expand Down Expand Up @@ -176,7 +181,8 @@ public final class ConnectionActor extends AbstractPersistentActorWithTimersAndC
private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);

private final String connectionId;
private final ActorRef pubSubMediator;
private final ActorRef pubSubMediator; // for publishing of connection events
private final DittoProtocolSub dittoProtocolSub;
private final ActorRef conciergeForwarder;
private final long snapshotThreshold;
private final SnapshotAdapter<Connection> snapshotAdapter;
Expand Down Expand Up @@ -211,12 +217,14 @@ public final class ConnectionActor extends AbstractPersistentActorWithTimersAndC
@SuppressWarnings("unused")
private ConnectionActor(final String connectionId,
final ActorRef pubSubMediator,
final DittoProtocolSub dittoProtocolSub,
final ActorRef conciergeForwarder,
final ClientActorPropsFactory propsFactory,
@Nullable final Consumer<ConnectivityCommand<?>> customCommandValidator) {

this.connectionId = connectionId;
this.pubSubMediator = pubSubMediator;
this.dittoProtocolSub = dittoProtocolSub;
this.conciergeForwarder = conciergeForwarder;
this.propsFactory = propsFactory;
final DittoConnectivityCommandValidator dittoCommandValidator =
Expand Down Expand Up @@ -260,21 +268,22 @@ private ConnectionActor(final String connectionId,
* Creates Akka configuration object for this actor.
*
* @param connectionId the connection ID.
* @param pubSubMediator Akka pub-sub mediator.
* @param dittoProtocolSub Ditto protocol sub access.
* @param conciergeForwarder proxy of concierge service.
* @param propsFactory factory of props of client actors for various protocols.
* @param commandValidator validator for commands that should throw an exception if a command is invalid.
* @return the Akka configuration Props object.
*/
public static Props props(final String connectionId,
final ActorRef pubSubMediator,
final DittoProtocolSub dittoProtocolSub,
final ActorRef conciergeForwarder,
final ClientActorPropsFactory propsFactory,
@Nullable final Consumer<ConnectivityCommand<?>> commandValidator
) {

return Props.create(ConnectionActor.class, connectionId, pubSubMediator, conciergeForwarder, propsFactory,
commandValidator);
return Props.create(ConnectionActor.class, connectionId, pubSubMediator, dittoProtocolSub, conciergeForwarder,
propsFactory, commandValidator);
}

@Override
Expand Down Expand Up @@ -1036,10 +1045,10 @@ private void respondWithEmptyMetrics(final RetrieveConnectionMetrics command, fi
);
final RetrieveConnectionMetricsResponse metricsResponse =
RetrieveConnectionMetricsResponse.getBuilder(connectionId, command.getDittoHeaders())
.connectionMetrics(metrics)
.sourceMetrics(ConnectivityModelFactory.emptySourceMetrics())
.targetMetrics(ConnectivityModelFactory.emptyTargetMetrics())
.build();
.connectionMetrics(metrics)
.sourceMetrics(ConnectivityModelFactory.emptySourceMetrics())
.targetMetrics(ConnectivityModelFactory.emptyTargetMetrics())
.build();
origin.tell(metricsResponse, getSelf());
}

Expand All @@ -1056,7 +1065,7 @@ private void respondWithEmptyStatus(final RetrieveConnectionStatus command, fina
origin.tell(statusResponse, getSelf());
}

private void subscribeForEvents() {
private CompletionStage<Void> subscribeForEvents() {
checkConnectionNotNull();

// unsubscribe to previously subscribed topics
Expand All @@ -1066,18 +1075,29 @@ private void subscribeForEvents() {
.flatMap(target -> target.getTopics().stream().map(FilteredTopic::getTopic))
.collect(Collectors.toSet());

forEachPubSubTopicDo(pubSubTopic -> {
log.debug("Subscribing to pub-sub topic <{}> for connection <{}>.", pubSubTopic, connectionId);
pubSubMediator.tell(DistPubSubAccess.subscribe(pubSubTopic, getSelf()), getSelf());
});
log.debug("Subscribing to pub-sub topics <{}> for connection <{}>.", uniqueTopics, connectionId);
return dittoProtocolSub.subscribe(toStreamingTypes(uniqueTopics), getTargetAuthSubjects(), getSelf());
}

private Set<String> getTargetAuthSubjects() {
if (connection == null || connection.getTargets().isEmpty()) {
return Collections.emptySet();
} else {
return connection.getTargets()
.stream()
.map(Target::getAuthorizationContext)
.map(AuthorizationContext::getAuthorizationSubjectIds)
.flatMap(List::stream)
.collect(Collectors.toSet());
}
}

private void unsubscribeFromEvents() {
forEachPubSubTopicDo(pubSubTopic -> {
log.debug("Unsubscribing from pub-sub topic <{}> for connection <{}>.", pubSubTopic, connectionId);
pubSubMediator.tell(DistPubSubAccess.unsubscribe(pubSubTopic, getSelf()), getSelf());
});
uniqueTopics = Collections.emptySet();
log.debug("Unsubscribing from pub-sub topics <{}> for connection <{}>.", uniqueTopics, connectionId);
if (!uniqueTopics.isEmpty()) {
dittoProtocolSub.removeSubscriber(toStreamingTypes(uniqueTopics), getSelf());
uniqueTopics = Collections.emptySet();
}
}

private void forEachPubSubTopicDo(final Consumer<String> topicConsumer) {
Expand Down Expand Up @@ -1189,23 +1209,30 @@ private void handleSnapshotSuccess(final SaveSnapshotSuccess sss) {
confirmedSnapshotSequenceNr = Math.max(confirmedSnapshotSequenceNr, sss.metadata().sequenceNr());
}

private void schedulePendingResponse(final ConnectivityCommandResponse response, final ActorRef sender) {
getContext().system().scheduler()
.scheduleOnce(flushPendingResponsesTimeout,
sender,
response,
getContext().dispatcher(),
getSelf());
private static Collection<StreamingType> toStreamingTypes(final Set<Topic> uniqueTopics) {
return uniqueTopics.stream()
.map(topic -> {
switch (topic) {
case LIVE_EVENTS:
return StreamingType.LIVE_EVENTS;
case LIVE_COMMANDS:
return StreamingType.LIVE_COMMANDS;
case LIVE_MESSAGES:
return StreamingType.MESSAGES;
case TWIN_EVENTS:
default:
return StreamingType.EVENTS;
}
})
.collect(Collectors.toList());
}

private static Consumer<ConnectionActor> subscribeForEventsAndScheduleResponse(
final ConnectivityCommandResponse response,
final ActorRef sender) {
final ConnectivityCommandResponse response, final ActorRef sender) {

return connectionActor -> {
connectionActor.subscribeForEvents();
connectionActor.schedulePendingResponse(response, sender);
};
return connectionActor ->
connectionActor.subscribeForEvents()
.thenAccept(_void -> sender.tell(response, connectionActor.getSelf()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectionConfig;
import org.eclipse.ditto.services.connectivity.messaging.config.DittoConnectivityConfig;
import org.eclipse.ditto.services.connectivity.util.ConnectionLogUtil;
import org.eclipse.ditto.services.models.concierge.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;
Expand Down Expand Up @@ -77,6 +78,7 @@ public final class ConnectionSupervisorActor extends AbstractActor {

@SuppressWarnings("unused")
private ConnectionSupervisorActor(final ActorRef pubSubMediator,
final DittoProtocolSub dittoProtocolSub,
final ActorRef conciergeForwarder,
final ClientActorPropsFactory propsFactory,
@Nullable final ConnectivityCommandInterceptor commandValidator) {
Expand All @@ -93,7 +95,8 @@ private ConnectionSupervisorActor(final ActorRef pubSubMediator,
exponentialBackOffConfig = connectionConfig.getSupervisorConfig().getExponentialBackOffConfig();

persistenceActorProps =
ConnectionActor.props(connectionId, pubSubMediator, conciergeForwarder, propsFactory, commandValidator);
ConnectionActor.props(connectionId, pubSubMediator, dittoProtocolSub, conciergeForwarder, propsFactory,
commandValidator);
}

/**
Expand All @@ -104,18 +107,20 @@ private ConnectionSupervisorActor(final ActorRef pubSubMediator,
* </p>
*
* @param pubSubMediator the PubSub mediator actor.
* @param dittoProtocolSub Ditto protocol sub access.
* @param conciergeForwarder the actor used to send signals to the concierge service.
* @param propsFactory the {@link ClientActorPropsFactory}
* @param commandValidator a custom command validator for connectivity commands
* @return the {@link Props} to create this actor.
*/
public static Props props(final ActorRef pubSubMediator,
final DittoProtocolSub dittoProtocolSub,
final ActorRef conciergeForwarder,
final ClientActorPropsFactory propsFactory,
@Nullable final ConnectivityCommandInterceptor commandValidator) {

return Props.create(ConnectionSupervisorActor.class, pubSubMediator, conciergeForwarder, propsFactory,
commandValidator);
return Props.create(ConnectionSupervisorActor.class, pubSubMediator, dittoProtocolSub, conciergeForwarder,
propsFactory, commandValidator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,23 @@ public void init() {
RetrieveConnectionResponse.of(modifiedConnection.toJson(), DittoHeaders.empty());
retrieveConnectionStatusOpenResponse =
RetrieveConnectionStatusResponse.getBuilder(connectionId, DittoHeaders.empty())
.connectionStatus(ConnectivityStatus.OPEN)
.liveStatus(ConnectivityStatus.OPEN)
.connectedSince(INSTANT)
.clientStatus(asList(ConnectivityModelFactory.newClientStatus("client1", ConnectivityStatus.OPEN,
"connection is open", INSTANT)))
.sourceStatus(asList(
ConnectivityModelFactory.newSourceStatus("client1", ConnectivityStatus.OPEN, "source1", "consumer started"),
ConnectivityModelFactory.newSourceStatus("client1", ConnectivityStatus.OPEN, "source2", "consumer started")
))
.targetStatus(asList(ConnectivityModelFactory.newTargetStatus("client1", ConnectivityStatus.OPEN, "target1",
"publisher started")))
.build();
.connectionStatus(ConnectivityStatus.OPEN)
.liveStatus(ConnectivityStatus.OPEN)
.connectedSince(INSTANT)
.clientStatus(
asList(ConnectivityModelFactory.newClientStatus("client1", ConnectivityStatus.OPEN,
"connection is open", INSTANT)))
.sourceStatus(asList(
ConnectivityModelFactory.newSourceStatus("client1", ConnectivityStatus.OPEN, "source1",
"consumer started"),
ConnectivityModelFactory.newSourceStatus("client1", ConnectivityStatus.OPEN, "source2",
"consumer started")
))
.targetStatus(
asList(ConnectivityModelFactory.newTargetStatus("client1", ConnectivityStatus.OPEN,
"target1",
"publisher started")))
.build();
connectionNotAccessibleException = ConnectionNotAccessibleException.newBuilder(connectionId).build();
}

Expand Down Expand Up @@ -208,7 +213,8 @@ public void createConnectionAfterDeleted() {
final TestProbe clientActorMock = TestProbe.apply(actorSystem);
final ActorRef underTest =
TestConstants.createConnectionSupervisorActor(connectionId, actorSystem, pubSubMediator,
conciergeForwarder, (connection, concierge) -> MockClientActor.props(clientActorMock.ref()));
conciergeForwarder,
(connection, concierge) -> MockClientActor.props(clientActorMock.ref()));
watch(underTest);

// create connection
Expand All @@ -233,7 +239,8 @@ public void openConnectionAfterDeletedFails() {
final TestProbe clientActorMock = TestProbe.apply(actorSystem);
final ActorRef underTest =
TestConstants.createConnectionSupervisorActor(connectionId, actorSystem, pubSubMediator,
conciergeForwarder, (connection, concierge) -> MockClientActor.props(clientActorMock.ref()));
conciergeForwarder,
(connection, concierge) -> MockClientActor.props(clientActorMock.ref()));
watch(underTest);

// create connection
Expand Down Expand Up @@ -316,10 +323,10 @@ public void retrieveMetricsInClosedStateDoesNotStartClientActor() {

final RetrieveConnectionMetricsResponse metricsResponse =
RetrieveConnectionMetricsResponse.getBuilder(connectionId, DittoHeaders.empty())
.connectionMetrics(ConnectivityModelFactory.emptyConnectionMetrics())
.sourceMetrics(ConnectivityModelFactory.emptySourceMetrics())
.targetMetrics(ConnectivityModelFactory.emptyTargetMetrics())
.build();
.connectionMetrics(ConnectivityModelFactory.emptyConnectionMetrics())
.sourceMetrics(ConnectivityModelFactory.emptySourceMetrics())
.targetMetrics(ConnectivityModelFactory.emptyTargetMetrics())
.build();
expectMsg(metricsResponse);
}};
}
Expand Down Expand Up @@ -497,7 +504,8 @@ public void recoverDeletedConnection() {
public void exceptionDuringClientActorPropsCreation() {
new TestKit(actorSystem) {{
final Props connectionActorProps =
ConnectionActor.props(TestConstants.createRandomConnectionId(), pubSubMediator, conciergeForwarder,
ConnectionActor.props(TestConstants.createRandomConnectionId(), pubSubMediator,
TestConstants.dummyDittoProtocolSub(pubSubMediator), conciergeForwarder,
(connection, conciergeForwarder) -> {
throw ConnectionConfigurationInvalidException.newBuilder("validation failed...")
.build();
Expand Down Expand Up @@ -527,7 +535,8 @@ public void exceptionDueToCustomValidator() {
new TestKit(actorSystem) {{
final Props connectionActorProps =
ConnectionActor.props(TestConstants.createRandomConnectionId(), pubSubMediator,
conciergeForwarder, mockClientActorPropsFactory,
TestConstants.dummyDittoProtocolSub(pubSubMediator), conciergeForwarder,
mockClientActorPropsFactory,
command -> {
throw ConnectionUnavailableException.newBuilder(connectionId)
.dittoHeaders(command.getDittoHeaders())
Expand Down Expand Up @@ -628,7 +637,8 @@ public void testConnectionActorRespondsToCleanupCommand() {
expectMsg(createConnectionResponse);

// send cleanup command
underTest.tell(CleanupPersistence.of(ConnectionActor.PERSISTENCE_ID_PREFIX + connectionId, DittoHeaders.empty()),
underTest.tell(
CleanupPersistence.of(ConnectionActor.PERSISTENCE_ID_PREFIX + connectionId, DittoHeaders.empty()),
getRef());
expectMsg(CleanupPersistenceResponse.success(ConnectionActor.PERSISTENCE_ID_PREFIX + connectionId,
DittoHeaders.empty()));
Expand Down
Loading

0 comments on commit c27f8b2

Please sign in to comment.