Skip to content

Commit

Permalink
Wrap the ConsistentHash in an AtomicReference in the SpringCloudComma…
Browse files Browse the repository at this point in the history
…ndRouter

Wrap the ConsistentHash in an AtomicReference the ensure no 2+ threads can access the ConsistentHash at the same time to overcome potential issues.
  • Loading branch information
stevenb committed Nov 25, 2016
1 parent aac5210 commit 5d201dd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
Expand Up @@ -13,6 +13,7 @@


import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;


Expand All @@ -25,7 +26,7 @@ public class SpringCloudCommandRouter implements CommandRouter {
private final DiscoveryClient discoveryClient; private final DiscoveryClient discoveryClient;
private final RoutingStrategy routingStrategy; private final RoutingStrategy routingStrategy;
private final Serializer serializer; private final Serializer serializer;
private ConsistentHash consistentHash = new ConsistentHash(); private final AtomicReference<ConsistentHash> atomicConsistentHash = new AtomicReference<>(new ConsistentHash());


public SpringCloudCommandRouter(DiscoveryClient discoveryClient, RoutingStrategy routingStrategy, public SpringCloudCommandRouter(DiscoveryClient discoveryClient, RoutingStrategy routingStrategy,
Serializer serializer) { Serializer serializer) {
Expand All @@ -36,7 +37,7 @@ public SpringCloudCommandRouter(DiscoveryClient discoveryClient, RoutingStrategy


@Override @Override
public Optional<Member> findDestination(CommandMessage<?> commandMessage) { public Optional<Member> findDestination(CommandMessage<?> commandMessage) {
return consistentHash.getMember(routingStrategy.getRoutingKey(commandMessage), commandMessage); return atomicConsistentHash.get().getMember(routingStrategy.getRoutingKey(commandMessage), commandMessage);
} }


@Override @Override
Expand Down Expand Up @@ -68,7 +69,9 @@ private void updateMemberships(Set<ServiceInstance> serviceInstances) {
serviceInstances.forEach(serviceInstance -> { serviceInstances.forEach(serviceInstance -> {
SimpleMember<URI> simpleMember = new SimpleMember<>( SimpleMember<URI> simpleMember = new SimpleMember<>(
serviceInstance.getServiceId().toUpperCase(), serviceInstance.getServiceId().toUpperCase(),
serviceInstance.getUri(), member -> consistentHash.without(member)); serviceInstance.getUri(),
member -> atomicConsistentHash.updateAndGet(consistentHash -> consistentHash.without(member))
);


Map<String, String> serviceInstanceMetadata = serviceInstance.getMetadata(); Map<String, String> serviceInstanceMetadata = serviceInstance.getMetadata();


Expand All @@ -78,7 +81,9 @@ private void updateMemberships(Set<ServiceInstance> serviceInstances) {
serviceInstanceMetadata.get(SERIALIZED_COMMAND_FILTER_CLASS_NAME), null); serviceInstanceMetadata.get(SERIALIZED_COMMAND_FILTER_CLASS_NAME), null);
CommandNameFilter commandNameFilter = serializer.deserialize(serializedObject); CommandNameFilter commandNameFilter = serializer.deserialize(serializedObject);


consistentHash = consistentHash.with(simpleMember, loadFactor, commandNameFilter); atomicConsistentHash.updateAndGet(
consistentHash -> consistentHash.with(simpleMember, loadFactor, commandNameFilter)
);
}); });
} }


Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;


import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
Expand All @@ -56,7 +57,7 @@ public class SpringCloudCommandRouterTest {
@Mock @Mock
private Serializer serializer; private Serializer serializer;


private Field consistentHashField; private Field atomicConsistentHashField;
@Mock @Mock
private SerializedObject<String> serializedObject; private SerializedObject<String> serializedObject;
private HashMap<String, String> serviceInstanceMetadata; private HashMap<String, String> serviceInstanceMetadata;
Expand All @@ -66,8 +67,8 @@ public class SpringCloudCommandRouterTest {


@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
String consistentHashFieldName = "consistentHash"; String atomicConsistentHashFieldName = "atomicConsistentHash";
consistentHashField = SpringCloudCommandRouter.class.getDeclaredField(consistentHashFieldName); atomicConsistentHashField = SpringCloudCommandRouter.class.getDeclaredField(atomicConsistentHashFieldName);


SerializedType serializedType = mock(SerializedType.class); SerializedType serializedType = mock(SerializedType.class);
when(serializedType.getName()).thenReturn(SERIALIZED_COMMAND_FILTER_CLASS_NAME); when(serializedType.getName()).thenReturn(SERIALIZED_COMMAND_FILTER_CLASS_NAME);
Expand Down Expand Up @@ -103,8 +104,9 @@ public void testFindDestinationReturnsEmptyOptionalMemberForCommandMessage() thr
@Test @Test
public void testFindDestinationReturnsMemberForCommandMessage() throws Exception { public void testFindDestinationReturnsMemberForCommandMessage() throws Exception {
SimpleMember<URI> testMember = new SimpleMember<>(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, null); SimpleMember<URI> testMember = new SimpleMember<>(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, null);
ConsistentHash testConsistentHash = new ConsistentHash().with(testMember, LOAD_FACTOR, commandMessage -> true); AtomicReference<ConsistentHash> testAtomicConsistentHash =
ReflectionUtils.setFieldValue(consistentHashField, testSubject, testConsistentHash); new AtomicReference<>(new ConsistentHash().with(testMember, LOAD_FACTOR, commandMessage -> true));
ReflectionUtils.setFieldValue(atomicConsistentHashField, testSubject, testAtomicConsistentHash);


Optional<Member> resultOptional = testSubject.findDestination(TEST_COMMAND); Optional<Member> resultOptional = testSubject.findDestination(TEST_COMMAND);


Expand Down Expand Up @@ -133,9 +135,10 @@ public void testUpdateMembershipUpdatesLocalServiceInstance() throws Exception {
public void testUpdateMemberShipUpdatesConsistentHash() throws Exception { public void testUpdateMemberShipUpdatesConsistentHash() throws Exception {
testSubject.updateMembership(LOAD_FACTOR, COMMAND_NAME_FILTER); testSubject.updateMembership(LOAD_FACTOR, COMMAND_NAME_FILTER);


ConsistentHash resultConsistentHash = ReflectionUtils.getFieldValue(consistentHashField, testSubject); AtomicReference<ConsistentHash> resultAtomicConsistentHash =
ReflectionUtils.getFieldValue(atomicConsistentHashField, testSubject);


Set<Member> resultMemberSet = resultConsistentHash.getMembers(); Set<Member> resultMemberSet = resultAtomicConsistentHash.get().getMembers();
assertFalse(resultMemberSet.isEmpty()); assertFalse(resultMemberSet.isEmpty());


assertMember(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, resultMemberSet.iterator().next()); assertMember(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, resultMemberSet.iterator().next());
Expand All @@ -153,9 +156,10 @@ public void testUpdateMembershipsOnHeartbeatEventUpdatesConsistentHash() throws


testSubject.updateMemberships(mock(HeartbeatEvent.class)); testSubject.updateMemberships(mock(HeartbeatEvent.class));


ConsistentHash resultConsistentHash = ReflectionUtils.getFieldValue(consistentHashField, testSubject); AtomicReference<ConsistentHash> resultAtomicConsistentHash =
ReflectionUtils.getFieldValue(atomicConsistentHashField, testSubject);


Set<Member> resultMemberSet = resultConsistentHash.getMembers(); Set<Member> resultMemberSet = resultAtomicConsistentHash.get().getMembers();
assertFalse(resultMemberSet.isEmpty()); assertFalse(resultMemberSet.isEmpty());


assertMember(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, resultMemberSet.iterator().next()); assertMember(SERVICE_INSTANCE_ID, SERVICE_INSTANCE_URI, resultMemberSet.iterator().next());
Expand Down Expand Up @@ -184,9 +188,10 @@ public void testUpdateMembershipsOnHeartbeatEventFiltersInstancesWithoutCommandR


testSubject.updateMemberships(mock(HeartbeatEvent.class)); testSubject.updateMemberships(mock(HeartbeatEvent.class));


ConsistentHash resultConsistentHash = ReflectionUtils.getFieldValue(consistentHashField, testSubject); AtomicReference<ConsistentHash> resultAtomicConsistentHash =
ReflectionUtils.getFieldValue(atomicConsistentHashField, testSubject);


Set<Member> resultMemberSet = resultConsistentHash.getMembers(); Set<Member> resultMemberSet = resultAtomicConsistentHash.get().getMembers();
assertEquals(expectedMemberSetSize, resultMemberSet.size()); assertEquals(expectedMemberSetSize, resultMemberSet.size());


verify(discoveryClient).getServices(); verify(discoveryClient).getServices();
Expand Down

0 comments on commit 5d201dd

Please sign in to comment.