Skip to content

Commit

Permalink
remove redundant configuration to disable sending acks from search
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Jun 28, 2022
1 parent efae5e6 commit 25d4df6
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 45 deletions.
Expand Up @@ -40,7 +40,6 @@ public final class DefaultStreamConfig implements StreamConfig {
private final int maxArraySize;
private final Duration writeInterval;
private final Duration thingDeletionTimeout;
private final boolean sendingAcksEnabled;
private final AskWithRetryConfig askWithRetryConfig;
private final StreamStageConfig retrievalConfig;
private final PersistenceStreamConfig persistenceStreamConfig;
Expand All @@ -52,7 +51,6 @@ private DefaultStreamConfig(final ConfigWithFallback streamScopedConfig) {
writeInterval = streamScopedConfig.getNonNegativeDurationOrThrow(StreamConfigValue.WRITE_INTERVAL);
thingDeletionTimeout =
streamScopedConfig.getNonNegativeDurationOrThrow(StreamConfigValue.THING_DELETION_TIMEOUT);
sendingAcksEnabled = streamScopedConfig.getBoolean(StreamConfigValue.SENDING_ACKS_ENABLED.getConfigPath());
askWithRetryConfig = DefaultAskWithRetryConfig.of(streamScopedConfig, ASK_WITH_RETRY_CONFIG_PATH);
retrievalConfig = DefaultStreamStageConfig.getInstance(streamScopedConfig, RETRIEVAL_CONFIG_PATH);
persistenceStreamConfig = DefaultPersistenceStreamConfig.of(streamScopedConfig);
Expand Down Expand Up @@ -86,11 +84,6 @@ public Duration getThingDeletionTimeout() {
return thingDeletionTimeout;
}

@Override
public boolean isSendingAcksEnabled() {
return sendingAcksEnabled;
}

@Override
public AskWithRetryConfig getAskWithRetryConfig() {
return askWithRetryConfig;
Expand Down Expand Up @@ -126,7 +119,6 @@ public boolean equals(@Nullable final Object o) {
}
final DefaultStreamConfig that = (DefaultStreamConfig) o;
return maxArraySize == that.maxArraySize &&
sendingAcksEnabled == that.sendingAcksEnabled &&
writeInterval.equals(that.writeInterval) &&
thingDeletionTimeout.equals(that.thingDeletionTimeout) &&
askWithRetryConfig.equals(that.askWithRetryConfig) &&
Expand All @@ -139,7 +131,7 @@ public boolean equals(@Nullable final Object o) {
@Override
public int hashCode() {
return Objects.hash(maxArraySize, writeInterval, askWithRetryConfig, retrievalConfig,
persistenceStreamConfig, policyCacheConfig, thingCacheConfig, thingDeletionTimeout, sendingAcksEnabled);
persistenceStreamConfig, policyCacheConfig, thingCacheConfig, thingDeletionTimeout);
}

@Override
Expand All @@ -148,7 +140,6 @@ public String toString() {
"maxArraySize=" + maxArraySize +
", writeInterval=" + writeInterval +
", thingDeletionTimeout=" + thingDeletionTimeout +
", sendingAcksEnabled=" + sendingAcksEnabled +
", askWithRetryConfig=" + askWithRetryConfig +
", retrievalConfig=" + retrievalConfig +
", persistenceStreamConfig=" + persistenceStreamConfig +
Expand Down
Expand Up @@ -44,11 +44,6 @@ public interface StreamConfig {
*/
Duration getThingDeletionTimeout();

/**
* @return whether sending acks is enabled
*/
boolean isSendingAcksEnabled();

/**
* Returns the configuration for the used "ask with retry" pattern in the search updater for retrieval of things and
* policies.
Expand Down Expand Up @@ -103,12 +98,7 @@ enum StreamConfigValue implements KnownConfigValue {
/**
* The delay before the updater actor is stopped after receiving a ThingDeleted event.
*/
THING_DELETION_TIMEOUT("thing-deletion-timeout", Duration.ofMinutes(5)),

/**
* Whether sending acks from ThingUpdaterActor is enabled.
*/
SENDING_ACKS_ENABLED("sending-acks-enabled", true);
THING_DELETION_TIMEOUT("thing-deletion-timeout", Duration.ofMinutes(5));

private final String configPath;
private final Object defaultValue;
Expand Down
Expand Up @@ -105,7 +105,6 @@ public final class ThingUpdater extends AbstractFSMWithStash<ThingUpdater.State,
private final Materializer materializer;
private final Duration writeInterval;
private final Duration thingDeletionTimeout;
private final boolean sendingAcksEnabled;
private ExponentialBackOff backOff;
private boolean shuttingDown = false;
@Nullable private UniqueKillSwitch killSwitch;
Expand Down Expand Up @@ -164,7 +163,6 @@ private ThingUpdater(final Flow<Data, Result, NotUsed> flow,
backOff = ExponentialBackOff.initial(
config.getUpdaterConfig().getStreamConfig().getPersistenceConfig().getExponentialBackOffConfig());
thingDeletionTimeout = config.getUpdaterConfig().getStreamConfig().getThingDeletionTimeout();
sendingAcksEnabled = config.getUpdaterConfig().getStreamConfig().isSendingAcksEnabled();

startSingleTimer(ShutdownTrigger.IDLE.name(), ShutdownTrigger.IDLE, config.getUpdaterConfig().getMaxIdleTime());

Expand Down Expand Up @@ -216,10 +214,7 @@ public void postStop() {
}

private FSMStateFunctionBuilder<State, Data> unhandled() {
return matchEvent(Acknowledgement.class, (message, data) -> {
log.debug("Received redirected Acknowledgement: <{}>", message);
return stay();
}).anyEvent((message, data) -> {
return matchAnyEvent((message, data) -> {
log.warning("Unknown message in <{}>: <{}>", stateName(), message);
return stay();
});
Expand Down Expand Up @@ -419,7 +414,7 @@ private FSM.State<State, Data> updateThing(final UpdateThing updateThing, final
.withUpdateReason(updateThing.getUpdateReason());
final Metadata nextMetadata =
updateThing.getDittoHeaders().getAcknowledgementRequests().contains(SEARCH_PERSISTED_REQUEST)
? metadata.withSender(getAckRecipient())
? metadata.withSender(getSender())
: metadata;
return stay().using(new Data(nextMetadata, lastWriteModel));
}
Expand Down Expand Up @@ -478,7 +473,7 @@ private Optional<Metadata> computeEventMetadata(final ThingEvent<?> thingEvent,
final String hint = String.format("Thing event with revision <%d> for thing <%s> dropped.",
thingEvent.getRevision(),
thingId);
exportMetadataWithSender(true, thingEvent, getAckRecipient(), null, data)
exportMetadataWithSender(true, thingEvent, getSender(), null, data)
.sendWeakAck(JsonValue.of(hint));
}
return Optional.empty();
Expand All @@ -502,7 +497,7 @@ private Optional<Metadata> computeEventMetadata(final ThingEvent<?> thingEvent,
.start();
DittoTracing.wrapTimer(DittoTracing.extractTraceContext(thingEvent), timer);
ConsistencyLag.startS1InUpdater(timer);
final var metadata = exportMetadataWithSender(shouldAcknowledge, thingEvent, getAckRecipient(), timer, data)
final var metadata = exportMetadataWithSender(shouldAcknowledge, thingEvent, getSender(), timer, data)
.withUpdateReason(UpdateReason.THING_UPDATE);
return Optional.of(metadata);
}
Expand Down Expand Up @@ -580,7 +575,7 @@ private Metadata exportMetadata(@Nullable final ThingEvent<?> event, final long
}

private void acknowledge(final IdentifiableStreamingMessage message) {
final ActorRef sender = getAckRecipient();
final ActorRef sender = getSender();
if (!getContext().system().deadLetters().equals(sender)) {
sender.tell(StreamAck.success(message.asIdentifierString()), getSelf());
}
Expand All @@ -603,14 +598,4 @@ private static JsonValue getDescription(final org.eclipse.ditto.base.api.common.
}
}

private ActorRef getAckRecipient() {
if (sendingAcksEnabled) {
// normal behavior - return original sender
return getSender();
} else {
// return self as sender to prevent acknowledgements being sent to original sender
// this actor just write a log statement when receiving an acknowledgement
return getSelf();
}
}
}
4 changes: 0 additions & 4 deletions thingsearch/service/src/main/resources/search.conf
Expand Up @@ -143,10 +143,6 @@ ditto {
thing-deletion-timeout = 5m
thing-deletion-timeout = ${?THINGS_SEARCH_UPDATER_STREAM_WRITE_INTERVAL}

# whether sending acks from ThingUpdaterActor is enabled
sending-acks-enabled = true
sending-acks-enabled = ${?THINGS_SEARCH_UPDATER_SENDING_ACKS_ENABLED}

# configuration for retrieval of policies/things via sharding
ask-with-retry {
ask-timeout = 5s
Expand Down

0 comments on commit 25d4df6

Please sign in to comment.