-
Notifications
You must be signed in to change notification settings - Fork 215
/
NewEventForwarder.java
152 lines (128 loc) · 6.5 KB
/
NewEventForwarder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.updater.actors;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockNamespaceBehavior;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.pubsub.DistributedSub;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterConfig;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.sharding.ShardRegion;
import akka.japi.pf.ReceiveBuilder;
import scala.concurrent.duration.Duration;
/**
 * Actor that keeps a thing-event subscription per shard ID reported as inactive and passes every received
 * {@link ThingEvent} on to the thing-updater shard region.
 * <p>
 * If event processing is enabled in the updater config, the actor asks the shard region for its cluster sharding
 * stats once at start-up and then at a fixed rate, and adjusts its subscriptions each time an answer arrives.
 */
final class NewEventForwarder extends AbstractActorWithTimers {

    /**
     * Internal self-message; serves both as timer key and as the periodic trigger for polling sharding stats.
     */
    private enum Clock {
        REBALANCE_TICK
    }

    /**
     * The name of this Actor in the ActorSystem.
     */
    static final String ACTOR_NAME = "newEventForwarder";

    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    private final ActorRef shardRegion;
    private final DistributedSub thingEventSub;
    private final BlockNamespaceBehavior namespaceBlockingBehavior;
    private final ShardRegion.GetClusterShardingStats getClusterShardingStats;
    private final ShardRegionExtractor shardRegionExtractor;

    // Shard IDs this actor subscribed for during the most recent subscription update.
    private Set<String> previousShardIds = Collections.emptySet();

    // Instantiated reflectively through Props.create(...), hence "unused" from the compiler's point of view.
    @SuppressWarnings("unused")
    private NewEventForwarder(final DistributedSub thingEventSub,
            final ActorRef thingUpdaterShardRegion,
            final BlockedNamespaces blockedNamespaces) {

        final DittoSearchConfig searchConfig = DittoSearchConfig.of(
                DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
        final UpdaterConfig updaterConfig = searchConfig.getUpdaterConfig();
        final ClusterConfig clusterConfig = searchConfig.getClusterConfig();

        this.thingEventSub = thingEventSub;
        this.shardRegion = thingUpdaterShardRegion;
        this.namespaceBlockingBehavior = BlockNamespaceBehavior.of(blockedNamespaces);
        // The stats request uses the poll interval as its timeout.
        this.getClusterShardingStats = new ShardRegion.GetClusterShardingStats(
                Duration.create(updaterConfig.getShardingStatePollInterval().toMillis(), TimeUnit.MILLISECONDS));
        this.shardRegionExtractor =
                ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), getContext().getSystem());

        if (updaterConfig.isEventProcessingActive()) {
            startPollingShardingStats(updaterConfig);
        }
    }

    /**
     * Creates Akka configuration object for this actor.
     *
     * @param thingEventSub Ditto distributed-sub access for thing events.
     * @param thingUpdaterShardRegion shard region of thing-updaters
     * @param blockedNamespaces cache of namespaces to block.
     * @return the Akka configuration Props object
     */
    static Props props(final DistributedSub thingEventSub,
            final ActorRef thingUpdaterShardRegion,
            final BlockedNamespaces blockedNamespaces) {

        return Props.create(NewEventForwarder.class, thingEventSub, thingUpdaterShardRegion, blockedNamespaces);
    }

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(ThingEvent.class, this::processThingEvent)
                // on every tick, ask the shard region for the current cluster sharding stats
                .matchEquals(Clock.REBALANCE_TICK, tick -> shardRegion.tell(getClusterShardingStats, getSelf()))
                .match(ShardRegion.ClusterShardingStats.class, this::updateSubscriptions)
                .matchAny(this::handleUnknownMessage)
                .build();
    }

    /**
     * Starts the fixed-rate tick timer and additionally sends one tick to this actor right away so that the first
     * stats request does not have to wait for a full interval.
     *
     * @param updaterConfig provides the poll interval used as timer period.
     */
    private void startPollingShardingStats(final UpdaterConfig updaterConfig) {
        getTimers().startTimerAtFixedRate(Clock.REBALANCE_TICK, Clock.REBALANCE_TICK,
                updaterConfig.getShardingStatePollInterval());
        getSelf().tell(Clock.REBALANCE_TICK, getSelf());
    }

    /**
     * Logs a message of an unexpected type as warning and reports it as unhandled.
     *
     * @param message the message none of the other handlers matched.
     */
    private void handleUnknownMessage(final Object message) {
        log.warning("Unknown message: {}", message);
        unhandled(message);
    }

    /**
     * Brings the event subscriptions in line with the given sharding stats: subscribes for shard IDs that are
     * inactive now but were not before, unsubscribes from those that were inactive before but are not anymore,
     * and remembers the new set for the next round.
     *
     * @param stats the cluster sharding stats answered by the shard region.
     */
    private void updateSubscriptions(final ShardRegion.ClusterShardingStats stats) {
        final Set<String> formerShardIds = previousShardIds;
        final Set<String> currentShardIds = shardRegionExtractor.getInactiveShardIds(getActiveShardIds(stats));
        log.debug("Updating event subscriptions for inactive shards: <{}> -> <{}>", formerShardIds, currentShardIds);

        final List<String> toSubscribe = elementsMissingIn(formerShardIds, currentShardIds);
        final List<String> toUnsubscribe = elementsMissingIn(currentShardIds, formerShardIds);

        thingEventSub.subscribeWithoutAck(toSubscribe, getSelf());
        thingEventSub.unsubscribeWithoutAck(toUnsubscribe, getSelf());
        previousShardIds = currentShardIds;
    }

    /**
     * Passes a thing event through the namespace-blocking behavior and, once that stage completes normally, tells
     * the result to the shard region on behalf of the original sender.
     *
     * @param thingEvent the received thing event.
     */
    private void processThingEvent(final ThingEvent<?> thingEvent) {
        // Resolve the sender on the actor thread; the callback below runs asynchronously.
        final ActorRef originalSender = getSender();
        log.withCorrelationId(thingEvent)
                .debug("Forwarding incoming ThingEvent for thingId '{}'", thingEvent.getEntityId());
        // NOTE(review): an exceptionally completed stage is not handled here, so such events are not forwarded —
        // presumably that is how events of blocked namespaces get dropped; confirm against BlockNamespaceBehavior.
        namespaceBlockingBehavior.block(thingEvent)
                .thenAccept(unblockedEvent -> shardRegion.tell(unblockedEvent, originalSender));
    }

    /**
     * Gathers the shard IDs that appear as keys in the stats of any region of the given cluster sharding stats.
     *
     * @param stats the cluster sharding stats.
     * @return the collected shard IDs; the same ID is contained more than once if several regions list it.
     */
    private static Collection<String> getActiveShardIds(final ShardRegion.ClusterShardingStats stats) {
        return stats.getRegions()
                .values()
                .stream()
                .map(regionStats -> regionStats.getStats().keySet())
                .flatMap(Set::stream)
                .collect(Collectors.toList());
    }

    /**
     * Lists the elements of {@code candidates} that are not contained in {@code reference}.
     *
     * @param reference the set to check membership against.
     * @param candidates the elements to filter.
     * @return the candidates absent from the reference set, in the iteration order of {@code candidates}.
     */
    private static List<String> elementsMissingIn(final Set<String> reference, final Set<String> candidates) {
        return candidates.stream()
                .filter(candidate -> !reference.contains(candidate))
                .collect(Collectors.toList());
    }

}