Skip to content

Commit

Permalink
Remove headers as a part of client actor props to prevent accidental …
Browse files Browse the repository at this point in the history
…restarts.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 6, 2022
1 parent a8714fd commit 67c8f3c
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
protected BaseClientActor(final Connection connection,
final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders,
final boolean dryRun,
final Config connectivityConfigOverwrites) {

this.connection = checkNotNull(connection, "connection");
Expand Down Expand Up @@ -266,8 +266,7 @@ protected BaseClientActor(final Connection connection,
subscriptionIdPrefixLength =
ConnectionPersistenceActor.getSubscriptionPrefixLength(connection.getClientCount());

// Send init message to allow for unsafe initialization of subclasses.
dryRun = dittoHeaders.isDryRun();
this.dryRun = dryRun;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ private AmqpClientActor(final Connection connection,
final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final Config connectivityConfigOverwrites,
final DittoHeaders dittoHeaders) {
final boolean dryRun) {

super(connection, commandForwarderActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
super(connection, commandForwarderActor, connectionActor, dryRun, connectivityConfigOverwrites);
final ConnectionConfig connectionConfig = connectivityConfig().getConnectionConfig();
final Amqp10Config amqp10Config = connectionConfig.getAmqp10Config();
jmsConnectionFactory =
Expand All @@ -154,9 +154,10 @@ private AmqpClientActor(final Connection connection,
private AmqpClientActor(final Connection connection,
final JmsConnectionFactory jmsConnectionFactory,
final ActorRef commandForwarderActor,
final ActorRef connectionActor, final DittoHeaders dittoHeaders) {
final ActorRef connectionActor,
final boolean dryRun) {

super(connection, commandForwarderActor, connectionActor, dittoHeaders, ConfigFactory.empty());
super(connection, commandForwarderActor, connectionActor, dryRun, ConfigFactory.empty());

this.jmsConnectionFactory = jmsConnectionFactory;
connectionListener = new StatusReportingListener(getSelf(), logger, connectionLogger);
Expand All @@ -183,7 +184,7 @@ public static Props props(final Connection connection, final ActorRef commandFor
final ActorRef connectionActor, final Config configOverwrites, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders) {
return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem),
commandForwarderActor, connectionActor, configOverwrites, dittoHeaders);
commandForwarderActor, connectionActor, configOverwrites, dittoHeaders.isDryRun());
}

/**
Expand All @@ -200,7 +201,7 @@ static Props propsForTest(final Connection connection, @Nullable final ActorRef
final ActorRef connectionActor, final JmsConnectionFactory jmsConnectionFactory,
final ActorSystem actorSystem) {
return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem),
jmsConnectionFactory, commandForwarderActor, connectionActor, DittoHeaders.empty());
jmsConnectionFactory, commandForwarderActor, connectionActor, false);
}

private static Connection validateConnection(final Connection connection, final ActorSystem actorSystem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ public final class HttpPushClientActor extends BaseClientActor {
private HttpPushClientActor(final Connection connection,
final ActorRef connectionActor,
final ActorRef commandForwarderActor,
final DittoHeaders dittoHeaders,
final boolean dryRun,
final Config connectivityConfigOverwrites) {

super(connection, commandForwarderActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
super(connection, commandForwarderActor, connectionActor, dryRun, connectivityConfigOverwrites);
httpPushConfig = connectivityConfig().getConnectionConfig().getHttpPushConfig();
final MonitoringLoggerConfig loggerConfig = connectivityConfig().getMonitoringConfig().logger();
factory = HttpPushFactory.of(connection, httpPushConfig, connectionLogger, this::getSshTunnelState);
Expand All @@ -93,8 +93,8 @@ private HttpPushClientActor(final Connection connection,
public static Props props(final Connection connection, final ActorRef commandForwarderActor,
final ActorRef connectionActor, final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {
return Props.create(HttpPushClientActor.class, connection, connectionActor, commandForwarderActor, dittoHeaders,
connectivityConfigOverwrites);
return Props.create(HttpPushClientActor.class, connection, connectionActor, commandForwarderActor,
dittoHeaders.isDryRun(), connectivityConfigOverwrites);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ private KafkaClientActor(final Connection connection,
final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final KafkaPublisherActorFactory publisherActorFactory,
final DittoHeaders dittoHeaders,
final boolean dryRun,
final Config connectivityConfigOverwrites) {

super(connection, commandForwarderActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
super(connection, commandForwarderActor, connectionActor, dryRun, connectivityConfigOverwrites);
kafkaConfig = connectivityConfig().getConnectionConfig().getKafkaConfig();
kafkaConsumerActors = new ArrayList<>();
propertiesFactory = PropertiesFactory.newInstance(connection, kafkaConfig, getClientId(connection.getId()));
Expand All @@ -89,11 +89,11 @@ private KafkaClientActor(final Connection connection,
private KafkaClientActor(final Connection connection,
final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders,
final boolean dryRun,
final Config connectivityConfigOverwrites) {

this(connection, commandForwarderActor, connectionActor, DefaultKafkaPublisherActorFactory.getInstance(),
dittoHeaders, connectivityConfigOverwrites);
dryRun, connectivityConfigOverwrites);

connectionCounterRegistry.registerAlertFactory(ConnectionType.KAFKA, MetricType.THROTTLED,
MetricDirection.INBOUND,
Expand All @@ -118,7 +118,7 @@ public static Props props(final Connection connection,
final Config connectivityConfigOverwrites) {

return Props.create(KafkaClientActor.class, validateConnection(connection), commandForwarderActor,
connectionActor, dittoHeaders, connectivityConfigOverwrites);
connectionActor, dittoHeaders.isDryRun(), connectivityConfigOverwrites);
}

static Props propsForTests(final Connection connection,
Expand All @@ -128,7 +128,7 @@ static Props propsForTests(final Connection connection,
final DittoHeaders dittoHeaders) {

return Props.create(KafkaClientActor.class, validateConnection(connection), proxyActor, connectionActor,
factory, dittoHeaders, ConfigFactory.empty());
factory, dittoHeaders.isDryRun(), ConfigFactory.empty());
}

private static Connection validateConnection(final Connection connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ public final class MqttClientActor extends BaseClientActor {
private MqttClientActor(final Connection connection,
final ActorRef commandForwarder,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders,
final boolean dryRun,
final Config connectivityConfigOverwrites) {

super(connection, commandForwarder, connectionActor, dittoHeaders, connectivityConfigOverwrites);
super(connection, commandForwarder, connectionActor, dryRun, connectivityConfigOverwrites);

final var connectivityConfig = connectivityConfig();
final var connectionConfig = connectivityConfig.getConnectionConfig();
Expand Down Expand Up @@ -129,7 +129,7 @@ public static Props props(final Connection mqttConnection,
ConditionChecker.checkNotNull(mqttConnection, "mqttConnection"),
ConditionChecker.checkNotNull(commandForwarder, "commandForwarder"),
ConditionChecker.checkNotNull(connectionActor, "connectionActor"),
ConditionChecker.checkNotNull(dittoHeaders, "dittoHeaders"),
ConditionChecker.checkNotNull(dittoHeaders, "dittoHeaders").isDryRun(),
ConditionChecker.checkNotNull(connectivityConfigOverwrites, "connectivityConfigOverwrites"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ protected void processPingCommand(final PingCommand ping) {
// only ask for connection status after initial recovery was completed + some grace period
askSelfForRetrieveConnectionStatus(ping.getCorrelationId().orElse(null));
}

// refresh client actors
if (isDesiredStateOpen()) {
startClientActors(getClientCount(), DittoHeaders.empty());
}
}

private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence correlationId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ public final class RabbitMQClientActor extends BaseClientActor {
private RabbitMQClientActor(final Connection connection,
final ActorRef commandForwarderActor,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders,
final boolean dryRun,
final Config connectivityOverwritesConfig) {

super(connection, commandForwarderActor, connectionActor, dittoHeaders, connectivityOverwritesConfig);
super(connection, commandForwarderActor, connectionActor, dryRun, connectivityOverwritesConfig);

rabbitConnectionFactoryFactory =
ConnectionBasedRabbitConnectionFactoryFactory.getInstance(this::getSshTunnelState);
Expand All @@ -116,10 +116,10 @@ private RabbitMQClientActor(final Connection connection,
private RabbitMQClientActor(final Connection connection, final ActorRef proxyActor,
final ActorRef connectionActor,
final RabbitConnectionFactoryFactory rabbitConnectionFactoryFactory,
final DittoHeaders dittoHeaders,
final boolean dryRun,
final Config connectivityOverwritesConfig) {

super(connection, proxyActor, connectionActor, dittoHeaders, connectivityOverwritesConfig);
super(connection, proxyActor, connectionActor, dryRun, connectivityOverwritesConfig);
this.rabbitConnectionFactoryFactory = rabbitConnectionFactoryFactory;
consumedTagsToAddresses = new HashMap<>();
consumerByAddressWithIndex = new HashMap<>();
Expand All @@ -143,7 +143,7 @@ public static Props props(final Connection connection, final ActorRef commandFor
final Config connectivityOverwritesConfig) {

return Props.create(RabbitMQClientActor.class, validateConnection(connection), commandForwarderActor,
connectionActor, dittoHeaders, connectivityOverwritesConfig);
connectionActor, dittoHeaders.isDryRun(), connectivityOverwritesConfig);
}

@Override
Expand All @@ -167,7 +167,7 @@ static Props propsForTests(final Connection connection, @Nullable final ActorRef
final ActorRef connectionActor, final RabbitConnectionFactoryFactory rabbitConnectionFactoryFactory) {

return Props.create(RabbitMQClientActor.class, validateConnection(connection), proxyActor, connectionActor,
rabbitConnectionFactoryFactory, DittoHeaders.empty(), ConfigFactory.empty());
rabbitConnectionFactoryFactory, false, ConfigFactory.empty());
}

private static Connection validateConnection(final Connection connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ public DummyClientActor(final Connection connection,
final BaseClientActor delegate,
final Config config) {

super(connection, proxyActor, connectionActor, DittoHeaders.empty(), config);
super(connection, proxyActor, connectionActor, false, config);
this.publisherActor = publisherActor;
this.delegate = delegate;
}
Expand Down

0 comments on commit 67c8f3c

Please sign in to comment.