Skip to content

Commit

Permalink
review: don't assume ConnectivityStatus.FAILED in AmqpPublisherActor …
Browse files Browse the repository at this point in the history
…when producer is closed

* use ConnectivityStatusResolver instead
* improve "match()" check with equals predicate to matchEquals()
* added ProviderSecurityException to user-indicated-error list

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Sep 8, 2021
1 parent 46bdbd1 commit 652b001
Show file tree
Hide file tree
Showing 34 changed files with 262 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ public static ResourceStatus newClientStatus(final String client, final Connecti
* @param statusDetails the optional details about the connection status
* @param inStateSince the instant since the resource is in the given state
* @return a new AddressMetric which is initialised with the extracted data from {@code jsonObject}.
* @throws NullPointerException if any parameter is {@code null}.
* @throws NullPointerException if any non-nullable argument is {@code null}.
*/
public static ResourceStatus newClientStatus(final String client,
final ConnectivityStatus status,
@Nullable final String statusDetails,
final Instant inStateSince) {
@Nullable final Instant inStateSince) {

return ImmutableResourceStatus.of(ResourceStatus.ResourceType.CLIENT, client, status, null, statusDetails,
inStateSince);
Expand All @@ -249,8 +249,7 @@ public static ResourceStatus newClientStatus(final String client,
* @param statusDetails the optional details about the connection status
* @param inStateSince the instant since the resource is in the given state
* @return a new AddressMetric which is initialised with the extracted data from {@code jsonObject}.
* @throws NullPointerException if any parameter is {@code null}.
*
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 2.0.0
*/
public static ResourceStatus newSshTunnelStatus(final String client,
Expand Down Expand Up @@ -289,7 +288,7 @@ public static ResourceStatus newSourceStatus(final String client,
* @param statusDetails the optional details about the connection status
* @param inStatusSince the instant since the target is in the given state
* @return a new AddressMetric which is initialised with the extracted data from {@code jsonObject}.
* @throws NullPointerException if any parameter is {@code null}.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 2.1.0
*/
public static ResourceStatus newSourceStatus(final String client,
Expand Down Expand Up @@ -330,7 +329,7 @@ public static ResourceStatus newTargetStatus(final String client,
* @param statusDetails the optional details about the connection status
* @param inStatusSince the instant since the target is in the given state
* @return a new AddressMetric which is initialised with the extracted data from {@code jsonObject}.
* @throws NullPointerException if any parameter is {@code null}.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 2.1.0
*/
public static ResourceStatus newTargetStatus(final String client,
Expand Down Expand Up @@ -370,12 +369,12 @@ public static ResourceStatus newStatusUpdate(final String client,
* @param statusDetails the optional details about the connection status
* @param inStatusSince the instant since the resource is in the described status
* @return a new AddressMetric which is initialised with the extracted data from {@code jsonObject}.
* @throws NullPointerException if any parameter is {@code null}.
* @throws NullPointerException if any non-nullable argument is {@code null}.
*/
public static ResourceStatus newStatusUpdate(final String client, final ConnectivityStatus status,
@Nullable final String address,
@Nullable final String statusDetails,
final Instant inStatusSince) {
@Nullable final Instant inStatusSince) {

return ImmutableResourceStatus.of(ResourceStatus.ResourceType.UNKNOWN, client, status, address,
statusDetails, inStatusSince);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.connectivity.model;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -20,12 +22,12 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;

/**
* Immutable implementation of {@link ResourceStatus}.
Expand All @@ -44,10 +46,10 @@ private ImmutableResourceStatus(final ResourceType type, final String client, fi
@Nullable final String address,
@Nullable final String statusDetails,
@Nullable final Instant inStateSince) {
this.type = type;
this.client = client;
this.type = checkNotNull(type, "type");
this.client = checkNotNull(client, "client");
this.status = checkNotNull(status, "status");
this.address = address;
this.status = status;
this.statusDetails = statusDetails;
this.inStateSince = inStateSince;
}
Expand All @@ -62,8 +64,10 @@ private ImmutableResourceStatus(final ResourceType type, final String client, fi
* @param statusDetails the optional status details
* @param inStateSince the instant since the resource is in the given state
* @return a new instance of ImmutableResourceStatus
* @throws NullPointerException if any non-nullable argument is {@code null}.
*/
public static ImmutableResourceStatus of(final ResourceType type, final String client,
public static ImmutableResourceStatus of(final ResourceType type,
final String client,
final ConnectivityStatus status,
@Nullable final String address,
@Nullable final String statusDetails,
Expand All @@ -80,8 +84,10 @@ public static ImmutableResourceStatus of(final ResourceType type, final String c
* @param address an address describing the resource
* @param statusDetails the optional status details
* @return a new instance of ImmutableResourceStatus
* @throws NullPointerException if any non-nullable argument is {@code null}.
*/
public static ImmutableResourceStatus of(final ResourceType type, final String client,
public static ImmutableResourceStatus of(final ResourceType type,
final String client,
final ConnectivityStatus status,
@Nullable final String address,
@Nullable final String statusDetails) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public abstract class BaseConsumerActor extends AbstractActorWithTimers {

@Nullable private ResourceStatus resourceStatus;

protected BaseConsumerActor(final Connection connection, final String sourceAddress,
final Sink<Object, ?> inboundMappingSink, final Source source,
protected BaseConsumerActor(final Connection connection,
final String sourceAddress,
final Sink<Object, ?> inboundMappingSink,
final Source source,
final ConnectivityStatusResolver connectivityStatusResolver) {
this.connectionId = checkNotNull(connection, "connection").getId();
this.sourceAddress = checkNotNull(sourceAddress, "sourceAddress");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public abstract class BasePublisherActor<T extends PublishTarget> extends Abstra
protected final ConnectivityConfig connectivityConfig;
protected final ConnectionConfig connectionConfig;
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;

/**
* Common logger for all sub-classes of BasePublisherActor as its MDC already contains the connection ID.
Expand All @@ -119,7 +120,9 @@ public abstract class BasePublisherActor<T extends PublishTarget> extends Abstra
private final String clientId;
protected final ExpressionResolver connectionIdResolver;

protected BasePublisherActor(final Connection connection, final String clientId) {
protected BasePublisherActor(final Connection connection,
final String clientId,
final ConnectivityStatusResolver connectivityStatusResolver) {
this.connection = checkNotNull(connection, "connection");
this.clientId = checkNotNull(clientId, "clientId");
resourceStatusMap = new HashMap<>();
Expand All @@ -130,6 +133,7 @@ protected BasePublisherActor(final Connection connection, final String clientId)
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
final MonitoringLoggerConfig loggerConfig = monitoringConfig.logger();
this.connectionLogger = ConnectionLogger.getInstance(connection.getId(), loggerConfig);
this.connectivityStatusResolver = checkNotNull(connectivityStatusResolver, "connectivityStatusResolver");
connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(monitoringConfig);
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(connection);
responsePublishedMonitor = connectionMonitorRegistry.forResponsePublished(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ protected CompletionStage<Status.Status> startPublisherActor() {
stopChildActor(amqpPublisherActor);
if (null != jmsSession) {
final Props props = AmqpPublisherActor.props(connection(), jmsSession,
connectionContext.getConnectivityConfig().getConnectionConfig(), getDefaultClientId());
connectionContext.getConnectivityConfig().getConnectionConfig(), getDefaultClientId(),
connectivityStatusResolver);
amqpPublisherActor = startChildActorConflictFree(AmqpPublisherActor.ACTOR_NAME_PREFIX, props);
Patterns.ask(amqpPublisherActor, AmqpPublisherActor.Control.INITIALIZE, clientAskTimeout)
.whenComplete((result, error) -> {
Expand Down Expand Up @@ -513,7 +514,7 @@ private FSM.State<BaseClientState, BaseClientData> handleConnectionFailure(
final ConnectivityStatus newStatus = connectivityStatusResolver.resolve(failure);

if (!statusReport.isRecoverable()) {
logger.debug("Unrecoverable failure occurred, triggering client actor failure handling: {}", failure);
logger.info("Unrecoverable failure occurred, triggering client actor failure handling: {}", failure);
getSelf().tell(failure, getSelf());
}

Expand Down Expand Up @@ -855,7 +856,7 @@ public void onProducerClosed(final MessageProducer producer, final Throwable cau
connectionLogger.failure("Producer {0} was closed: {1}", producer.toString(), cause.getMessage());
logger.warning("Producer <{}> closed due to {}: {}", producer, cause.getClass().getSimpleName(),
cause.getMessage());
self.tell(ProducerClosedStatusReport.get(producer), ActorRef.noSender());
self.tell(ProducerClosedStatusReport.get(producer, cause), ActorRef.noSender());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.Enforcement;
import org.eclipse.ditto.connectivity.model.EnforcementFilterFactory;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
Expand Down Expand Up @@ -162,7 +161,7 @@ public Receive createReceive() {
.match(JmsMessage.class, this::handleJmsMessage)
.match(ResourceStatus.class, this::handleAddressStatus)
.match(RetrieveAddressStatus.class, ras -> getSender().tell(getCurrentSourceStatus(), getSelf()))
.match(Control.class, Control.CREATE_CONSUMER::equals, this::createMessageConsumer)
.matchEquals(Control.CREATE_CONSUMER, this::createMessageConsumer)
.match(ConsumerClosedStatusReport.class, this::matchesOwnConsumer, this::handleConsumerClosed)
.match(ConsumerClosedStatusReport.class, this::handleNonMatchingConsumerClosed)
.match(CreateMessageConsumerResponse.class, this::messageConsumerCreated)
Expand Down Expand Up @@ -274,14 +273,15 @@ private void handleConsumerClosed(final ConsumerClosedStatusReport event) {
final String statusDetails = buildStatusDetailsFromStatusReport(event);
final ResourceStatus addressStatus =
ConnectivityModelFactory.newStatusUpdate(InstanceIdentifierSupplier.getInstance().get(),
ConnectivityStatus.MISCONFIGURED, sourceAddress, statusDetails, Instant.now());
connectivityStatusResolver.resolve(event.getCause()), sourceAddress, statusDetails,
Instant.now());
handleAddressStatus(addressStatus);

// destroy current message consumer in any case
destroyMessageConsumer();

log.info("Consumer for destination '{}' was closed. Will try to recreate after some backoff.", sourceAddress);
backOffActor.tell(BackOffActor.createBackOffWithAnswerMessage(AmqpConsumerActor.Control.CREATE_CONSUMER),
backOffActor.tell(BackOffActor.createBackOffWithAnswerMessage(Control.CREATE_CONSUMER),
getSelf());
}

Expand Down Expand Up @@ -541,10 +541,10 @@ private CreateMessageConsumerResponse(final ConsumerData consumerData, final Mes
/**
* Actor control messages.
*/
enum Control {
private enum Control {
/**
* Triggers creation of a new message consumer.
*/
CREATE_CONSUMER;
CREATE_CONSUMER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.messaging.BasePublisherActor;
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver;
import org.eclipse.ditto.connectivity.service.messaging.SendResult;
import org.eclipse.ditto.connectivity.service.messaging.amqp.status.ProducerClosedStatusReport;
import org.eclipse.ditto.connectivity.service.messaging.backoff.BackOffActor;
Expand Down Expand Up @@ -110,10 +111,13 @@ public final class AmqpPublisherActor extends BasePublisherActor<AmqpTarget> {
private boolean isInBackOffMode;

@SuppressWarnings("unused")
private AmqpPublisherActor(final Connection connection, final Session session,
final ConnectionConfig connectionConfig, final String clientId) {
private AmqpPublisherActor(final Connection connection,
final Session session,
final ConnectionConfig connectionConfig,
final String clientId,
final ConnectivityStatusResolver connectivityStatusResolver) {

super(connection, clientId);
super(connection, clientId, connectivityStatusResolver);
this.session = checkNotNull(session, "session");

final Executor jmsDispatcher = JMSConnectionHandlingActor.getOwnDispatcher(getContext().system());
Expand Down Expand Up @@ -170,18 +174,21 @@ private static CompletableFuture<Object> triggerPublishAsync(
* @param session the jms session
* @param connectionConfig configuration for all connections.
* @param clientId identifier of the client actor.
* @param connectivityStatusResolver connectivity status resolver to resolve occurred exceptions to a connectivity
* status.
* @return the Akka configuration Props object.
*/
static Props props(final Connection connection, final Session session, final ConnectionConfig connectionConfig,
final String clientId) {
return Props.create(AmqpPublisherActor.class, connection, session, connectionConfig, clientId);
final String clientId, final ConnectivityStatusResolver connectivityStatusResolver) {
return Props.create(AmqpPublisherActor.class, connection, session, connectionConfig, clientId,
connectivityStatusResolver);
}

@Override
protected void preEnhancement(final ReceiveBuilder receiveBuilder) {
receiveBuilder.match(ProducerClosedStatusReport.class, this::handleProducerClosedStatusReport)
.match(Control.class, Control.START_PRODUCER::equals, this::handleStartProducer)
.match(Control.class, Control.INITIALIZE::equals, this::initialize);
.matchEquals(Control.START_PRODUCER, this::handleStartProducer)
.matchEquals(Control.INITIALIZE, this::initialize);
}

private void initialize(final Control initialize) {
Expand All @@ -197,6 +204,7 @@ private void initialize(final Control initialize) {
private void handleProducerClosedStatusReport(final ProducerClosedStatusReport report) {
if (!isInBackOffMode) {
final MessageProducer producer = report.getMessageProducer();
final Throwable cause = report.getCause();
final String genericLogInfo = "Will try to re-establish the static targets after some cool-down period.";
logger.info("Got closed AMQP 1.0 producer '{}'. {}", producer, genericLogInfo);
connectionLogger.failure("Targets were closed due to an error in the target. {0}", genericLogInfo);
Expand All @@ -209,8 +217,10 @@ private void handleProducerClosedStatusReport(final ProducerClosedStatusReport r
backOff();
// update resource status of closed targets
final String statusDetails =
String.format("Producer for destination '%s' closed due to error.", destination);
updateTargetResourceStatusForDestination(destination, ConnectivityStatus.FAILED, statusDetails);
String.format("Producer for destination '%s' was closed.", destination);

updateTargetResourceStatusForDestination(destination,
connectivityStatusResolver.resolve(cause), statusDetails);
});

// dynamic targets are not recreated, they are opened on-demand with the next message, no need to backoff
Expand Down Expand Up @@ -551,6 +561,6 @@ enum Control {
/**
* Message to trigger the creation of the static message producers.
*/
START_PRODUCER;
START_PRODUCER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ public boolean isRecoverable() {

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ConnectionFailureStatusReport that = (ConnectionFailureStatusReport) o;
return recoverable == that.recoverable && Objects.equals(failure, that.failure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ public Throwable getCause() {

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ConsumerClosedStatusReport that = (ConsumerClosedStatusReport) o;
return Objects.equals(messageConsumer, that.messageConsumer) &&
Objects.equals(cause, that.cause);
Expand Down
Loading

0 comments on commit 652b001

Please sign in to comment.