Skip to content

Commit

Permalink
Replace ClientActorRefs by ConnectionPubSub for consistency during co…
Browse files Browse the repository at this point in the history
…ordinated shutdown.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 20, 2022
1 parent 895095f commit 717412c
Show file tree
Hide file tree
Showing 20 changed files with 378 additions and 828 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelActor;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectionPubSub;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
Expand Down Expand Up @@ -141,7 +142,6 @@
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.pubsub.DistributedPubSub;
import akka.japi.Pair;
import akka.japi.pf.DeciderBuilder;
Expand Down Expand Up @@ -190,7 +190,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private final Gauge clientConnectingGauge;
private final ReconnectTimeoutStrategy reconnectTimeoutStrategy;
private final SupervisorStrategy supervisorStrategy;
private final ClientActorRefs clientActorRefs;
private final ConnectionPubSub connectionPubSub;
private final DittoProtocolSub dittoProtocolSub;
private final int subscriptionIdPrefixLength;
protected final UUID actorUuid;
Expand Down Expand Up @@ -257,7 +257,7 @@ protected BaseClientActor(final Connection connection,

reconnectTimeoutStrategy = DuplicationReconnectTimeoutStrategy.fromConfig(clientConfig);
supervisorStrategy = createSupervisorStrategy(getSelf());
clientActorRefs = ClientActorRefs.empty();
connectionPubSub = ConnectionPubSub.get(system);
subscriptionIdPrefixLength =
ConnectionPersistenceActor.getSubscriptionPrefixLength(connection.getClientCount());

Expand Down Expand Up @@ -298,11 +298,15 @@ public void preStart() throws Exception {

initialize();

// inform connection actor of my presence if there are other client actors
if (connection.getClientCount() > 1 && !dryRun) {
connectionActor.tell(getSelf(), getSelf());
}
clientActorRefs.add(getSelf());
// register this client actor at connection pub-sub
final var connectionId = connection.getId();
connectionPubSub.subscribe(connectionId, getSelf(), false).whenComplete((consistent, error) -> {
if (error != null) {
logger.error(error, "Failed to register client actor for connection <{}>", connectionId);
} else {
logger.info("Client actor registered: <{}>", connectionId);
}
});

closeConnectionBeforeTerminatingCluster();

Expand All @@ -322,7 +326,8 @@ protected void init() {

final var inboundDispatchingSink = getInboundDispatchingSink(actorPair.second());
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
subscriptionManager = startSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
subscriptionManager =
startSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());

if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
tunnelActor = childActorNanny.startChildActor(SshTunnelActor.ACTOR_NAME,
Expand Down Expand Up @@ -483,13 +488,10 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState()
.event(PublishMappedMessage.class, this::publishMappedMessage)
.event(ConnectivityCommand.class, this::onUnknownEvent) // relevant connectivity commands were handled
.event(Signal.class, this::handleSignal)
.event(ActorRef.class, this::onOtherClientActorStartup)
.event(Terminated.class, this::otherClientActorTerminated)
.eventEquals(HealthSignal.PING, (ping, data) -> {
sender().tell(HealthSignal.PONG, self());
return stay();
})
.event(ClientActorRefs.class, this::refreshClientActorRefs)
.event(FatalPubSubException.class, this::failConnectionDueToPubSubException);
}

Expand All @@ -511,26 +513,6 @@ protected static String escapeActorName(final String name) {
return URLEncoder.encode(name, StandardCharsets.US_ASCII);
}

private FSM.State<BaseClientState, BaseClientData> onOtherClientActorStartup(final ActorRef otherClientActor,
final BaseClientData data) {
clientActorRefs.add(otherClientActor);
getContext().watch(otherClientActor);
return stay();
}

private FSM.State<BaseClientState, BaseClientData> otherClientActorTerminated(final Terminated terminated,
final BaseClientData data) {
clientActorRefs.remove(terminated.getActor());
return stay();
}

private FSM.State<BaseClientState, BaseClientData> refreshClientActorRefs(final ClientActorRefs clientActorRefs,
final BaseClientData data) {
this.clientActorRefs.clear();
this.clientActorRefs.add(clientActorRefs.getSortedRefs());
return stay();
}

private FSM.State<BaseClientState, BaseClientData> failConnectionDueToPubSubException(
final FatalPubSubException exception,
final BaseClientData data) {
Expand Down Expand Up @@ -691,7 +673,7 @@ private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inDisconnectedS
*
* @return an FSM function builder
*/
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingState() {
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingState() {
return matchEventEquals(StateTimeout(), (event, data) -> connectionTimedOut(data))
.event(ConnectionFailure.class, this::connectingConnectionFailed)
.event(ClientConnected.class, this::clientConnectedInConnectingState)
Expand Down Expand Up @@ -721,7 +703,8 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
.eventEquals(Control.RESUBSCRIBE, this::resubscribe);
}

private State<BaseClientState, BaseClientData> updateConnectionStatusSuccess(final ReportConnectionStatusSuccess reportConnectionStatusSuccess,
private State<BaseClientState, BaseClientData> updateConnectionStatusSuccess(
final ReportConnectionStatusSuccess reportConnectionStatusSuccess,
BaseClientData baseClientData) {
BaseClientData nextClientData = baseClientData.setConnectionStatus(ConnectivityStatus.OPEN)
.setRecoveryStatus(RecoveryStatus.SUCCEEDED)
Expand All @@ -733,11 +716,13 @@ private State<BaseClientState, BaseClientData> updateConnectionStatusSuccess(fin
private State<BaseClientState, BaseClientData> updateConnectionStatusError(
final ReportConnectionStatusError reportConnectionStatus,
final BaseClientData baseClientData) {
BaseClientData nextClientData = baseClientData.setConnectionStatus(connectivityStatusResolver.resolve(reportConnectionStatus.cause()))
.setRecoveryStatus(RecoveryStatus.ONGOING)
.setConnectionStatusDetails(
ConnectionFailure.determineFailureDescription(null, reportConnectionStatus.cause(), null))
.setInConnectionStatusSince(Instant.now());
BaseClientData nextClientData =
baseClientData.setConnectionStatus(connectivityStatusResolver.resolve(reportConnectionStatus.cause()))
.setRecoveryStatus(RecoveryStatus.ONGOING)
.setConnectionStatusDetails(
ConnectionFailure.determineFailureDescription(null, reportConnectionStatus.cause(),
null))
.setInConnectionStatusSince(Instant.now());
return stay().using(nextClientData);
}

Expand Down Expand Up @@ -1650,14 +1635,8 @@ private FSM.State<BaseClientState, BaseClientData> handleInboundSignal(final Inb
if (signal instanceof WithSubscriptionId<?>) {
dispatchSearchCommand((WithSubscriptionId<?>) signal);
} else {
final ActorRef recipient = tryExtractEntityId(signal)
.flatMap(clientActorRefs::lookup)
.orElseThrow();
if (getSelf().equals(recipient)) {
outboundDispatchingActor.tell(inboundSignal, getSender());
} else {
recipient.tell(inboundSignal, getSender());
}
final var entityId = tryExtractEntityId(signal).orElseThrow();
connectionPubSub.publishSignal(signal, connectionId(), entityId, getSender());
}
return stay();
}
Expand All @@ -1672,26 +1651,17 @@ private static Optional<EntityId> tryExtractEntityId(final Signal<?> signal) {

private void dispatchSearchCommand(final WithSubscriptionId<?> searchCommand) {
final String subscriptionId = searchCommand.getSubscriptionId();
if (subscriptionId.length() > subscriptionIdPrefixLength) {
if (subscriptionId.length() >= subscriptionIdPrefixLength) {
final var prefix = subscriptionId.substring(0, subscriptionIdPrefixLength);
final Optional<Integer> index = parseHexString(prefix);
if (index.isPresent()) {
final ActorRef receiver = clientActorRefs.get(index.get()).orElseThrow();
if (getSelf().equals(receiver)) {
forwardThingSearchCommand(searchCommand, stateData());
} else {
// sender is overwritten at the client actor responsible for the subscription ID prefix.
receiver.tell(searchCommand, ActorRef.noSender());
}
return;
}
connectionPubSub.publishSignal(searchCommand, connectionId(), prefix, ActorRef.noSender());
} else {
// command is invalid or outdated, dropping.
logger.withCorrelationId(searchCommand)
.info("Dropping search command with invalid subscription ID: <{}>", searchCommand);
connectionLogger.failure(InfoProviderFactory.forSignal(searchCommand),
"Dropping search command with invalid subscription ID: " +
searchCommand.getSubscriptionId());
}
// command is invalid or outdated, dropping.
logger.withCorrelationId(searchCommand)
.info("Dropping search command with invalid subscription ID: <{}>", searchCommand);
connectionLogger.failure(InfoProviderFactory.forSignal(searchCommand),
"Dropping search command with invalid subscription ID: " +
searchCommand.getSubscriptionId());
}

private Instant getInConnectionStatusSince() {
Expand Down Expand Up @@ -1969,7 +1939,12 @@ private CompletionStage<Void> subscribeAndDeclareAcknowledgementLabels(final boo
final CompletionStage<Boolean> subscribe = subscribeToStreamingTypes(group, resubscribe);
final CompletionStage<Void> declare =
dittoProtocolSub.declareAcknowledgementLabels(getDeclaredAcks(), getSelf(), group);
return subscribe.thenCompose(unused -> declare);

// only resub for connection; the initial subscription happened in preStart independent of publisher actors
final CompletionStage<Void> connectionSub = resubscribe
? connectionPubSub.subscribe(connection.getId(), getSelf(), true).thenApply(unused -> null)
: CompletableFuture.completedStage(null);
return subscribe.thenCompose(unused -> declare).thenCompose(unused -> connectionSub);
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
* Fake JMS message to defeat the Qpid client when it tries to set AMQP properties willy-nilly.
* Override all the setters to do nothing of already set.
* Override all the setters to do nothing if already set.
*/
public final class JMSMessageWorkaround extends JmsMessage {

Expand Down
Loading

0 comments on commit 717412c

Please sign in to comment.