This repository has been archived by the owner on May 27, 2022. It is now read-only.
forked from hector-client/hector
-
Notifications
You must be signed in to change notification settings - Fork 1
/
NodeDiscovery.java
133 lines (111 loc) · 4.42 KB
/
NodeDiscovery.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package me.prettyprint.cassandra.connection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import org.apache.cassandra.thrift.EndpointDetails;
import org.apache.cassandra.thrift.TokenRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class NodeDiscovery {
private static final Logger log = LoggerFactory.getLogger(NodeDiscovery.class);
private CassandraHostConfigurator cassandraHostConfigurator;
private HConnectionManager connectionManager;
private DataCenterValidator dataCenterValidator;
public NodeDiscovery(CassandraHostConfigurator cassandraHostConfigurator, HConnectionManager connectionManager) {
this.cassandraHostConfigurator = cassandraHostConfigurator;
this.connectionManager = connectionManager;
this.dataCenterValidator = new DataCenterValidator(cassandraHostConfigurator.getAutoDiscoveryDataCenters());
}
/**
* Find any nodes that are not already in the connection manager but are in
* the cassandra ring and add them
*/
public void doAddNodes() {
if (log.isDebugEnabled()) {
log.debug("Node discovery running...");
}
Set<CassandraHost> foundHosts = discoverNodes();
if (foundHosts != null && foundHosts.size() > 0) {
log.info("Found {} new host(s) in Ring", foundHosts.size());
for (CassandraHost cassandraHost : foundHosts) {
log.info("Addding found host {} to pool", cassandraHost);
cassandraHostConfigurator.applyConfig(cassandraHost);
connectionManager.addCassandraHost(cassandraHost);
}
}
if (log.isDebugEnabled()) {
log.debug("Node discovery run complete.");
}
}
/**
* Find any unknown nodes in the cassandra ring
*/
public Set<CassandraHost> discoverNodes() {
Cluster cluster = HFactory.getCluster(connectionManager.getClusterName());
if (cluster == null) {
return null;
}
Set<CassandraHost> existingHosts = connectionManager.getHosts();
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>();
if (log.isDebugEnabled()) {
log.debug("using existing hosts {}", existingHosts);
}
try {
for (KeyspaceDefinition keyspaceDefinition : cluster.describeKeyspaces()) {
if (!keyspaceDefinition.getName().equals(Keyspace.KEYSPACE_SYSTEM)) {
List<TokenRange> tokenRanges = cluster.describeRing(keyspaceDefinition.getName());
for (TokenRange tokenRange : tokenRanges) {
for (EndpointDetails endPointDetail : tokenRange.getEndpoint_details()) {
// Check if we are allowed to include this Data
// Center.
if (dataCenterValidator == null
|| dataCenterValidator.validate(endPointDetail.getDatacenter())) {
// Maybe add this host if it's a new host.
CassandraHost foundHost = new CassandraHost(endPointDetail.getHost(),
cassandraHostConfigurator.getPort());
if (!existingHosts.contains(foundHost)) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost,
tokenRange);
foundHosts.add(foundHost);
}
}
}
}
break;
}
}
} catch (Exception e) {
log.error("Discovery Service failed attempt to connect CassandraHost", e);
}
return foundHosts;
}
/**
* Abstraction to validate that the discovered nodes belong to a specific
* datacenters.
*
* @author patricioe (Patricio Echague - patricio@datastax.com)
*
*/
class DataCenterValidator {
Set<String> dataCenters;
public DataCenterValidator(List<String> dataCenters) {
if (dataCenters != null)
this.dataCenters = new HashSet<String>(dataCenters);
}
public boolean validate(String dcName) {
// If the DC is not defined (i.e: single cluster) always returns
// TRUE.
if (dataCenters == null || dcName == null)
return true;
return dataCenters.contains(dcName);
}
}
}