/
Cluster.java
3295 lines (2977 loc) · 133 KB
/
Cluster.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.driver.core;
import static com.datastax.driver.core.SchemaElement.KEYSPACE;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.BusyConnectionException;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.SyntaxError;
import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException;
import com.datastax.driver.core.policies.AddressTranslator;
import com.datastax.driver.core.policies.IdentityTranslator;
import com.datastax.driver.core.policies.LatencyAwarePolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.Policies;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
import com.datastax.driver.core.utils.MoreFutures;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ResourceBundle;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Information and known state of a Cassandra cluster.
*
* <p>This is the main entry point of the driver. A simple example of access to a Cassandra cluster
* would be:
*
* <pre>
* Cluster cluster = Cluster.builder().addContactPoint("192.168.0.1").build();
* Session session = cluster.connect("db1");
*
* for (Row row : session.execute("SELECT * FROM table1"))
* // do something ...
* </pre>
*
* <p>A cluster object maintains a permanent connection to one of the cluster nodes which it uses
* solely to maintain information on the state and current topology of the cluster. Using the
* connection, the driver will discover all the nodes currently in the cluster as well as new nodes
* joining the cluster subsequently.
*/
public class Cluster implements Closeable {
// Logger for this class.
private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
// Bundle that getDriverVersion() reads the "driver.version" entry from. It is declared above the
// static initializer below because that initializer (through logDriverVersion()) reads it.
private static final ResourceBundle driverProperties =
ResourceBundle.getBundle("com.datastax.driver.core.Driver");
static {
// Runs once, when the class is initialized: log the driver version, then check Guava.
logDriverVersion();
// Force initialization to fail fast if there is an issue detecting the version
GuavaCompatibility.init();
}
// Read from the system property named below; the second argument (1) is presumably the value
// used when the property is not set -- confirm against SystemProperties.getInt.
@VisibleForTesting
static final int NEW_NODE_DELAY_SECONDS =
SystemProperties.getInt("com.datastax.driver.NEW_NODE_DELAY_SECONDS", 1);
// Some per-JVM number that allows generating unique cluster names when
// multiple Cluster instances are created in the same JVM.
private static final AtomicInteger CLUSTER_ID = new AtomicInteger(0);
// Read from the system property named below; the second argument (60) is presumably the value
// used when the property is not set -- confirm against SystemProperties.getInt.
private static final int NOTIF_LOCK_TIMEOUT_SECONDS =
SystemProperties.getInt("com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS", 60);
// Created once in the private constructor; the public methods of this class delegate to it.
final Manager manager;
/**
* Constructs a new Cluster instance.
*
* <p>This constructor is mainly exposed so Cluster can be sub-classed as a means to make
* testing/mocking easier or to "intercept" its method call. Most users shouldn't extend this
* class however and should prefer either using the {@link #builder} or calling {@link #buildFrom}
* with a custom Initializer.
*
* @param name the name to use for the cluster (this is not the Cassandra cluster name, see {@link
* #getClusterName}).
* @param contactPoints the list of contact points to use for the new cluster.
* @param configuration the configuration for the new cluster.
*/
protected Cluster(String name, List<EndPoint> contactPoints, Configuration configuration) {
// Delegates to the private four-argument constructor with an empty set of initial listeners.
// Note that, unlike the Initializer-based constructor, the contact points are not checked for
// emptiness here.
this(name, contactPoints, configuration, Collections.<Host.StateListener>emptySet());
}
/**
* Constructs a new Cluster instance.
*
* <p>This constructor is mainly exposed so Cluster can be sub-classed as a means to make
* testing/mocking easier or to "intercept" its method call. Most users shouldn't extend this
* class however and should prefer using the {@link #builder}.
*
* @param initializer the initializer to use.
* @see #buildFrom
*/
protected Cluster(Initializer initializer) {
// Every construction argument is read from the initializer. checkNotEmpty rejects an empty
// contact point list with an IllegalArgumentException before the private constructor runs.
this(
initializer.getClusterName(),
checkNotEmpty(initializer.getContactPoints()),
initializer.getConfiguration(),
initializer.getInitialListeners());
}
private static List<EndPoint> checkNotEmpty(List<EndPoint> contactPoints) {
  // Pass a non-empty list straight through so the call can be used inline as an argument.
  if (!contactPoints.isEmpty()) {
    return contactPoints;
  }
  throw new IllegalArgumentException("Cannot build a cluster without contact points");
}
private Cluster(
    String name,
    List<EndPoint> contactPoints,
    Configuration configuration,
    Collection<Host.StateListener> listeners) {
  // The Manager receives every construction argument unchanged.
  manager = new Manager(name, contactPoints, configuration, listeners);
}
/**
* Initialize this Cluster instance.
*
* <p>This method creates an initial connection to one of the contact points used to construct the
* {@code Cluster} instance. That connection is then used to populate the cluster {@link
* Metadata}.
*
* <p>Calling this method is optional in the sense that any call to one of the {@code connect}
* methods of this object will automatically trigger a call to this method beforehand. It is thus
* only useful to call this method if for some reason you want to populate the metadata (or test
* that at least one contact point can be reached) without creating a first {@code Session}.
*
* <p>Please note that this method only creates one control connection for gathering cluster
* metadata. In particular, it doesn't create any connection pools. Those are created when a new
* {@code Session} is created through {@code connect}.
*
* <p>This method has no effect if the cluster is already initialized.
*
* @return this {@code Cluster} object.
* @throws NoHostAvailableException if no host amongst the contact points can be reached.
* @throws AuthenticationException if an authentication error occurs while contacting the initial
* contact points.
* @throws IllegalStateException if the Cluster was closed prior to calling this method. This can
* occur either directly (through {@link #close()} or {@link #closeAsync()}), or as a result
* of an error while initializing the Cluster.
*/
public Cluster init() {
  // The actual initialization work is done by the manager.
  manager.init();
  // Return this instance so calls can be chained.
  return this;
}
/**
* Build a new cluster based on the provided initializer.
*
* <p>Note that for building a cluster programmatically, Cluster.Builder provides a slightly less
* verbose shortcut with {@link Builder#build}.
*
* <p>Also note that all the contact points provided by {@code initializer} must share the
* same port.
*
* @param initializer the Cluster.Initializer to use
* @return the newly created Cluster instance
* @throws IllegalArgumentException if the list of contact points provided by {@code initializer}
* is empty or if not all those contact points have the same port.
*/
public static Cluster buildFrom(Initializer initializer) {
  // The Initializer-based constructor validates the contact points.
  final Cluster built = new Cluster(initializer);
  return built;
}
/**
* Creates a new {@link Cluster.Builder} instance.
*
* <p>This is a convenience method for {@code new Cluster.Builder()}.
*
* @return the new cluster builder.
*/
public static Cluster.Builder builder() {
  // Each call hands out a fresh builder.
  final Cluster.Builder freshBuilder = new Cluster.Builder();
  return freshBuilder;
}
/**
* Returns the current version of the driver.
*
* <p>This is intended for products that wrap or extend the driver, as a way to check
* compatibility if end-users override the driver version in their application.
*
* @return the version.
*/
public static String getDriverVersion() {
  // The version is stored in the driver's resource bundle under this key.
  final String versionKey = "driver.version";
  return driverProperties.getString(versionKey);
}
/**
* Logs the driver version to the console.
*
* <p>This method logs the version using the logger {@code com.datastax.driver.core} and level
* {@code INFO}.
*/
public static void logDriverVersion() {
  // Use the package-level logger name rather than this class's own logger.
  final Logger packageLogger = LoggerFactory.getLogger("com.datastax.driver.core");
  final String version = getDriverVersion();
  packageLogger.info("DataStax Java driver {} for Apache Cassandra", version);
}
/**
* Creates a new session on this cluster but does not initialize it.
*
* <p>Because this method does not perform any initialization, it cannot fail. The initialization
* of the session (the connection of the Session to the Cassandra nodes) will occur if either the
* {@link Session#init} method is called explicitly, or whenever the returned session object is
* used.
*
* <p>Once a session returned by this method gets initialized (see above), it will be set to no
* keyspace. If you want to set such session to a keyspace, you will have to explicitly execute a
* 'USE mykeyspace' query.
*
* <p>Note that if you do not particularly need to defer initialization, it is simpler to use one
* of the {@code connect()} methods of this class.
*
* @return a new, non-initialized session on this cluster.
*/
public Session newSession() {
  // Refuse to hand out sessions from a closed or failed cluster.
  checkNotClosed(manager);
  final Session uninitialized = manager.newSession();
  return uninitialized;
}
/**
* Creates a new session on this cluster and initializes it.
*
* <p>Note that this method will initialize the newly created session, trying to connect to the
* Cassandra nodes before returning. If you only want to create a Session object without
* initializing it right away, see {@link #newSession}.
*
* @return a new session on this cluster set to no keyspace.
* @throws NoHostAvailableException if the Cluster has not been initialized yet ({@link #init} has
* not been called and this is the first connect call) and no host amongst the contact points
* can be reached.
* @throws AuthenticationException if an authentication error occurs while contacting the initial
* contact points.
* @throws IllegalStateException if the Cluster was closed prior to calling this method. This can
* occur either directly (through {@link #close()} or {@link #closeAsync()}), or as a result
* of an error while initializing the Cluster.
*/
public Session connect() {
  final ListenableFuture<Session> pending = connectAsync();
  try {
    // Block until the asynchronous connection attempt finishes.
    return Uninterruptibles.getUninterruptibly(pending);
  } catch (ExecutionException failure) {
    // Rethrow the underlying cause instead of the ExecutionException wrapper.
    throw DriverThrowables.propagateCause(failure);
  }
}
/**
* Creates a new session on this cluster, initializes it and sets the keyspace to the provided one.
*
* <p>Note that this method will initialize the newly created session, trying to connect to the
* Cassandra nodes before returning. If you only want to create a Session object without
* initializing it right away, see {@link #newSession}.
*
* @param keyspace The name of the keyspace to use for the created {@code Session}.
* @return a new session on this cluster set to keyspace {@code keyspace}.
* @throws NoHostAvailableException if the Cluster has not been initialized yet ({@link #init} has
* not been called and this is the first connect call) and no host amongst the contact points
* can be reached, or if no host can be contacted to set the {@code keyspace}.
* @throws AuthenticationException if an authentication error occurs while contacting the initial
* contact points.
* @throws InvalidQueryException if the keyspace does not exist.
* @throws IllegalStateException if the Cluster was closed prior to calling this method. This can
* occur either directly (through {@link #close()} or {@link #closeAsync()}), or as a result
* of an error while initializing the Cluster.
*/
public Session connect(String keyspace) {
  final ListenableFuture<Session> pending = connectAsync(keyspace);
  try {
    // Block until the session is initialized and switched to the keyspace.
    return Uninterruptibles.getUninterruptibly(pending);
  } catch (ExecutionException failure) {
    // Rethrow the underlying cause instead of the ExecutionException wrapper.
    throw DriverThrowables.propagateCause(failure);
  }
}
/**
* Creates a new session on this cluster and initializes it asynchronously.
*
* <p>This will also initialize the {@code Cluster} if needed; note that cluster initialization
* happens synchronously on the thread that called this method. Therefore it is recommended to
* initialize the cluster at application startup, and not rely on this method to do it.
*
* <p>Note that if a {@linkplain Configuration#getDefaultKeyspace() default keyspace} has been
* configured for use with a DBaaS cluster, this method will attempt to set the session keyspace
* to that keyspace, effectively behaving like {@link #connect(String)}.
*
* @return a future that will complete when the session is fully initialized.
* @throws NoHostAvailableException if the Cluster has not been initialized yet ({@link #init} has
* not been called and this is the first connect call) and no host amongst the contact points
* can be reached.
* @throws IllegalStateException if the Cluster was closed prior to calling this method. This can
* occur either directly (through {@link #close()} or {@link #closeAsync()}), or as a result
* of an error while initializing the Cluster.
* @see #connect()
*/
public ListenableFuture<Session> connectAsync() {
  // The configured default keyspace is used as the target; it is passed on as-is, so a null
  // value results in a session with no keyspace.
  return connectAsync(getConfiguration().getDefaultKeyspace());
}
/**
* Creates a new session on this cluster, and initializes it to the given keyspace asynchronously.
*
* <p>This will also initialize the {@code Cluster} if needed; note that cluster initialization
* happens synchronously on the thread that called this method. Therefore it is recommended to
* initialize the cluster at application startup, and not rely on this method to do it.
*
* @param keyspace The name of the keyspace to use for the created {@code Session}.
* @return a future that will complete when the session is fully initialized.
* @throws NoHostAvailableException if the Cluster has not been initialized yet ({@link #init} has
* not been called and this is the first connect call) and no host amongst the contact points
* can be reached.
* @throws IllegalStateException if the Cluster was closed prior to calling this method. This can
* occur either directly (through {@link #close()} or {@link #closeAsync()}), or as a result
* of an error while initializing the Cluster.
*/
public ListenableFuture<Session> connectAsync(final String keyspace) {
  checkNotClosed(manager);
  // Cluster initialization runs synchronously on the calling thread.
  init();
  final Session created = manager.newSession();
  final ListenableFuture<Session> initFuture = created.initAsync();
  if (keyspace == null) {
    // No keyspace requested: the initialized session is the result as-is.
    return initFuture;
  }

  final String useCql = "USE " + keyspace;

  // Step 1: once the session is initialized, switch it to the requested keyspace.
  final AsyncFunction<Session, ResultSet> switchKeyspace =
      new AsyncFunction<Session, ResultSet>() {
        @Override
        public ListenableFuture<ResultSet> apply(Session ready) throws Exception {
          return ready.executeAsync(useCql);
        }
      };
  final ListenableFuture<ResultSet> keyspaceFuture =
      GuavaCompatibility.INSTANCE.transformAsync(initFuture, switchKeyspace);

  // Step 2: on any failure, close the session and rethrow, with a clearer message when the
  // failure is a syntax error.
  final AsyncFunction<Throwable, ResultSet> closeAndRethrow =
      new AsyncFunction<Throwable, ResultSet>() {
        @Override
        public ListenableFuture<ResultSet> apply(Throwable failure) throws Exception {
          created.closeAsync();
          Throwable toThrow = failure;
          if (failure instanceof SyntaxError) {
            // Give a more explicit message, because it's probably caused by a bad keyspace
            // name
            final SyntaxError syntaxError = (SyntaxError) failure;
            toThrow =
                new SyntaxError(
                    syntaxError.getEndPoint(),
                    String.format(
                        "Error executing \"%s\" (%s). Check that your keyspace name is valid",
                        useCql, syntaxError.getMessage()));
          }
          throw Throwables.propagate(toThrow);
        }
      };
  final ListenableFuture<ResultSet> guardedFuture =
      GuavaCompatibility.INSTANCE.withFallback(keyspaceFuture, closeAndRethrow);

  // Step 3: callers want the session, not the result of the USE statement.
  return GuavaCompatibility.INSTANCE.transform(guardedFuture, Functions.constant(created));
}
/**
* The name of this cluster object.
*
* <p>Note that this is not the Cassandra cluster name, but rather a name assigned to this Cluster
* object. Currently, that name is only used for one purpose: to distinguish exposed JMX metrics
* when multiple Cluster instances live in the same JVM (which should be rare in the first place).
* That name can be set at Cluster building time (through {@link Builder#withClusterName} for
* instance) but will default to a name like {@code cluster1} where each Cluster instance in the
* same JVM will have a different number.
*
* @return the name for this cluster instance.
*/
public String getClusterName() {
  // The name is held by the manager.
  final String assignedName = manager.clusterName;
  return assignedName;
}
/**
* Returns read-only metadata on the connected cluster.
*
* <p>This includes the known nodes with their status as seen by the driver, as well as the schema
* definitions. Since this returns metadata on the connected cluster, this method may trigger the
* creation of a connection if none has been established yet (neither {@code init()} nor {@code
* connect()} has been called yet).
*
* @return the cluster metadata.
* @throws NoHostAvailableException if the Cluster has not been initialized yet and no host
* amongst the contact points can be reached.
* @throws AuthenticationException if an authentication error occurs while contacting the initial
* contact points.
* @throws IllegalStateException if the Cluster was closed prior to calling this method. This can
* occur either directly (through {@link #close()} or {@link #closeAsync()}), or as a result
* of an error while initializing the Cluster.
*/
public Metadata getMetadata() {
  final Manager clusterManager = manager;
  // Trigger initialization first, since the metadata describes the connected cluster.
  clusterManager.init();
  return clusterManager.metadata;
}
/**
* The cluster configuration.
*
* @return the cluster configuration.
*/
public Configuration getConfiguration() {
  // Plain accessor: no initialization and no closed-state check.
  final Configuration current = manager.configuration;
  return current;
}
/**
* The cluster metrics.
*
* @return the cluster metrics, or {@code null} if this cluster has not yet been {@link #init()
* initialized}, or if metrics collection has been disabled (that is if {@link
* Configuration#getMetricsOptions} returns {@code null}).
*/
public Metrics getMetrics() {
  checkNotClosed(manager);
  // The field is returned as-is; no initialization is triggered here.
  final Metrics current = manager.metrics;
  return current;
}
/**
* Registers the provided listener to be notified on hosts up/down/added/removed events.
*
* <p>Registering the same listener multiple times is a no-op.
*
* <p>This method should be used to register additional listeners on an already-initialized
* cluster. To add listeners to a cluster object prior to its initialization, use {@link
* Builder#withInitialListeners(Collection)}. Calling this method on a non-initialized cluster
* will result in the listener being {@link
* com.datastax.driver.core.Host.StateListener#onRegister(Cluster) notified} twice of cluster
* registration: once inside this method, and once at cluster initialization.
*
* @param listener the new {@link Host.StateListener} to register.
* @return this {@code Cluster} object;
*/
public Cluster register(Host.StateListener listener) {
  checkNotClosed(manager);
  // Only notify the listener when add() reports that it was actually added.
  if (manager.listeners.add(listener)) {
    listener.onRegister(this);
  }
  return this;
}
/**
* Unregisters the provided listener from being notified on hosts events.
*
* <p>This method is a no-op if {@code listener} hasn't previously been registered against this
* Cluster.
*
* @param listener the {@link Host.StateListener} to unregister.
* @return this {@code Cluster} object;
*/
public Cluster unregister(Host.StateListener listener) {
  checkNotClosed(manager);
  // Only notify the listener when remove() reports that it was actually registered.
  if (manager.listeners.remove(listener)) {
    listener.onUnregister(this);
  }
  return this;
}
/**
* Registers the provided tracker to be updated with hosts read latencies.
*
* <p>Registering the same tracker multiple times is a no-op.
*
* <p>Beware that the registered tracker's {@link LatencyTracker#update(Host, Statement,
* Exception, long) update} method will be called very frequently (at the end of every query to a
* Cassandra host) and should thus not be costly.
*
* <p>The main use case for a {@link LatencyTracker} is to allow load balancing policies to
* implement latency awareness. For example, {@link LatencyAwarePolicy} registers its own
* internal {@code LatencyTracker} (automatically, you don't have to call this method directly).
*
* @param tracker the new {@link LatencyTracker} to register.
* @return this {@code Cluster} object;
*/
public Cluster register(LatencyTracker tracker) {
  checkNotClosed(manager);
  // Only notify the tracker when add() reports that it was actually added.
  if (manager.latencyTrackers.add(tracker)) {
    tracker.onRegister(this);
  }
  return this;
}
/**
* Unregisters the provided latency tracking from being updated with host read latencies.
*
* <p>This method is a no-op if {@code tracker} hasn't previously been registered against this
* Cluster.
*
* @param tracker the {@link LatencyTracker} to unregister.
* @return this {@code Cluster} object;
*/
public Cluster unregister(LatencyTracker tracker) {
  checkNotClosed(manager);
  // Only notify the tracker when remove() reports that it was actually registered.
  if (manager.latencyTrackers.remove(tracker)) {
    tracker.onUnregister(this);
  }
  return this;
}
/**
* Registers the provided listener to be updated with schema change events.
*
* <p>Registering the same listener multiple times is a no-op.
*
* @param listener the new {@link SchemaChangeListener} to register.
* @return this {@code Cluster} object;
*/
public Cluster register(SchemaChangeListener listener) {
  checkNotClosed(manager);
  // Only notify the listener when add() reports that it was actually added.
  if (manager.schemaChangeListeners.add(listener)) {
    listener.onRegister(this);
  }
  return this;
}
/**
* Unregisters the provided schema change listener from being updated with schema change events.
*
* <p>This method is a no-op if {@code listener} hasn't previously been registered against this
* Cluster.
*
* @param listener the {@link SchemaChangeListener} to unregister.
* @return this {@code Cluster} object;
*/
public Cluster unregister(SchemaChangeListener listener) {
  checkNotClosed(manager);
  // Only notify the listener when remove() reports that it was actually registered.
  if (manager.schemaChangeListeners.remove(listener)) {
    listener.onUnregister(this);
  }
  return this;
}
/**
* Initiates a shutdown of this cluster instance.
*
* <p>This method is asynchronous and returns a future on the completion of the shutdown process.
* As soon as the cluster is shut down, no new request will be accepted, but already submitted
* queries are allowed to complete. This method closes all connections from all sessions and
* reclaims all resources used by this Cluster instance.
*
* <p>If for some reason you wish to expedite this process, the {@link CloseFuture#force} can be
* called on the result future.
*
* <p>This method has no particular effect if the cluster was already closed (in which case the
* returned future will return immediately).
*
* @return a future on the completion of the shutdown process.
*/
public CloseFuture closeAsync() {
  // Shutdown is driven by the manager; this method only exposes its future.
  final CloseFuture shutdown = manager.close();
  return shutdown;
}
/**
* Initiates a shutdown of this cluster instance and blocks until that shutdown completes.
*
* <p>This method is a shortcut for {@code closeAsync().get()}.
*/
@Override
public void close() {
  final CloseFuture shutdown = closeAsync();
  try {
    shutdown.get();
  } catch (InterruptedException interrupted) {
    // Stop waiting, but keep the interrupt visible to the caller.
    Thread.currentThread().interrupt();
  } catch (ExecutionException failure) {
    // Rethrow the underlying cause instead of the ExecutionException wrapper.
    throw DriverThrowables.propagateCause(failure);
  }
}
/**
* Whether this Cluster instance has been closed.
*
* <p>Note that this method returns true as soon as one of the close methods ({@link #closeAsync}
* or {@link #close}) has been called, it does not guarantee that the closing is done. If you want
* to guarantee that the closing is done, you can call {@code close()} and wait until it returns
* (or call the get method on {@code closeAsync()} with a very short timeout and check this
* doesn't timeout).
*
* @return {@code true} if this Cluster instance has been closed, {@code false} otherwise.
*/
public boolean isClosed() {
  // A non-null close future means a close was requested; it may not have completed yet.
  final boolean closeRequested = manager.closeFuture.get() != null;
  return closeRequested;
}
private static void checkNotClosed(Manager manager) {
  // An initialization failure is checked first so that its cause is attached to the exception.
  if (manager.errorDuringInit()) {
    throw new IllegalStateException(
        "Can't use this cluster instance because it encountered an error in its initialization",
        manager.getInitException());
  }
  if (manager.isClosed()) {
    throw new IllegalStateException(
        "Can't use this cluster instance because it was previously closed");
  }
}
/**
* Initializer for {@link Cluster} instances.
*
* <p>If you want to create a new {@code Cluster} instance programmatically, then it is advised to
* use {@link Cluster.Builder} which can be obtained from the {@link Cluster#builder} method.
*
* <p>But it is also possible to implement a custom {@code Initializer} that retrieves
* initialization from a web-service or from a configuration file.
*/
public interface Initializer {
  /**
   * Returns the optional name of the cluster to create.
   *
   * <p>When no name is supplied, a default one is created. The name is currently only used for
   * JMX reporting of metrics. See {@link Cluster#getClusterName} for more information.
   *
   * @return the name for the created cluster, or {@code null} to use an automatically generated
   *     name.
   */
  String getClusterName();

  /**
   * Returns the initial Cassandra hosts to connect to.
   *
   * @return the initial Cassandra contact points. See {@link Builder#addContactPoint} for more
   *     details on contact points.
   */
  List<EndPoint> getContactPoints();

  /**
   * Returns the configuration to use for the new cluster.
   *
   * <p>Some of the configuration can be modified after the cluster is initialized, and some of
   * it cannot. In particular, the following cannot be changed afterwards:
   *
   * <ul>
   *   <li>the port used to connect to Cassandra nodes (see {@link ProtocolOptions}).
   *   <li>the policies used (see {@link Policies}).
   *   <li>the authentication info provided (see {@link Configuration}).
   *   <li>whether metrics are enabled (see {@link Configuration}).
   * </ul>
   *
   * @return the configuration to use for the new cluster.
   */
  Configuration getConfiguration();

  /**
   * Returns the optional listeners to register against the newly created cluster.
   *
   * <p>Unlike listeners registered after the Cluster has been created, the listeners returned by
   * this method will see {@link Host.StateListener#onAdd} events for the initial contact points.
   *
   * @return a possibly empty collection of {@code Host.StateListener} to register against the
   *     newly created cluster.
   */
  Collection<Host.StateListener> getInitialListeners();
}
/** Helper class to build {@link Cluster} instances. */
public static class Builder implements Initializer {
// Returned by getClusterName(); has no initializer, so it is null until assigned.
private String clusterName;
// The three lists below are merged, with duplicates removed, by getContactPoints().
// Host-and-port addresses, wrapped as-is into a TranslatedAddressEndPoint.
private final List<InetSocketAddress> rawHostAndPortContactPoints =
new ArrayList<InetSocketAddress>();
// Host-only addresses, combined with the `port` field when getContactPoints() is called.
private final List<InetAddress> rawHostContactPoints = new ArrayList<InetAddress>();
// Contact points already expressed as EndPoint instances.
private final List<EndPoint> contactPoints = new ArrayList<EndPoint>();
// Port applied to the host-only contact points in getContactPoints().
private int port = ProtocolOptions.DEFAULT_PORT;
private int maxSchemaAgreementWaitSeconds =
ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
// No initializer: null until assigned.
private ProtocolVersion protocolVersion;
private AuthProvider authProvider = AuthProvider.NONE;
private final Policies.Builder policiesBuilder = Policies.builder();
private final Configuration.Builder configurationBuilder = Configuration.builder();
private ProtocolOptions.Compression compression = ProtocolOptions.Compression.NONE;
private SSLOptions sslOptions = null;
private boolean metricsEnabled = true;
private boolean jmxEnabled = true;
private boolean allowBetaProtocolVersion = false;
private boolean noCompact = false;
private boolean isCloud = false;
// No initializer: null until assigned.
private Collection<Host.StateListener> listeners;
/**
* Returns the cluster name configured on this builder.
*
* @return the value last passed to {@link #withClusterName}, or {@code null} if no name has been
* assigned to this builder.
*/
@Override
public String getClusterName() {
return clusterName;
}
/**
 * Builds the list of contact points from everything registered on this builder so far.
 *
 * <p>Explicit {@link EndPoint}s come first, followed by the port-less addresses (combined with
 * the builder's current port) and finally the host-and-port addresses. Endpoints that compare
 * equal are only reported once, at the position where they were first seen.
 *
 * @return a new list holding the distinct contact points.
 */
@Override
public List<EndPoint> getContactPoints() {
  // A LinkedHashSet discards duplicates while keeping the first-seen order.
  Set<EndPoint> distinct = new LinkedHashSet<EndPoint>(contactPoints);

  // Addresses supplied as InetAddress/InetSocketAddress are assumed to go with the default
  // endpoint factory, hence the TranslatedAddressEndPoint wrapper.
  for (InetAddress host : rawHostContactPoints) {
    InetSocketAddress hostWithPort = new InetSocketAddress(host, port);
    distinct.add(new TranslatedAddressEndPoint(hostWithPort));
  }
  for (InetSocketAddress hostWithPort : rawHostAndPortContactPoints) {
    distinct.add(new TranslatedAddressEndPoint(hostWithPort));
  }

  List<EndPoint> result = new ArrayList<EndPoint>(distinct.size());
  result.addAll(distinct);
  return result;
}
/**
* Sets an optional name for the created cluster.
*
* <p>Note: this is not related to the Cassandra cluster name (though you are free to provide
* the same name). See {@link Cluster#getClusterName} for details.
*
* <p>If you use this method and create more than one Cluster instance in the same JVM (which
* should be avoided unless you need to connect to multiple Cassandra clusters), you should make
* sure each Cluster instance gets a unique name or you may have a problem with JMX reporting.
*
* @param name the cluster name to use for the created Cluster instance. The value is stored
* as-is, without validation.
* @return this Builder.
*/
public Builder withClusterName(String name) {
this.clusterName = name;
return this;
}
/**
* Sets the port to use to connect to the Cassandra hosts.
*
* <p>If not set through this method, the default port (9042) will be used instead.
*
* <p>The port is combined with the port-less contact points when {@link #getContactPoints()}
* is called, not when they are added, so this method may be called before or after adding
* them. Contact points that already carry their own port are not affected.
*
* @param port the port to set. The value is stored as-is, without range validation.
* @return this Builder.
*/
public Builder withPort(int port) {
this.port = port;
return this;
}
/**
 * Creates the cluster connection using the latest development protocol version, which is
 * currently in beta. Calling this method results in the USE_BETA flag being set in all outgoing
 * messages, which allows the server to negotiate the supported protocol version even if it is
 * currently in beta.
 *
 * <p>This feature is only available starting with version {@link ProtocolVersion#V5 V5}.
 *
 * <p>Use with caution, refer to the server and protocol documentation for the details on the
 * latest protocol version.
 *
 * @return this Builder.
 * @throws IllegalArgumentException if a protocol version has already been set on this builder.
 *     Because this method itself sets the version to {@link ProtocolVersion#NEWEST_BETA}, a
 *     second call on the same builder also fails this way.
 */
public Builder allowBetaProtocolVersion() {
  if (protocolVersion != null) {
    throw new IllegalArgumentException(
        "Can't use beta flag with initial protocol version of " + protocolVersion);
  }
  // Pin the version to the newest beta and remember that the beta flag was requested.
  this.protocolVersion = ProtocolVersion.NEWEST_BETA;
  this.allowBetaProtocolVersion = true;
  return this;
}
/**
 * Sets the maximum time to wait for schema agreement before returning from a DDL query.
 *
 * <p>If not set through this method, the default value (10 seconds) will be used.
 *
 * @param maxSchemaAgreementWaitSeconds the new value to set, in seconds; must be strictly
 *     positive.
 * @return this Builder.
 * @throws IllegalArgumentException if the provided value is zero or less.
 */
public Builder withMaxSchemaAgreementWaitSeconds(int maxSchemaAgreementWaitSeconds) {
  if (maxSchemaAgreementWaitSeconds <= 0) {
    throw new IllegalArgumentException("Max schema agreement wait must be greater than zero");
  }
  this.maxSchemaAgreementWaitSeconds = maxSchemaAgreementWaitSeconds;
  return this;
}
/**
 * The native protocol version to use.
 *
 * <p>The driver supports versions 1 to 5 of the native protocol. Higher versions of the
 * protocol have more features and should be preferred, but this also depends on the Cassandra
 * version:
 *
 * <p>
 *
 * <table>
 *   <caption>Native protocol version to Cassandra version correspondence</caption>
 *   <tr><th>Protocol version</th><th>Minimum Cassandra version</th></tr>
 *   <tr><td>1</td><td>1.2</td></tr>
 *   <tr><td>2</td><td>2.0</td></tr>
 *   <tr><td>3</td><td>2.1</td></tr>
 *   <tr><td>4</td><td>2.2</td></tr>
 *   <tr><td>5</td><td>3.10</td></tr>
 * </table>
 *
 * <p>By default, the driver will "auto-detect" which protocol version it can use when
 * connecting to the first node. More precisely, it will try first with {@link
 * ProtocolVersion#NEWEST_SUPPORTED}, and if not supported fall back to the highest version
 * supported by the first node it connects to. Please note that once the version is
 * "auto-detected", it won't change: if the first node the driver connects to is a Cassandra 1.2
 * node and auto-detection is used (the default), then the native protocol version 1 will be
 * used for the lifetime of the Cluster instance.
 *
 * <p>By using {@link Builder#allowBetaProtocolVersion()}, it is possible to force the driver to
 * connect to a Cassandra node that supports the latest protocol beta version. Leaving this flag
 * out will let the client connect with the latest released version.
 *
 * <p>This method allows to force the use of a particular protocol version. Forcing version 1 is
 * always fine since all Cassandra versions (at least all those supporting the native protocol
 * in the first place) so far support it. However, please note that a number of features of the
 * driver won't be available if that version of the protocol is in use, including result set
 * paging, {@link BatchStatement}, executing a non-prepared query with binary values ({@link
 * Session#execute(String, Object...)}), ... (those methods will throw an
 * UnsupportedFeatureException). Using the protocol version 1 should thus only be considered
 * when using Cassandra 1.2, until nodes have been upgraded to Cassandra 2.0.
 *
 * <p>If version 2 of the protocol is used, then Cassandra 1.2 nodes will be ignored (the driver
 * won't connect to them).
 *
 * <p>The default behavior (auto-detection) is fine in almost all cases, but you may want to
 * force a particular version if you have a Cassandra cluster with mixed 1.2/2.0 nodes (i.e.
 * during a Cassandra upgrade).
 *
 * @param version the native protocol version to use. {@code null} is also supported to trigger
 *     auto-detection (see above) but this is the default (so you don't have to call this method
 *     for that behavior).
 * @return this Builder.
 * @throws IllegalStateException if {@link #allowBetaProtocolVersion()} was already called on
 *     this builder.
 * @throws IllegalArgumentException if {@code version} is newer than {@link
 *     ProtocolVersion#NEWEST_SUPPORTED}.
 */
public Builder withProtocolVersion(ProtocolVersion version) {
  if (allowBetaProtocolVersion) {
    throw new IllegalStateException(
        "Can not set the version explicitly if `allowBetaProtocolVersion` was used.");
  }
  // null is the documented way to request auto-detection, so it must skip the upper-bound
  // check instead of failing with a NullPointerException on compareTo().
  if (version != null && version.compareTo(ProtocolVersion.NEWEST_SUPPORTED) > 0) {
    throw new IllegalArgumentException(
        "Can not use "
            + version
            + " protocol version. "
            + "Newest supported protocol version is: "
            + ProtocolVersion.NEWEST_SUPPORTED
            + ". "
            + "For beta versions, use `allowBetaProtocolVersion` instead");
  }
  this.protocolVersion = version;
  return this;
}
/**
 * Adds a contact point - or several, if the given address resolves to multiple <code>
 * InetAddress</code>es (A records).
 *
 * <p>Contact points are addresses of Cassandra nodes that the driver uses to discover the
 * cluster topology. Only one contact point is required (the driver will retrieve the address of
 * the other nodes automatically), but it is usually a good idea to provide more than one
 * contact point, because if that single contact point is unavailable, the driver cannot
 * initialize itself correctly.
 *
 * <p>Note that by default (that is, unless you use the {@link #withLoadBalancingPolicy} method
 * of this builder), the first successfully contacted host will be used to define the local
 * data-center for the client. It follows that if you are running Cassandra in a multiple
 * data-center setting, it is a good idea to only provide contact points that are in the same
 * datacenter as the client, or to manually provide the load balancing policy that suits your
 * need.
 *
 * <p>If the host name points to a DNS record with multiple A records, all the returned
 * addresses will be used. Make sure that all of them point to the same cluster and datacenter.
 *
 * @param address the address of the node(s) to connect to.
 * @return this Builder.
 * @throws NullPointerException if {@code address} is {@code null}.
 * @throws IllegalArgumentException if the given {@code address} could not be resolved.
 * @throws SecurityException if a security manager is present and permission to resolve the host
 *     name is denied.
 */
public Builder addContactPoint(String address) {
  failIfCloud();
  // Reject null explicitly: InetAddress.getAllByName() would happily accept it and resolve to
  // localhost, while a null here almost certainly means a user error rather than
  // "connect to localhost".
  if (address == null) {
    throw new NullPointerException();
  }
  InetAddress[] resolved;
  try {
    resolved = InetAddress.getAllByName(address);
  } catch (UnknownHostException e) {
    throw new IllegalArgumentException("Failed to add contact point: " + address, e);
  }
  for (InetAddress candidate : resolved) {
    this.rawHostContactPoints.add(candidate);
  }
  return this;
}
/**
* Adds a contact point using the given connection information.
*
* <p>You only need this method if you use a custom connection mechanism and have configured a
* custom {@link EndPointFactory}; otherwise, you can safely ignore it and use the higher level,
* host-and-port-based variants such as {@link #addContactPoint(String)}.
*
* @param contactPoint the endpoint to add. It is stored as-is; no null check or resolution is
* performed here.
* @return this Builder.
*/
public Builder addContactPoint(EndPoint contactPoint) {
// Guard call made before the builder is mutated; see failIfCloud() for what it rejects.
failIfCloud();
contactPoints.add(contactPoint);
return this;
}
/**
 * Adds several contact points, resolving each of the given addresses in turn.
 *
 * <p>See {@link Builder#addContactPoint} for more details on contact points.
 *
 * <p>Note that all contact points must be resolvable; if <em>any</em> of them cannot be
 * resolved, this method will fail. Addresses are processed in order, so the ones preceding the
 * failing address have already been added to this builder when the exception is thrown.
 *
 * @param addresses addresses of the nodes to add as contact points.
 * @return this Builder.
 * @throws IllegalArgumentException if any of the given {@code addresses} could not be resolved.
 * @throws SecurityException if a security manager is present and permission to resolve the host
 *     name is denied.
 * @see Builder#addContactPoint
 */
public Builder addContactPoints(String... addresses) {
  for (int i = 0; i < addresses.length; i++) {
    addContactPoint(addresses[i]);
  }
  return this;
}
/**
 * Adds contact points given as already-resolved addresses.
 *
 * <p>See {@link Builder#addContactPoint} for more details on contact points.
 *
 * <p>Unlike {@link #addContactPoints(String...)}, this overload performs no name resolution:
 * the given addresses are recorded as-is, in order, and are later combined with the port
 * configured on this builder.
 *
 * @param addresses addresses of the nodes to add as contact points.
 * @return this Builder.
 * @see Builder#addContactPoint
 */
public Builder addContactPoints(InetAddress... addresses) {
  // Guard call made before the builder is mutated; see failIfCloud() for what it rejects.
  failIfCloud();
  Collections.addAll(this.rawHostContactPoints, addresses);
  return this;
}