forked from voldemort/voldemort
/
RebalanceUtils.java
1199 lines (1079 loc) · 53.2 KB
/
RebalanceUtils.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.utils;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.ClientConfig;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.client.rebalance.RebalancePlan;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.Zone;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.routing.StoreRoutingPlan;
import voldemort.server.VoldemortConfig;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreUtils;
import voldemort.store.bdb.BdbStorageConfiguration;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.store.readonly.ReadOnlyStorageFormat;
import voldemort.tools.PartitionBalance;
import voldemort.versioning.Occurred;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
 * RebalanceUtils provides basic utility functionality for rebalancing.
 */
public class RebalanceUtils {
    // Class-wide logger for rebalance utility operations.
    private static Logger logger = Logger.getLogger(RebalanceUtils.class);

    // Storage engine types that the rebalance tooling supports migrating.
    public final static List<String> canRebalanceList = Arrays.asList(BdbStorageConfiguration.TYPE_NAME,
                                                                      ReadOnlyStorageConfiguration.TYPE_NAME);

    // File name used when persisting the current cluster topology to disk.
    public final static String currentClusterFileName = "current-cluster.xml";

    // File name used when persisting the target (final) cluster topology to disk.
    public final static String finalClusterFileName = "final-cluster.xml";
/**
* Given the current replica to partition list, try to check if the donor
* node would already contain that partition and if yes, ignore it
*
* @param stealerNodeId Stealer node id
* @param cluster Cluster metadata
* @param storeDef Store definition
* @param currentReplicaToPartitionList Current replica to partition list
* @return Optimized replica to partition list
*/
public static HashMap<Integer, List<Integer>> getOptimizedReplicaToPartitionList(int stealerNodeId,
Cluster cluster,
StoreDefinition storeDef,
HashMap<Integer, List<Integer>> currentReplicaToPartitionList) {
HashMap<Integer, List<Integer>> optimizedReplicaToPartitionList = Maps.newHashMap();
RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
cluster);
for(Entry<Integer, List<Integer>> tuple: currentReplicaToPartitionList.entrySet()) {
List<Integer> partitionList = Lists.newArrayList();
for(int partition: tuple.getValue()) {
List<Integer> preferenceList = strategy.getReplicatingPartitionList(partition);
// If this node was already in the
// preference list before, a copy of the
// data will already exist - Don't copy
// it!
if(!ClusterUtils.containsPreferenceList(cluster, preferenceList, stealerNodeId)) {
partitionList.add(partition);
}
}
if(partitionList.size() > 0) {
optimizedReplicaToPartitionList.put(tuple.getKey(), partitionList);
}
}
return optimizedReplicaToPartitionList;
}
// TODO: (refactor) Either move all methods that take an AdminClient
// somewhere else. Either (i) into a name space of AdminClient or (ii)
// separate utils class. Must wait until after all changes for abortable
// rebalance & atomic update of cluster/stores are merged to do this change.
/**
* Get the latest cluster from all available nodes in the cluster<br>
*
* Throws exception if:<br>
* A) Any node in the required nodes list fails to respond.<br>
* B) Cluster is in inconsistent state with concurrent versions for cluster
* metadata on any two nodes.<br>
*
* @param requiredNodes List of nodes from which we definitely need an
* answer
* @param adminClient Admin client used to query the nodes
* @return Returns the latest cluster metadata
*/
public static Versioned<Cluster> getLatestCluster(List<Integer> requiredNodes,
AdminClient adminClient) {
Versioned<Cluster> latestCluster = new Versioned<Cluster>(adminClient.getAdminClientCluster());
Cluster cluster = latestCluster.getValue();
for(Node node: cluster.getNodes()) {
try {
Cluster nodesCluster = adminClient.metadataMgmtOps.getRemoteCluster(node.getId())
.getValue();
if(!nodesCluster.equals(cluster)) {
throw new VoldemortException("Cluster is in inconsistent state because cluster xml on node "
+ node.getId()
+ " does not match cluster xml of adminClient.");
}
} catch(Exception e) {
if(null != requiredNodes && requiredNodes.contains(node.getId()))
throw new VoldemortException("Failed on node " + node.getId(), e);
else
logger.info("Failed on node " + node.getId(), e);
}
}
return latestCluster;
}
private static void checkNotConcurrent(ArrayList<Versioned<Cluster>> clockList,
VectorClock newClock) {
for(Versioned<Cluster> versionedCluster: clockList) {
VectorClock clock = (VectorClock) versionedCluster.getVersion();
if(Occurred.CONCURRENTLY.equals(clock.compare(newClock)))
throw new VoldemortException("Cluster is in inconsistent state because we got conflicting clocks "
+ clock + " and on current node " + newClock);
}
}
/**
* Given a list of partition informations check all of them belong to the
* same donor node
*
* @param partitionInfos List of partition infos
* @param expectedDonorId Expected donor node id ( If -1, then just checks
* if all are same )
*/
public static void assertSameDonor(List<RebalancePartitionsInfo> partitionInfos,
int expectedDonorId) {
int donorId = (expectedDonorId < 0) ? partitionInfos.get(0).getDonorId() : expectedDonorId;
for(RebalancePartitionsInfo info: partitionInfos) {
if(info.getDonorId() != donorId) {
throw new VoldemortException("Found a stealer information " + info
+ " having a different donor node from others ( "
+ donorId + " )");
}
}
}
/**
* Check the execution state of the server by checking the state of
* {@link VoldemortState} <br>
*
* This function checks if the nodes are all in normal state (
* {@link VoldemortState#NORMAL_SERVER}).
*
* @param cluster Cluster metadata whose nodes we are checking
* @param adminClient Admin client used to query
* @throws VoldemortRebalancingException if any node is not in normal state
*/
public static void checkEachServerInNormalState(final Cluster cluster,
final AdminClient adminClient) {
for(Node node: cluster.getNodes()) {
Versioned<VoldemortState> versioned = adminClient.rebalanceOps.getRemoteServerState(node.getId());
if(!VoldemortState.NORMAL_SERVER.equals(versioned.getValue())) {
throw new VoldemortRebalancingException("Cannot rebalance since node "
+ node.getId() + " (" + node.getHost()
+ ") is not in normal state, but in "
+ versioned.getValue());
} else {
if(logger.isInfoEnabled()) {
logger.info("Node " + node.getId() + " (" + node.getHost()
+ ") is ready for rebalance.");
}
}
}
}
/**
* Verify store definitions are congruent with cluster definition.
*
* @param cluster
* @param stores
*/
public static void validateClusterStores(final Cluster cluster,
final List<StoreDefinition> storeDefs) {
// Constructing a StoreRoutingPlan has the (desirable in this
// case) side-effect of verifying that the store definition is congruent
// with the cluster definition. If there are issues, exceptions are
// thrown.
for(StoreDefinition storeDefinition: storeDefs) {
new StoreRoutingPlan(cluster, storeDefinition);
}
return;
}
// TODO: This method is biased towards the 3 currently supported use cases:
// shuffle, cluster expansion, and zone expansion. There are two other use
// cases we need to consider: cluster contraction (reducing # nodes in a
// zone) and zone contraction (reducing # of zones). We probably want to end
// up pass an enum into this method so we can do proper checks based on use
// case.
/**
* A final cluster ought to be a super set of current cluster. I.e.,
* existing node IDs ought to map to same server, but partition layout can
* have changed and there may exist new nodes.
*
* @param currentCluster
* @param finalCluster
*/
public static void validateCurrentFinalCluster(final Cluster currentCluster,
final Cluster finalCluster) {
validateClusterPartitionCounts(currentCluster, finalCluster);
validateClusterNodeState(currentCluster, finalCluster);
return;
}
/**
* An interim cluster ought to be a super set of current cluster. I.e., it
* ought to either be the same as current cluster (every partition is mapped
* to the same node of current & interim), or it ought to have more nodes
* (possibly in new zones) without partitions.
*
* @param currentCluster
* @param interimCluster
*/
public static void validateCurrentInterimCluster(final Cluster currentCluster,
final Cluster interimCluster) {
validateClusterPartitionCounts(currentCluster, interimCluster);
validateClusterNodeState(currentCluster, interimCluster);
validateClusterPartitionState(currentCluster, interimCluster);
return;
}
/**
* Interim and final clusters ought to have same partition counts, same
* zones, and same node state. Partitions per node may of course differ.
*
* @param interimCluster
* @param finalCluster
*/
public static void validateInterimFinalCluster(final Cluster interimCluster,
final Cluster finalCluster) {
validateClusterPartitionCounts(interimCluster, finalCluster);
validateClusterZonesSame(interimCluster, finalCluster);
validateClusterNodeCounts(interimCluster, finalCluster);
validateClusterNodeState(interimCluster, finalCluster);
return;
}
/**
* Confirms that both clusters have the same number of total partitions.
*
* @param lhs
* @param rhs
*/
public static void validateClusterPartitionCounts(final Cluster lhs, final Cluster rhs) {
if(lhs.getNumberOfPartitions() != rhs.getNumberOfPartitions())
throw new VoldemortException("Total number of partitions should be equal [ lhs cluster ("
+ lhs.getNumberOfPartitions()
+ ") not equal to rhs cluster ("
+ rhs.getNumberOfPartitions() + ") ]");
}
/**
* Confirm that all nodes shared between clusters host exact same partition
* IDs and that nodes only in the super set cluster have no partition IDs.
*
* @param subsetCluster
* @param supersetCluster
*/
public static void validateClusterPartitionState(final Cluster subsetCluster,
final Cluster supersetCluster) {
if(!supersetCluster.getNodeIds().containsAll(subsetCluster.getNodeIds())) {
throw new VoldemortException("Superset cluster does not contain all nodes from subset cluster[ subset cluster node ids ("
+ subsetCluster.getNodeIds()
+ ") are not a subset of superset cluster node ids ("
+ supersetCluster.getNodeIds() + ") ]");
}
for(int nodeId: subsetCluster.getNodeIds()) {
Node supersetNode = supersetCluster.getNodeById(nodeId);
Node subsetNode = subsetCluster.getNodeById(nodeId);
if(!supersetNode.getPartitionIds().equals(subsetNode.getPartitionIds())) {
throw new VoldemortRebalancingException("Partition IDs do not match between clusters for nodes with id "
+ nodeId
+ " : subset cluster has "
+ subsetNode.getPartitionIds()
+ " and superset cluster has "
+ supersetNode.getPartitionIds());
}
}
Set<Integer> nodeIds = supersetCluster.getNodeIds();
nodeIds.removeAll(subsetCluster.getNodeIds());
for(int nodeId: nodeIds) {
Node supersetNode = supersetCluster.getNodeById(nodeId);
if(!supersetNode.getPartitionIds().isEmpty()) {
throw new VoldemortRebalancingException("New node "
+ nodeId
+ " in superset cluster already has partitions: "
+ supersetNode.getPartitionIds());
}
}
}
/**
* Confirms that both clusters have the same set of zones defined.
*
* @param lhs
* @param rhs
*/
public static void validateClusterZonesSame(final Cluster lhs, final Cluster rhs) {
Set<Zone> lhsSet = new HashSet<Zone>(lhs.getZones());
Set<Zone> rhsSet = new HashSet<Zone>(rhs.getZones());
if(!lhsSet.equals(rhsSet))
throw new VoldemortException("Zones are not the same [ lhs cluster zones ("
+ lhs.getZones() + ") not equal to rhs cluster zones ("
+ rhs.getZones() + ") ]");
}
/**
* Confirms that both clusters have the same number of nodes by comparing
* set of node Ids between clusters.
*
* @param lhs
* @param rhs
*/
public static void validateClusterNodeCounts(final Cluster lhs, final Cluster rhs) {
if(!lhs.getNodeIds().equals(rhs.getNodeIds())) {
throw new VoldemortException("Node ids are not the same [ lhs cluster node ids ("
+ lhs.getNodeIds()
+ ") not equal to rhs cluster node ids ("
+ rhs.getNodeIds() + ") ]");
}
}
/**
* Confirms that any nodes from supersetCluster that are in subsetCluster
* have the same state (i.e., node id, host name, and ports). Specific
* partitions hosted are not compared.
*
* @param subsetCluster
* @param supersetCluster
*/
public static void validateClusterNodeState(final Cluster subsetCluster,
final Cluster supersetCluster) {
if(!supersetCluster.getNodeIds().containsAll(subsetCluster.getNodeIds())) {
throw new VoldemortException("Superset cluster does not contain all nodes from subset cluster[ subset cluster node ids ("
+ subsetCluster.getNodeIds()
+ ") are not a subset of superset cluster node ids ("
+ supersetCluster.getNodeIds() + ") ]");
}
for(Node subsetNode: subsetCluster.getNodes()) {
Node supersetNode = supersetCluster.getNodeById(subsetNode.getId());
if(!subsetNode.isEqualState(supersetNode)) {
throw new VoldemortException("Nodes do not have same state[ subset node state ("
+ subsetNode.getStateString()
+ ") not equal to superset node state ("
+ supersetNode.getStateString() + ") ]");
}
}
}
/**
* Given the current cluster and final cluster, generates an interim cluster
* with empty new nodes (and zones).
*
* @param currentCluster Current cluster metadata
* @param finalCluster Final cluster metadata
* @return Returns a new interim cluster which contains nodes and zones of
* final cluster, but with empty partition lists if they were not
* present in current cluster.
*/
public static Cluster getInterimCluster(Cluster currentCluster, Cluster finalCluster) {
List<Node> newNodeList = new ArrayList<Node>(currentCluster.getNodes());
for(Node node: finalCluster.getNodes()) {
if(!ClusterUtils.containsNode(currentCluster, node.getId())) {
newNodeList.add(NodeUtils.updateNode(node, new ArrayList<Integer>()));
}
}
Collections.sort(newNodeList);
return new Cluster(currentCluster.getName(),
newNodeList,
Lists.newArrayList(finalCluster.getZones()));
}
    /**
     * Given the current cluster and an interim cluster, generates a cluster
     * with new nodes (which in turn contain empty partition lists).
     *
     * Pure delegate: {@link #getInterimCluster(Cluster, Cluster)} already
     * builds exactly this cluster; this alias exists for readability at call
     * sites dealing with "new node" semantics.
     *
     * @param currentCluster Current cluster metadata
     * @param interimCluster Interim cluster metadata
     * @return Returns a new cluster which contains nodes of the current cluster
     *         + new nodes
     */
    public static Cluster getClusterWithNewNodes(Cluster currentCluster, Cluster interimCluster) {
        return getInterimCluster(currentCluster, interimCluster);
    }
/**
* Concatenates the list of current nodes in the given cluster with the new
* nodes provided and returns an updated cluster metadata. <br>
* If the nodes being updated already exist in the current metadata, we take
* the updated ones
*
* @param currentCluster The current cluster metadata
* @param updatedNodeList The list of new nodes to be added
* @return New cluster metadata containing both the sets of nodes
*/
public static Cluster updateCluster(Cluster currentCluster, List<Node> updatedNodeList) {
List<Node> newNodeList = new ArrayList<Node>(updatedNodeList);
for(Node currentNode: currentCluster.getNodes()) {
if(!updatedNodeList.contains(currentNode))
newNodeList.add(currentNode);
}
Collections.sort(newNodeList);
return new Cluster(currentCluster.getName(),
newNodeList,
Lists.newArrayList(currentCluster.getZones()));
}
/**
* Updates the existing cluster such that we remove partitions mentioned
* from the stealer node and add them to the donor node
*
* @param currentCluster Existing cluster metadata. Both stealer and donor
* node should already exist in this metadata
* @param stealerNodeId Id of node for which we are stealing the partitions
* @param donatedPartitions List of partitions we are moving
* @param partitionList List of partitions we are moving
* @return Updated cluster metadata
*/
public static Cluster createUpdatedCluster(Cluster currentCluster,
int stealerNodeId,
List<Integer> donatedPartitions) {
// Clone the cluster
ClusterMapper mapper = new ClusterMapper();
Cluster updatedCluster = mapper.readCluster(new StringReader(mapper.writeCluster(currentCluster)));
// Go over every donated partition one by one
for(int donatedPartition: donatedPartitions) {
// Gets the donor Node that owns this donated partition
Node donorNode = updatedCluster.getNodeForPartitionId(donatedPartition);
Node stealerNode = updatedCluster.getNodeById(stealerNodeId);
if(donorNode == stealerNode) {
// Moving to the same location = No-op
continue;
}
// Update the list of partitions for this node
donorNode = NodeUtils.removePartitionToNode(donorNode, donatedPartition);
stealerNode = NodeUtils.addPartitionToNode(stealerNode, donatedPartition);
// Sort the nodes
updatedCluster = updateCluster(updatedCluster,
Lists.newArrayList(donorNode, stealerNode));
}
return updatedCluster;
}
/**
* For a particular stealer node find all the "primary" <replica, partition>
* tuples it will steal. In other words, expect the "replica" part to be 0
* always.
*
* @param currentCluster The cluster definition of the existing cluster
* @param finalCluster The final cluster definition
* @param stealNodeId Node id of the stealer node
* @return Returns a list of primary partitions which this stealer node will
* get
*/
public static List<Integer> getStolenPrimaryPartitions(final Cluster currentCluster,
final Cluster finalCluster,
final int stealNodeId) {
List<Integer> finalList = new ArrayList<Integer>(finalCluster.getNodeById(stealNodeId)
.getPartitionIds());
List<Integer> currentList = new ArrayList<Integer>();
if(ClusterUtils.containsNode(currentCluster, stealNodeId)) {
currentList = currentCluster.getNodeById(stealNodeId).getPartitionIds();
} else {
if(logger.isDebugEnabled()) {
logger.debug("Current cluster does not contain stealer node (cluster : [[["
+ currentCluster + "]]], node id " + stealNodeId + ")");
}
}
finalList.removeAll(currentList);
return finalList;
}
/**
* Find all [replica_type, partition] tuples to be stolen
*
* @param currentCluster Current cluster metadata
* @param finalCluster Final cluster metadata
* @param storeDef Store Definition
* @return Map of stealer node id to sets of [ replica_type, partition ]
* tuples
*/
public static Map<Integer, Set<Pair<Integer, Integer>>> getStolenPartitionTuples(final Cluster currentCluster,
final Cluster finalCluster,
final StoreDefinition storeDef) {
Map<Integer, Set<Pair<Integer, Integer>>> currentNodeIdToReplicas = getNodeIdToAllPartitions(currentCluster,
storeDef,
true);
Map<Integer, Set<Pair<Integer, Integer>>> finalNodeIdToReplicas = getNodeIdToAllPartitions(finalCluster,
storeDef,
true);
Map<Integer, Set<Pair<Integer, Integer>>> stealerNodeToStolenPartitionTuples = Maps.newHashMap();
for(int stealerId: NodeUtils.getNodeIds(Lists.newArrayList(finalCluster.getNodes()))) {
Set<Pair<Integer, Integer>> clusterStealerReplicas = currentNodeIdToReplicas.get(stealerId);
Set<Pair<Integer, Integer>> finalStealerReplicas = finalNodeIdToReplicas.get(stealerId);
Set<Pair<Integer, Integer>> diff = Utils.getAddedInTarget(clusterStealerReplicas,
finalStealerReplicas);
if(diff != null && diff.size() > 0) {
stealerNodeToStolenPartitionTuples.put(stealerId, diff);
}
}
return stealerNodeToStolenPartitionTuples;
}
/**
* Given a mapping of existing node ids to their partition tuples and
* another new set of node ids to partition tuples, combines them together
* and puts it into the existing partition tuples
*
* @param existingPartitionTuples Existing partition tuples ( Will include
* the new partition tuples at the end of this function )
* @param newPartitionTuples New partition tuples
*/
public static void combinePartitionTuples(Map<Integer, Set<Pair<Integer, Integer>>> existingPartitionTuples,
Map<Integer, Set<Pair<Integer, Integer>>> newPartitionTuples) {
for(int nodeId: newPartitionTuples.keySet()) {
Set<Pair<Integer, Integer>> tuples = null;
if(existingPartitionTuples.containsKey(nodeId)) {
tuples = existingPartitionTuples.get(nodeId);
} else {
tuples = Sets.newHashSet();
existingPartitionTuples.put(nodeId, tuples);
}
tuples.addAll(newPartitionTuples.get(nodeId));
}
}
    /**
     * For a particular cluster creates a mapping of node id to their
     * corresponding list of [ replicaType, partition ] tuple.
     *
     * For every primary partition, the routing strategy's replicating
     * partition list is walked in order and each replica is tagged with an
     * increasing replica type (0 = primary when {@code includePrimary}).
     *
     * @param cluster The cluster metadata
     * @param storeDef The store definition
     * @param includePrimary Include the primary partition?
     * @return Map of node id to set of [ replicaType, partition ] tuple
     */
    public static Map<Integer, Set<Pair<Integer, Integer>>> getNodeIdToAllPartitions(final Cluster cluster,
                                                                                     final StoreDefinition storeDef,
                                                                                     boolean includePrimary) {
        final RoutingStrategy routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
                                                                                                   cluster);
        final Map<Integer, Set<Pair<Integer, Integer>>> nodeIdToReplicas = new HashMap<Integer, Set<Pair<Integer, Integer>>>();
        final Map<Integer, Integer> partitionToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster);
        // Map initialization: every node gets an (initially empty) tuple set
        // so lookups below never return null.
        for(Node node: cluster.getNodes()) {
            nodeIdToReplicas.put(node.getId(), new HashSet<Pair<Integer, Integer>>());
        }
        // Track how many zones actually have partitions (and so replica types)
        // in them.
        // NOTE(review): if no zone hosts any partition this stays 0 and the
        // modulo below divides by zero - presumably callers never pass a
        // fully empty cluster; verify.
        int zonesWithPartitions = 0;
        for(Integer zoneId: cluster.getZoneIds()) {
            if(cluster.getNumberOfPartitionsInZone(zoneId) > 0) {
                zonesWithPartitions++;
            }
        }
        // Loops through all nodes
        for(Node node: cluster.getNodes()) {
            // Gets the partitions that this node was configured with.
            for(Integer primary: node.getPartitionIds()) {
                // Gets the list of replicating partitions.
                List<Integer> replicaPartitionList = routingStrategy.getReplicatingPartitionList(primary);
                // Sanity check: replicas should divide evenly among the zones
                // that actually hold partitions.
                if((replicaPartitionList.size() % zonesWithPartitions != 0)
                   || ((replicaPartitionList.size() / zonesWithPartitions) != (storeDef.getReplicationFactor() / cluster.getNumberOfZones()))) {
                    // For zone expansion & shrinking, this warning is expected
                    // in some cases. For other use cases (shuffling, cluster
                    // expansion), this warning indicates that something
                    // is wrong between the clusters and store defs.
                    logger.warn("Number of replicas returned (" + replicaPartitionList.size()
                                + ") does not make sense given the replication factor ("
                                + storeDef.getReplicationFactor() + ") and that there are "
                                + cluster.getNumberOfZones() + " zones of which "
                                + zonesWithPartitions + " have partitions (and of which "
                                + (cluster.getNumberOfZones() - zonesWithPartitions)
                                + " are empty).");
                }
                int replicaType = 0;
                if(!includePrimary) {
                    // Drop the primary itself; remaining replicas start at
                    // type 1.
                    replicaPartitionList.remove(primary);
                    replicaType = 1;
                }
                // Get the node that this replicating partition belongs to.
                for(Integer replicaPartition: replicaPartitionList) {
                    Integer replicaNodeId = partitionToNodeIdMap.get(replicaPartition);
                    // The replicating node will have a copy of primary.
                    nodeIdToReplicas.get(replicaNodeId).add(Pair.create(replicaType, primary));
                    replicaType++;
                }
            }
        }
        return nodeIdToReplicas;
    }
/**
* Print log to the following logger ( Info level )
*
* @param batchId Task id
* @param logger Logger class
* @param message The message to print
*/
public static void printBatchLog(int batchId, Logger logger, String message) {
logger.info("[Rebalance batch id " + batchId + "] " + message);
}
/**
* Print log to the following logger ( Info level )
*
* @param batchId
* @param taskId
* @param logger
* @param message
*/
public static void printBatchTaskLog(int batchId, int taskId, Logger logger, String message) {
logger.info("[Rebalance batch/task id " + batchId + "/" + taskId + "] " + message);
}
/**
* Print log to the following logger ( Error level )
*
* @param taskId Stealer node id
* @param logger Logger class
* @param message The message to print
*/
public static void printErrorLog(int taskId, Logger logger, String message, Exception e) {
if(e == null) {
logger.error("Task id " + Integer.toString(taskId) + "] " + message);
} else {
logger.error("Task id " + Integer.toString(taskId) + "] " + message, e);
}
}
public static AdminClient createTempAdminClient(VoldemortConfig voldemortConfig,
Cluster cluster,
int numConnPerNode) {
AdminClientConfig config = new AdminClientConfig().setMaxConnectionsPerNode(numConnPerNode)
.setAdminConnectionTimeoutSec(voldemortConfig.getAdminConnectionTimeout())
.setAdminSocketTimeoutSec(voldemortConfig.getAdminSocketTimeout())
.setAdminSocketBufferSize(voldemortConfig.getAdminSocketBufferSize());
return new AdminClient(cluster, config, new ClientConfig());
}
/**
* Given the cluster metadata and admin client, retrieves the list of store
* definitions.
*
* <br>
*
* It also checks if the store definitions are consistent across the cluster
*
* @param cluster The cluster metadata
* @param adminClient The admin client to use to retrieve the store
* definitions
* @return List of store definitions
*/
public static List<StoreDefinition> getCurrentStoreDefinitions(Cluster cluster,
AdminClient adminClient) {
List<StoreDefinition> storeDefs = null;
for(Node node: cluster.getNodes()) {
List<StoreDefinition> storeDefList = adminClient.metadataMgmtOps.getRemoteStoreDefList(node.getId())
.getValue();
if(storeDefs == null) {
storeDefs = storeDefList;
} else {
// Compare against the previous store definitions
if(!Utils.compareList(storeDefs, storeDefList)) {
throw new VoldemortException("Store definitions on node " + node.getId()
+ " does not match those on other nodes");
}
}
}
if(storeDefs == null) {
throw new VoldemortException("Could not retrieve list of store definitions correctly");
} else {
return storeDefs;
}
}
/**
* Given a list of store definitions, makes sure that rebalance supports all
* of them. If not it throws an error.
*
* @param storeDefList List of store definitions
* @return Filtered list of store definitions which rebalancing supports
*/
public static List<StoreDefinition> validateRebalanceStore(List<StoreDefinition> storeDefList) {
List<StoreDefinition> returnList = new ArrayList<StoreDefinition>(storeDefList.size());
for(StoreDefinition def: storeDefList) {
if(!def.isView() && !canRebalanceList.contains(def.getType())) {
throw new VoldemortException("Rebalance does not support rebalancing of stores of type "
+ def.getType() + " - " + def.getName());
} else if(!def.isView()) {
returnList.add(def);
} else {
logger.debug("Ignoring view " + def.getName() + " for rebalancing");
}
}
return returnList;
}
/**
 * Given a list of store definitions, cluster and admin client returns a
 * boolean indicating if all RO stores are in the correct format.
 *
 * <br>
 *
 * This function also takes into consideration nodes which are being
 * bootstrapped for the first time, in which case we can safely ignore
 * checking them ( as they will have default to ro0 )
 *
 * @param cluster Cluster metadata
 * @param storeDefs Complete list of store definitions
 * @param adminClient Admin client
 */
public static void validateReadOnlyStores(Cluster cluster,
                                          List<StoreDefinition> storeDefs,
                                          AdminClient adminClient) {
    List<StoreDefinition> readOnlyStores = StoreDefinitionUtils.filterStores(storeDefs, true);
    if(readOnlyStores.isEmpty()) {
        // Nothing to validate without read-only stores
        return;
    }
    List<String> storeNames = StoreDefinitionUtils.getStoreNames(readOnlyStores);
    for(Node node: cluster.getNodes()) {
        if(node.getNumberOfPartitions() == 0) {
            // Node being bootstrapped for the first time; defaults to ro0,
            // so skip the format check
            continue;
        }
        Map<String, String> storageFormats = adminClient.readonlyOps.getROStorageFormat(node.getId(),
                                                                                        storeNames);
        for(Entry<String, String> storeToStorageFormat: storageFormats.entrySet()) {
            String format = storeToStorageFormat.getValue();
            if(!format.equals(ReadOnlyStorageFormat.READONLY_V2.getCode())) {
                throw new VoldemortRebalancingException("Cannot rebalance since node "
                                                        + node.getId() + " has store "
                                                        + storeToStorageFormat.getKey()
                                                        + " not using format "
                                                        + ReadOnlyStorageFormat.READONLY_V2);
            }
        }
    }
}
/**
 * Returns a string representation of the cluster
 *
 * <pre>
 * Current Cluster:
 * 0 - [0, 1, 2, 3] + [7, 8, 9]
 * 1 - [4, 5, 6] + [0, 1, 2, 3]
 * 2 - [7, 8, 9] + [4, 5, 6]
 * </pre>
 *
 * @param nodeIdToAllPartitions Mapping of node id to all tuples
 * @return Returns a string representation of the cluster
 */
public static String printMap(final Map<Integer, Set<Pair<Integer, Integer>>> nodeIdToAllPartitions) {
    StringBuilder builder = new StringBuilder();
    for(Map.Entry<Integer, Set<Pair<Integer, Integer>>> entry: nodeIdToAllPartitions.entrySet()) {
        builder.append(entry.getKey());
        HashMap<Integer, List<Integer>> byReplicaType = flattenPartitionTuples(entry.getValue());
        if(byReplicaType.isEmpty()) {
            builder.append(" - empty");
        } else {
            // TreeMap orders by replica type so primaries print before
            // secondaries and so on
            for(List<Integer> partitions: new TreeMap<Integer, List<Integer>>(byReplicaType).values()) {
                // NOTE: sorts the partition lists in place
                Collections.sort(partitions);
                builder.append(" - " + partitions);
            }
        }
        builder.append(Utils.NEWLINE);
    }
    return builder.toString();
}
/**
 * Given the initial and final cluster dumps it into the output directory
 *
 * @param currentCluster Initial cluster metadata
 * @param finalCluster Final cluster metadata
 * @param outputDirName Output directory where to dump this file; no-op if null
 * @param filePrefix String to prepend to the initial & final cluster
 *        metadata files
 */
public static void dumpClusters(Cluster currentCluster,
                                Cluster finalCluster,
                                String outputDirName,
                                String filePrefix) {
    dumpClusterToFile(outputDirName, filePrefix + currentClusterFileName, currentCluster);
    dumpClusterToFile(outputDirName, filePrefix + finalClusterFileName, finalCluster);
}
/**
 * Given the current and final cluster dumps it into the output directory,
 * using no file-name prefix.
 *
 * @param currentCluster Initial cluster metadata
 * @param finalCluster Final cluster metadata
 * @param outputDirName Output directory where to dump this file; no-op if null
 */
public static void dumpClusters(Cluster currentCluster,
                                Cluster finalCluster,
                                String outputDirName) {
    dumpClusters(currentCluster, finalCluster, outputDirName, "");
}
/**
 * Prints a cluster xml to a file. Creates the output directory if needed;
 * does nothing when {@code outputDirName} is null. IO failures are logged,
 * not rethrown.
 *
 * @param outputDirName Directory to write into (may be null)
 * @param fileName Name of the file to create inside the directory
 * @param cluster Cluster metadata to serialize
 */
public static void dumpClusterToFile(String outputDirName, String fileName, Cluster cluster) {
    if(outputDirName != null) {
        File outputDir = new File(outputDirName);
        if(!outputDir.exists()) {
            Utils.mkdirs(outputDir);
        }
        try {
            FileUtils.writeStringToFile(new File(outputDirName, fileName),
                                        new ClusterMapper().writeCluster(cluster));
        } catch(IOException e) {
            // Pass the throwable so the stack trace is preserved in the log
            logger.error("IOException during dumpClusterToFile: " + e, e);
        }
    }
}
/**
 * Prints a balance analysis to a file. Creates the output directory if
 * needed; does nothing when {@code outputDirName} is null. IO failures are
 * logged, not rethrown.
 *
 * @param outputDirName Directory to write into (may be null)
 * @param baseFileName suffix '.analysis' is appended to baseFileName.
 * @param partitionBalance Analysis to serialize via its toString()
 */
public static void dumpAnalysisToFile(String outputDirName,
                                      String baseFileName,
                                      PartitionBalance partitionBalance) {
    if(outputDirName != null) {
        File outputDir = new File(outputDirName);
        if(!outputDir.exists()) {
            Utils.mkdirs(outputDir);
        }
        try {
            FileUtils.writeStringToFile(new File(outputDirName, baseFileName + ".analysis"),
                                        partitionBalance.toString());
        } catch(IOException e) {
            // Pass the throwable so the stack trace is preserved in the log
            logger.error("IOException during dumpAnalysisToFile: " + e, e);
        }
    }
}
/**
 * Prints the plan to a file named "plan.out". Creates the output directory
 * if needed; does nothing when {@code outputDirName} is null. IO failures
 * are logged, not rethrown.
 *
 * @param outputDirName Directory to write into (may be null)
 * @param plan Rebalance plan to serialize via its toString()
 */
public static void dumpPlanToFile(String outputDirName, RebalancePlan plan) {
    if(outputDirName != null) {
        File outputDir = new File(outputDirName);
        if(!outputDir.exists()) {
            Utils.mkdirs(outputDir);
        }
        try {
            FileUtils.writeStringToFile(new File(outputDirName, "plan.out"), plan.toString());
        } catch(IOException e) {
            // Pass the throwable so the stack trace is preserved in the log
            logger.error("IOException during dumpPlanToFile: " + e, e);
        }
    }
}
/**
 * Given a list of tuples of [replica_type, partition], flattens it and
 * generates a map of replica_type to partition mapping
 *
 * @param partitionTuples Set of <replica_type, partition> tuples
 * @return Map of replica_type to set of partitions
 */
public static HashMap<Integer, List<Integer>> flattenPartitionTuples(Set<Pair<Integer, Integer>> partitionTuples) {
    HashMap<Integer, List<Integer>> flattenedTuples = Maps.newHashMap();
    for(Pair<Integer, Integer> pair: partitionTuples) {
        // Single lookup instead of containsKey + get
        List<Integer> partitions = flattenedTuples.get(pair.getFirst());
        if(partitions == null) {
            partitions = Lists.newArrayList();
            flattenedTuples.put(pair.getFirst(), partitions);
        }
        partitions.add(pair.getSecond());
    }
    return flattenedTuples;
}
public static int countPartitionStores(List<RebalancePartitionsInfo> infos) {
int count = 0;
for(RebalancePartitionsInfo info: infos) {