Skip to content

Commit

Permalink
introduce extendable ThingEventObserver to allow custom processing of…
Browse files Browse the repository at this point in the history
… thing events

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Nov 30, 2021
1 parent 29a00db commit d783418
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public final class DittoSearchConfig implements SearchConfig, WithConfigPath {
@Nullable private final String mongoHintsByNamespace;
private final String queryCriteriaValidator;
private final String searchUpdateMapper;
private final String thingEventObserver;
private final UpdaterConfig updaterConfig;
private final HealthCheckConfig healthCheckConfig;
private final IndexInitializationConfig indexInitializationConfig;
Expand All @@ -65,6 +66,7 @@ private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
mongoHintsByNamespace = configWithFallback.getStringOrNull(SearchConfigValue.MONGO_HINTS_BY_NAMESPACE);
queryCriteriaValidator = configWithFallback.getStringOrNull(SearchConfigValue.QUERY_CRITERIA_VALIDATOR);
searchUpdateMapper = configWithFallback.getStringOrNull(SearchConfigValue.SEARCH_UPDATE_MAPPER);
thingEventObserver = configWithFallback.getStringOrNull(SearchConfigValue.THING_EVENT_OBSERVER);
updaterConfig = DefaultUpdaterConfig.of(configWithFallback);
indexInitializationConfig = DefaultIndexInitializationConfig.of(configWithFallback);
}
Expand Down Expand Up @@ -96,6 +98,11 @@ public String getSearchUpdateMapperImplementation() {
return searchUpdateMapper;
}

@Override
public String getThingEventObserverImplementation() {
return thingEventObserver;
}

@Override
public UpdaterConfig getUpdaterConfig() {
return updaterConfig;
Expand Down Expand Up @@ -159,6 +166,7 @@ public boolean equals(final Object o) {
return Objects.equals(mongoHintsByNamespace, that.mongoHintsByNamespace) &&
Objects.equals(queryCriteriaValidator, that.queryCriteriaValidator) &&
Objects.equals(searchUpdateMapper, that.searchUpdateMapper) &&
Objects.equals(thingEventObserver, that.thingEventObserver) &&
Objects.equals(updaterConfig, that.updaterConfig) &&
Objects.equals(dittoServiceConfig, that.dittoServiceConfig) &&
Objects.equals(healthCheckConfig, that.healthCheckConfig) &&
Expand All @@ -169,8 +177,9 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(mongoHintsByNamespace, queryCriteriaValidator, searchUpdateMapper, updaterConfig,
dittoServiceConfig, healthCheckConfig, indexInitializationConfig, persistenceOperationsConfig,
return Objects.hash(mongoHintsByNamespace, queryCriteriaValidator, searchUpdateMapper, thingEventObserver,
updaterConfig, dittoServiceConfig, healthCheckConfig, indexInitializationConfig,
persistenceOperationsConfig,
mongoDbConfig);
}

Expand All @@ -180,6 +189,7 @@ public String toString() {
"mongoHintsByNamespace=" + mongoHintsByNamespace +
", queryCriteriaValidator=" + queryCriteriaValidator +
", searchUpdateMapper=" + searchUpdateMapper +
", thingEventObserver=" + thingEventObserver +
", updaterConfig=" + updaterConfig +
", dittoServiceConfig=" + dittoServiceConfig +
", healthCheckConfig=" + healthCheckConfig +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface SearchConfig extends ServiceSpecificConfig, WithHealthCheckConf
*/
String getSearchUpdateMapperImplementation();

/**
* Returns the {@code ThingEventObserver} to be used for additional processing of thing events.
*
* @return the name of the implementing class.
*/
String getThingEventObserverImplementation();

/**
* Returns the configuration settings for the search updating functionality.
*
Expand Down Expand Up @@ -80,7 +87,15 @@ enum SearchConfigValue implements KnownConfigValue {
* @since 2.1.0
*/
SEARCH_UPDATE_MAPPER("search-update-mapper.implementation",
"org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper");
"org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper"),

/**
* The {@code ThingEventObserver} used for additional custom processing of thing events.
*
* @since 2.3.0
*/
THING_EVENT_OBSERVER("thing-event-observer.implementation",
"org.eclipse.ditto.thingsearch.service.updater.actors.DefaultThingEventObserver");

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.updater.actors;

import org.eclipse.ditto.things.model.signals.events.ThingEvent;

/**
* Default ThingEventObserver implementation.
*/
public class DefaultThingEventObserver extends ThingEventObserver {

@Override
public void processThingEvent(final ThingEvent<?> event) {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ private SearchUpdaterRootActor(final SearchConfig searchConfig,
log.warning("Event processing is disabled!");
}

final var thingUpdaterProps = ThingUpdater.props(pubSubMediator, changeQueueActor, updaterConfig);
final var thingEventObserver = ThingEventObserver.get(actorSystem);
final var thingUpdaterProps = ThingUpdater.props(pubSubMediator, changeQueueActor, thingEventObserver,
updaterConfig);

final ActorRef thingsShard = shardRegionFactory.getThingsShardRegion(numberOfShards);
final ActorRef policiesShard = shardRegionFactory.getPoliciesShardRegion(numberOfShards);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.updater.actors;

import java.util.List;

import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;

/**
* Thing event observer to be loaded by reflection.
* Can be used as an extension point to use process thing events.
* Implementations MUST have a public constructor taking an actorSystem as argument.
*
* @since 2.3.0
*/
public abstract class ThingEventObserver implements Extension {

private static final ThingEventObserver.ExtensionId EXTENSION_ID = new ThingEventObserver.ExtensionId();

/**
* Load a {@code ThingEventObserver} dynamically according to the search configuration.
*
* @param actorSystem The actor system in which to load the observer.
* @return The thing event observer.
*/
public static ThingEventObserver get(final ActorSystem actorSystem) {
return EXTENSION_ID.get(actorSystem);
}

/**
* Process the given {@code ThingEvent}.
*
* @param event the thing event
*/
public abstract void processThingEvent(final ThingEvent<?> event);


/**
* ID of the actor system extension to validate the {@code ThingEventObserver}.
*/
private static final class ExtensionId extends AbstractExtensionId<ThingEventObserver> {

@Override
public ThingEventObserver createExtension(final ExtendedActorSystem system) {
final SearchConfig searchConfig =
DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(system.settings().config()));

return AkkaClassLoader.instantiate(system, ThingEventObserver.class,
searchConfig.getThingEventObserverImplementation(), List.of(ActorSystem.class), List.of(system));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ final class ThingUpdater extends AbstractActorWithStash {
private final ThingId thingId;
private final ShutdownBehaviour shutdownBehaviour;
private final ActorRef changeQueueActor;
private final ThingEventObserver thingEventObserver;
private final double forceUpdateProbability;

// state of Thing and Policy
Expand All @@ -94,12 +95,14 @@ final class ThingUpdater extends AbstractActorWithStash {
@SuppressWarnings("unused") //It is used via reflection. See props method.
private ThingUpdater(final ActorRef pubSubMediator,
final ActorRef changeQueueActor,
final ThingEventObserver thingEventObserver,
final double forceUpdateProbability) {
this(pubSubMediator, changeQueueActor, forceUpdateProbability, true, true);
this(pubSubMediator, changeQueueActor, thingEventObserver, forceUpdateProbability, true, true);
}

ThingUpdater(final ActorRef pubSubMediator,
final ActorRef changeQueueActor,
final ThingEventObserver thingEventObserver,
final double forceUpdateProbability,
final boolean loadPreviousState,
final boolean awaitRecovery) {
Expand All @@ -111,6 +114,7 @@ private ThingUpdater(final ActorRef pubSubMediator,
thingId = tryToGetThingId();
shutdownBehaviour = ShutdownBehaviour.fromId(thingId, pubSubMediator, getSelf());
this.changeQueueActor = changeQueueActor;
this.thingEventObserver = thingEventObserver;
this.forceUpdateProbability = forceUpdateProbability;

getContext().setReceiveTimeout(dittoSearchConfig.getUpdaterConfig().getMaxIdleTime());
Expand All @@ -129,13 +133,14 @@ private ThingUpdater(final ActorRef pubSubMediator,
*
* @param pubSubMediator Akka pub-sub mediator.
* @param changeQueueActor reference of the change queue actor.
* @param thingEventObserver the thing event observer
* @param updaterConfig the updater config.
* @return the Akka configuration Props object
*/
static Props props(final ActorRef pubSubMediator, final ActorRef changeQueueActor,
final UpdaterConfig updaterConfig) {
final ThingEventObserver thingEventObserver, final UpdaterConfig updaterConfig) {

return Props.create(ThingUpdater.class, pubSubMediator, changeQueueActor,
return Props.create(ThingUpdater.class, pubSubMediator, changeQueueActor, thingEventObserver,
updaterConfig.getForceUpdateProbability());
}

Expand Down Expand Up @@ -342,6 +347,7 @@ private void processThingEvent(final ThingEvent<?> thingEvent) {
DittoTracing.wrapTimer(DittoTracing.extractTraceContext(thingEvent), timer);
ConsistencyLag.startS0InUpdater(timer);
enqueueMetadata(exportMetadataWithSender(shouldAcknowledge, thingEvent, getSender(), timer));
thingEventObserver.processThingEvent(thingEvent);
}
}

Expand Down
1 change: 1 addition & 0 deletions thingsearch/service/src/main/resources/things-search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ ditto {
things-search {
query-criteria-validator.implementation = ${?QUERY_CRITERIA_VALIDATOR_IMPLEMENTATION}
search-update-mapper.implementation = ${?SEARCH_UPDATE_MAPPER_IMPLEMENTATION}
thing-event-observer.implementation = ${?THING_EVENT_OBSERVER_IMPLEMENTATION}
mongo-hints-by-namespace = ${?MONGO_HINTS_BY_NAMESPACE}

index-initialization {
Expand Down

0 comments on commit d783418

Please sign in to comment.