Skip to content

Commit

Permalink
Issue #559: Minor refactoring. Mainly code formatting, simplified som…
Browse files Browse the repository at this point in the history
…e methods.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Oct 20, 2021
1 parent 6f658ce commit a9b2d24
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta

private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";
private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
private static final String CLOSED_BECAUSE_OF_UNKNOWN_FAILURE_MISCONFIGURATION_STATUS_IN_CLIENT =
private static final String CLOSED_BECAUSE_OF_UNKNOWN_FAILURE_MISCONFIGURATION_STATUS_IN_CLIENT =
"Closed because of unknown/failure/misconfiguration status in client.";
/**
* Common logger for all sub-classes of BaseClientActor as its MDC already contains the connection ID.
Expand All @@ -182,7 +182,6 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta

private final Connection connection;
private final ActorRef connectionActor;
private final ActorRef proxyActor;
private final ActorSelection proxyActorSelection;
private final Gauge clientGauge;
private final Gauge clientConnectingGauge;
Expand Down Expand Up @@ -222,7 +221,7 @@ protected BaseClientActor(final Connection connection,
materializer = Materializer.createMaterializer(system);
this.connection = checkNotNull(connection, "connection");
this.connectionActor = connectionActor;
this.proxyActor = proxyActor;

// this is retrieve via the extension for each baseClientActor in order to not pass it as constructor arg
// as all constructor arguments need to be serializable as the BaseClientActor is started behind a cluster
// router
Expand Down Expand Up @@ -296,10 +295,13 @@ public void preStart() throws Exception {
when(TESTING, inTestingState());

// start with UNKNOWN state but send self OpenConnection because client actors are never created closed
final BaseClientData startingData =
BaseClientData.BaseClientDataBuilder.from(connection.getId(), connection, ConnectivityStatus.UNKNOWN,
ConnectivityStatus.OPEN, "initialized", Instant.now())
.build();
final var startingData = BaseClientData.BaseClientDataBuilder.from(connection.getId(),
connection,
ConnectivityStatus.UNKNOWN,
ConnectivityStatus.OPEN,
"initialized",
Instant.now())
.build();
startWith(UNKNOWN, startingData);

onTransition(this::onTransition);
Expand All @@ -311,7 +313,8 @@ public void preStart() throws Exception {
// inform connection actor of my presence if there are other client actors
if (connection.getClientCount() > 1 && !dryRun) {
connectionActor.tell(getSelf(), getSelf());
startTimerWithFixedDelay(Control.REFRESH_CLIENT_ACTOR_REFS.name(), Control.REFRESH_CLIENT_ACTOR_REFS,
startTimerWithFixedDelay(Control.REFRESH_CLIENT_ACTOR_REFS.name(),
Control.REFRESH_CLIENT_ACTOR_REFS,
clientActorRefsNotificationDelay);
}
clientActorRefs.add(getSelf());
Expand Down Expand Up @@ -783,10 +786,12 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
private FSM.State<BaseClientState, BaseClientData> publishMappedMessage(final PublishMappedMessage message,
final BaseClientData data) {

if (getPublisherActor() != null) {
getPublisherActor().forward(message.getOutboundSignal(), getContext());
final var publisherActor = getPublisherActor();
final var outboundSignal = message.getOutboundSignal();
if (publisherActor != null) {
publisherActor.forward(outboundSignal, getContext());
} else {
logger.withCorrelationId(message.getOutboundSignal().getSource())
logger.withCorrelationId(outboundSignal.getSource())
.error("No publisher actor available, dropping message: {}", message);
}
return stay();
Expand Down Expand Up @@ -1237,10 +1242,10 @@ private State<BaseClientState, BaseClientData> handleInitializationResult(
connectionLogger.success("Connection successful.");
data.getSessionSenders().forEach(origin -> origin.first().tell(new Status.Success(CONNECTED), getSelf()));
return goTo(CONNECTED).using(data.resetSession()
.resetFailureCount()
.setConnectionStatus(ConnectivityStatus.OPEN)
.setConnectionStatusDetails("Connected at " + Instant.now())
);
.resetFailureCount()
.setConnectionStatus(ConnectivityStatus.OPEN)
.setConnectionStatusDetails("Connected at " + Instant.now())
);
} else {
logger.info("Initialization of consumers, publisher and subscriptions failed: {}. Staying in CONNECTING " +
"state to continue with connection recovery after backoff.", initializationResult.getFailure());
Expand Down Expand Up @@ -1279,22 +1284,22 @@ protected CompletionStage<Status.Status> startConsumerActors(@Nullable final Cli
private State<BaseClientState, BaseClientData> clientDisconnected(final ClientDisconnected event,
final BaseClientData data) {

connectionLogger.success("Disconnected successfully.");
connectionLogger.success("Disconnected successfully.");

cleanupResourcesForConnection();
tellTunnelActor(SshTunnelActor.TunnelControl.STOP_TUNNEL);
data.getSessionSenders()
.forEach(sender -> sender.first().tell(new Status.Success(DISCONNECTED), getSelf()));
cleanupResourcesForConnection();
tellTunnelActor(SshTunnelActor.TunnelControl.STOP_TUNNEL);
data.getSessionSenders()
.forEach(sender -> sender.first().tell(new Status.Success(DISCONNECTED), getSelf()));

final BaseClientData nextStateData = data.resetSession()
.setConnectionStatus(ConnectivityStatus.CLOSED)
.setConnectionStatusDetails("Disconnected at " + Instant.now());
final BaseClientData nextStateData = data.resetSession()
.setConnectionStatus(ConnectivityStatus.CLOSED)
.setConnectionStatusDetails("Disconnected at " + Instant.now());

if (event.shutdownAfterDisconnected()) {
return stop(Normal(), nextStateData);
} else {
return goTo(DISCONNECTED).using(nextStateData);
}
if (event.shutdownAfterDisconnected()) {
return stop(Normal(), nextStateData);
} else {
return goTo(DISCONNECTED).using(nextStateData);
}
}

private void tellTunnelActor(final SshTunnelActor.TunnelControl control) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,9 @@ protected void escalate(final Throwable error, final String description) {
* @return the configured auto-ack label if any exists, or an empty optional.
*/
protected Optional<AcknowledgementLabel> getAcknowledgementLabel(@Nullable final Target target) {
return Optional.ofNullable(target).flatMap(Target::getIssuedAcknowledgementLabel)
.flatMap(
ackLabel -> ConnectionValidator.resolveConnectionIdPlaceholder(connectionIdResolver, ackLabel));
return Optional.ofNullable(target)
.flatMap(Target::getIssuedAcknowledgementLabel)
.flatMap(ackLbl -> ConnectionValidator.resolveConnectionIdPlaceholder(connectionIdResolver, ackLbl));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ private AmqpPublisherActor(final Connection connection,
final Pair<SourceQueueWithComplete<Pair<ExternalMessage, AmqpMessageContext>>, UniqueKillSwitch> materialized =
Source.<Pair<ExternalMessage, AmqpMessageContext>>queue(config.getPublisherConfig().getMaxQueueSize(),
OverflowStrategy.dropNew())
.mapAsync(config.getPublisherConfig().getParallelism(), msg -> triggerPublishAsync(msg, jmsDispatcher))
.mapAsync(config.getPublisherConfig().getParallelism(),
msg -> triggerPublishAsync(msg, jmsDispatcher))
.recover(new PFBuilder<Throwable, Object>()
// the "Done" instance is not used, this just means to not fail the stream for any Throwables
.matchAny(x -> Done.getInstance())
Expand Down Expand Up @@ -158,7 +159,8 @@ private AmqpPublisherActor(final Connection connection,
*/
private static CompletableFuture<Object> triggerPublishAsync(
final Pair<ExternalMessage, AmqpMessageContext> messageToPublish,
final Executor jmsDispatcher) {
final Executor jmsDispatcher
) {
final ExternalMessage message = messageToPublish.first();
final AmqpMessageContext context = messageToPublish.second();
return CompletableFuture.supplyAsync(() -> context.onPublishMessage(message), jmsDispatcher)
Expand All @@ -178,9 +180,17 @@ private static CompletableFuture<Object> triggerPublishAsync(
* status.
* @return the Akka configuration Props object.
*/
static Props props(final Connection connection, final Session session, final ConnectionConfig connectionConfig,
final String clientId, final ConnectivityStatusResolver connectivityStatusResolver) {
return Props.create(AmqpPublisherActor.class, connection, session, connectionConfig, clientId,
static Props props(final Connection connection,
final Session session,
final ConnectionConfig connectionConfig,
final String clientId,
final ConnectivityStatusResolver connectivityStatusResolver) {

return Props.create(AmqpPublisherActor.class,
connection,
session,
connectionConfig,
clientId,
connectivityStatusResolver);
}

Expand Down Expand Up @@ -565,4 +575,5 @@ enum Control {
*/
START_PRODUCER
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.eclipse.ditto.connectivity.service.messaging.SendResult;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.signing.NoOpSigning;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.PreparedTimer;
import org.eclipse.ditto.json.JsonFactory;
Expand Down Expand Up @@ -320,10 +319,10 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
final int maxTotalMessageSize,
final int ackSizeQuota) {

final CompletableFuture<SendResult> resultFuture = new CompletableFuture<>();
final HttpRequest request = createRequest(publishTarget, message);
final HttpPushContext context = newContext(signal, autoAckTarget, request, message, maxTotalMessageSize,
ackSizeQuota, resultFuture);
final var resultFuture = new CompletableFuture<SendResult>();
final var request = createRequest(publishTarget, message);
final var context =
newContext(signal, autoAckTarget, request, message, maxTotalMessageSize, ackSizeQuota, resultFuture);
sourceQueue.offer(Pair.create(request, context))
.handle(handleQueueOfferResult(message, resultFuture));
return resultFuture;
Expand Down Expand Up @@ -388,22 +387,10 @@ private HttpPushContext newContext(final Signal<?> signal,
final CompletableFuture<SendResult> resultFuture) {

return tryResponse -> {
final Uri requestUri = stripUserInfo(request.getUri());

final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(message.getInternalHeaders());

if (tryResponse.isFailure()) {
final Throwable error = tryResponse.toEither().left().get();
final String errorDescription = MessageFormat.format("Failed to send HTTP request to <{0}>.",
requestUri);
l.info("Failed to send message due to <{}: {}>", error.getClass().getSimpleName(),
error.getMessage());
l.debug("Failed to send message <{}> due to <{}: {}>", message, error.getClass().getSimpleName(),
error.getMessage());
resultFuture.completeExceptionally(error);
escalate(error, errorDescription);
} else {
final HttpResponse response = tryResponse.toEither().right().get();
final var l = logger.withCorrelationId(message.getInternalHeaders());

if (tryResponse.isSuccess()) {
final var response = tryResponse.get();
l.info("Got response status <{}>", response.status());
l.debug("Sent message <{}>.", message);
l.debug("Got response <{} {} {}>", response.status(), response.getHeaders(),
Expand All @@ -415,8 +402,23 @@ private HttpPushContext newContext(final Signal<?> signal,
resultFuture.completeExceptionally(e);
return null;
});
} else {
final var failure = tryResponse.failed();
final var error = failure.get();
if (l.isDebugEnabled()) {
l.debug("Failed to send message <{}> due to <{}: {}>",
message,
error.getClass().getSimpleName(),
error.getMessage());
} else {
l.info("Failed to send message due to <{}: {}>",
error.getClass().getSimpleName(),
error.getMessage());
}
resultFuture.completeExceptionally(error);
escalate(error,
MessageFormat.format("Failed to send HTTP request to <{0}>.", stripUserInfo(request.getUri())));
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ abstract P mapExternalMessageToMqttMessage(String topic, MqttQos qos, boolean re
* @return the acknowledgement.
*/
protected SendResult buildResponse(final Signal<?> signal, @Nullable final Target target) {

final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final Optional<AcknowledgementLabel> autoAckLabel = getAcknowledgementLabel(target);
final Optional<EntityId> entityIdOptional =
Expand Down Expand Up @@ -167,14 +166,16 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
final int ackSizeQuota) {

try {
final String topic = determineTopic(publishTarget, message);
final MqttQos qos = determineQos(message.getHeaders(), publishTarget);
final boolean retain = determineMqttRetainFromHeaders(message.getHeaders());
final P mqttMessage = mapExternalMessageToMqttMessage(topic, qos, retain, message);

final MqttSendingContext<P> mqttSendingContext =
new MqttSendingContext<>(mqttMessage, signal, new CompletableFuture<>(), message, autoAckTarget);
return offerToSourceQueue(mqttSendingContext);
final var messageHeaders = message.getHeaders();
final var mqttMessage = mapExternalMessageToMqttMessage(determineTopic(messageHeaders, publishTarget),
determineQos(messageHeaders, publishTarget),
isMqttRetain(messageHeaders),
message);
return offerToSourceQueue(new MqttSendingContext<>(mqttMessage,
signal,
new CompletableFuture<>(),
message,
autoAckTarget));
} catch (final Exception e) {
return CompletableFuture.failedFuture(e);
}
Expand Down Expand Up @@ -216,34 +217,22 @@ private void publishMqttMessage(final MqttSendingContext<P> sendingContext) {
.thenAccept(sendResult -> sendingContext.getSendResult().complete(sendResult));
}

private String determineTopic(final MqttPublishTarget publishTarget, final ExternalMessage message) {
return Optional.of(message)
.map(ExternalMessage::getHeaders)
.flatMap(m -> Optional.ofNullable(m.get(MQTT_TOPIC.getName())))
.orElse(publishTarget.getTopic());
private static String determineTopic(final Map<String, String> headers, final MqttPublishTarget publishTarget) {
return headers.getOrDefault(MQTT_TOPIC.getName(), publishTarget.getTopic());
}

private static MqttQos determineQos(final Map<String, String> headers, final MqttPublishTarget publishTarget) {
return getMqttQosFromHeaders(headers).orElseGet(() -> AbstractMqttValidator.getHiveQoS(publishTarget.getQos()));
}

private static Optional<MqttQos> getMqttQosFromHeaders(final Map<String, String> headers) {
return Optional.ofNullable(headers.get(MQTT_QOS.getName()))
.flatMap(qosHeader -> {
try {
return Optional.of(Integer.parseInt(qosHeader));
} catch (final NumberFormatException nfe) {
return Optional.empty();
}
})
.map(MqttQos::fromCode);
int qosCode;
try {
qosCode = Integer.parseInt(headers.get(MQTT_QOS.getName()));
} catch (final NumberFormatException e) {
qosCode = publishTarget.getQos();
}
return AbstractMqttValidator.getHiveQoS(qosCode);
}

private boolean determineMqttRetainFromHeaders(final Map<String, String> headers) {
return Optional.of(headers)
.map(h -> h.get(MQTT_RETAIN.getName()))
.map(Boolean::parseBoolean)
.orElse(false);
private static boolean isMqttRetain(final Map<String, String> headers) {
return Boolean.parseBoolean(headers.get(MQTT_RETAIN.getName()));
}

private boolean isDryRun(final Object message) {
Expand Down

0 comments on commit a9b2d24

Please sign in to comment.