Skip to content

Commit

Permalink
Issue #106: Fixed some code smells.
Browse files Browse the repository at this point in the history
Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Nov 12, 2021
1 parent 83be45a commit 330613d
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import javax.annotation.Nonnull;

import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReason;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.things.model.ThingId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,17 +32,16 @@
*/
public final class ShutdownBehaviour {

private static final Logger LOG = LoggerFactory.getLogger(ShutdownBehaviour.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownBehaviour.class);

private final String namespace;
private final EntityId entityId;

private final ActorRef self;

private ShutdownBehaviour(final String namespace, final EntityId entityId, final ActorRef self) {
this.namespace = namespace;
this.entityId = entityId;
this.self = self;
this.namespace = checkNotNull(namespace, "namespace");
this.entityId = checkNotNull(entityId, "entityId");
this.self = checkNotNull(self, "self");
}

/**
Expand All @@ -57,7 +52,8 @@ private ShutdownBehaviour(final String namespace, final EntityId entityId, final
* @param self reference of the actor itself.
* @return the actor behavior.
*/
public static ShutdownBehaviour fromId(final NamespacedEntityId entityId, final ActorRef pubSubMediator,
public static ShutdownBehaviour fromId(final NamespacedEntityId entityId,
final ActorRef pubSubMediator,
final ActorRef self) {

checkNotNull(entityId, "entityId");
Expand All @@ -72,10 +68,11 @@ public static ShutdownBehaviour fromId(final NamespacedEntityId entityId, final
* @param self reference of the actor itself.
* @return the actor behavior.
*/
public static ShutdownBehaviour fromIdWithoutNamespace(final EntityId entityId, final ActorRef pubSubMediator,
public static ShutdownBehaviour fromIdWithoutNamespace(final EntityId entityId,
final ActorRef pubSubMediator,
final ActorRef self) {

return fromIdWithNamespace(checkNotNull(entityId, "entityId"), pubSubMediator, self, "");
return fromIdWithNamespace(entityId, pubSubMediator, self, "");
}

/**
Expand All @@ -87,13 +84,13 @@ public static ShutdownBehaviour fromIdWithoutNamespace(final EntityId entityId,
* @param namespace the namespace of the actor.
* @return the actor behavior.
*/
public static ShutdownBehaviour fromIdWithNamespace(@Nonnull final EntityId entityId,
final ActorRef pubSubMediator, final ActorRef self, final String namespace) {
checkNotNull(pubSubMediator, "pubSubMediator");
checkNotNull(self, "self");
checkNotNull(namespace, "namespace");
final ShutdownBehaviour shutdownBehaviour = new ShutdownBehaviour(namespace, entityId, self);
shutdownBehaviour.subscribePubSub(pubSubMediator);
public static ShutdownBehaviour fromIdWithNamespace(final EntityId entityId,
final ActorRef pubSubMediator,
final ActorRef self,
final String namespace) {

final var shutdownBehaviour = new ShutdownBehaviour(namespace, entityId, self);
shutdownBehaviour.subscribePubSub(checkNotNull(pubSubMediator, "pubSubMediator"));
return shutdownBehaviour;
}

Expand All @@ -102,7 +99,7 @@ private void subscribePubSub(final ActorRef pubSubMediator) {
}

/**
* Create a new receive builder matching on messages handled by this actor.
* Create a new {@code ReceiveBuilder} matching on messages handled by this actor.
*
* @return new receive builder.
*/
Expand All @@ -113,15 +110,16 @@ public ReceiveBuilder createReceive() {
}

private void shutdown(final Shutdown shutdown) {
final ShutdownReason shutdownReason = shutdown.getReason();
final var shutdownReason = shutdown.getReason();

if (shutdownReason.isRelevantFor(namespace) || shutdownReason.isRelevantFor(entityId)) {
LOG.info("Shutting down <{}> due to <{}>.", self, shutdown);
LOGGER.info("Shutting down <{}> due to <{}>.", self, shutdown);
self.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
}

private void subscribeAck(final DistributedPubSubMediator.SubscribeAck ack) {
// do nothing
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.connectivity.model.ConnectionId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta

private Sink<Object, NotUsed> inboundMappingSink;
private ActorRef outboundDispatchingActor;
private ActorRef outboundMappingProcessorActor;
private ActorRef subscriptionManager;
private ActorRef tunnelActor;

Expand Down Expand Up @@ -324,12 +323,10 @@ protected ConnectivityConfig connectivityConfig() {
* Initialize child actors.
*/
protected void init() {
final Pair<ActorRef, ActorRef> actorPair = startOutboundActors(protocolAdapter);
final var actorPair = startOutboundActors(protocolAdapter);
outboundDispatchingActor = actorPair.first();
outboundMappingProcessorActor = actorPair.second();

final Sink<Object, NotUsed> inboundDispatchingSink =
getInboundDispatchingSink(connection, protocolAdapter, outboundMappingProcessorActor);
final var inboundDispatchingSink = getInboundDispatchingSink(connection, protocolAdapter, actorPair.second());
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
subscriptionManager = startSubscriptionManager(proxyActorSelection, connectivityConfig().getClientConfig());

Expand Down Expand Up @@ -1748,9 +1745,15 @@ private Sink<Object, NotUsed> getInboundDispatchingSink(final Connection connect
final ProtocolAdapter protocolAdapter,
final ActorRef outboundMappingProcessorActor) {

return InboundDispatchingSink.createSink(connection, protocolAdapter.headerTranslator(), proxyActorSelection,
connectionActor, outboundMappingProcessorActor, getSelf(), getContext(),
getContext().system().settings().config());
final var actorContext = getContext();
return InboundDispatchingSink.createSink(connection,
protocolAdapter.headerTranslator(),
proxyActorSelection,
connectionActor,
outboundMappingProcessorActor,
getSelf(),
actorContext,
actorContext.system().settings().config());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class AcknowledgementAggregatorActor extends AbstractActorWithTimer
private final Consumer<Object> responseSignalConsumer;
private final Duration timeout;

@SuppressWarnings("java:S1144")
private AcknowledgementAggregatorActor(final EntityId entityId,
final DittoHeaders dittoHeaders,
final Duration maxTimeout,
Expand Down
4 changes: 4 additions & 0 deletions internal/models/signalenrichment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-akka</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signal</artifactId>
</dependency>

<!-- test-only -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cache.Cache;
Expand Down Expand Up @@ -137,15 +138,16 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
* @param minAcceptableSeqNr minimum sequence number of the concerned signals to not invalidate the cache.
* @return future that completes with the parts of a thing or fails with an error.
*/
@SuppressWarnings("java:S1612")
public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
final JsonFieldSelector jsonFieldSelector,
final DittoHeaders dittoHeaders,
final Collection<? extends Signal<?>> concernedSignals,
final long minAcceptableSeqNr) {

final List<ThingEvent<?>> thingEvents = concernedSignals.stream()
.filter(signal -> (signal instanceof ThingEvent) && !(ProtocolAdapter.isLiveSignal(signal)))
.map(event -> (ThingEvent<?>) event)
.filter(signal -> signal instanceof ThingEvent && !SignalInformationPoint.isChannelLive(signal))
.map(signal -> (ThingEvent<?>) signal)
.collect(Collectors.toList());

// as second step only return what was originally requested as fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

import java.util.UUID;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicy;
Expand All @@ -35,25 +33,21 @@ private PolicyCommandFactory() {
* Creates a sudo command for retrieving a policy.
*
* @param policyId the policyId.
* @param context the context to apply when doing the cache lookup.
* @return the created command.
*/
static SudoRetrievePolicy sudoRetrievePolicy(final EntityId policyId, @Nullable final EnforcementContext context) {
return sudoRetrievePolicy(PolicyId.of(policyId), context);
static SudoRetrievePolicy sudoRetrievePolicy(final EntityId policyId) {
return sudoRetrievePolicy(PolicyId.of(policyId));
}

/**
* Creates a sudo command for retrieving a policy.
*
* @param policyId the policyId.
* @param context the context to apply when doing the cache lookup.
* @return the created command.
*/
static SudoRetrievePolicy sudoRetrievePolicy(final PolicyId policyId, @Nullable final EnforcementContext context) {
static SudoRetrievePolicy sudoRetrievePolicy(final PolicyId policyId) {
return SudoRetrievePolicy.of(policyId,
DittoHeaders.newBuilder()
.correlationId("sudoRetrievePolicy-" + UUID.randomUUID())
.build());
DittoHeaders.newBuilder().correlationId("sudoRetrievePolicy-" + UUID.randomUUID()).build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@
*/
package org.eclipse.ditto.internal.utils.cacheloaders;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
Expand Down Expand Up @@ -52,17 +48,15 @@ public final class PolicyEnforcerCacheLoader implements AsyncCacheLoader<Enforce
* @param policiesShardRegionProxy the shard-region-proxy.
*/
public PolicyEnforcerCacheLoader(final AskWithRetryConfig askWithRetryConfig,
final Scheduler scheduler, final ActorRef policiesShardRegionProxy) {
requireNonNull(askWithRetryConfig);
requireNonNull(policiesShardRegionProxy);

final BiFunction<EntityId, EnforcementContext, Command<?>> commandCreator =
PolicyCommandFactory::sudoRetrievePolicy;
final BiFunction<Object, EnforcementContext, Entry<PolicyEnforcer>> responseTransformer =
PolicyEnforcerCacheLoader::handleSudoRetrievePolicyResponse;
final Scheduler scheduler,
final ActorRef policiesShardRegionProxy) {

delegate = ActorAskCacheLoader.forShard(askWithRetryConfig, scheduler, PolicyConstants.ENTITY_TYPE,
policiesShardRegionProxy, commandCreator, responseTransformer);
delegate = ActorAskCacheLoader.forShard(askWithRetryConfig,
scheduler,
PolicyConstants.ENTITY_TYPE,
policiesShardRegionProxy,
(entityId, enforcementContext) -> PolicyCommandFactory.sudoRetrievePolicy(entityId),
PolicyEnforcerCacheLoader::handleSudoRetrievePolicyResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

import java.util.UUID;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -39,22 +37,20 @@ private ThingCommandFactory() {
* Creates a sudo command for retrieving a thing.
*
* @param thingId the thingId.
* @param context the context to apply when doing the cache lookup.
* @return the created command.
*/
static SudoRetrieveThing sudoRetrieveThing(final EntityId thingId, @Nullable final EnforcementContext context) {
return sudoRetrieveThing(ThingId.of(thingId), context);
static SudoRetrieveThing sudoRetrieveThing(final EntityId thingId) {
return sudoRetrieveThing(ThingId.of(thingId));
}

/**
* Creates a sudo command for retrieving a thing.
*
* @param thingId the thingId.
* @param context the context to apply when doing the cache lookup.
* @return the created command.
*/
static SudoRetrieveThing sudoRetrieveThing(final ThingId thingId, @Nullable final EnforcementContext context) {
LOGGER.debug("Sending SudoRetrieveThing for Thing with ID <{}>", thingId);
static SudoRetrieveThing sudoRetrieveThing(final ThingId thingId) {
LOGGER.debug("Sending SudoRetrieveThing for Thing with ID <{}>.", thingId);
return SudoRetrieveThing.withOriginalSchemaVersion(thingId, DittoHeaders.newBuilder()
.correlationId("sudoRetrieveThing-" + UUID.randomUUID())
.putHeader(DittoHeaderDefinition.DITTO_RETRIEVE_DELETED.getKey(), Boolean.TRUE.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.api.persistence.PersistenceLifecycle;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
Expand Down Expand Up @@ -52,14 +50,15 @@ public final class ThingEnforcementIdCacheLoader
* @param shardRegionProxy the shard-region-proxy.
*/
public ThingEnforcementIdCacheLoader(final AskWithRetryConfig askWithRetryConfig,
final Scheduler scheduler, final ActorRef shardRegionProxy) {
final BiFunction<EntityId, EnforcementContext, Command<?>> commandCreator =
ThingCommandFactory::sudoRetrieveThing;
final BiFunction<Object, EnforcementContext, Entry<EnforcementCacheKey>> responseTransformer =
ThingEnforcementIdCacheLoader::handleSudoRetrieveThingResponse;
final Scheduler scheduler,
final ActorRef shardRegionProxy) {

delegate = ActorAskCacheLoader.forShard(askWithRetryConfig, scheduler, ThingConstants.ENTITY_TYPE,
shardRegionProxy, commandCreator, responseTransformer);
delegate = ActorAskCacheLoader.forShard(askWithRetryConfig,
scheduler,
ThingConstants.ENTITY_TYPE,
shardRegionProxy,
(entityId, enforcementContext) -> ThingCommandFactory.sudoRetrieveThing(entityId),
ThingEnforcementIdCacheLoader::handleSudoRetrieveThingResponse);
}

@Override
Expand Down

0 comments on commit 330613d

Please sign in to comment.