Skip to content

Commit

Permalink
Issue #559: Introduced factory for creating shard region proxy actors.
Browse files Browse the repository at this point in the history
Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Oct 29, 2021
1 parent 7a68338 commit 32c5b1f
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,54 @@
*/
package org.eclipse.ditto.concierge.service.actors;

import java.util.Optional;

import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.sharding.ClusterSharding;

/**
* Create and retrieve shard region proxies.
*/
public final class ShardRegions {

private final ClusterSharding clusterSharding;
private final ShardRegionExtractor extractor;
private final ShardRegionProxyActorFactory shardRegionProxyActorFactory;
private final ActorRef policies;
private final ActorRef things;
private final ActorRef connections;

private ShardRegions(final ActorSystem system, final ClusterConfig clusterConfig) {
clusterSharding = ClusterSharding.get(system);
extractor = ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), system);
policies = startShardRegionProxy(clusterSharding, extractor, PoliciesMessagingConstants.CLUSTER_ROLE,
PoliciesMessagingConstants.SHARD_REGION);
private ShardRegions(final ShardRegionProxyActorFactory shardRegionProxyActorFactory) {
this.shardRegionProxyActorFactory = shardRegionProxyActorFactory;
policies =
startShardRegionProxy(PoliciesMessagingConstants.CLUSTER_ROLE, PoliciesMessagingConstants.SHARD_REGION);

things = startShardRegionProxy(clusterSharding, extractor, ThingsMessagingConstants.CLUSTER_ROLE,
ThingsMessagingConstants.SHARD_REGION);
things = startShardRegionProxy(ThingsMessagingConstants.CLUSTER_ROLE, ThingsMessagingConstants.SHARD_REGION);

connections = startShardRegionProxy(clusterSharding, extractor, ConnectivityMessagingConstants.CLUSTER_ROLE,
connections = startShardRegionProxy(ConnectivityMessagingConstants.CLUSTER_ROLE,
ConnectivityMessagingConstants.SHARD_REGION);
}

private ActorRef startShardRegionProxy(final CharSequence clusterRole, final CharSequence shardRegionName) {
return shardRegionProxyActorFactory.getShardRegionProxyActor(clusterRole, shardRegionName);
}

/**
* Create a set of shard region proxies
*
* @param actorSystem the actor system.
* @param clusterConfig the cluster config of the actor system.
* @return a new ShardRegions object.
* @throws NullPointerException if any argument is {@code null}.
*/
public static ShardRegions of(final ActorSystem actorSystem, final ClusterConfig clusterConfig) {
return new ShardRegions(actorSystem, clusterConfig);
return new ShardRegions(ShardRegionProxyActorFactory.newInstance(actorSystem, clusterConfig));
}

/**
* Return the policies shard region proxy.
* Return the policies' shard region proxy.
*
* @return policies shard region proxy.
*/
Expand All @@ -69,7 +68,7 @@ public ActorRef policies() {
}

/**
* Return the things shard region proxy.
* Return the things' shard region proxy.
*
* @return things shard region proxy.
*/
Expand All @@ -78,28 +77,12 @@ public ActorRef things() {
}

/**
* Return the connections shard region proxy.
* Return the connections' shard region proxy.
*
* @return connections shard region proxy.
*/
public ActorRef connections() {
return connections;
}

/**
* Start proxy of a shard region of one's choosing. The actor reference is not cached.
*
* @param shardRegionName name of the shard region.
* @param clusterRole role of cluster members where the shard region resides.
* @return reference of the shard region proxy.
*/
public ActorRef startProxy(final String shardRegionName, final String clusterRole) {
return startShardRegionProxy(clusterSharding, extractor, clusterRole, shardRegionName);
}

private static ActorRef startShardRegionProxy(final ClusterSharding clusterSharding,
final ShardRegionExtractor extractor, final String clusterRole, final String shardRegionName) {

return clusterSharding.startProxy(shardRegionName, Optional.of(clusterRole), extractor);
}
}
6 changes: 6 additions & 0 deletions internal/utils/cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-akka</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@Immutable
public abstract class MappingStrategies implements Map<String, JsonParsable<Jsonifiable<?>>> {

private static final String CONFIG_KEY_DITTO_MAPPING_STRATEGY_IMPLEMENTATION =
static final String CONFIG_KEY_DITTO_MAPPING_STRATEGY_IMPLEMENTATION =
"ditto.mapping-strategy.implementation";

private final Map<String, JsonParsable<Jsonifiable<?>>> strategies;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.cluster;

import java.util.Optional;

import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.sharding.ClusterSharding;

/**
* Factory for creating shard region proxy actors.
*
* @since 2.2.0
*/
@NotThreadSafe
public final class ShardRegionProxyActorFactory {

private final ShardRegionExtractor extractor;
private final ClusterSharding clusterSharding;

private ShardRegionProxyActorFactory(final ShardRegionExtractor extractor, final ClusterSharding clusterSharding) {
this.extractor = extractor;
this.clusterSharding = clusterSharding;
}

/**
* Returns a new instance of {@code ShardRegionFactory}.
*
* @return the instance.
* @throws NullPointerException if any argument is {@code null}.
*/
public static ShardRegionProxyActorFactory newInstance(final ActorSystem actorSystem,
final ClusterConfig clusterConfig) {

ConditionChecker.checkNotNull(actorSystem, "actorSystem");
ConditionChecker.checkNotNull(clusterConfig, "clusterConfig");

return new ShardRegionProxyActorFactory(ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem),
ClusterSharding.get(actorSystem));
}

/**
* Starts a proxy of a shard region specified by the cluster role and shard region name arguments.
* The actor reference is not being cached.
*
* @param shardRegionName name of the shard region.
* @param clusterRole role of cluster members where the shard region resides.
* @return reference of the shard region proxy.
* @throws NullPointerException if any argument is {@code null}.
* @throws IllegalArgumentException if any argument is empty.
*/
public ActorRef getShardRegionProxyActor(final CharSequence clusterRole, final CharSequence shardRegionName) {
ConditionChecker.argumentNotEmpty(clusterRole, "clusterRole");
ConditionChecker.argumentNotEmpty(shardRegionName, "shardRegionName");

return clusterSharding.startProxy(shardRegionName.toString(), Optional.of(clusterRole.toString()), extractor);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.cluster;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;

import java.util.Map;

import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import com.typesafe.config.ConfigFactory;

/**
* Unit test for {@link ShardRegionProxyActorFactoryTest}.
*/
@RunWith(MockitoJUnitRunner.class)
public final class ShardRegionProxyActorFactoryTest {

@ClassRule
public static final ActorSystemResource ACTOR_SYSTEM_RESOURCE = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.ofEntries(
Map.entry(MappingStrategies.CONFIG_KEY_DITTO_MAPPING_STRATEGY_IMPLEMENTATION,
TestMappingStrategies.class.getName()),
Map.entry("akka.actor.provider", "cluster")
))
);

private static final int NUMBER_OF_SHARDS = 3;

@Mock
private ClusterConfig clusterConfig;

@Before
public void before() {
Mockito.when(clusterConfig.getNumberOfShards()).thenReturn(NUMBER_OF_SHARDS);
}

@Test
public void newInstanceWithNullActorSystemThrowsException() {
assertThatNullPointerException()
.isThrownBy(() -> ShardRegionProxyActorFactory.newInstance(null, clusterConfig))
.withMessage("The actorSystem must not be null!")
.withNoCause();
}

@Test
public void newInstanceWithNullClusterConfigThrowsException() {
assertThatNullPointerException()
.isThrownBy(() -> ShardRegionProxyActorFactory.newInstance(ACTOR_SYSTEM_RESOURCE.getActorSystem(),
null))
.withMessage("The clusterConfig must not be null!")
.withNoCause();
}

@Test
public void newInstanceReturnsNotNull() {
final var underTest =
ShardRegionProxyActorFactory.newInstance(ACTOR_SYSTEM_RESOURCE.getActorSystem(), clusterConfig);

assertThat(underTest).isNotNull();
}

@Test
public void getShardRegionProxyActorWithNullClusterRoleThrowsException() {
final var underTest =
ShardRegionProxyActorFactory.newInstance(ACTOR_SYSTEM_RESOURCE.getActorSystem(), clusterConfig);

assertThatNullPointerException()
.isThrownBy(() -> underTest.getShardRegionProxyActor(null, "myShardRegion"))
.withMessage("The clusterRole must not be null!")
.withNoCause();
}

@Test
public void getShardRegionProxyActorWithEmptyClusterRoleThrowsException() {
final var underTest =
ShardRegionProxyActorFactory.newInstance(ACTOR_SYSTEM_RESOURCE.getActorSystem(), clusterConfig);

assertThatIllegalArgumentException()
.isThrownBy(() -> underTest.getShardRegionProxyActor("", "myShardRegion"))
.withMessage("The argument 'clusterRole' must not be empty!")
.withNoCause();
}

@Test
public void getShardRegionProxyActorWithNullShardRegionNameThrowsException() {
final var underTest =
ShardRegionProxyActorFactory.newInstance(ACTOR_SYSTEM_RESOURCE.getActorSystem(), clusterConfig);

assertThatNullPointerException()
.isThrownBy(() -> underTest.getShardRegionProxyActor("myClusterRole", null))
.withMessage("The shardRegionName must not be null!")
.withNoCause();
}

@Test
public void getShardRegionProxyActorWithEmptyShardRegionNameThrowsException() {
final var underTest =
ShardRegionProxyActorFactory.newInstance(ACTOR_SYSTEM_RESOURCE.getActorSystem(), clusterConfig);

assertThatIllegalArgumentException()
.isThrownBy(() -> underTest.getShardRegionProxyActor("myClusterRole", ""))
.withMessage("The argument 'shardRegionName' must not be empty!")
.withNoCause();
}

@Test
public void getShardRegionProxyActorReturnsNotNull() {
final var underTest =
ShardRegionProxyActorFactory.newInstance(ACTOR_SYSTEM_RESOURCE.getActorSystem(), clusterConfig);

final var shardRegionProxyActor = underTest.getShardRegionProxyActor("myClusterRole", "myShardRegionName");

assertThat(shardRegionProxyActor).isNotNull();
}

public static final class TestMappingStrategies extends MappingStrategies {

public TestMappingStrategies() {
super(Map.of());
}

}

}

0 comments on commit 32c5b1f

Please sign in to comment.