Skip to content

Commit

Permalink
[#1081] Make Ditto headers serializable & add them as an extra parame…
Browse files Browse the repository at this point in the history
…ter for client actors.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 29, 2021
1 parent 672c3ff commit 6a5a52d
Show file tree
Hide file tree
Showing 27 changed files with 167 additions and 113 deletions.
Expand Up @@ -18,7 +18,8 @@
import java.util.Optional;
import java.util.Set;

import org.eclipse.ditto.json.JsonObject;
import javax.annotation.Nonnull;

import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
Expand All @@ -29,13 +30,14 @@
import org.eclipse.ditto.base.model.headers.metadata.MetadataHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.json.JsonObject;

/**
* Headers for commands and their responses which provide additional information needed for correlation and transfer.
*
* <em>Implementations of this interface are required to be immutable.</em>
*/
public interface DittoHeaders extends Jsonifiable<JsonObject>, Map<String, String> {
public interface DittoHeaders extends Jsonifiable<JsonObject>, Map<String, String>, WithManifest {

/**
* Returns an empty {@code DittoHeaders} object.
Expand Down Expand Up @@ -310,4 +312,11 @@ default DittoHeadersBuilder toBuilder() {
* @since 2.0.0
*/
Map<String, String> asCaseSensitiveMap();

@Override
@Nonnull
default String getManifest() {
// subclasses are serialized as DittoHeaders
return DittoHeaders.class.getSimpleName();
}
}
Expand Up @@ -77,6 +77,20 @@ default CompletionStage<ConnectionContext> getConnectionContext(final Connection
CompletionStage<Void> registerForConnectivityConfigChanges(ConnectionId connectionId,
DittoHeaders dittoHeaders, ActorRef subscriber);

/**
* Register the given {@code subscriber} for changes to the {@link ConnectivityConfig} of the given connection.
* The given {@link ActorRef} will receive {@link Event}s to build the modified
* {@link ConnectivityConfig}.
*
* @param context context of the connection whose config changes are subscribed
* @param subscriber the subscriber that will receive {@link org.eclipse.ditto.base.model.signals.events.Event}s
* @return a future that succeeds or fails depends on whether registration was successful.
*/
default CompletionStage<Void> registerForConnectivityConfigChanges(final ConnectionContext context,
final ActorRef subscriber) {
return registerForConnectivityConfigChanges(context.getConnection().getId(), DittoHeaders.empty(), subscriber);
}

/**
* Returns {@code true} if the implementation can handle the given {@code event} to generate a modified {@link
* ConnectivityConfig} when passed to {@link #handleEvent(Event)}.
Expand Down
Expand Up @@ -12,8 +12,6 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.base.model.signals.events.Event;

import akka.actor.AbstractActor;
Expand Down Expand Up @@ -46,16 +44,6 @@ default void handleEvent(final Event<?> event) {
getConnectivityConfigProvider().handleEvent(event).ifPresent(this::onConnectivityConfigModified);
}

/**
* Registers this actor for changes to connectivity config.
*
* @param connectionId the connection id
*/
default void registerForConfigChanges(ConnectionId connectionId) {
getConnectivityConfigProvider()
.registerForConnectivityConfigChanges(connectionId, DittoHeaders.empty(), self());
}

/**
* @return a {@link ConnectionContextProvider} required to register this actor for config changes
*/
Expand Down
Expand Up @@ -199,7 +199,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private int childActorCount = 0;

protected BaseClientActor(final Connection connection, @Nullable final ActorRef proxyActor,
final ActorRef connectionActor) {
final ActorRef connectionActor, final DittoHeaders dittoHeaders) {

this.connection = checkNotNull(connection, "connection");
this.connectionActor = connectionActor;
Expand Down Expand Up @@ -246,7 +246,7 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef
ConnectionPersistenceActor.getSubscriptionPrefixLength(connection.getClientCount());

// Send init message to allow for unsafe initialization of subclasses.
getSelf().tell(Control.INIT_START, ActorRef.noSender());
startInitialization(connection.getId(), dittoHeaders);
}

@Override
Expand Down Expand Up @@ -425,10 +425,9 @@ private FSM.State<BaseClientState, BaseClientData> completeInitialization() {
return state;
}

private FSM.State<BaseClientState, BaseClientData> startInitialization() {
pipeConnectionContextToSelf(connectionContextProvider, connection, getSelf(), getContext().getDispatcher());
connectionContextProvider.registerForConnectivityConfigChanges(connectionId(), DittoHeaders.empty(), getSelf());
return stay();
private void startInitialization(final ConnectionId connectionId, final DittoHeaders dittoHeaders) {
pipeConnectionContextToSelfAndRegisterForChanges(connectionContextProvider, connection, dittoHeaders, getSelf(),
getContext().getDispatcher());
}

/**
Expand Down Expand Up @@ -709,7 +708,6 @@ private FSM.State<BaseClientState, BaseClientData> goToTesting() {

private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inUnknownState() {
return matchEventEquals(Control.INIT_COMPLETE, (init, baseClientData) -> completeInitialization())
.eventEquals(Control.INIT_START, (init, baseClientData) -> startInitialization())
.event(ConnectionContext.class, this::initializeByConnectionContext)
.event(RuntimeException.class, this::failInitialization)
.anyEvent((o, baseClientData) -> {
Expand Down Expand Up @@ -1909,18 +1907,24 @@ private static Optional<Integer> parseHexString(final String hexString) {
}
}

private static void pipeConnectionContextToSelf(final ConnectionContextProvider connectionContextProvider,
final Connection connection, final ActorRef self, final ExecutionContext executionContext) {
private static void pipeConnectionContextToSelfAndRegisterForChanges(
final ConnectionContextProvider connectionContextProvider,
final Connection connection, final DittoHeaders dittoHeaders,
final ActorRef self, final ExecutionContext executionContext) {

final CompletionStage<Object> messageToSelfFuture = connectionContextProvider.getConnectionContext(connection)
.<Object>thenApply(x -> x)
.exceptionally(throwable -> {
if (throwable instanceof RuntimeException) {
return throwable;
} else {
return new RuntimeException(throwable);
}
});
final CompletionStage<Object> messageToSelfFuture =
connectionContextProvider.getConnectionContext(connection, dittoHeaders)
.thenCompose(context ->
connectionContextProvider.registerForConnectivityConfigChanges(context, self)
.<Object>thenApply(_void -> context)
)
.exceptionally(throwable -> {
if (throwable instanceof RuntimeException) {
return throwable;
} else {
return new RuntimeException(throwable);
}
});

Patterns.pipe(messageToSelfFuture, executionContext).to(self);
}
Expand Down Expand Up @@ -2116,7 +2120,6 @@ public String toString() {
}

private enum Control {
INIT_START,
INIT_COMPLETE,
REFRESH_CLIENT_ACTOR_REFS,
CONNECT_AFTER_TUNNEL_ESTABLISHED
Expand Down
Expand Up @@ -14,6 +14,7 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;

import akka.actor.ActorRef;
Expand All @@ -32,9 +33,10 @@ public interface ClientActorPropsFactory {
* @param proxyActor the actor used to send signals into the ditto cluster..
* @param connectionActor the connectionPersistenceActor which creates this client.
* @param actorSystem the actorSystem.
* @param dittoHeaders Ditto headers of the command that caused the client actors to be created.
* @return the actor props
*/
Props getActorPropsForType(Connection connection, @Nullable ActorRef proxyActor, ActorRef connectionActor,
ActorSystem actorSystem);
ActorSystem actorSystem, final DittoHeaders dittoHeaders);

}
Expand Up @@ -15,6 +15,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.service.messaging.amqp.AmqpClientActor;
Expand Down Expand Up @@ -49,29 +50,30 @@ public static DefaultClientActorPropsFactory getInstance() {

@Override
public Props getActorPropsForType(final Connection connection, @Nullable final ActorRef proxyActor,
final ActorRef connectionActor, final ActorSystem actorSystem) {
final ActorRef connectionActor, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders) {
final ConnectionType connectionType = connection.getConnectionType();

final Props result;
switch (connectionType) {
case AMQP_091:
result = RabbitMQClientActor.props(connection, proxyActor, connectionActor);
result = RabbitMQClientActor.props(connection, proxyActor, connectionActor, dittoHeaders);
break;
case AMQP_10:
result = AmqpClientActor.props(connection, proxyActor, connectionActor, actorSystem);
result = AmqpClientActor.props(connection, proxyActor, connectionActor, actorSystem, dittoHeaders);
break;
case MQTT:
result = HiveMqtt3ClientActor.props(connection, proxyActor, connectionActor);
result = HiveMqtt3ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders);
break;
case MQTT_5:
result = HiveMqtt5ClientActor.props(connection, proxyActor, connectionActor);
result = HiveMqtt5ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders);
break;
case KAFKA:
result = KafkaClientActor.props(connection, proxyActor, connectionActor,
DefaultKafkaPublisherActorFactory.getInstance());
DefaultKafkaPublisherActorFactory.getInstance(), dittoHeaders);
break;
case HTTP_PUSH:
result = HttpPushClientActor.props(connection, connectionActor);
result = HttpPushClientActor.props(connection, connectionActor, dittoHeaders);
break;
default:
throw new IllegalArgumentException("ConnectionType <" + connectionType + "> is not supported.");
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.provider.ProviderFactory;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
Expand Down Expand Up @@ -116,9 +117,10 @@ public final class AmqpClientActor extends BaseClientActor implements ExceptionL
private AmqpClientActor(final Connection connection,
@Nullable final ActorRef proxyActor,
final ActorRef connectionActor,
final Config amqp10configOverride) {
final Config amqp10configOverride,
final DittoHeaders dittoHeaders) {

super(connection, proxyActor, connectionActor);
super(connection, proxyActor, connectionActor, dittoHeaders);

final Config systemConfig = getContext().getSystem().settings().config();
final Config mergedConfig = systemConfig.withValue(AMQP_10_CONFIG_PATH,
Expand All @@ -144,9 +146,9 @@ private AmqpClientActor(final Connection connection,
private AmqpClientActor(final Connection connection,
final JmsConnectionFactory jmsConnectionFactory,
@Nullable final ActorRef proxyActor,
final ActorRef connectionActor) {
final ActorRef connectionActor, final DittoHeaders dittoHeaders) {

super(connection, proxyActor, connectionActor);
super(connection, proxyActor, connectionActor, dittoHeaders);

this.jmsConnectionFactory = jmsConnectionFactory;
connectionListener = new StatusReportingListener(getSelf(), logger, connectionLogger);
Expand All @@ -163,13 +165,15 @@ private AmqpClientActor(final Connection connection,
* @param proxyActor the actor used to send signals into the ditto cluster.
* @param connectionActor the connectionPersistenceActor which created this client.
* @param actorSystem the actor system.
* @param dittoHeaders headers of the command that caused this actor to be created.
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection, @Nullable final ActorRef proxyActor,
final ActorRef connectionActor, final ActorSystem actorSystem) {
final ActorRef connectionActor, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem), proxyActor,
connectionActor, ConfigFactory.empty());
connectionActor, ConfigFactory.empty(), dittoHeaders);
}

/**
Expand All @@ -181,13 +185,15 @@ public static Props props(final Connection connection, @Nullable final ActorRef
* @param amqp10configOverride an override for Amqp10Config values -
* @param actorSystem the actor system.
* as Typesafe {@code Config} because this one is serializable in Akka by default.
* @param dittoHeaders headers of the command that caused this actor to be created.
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection, @Nullable final ActorRef proxyActor,
final ActorRef connectionActor, final Config amqp10configOverride, final ActorSystem actorSystem) {
final ActorRef connectionActor, final Config amqp10configOverride, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem), proxyActor,
connectionActor, amqp10configOverride);
connectionActor, amqp10configOverride, dittoHeaders);
}

/**
Expand All @@ -205,7 +211,7 @@ static Props propsForTest(final Connection connection, @Nullable final ActorRef
final ActorSystem actorSystem) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem),
jmsConnectionFactory, proxyActor, connectionActor);
jmsConnectionFactory, proxyActor, connectionActor, DittoHeaders.empty());
}

private static Connection validateConnection(final Connection connection, final ActorSystem actorSystem) {
Expand Down
Expand Up @@ -168,7 +168,8 @@ public Receive createReceive() {
@Override
public void preStart() throws JMSException {
initMessageConsumer();
registerForConfigChanges(connectionId);
getConnectivityConfigProvider()
.registerForConnectivityConfigChanges(consumerData.getConnectionContext(), getSelf());
}

@Override
Expand Down
Expand Up @@ -63,8 +63,9 @@ public final class HttpPushClientActor extends BaseClientActor {
private final HttpPushConfig httpPushConfig;

@SuppressWarnings("unused")
private HttpPushClientActor(final Connection connection, final ActorRef connectionActor) {
super(connection, ActorRef.noSender(), connectionActor);
private HttpPushClientActor(final Connection connection, final ActorRef connectionActor,
final DittoHeaders dittoHeaders) {
super(connection, ActorRef.noSender(), connectionActor, dittoHeaders);

final DittoConnectivityConfig connectivityConfig = DittoConnectivityConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
Expand All @@ -82,10 +83,12 @@ private HttpPushClientActor(final Connection connection, final ActorRef connecti
*
* @param connection the HTTP-push connection.
* @param connectionActor the connectionPersistenceActor which created this client.
* @param dittoHeaders headers of the command that caused this actor to be created.
* @return the {@code Props} object.
*/
public static Props props(final Connection connection, final ActorRef connectionActor) {
return Props.create(HttpPushClientActor.class, connection, connectionActor);
public static Props props(final Connection connection, final ActorRef connectionActor,
final DittoHeaders dittoHeaders) {
return Props.create(HttpPushClientActor.class, connection, connectionActor, dittoHeaders);
}

@Override
Expand Down
Expand Up @@ -63,9 +63,10 @@ public final class KafkaClientActor extends BaseClientActor {
private KafkaClientActor(final Connection connection,
@Nullable final ActorRef proxyActor,
final ActorRef connectionActor,
final KafkaPublisherActorFactory publisherActorFactory) {
final KafkaPublisherActorFactory publisherActorFactory,
final DittoHeaders dittoHeaders) {

super(connection, proxyActor, connectionActor);
super(connection, proxyActor, connectionActor, dittoHeaders);
final ConnectionConfig connectionConfig = connectionContext.getConnectivityConfig().getConnectionConfig();
kafkaConfig = connectionConfig.getKafkaConfig();
kafkaConsumerActors = new ArrayList<>();
Expand All @@ -81,15 +82,17 @@ private KafkaClientActor(final Connection connection,
* @param proxyActor the actor used to send signals into the ditto cluster.
* @param connectionActor the connectionPersistenceActor which created this client.
* @param factory factory for creating a kafka publisher actor.
* @param dittoHeaders headers of the command that caused this actor to be created.
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection,
@Nullable final ActorRef proxyActor,
final ActorRef connectionActor,
final KafkaPublisherActorFactory factory) {
final KafkaPublisherActorFactory factory,
final DittoHeaders dittoHeaders) {

return Props.create(KafkaClientActor.class, validateConnection(connection), proxyActor, connectionActor,
factory);
factory, dittoHeaders);
}

private static Connection validateConnection(final Connection connection) {
Expand Down

0 comments on commit 6a5a52d

Please sign in to comment.