Skip to content

Commit

Permalink
[wip] send thing events directly to search updater shard region.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 19, 2022
1 parent e82ec3c commit d6f3100
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.eclipse.ditto.things.service.persistence.actors.strategies.commands.ThingCommandStrategies;
import org.eclipse.ditto.things.service.persistence.actors.strategies.events.ThingEventStrategies;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.RecoveryCompleted;
Expand Down Expand Up @@ -78,17 +79,20 @@ public final class ThingPersistenceActor

private final ThingConfig thingConfig;
private final DistributedPub<ThingEvent<?>> distributedPub;
@Nullable private final ActorRef searchShardRegionProxy;

@SuppressWarnings("unused")
private ThingPersistenceActor(final ThingId thingId,
final DistributedPub<ThingEvent<?>> distributedPub) {
final DistributedPub<ThingEvent<?>> distributedPub,
@Nullable final ActorRef searchShardRegionProxy) {

super(thingId);
final DittoThingsConfig thingsConfig = DittoThingsConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
);
thingConfig = thingsConfig.getThingConfig();
this.distributedPub = distributedPub;
this.searchShardRegionProxy = searchShardRegionProxy;
}

/**
Expand All @@ -99,9 +103,10 @@ private ThingPersistenceActor(final ThingId thingId,
* @return the Akka configuration Props object
*/
public static Props props(final ThingId thingId,
final DistributedPub<ThingEvent<?>> distributedPub) {
final DistributedPub<ThingEvent<?>> distributedPub,
@Nullable final ActorRef searchShardRegionProxy) {

return Props.create(ThingPersistenceActor.class, thingId, distributedPub);
return Props.create(ThingPersistenceActor.class, thingId, distributedPub, searchShardRegionProxy);
}

@Override
Expand Down Expand Up @@ -200,6 +205,9 @@ protected void recoveryCompleted(final RecoveryCompleted event) {
@Override
protected void publishEvent(@Nullable final Thing previousEntity, final ThingEvent<?> event) {
distributedPub.publishWithAcks(event, ACK_EXTRACTOR, getSelf());
if (searchShardRegionProxy != null) {
searchShardRegionProxy.tell(event, getSelf());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
*/
package org.eclipse.ditto.things.service.persistence.actors;

import javax.annotation.Nullable;

import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

import akka.actor.ActorRef;
import akka.actor.Props;

/**
Expand All @@ -29,7 +32,9 @@ public interface ThingPersistenceActorPropsFactory {
*
* @param thingId the thing ID.
* @param distributedPub the distributed-pub access.
* @param searchShardRegionProxy the proxy of the shard region of search updaters.
* @return Props of the thing-persistence-actor.
*/
Props props(ThingId thingId, DistributedPub<ThingEvent<?>> distributedPub);
Props props(ThingId thingId, DistributedPub<ThingEvent<?>> distributedPub,
@Nullable final ActorRef searchShardRegionProxy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th
private final SupervisorSmartChannelDispatching smartChannelDispatching;

private final PolicyEnforcerProvider policyEnforcerProvider;
private final ActorRef searchShardRegionProxy;

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
Expand Down Expand Up @@ -136,6 +137,9 @@ private ThingSupervisorActor(final ActorRef pubSubMediator,
ThingsMessagingConstants.CLUSTER_ROLE,
ThingsMessagingConstants.SHARD_REGION
);
// TODO
searchShardRegionProxy =
shardRegionProxyActorFactory.getShardRegionProxyActor("search", "search-wildcard-updater");

try {
inlinePolicyEnrichment = new SupervisorInlinePolicyEnrichment(getContext().getSystem(), log, getEntityId(),
Expand Down Expand Up @@ -302,13 +306,14 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<

@Override
protected ThingId getEntityId() throws Exception {
return ThingId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8.name()));
return ThingId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8));
}

@Override
protected Props getPersistenceActorProps(final ThingId entityId) {
assert thingPersistenceActorPropsFactory != null;
return thingPersistenceActorPropsFactory.props(entityId, distributedPubThingEventsForTwin);
return thingPersistenceActorPropsFactory.props(entityId, distributedPubThingEventsForTwin,
searchShardRegionProxy);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.argumentNotEmpty;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
Expand All @@ -22,6 +23,7 @@
import org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor;
import org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActorPropsFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

Expand Down Expand Up @@ -49,8 +51,9 @@ static DefaultThingPersistenceActorPropsFactory of(final ActorSystem actorSystem
}

@Override
public Props props(final ThingId thingId, final DistributedPub<ThingEvent<?>> distributedPub) {
public Props props(final ThingId thingId, final DistributedPub<ThingEvent<?>> distributedPub,
@Nullable final ActorRef searchShardRegionProxy) {
argumentNotEmpty(thingId);
return ThingPersistenceActor.props(thingId, distributedPub);
return ThingPersistenceActor.props(thingId, distributedPub, searchShardRegionProxy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ protected ActorRef createPersistenceActorWithPubSubFor(final ThingId thingId) {
}

private Props getPropsOfThingPersistenceActor(final ThingId thingId, final DistributedPub<ThingEvent<?>> pub) {
return ThingPersistenceActor.props(thingId, pub);
return ThingPersistenceActor.props(thingId, pub, null);
}

protected ActorRef createSupervisorActorFor(final ThingId thingId) {
Expand All @@ -257,7 +257,7 @@ public <S extends ThingEvent<?>> Object wrapForPublicationWithAcks(final S messa
}
},
liveSignalPub,
this::getPropsOfThingPersistenceActor,
(thingId1, pub, searchShardRegionProxy) -> getPropsOfThingPersistenceActor(thingId1, pub),
null,
policyEnforcerProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void tryToCreateThingWithDifferentThingId() {
final Thing thing = createThingV2WithRandomId();
final CreateThing createThing = CreateThing.of(thing, null, dittoHeadersV2);

final Props props = ThingPersistenceActor.props(thingIdOfActor, getDistributedPub());
final Props props = ThingPersistenceActor.props(thingIdOfActor, getDistributedPub(), null);
final TestActorRef<ThingPersistenceActor> underTest = TestActorRef.create(actorSystem, props);
final ThingPersistenceActor thingPersistenceActor = underTest.underlyingActor();
final PartialFunction<Object, BoxedUnit> receiveCommand = thingPersistenceActor.receiveCommand();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public <S extends ThingEvent<?>> Object wrapForPublicationWithAcks(final S messa
}
},
liveSignalPub,
ThingPersistenceActor::props,
(thingId, distributedPub, searchShardRegionProxy) -> ThingPersistenceActor.props(thingId,
distributedPub, null),
null,
policyEnforcerProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,21 @@ public Receive createReceive() {
}

private void retrieveClusterShardingStats(final Clock rebalanceTick) {
shardRegion.tell(getClusterShardingStats, getSelf());
// TODO
// shardRegion.tell(getClusterShardingStats, getSelf());
}

private void updateSubscriptions(final ShardRegion.ClusterShardingStats stats) {
final Set<String> inactiveShardIds = shardRegionExtractor.getInactiveShardIds(getActiveShardIds(stats));
log.debug("Updating event subscriptions for inactive shards: <{}> -> <{}>", previousShardIds, inactiveShardIds);
final List<String> toSubscribe =
inactiveShardIds.stream().filter(s -> !previousShardIds.contains(s)).collect(Collectors.toList());
final List<String> toUnsubscribe =
previousShardIds.stream().filter(s -> !inactiveShardIds.contains(s)).collect(Collectors.toList());
thingEventSub.subscribeWithoutAck(toSubscribe, getSelf());
thingEventSub.unsubscribeWithoutAck(toUnsubscribe, getSelf());
previousShardIds = inactiveShardIds;
// TODO
// final Set<String> inactiveShardIds = shardRegionExtractor.getInactiveShardIds(getActiveShardIds(stats));
// log.debug("Updating event subscriptions for inactive shards: <{}> -> <{}>", previousShardIds, inactiveShardIds);
// final List<String> toSubscribe =
// inactiveShardIds.stream().filter(s -> !previousShardIds.contains(s)).collect(Collectors.toList());
// final List<String> toUnsubscribe =
// previousShardIds.stream().filter(s -> !inactiveShardIds.contains(s)).collect(Collectors.toList());
// thingEventSub.subscribeWithoutAck(toSubscribe, getSelf());
// thingEventSub.unsubscribeWithoutAck(toUnsubscribe, getSelf());
// previousShardIds = inactiveShardIds;
}

private void processThingEvent(final ThingEvent<?> thingEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,12 @@ private void updateSubscriptions(final ShardRegion.ShardRegionStats stats) {
final Set<String> currentShardIds = stats.getStats().keySet();
log.debug("Updating event subscriptions: <{}> -> <{}>", previousShardIds, currentShardIds);
final List<String> toSubscribe =
currentShardIds.stream().filter(s -> !previousShardIds.contains(s)).collect(Collectors.toList());
currentShardIds.stream().filter(s -> !previousShardIds.contains(s)).toList();
final List<String> toUnsubscribe =
previousShardIds.stream().filter(s -> !currentShardIds.contains(s)).collect(Collectors.toList());
thingEventSub.subscribeWithoutAck(toSubscribe, getSelf());
thingEventSub.unsubscribeWithoutAck(toUnsubscribe, getSelf());
previousShardIds.stream().filter(s -> !currentShardIds.contains(s)).toList();
// TODO
// thingEventSub.subscribeWithoutAck(toSubscribe, getSelf());
// thingEventSub.unsubscribeWithoutAck(toUnsubscribe, getSelf());
previousShardIds = currentShardIds;
}

Expand Down

0 comments on commit d6f3100

Please sign in to comment.