Skip to content

Commit

Permalink
Fix RetrieveThings handling for connectivity
Browse files Browse the repository at this point in the history
* Now RetrieveThings is forwarded to the right place (the aggregator proxy)
  via EdgeCommandForwarder for both edge services

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 22, 2022
1 parent 28ff503 commit e1d0c68
Show file tree
Hide file tree
Showing 23 changed files with 108 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.enforcement.ConnectionEnforcerActorPropsFactory;
import org.eclipse.ditto.connectivity.service.messaging.ConnectionIdsRetrievalActor;
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityProxyActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceOperationsActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceStreamingActorCreator;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionSupervisorActor;
Expand Down Expand Up @@ -77,12 +76,9 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,

final ActorRef commandForwarder = getCommandForwarder(clusterConfig, pubSubMediator);

final ActorRef proxyActor =
startChildActor(ConnectivityProxyActor.ACTOR_NAME, ConnectivityProxyActor.props(commandForwarder));

final var enforcerActorPropsFactory = ConnectionEnforcerActorPropsFactory.get(actorSystem);
final var connectionSupervisorProps =
ConnectionSupervisorActor.props(proxyActor, pubSubMediator, enforcerActorPropsFactory);
ConnectionSupervisorActor.props(commandForwarder, pubSubMediator, enforcerActorPropsFactory);
// Create persistence streaming actor (with no cache) and make it known to pubSubMediator.
final ActorRef persistenceStreamingActor =
startChildActor(ConnectionPersistenceStreamingActorCreator.ACTOR_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.pubsub.DistributedPubSub;
import akka.http.javadsl.ConnectionContext;
import akka.japi.Pair;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.FSMStateFunctionBuilder;
Expand All @@ -165,7 +164,6 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
protected final ThreadSafeDittoLoggingAdapter logger;
protected static final Status.Success DONE = new Status.Success(Done.getInstance());

protected ConnectionContext connectionContext;
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;

Expand All @@ -184,7 +182,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private final Connection connection;
private final ConnectivityConfig connectivityConfig;
private final ActorRef connectionActor;
private final ActorSelection proxyActorSelection;
private final ActorSelection commandForwarderActorSelection;
private final Gauge clientGauge;
private final Gauge clientConnectingGauge;
private final ReconnectTimeoutStrategy reconnectTimeoutStrategy;
Expand All @@ -197,7 +195,6 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
protected final ConnectivityCounterRegistry connectionCounterRegistry;
protected final ConnectionLoggerRegistry connectionLoggerRegistry;
private final boolean dryRun;
private final ActorRef proxyActor;

// counter for all child actors ever started to disambiguate between them
private int childActorCount = 0;
Expand All @@ -208,7 +205,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private ActorRef tunnelActor;

protected BaseClientActor(final Connection connection,
final ActorRef proxyActor,
final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {
Expand All @@ -219,7 +216,6 @@ protected BaseClientActor(final Connection connection,
final Config withOverwrites = connectivityConfigOverwrites.withFallback(config);
connectivityConfig = ConnectivityConfig.of(withOverwrites);
this.connectionActor = connectionActor;
this.proxyActor = proxyActor;

// this is retrieve via the extension for each baseClientActor in order to not pass it as constructor arg
// as all constructor arguments need to be serializable as the BaseClientActor is started behind a cluster
Expand All @@ -233,7 +229,7 @@ protected BaseClientActor(final Connection connection,
// log the default client ID for tracing
logger.info("Using default client ID <{}>", getDefaultClientId());

proxyActorSelection = getLocalActorOfSamePath(proxyActor);
commandForwarderActorSelection = getLocalActorOfSamePath(commandForwarderActor);

final UserIndicatedErrors userIndicatedErrors = UserIndicatedErrors.of(config);
connectivityStatusResolver = ConnectivityStatusResolver.of(userIndicatedErrors);
Expand Down Expand Up @@ -322,7 +318,7 @@ protected void init() {

final var inboundDispatchingSink = getInboundDispatchingSink(actorPair.second());
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
subscriptionManager = startSubscriptionManager(proxyActorSelection, connectivityConfig().getClientConfig());
subscriptionManager = startSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());

if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
tunnelActor = startChildActor(SshTunnelActor.ACTOR_NAME, SshTunnelActor.props(connection,
Expand Down Expand Up @@ -395,15 +391,6 @@ protected String getDefaultClientId() {
return getClientId(connection.getId());
}

/**
* Get the proxyActor reference.
*
* @return the proxyActor ref.
*/
protected ActorRef getProxyActor() {
return proxyActor;
}

private FSM.State<BaseClientState, BaseClientData> completeInitialization() {

final State<BaseClientState, BaseClientData> state = goTo(INITIALIZED);
Expand Down Expand Up @@ -1727,7 +1714,7 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ProtocolAdapter proto
try {
// this one throws DittoRuntimeExceptions when the mapper could not be configured
settings = OutboundMappingSettings.of(connection, connectivityConfig, getContext().getSystem(),
proxyActorSelection, protocolAdapter, logger);
commandForwarderActorSelection, protocolAdapter, logger);
outboundMappingProcessors = IntStream.range(0, processorPoolSize)
.mapToObj(i -> OutboundMappingProcessor.of(settings))
.toList();
Expand Down Expand Up @@ -1763,7 +1750,7 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ProtocolAdapter proto
private Sink<Object, NotUsed> getInboundDispatchingSink(final ActorRef outboundMappingProcessorActor) {
return InboundDispatchingSink.createSink(connection,
protocolAdapter.headerTranslator(),
proxyActorSelection,
commandForwarderActorSelection,
connectionActor,
outboundMappingProcessorActor,
getSelf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ public interface ClientActorPropsFactory extends DittoExtensionPoint {
* Create actor {@link Props} for a connection.
*
* @param connection the connection.
* @param proxyActor the actor used to send signals into the ditto cluster..
* @param commandForwarderActor the actor used to send signals into the ditto cluster..
* @param connectionActor the connectionPersistenceActor which creates this client.
* @param actorSystem the actorSystem.
* @param dittoHeaders Ditto headers of the command that caused the client actors to be created.
* @return the actor props
*/
Props getActorPropsForType(Connection connection,
ActorRef proxyActor,
ActorRef commandForwarderActor,
ActorRef connectionActor,
ActorSystem actorSystem,
DittoHeaders dittoHeaders,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public DefaultClientActorPropsFactory(final ActorSystem actorSystem) {
}

@Override
public Props getActorPropsForType(final Connection connection, final ActorRef proxyActor,
public Props getActorPropsForType(final Connection connection, final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final ActorSystem actorSystem,
final DittoHeaders dittoHeaders,
Expand All @@ -51,27 +51,27 @@ public Props getActorPropsForType(final Connection connection, final ActorRef pr
final Props result;
switch (connectionType) {
case AMQP_091:
result = RabbitMQClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
result = RabbitMQClientActor.props(connection, commandForwarderActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case AMQP_10:
result = AmqpClientActor.props(connection, proxyActor, connectionActor, connectivityConfigOverwrites,
result = AmqpClientActor.props(connection, commandForwarderActor, connectionActor, connectivityConfigOverwrites,
actorSystem, dittoHeaders);
break;
case MQTT:
result = HiveMqtt3ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
result = HiveMqtt3ClientActor.props(connection, commandForwarderActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case MQTT_5:
result = HiveMqtt5ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
result = HiveMqtt5ClientActor.props(connection, commandForwarderActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case KAFKA:
result = KafkaClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
result = KafkaClientActor.props(connection, commandForwarderActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case HTTP_PUSH:
result = HttpPushClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
result = HttpPushClientActor.props(connection, commandForwarderActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ public final class AmqpClientActor extends BaseClientActor implements ExceptionL
*/
@SuppressWarnings("unused")
private AmqpClientActor(final Connection connection,
final ActorRef proxyActor,
final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final Config connectivityConfigOverwrites,
final DittoHeaders dittoHeaders) {

super(connection, proxyActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
super(connection, commandForwarderActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
final ConnectionConfig connectionConfig = connectivityConfig().getConnectionConfig();
final Amqp10Config amqp10Config = connectionConfig.getAmqp10Config();
jmsConnectionFactory =
Expand All @@ -152,10 +152,10 @@ private AmqpClientActor(final Connection connection,
@SuppressWarnings("unused")
private AmqpClientActor(final Connection connection,
final JmsConnectionFactory jmsConnectionFactory,
final ActorRef proxyActor,
final ActorRef commandForwarderActor,
final ActorRef connectionActor, final DittoHeaders dittoHeaders) {

super(connection, proxyActor, connectionActor, dittoHeaders, ConfigFactory.empty());
super(connection, commandForwarderActor, connectionActor, dittoHeaders, ConfigFactory.empty());

this.jmsConnectionFactory = jmsConnectionFactory;
connectionListener = new StatusReportingListener(getSelf(), logger, connectionLogger);
Expand All @@ -170,38 +170,38 @@ private AmqpClientActor(final Connection connection,
* Creates Akka configuration object for this actor.
*
* @param connection the connection.
* @param proxyActor the actor used to send signals into the ditto cluster.
* @param commandForwarderActor the actor used to send signals into the ditto cluster.
* @param connectionActor the connectionPersistenceActor which created this client.
* @param configOverwrites an override for the default connectivity config values -
* @param actorSystem the actor system.
* as Typesafe {@code Config} because this one is serializable in Akka by default.
* @param dittoHeaders headers of the command that caused this actor to be created.
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection, final ActorRef proxyActor,
public static Props props(final Connection connection, final ActorRef commandForwarderActor,
final ActorRef connectionActor, final Config configOverwrites, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem, configOverwrites),
proxyActor, connectionActor, configOverwrites, dittoHeaders);
commandForwarderActor, connectionActor, configOverwrites, dittoHeaders);
}

/**
* Creates Akka configuration object for this actor.
*
* @param connection connection parameters.
* @param proxyActor the actor used to send signals into the ditto cluster.
* @param commandForwarderActor the actor used to send signals into the ditto cluster.
* @param connectionActor the connectionPersistenceActor which created this client.
* @param jmsConnectionFactory the JMS connection factory.
* @param actorSystem the actor system.
* @return the Akka configuration Props object.
*/
static Props propsForTest(final Connection connection, @Nullable final ActorRef proxyActor,
static Props propsForTest(final Connection connection, @Nullable final ActorRef commandForwarderActor,
final ActorRef connectionActor, final JmsConnectionFactory jmsConnectionFactory,
final ActorSystem actorSystem) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem, ConfigFactory.empty()),
jmsConnectionFactory, proxyActor, connectionActor, DittoHeaders.empty());
jmsConnectionFactory, commandForwarderActor, connectionActor, DittoHeaders.empty());
}

private static Connection validateConnection(final Connection connection, final ActorSystem actorSystem,
Expand Down
Loading

0 comments on commit e1d0c68

Please sign in to comment.