Skip to content

Commit

Permalink
move custom processing of search updates to EnforcementFlow to allow …
Browse files Browse the repository at this point in the history
…usage of the available thing json data in the extension, add reason to search update metadata (e.g. if it was caused by thing update or background sync)

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Nov 30, 2021
1 parent 226378f commit 78a73ba
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class DittoSearchConfig implements SearchConfig, WithConfigPath {
@Nullable private final String mongoHintsByNamespace;
private final String queryCriteriaValidator;
private final String searchUpdateMapper;
private final String thingEventObserver;
private final String searchUpdateObserver;
private final UpdaterConfig updaterConfig;
private final HealthCheckConfig healthCheckConfig;
private final IndexInitializationConfig indexInitializationConfig;
Expand All @@ -66,7 +66,7 @@ private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
mongoHintsByNamespace = configWithFallback.getStringOrNull(SearchConfigValue.MONGO_HINTS_BY_NAMESPACE);
queryCriteriaValidator = configWithFallback.getStringOrNull(SearchConfigValue.QUERY_CRITERIA_VALIDATOR);
searchUpdateMapper = configWithFallback.getStringOrNull(SearchConfigValue.SEARCH_UPDATE_MAPPER);
thingEventObserver = configWithFallback.getStringOrNull(SearchConfigValue.THING_EVENT_OBSERVER);
searchUpdateObserver = configWithFallback.getStringOrNull(SearchConfigValue.SEARCH_UPDATE_OBSERVER);
updaterConfig = DefaultUpdaterConfig.of(configWithFallback);
indexInitializationConfig = DefaultIndexInitializationConfig.of(configWithFallback);
}
Expand Down Expand Up @@ -99,8 +99,8 @@ public String getSearchUpdateMapperImplementation() {
}

@Override
public String getThingEventObserverImplementation() {
return thingEventObserver;
public String getSearchUpdateObserverImplementation() {
return searchUpdateObserver;
}

@Override
Expand Down Expand Up @@ -166,7 +166,7 @@ public boolean equals(final Object o) {
return Objects.equals(mongoHintsByNamespace, that.mongoHintsByNamespace) &&
Objects.equals(queryCriteriaValidator, that.queryCriteriaValidator) &&
Objects.equals(searchUpdateMapper, that.searchUpdateMapper) &&
Objects.equals(thingEventObserver, that.thingEventObserver) &&
Objects.equals(searchUpdateObserver, that.searchUpdateObserver) &&
Objects.equals(updaterConfig, that.updaterConfig) &&
Objects.equals(dittoServiceConfig, that.dittoServiceConfig) &&
Objects.equals(healthCheckConfig, that.healthCheckConfig) &&
Expand All @@ -177,7 +177,7 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(mongoHintsByNamespace, queryCriteriaValidator, searchUpdateMapper, thingEventObserver,
return Objects.hash(mongoHintsByNamespace, queryCriteriaValidator, searchUpdateMapper, searchUpdateObserver,
updaterConfig, dittoServiceConfig, healthCheckConfig, indexInitializationConfig,
persistenceOperationsConfig,
mongoDbConfig);
Expand All @@ -189,7 +189,7 @@ public String toString() {
"mongoHintsByNamespace=" + mongoHintsByNamespace +
", queryCriteriaValidator=" + queryCriteriaValidator +
", searchUpdateMapper=" + searchUpdateMapper +
", thingEventObserver=" + thingEventObserver +
", searchUpdateObserver=" + searchUpdateObserver +
", updaterConfig=" + updaterConfig +
", dittoServiceConfig=" + dittoServiceConfig +
", healthCheckConfig=" + healthCheckConfig +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public interface SearchConfig extends ServiceSpecificConfig, WithHealthCheckConf
String getSearchUpdateMapperImplementation();

/**
* Returns the {@code ThingEventObserver} to be used for additional processing of thing events.
* Returns the {@code SearchUpdateObserver} to be used for additional processing of search updates.
*
* @return the name of the implementing class.
*/
String getThingEventObserverImplementation();
String getSearchUpdateObserverImplementation();

/**
* Returns the configuration settings for the search updating functionality.
Expand Down Expand Up @@ -90,12 +90,12 @@ enum SearchConfigValue implements KnownConfigValue {
"org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper"),

/**
* The {@code ThingEventObserver} used for additional custom processing of thing events.
* The {@code SearchUpdateObserver} used for additional custom processing of thing events.
*
* @since 2.3.0
*/
THING_EVENT_OBSERVER("thing-event-observer.implementation",
"org.eclipse.ditto.thingsearch.service.updater.actors.DefaultThingEventObserver");
SEARCH_UPDATE_OBSERVER("search-update-observer.implementation",
"org.eclipse.ditto.thingsearch.service.updater.actors.DefaultSearchUpdateObserver");

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class Metadata {
private final boolean invalidateThing;
private final boolean invalidatePolicy;
@Nullable private final ActorRef origin;
private final UpdateReason reason;

private Metadata(final ThingId thingId,
final long thingRevision,
Expand All @@ -63,7 +64,8 @@ private Metadata(final ThingId thingId,
final Collection<ActorRef> senders,
final boolean invalidateThing,
final boolean invalidatePolicy,
@Nullable final ActorRef origin) {
@Nullable final ActorRef origin,
final UpdateReason reason) {

this.thingId = thingId;
this.thingRevision = thingRevision;
Expand All @@ -76,6 +78,7 @@ private Metadata(final ThingId thingId,
this.invalidateThing = invalidateThing;
this.invalidatePolicy = invalidatePolicy;
this.origin = origin;
this.reason = reason;
}

/**
Expand All @@ -95,7 +98,8 @@ public static Metadata of(final ThingId thingId,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, null,
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, null);
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, null,
UpdateReason.UNKNOWN);
}

/**
Expand All @@ -119,7 +123,7 @@ public static Metadata of(final ThingId thingId,

return new Metadata(thingId, thingRevision, policyId, policyRevision, null, events,
null != timer ? List.of(timer) : List.of(),
null != sender ? List.of(sender) : List.of(), false, false, null);
null != sender ? List.of(sender) : List.of(), false, false, null, UpdateReason.UNKNOWN);
}

/**
Expand All @@ -143,7 +147,7 @@ public static Metadata of(final ThingId thingId,
final Collection<ActorRef> senders) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, List.of(), timers, senders,
false, false, null);
false, false, null, UpdateReason.UNKNOWN);
}

/**
Expand All @@ -165,7 +169,8 @@ public static Metadata of(final ThingId thingId,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, modified,
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, null);
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, null,
UpdateReason.UNKNOWN);
}

/**
Expand All @@ -190,7 +195,7 @@ public static Metadata fromResponse(final UpdateThingResponse updateThingRespons
*/
public Metadata invalidateCaches(final boolean invalidateThing, final boolean invalidatePolicy) {
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateThing, invalidatePolicy, origin);
invalidateThing, invalidatePolicy, origin, reason);
}

/**
Expand All @@ -200,7 +205,7 @@ public Metadata invalidateCaches(final boolean invalidateThing, final boolean in
*/
public Metadata withOrigin(@Nullable final ActorRef origin) {
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateThing, invalidatePolicy, origin);
invalidateThing, invalidatePolicy, origin, reason);
}

/**
Expand All @@ -210,7 +215,17 @@ public Metadata withOrigin(@Nullable final ActorRef origin) {
*/
public Metadata withSender(final ActorRef sender) {
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, List.of(sender),
invalidateThing, invalidatePolicy, origin);
invalidateThing, invalidatePolicy, origin, reason);
}

/**
* Create a copy of this metadata with senders replaced by the argument.
*
* @return the copy.
*/
public Metadata withUpdateReason(final UpdateReason reason) {
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateThing, invalidatePolicy, origin, reason);
}

/**
Expand Down Expand Up @@ -304,6 +319,15 @@ public List<ActorRef> getSenders() {
return senders;
}

/**
* Return the reason of the update.
*
* @return the update reason.
*/
public UpdateReason getUpdateReason() {
return reason;
}

/**
* Returns whether an acknowledgement for the successful adding to the search index is requested.
*
Expand Down Expand Up @@ -348,7 +372,7 @@ public Metadata append(final Metadata newMetadata) {
newMetadata.policyRevision, newMetadata.modified, newEvents, newTimers, newSenders,
invalidateThing || newMetadata.invalidateThing,
invalidatePolicy || newMetadata.invalidatePolicy,
newMetadata.origin);
newMetadata.origin, newMetadata.reason);
}

/**
Expand Down Expand Up @@ -404,13 +428,14 @@ public boolean equals(final Object o) {
Objects.equals(senders, that.senders) &&
invalidateThing == that.invalidateThing &&
invalidatePolicy == that.invalidatePolicy &&
Objects.equals(origin, that.origin);
Objects.equals(origin, that.origin) &&
Objects.equals(reason, that.reason);
}

@Override
public int hashCode() {
return Objects.hash(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateThing, invalidatePolicy, origin);
invalidateThing, invalidatePolicy, origin, reason);
}

@Override
Expand All @@ -427,6 +452,7 @@ public String toString() {
", invalidateThing=" + invalidateThing +
", invalidatePolicy=" + invalidatePolicy +
", origin=" + origin +
", reason=" + reason +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.model;

/**
* Describes the reason why a thing is being updated in the search index.
*/
public enum UpdateReason {

/**
* The policy referenced by a thing was updated.
*/
POLICY_UPDATE,
/**
* The thing is indexed as part of a manual re-indexing.
*/
MANUAL_REINDEXING,
/**
* The thing is indexed as part of the automatic background sync.
*/
BACKGROUND_SYNC,
/**
* The thing was updated.
*/
THING_UPDATE,
/**
* Reason not known.
*/
UNKNOWN

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;
import org.eclipse.ditto.thingsearch.service.updater.actors.SearchUpdateObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -72,6 +73,7 @@ final class EnforcementFlow {
private final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache;
private final Duration cacheRetryDelay;
private final int maxArraySize;
private final SearchUpdateObserver searchUpdateObserver;

private EnforcementFlow(final ActorSystem actorSystem,
final ActorRef thingsShardRegion,
Expand All @@ -84,6 +86,7 @@ private EnforcementFlow(final ActorSystem actorSystem,
thingsFacade = createThingsFacade(actorSystem, thingsShardRegion, askWithRetryConfig.getAskTimeout(),
streamCacheConfig, cacheDispatcher);
this.policyEnforcerCache = policyEnforcerCache;
searchUpdateObserver = SearchUpdateObserver.get(actorSystem);
cacheRetryDelay = streamCacheConfig.getRetryDelay();
this.maxArraySize = maxArraySize;
}
Expand Down Expand Up @@ -158,9 +161,12 @@ public Flow<Map<ThingId, Metadata>, Source<AbstractWriteModel, NotUsed>, NotUsed
log.info("Updating search index of <{}> things", changeMap.size());
return sudoRetrieveThingJsons(parallelism, changeMap).flatMapConcat(responseMap ->
Source.fromIterator(changeMap.values()::iterator)
.flatMapMerge(parallelism, metadataRef ->
computeWriteModel(metadataRef, responseMap.get(metadataRef.getThingId()))
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism)
.flatMapMerge(parallelism, metadataRef -> {
final JsonObject thing = responseMap.get(metadataRef.getThingId());
searchUpdateObserver.process(metadataRef, thing);
return computeWriteModel(metadataRef, thing)
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism);
}
)
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
*/
package org.eclipse.ditto.thingsearch.service.updater.actors;

import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;

import akka.actor.ActorSystem;

/**
* Default ThingEventObserver implementation.
* Default SearchUpdateObserver implementation.
*/
public class DefaultThingEventObserver extends ThingEventObserver {
public class DefaultSearchUpdateObserver extends SearchUpdateObserver {

public DefaultThingEventObserver(final ActorSystem system) {
public DefaultSearchUpdateObserver(final ActorSystem system) {
// nothing to do
}

@Override
public void processThingEvent(final ThingEvent<?> event) {
public void process(final Metadata metadata, final JsonObject thingJson) {
// noop
}
}

0 comments on commit 78a73ba

Please sign in to comment.