Skip to content

Commit 434660e

Browse files
committed
Handle duplicate host_ids in node list refreshes
1 parent e3dff88 commit 434660e

File tree

5 files changed

+248
-26
lines changed

5 files changed

+248
-26
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,25 @@ public Result compute(
6262

6363
for (NodeInfo nodeInfo : nodeInfos) {
6464
UUID id = nodeInfo.getHostId();
65-
seen.add(id);
66-
DefaultNode node = (DefaultNode) oldNodes.get(id);
67-
if (node == null) {
68-
node = new DefaultNode(nodeInfo.getEndPoint(), context);
69-
LOG.debug("[{}] Adding new node {}", logPrefix, node);
70-
added.put(id, node);
71-
}
72-
if (tokenFactory == null && nodeInfo.getPartitioner() != null) {
73-
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
65+
if (seen.contains(id)) {
66+
LOG.warn(
67+
"[{}] Found duplicate entries with host_id {} in system.peers, "
68+
+ "keeping only the first one",
69+
logPrefix,
70+
id);
71+
} else {
72+
seen.add(id);
73+
DefaultNode node = (DefaultNode) oldNodes.get(id);
74+
if (node == null) {
75+
node = new DefaultNode(nodeInfo.getEndPoint(), context);
76+
LOG.debug("[{}] Adding new node {}", logPrefix, node);
77+
added.put(id, node);
78+
}
79+
if (tokenFactory == null && nodeInfo.getPartitioner() != null) {
80+
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
81+
}
82+
tokensChanged |= copyInfos(nodeInfo, node, context);
7483
}
75-
tokensChanged |= copyInfos(nodeInfo, node, context);
7684
}
7785

7886
Set<UUID> removed = Sets.difference(oldNodes.keySet(), seen);

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
2424
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
2525
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
26+
import java.util.HashMap;
27+
import java.util.Map;
2628
import java.util.Set;
2729
import java.util.UUID;
2830
import net.jcip.annotations.ThreadSafe;
@@ -58,25 +60,33 @@ public Result compute(
5860
assert oldMetadata == DefaultMetadata.EMPTY;
5961
TokenFactory tokenFactory = null;
6062

61-
ImmutableMap.Builder<UUID, DefaultNode> newNodesBuilder = ImmutableMap.builder();
63+
Map<UUID, DefaultNode> newNodes = new HashMap<>();
6264

6365
for (NodeInfo nodeInfo : nodeInfos) {
64-
EndPoint endPoint = nodeInfo.getEndPoint();
65-
DefaultNode node = findIn(contactPoints, endPoint);
66-
if (node == null) {
67-
node = new DefaultNode(endPoint, context);
68-
LOG.debug("[{}] Adding new node {}", logPrefix, node);
66+
UUID hostId = nodeInfo.getHostId();
67+
if (newNodes.containsKey(hostId)) {
68+
LOG.warn(
69+
"[{}] Found duplicate entries with host_id {} in system.peers, "
70+
+ "keeping only the first one",
71+
logPrefix,
72+
hostId);
6973
} else {
70-
LOG.debug("[{}] Copying contact point {}", logPrefix, node);
74+
EndPoint endPoint = nodeInfo.getEndPoint();
75+
DefaultNode node = findIn(contactPoints, endPoint);
76+
if (node == null) {
77+
node = new DefaultNode(endPoint, context);
78+
LOG.debug("[{}] Adding new node {}", logPrefix, node);
79+
} else {
80+
LOG.debug("[{}] Copying contact point {}", logPrefix, node);
81+
}
82+
if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) {
83+
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
84+
}
85+
copyInfos(nodeInfo, node, context);
86+
newNodes.put(hostId, node);
7187
}
72-
if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) {
73-
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
74-
}
75-
copyInfos(nodeInfo, node, context);
76-
newNodesBuilder.put(node.getHostId(), node);
7788
}
7889

79-
ImmutableMap<UUID, DefaultNode> newNodes = newNodesBuilder.build();
8090
ImmutableList.Builder<Object> eventsBuilder = ImmutableList.builder();
8191

8292
for (DefaultNode newNode : newNodes.values()) {

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefreshTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,50 @@ public void should_update_existing_nodes() {
129129
assertThat(node2.getSchemaVersion()).isEqualTo(schemaVersion2);
130130
assertThat(result.events).isEmpty();
131131
}
132+
133+
@Test
134+
public void should_ignore_duplicate_host_ids() {
135+
// Given
136+
DefaultMetadata oldMetadata =
137+
new DefaultMetadata(
138+
ImmutableMap.of(node1.getHostId(), node1, node2.getHostId(), node2),
139+
Collections.emptyMap(),
140+
null,
141+
null);
142+
143+
Iterable<NodeInfo> newInfos =
144+
ImmutableList.of(
145+
DefaultNodeInfo.builder()
146+
.withEndPoint(node1.getEndPoint())
147+
.withDatacenter("dc1")
148+
.withRack("rack1")
149+
.withHostId(node1.getHostId())
150+
.build(),
151+
DefaultNodeInfo.builder()
152+
.withEndPoint(node2.getEndPoint())
153+
.withDatacenter("dc1")
154+
.withRack("rack2")
155+
.withHostId(node2.getHostId())
156+
.build(),
157+
// Duplicate host id for node 2, should be ignored:
158+
DefaultNodeInfo.builder()
159+
.withEndPoint(node2.getEndPoint())
160+
.withDatacenter("dc1")
161+
.withRack("rack3")
162+
.withHostId(node2.getHostId())
163+
.build());
164+
FullNodeListRefresh refresh = new FullNodeListRefresh(newInfos);
165+
166+
// When
167+
MetadataRefresh.Result result = refresh.compute(oldMetadata, false, context);
168+
169+
// Then
170+
assertThat(result.newMetadata.getNodes())
171+
.containsOnlyKeys(node1.getHostId(), node2.getHostId());
172+
assertThat(node1.getDatacenter()).isEqualTo("dc1");
173+
assertThat(node1.getRack()).isEqualTo("rack1");
174+
assertThat(node2.getDatacenter()).isEqualTo("dc1");
175+
assertThat(node2.getRack()).isEqualTo("rack2");
176+
assertThat(result.events).isEmpty();
177+
}
132178
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.metadata;
17+
18+
import static com.datastax.oss.driver.Assertions.assertThat;
19+
import static org.mockito.Mockito.when;
20+
21+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
22+
import com.datastax.oss.driver.api.core.metadata.Node;
23+
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
24+
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
25+
import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
26+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
27+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
28+
import java.util.Map;
29+
import java.util.UUID;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.junit.runner.RunWith;
33+
import org.mockito.Mock;
34+
import org.mockito.junit.MockitoJUnitRunner;
35+
36+
@RunWith(MockitoJUnitRunner.class)
37+
public class InitialNodeListRefreshTest {
38+
39+
@Mock private InternalDriverContext context;
40+
@Mock protected MetricsFactory metricsFactory;
41+
@Mock private ChannelFactory channelFactory;
42+
43+
private DefaultNode contactPoint1;
44+
private DefaultNode contactPoint2;
45+
private EndPoint endPoint3;
46+
private UUID hostId1;
47+
private UUID hostId2;
48+
private UUID hostId3;
49+
50+
@Before
51+
public void setup() {
52+
when(context.getMetricsFactory()).thenReturn(metricsFactory);
53+
when(context.getChannelFactory()).thenReturn(channelFactory);
54+
55+
contactPoint1 = TestNodeFactory.newContactPoint(1, context);
56+
contactPoint2 = TestNodeFactory.newContactPoint(2, context);
57+
58+
endPoint3 = TestNodeFactory.newEndPoint(3);
59+
hostId1 = UUID.randomUUID();
60+
hostId2 = UUID.randomUUID();
61+
hostId3 = UUID.randomUUID();
62+
}
63+
64+
@Test
65+
public void should_copy_contact_points() {
66+
// Given
67+
Iterable<NodeInfo> newInfos =
68+
ImmutableList.of(
69+
DefaultNodeInfo.builder()
70+
.withEndPoint(contactPoint1.getEndPoint())
71+
// in practice there are more fields, but hostId is enough to validate the logic
72+
.withHostId(hostId1)
73+
.build(),
74+
DefaultNodeInfo.builder()
75+
.withEndPoint(contactPoint2.getEndPoint())
76+
.withHostId(hostId2)
77+
.build());
78+
InitialNodeListRefresh refresh =
79+
new InitialNodeListRefresh(newInfos, ImmutableSet.of(contactPoint1, contactPoint2));
80+
81+
// When
82+
MetadataRefresh.Result result = refresh.compute(DefaultMetadata.EMPTY, false, context);
83+
84+
// Then
85+
// contact points have been copied to the metadata, and completed with missing information
86+
Map<UUID, Node> newNodes = result.newMetadata.getNodes();
87+
assertThat(newNodes).containsOnlyKeys(hostId1, hostId2);
88+
assertThat(newNodes.get(hostId1)).isEqualTo(contactPoint1);
89+
assertThat(contactPoint1.getHostId()).isEqualTo(hostId1);
90+
assertThat(newNodes.get(hostId2)).isEqualTo(contactPoint2);
91+
assertThat(contactPoint2.getHostId()).isEqualTo(hostId2);
92+
}
93+
94+
@Test
95+
public void should_add_other_nodes() {
96+
// Given
97+
Iterable<NodeInfo> newInfos =
98+
ImmutableList.of(
99+
DefaultNodeInfo.builder()
100+
.withEndPoint(contactPoint1.getEndPoint())
101+
// in practice there are more fields, but hostId is enough to validate the logic
102+
.withHostId(hostId1)
103+
.build(),
104+
DefaultNodeInfo.builder()
105+
.withEndPoint(contactPoint2.getEndPoint())
106+
.withHostId(hostId2)
107+
.build(),
108+
DefaultNodeInfo.builder().withEndPoint(endPoint3).withHostId(hostId3).build());
109+
InitialNodeListRefresh refresh =
110+
new InitialNodeListRefresh(newInfos, ImmutableSet.of(contactPoint1, contactPoint2));
111+
112+
// When
113+
MetadataRefresh.Result result = refresh.compute(DefaultMetadata.EMPTY, false, context);
114+
115+
// Then
116+
// new node created in addition to the contact points
117+
Map<UUID, Node> newNodes = result.newMetadata.getNodes();
118+
assertThat(newNodes).containsOnlyKeys(hostId1, hostId2, hostId3);
119+
Node node3 = newNodes.get(hostId3);
120+
assertThat(node3.getEndPoint()).isEqualTo(endPoint3);
121+
assertThat(node3.getHostId()).isEqualTo(hostId3);
122+
}
123+
124+
@Test
125+
public void should_ignore_duplicate_host_ids() {
126+
// Given
127+
Iterable<NodeInfo> newInfos =
128+
ImmutableList.of(
129+
DefaultNodeInfo.builder()
130+
.withEndPoint(contactPoint1.getEndPoint())
131+
// in practice there are more fields, but hostId is enough to validate the logic
132+
.withHostId(hostId1)
133+
.withDatacenter("dc1")
134+
.build(),
135+
DefaultNodeInfo.builder()
136+
.withEndPoint(contactPoint1.getEndPoint())
137+
.withDatacenter("dc2")
138+
.withHostId(hostId1)
139+
.build());
140+
InitialNodeListRefresh refresh =
141+
new InitialNodeListRefresh(newInfos, ImmutableSet.of(contactPoint1));
142+
143+
// When
144+
MetadataRefresh.Result result = refresh.compute(DefaultMetadata.EMPTY, false, context);
145+
146+
// Then
147+
// only the first nodeInfo should have been copied
148+
Map<UUID, Node> newNodes = result.newMetadata.getNodes();
149+
assertThat(newNodes).containsOnlyKeys(hostId1);
150+
assertThat(newNodes.get(hostId1)).isEqualTo(contactPoint1);
151+
assertThat(contactPoint1.getHostId()).isEqualTo(hostId1);
152+
assertThat(contactPoint1.getDatacenter()).isEqualTo("dc1");
153+
}
154+
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/TestNodeFactory.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@
2222
public class TestNodeFactory {
2323

2424
public static DefaultNode newNode(int lastIpByte, InternalDriverContext context) {
25-
DefaultEndPoint endPoint = newEndPoint(lastIpByte);
26-
DefaultNode node = new DefaultNode(endPoint, context);
25+
DefaultNode node = newContactPoint(lastIpByte, context);
2726
node.hostId = UUID.randomUUID();
28-
node.broadcastRpcAddress = endPoint.resolve();
27+
node.broadcastRpcAddress = ((InetSocketAddress) node.getEndPoint().resolve());
2928
return node;
3029
}
3130

31+
public static DefaultNode newContactPoint(int lastIpByte, InternalDriverContext context) {
32+
DefaultEndPoint endPoint = newEndPoint(lastIpByte);
33+
return new DefaultNode(endPoint, context);
34+
}
35+
3236
public static DefaultEndPoint newEndPoint(int lastByteOfIp) {
3337
return new DefaultEndPoint(new InetSocketAddress("127.0.0." + lastByteOfIp, 9042));
3438
}

0 commit comments

Comments
 (0)