Skip to content

Commit

Permalink
add some more test cases, make thing deletion timeout configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Apr 22, 2022
1 parent 21c61fb commit a1a7364
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class DefaultStreamConfig implements StreamConfig {

private final int maxArraySize;
private final Duration writeInterval;
private final Duration thingDeletionTimeout;
private final AskWithRetryConfig askWithRetryConfig;
private final StreamStageConfig retrievalConfig;
private final PersistenceStreamConfig persistenceStreamConfig;
Expand All @@ -48,6 +49,7 @@ public final class DefaultStreamConfig implements StreamConfig {
private DefaultStreamConfig(final ConfigWithFallback streamScopedConfig) {
maxArraySize = streamScopedConfig.getNonNegativeIntOrThrow(StreamConfigValue.MAX_ARRAY_SIZE);
writeInterval = streamScopedConfig.getNonNegativeDurationOrThrow(StreamConfigValue.WRITE_INTERVAL);
thingDeletionTimeout = streamScopedConfig.getNonNegativeDurationOrThrow(StreamConfigValue.THING_DELETION_TIMEOUT);
askWithRetryConfig = DefaultAskWithRetryConfig.of(streamScopedConfig, ASK_WITH_RETRY_CONFIG_PATH);
retrievalConfig = DefaultStreamStageConfig.getInstance(streamScopedConfig, RETRIEVAL_CONFIG_PATH);
persistenceStreamConfig = DefaultPersistenceStreamConfig.of(streamScopedConfig);
Expand Down Expand Up @@ -76,6 +78,11 @@ public Duration getWriteInterval() {
return writeInterval;
}

@Override
public Duration getThingDeletionTimeout() {
return thingDeletionTimeout;
}

@Override
public AskWithRetryConfig getAskWithRetryConfig() {
return askWithRetryConfig;
Expand Down Expand Up @@ -112,6 +119,7 @@ public boolean equals(@Nullable final Object o) {
final DefaultStreamConfig that = (DefaultStreamConfig) o;
return maxArraySize == that.maxArraySize &&
writeInterval.equals(that.writeInterval) &&
thingDeletionTimeout.equals(that.thingDeletionTimeout) &&
askWithRetryConfig.equals(that.askWithRetryConfig) &&
retrievalConfig.equals(that.retrievalConfig) &&
persistenceStreamConfig.equals(that.persistenceStreamConfig) &&
Expand All @@ -122,14 +130,15 @@ public boolean equals(@Nullable final Object o) {
@Override
public int hashCode() {
return Objects.hash(maxArraySize, writeInterval, askWithRetryConfig, retrievalConfig,
persistenceStreamConfig, policyCacheConfig, thingCacheConfig);
persistenceStreamConfig, policyCacheConfig, thingCacheConfig, thingDeletionTimeout);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"maxArraySize=" + maxArraySize +
", writeInterval=" + writeInterval +
", thingDeletionTimeout=" + thingDeletionTimeout +
", askWithRetryConfig=" + askWithRetryConfig +
", retrievalConfig=" + retrievalConfig +
", persistenceStreamConfig=" + persistenceStreamConfig +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public interface StreamConfig {
*/
Duration getWriteInterval();

/**
* @return the duration before the update actor is stopped after receiving a ThingDeleted event
*/
Duration getThingDeletionTimeout();

/**
* Returns the configuration for the used "ask with retry" pattern in the search updater for retrieval of things and
* policies.
Expand Down Expand Up @@ -88,7 +93,12 @@ enum StreamConfigValue implements KnownConfigValue {
/**
* The minimal delay between event dumps.
*/
WRITE_INTERVAL("write-interval", Duration.ofSeconds(1L));
WRITE_INTERVAL("write-interval", Duration.ofSeconds(1L)),

/**
* The delay before the updater actor is stopped after receiving a ThingDeleted event.
*/
THING_DELETION_TIMEOUT("thing-deletion-timeout", Duration.ofMinutes(5));

private final String configPath;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.base.api.common.ShutdownReasonType;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOff;
import org.eclipse.ditto.internal.models.streaming.IdentifiableStreamingMessage;
Expand Down Expand Up @@ -96,14 +97,14 @@ public final class ThingUpdater extends AbstractFSMWithStash<ThingUpdater.State,
private static final AcknowledgementRequest SEARCH_PERSISTED_REQUEST =
AcknowledgementRequest.of(DittoAcknowledgementLabel.SEARCH_PERSISTED);

private static final Duration THING_DELETION_TIMEOUT = Duration.ofMinutes(5);
private static final String FORCE_UPDATE = "force-update";

private final DittoDiagnosticLoggingAdapter log;
private final ThingId thingId;
private final Flow<Data, Result, NotUsed> flow;
private final Materializer materializer;
private final Duration writeInterval;
private final Duration thingDeletionTimeout;
private ExponentialBackOff backOff;
private boolean shuttingDown = false;
@Nullable private UniqueKillSwitch killSwitch;
Expand Down Expand Up @@ -152,8 +153,7 @@ enum ShutdownTrigger {

private ThingUpdater(final Flow<Data, Result, NotUsed> flow,
final Function<ThingId, Source<AbstractWriteModel, NotUsed>> recoveryFunction,
final SearchConfig config,
final ActorRef pubSubMediator) {
final SearchConfig config, final ActorRef pubSubMediator) {

log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
thingId = tryToGetThingId();
Expand All @@ -162,6 +162,7 @@ private ThingUpdater(final Flow<Data, Result, NotUsed> flow,
writeInterval = config.getUpdaterConfig().getStreamConfig().getWriteInterval();
backOff = ExponentialBackOff.initial(
config.getUpdaterConfig().getStreamConfig().getPersistenceConfig().getExponentialBackOffConfig());
thingDeletionTimeout = config.getUpdaterConfig().getStreamConfig().getThingDeletionTimeout();

startSingleTimer(ShutdownTrigger.IDLE.name(), ShutdownTrigger.IDLE, config.getUpdaterConfig().getMaxIdleTime());

Expand Down Expand Up @@ -457,19 +458,24 @@ private Optional<Metadata> computeEventMetadata(final ThingEvent<?> thingEvent,
thingEvent.getDittoHeaders().getAcknowledgementRequests().contains(SEARCH_PERSISTED_REQUEST);

// check if the revision is valid (thingEvent.revision = 1 + sequenceNumber)
if (thingEvent.getRevision() <= data.metadata().getThingRevision() && !shouldAcknowledge) {
if (thingEvent.getRevision() <= data.metadata().getThingRevision()) {
l.debug("Dropped thing event for thing id <{}> with revision <{}> because it was older than or "
+ "equal to the current sequence number <{}> of the update actor.", thingId,
thingEvent.getRevision(), data.metadata().getThingRevision());
if (shouldAcknowledge) {
final String hint = String.format("Thing event with revision <%d> for thing <%s> dropped.", thingEvent.getRevision(),
thingId);
exportMetadataWithSender(shouldAcknowledge, thingEvent, getSender(), null, data).sendWeakAck(JsonValue.of(hint));
}
return Optional.empty();
}

l.debug("Applying thing event <{}>.", thingEvent);
if (thingEvent instanceof ThingDeleted) {
// will stop this actor after 5 minutes (finishing up updating the index):
// this time should be longer than the consistency lag, otherwise the actor will be stopped before the
// actual "delete" is applied to the search index:
getContext().setReceiveTimeout(THING_DELETION_TIMEOUT);
// will stop this actor after configured timeout (finishing up updating the index)
startSingleTimer(ShutdownTrigger.DELETE.name(), ShutdownTrigger.DELETE, thingDeletionTimeout);
} else {
cancelTimer(ShutdownTrigger.DELETE.name());
}
if (shouldAcknowledge) {
tickNow();
Expand Down
4 changes: 4 additions & 0 deletions thingsearch/service/src/main/resources/search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ ditto {
write-interval = 1s
write-interval = ${?THINGS_SEARCH_UPDATER_STREAM_WRITE_INTERVAL}

# delay before updater actor is stopped after receiving thing deleted event
thing-deletion-timeout = 5m
thing-deletion-timeout = ${?THINGS_SEARCH_UPDATER_STREAM_WRITE_INTERVAL}

# configuration for retrieval of policies/things via sharding
ask-with-retry {
ask-timeout = 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
softly.assertThat(underTest.getWriteInterval())
.as(StreamConfigValue.WRITE_INTERVAL.getConfigPath())
.isEqualTo(StreamConfigValue.WRITE_INTERVAL.getDefaultValue());

softly.assertThat(underTest.getThingDeletionTimeout())
.as(StreamConfigValue.THING_DELETION_TIMEOUT.getConfigPath())
.isEqualTo(StreamConfigValue.THING_DELETION_TIMEOUT.getDefaultValue());
}

@Test
Expand All @@ -84,6 +88,10 @@ public void underTestReturnsValuesOfConfigFile() {
softly.assertThat(underTest.getWriteInterval())
.as(StreamConfigValue.WRITE_INTERVAL.getConfigPath())
.isEqualTo(Duration.ofSeconds(2));

softly.assertThat(underTest.getThingDeletionTimeout())
.as(StreamConfigValue.THING_DELETION_TIMEOUT.getConfigPath())
.isEqualTo(Duration.ofSeconds(3));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Loading

0 comments on commit a1a7364

Please sign in to comment.