Skip to content

Commit

Permalink
Issue #350: Split functionality of ConfigUtil into custom classes.
Browse files Browse the repository at this point in the history
Signed-off-by: Juergen Fickel <juergen.fickel@bosch-si.com>
  • Loading branch information
Juergen Fickel committed May 9, 2019
1 parent 4e01034 commit 91f2d63
Show file tree
Hide file tree
Showing 21 changed files with 500 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.eclipse.ditto.services.utils.cluster.ClusterStatusSupplier;
import org.eclipse.ditto.services.utils.cluster.ClusterUtil;
import org.eclipse.ditto.services.utils.cluster.RetrieveStatisticsDetailsResponseSupplier;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.LocalHostAddressSupplier;
import org.eclipse.ditto.services.utils.health.DefaultHealthCheckingActorFactory;
import org.eclipse.ditto.services.utils.health.HealthCheckingActorOptions;
import org.eclipse.ditto.services.utils.health.config.HealthCheckConfig;
Expand Down Expand Up @@ -243,7 +243,7 @@ private void bindHttpStatusRoute(final ActorRef healthCheckingActor, final HttpC

String hostname = httpConfig.getHostname();
if (hostname.isEmpty()) {
hostname = ConfigUtil.getLocalHostAddress();
hostname = LocalHostAddressSupplier.getInstance().get();
log.info("No explicit hostname configured, using HTTP hostname: {}", hostname);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public ActorRef startEnforcerActor(final ActorContext context, final ConciergeCo
final ActorRef enforcerShardRegion = startShardRegion(context.system(), clusterConfig, enforcerProps);

// start cache updaters
final String instanceIndex = ConfigUtil.instanceIdentifier();
final String instanceIndex = InstanceIdentifierSupplier.getInstance().get();
final Props policyCacheUpdateActorProps =
PolicyCacheUpdateActor.props(policyEnforcerCache, pubSubMediator, instanceIndex);
context.actorOf(policyCacheUpdateActorProps, PolicyCacheUpdateActor.ACTOR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.eclipse.ditto.services.connectivity.messaging.metrics.ConnectivityCounterRegistry;
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.services.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.services.utils.protocol.config.ProtocolConfig;
import org.eclipse.ditto.signals.base.Signal;
Expand Down Expand Up @@ -684,7 +684,7 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
});

final ResourceStatus clientStatus =
ConnectivityModelFactory.newClientStatus(ConfigUtil.instanceIdentifier(),
ConnectivityModelFactory.newClientStatus(getInstanceIdentifier(),
data.getConnectionStatus(),
"[" + stateName().name() + "] " + data.getConnectionStatusDetails().orElse(""),
getInConnectionStatusSince());
Expand All @@ -693,13 +693,17 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
return stay();
}

private static String getInstanceIdentifier() {
return InstanceIdentifierSupplier.getInstance().get();
}

private FSM.State<BaseClientState, BaseClientData> retrieveConnectionMetrics(
final RetrieveConnectionMetrics command, final BaseClientData data) {

LogUtil.enhanceLogWithCorrelationId(log, command);
log.debug("Received RetrieveConnectionMetrics message, gathering metrics.");
final DittoHeaders dittoHeaders = command.getDittoHeaders().toBuilder()
.source(ConfigUtil.instanceIdentifier())
.source(getInstanceIdentifier())
.build();

final SourceMetrics sourceMetrics = ConnectivityCounterRegistry.aggregateSourceMetrics(connectionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.eclipse.ditto.model.connectivity.ResourceStatus;
import org.eclipse.ditto.services.connectivity.messaging.metrics.ConnectionMetricsCollector;
import org.eclipse.ditto.services.connectivity.messaging.metrics.ConnectivityCounterRegistry;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
Expand All @@ -49,27 +49,31 @@ protected BaseConsumerActor(final String connectionId, final String sourceAddres
this.messageMappingProcessor = checkNotNull(messageMappingProcessor, "messageMappingProcessor");
this.authorizationContext = checkNotNull(authorizationContext, "authorizationContext");
this.headerMapping = headerMapping;
resourceStatus = ConnectivityModelFactory.newSourceStatus(ConfigUtil.instanceIdentifier(),
resourceStatus = ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
ConnectivityStatus.OPEN, sourceAddress,"Started at " + Instant.now());

inboundCounter = ConnectivityCounterRegistry.getInboundConsumedCounter(connectionId, sourceAddress);
}

protected ResourceStatus getCurrentSourceStatus() {

return ConnectivityModelFactory.newSourceStatus(ConfigUtil.instanceIdentifier(),
return ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
resourceStatus != null ? resourceStatus.getStatus() : ConnectivityStatus.UNKNOWN,
sourceAddress,
resourceStatus != null ? resourceStatus.getStatusDetails().orElse(null) : null);
}

protected void handleAddressStatus(final ResourceStatus resourceStatus) {
if (resourceStatus.getResourceType() == ResourceStatus.ResourceType.UNKNOWN) {
this.resourceStatus = ConnectivityModelFactory.newSourceStatus(ConfigUtil.instanceIdentifier(),
this.resourceStatus = ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
resourceStatus.getStatus(), sourceAddress,
resourceStatus.getStatusDetails().orElse(null));
} else {
this.resourceStatus = resourceStatus;
}
}

private static String getInstanceIdentifier() {
return InstanceIdentifierSupplier.getInstance().get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.eclipse.ditto.services.models.connectivity.placeholder.ThingPlaceholder;
import org.eclipse.ditto.services.models.connectivity.placeholder.TopicPathPlaceholder;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.signals.base.Signal;

import akka.actor.AbstractActor;
Expand All @@ -66,29 +66,33 @@ public abstract class BasePublisherActor<T extends PublishTarget> extends Abstra
protected final Set<Target> targets;
protected final Map<Target, ResourceStatus> resourceStatusMap;

private ConnectionMetricsCollector responseDroppedCounter;
private ConnectionMetricsCollector responsePublishedCounter;

private final ConnectionMetricsCollector responseDroppedCounter;
private final ConnectionMetricsCollector responsePublishedCounter;

protected BasePublisherActor(final String connectionId, final Set<Target> targets) {
this.connectionId = checkNotNull(connectionId, "connectionId");
this.targets = checkNotNull(targets, "targets");
resourceStatusMap = new HashMap<>();
final Instant now = Instant.now();
targets.forEach(target ->
resourceStatusMap.put(target, ConnectivityModelFactory.newTargetStatus(ConfigUtil.instanceIdentifier(),
ConnectivityStatus.OPEN, target.getAddress(), "Started at " + now)));
resourceStatusMap.put(target,
ConnectivityModelFactory.newTargetStatus(getInstanceIdentifier(), ConnectivityStatus.OPEN,
target.getAddress(), "Started at " + now)));
responseDroppedCounter = ConnectivityCounterRegistry.getResponseDroppedCounter(this.connectionId);
responsePublishedCounter = ConnectivityCounterRegistry.getResponsePublishedCounter(connectionId);
}

private static String getInstanceIdentifier() {
return InstanceIdentifierSupplier.getInstance().get();
}

@Override
public Receive createReceive() {
final ReceiveBuilder receiveBuilder = receiveBuilder();
preEnhancement(receiveBuilder);

receiveBuilder
.match(OutboundSignal.WithExternalMessage.class, this::isResponseOrError, outbound -> {
.match(OutboundSignal.WithExternalMessage.class, BasePublisherActor::isResponseOrError, outbound -> {
final ExternalMessage response = outbound.getExternalMessage();
final String correlationId = response.getHeaders().get(CORRELATION_ID.getKey());
LogUtil.enhanceLogWithCorrelationId(log(), correlationId);
Expand Down Expand Up @@ -147,11 +151,10 @@ public Receive createReceive() {
private Collection<ResourceStatus> getCurrentTargetStatus() {
if (resourceStatusMap.isEmpty()) {
return Collections.singletonList(
ConnectivityModelFactory.newTargetStatus(ConfigUtil.instanceIdentifier(), ConnectivityStatus.UNKNOWN,
null, null));
} else {
return resourceStatusMap.values();
ConnectivityModelFactory.newTargetStatus(getInstanceIdentifier(), ConnectivityStatus.UNKNOWN, null,
null));
}
return resourceStatusMap.values();
}

/**
Expand Down Expand Up @@ -205,8 +208,8 @@ protected abstract void publishMessage(@Nullable final Target target, final T pu
* @param outboundSignal the OutboundSignal to check.
* @return {@code true} if the OutboundSignal is a response or an error, {@code false} otherwise
*/
private boolean isResponseOrError(final OutboundSignal.WithExternalMessage outboundSignal) {
return (outboundSignal.getExternalMessage().isResponse() || outboundSignal.getExternalMessage().isError());
private static boolean isResponseOrError(final OutboundSignal.WithExternalMessage outboundSignal) {
return outboundSignal.getExternalMessage().isResponse() || outboundSignal.getExternalMessage().isError();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
import org.eclipse.ditto.services.models.connectivity.OutboundSignalFactory;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
Expand Down Expand Up @@ -795,7 +795,7 @@ private void respondWithEmptyStatus(final RetrieveConnectionStatus command, fina
log.debug("ClientActor not started, responding with empty connection status with status closed.");
final RetrieveConnectionStatusResponse statusResponse =
RetrieveConnectionStatusResponse.closedResponse(connectionId,
ConfigUtil.instanceIdentifier(),
InstanceIdentifierSupplier.getInstance().get(),
connectionClosedAt == null ? Instant.EPOCH : connectionClosedAt,
ConnectivityStatus.CLOSED,
"[" + BaseClientState.DISCONNECTED + "] connection is closed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.eclipse.ditto.services.connectivity.messaging.internal.DisconnectClient;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.services.utils.protocol.config.ProtocolConfig;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;

Expand Down Expand Up @@ -450,7 +450,7 @@ private FSM.State<BaseClientState, BaseClientData> handleStatusReport(final Stat
final ActorRef consumerActor = consumerByNamePrefix.get(c.getActorNamePrefix());
if (consumerActor != null) {
final Object message = ConnectivityModelFactory.newStatusUpdate(
ConfigUtil.instanceIdentifier(),
InstanceIdentifierSupplier.getInstance().get(),
ConnectivityStatus.FAILED,
c.getAddress(),
"Consumer closed", Instant.now());
Expand Down Expand Up @@ -532,47 +532,38 @@ static class JmsDisconnected extends AbstractWithOrigin implements ClientDisconn
*/
private static final class StatusReport {

private final boolean consumedMessage;
private final boolean connectionRestored;
@Nullable private final ConnectionFailure failure;
@Nullable private final MessageConsumer closedConsumer;
@Nullable private final MessageProducer closedProducer;

private StatusReport(final boolean consumedMessage,
final boolean connectionRestored,
private StatusReport(final boolean connectionRestored,
@Nullable final ConnectionFailure failure,
@Nullable final MessageConsumer closedConsumer,
@Nullable final MessageProducer closedProducer) {

this.consumedMessage = consumedMessage;
this.connectionRestored = connectionRestored;
this.failure = failure;
this.closedConsumer = closedConsumer;
this.closedProducer = closedProducer;
}

private static StatusReport connectionRestored() {
return new StatusReport(false, true, null, null, null);
return new StatusReport(true, null, null, null);
}

private static StatusReport failure(final ConnectionFailure failure) {
return new StatusReport(false, false, failure, null, null);
return new StatusReport(false, failure, null, null);
}

private static StatusReport consumedMessage() {
return new StatusReport(true, false, null, null, null);
return new StatusReport(false, null, null, null);
}

private static StatusReport consumerClosed(final MessageConsumer consumer) {
return new StatusReport(false, false, null, consumer, null);
return new StatusReport(false, null, consumer, null);
}

private static StatusReport producerClosed(final MessageProducer producer) {
return new StatusReport(false, false, null, null, producer);
}

private boolean hasConsumedMessage() {
return consumedMessage;
return new StatusReport(false, null, null, producer);
}

private boolean isConnectionRestored() {
Expand All @@ -587,10 +578,6 @@ private Optional<MessageConsumer> getClosedConsumer() {
return Optional.ofNullable(closedConsumer);
}

private Optional<MessageProducer> getClosedProducer() {
return Optional.ofNullable(closedProducer);
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.services.utils.protocol.config.ProtocolConfig;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;

Expand Down Expand Up @@ -576,7 +576,7 @@ public void handleRecoverOk(final String consumerTag) {
}

private void updateSourceStatus(final ConnectivityStatus connectionStatus, final String statusDetails) {
consumerActor.tell(ConnectivityModelFactory.newStatusUpdate(ConfigUtil.instanceIdentifier(),
consumerActor.tell(ConnectivityModelFactory.newStatusUpdate(InstanceIdentifierSupplier.getInstance().get(),
connectionStatus, address, statusDetails, Instant.now()), ActorRef.noSender());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.eclipse.ditto.services.connectivity.messaging.metrics.ConnectionMetricsCollector;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;

import com.newmotion.akka.rabbitmq.ChannelCreated;
import com.newmotion.akka.rabbitmq.ChannelMessage;
Expand Down Expand Up @@ -115,7 +115,7 @@ protected void preEnhancement(final ReceiveBuilder receiveBuilder) {
resourceStatusMap.put(
target,
ConnectivityModelFactory.newTargetStatus(
ConfigUtil.instanceIdentifier(),
InstanceIdentifierSupplier.getInstance().get(),
ConnectivityStatus.FAILED,
target.getAddress(),
"Exchange '" + exchange + "' was missing at " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
import org.eclipse.ditto.services.utils.cluster.ClusterStatusSupplier;
import org.eclipse.ditto.services.utils.cluster.ClusterUtil;
import org.eclipse.ditto.services.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.services.utils.config.LocalHostAddressSupplier;
import org.eclipse.ditto.services.utils.health.DefaultHealthCheckingActorFactory;
import org.eclipse.ditto.services.utils.health.HealthCheckingActorOptions;
import org.eclipse.ditto.services.utils.health.config.HealthCheckConfig;
Expand Down Expand Up @@ -339,8 +340,8 @@ private CompletionStage<ServerBinding> getHttpBinding(final HttpConfig httpConfi

String hostname = httpConfig.getHostname();
if (hostname.isEmpty()) {
hostname = ConfigUtil.getLocalHostAddress();
log.info("No explicit hostname configured, using HTTP hostname: {}", hostname);
hostname = LocalHostAddressSupplier.getInstance().get();
log.info("No explicit hostname configured, using HTTP hostname <{}>.", hostname);
}

return Http.get(actorSystem).bindAndHandle(
Expand Down
Loading

0 comments on commit 91f2d63

Please sign in to comment.