-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
StorageService.java
1126 lines (1018 loc) · 41.9 KB
/
StorageService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
/*
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
* This class will also maintain histograms of the load information
* of other nodes in the cluster.
*/
public final class StorageService implements IEndPointStateChangeSubscriber, StorageServiceMBean
{
    private static Logger logger_ = Logger.getLogger(StorageService.class);
    /* Gossip application-state key under which this node's token is published. */
    private final static String nodeId_ = "NODE-IDENTIFIER";
    /* Gossip application-state key whose presence marks a node as bootstrapping. */
    private final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
    /* Gossip load after every 5 mins. */
    private static final long threshold_ = 5 * 60 * 1000L;
    /* All stage identifiers */
    public final static String mutationStage_ = "ROW-MUTATION-STAGE";
    public final static String readStage_ = "ROW-READ-STAGE";
    /* All verb handler identifiers */
    public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
    public final static String tokenVerbHandler_ = "TOKEN-VERB-HANDLER";
    public final static String loadVerbHandler_ = "LOAD-VERB-HANDLER";
    public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
    public final static String readRepairVerbHandler_ = "READ-REPAIR-VERB-HANDLER";
    public final static String readVerbHandler_ = "ROW-READ-VERB-HANDLER";
    public final static String bootStrapInitiateVerbHandler_ = "BOOTSTRAP-INITIATE-VERB-HANDLER";
    public final static String bootStrapInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
    public final static String bootStrapTerminateVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
    public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
    public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
    public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
    public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";

    /* Consistency levels supported for reads/writes. */
    public static enum ConsistencyLevel
    {
        WEAK,
        STRONG
    }

    /* Lazily created singleton; see instance().
     * NOTE(review): field is not volatile, so the unsynchronized read in
     * instance() is classic broken double-checked locking — confirm whether
     * callers can race construction. */
    private static StorageService instance_;
    /* Used to lock the factory for creation of StorageService instance */
    private static Lock createLock_ = new ReentrantLock();
    /* TCP endpoint for application (storage) traffic; set in start(). */
    private static EndPoint tcpAddr_;
    /* UDP endpoint for control/gossip traffic; set in start(). */
    private static EndPoint udpAddr_;
    private static IPartitioner partitioner_;

    /** @return the local TCP (storage) endpoint; null before start(). */
    public static EndPoint getLocalStorageEndPoint()
    {
        return tcpAddr_;
    }

    /** @return the local UDP (control) endpoint; null before start(). */
    public static EndPoint getLocalControlEndPoint()
    {
        return udpAddr_;
    }

    /** @return the cluster-wide partitioner configured in DatabaseDescriptor. */
    public static IPartitioner getPartitioner() {
        return partitioner_;
    }

    static
    {
        partitioner_ = DatabaseDescriptor.getPartitioner();
    }
public static class BootstrapInitiateDoneVerbHandler implements IVerbHandler
{
private static Logger logger_ = Logger.getLogger( BootstrapInitiateDoneVerbHandler.class );
public void doVerb(Message message)
{
if (logger_.isDebugEnabled())
logger_.debug("Received a bootstrap initiate done message ...");
/* Let the Stream Manager do his thing. */
StreamManager.instance(message.getFrom()).start();
}
}
/*
* Factory method that gets an instance of the StorageService
* class.
*/
public static StorageService instance()
{
String bs = System.getProperty("bootstrap");
boolean bootstrap = bs != null && bs.contains("true");
if ( instance_ == null )
{
StorageService.createLock_.lock();
try
{
if ( instance_ == null )
{
try
{
instance_ = new StorageService(bootstrap);
}
catch ( Throwable th )
{
logger_.error(LogUtil.throwableToString(th));
System.exit(1);
}
}
}
finally
{
createLock_.unlock();
}
}
return instance_;
}
    /*
     * This is the endpoint snitch which depends on the network architecture. We
     * need to keep this information for each endpoint so that we make decisions
     * while doing things like replication etc.
     *
     */
    private IEndPointSnitch endPointSnitch_;
    /* This abstraction maintains the token/endpoint metadata information */
    private TokenMetadata tokenMetadata_ = new TokenMetadata();
    /* Local node's persisted identity (token + generation); set in start(). */
    private SystemTable.StorageMetadata storageMetadata_;
    /* Timer is used to disseminate load information */
    private Timer loadTimer_ = new Timer(false);
    /* This thread pool is used to do the bootstrap for a new node */
    private ExecutorService bootStrapper_ = new DebuggableThreadPoolExecutor("BOOT-STRAPPER");
    /* This thread pool does consistency checks when the client doesn't care about consistency */
    private ExecutorService consistencyManager_;
    /* This is the entity that tracks load information of all nodes in the cluster */
    private StorageLoadBalancer storageLoadBalancer_;
    /* We use this interface to determine where replicas need to be placed */
    private IReplicaPlacementStrategy nodePicker_;
    /* Are we starting this node in bootstrap mode? */
    private boolean isBootstrapMode;
    /* Endpoints we are still waiting on during bootstrap; guarded by synchronized
       add/removeBootstrapSource. Bootstrap completes when this drains empty. */
    private Set<EndPoint> bootstrapSet;
public synchronized void addBootstrapSource(EndPoint s)
{
if (logger_.isDebugEnabled())
logger_.debug("Added " + s.getHost() + " as a bootstrap source");
bootstrapSet.add(s);
}
public synchronized boolean removeBootstrapSource(EndPoint s)
{
bootstrapSet.remove(s);
if (logger_.isDebugEnabled())
logger_.debug("Removed " + s.getHost() + " as a bootstrap source");
if (bootstrapSet.isEmpty())
{
isBootstrapMode = false;
tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, false);
logger_.info("Bootstrap completed! Now serving reads.");
/* Tell others you're not bootstrapping anymore */
Gossiper.instance().deleteApplicationState(BOOTSTRAP_MODE);
}
return isBootstrapMode;
}
/*
* Registers with Management Server
*/
private void init()
{
try
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbs.registerMBean(this, new ObjectName(
"org.apache.cassandra.service:type=StorageService"));
}
catch (Exception e)
{
logger_.error(LogUtil.throwableToString(e));
}
}
public StorageService(boolean isBootstrapMode)
{
this.isBootstrapMode = isBootstrapMode;
bootstrapSet = new HashSet<EndPoint>();
init();
storageLoadBalancer_ = new StorageLoadBalancer(this);
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenVerbHandler_, new TokenUpdateVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.binaryVerbHandler_, new BinaryVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.loadVerbHandler_, new LoadVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mutationVerbHandler_, new RowMutationVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.readRepairVerbHandler_, new ReadRepairVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.readVerbHandler_, new ReadVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateVerbHandler_, new Table.BootStrapInitiateVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.dataFileVerbHandler_, new DataFileVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.rangeVerbHandler_, new RangeVerbHandler());
/* register the stage for the mutations */
consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
DatabaseDescriptor.getConsistencyThreads(),
Integer.MAX_VALUE, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("CONSISTENCY-MANAGER"));
StageManager.registerStage(StorageService.mutationStage_,
new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
StageManager.registerStage(StorageService.readStage_,
new MultiThreadedStage(StorageService.readStage_, DatabaseDescriptor.getConcurrentReaders()));
Class cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class, int.class, int.class};
try
{
nodePicker_ = (IReplicaPlacementStrategy) cls.getConstructor(parameterTypes).newInstance(tokenMetadata_, partitioner_, DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getStoragePort());
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
    /**
     * Brings the node up: loads persisted identity, opens TCP/UDP listeners,
     * starts the selector threads, load dissemination, the load balancer and
     * the gossiper, publishes the local token, and — when in bootstrap mode —
     * kicks off bootstrap and advertises BOOTSTRAP-MODE via gossip.
     * The statement order here is deliberate (listeners before gossip, token
     * published after gossip starts); do not reorder casually.
     *
     * @throws IOException if metadata initialization or listening fails
     */
    public void start() throws IOException
    {
        storageMetadata_ = SystemTable.initMetadata();
        tcpAddr_ = new EndPoint(DatabaseDescriptor.getStoragePort());
        udpAddr_ = new EndPoint(DatabaseDescriptor.getControlPort());
        /* Listen for application messages */
        MessagingService.getMessagingInstance().listen(tcpAddr_);
        /* Listen for control messages */
        MessagingService.getMessagingInstance().listenUDP(udpAddr_);
        SelectorManager.getSelectorManager().start();
        SelectorManager.getUdpSelectorManager().start();
        /* starts a load timer thread (first run and period are both 5 minutes) */
        loadTimer_.schedule( new LoadDisseminator(), StorageService.threshold_, StorageService.threshold_);
        /* Start the storage load balancer */
        storageLoadBalancer_.start();
        /* Register with the Gossiper for EndPointState notifications */
        Gossiper.instance().register(this);
        /*
         * Start the gossiper with the generation # retrieved from the System
         * table
         */
        Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
        /* Make sure this token gets gossiped around. */
        tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
        ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getToken()));
        Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
        if (isBootstrapMode)
        {
            logger_.info("Starting in bootstrap mode");
            doBootstrap(StorageService.getLocalStorageEndPoint());
            /* advertise bootstrap state so peers treat us accordingly */
            Gossiper.instance().addApplicationState(BOOTSTRAP_MODE, new ApplicationState(""));
        }
    }
    /** @return true while this node is still bootstrapping (not yet serving reads). */
    public boolean isBootstrapMode()
    {
        return isBootstrapMode;
    }

    /** @return a defensive copy of the token/endpoint metadata. */
    public TokenMetadata getTokenMetadata()
    {
        return tokenMetadata_.cloneMe();
    }

    /* TODO: used for testing */
    public void updateTokenMetadata(Token token, EndPoint endpoint)
    {
        tokenMetadata_.update(token, endpoint);
    }

    /** @return the configured endpoint snitch. */
    public IEndPointSnitch getEndPointSnitch()
    {
        return endPointSnitch_;
    }

    /*
     * Given an EndPoint this method will report if the
     * endpoint is in the same data center as the local
     * storage endpoint.
     */
    public boolean isInSameDataCenter(EndPoint endpoint) throws IOException
    {
        return endPointSnitch_.isInSameDataCenter(StorageService.tcpAddr_, endpoint);
    }
    /*
     * This method performs the requisite operations to make
     * sure that the N replicas are in sync. We do this in the
     * background when we do not care much about consistency.
     */
    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, ReadCommand command)
    {
        /* row is cloned so the async check works on a stable snapshot */
        Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, command);
        consistencyManager_.submit(consistencySentinel);
    }
public Map<Range, List<EndPoint>> getRangeToEndPointMap()
{
/* Get the token to endpoint map. */
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* All the ranges for the tokens */
Range[] ranges = getAllRanges(tokenToEndPointMap.keySet());
return constructRangeToEndPointMap(ranges);
}
/**
* Construct the range to endpoint mapping based on the true view
* of the world.
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges)
{
if (logger_.isDebugEnabled())
logger_.debug("Constructing range to endpoint map ...");
Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
for ( Range range : ranges )
{
EndPoint[] endpoints = getNStorageEndPoint(range.right());
rangeToEndPointMap.put(range, new ArrayList<EndPoint>( Arrays.asList(endpoints) ) );
}
if (logger_.isDebugEnabled())
logger_.debug("Done constructing range to endpoint map ...");
return rangeToEndPointMap;
}
/**
* Construct the range to endpoint mapping based on the view as dictated
* by the mapping of token to endpoints passed in.
* @param ranges
* @param tokenToEndPointMap mapping of token to endpoints.
* @return mapping of ranges to the replicas responsible for them.
*/
public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<Token, EndPoint> tokenToEndPointMap)
{
if (logger_.isDebugEnabled())
logger_.debug("Constructing range to endpoint map ...");
Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
for ( Range range : ranges )
{
EndPoint[] endpoints = getNStorageEndPoint(range.right(), tokenToEndPointMap);
rangeToEndPointMap.put(range, new ArrayList<EndPoint>( Arrays.asList(endpoints) ) );
}
if (logger_.isDebugEnabled())
logger_.debug("Done constructing range to endpoint map ...");
return rangeToEndPointMap;
}
/**
* Construct a mapping from endpoint to ranges that endpoint is
* responsible for.
* @return the mapping from endpoint to the ranges it is responsible
* for.
*/
public Map<EndPoint, List<Range>> constructEndPointToRangesMap()
{
Map<EndPoint, List<Range>> endPointToRangesMap = new HashMap<EndPoint, List<Range>>();
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
Collection<EndPoint> mbrs = tokenToEndPointMap.values();
for ( EndPoint mbr : mbrs )
{
endPointToRangesMap.put(mbr, getRangesForEndPoint(mbr));
}
return endPointToRangesMap;
}
    /**
     * Called when there is a change in application state. In particular
     * we are interested in new tokens as a result of a new node or an
     * existing node moving to a new location on the ring.
     *
     * Three cases are handled for an endpoint that gossips a token:
     * token changed (relocation), token unchanged (crash/restart: deliver
     * hints), or unknown endpoint (new node: record it). If no token state
     * is present but the node is alive and already known, we assume it just
     * recovered from a partition and deliver hints.
     */
    public void onChange(EndPoint endpoint, EndPointState epState)
    {
        /* normalize to the storage-port endpoint; gossip arrives on the control port */
        EndPoint ep = new EndPoint(endpoint.getHost(), DatabaseDescriptor.getStoragePort());
        /* node identifier for this endpoint on the identifier space */
        ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
        /* Check if this has a bootstrapping state message */
        boolean bootstrapState = epState.getApplicationState(StorageService.BOOTSTRAP_MODE) != null;
        if (bootstrapState)
        {
            if (logger_.isDebugEnabled())
                logger_.debug(ep.getHost() + " is in bootstrap state.");
        }
        if (nodeIdState != null)
        {
            Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
            if (logger_.isDebugEnabled())
                logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
            Token oldToken = tokenMetadata_.getToken(ep);
            if ( oldToken != null )
            {
                /*
                 * If oldToken equals the newToken then the node had crashed
                 * and is coming back up again. If oldToken is not equal to
                 * the newToken this means that the node is being relocated
                 * to another position in the ring.
                 */
                if ( !oldToken.equals(newToken) )
                {
                    if (logger_.isDebugEnabled())
                        logger_.debug("Relocation for endpoint " + ep);
                    tokenMetadata_.update(newToken, ep, bootstrapState);
                }
                else
                {
                    /*
                     * This means the node crashed and is coming back up.
                     * Deliver the hints that we have for this endpoint.
                     */
                    if (logger_.isDebugEnabled())
                        logger_.debug("Sending hinted data to " + ep);
                    // NOTE(review): this passes the gossip (control-port) endpoint,
                    // while the partition-recovery branch below passes the
                    // storage-port `ep` — confirm which one deliverHints expects.
                    deliverHints(endpoint);
                }
            }
            else
            {
                /*
                 * This is a new node and we just update the token map.
                 */
                tokenMetadata_.update(newToken, ep, bootstrapState);
            }
        }
        else
        {
            /*
             * If we are here and if this node is UP and already has an entry
             * in the token map. It means that the node was behind a network partition.
             */
            // NOTE(review): isKnownEndPoint is queried with the control-port
            // `endpoint` while the map is keyed above by storage-port `ep` —
            // verify this lookup matches how TokenMetadata stores endpoints.
            if ( epState.isAlive() && tokenMetadata_.isKnownEndPoint(endpoint) )
            {
                if (logger_.isDebugEnabled())
                    logger_.debug("EndPoint " + ep + " just recovered from a partition. Sending hinted data.");
                deliverHints(ep);
            }
        }
    }
/**
* Get the count of primary keys from the sampler.
*/
public String getLoadInfo()
{
long diskSpace = FileUtils.getUsedDiskSpace();
return FileUtils.stringifyFileSize(diskSpace);
}
/**
* Get the primary count info for this endpoint.
* This is gossiped around and cached in the
* StorageLoadBalancer.
*/
public String getLoadInfo(EndPoint ep)
{
LoadInfo li = storageLoadBalancer_.getLoad(ep);
return ( li == null ) ? "N/A" : li.toString();
}
    /*
     * This method updates the token on disk and modifies the cached
     * StorageMetadata instance. This is only for the local endpoint.
     *
     * @param token the new token for this node
     * @throws IOException if persisting the token fails
     */
    public void updateToken(Token token) throws IOException
    {
        /* update the token on disk */
        SystemTable.updateToken(token);
        /* Update the token maps */
        /* Get the old token. This needs to be removed. */
        tokenMetadata_.update(token, StorageService.tcpAddr_);
        /* Gossip this new token for the local storage instance */
        ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(token));
        Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
    }
/*
* This method removes the state associated with this endpoint
* from the TokenMetadata instance.
*
* @param endpoint remove the token state associated with this
* endpoint.
*/
public void removeTokenState(EndPoint endpoint)
{
tokenMetadata_.remove(endpoint);
/* Remove the state from the Gossiper */
Gossiper.instance().removeFromMembership(endpoint);
}
    /*
     * This method is invoked by the Loader process to force the
     * node to move from its current position on the token ring, to
     * a position to be determined based on the keys. This will help
     * all nodes to start off perfectly load balanced. The array passed
     * in is evaluated as follows by the loader process:
     * If there are 10 keys in the system and a totality of 5 nodes
     * then each node needs to have 2 keys i.e the array is made up
     * of every 2nd key in the total list of keys.
     */
    public void relocate(String[] keys) throws IOException
    {
        if ( keys.length > 0 )
        {
            Token token = tokenMetadata_.getToken(StorageService.tcpAddr_);
            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
            Token[] tokens = tokenToEndPointMap.keySet().toArray(new Token[tokenToEndPointMap.keySet().size()]);
            Arrays.sort(tokens);
            // NOTE(review): binarySearch returns a negative insertion point when
            // the local token is absent, and keys.length/tokens.length is integer
            // division (0 when keys < nodes) — both would index key 0 or throw.
            // Confirm the loader guarantees keys.length >= tokens.length and that
            // this node's token is always present.
            int index = Arrays.binarySearch(tokens, token) * (keys.length/tokens.length);
            Token newToken = partitioner_.getToken(keys[index]);
            /* update the token */
            updateToken(newToken);
        }
    }
/**
* This method takes a colon separated string of nodes that need
* to be bootstrapped. * <i>nodes</i> must be specified as A:B:C.
* @throws UnknownHostException
*
*/
private void doBootstrap(String nodes) throws UnknownHostException
{
String[] allNodes = nodes.split(":");
EndPoint[] endpoints = new EndPoint[allNodes.length];
Token[] tokens = new Token[allNodes.length];
for ( int i = 0; i < allNodes.length; ++i )
{
String host = allNodes[i].trim();
InetAddress ip = InetAddress.getByName(host);
host = ip.getHostAddress();
endpoints[i] = new EndPoint( host, DatabaseDescriptor.getStoragePort() );
tokens[i] = tokenMetadata_.getToken(endpoints[i]);
}
/* Start the bootstrap algorithm */
bootStrapper_.submit( new BootStrapper(endpoints, tokens) );
}
    /**
     * Starts the bootstrap operations for the specified endpoint.
     * The bootstrap work runs asynchronously on the BOOT-STRAPPER executor.
     * @param endpoint the endpoint to bootstrap
     */
    public final void doBootstrap(EndPoint endpoint)
    {
        Token token = tokenMetadata_.getToken(endpoint);
        bootStrapper_.submit(new BootStrapper(new EndPoint[]{endpoint}, token));
    }
    /**
     * Deliver hints to the specified node when it has crashed
     * and come back up/ marked as alive after a network partition.
     * Delegates to the HintedHandOffManager singleton.
     */
    public final void deliverHints(EndPoint endpoint)
    {
        HintedHandOffManager.instance().deliverHints(endpoint);
    }
/* This methods belong to the MBean interface */
public String getToken(EndPoint ep)
{
// render a String representation of the Token corresponding to this endpoint
// for a human-facing UI. If there is no such Token then we use "" since
// it is not a valid value either for BigIntegerToken or StringToken.
EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getStoragePort());
Token token = tokenMetadata_.getToken(ep2);
// if there is no token for an endpoint, return an empty string to denote that
return ( token == null ) ? "" : token.toString();
}
public String getToken()
{
return tokenMetadata_.getToken(StorageService.tcpAddr_).toString();
}
    /** @return space-separated list of members the gossiper considers alive. */
    public String getLiveNodes()
    {
        return stringify(Gossiper.instance().getLiveMembers());
    }

    /** @return space-separated list of members the gossiper considers unreachable. */
    public String getUnreachableNodes()
    {
        return stringify(Gossiper.instance().getUnreachableMembers());
    }

    /** @return this node's current gossip generation number. */
    public int getCurrentGenerationNumber()
    {
        return Gossiper.instance().getCurrentGenerationNumber(udpAddr_);
    }
/* Helper for the MBean interface */
private String stringify(Set<EndPoint> eps)
{
StringBuilder sb = new StringBuilder("");
for (EndPoint ep : eps)
{
sb.append(ep);
sb.append(" ");
}
return sb.toString();
}
    /**
     * MBean hook: bootstrap the given colon-separated list of nodes (A:B:C).
     * @throws UnknownHostException if any host cannot be resolved
     */
    public void loadAll(String nodes) throws UnknownHostException
    {
        doBootstrap(nodes);
    }
public void forceTableCleanup() throws IOException
{
List<String> tables = DatabaseDescriptor.getTables();
for ( String tName : tables )
{
Table table = Table.open(tName);
table.forceCleanup();
}
}
/**
* Trigger the immediate compaction of all tables.
*/
public void forceTableCompaction() throws IOException
{
List<String> tables = DatabaseDescriptor.getTables();
for ( String tName : tables )
{
Table table = Table.open(tName);
table.forceCompaction();
}
}
public void forceHandoff(List<String> dataDirectories, String host) throws IOException
{
List<File> filesList = new ArrayList<File>();
List<StreamContextManager.StreamContext> streamContexts = new ArrayList<StreamContextManager.StreamContext>();
for (String dataDir : dataDirectories)
{
File directory = new File(dataDir);
Collections.addAll(filesList, directory.listFiles());
for (File tableDir : directory.listFiles())
{
String tableName = tableDir.getName();
for (File file : tableDir.listFiles())
{
streamContexts.add(new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), tableName));
if (logger_.isDebugEnabled())
logger_.debug("Stream context metadata " + streamContexts);
}
}
}
if ( streamContexts.size() > 0 )
{
EndPoint target = new EndPoint(host, DatabaseDescriptor.getStoragePort());
/* Set up the stream manager with the files that need to streamed */
final StreamContextManager.StreamContext[] contexts = streamContexts.toArray(new StreamContextManager.StreamContext[streamContexts.size()]);
StreamManager.instance(target).addFilesToStream(contexts);
/* Send the bootstrap initiate message */
final StreamContextManager.StreamContext[] bootContexts = streamContexts.toArray(new StreamContextManager.StreamContext[streamContexts.size()]);
BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(bootContexts);
Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
if (logger_.isDebugEnabled())
logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
MessagingService.getMessagingInstance().sendOneWay(message, target);
if (logger_.isDebugEnabled())
logger_.debug("Waiting for transfer to " + target + " to complete");
StreamManager.instance(target).waitForStreamCompletion();
if (logger_.isDebugEnabled())
logger_.debug("Done with transfer to " + target);
}
}
/**
* Takes the snapshot for a given table.
*
* @param tableName the name of the table.
* @param tag the tag given to the snapshot (null is permissible)
*/
public void takeSnapshot(String tableName, String tag) throws IOException
{
if (DatabaseDescriptor.getTable(tableName) == null)
{
throw new IOException("Table " + tableName + "does not exist");
}
Table tableInstance = Table.open(tableName);
tableInstance.snapshot(tag);
}
/**
* Takes a snapshot for every table.
*
* @param tag the tag given to the snapshot (null is permissible)
*/
public void takeAllSnapshot(String tag) throws IOException
{
for (String tableName: DatabaseDescriptor.getTables())
{
Table tableInstance = Table.open(tableName);
tableInstance.snapshot(tag);
}
}
/**
* Remove all the existing snapshots.
*/
public void clearSnapshot() throws IOException
{
for (String tableName: DatabaseDescriptor.getTables())
{
Table tableInstance = Table.open(tableName);
tableInstance.clearSnapshot();
}
if (logger_.isDebugEnabled())
logger_.debug("Cleared out all snapshot directories");
}
public void forceTableFlushBinary(String tableName) throws IOException
{
if (DatabaseDescriptor.getTable(tableName) == null)
{
throw new IOException("Table " + tableName + "does not exist");
}
Table table = Table.open(tableName);
Set<String> columnFamilies = table.getColumnFamilies();
for (String columnFamily : columnFamilies)
{
ColumnFamilyStore cfStore = table.getColumnFamilyStore(columnFamily);
logger_.debug("Forcing flush on keyspace " + tableName + " on CF " + columnFamily);
cfStore.forceFlushBinary();
}
}
/* End of MBean interface methods */
/**
* This method returns the predecessor of the endpoint ep on the identifier
* space.
*/
EndPoint getPredecessor(EndPoint ep)
{
Token token = tokenMetadata_.getToken(ep);
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
return (index == 0) ? tokenToEndPointMap.get(tokens
.get(tokens.size() - 1)) : tokenToEndPointMap.get(tokens
.get(--index));
}
/*
* This method returns the successor of the endpoint ep on the identifier
* space.
*/
public EndPoint getSuccessor(EndPoint ep)
{
Token token = tokenMetadata_.getToken(ep);
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
return (index == (tokens.size() - 1)) ? tokenToEndPointMap
.get(tokens.get(0))
: tokenToEndPointMap.get(tokens.get(++index));
}
/**
 * Computes the primary token range owned by the given endpoint: the interval
 * from its predecessor's token (exclusive side) to its own token.
 *
 * @param ep endpoint we are interested in.
 * @return range for the specified endpoint.
 */
public Range getPrimaryRangeForEndPoint(EndPoint ep)
{
    EndPoint predecessor = getPredecessor(ep);
    Token left = tokenMetadata_.getToken(predecessor);
    Token right = tokenMetadata_.getToken(ep);
    return new Range(left, right);
}
/**
 * Collects every range the endpoint stores data for: its own primary range
 * plus the primary ranges of its (replication factor - 1) predecessors,
 * since the endpoint holds replicas of those ranges.
 *
 * @param ep endpoint we are interested in.
 * @return ranges for the specified endpoint.
 */
List<Range> getRangesForEndPoint(EndPoint ep)
{
    List<Range> ranges = new ArrayList<Range>();
    EndPoint current = ep;
    ranges.add(getPrimaryRangeForEndPoint(current));
    // Walk backwards around the ring, one hop per extra replica.
    for (int remaining = DatabaseDescriptor.getReplicationFactor() - 1; remaining > 0; --remaining)
    {
        current = getPredecessor(current);
        ranges.add(getPrimaryRangeForEndPoint(current));
    }
    return ranges;
}
/**
 * Get all ranges that span the ring as per the current snapshot of the
 * token distribution.
 *
 * @return all ranges in sorted order.
 */
public Range[] getAllRanges()
{
    Set<Token> ringTokens = tokenMetadata_.cloneTokenEndPointMap().keySet();
    return getAllRanges(ringTokens);
}
/**
 * Get all ranges that span the ring given a set of tokens. Each range runs
 * from one token to the next in sorted order, with a final wrapping range
 * from the largest token back to the smallest.
 *
 * @param tokens the tokens that define the ring
 * @return ranges in sorted order; empty array if {@code tokens} is empty
 */
public Range[] getAllRanges(Set<Token> tokens)
{
    List<Token> allTokens = new ArrayList<Token>(tokens);
    int size = allTokens.size();
    // Robustness fix: the original indexed allTokens.get(size - 1)
    // unconditionally and threw IndexOutOfBoundsException for an empty set.
    if (size == 0)
        return new Range[0];
    Collections.sort(allTokens);
    List<Range> ranges = new ArrayList<Range>(size);
    for (int i = 1; i < size; ++i)
    {
        ranges.add(new Range(allTokens.get(i - 1), allTokens.get(i)));
    }
    // Wrapping range closes the ring (for a single token this is (t, t],
    // i.e. the whole ring, matching the original behavior).
    ranges.add(new Range(allTokens.get(size - 1), allTokens.get(0)));
    return ranges.toArray(new Range[0]);
}
/**
 * This method returns the endpoint that is responsible for storing the
 * specified key (the owner of the first token at or after the key's token,
 * wrapping to the smallest token if none is larger).
 *
 * @param key - key for which we need to find the endpoint
 * @return the endpoint responsible for this key; the local endpoint when
 *         no tokens are known
 */
public EndPoint getPrimary(String key)
{
    EndPoint endpoint = StorageService.tcpAddr_;
    Token token = partitioner_.getToken(key);
    Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
    // Use a typed list instead of the raw List the original used, so the
    // sort and binary search are checked at compile time.
    List<Token> tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
    if (tokens.size() > 0)
    {
        Collections.sort(tokens);
        int index = Collections.binarySearch(tokens, token);
        if (index >= 0)
        {
            // Exact match: the node owning this token is the primary.
            endpoint = tokenToEndPointMap.get(tokens.get(index));
        }
        else
        {
            // binarySearch returns (-(insertionPoint) - 1) when absent;
            // recover the insertion point: the first token > key's token.
            index = -index - 1;
            if (index < tokens.size())
                endpoint = tokenToEndPointMap.get(tokens.get(index));
            else
                // Key's token is past the largest token: wrap to the smallest.
                endpoint = tokenToEndPointMap.get(tokens.get(0));
        }
    }
    return endpoint;
}
/**
 * Determines whether the local endpoint is the primary replica for the
 * given key.
 *
 * @param key key to check
 * @return true if the local endpoint is the primary replica.
 */
public boolean isPrimary(String key)
{
    return StorageService.tcpAddr_.equals(getPrimary(key));
}
/**
 * This method returns the N endpoints that are responsible for storing the
 * specified key i.e for replication.
 *
 * @param key - key for which we need to find the endpoints
 * @return the endpoints responsible for this key
 */
public EndPoint[] getNStorageEndPoint(String key)
{
    Token keyToken = partitioner_.getToken(key);
    return nodePicker_.getStorageEndPoints(keyToken);
}
/**
 * Batch variant: for each key, the replica endpoints responsible for it,
 * as computed by the configured replication strategy (nodePicker_).
 *
 * @param keys keys to look up
 * @return map from key to its storage endpoints
 */
private Map<String, EndPoint[]> getNStorageEndPoints(String[] keys)
{
return nodePicker_.getStorageEndPoints(keys);
}
/**
 * This method attempts to return the endpoints responsible for storing the
 * specified key, filtered down to those the failure detector currently
 * considers alive.
 *
 * @param key - key for which we need to find the endpoints
 * @return the live endpoints responsible for this key (possibly empty)
 */
public List<EndPoint> getNLiveStorageEndPoint(String key)
{
    List<EndPoint> live = new ArrayList<EndPoint>();
    for (EndPoint candidate : getNStorageEndPoint(key))
    {
        if (FailureDetector.instance().isAlive(candidate))
        {
            live.add(candidate);
        }
    }
    return live;
}
/**
 * This method returns the N endpoints that are responsible for storing the
 * specified key, as a hinted map (target endpoint to the endpoint the data
 * is hinted for).
 *
 * @param key - key for which we need to find the endpoints
 * @return hinted map of endpoints responsible for this key
 */
public Map<EndPoint, EndPoint> getNStorageEndPointMap(String key)
{
    Token keyToken = partitioner_.getToken(key);
    return nodePicker_.getHintedStorageEndPoints(keyToken);
}
/**
* This method returns the N endpoints that are responsible for storing the