diff --git a/distributed-commandbus-springcloud/src/main/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouter.java b/distributed-commandbus-springcloud/src/main/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouter.java index da0e897898..7589044d3c 100644 --- a/distributed-commandbus-springcloud/src/main/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouter.java +++ b/distributed-commandbus-springcloud/src/main/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouter.java @@ -13,6 +13,7 @@ import java.net.URI; import java.util.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -25,7 +26,7 @@ public class SpringCloudCommandRouter implements CommandRouter { private final DiscoveryClient discoveryClient; private final RoutingStrategy routingStrategy; private final Serializer serializer; - private ConsistentHash consistentHash = new ConsistentHash(); + private final AtomicReference atomicConsistentHash = new AtomicReference<>(new ConsistentHash()); public SpringCloudCommandRouter(DiscoveryClient discoveryClient, RoutingStrategy routingStrategy, Serializer serializer) { @@ -36,7 +37,7 @@ public SpringCloudCommandRouter(DiscoveryClient discoveryClient, RoutingStrategy @Override public Optional findDestination(CommandMessage commandMessage) { - return consistentHash.getMember(routingStrategy.getRoutingKey(commandMessage), commandMessage); + return atomicConsistentHash.get().getMember(routingStrategy.getRoutingKey(commandMessage), commandMessage); } @Override @@ -68,7 +69,9 @@ private void updateMemberships(Set serviceInstances) { serviceInstances.forEach(serviceInstance -> { SimpleMember simpleMember = new SimpleMember<>( serviceInstance.getServiceId().toUpperCase(), - serviceInstance.getUri(), member -> consistentHash.without(member)); + serviceInstance.getUri(), + member -> atomicConsistentHash.updateAndGet(consistentHash -> consistentHash.without(member)) + ); Map serviceInstanceMetadata = serviceInstance.getMetadata(); @@ -78,7 +81,9 @@ private void updateMemberships(Set serviceInstances) { serviceInstanceMetadata.get(SERIALIZED_COMMAND_FILTER_CLASS_NAME), null); CommandNameFilter commandNameFilter = serializer.deserialize(serializedObject); - consistentHash = consistentHash.with(simpleMember, loadFactor, commandNameFilter); + atomicConsistentHash.updateAndGet( + consistentHash -> consistentHash.with(simpleMember, loadFactor, commandNameFilter) + ); }); } diff --git a/distributed-commandbus-springcloud/src/test/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouterTest.java b/distributed-commandbus-springcloud/src/test/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouterTest.java index 528a47d629..78aa80bfac 100644 --- a/distributed-commandbus-springcloud/src/test/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouterTest.java +++ b/distributed-commandbus-springcloud/src/test/java/org/axonframework/springcloud/commandhandling/SpringCloudCommandRouterTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; import static org.mockito.Matchers.any; @@ -56,7 +57,7 @@ public class SpringCloudCommandRouterTest { @Mock private Serializer serializer; - private Field consistentHashField; + private Field atomicConsistentHashField; @Mock private SerializedObject serializedObject; private HashMap serviceInstanceMetadata; @@ -66,8 +67,8 @@ public class SpringCloudCommandRouterTest { @Before public void setUp() throws Exception { - String consistentHashFieldName = "consistentHash"; - consistentHashField = SpringCloudCommandRouter.class.getDeclaredField(consistentHashFieldName); + String atomicConsistentHashFieldName = "atomicConsistentHash"; + atomicConsistentHashField = SpringCloudCommandRouter.class.getDeclaredField(atomicConsistentHashFieldName); SerializedType serializedType = mock(SerializedType.class); when(serializedType.getName()).thenReturn(SERIALIZED_COMMAND_FILTER_CLASS_NAME); @@ -103,8 +104,9 @@ public void testFindDestinationReturnsEmptyOptionalMemberForCommandMessage() thr @Test public void testFindDestinationReturnsMemberForCommandMessage() throws Exception { SimpleMember testMember = new SimpleMember<>(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, null); - ConsistentHash testConsistentHash = new ConsistentHash().with(testMember, LOAD_FACTOR, commandMessage -> true); - ReflectionUtils.setFieldValue(consistentHashField, testSubject, testConsistentHash); + AtomicReference testAtomicConsistentHash = + new AtomicReference<>(new ConsistentHash().with(testMember, LOAD_FACTOR, commandMessage -> true)); + ReflectionUtils.setFieldValue(atomicConsistentHashField, testSubject, testAtomicConsistentHash); Optional resultOptional = testSubject.findDestination(TEST_COMMAND); @@ -133,9 +135,10 @@ public void testUpdateMembershipUpdatesLocalServiceInstance() throws Exception { public void testUpdateMemberShipUpdatesConsistentHash() throws Exception { testSubject.updateMembership(LOAD_FACTOR, COMMAND_NAME_FILTER); - ConsistentHash resultConsistentHash = ReflectionUtils.getFieldValue(consistentHashField, testSubject); + AtomicReference resultAtomicConsistentHash = + ReflectionUtils.getFieldValue(atomicConsistentHashField, testSubject); - Set resultMemberSet = resultConsistentHash.getMembers(); + Set resultMemberSet = resultAtomicConsistentHash.get().getMembers(); assertFalse(resultMemberSet.isEmpty()); assertMember(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, resultMemberSet.iterator().next()); @@ -153,9 +156,10 @@ public void testUpdateMembershipsOnHeartbeatEventUpdatesConsistentHash() throws testSubject.updateMemberships(mock(HeartbeatEvent.class)); - ConsistentHash resultConsistentHash = ReflectionUtils.getFieldValue(consistentHashField, testSubject); + AtomicReference resultAtomicConsistentHash = + ReflectionUtils.getFieldValue(atomicConsistentHashField, testSubject); - Set resultMemberSet = resultConsistentHash.getMembers(); + Set resultMemberSet = resultAtomicConsistentHash.get().getMembers(); assertFalse(resultMemberSet.isEmpty()); assertMember(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, resultMemberSet.iterator().next()); @@ -184,9 +188,10 @@ public void testUpdateMembershipsOnHeartbeatEventFiltersInstancesWithoutCommandR testSubject.updateMemberships(mock(HeartbeatEvent.class)); - ConsistentHash resultConsistentHash = ReflectionUtils.getFieldValue(consistentHashField, testSubject); + AtomicReference resultAtomicConsistentHash = + ReflectionUtils.getFieldValue(atomicConsistentHashField, testSubject); - Set resultMemberSet = resultConsistentHash.getMembers(); + Set resultMemberSet = resultAtomicConsistentHash.get().getMembers(); assertEquals(expectedMemberSetSize, resultMemberSet.size()); verify(discoveryClient).getServices();