-
Notifications
You must be signed in to change notification settings - Fork 214
/
ShardRegionFactory.java
125 lines (106 loc) · 4.58 KB
/
ShardRegionFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.updater.actors;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
/**
* Factory for Shard Region {@link ActorRef}s of different services.
*/
@NotThreadSafe
public final class ShardRegionFactory {
static final String UPDATER_SHARD_REGION = ThingsSearchConstants.SHARD_REGION;
private final ActorSystem actorSystem;
private ShardRegionFactory(final ActorSystem theActorSystem) {
actorSystem = theActorSystem;
}
/**
* Returns an instance of {@code ShardRegionFactory} for the given ActorSystem.
*
* @param actorSystem the actor system for registering the cluster sharding.
* @return the instance.
* @throws NullPointerException if {@code actorSystem} is {@code null}.
*/
@Nonnull
public static ShardRegionFactory getInstance(@Nonnull final ActorSystem actorSystem) {
checkNotNull(actorSystem, "Actor system");
return new ShardRegionFactory(actorSystem);
}
/**
* Returns a new Sharding Region for Things.
*
* @param numberOfShards the number of shards to use.
* @return the Sharding Region.
*/
@Nonnull
public ActorRef getThingsShardRegion(final int numberOfShards) {
return createShardRegionProxy(ThingsMessagingConstants.SHARD_REGION, ThingsMessagingConstants.CLUSTER_ROLE,
numberOfShards);
}
/**
* Returns a new Sharding Region for Policies.
*
* @param numberOfShards the number of shards to use.
* @return the Sharding Region.
*/
@Nonnull
public ActorRef getPoliciesShardRegion(final int numberOfShards) {
return createShardRegionProxy(PoliciesMessagingConstants.SHARD_REGION, PoliciesMessagingConstants.CLUSTER_ROLE,
numberOfShards);
}
private ActorRef createShardRegionProxy(final String shardRegion, final String clusterRole,
final int numberOfShards) {
final ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);
final ShardRegionExtractor shardRegionExtractor = ShardRegionExtractor.of(numberOfShards, actorSystem);
return clusterSharding.startProxy(shardRegion, Optional.of(clusterRole), shardRegionExtractor);
}
/**
* Returns a new Sharding Region for the Search Updater.
*
* @param numberOfShards the number of shards to use.
* @param thingUpdaterProps the Props of the ThingUpdater actor.
* @return the Sharding Region.
* @throws NullPointerException if {@code thingUpdaterProps} is {@code null}.
*/
@Nonnull
public ActorRef getSearchUpdaterShardRegion(final int numberOfShards, @Nonnull final Props thingUpdaterProps,
final String clusterRole) {
return createShardRegion(numberOfShards, thingUpdaterProps, UPDATER_SHARD_REGION, clusterRole);
}
/**
* Create a new shard region.
*
* @param shards number of shards.
* @param props props of actors in the shard region.
* @param name name of the shard region.
* @param role cluster role where the shard region starts.
* @return the shard region.
*/
public ActorRef createShardRegion(final int shards, final Props props, final String name, final String role) {
final ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);
final ClusterShardingSettings shardingSettings =
ClusterShardingSettings.create(actorSystem).withRole(role);
final ShardRegionExtractor shardRegionExtractor = ShardRegionExtractor.of(shards, actorSystem);
return clusterSharding.start(name, props, shardingSettings, shardRegionExtractor);
}
}