Skip to content

Commit

Permalink
change instantiation of DefaultPersistenceCleanupConfig when updating;
Browse files Browse the repository at this point in the history
check if cleanup isEnabled before scheduling the cleanup in EventSnapshotCleanupCoordinator;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Jul 8, 2021
1 parent db75cf4 commit 48a1eb4
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 89 deletions.
Expand Up @@ -21,32 +21,32 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistence;
import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistenceResponse;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.concierge.service.actors.ShardRegions;
import org.eclipse.ditto.concierge.service.actors.cleanup.credits.CreditDecisionSource;
import org.eclipse.ditto.concierge.service.actors.cleanup.messages.CreditDecision;
import org.eclipse.ditto.concierge.service.actors.cleanup.persistenceids.PersistenceIdSource;
import org.eclipse.ditto.concierge.service.common.PersistenceCleanupConfig;
import org.eclipse.ditto.connectivity.api.ConnectionTag;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.utils.akka.controlflow.Transistor;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.health.AbstractBackgroundStreamingActorWithConfigWithStatusReport;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.connectivity.api.ConnectionTag;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.policies.model.signals.commands.PolicyCommand;
import org.eclipse.ditto.things.api.ThingSnapshotTaken;
import org.eclipse.ditto.things.api.ThingTag;
import org.eclipse.ditto.internal.utils.akka.controlflow.Transistor;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.health.AbstractBackgroundStreamingActorWithConfigWithStatusReport;
import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistence;
import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistenceResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.policies.model.signals.commands.PolicyCommand;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -185,13 +185,16 @@ private void onCleanupResponse(final CleanupPersistenceResponse cleanupResponse)
enqueue(actions, cleanupResponse, config.getKeptActions());
}

private void onCreditDecision(final CreditDecision creditDecision) {
enqueue(creditDecisions, creditDecision, config.getKeptCreditDecisions());
// reset credit for requests if credit decision is positive
if (creditDecision.getCredit() > 0) {
creditForRequests = config.getCreditDecisionConfig().getCreditForRequests();
flushPendingRequests();
}
private static JsonObject renderAction(final Pair<Instant, CleanupPersistenceResponse> element) {
final var response = element.second();
final var headers = response.getDittoHeaders();
final var status = response.getHttpStatus();
final var start = headers.getOrDefault(START, "unknown");
final var message = getResponseMessage(response);
final var tagLine = String.format("%d start=%s <%s>", status.getCode(), start, message);
return JsonObject.newBuilder()
.set(element.first().toString(), tagLine)
.build();
}

private void flushPendingRequests() {
Expand All @@ -203,26 +206,23 @@ private void flushPendingRequests() {
}
}

private void onThingSnapshotTaken(final ThingSnapshotTaken event) {
if (creditForRequests > 0) {
--creditForRequests;
cleanUpThingByRequest(event.getEntityId());
} else if (pendingRequests.size() < config.getCreditDecisionConfig().getMaxPendingRequests()) {
pendingRequests.add(event.getEntityId());
} else {
log.info("Dropping <{}> because cache is full.", event);
}
private static JsonObject renderCreditDecision(final Pair<Instant, CreditDecision> element) {
return JsonObject.newBuilder()
.set(element.first().toString(), element.second().toString())
.build();
}

private void cleanUpThingByRequest(final ThingId thingId) {
final ThingTag thingTag = ThingTag.of(thingId, 0);
askShardRegionForCleanup(shardRegions.things(), ThingCommand.RESOURCE_TYPE, thingTag)
.thenAccept(response -> {
final var withRequestedHeader = response.setDittoHeaders(response.getDittoHeaders().toBuilder()
.putHeader(REQUESTED_MESSAGE_HEADER, "true")
.build());
getSelf().tell(withRequestedHeader, ActorRef.noSender());
});
private void onThingSnapshotTaken(final ThingSnapshotTaken event) {
if (config.isEnabled()) {
if (creditForRequests > 0) {
--creditForRequests;
cleanUpThingByRequest(event.getEntityId());
} else if (pendingRequests.size() < config.getCreditDecisionConfig().getMaxPendingRequests()) {
pendingRequests.add(event.getEntityId());
} else {
log.info("Dropping <{}> because cache is full.", event);
}
}
}

private <T> Flow<T, T, NotUsed> reportToSelf() {
Expand Down Expand Up @@ -265,6 +265,28 @@ private Graph<SourceShape<EntityIdWithRevision<?>>, NotUsed> persistenceIdSource
return PersistenceIdSource.create(config.getPersistenceIdsConfig(), pubSubMediator);
}

private void onCreditDecision(final CreditDecision creditDecision) {
if (config.isEnabled()) {
enqueue(creditDecisions, creditDecision, config.getKeptCreditDecisions());
// reset credit for requests if credit decision is positive
if (creditDecision.getCredit() > 0) {
creditForRequests = config.getCreditDecisionConfig().getCreditForRequests();
flushPendingRequests();
}
}
}

private void cleanUpThingByRequest(final ThingId thingId) {
final var thingTag = ThingTag.of(thingId, 0);
askShardRegionForCleanup(shardRegions.things(), ThingCommand.RESOURCE_TYPE, thingTag)
.thenAccept(response -> {
final var withRequestedHeader = response.setDittoHeaders(response.getDittoHeaders().toBuilder()
.putHeader(REQUESTED_MESSAGE_HEADER, "true")
.build());
getSelf().tell(withRequestedHeader, ActorRef.noSender());
});
}

@Override
protected Source<CleanupPersistenceResponse, NotUsed> getSource() {

Expand All @@ -282,7 +304,7 @@ protected Source<CleanupPersistenceResponse, NotUsed> getSource() {
.matchAny(e -> {
final String errorMessage = "Unexpected entity ID type: " + e;
log.error(errorMessage);
final CleanupPersistenceResponse failureResponse =
final var failureResponse =
CleanupPersistenceResponse.failure(e.getEntityId(),
DittoHeaders.newBuilder().putHeader(ERROR_MESSAGE_HEADER, errorMessage)
.build());
Expand All @@ -296,11 +318,22 @@ protected Source<CleanupPersistenceResponse, NotUsed> getSource() {
.log(EventSnapshotCleanupCoordinator.class.getSimpleName(), log);
}

@Override
protected void postEnhanceStatusReport(final JsonObjectBuilder statusReportBuilder) {
statusReportBuilder.set(JSON_CREDIT_DECISIONS, creditDecisions.stream()
.map(EventSnapshotCleanupCoordinator::renderCreditDecision)
.collect(JsonCollectors.valuesToArray()))
.set(JSON_ACTIONS, actions.stream()
.map(EventSnapshotCleanupCoordinator::renderAction)
.collect(JsonCollectors.valuesToArray()))
.build();
}

private CompletionStage<CleanupPersistenceResponse> askShardRegionForCleanup(final ActorRef shardRegion,
final String resourceType, final EntityIdWithRevision<?> tag) {

final EntityId id = tag.getEntityId();
final CleanupPersistence cleanupPersistence = getCleanupCommand(id);
final var cleanupPersistence = getCleanupCommand(id);
return Patterns.ask(shardRegion, cleanupPersistence, config.getCleanupTimeout())
.handle((result, error) -> {
if (result instanceof CleanupPersistenceResponse) {
Expand All @@ -312,7 +345,7 @@ private CompletionStage<CleanupPersistenceResponse> askShardRegionForCleanup(fin
.build();
return response.setDittoHeaders(headers);
} else {
final String errorMessage =
final var errorMessage =
String.format("Unexpected response from shard <%s>: result=<%s> error=<%s>",
resourceType, result, error);
return CleanupPersistenceResponse.failure(id,
Expand All @@ -323,35 +356,6 @@ private CompletionStage<CleanupPersistenceResponse> askShardRegionForCleanup(fin
});
}

@Override
protected void postEnhanceStatusReport(final JsonObjectBuilder statusReportBuilder) {
statusReportBuilder.set(JSON_CREDIT_DECISIONS, creditDecisions.stream()
.map(EventSnapshotCleanupCoordinator::renderCreditDecision)
.collect(JsonCollectors.valuesToArray()))
.set(JSON_ACTIONS, actions.stream()
.map(EventSnapshotCleanupCoordinator::renderAction)
.collect(JsonCollectors.valuesToArray()))
.build();
}

private static JsonObject renderCreditDecision(final Pair<Instant, CreditDecision> element) {
return JsonObject.newBuilder()
.set(element.first().toString(), element.second().toString())
.build();
}

private static JsonObject renderAction(final Pair<Instant, CleanupPersistenceResponse> element) {
final var response = element.second();
final var headers = response.getDittoHeaders();
final var status = response.getHttpStatus();
final var start = headers.getOrDefault(START, "unknown");
final var message = getResponseMessage(response);
final String tagLine = String.format("%d start=%s <%s>", status.getCode(), start, message);
return JsonObject.newBuilder()
.set(element.first().toString(), tagLine)
.build();
}

private static String getResponseMessage(final CleanupPersistenceResponse response) {
final var messageBuilder = new StringBuilder();
final var dittoHeaders = response.getDittoHeaders();
Expand All @@ -373,4 +377,5 @@ private static CleanupPersistence getCleanupCommand(final EntityId id) {
.build();
return CleanupPersistence.of(id, headers);
}

}
Expand Up @@ -16,6 +16,7 @@
import java.util.Objects;

import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -55,7 +56,7 @@ static PersistenceCleanupConfig of(final Config serviceSpecificConfig) {

static PersistenceCleanupConfig updated(final Config extractedConfig) {
return new DefaultPersistenceCleanupConfig(
ConfigWithFallback.newInstance(extractedConfig, CONFIG_PATH, ConfigValue.values()));
ConfigWithFallback.newInstance(extractedConfig, new KnownConfigValue[0]));
}

@Override
Expand Down
Expand Up @@ -113,7 +113,7 @@ enum ConfigValue implements KnownConfigValue {
CLEANUP_TIMEOUT("cleanup-timeout", Duration.ofSeconds(30L)),

/**
* Number of clewanup commands to execute in parallel.
* Number of cleanup commands to execute in parallel.
*/
PARALLELISM("parallelism", 1),

Expand Down
Expand Up @@ -97,7 +97,8 @@ public static DefaultConnectionConfig of(final Config config) {

private Collection<String> fromCommaSeparatedString(final ConfigWithFallback config,
final ConnectionConfigValue configValue) {
final String commaSeparated = config.getString(configValue.getConfigPath());
final var commaSeparated = config.getString(configValue.getConfigPath());

return List.of(commaSeparated.split(","));
}

Expand Down
Expand Up @@ -12,9 +12,9 @@
*/
package org.eclipse.ditto.internal.utils.akka.actors;

import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.api.common.ModifyConfig;
import org.eclipse.ditto.base.api.common.ModifyConfigResponse;
import org.eclipse.ditto.json.JsonObject;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -45,10 +45,10 @@ public interface ModifyConfigBehavior extends Actor {
default AbstractActor.Receive modifyConfigBehavior() {
return ReceiveBuilder.create()
.match(ModifyConfig.class, cmd -> {
final Config newConfig = setConfig(ConfigFactory.parseString(cmd.getConfig().toString()));
final JsonObject newConfigJson =
final var newConfig = setConfig(ConfigFactory.parseString(cmd.getConfig().toString()));
final var newConfigJson =
JsonObject.of(newConfig.root().render(ConfigRenderOptions.concise()));
final ModifyConfigResponse response =
final var response =
ModifyConfigResponse.of(newConfigJson, cmd.getDittoHeaders());
sender().tell(response, self());
})
Expand Down
Expand Up @@ -164,10 +164,10 @@ public Config getConfig() {

@Override
public Config setConfig(final Config config) {
final C previousConfig = this.config;
final var previousConfig = this.config;
// TODO Ditto issue #439: replace ConfigWithFallback - it breaks AbstractConfigValue.withFallback!
// Workaround: re-parse my config
final Config fallback = ConfigFactory.parseString(getConfig().root().render(ConfigRenderOptions.concise()));
final var fallback = ConfigFactory.parseString(getConfig().root().render(ConfigRenderOptions.concise()));
try {
this.config = parseConfig(config.withFallback(fallback));
} catch (final DittoConfigError | ConfigException e) {
Expand All @@ -180,7 +180,7 @@ public Config setConfig(final Config config) {
}

private Receive sleeping() {
final ReceiveBuilder sleepingReceiveBuilder = ReceiveBuilder.create();
final var sleepingReceiveBuilder = ReceiveBuilder.create();
preEnhanceSleepingBehavior(sleepingReceiveBuilder);
return sleepingReceiveBuilder.match(WokeUp.class, this::wokeUp)
.match(Event.class, this::addCustomEventToLog)
Expand All @@ -193,7 +193,7 @@ private Receive sleeping() {
}

private Receive streaming() {
final ReceiveBuilder streamingReceiveBuilder = ReceiveBuilder.create();
final var streamingReceiveBuilder = ReceiveBuilder.create();
preEnhanceStreamingBehavior(streamingReceiveBuilder);
return streamingReceiveBuilder
.match(StreamTerminated.class, this::streamTerminated)
Expand Down Expand Up @@ -255,11 +255,13 @@ private void shutdownStream(final Shutdown shutdown) {

if (config.isEnabled()) {
final Duration wakeUpDelay = config.getQuietPeriod();
final String message = String.format("Restarting in <%s>.", wakeUpDelay);
final var message = String.format("Restarting in <%s>.", wakeUpDelay);
log.info(message);
scheduleWakeUp(wakeUpDelay);
getSender().tell(ShutdownResponse.of(message, shutdown.getDittoHeaders()), getSelf());
} else {
final String message = "Not restarting stream because I am disabled.";
final var message = "Not restarting stream because I am disabled.";
log.info(message);
getSender().tell(ShutdownResponse.of(message, shutdown.getDittoHeaders()), getSelf());
}
}
Expand All @@ -280,7 +282,7 @@ private void restartStream() {

materializedValues.second()
.<Void>handle((result, error) -> {
final String description = String.format("Stream terminated. Result=<%s> Error=<%s>",
final var description = String.format("Stream terminated. Result=<%s> Error=<%s>",
result, error);
if (error != null) {
log.error(error, description);
Expand Down
Expand Up @@ -16,13 +16,13 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistence;
import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistenceResponse;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistence;
import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistenceResponse;

import akka.actor.ActorRef;
import akka.persistence.AbstractPersistentActorWithTimers;
Expand Down Expand Up @@ -117,13 +117,13 @@ private void handleCleanupCommand(final CleanupPersistence cleanupPersistence) {
private EntityId extractEntityIdFromPersistenceId(final String persistenceId) {
final int indexOfSeparator = persistenceId.indexOf(':');
if (indexOfSeparator < 0) {
final String message =
final var message =
String.format("Persistence ID <%s> wasn't prefixed with an entity type.", persistenceId);
log.error(message);
throw new IllegalArgumentException(message);
}
final String id = persistenceId.substring(indexOfSeparator + 1);
final EntityType type = EntityType.of(persistenceId.substring(0, indexOfSeparator));
final var id = persistenceId.substring(indexOfSeparator + 1);
final var type = EntityType.of(persistenceId.substring(0, indexOfSeparator));
return EntityId.of(type, id);
}

Expand Down Expand Up @@ -191,7 +191,7 @@ private void startCleanup(final long latestSnapshotSequenceNumber) {
final long maxSnapSeqNoToDelete = latestSnapshotSequenceNumber - 1;
log.info("Starting cleanup for '{}', deleting snapshots to sequence number {} and events to {}.",
persistenceId(), maxSnapSeqNoToDelete, maxEventSeqNoToDelete);
final SnapshotSelectionCriteria deletionCriteria =
final var deletionCriteria =
SnapshotSelectionCriteria.create(maxSnapSeqNoToDelete, Long.MAX_VALUE);
deleteMessages(maxEventSeqNoToDelete);
deleteSnapshots(deletionCriteria);
Expand Down

0 comments on commit 48a1eb4

Please sign in to comment.