/
AssignmentManager.java
2143 lines (1924 loc) · 88.6 KB
/
AssignmentManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
/**
* The AssignmentManager is the coordinator for region assign/unassign operations.
* <ul>
* <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
* <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
* </ul>
* Regions are created by CreateTable, Split, Merge.
* Regions are deleted by DeleteTable, Split, Merge.
* Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
* Unassigns are triggered by DisableTable, Split, Merge
*/
@InterfaceAudience.Private
public class AssignmentManager {
private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);
// TODO: AMv2
// - handle region migration from hbase1 to hbase2.
// - handle sys table assignment first (e.g. acl, namespace)
// - handle table priorities
// - If ServerBusyException trying to update hbase:meta, we abort the Master
// See updateRegionLocation in RegionStateStore.
//
// See also
// https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
// for other TODOs.
// Configuration key for a bootstrap thread pool size.
// NOTE(review): not read anywhere in the visible part of this class; confirm where it is consumed.
public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
"hbase.assignment.bootstrap.thread.pool.size";
// Read by the constructor into assignDispatchWaitMillis (milliseconds, per the key name).
public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
"hbase.assignment.dispatch.wait.msec";
private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;
// Read by the constructor into assignDispatchWaitQueueMaxSize.
public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
"hbase.assignment.dispatch.wait.queue.max.size";
private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;
// Interval passed to the RegionInTransitionChore built in the constructor (default: 60 seconds).
public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
"hbase.assignment.rit.chore.interval.msec";
private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;
// Interval of the DeadServerMetricRegionChore (default: 120 seconds). The constructor does not
// create the chore at all when this value is <= 0.
public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
"hbase.assignment.dead.region.metric.chore.interval.msec";
private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;
// Maximum number of assign attempts. The constructor clamps it to at least 1; the default
// (Integer.MAX_VALUE) is effectively unbounded.
public static final String ASSIGN_MAX_ATTEMPTS =
"hbase.assignment.maximum.attempts";
private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;
// Read by the constructor into assignRetryImmediatelyMaxAttempts.
public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
"hbase.assignment.retry.immediately.maximum.attempts";
private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;
/**
 * Region in Transition metrics threshold time. Judging by the key name, this is how long a region
 * may stay in transition before it is reported as stuck (presumably milliseconds; TODO confirm).
 * Not read in the visible part of this class.
 */
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
"hbase.metrics.rit.stuck.warning.threshold";
private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
// Ready while meta is assigned; woken/suspended through setMetaAssigned().
private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
// Ready once the AM has rebuilt region states from meta; woken by wakeMetaLoadedEvent() and
// suspended again by stop().
private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
private final MetricsAssignmentManager metrics;
// Both chores are removed from the procedure executor in stop(); deadMetricChore is null when
// its configured interval is <= 0.
private final RegionInTransitionChore ritChore;
private final DeadServerMetricRegionChore deadMetricChore;
private final MasterServices master;
// Set by start() and cleared by stop(); compareAndSet makes both idempotent.
private final AtomicBoolean running = new AtomicBoolean(false);
// In-memory region and server states; cleared by stop().
private final RegionStates regionStates = new RegionStates();
private final RegionStateStore regionStateStore;
// NOTE(review): keyed by reporting server; not used in the visible part of this class.
private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();
// True when the configured load balancer class is FavoredStochasticBalancer or a subclass.
private final boolean shouldAssignRegionsWithFavoredNodes;
private final int assignDispatchWaitQueueMaxSize;
private final int assignDispatchWaitMillis;
private final int assignMaxAttempts;
private final int assignRetryImmediatelyMaxAttempts;
// Serializes the background passes started by checkIfShouldMoveSystemRegionAsync().
private final Object checkIfShouldMoveSystemRegionLock = new Object();
// NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which are
// not visible here.
private Thread assignThread;
/**
 * Creates an assignment manager whose {@link RegionStateStore} is built from {@code master}.
 * @param master the master whose configuration, ZooKeeper watcher and procedure executor are used
 */
public AssignmentManager(final MasterServices master) {
  this(master, new RegionStateStore(master));
}

/**
 * Creates an assignment manager with an explicitly supplied state store (tests inject their own).
 * All tunables are read from the master's configuration.
 * @param master the master whose configuration, ZooKeeper watcher and procedure executor are used
 * @param stateStore the store that handles hbase:meta state updates
 */
@VisibleForTesting
AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
  this.master = master;
  this.regionStateStore = stateStore;
  this.metrics = new MetricsAssignmentManager();

  final Configuration config = master.getConfiguration();

  // Favored nodes are only looked up when the favored-node balancer is configured.
  final Class<?> balancerClass =
      config.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class);
  this.shouldAssignRegionsWithFavoredNodes =
      FavoredStochasticBalancer.class.isAssignableFrom(balancerClass);

  this.assignDispatchWaitMillis =
      config.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
  this.assignDispatchWaitQueueMaxSize =
      config.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);

  final int configuredMaxAttempts =
      config.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS);
  // Whatever is configured, at least one attempt is always allowed.
  this.assignMaxAttempts = Math.max(1, configuredMaxAttempts);
  this.assignRetryImmediatelyMaxAttempts = config.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);

  final int ritIntervalMs =
      config.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
  this.ritChore = new RegionInTransitionChore(ritIntervalMs);

  final int deadMetricIntervalMs = config.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
  // A non-positive interval disables the dead-server region metric chore.
  this.deadMetricChore =
      deadMetricIntervalMs > 0 ? new DeadServerMetricRegionChore(deadMetricIntervalMs) : null;
}
/**
 * Starts the assignment manager; a second call while already running does nothing.
 * <p/>
 * Starts the assignment thread. When a ZooKeeper watcher is available, it also seeds the in-memory
 * node of the first meta region from {@link MetaTableLocator#getMetaRegionState}. That covers the
 * node's location and state, any attached procedure's loaded state, and the meta-assigned event.
 */
public void start() throws IOException, KeeperException {
  final boolean firstStart = running.compareAndSet(false, true);
  if (!firstStart) {
    return;
  }
  LOG.trace("Starting assignment manager");

  startAssignmentThread();

  final ZKWatcher watcher = master.getZooKeeper();
  if (watcher == null) {
    // Some tests run without ZooKeeper; there is no meta state to load in that case.
    return;
  }
  final RegionState metaState = MetaTableLocator.getMetaRegionState(watcher);
  final RegionStateNode metaNode =
      regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
  metaNode.lock();
  try {
    metaNode.setRegionLocation(metaState.getServerName());
    metaNode.setState(metaState.getState());
    final TransitRegionStateProcedure attached = metaNode.getProcedure();
    if (attached != null) {
      attached.stateLoaded(this, metaNode);
    }
    final boolean metaOpen = metaState.getState() == State.OPEN;
    setMetaAssigned(metaState.getRegion(), metaOpen);
  } finally {
    metaNode.unlock();
  }
}
/**
 * Rebuilds the regions-in-transition view by attaching each restored
 * {@link TransitRegionStateProcedure} to the {@link RegionStateNode} of its region. The node is
 * created if missing.
 * <p>
 * A TRSP must be attached to its RegionStateNode, which acts as a guard, before it is submitted.
 * This therefore has to run before any procedure is processed, and the loadingMeta step does not
 * need to restore the RIT list again.
 * <p>
 * If several restored procedures target the same region, the one with the largest procedure id
 * stays attached.
 */
public void setupRIT(List<TransitRegionStateProcedure> procs) {
  for (TransitRegionStateProcedure candidate : procs) {
    final RegionStateNode regionNode =
        regionStates.getOrCreateRegionStateNode(candidate.getRegion());
    final TransitRegionStateProcedure attached = regionNode.getProcedure();
    if (attached != null) {
      // A TRSP is detached from its node during execution, before the pv2 framework marks it as
      // done. A newer TRSP for the same region may therefore already have been scheduled. Only
      // the newest one (largest proc id) can still be in charge; older ones have already finished.
      if (attached.getProcId() >= candidate.getProcId()) {
        continue;
      }
      regionNode.unsetProcedure(attached);
    }
    LOG.info("Attach {} to {} to restore RIT", candidate, regionNode);
    regionNode.setProcedure(candidate);
  }
}
/**
 * Stops the assignment manager; calling it when not running does nothing.
 * <p/>
 * Steps, in order:
 * <ul>
 * <li>unregister the chores from the procedure executor, if one exists;</li>
 * <li>stop the assignment thread;</li>
 * <li>clear the in-memory region states;</li>
 * <li>reset the meta-loaded and meta-assigned events, again only if an executor exists.</li>
 * </ul>
 */
public void stop() {
  if (!running.compareAndSet(true, false)) {
    return;
  }
  LOG.info("Stopping assignment manager");

  // The AM is started before the procedure executor exists. Work is only loaded/submitted once
  // there is an executor, so without one there is nothing to unregister or reset.
  final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
  if (hasProcExecutor) {
    master.getMasterProcedureExecutor().removeChore(ritChore);
    if (deadMetricChore != null) {
      master.getMasterProcedureExecutor().removeChore(deadMetricChore);
    }
  }

  stopAssignmentThread();

  // Drop the in-memory region states; the RegionStateStore itself is not touched here.
  regionStates.clear();

  if (hasProcExecutor) {
    // Reset the meta events (mainly useful for tests).
    metaLoadEvent.suspend();
    getMetaRegionSet().forEach(metaRegion -> setMetaAssigned(metaRegion, false));
  }
}
/** @return whether {@link #start()} has run without a later {@link #stop()}. */
public boolean isRunning() {
  return this.running.get();
}

/** @return the configuration of the owning master. */
public Configuration getConfiguration() {
  return this.master.getConfiguration();
}

/** @return the assignment-manager metrics. */
public MetricsAssignmentManager getAssignmentManagerMetrics() {
  return this.metrics;
}

/** @return the owning master's load balancer. */
private LoadBalancer getBalancer() {
  return this.master.getLoadBalancer();
}

/** @return the environment of the master procedure executor (which must already exist). */
private MasterProcedureEnv getProcedureEnvironment() {
  return this.master.getMasterProcedureExecutor().getEnvironment();
}

/** @return the procedure scheduler of the master procedure environment. */
private MasterProcedureScheduler getProcedureScheduler() {
  return this.getProcedureEnvironment().getProcedureScheduler();
}

/** @return the configured maximum assign attempts (always at least 1). */
int getAssignMaxAttempts() {
  return this.assignMaxAttempts;
}

/** @return the configured number of immediate assign retries. */
int getAssignRetryImmediatelyMaxAttempts() {
  return this.assignRetryImmediatelyMaxAttempts;
}

/** @return the in-memory region states. */
public RegionStates getRegionStates() {
  return this.regionStates;
}
/**
 * Returns the regions hosted by {@code serverName}, or an empty list when the server is unknown.
 * <p/>
 * For SCP no locking is needed. Once the SCP is submitted, nobody can change the region list of
 * the ServerStateNode.
 * <p/>
 * For any other caller the result is only a snapshot. Regions may move onto or off the server
 * right after this returns, so do not rely on it where strong consistency is required.
 */
public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
  final ServerStateNode serverNode = regionStates.getServerNode(serverName);
  return serverNode != null ? serverNode.getRegionInfoList() : Collections.emptyList();
}
/** @return the store that handles hbase:meta state updates. */
public RegionStateStore getRegionStateStore() {
  return regionStateStore;
}

/**
 * Returns the favored nodes of {@code regionInfo}.
 * @return the balancer's favored nodes when a {@link FavoredStochasticBalancer} is configured,
 *         otherwise {@link ServerName#EMPTY_SERVER_LIST}
 */
public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
  if (!shouldAssignRegionsWithFavoredNodes) {
    return ServerName.EMPTY_SERVER_LIST;
  }
  final FavoredStochasticBalancer favoredBalancer = (FavoredStochasticBalancer) getBalancer();
  return favoredBalancer.getFavoredNodes(regionInfo);
}
// ============================================================================================
//  Table State Manager helpers
// ============================================================================================

/** @return the owning master's table state manager. */
TableStateManager getTableStateManager() {
  return master.getTableStateManager();
}

/** @return whether {@code tableName} is in the ENABLED state. */
public boolean isTableEnabled(final TableName tableName) {
  final TableStateManager tableStates = getTableStateManager();
  return tableStates.isTableState(tableName, TableState.State.ENABLED);
}

/** @return whether {@code tableName} is DISABLED or DISABLING. */
public boolean isTableDisabled(final TableName tableName) {
  final TableStateManager tableStates = getTableStateManager();
  return tableStates.isTableState(tableName,
      TableState.State.DISABLED, TableState.State.DISABLING);
}
// ============================================================================================
//  META Helpers
// ============================================================================================

/** @return whether {@code regionInfo} is a meta region. */
private boolean isMetaRegion(final RegionInfo regionInfo) {
  return regionInfo.isMetaRegion();
}

/** @return whether {@code regionName} names one of the meta regions in {@link #getMetaRegionSet()}. */
public boolean isMetaRegion(final byte[] regionName) {
  return getMetaRegionFromName(regionName) != null;
}

/**
 * Looks up a meta region by its full region name.
 * @return the matching meta region, or {@code null} when none of the known meta regions matches
 */
public RegionInfo getMetaRegionFromName(final byte[] regionName) {
  return getMetaRegionSet().stream()
      .filter(metaRegion -> Bytes.equals(metaRegion.getRegionName(), regionName))
      .findFirst()
      .orElse(null);
}
/** @return whether {@code serverName} is the recorded location of the first meta region. */
public boolean isCarryingMeta(final ServerName serverName) {
  // TODO: handle multiple meta
  return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
}

/**
 * @return whether {@code serverName} is the in-memory location of {@code regionInfo}. The
 *         region's state is not consulted.
 */
private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
  // TODO: check for state?
  final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
  if (regionNode == null) {
    return false;
  }
  return serverName.equals(regionNode.getRegionLocation());
}

/**
 * Returns the meta region that holds {@code regionInfo}. Only one meta region is supported today,
 * so this is always the first meta region.
 */
private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
  // TODO: handle multiple meta. If the region provided is not meta, look up which meta region it
  // belongs to.
  return RegionInfoBuilder.FIRST_META_REGIONINFO;
}
// TODO: handle multiple meta.
/** Immutable set of the meta regions known to this manager: just the first meta region for now. */
private static final Set<RegionInfo> META_REGION_SET =
    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);

/** @return the immutable set of meta regions (currently only the first meta region). */
public Set<RegionInfo> getMetaRegionSet() {
  return META_REGION_SET;
}
// ============================================================================================
//  META Event(s) helpers
// ============================================================================================

/**
 * Returns whether the meta region is available on a region server.
 * <p/>
 * This does not mean the AM has finished loading region states from meta. Usually you should check
 * {@link #isMetaLoaded()} first, unless your code is guaranteed to run only after the AM has
 * rebuilt the region states.
 * @see #isMetaLoaded()
 */
public boolean isMetaAssigned() {
  return metaAssignEvent.isReady();
}

/** @return the negation of {@link #isMetaAssigned()}. */
public boolean isMetaRegionInTransition() {
  final boolean assigned = isMetaAssigned();
  return !assigned;
}
/**
 * Suspends {@code proc} on the meta-assigned event of the meta region holding {@code regionInfo},
 * if that event is not ready yet.
 * <p/>
 * This event does not mean the AM has finished rebuilding region states; see
 * {@link #isMetaAssigned()} for details.
 * @return the result of {@link ProcedureEvent#suspendIfNotReady} (presumably {@code true} when
 *         {@code proc} was suspended; TODO confirm)
 * @see #isMetaAssigned()
 */
public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
  final RegionInfo metaRegion = getMetaForRegion(regionInfo);
  return getMetaAssignEvent(metaRegion).suspendIfNotReady(proc);
}

/**
 * Marks {@code metaRegionInfo} as assigned or not assigned. Assigned wakes the event through the
 * procedure scheduler; not assigned suspends the event.
 */
private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
  assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
  final ProcedureEvent<?> assignEvent = getMetaAssignEvent(metaRegionInfo);
  if (!assigned) {
    assignEvent.suspend();
    return;
  }
  assignEvent.wake(getProcedureScheduler());
}

/** @return the assign event of {@code metaRegionInfo}; one shared event until multi-meta exists. */
private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
  assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
  // TODO: handle multiple meta.
  return metaAssignEvent;
}
/**
 * Suspends {@code proc} until the AM has finished loading meta (rebuilding the region states),
 * if it has not finished yet.
 * @see #isMetaLoaded()
 * @see #waitMetaAssigned(Procedure, RegionInfo)
 */
public boolean waitMetaLoaded(Procedure<?> proc) {
  return metaLoadEvent.suspendIfNotReady(proc);
}

/** Marks meta as loaded, waking procedures that wait on it through the procedure scheduler. */
@VisibleForTesting
void wakeMetaLoadedEvent() {
  final MasterProcedureScheduler scheduler = getProcedureScheduler();
  metaLoadEvent.wake(scheduler);
  assert isMetaLoaded() : "expected meta to be loaded";
}

/**
 * Returns whether the AM has finished loading meta, i.e. rebuilding the region states.
 * @see #isMetaAssigned()
 * @see #waitMetaLoaded(Procedure)
 */
public boolean isMetaLoaded() {
  return metaLoadEvent.isReady();
}
/**
 * Start a new thread to check if there are region servers whose versions are higher than others.
 * If so, move all system table regions to RS with the highest version to keep compatibility.
 * The reason is, RS in new version may not be able to access RS in old version when there are
 * some incompatible changes.
 * <p>This method is called when a new RegionServer is added to cluster only.</p>
 * <p>
 * Meta moves are submitted as soon as they are found. All other system-region moves are submitted
 * once, after every candidate server has been scanned. Each region therefore gets at most one
 * move request per pass, and meta is always moved first. Any failure is logged and ends the pass.
 */
public void checkIfShouldMoveSystemRegionAsync() {
  // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
  // it should 'move' the system tables from the old server to the new server but
  // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
  if (this.master.getServerManager().countOfRegionServers() <= 1) {
    return;
  }
  // This thread used to run whenever there was a change in the cluster. The ZooKeeper
  // childrenChanged notification came in before the nodeDeleted message and so this method
  // could run before a ServerCrashProcedure could run. That meant that this thread could see
  // a Crashed Server before ServerCrashProcedure and it could find system regions on the
  // crashed server and go move them before ServerCrashProcedure had a chance; could be
  // dataloss too if WALs were not recovered.
  new Thread(() -> {
    try {
      synchronized (checkIfShouldMoveSystemRegionLock) {
        List<RegionPlan> plans = new ArrayList<>();
        // TODO: I don't think this code does a good job if all servers in cluster have same
        // version. It looks like it will schedule unnecessary moves.
        for (ServerName server : getExcludedServersForSystemTable()) {
          if (master.getServerManager().isServerDead(server)) {
            // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
            // considers only online servers, the server could be queued for dead server
            // processing. As region assignments for crashed server is handled by
            // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
            // regular flow of LoadBalancer as a favored node and not to have this special
            // handling.
            continue;
          }
          for (RegionInfo regionInfo : getSystemTables(server)) {
            // null value for dest forces destination server to be selected by balancer
            RegionPlan plan = new RegionPlan(regionInfo, server, null);
            if (regionInfo.isMetaRegion()) {
              // Must move meta region first.
              LOG.info("Async MOVE of {} to newer Server={}",
                  regionInfo.getEncodedName(), server);
              moveAsync(plan);
            } else {
              plans.add(plan);
            }
          }
        }
        // Submit the collected non-meta moves exactly once, after all servers were scanned.
        // `plans` accumulates across servers, so submitting it inside the server loop would
        // re-submit earlier plans. A second moveAsync for the same region fails preTransitCheck
        // ("currently in transition"), and that exception would abort the remaining moves.
        for (RegionPlan plan : plans) {
          LOG.info("Async MOVE of {} to newer Server={}",
              plan.getRegionInfo().getEncodedName(), plan.getSource());
          moveAsync(plan);
        }
      }
    } catch (Throwable t) {
      LOG.error(t.toString(), t);
    }
  }).start();
}
/**
 * Returns the system-table regions recorded on {@code serverName}, or an empty list when the
 * server has no {@link ServerStateNode}.
 */
private List<RegionInfo> getSystemTables(ServerName serverName) {
  final ServerStateNode serverNode = regionStates.getServerNode(serverName);
  return serverNode == null ? Collections.emptyList() : serverNode.getSystemRegionInfoList();
}
/**
 * Verifies that a new transition may start on {@code regionNode}. Every caller in this class
 * invokes it while holding the node's lock.
 * @param expectedStates the states the region must currently be in
 * @throws HBaseIOException if a procedure is already attached to the region
 * @throws DoNotRetryRegionException if the region is not in one of {@code expectedStates}
 * @throws DoNotRetryIOException if the region's table is DISABLING or DISABLED
 */
private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
    throws HBaseIOException {
  final boolean alreadyInTransition = regionNode.getProcedure() != null;
  if (alreadyInTransition) {
    throw new HBaseIOException(regionNode + " is currently in transition");
  }
  final boolean inExpectedState = regionNode.isInState(expectedStates);
  if (!inExpectedState) {
    throw new DoNotRetryRegionException("Unexpected state for " + regionNode);
  }
  final boolean tableDisabledOrDisabling = getTableStateManager().isTableState(
      regionNode.getTable(), TableState.State.DISABLING, TableState.State.DISABLED);
  if (tableDisabledOrDisabling) {
    throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
  }
}
// TODO: Need an async version of this for hbck2.
/**
 * Assigns {@code regionInfo}, creating its RegionStateNode if needed, and blocks until the assign
 * procedure finishes.
 * @param sn the target server, or {@code null} for no explicit target
 * @return the id of the submitted procedure
 * @throws IOException if the pre-transition check fails, or as propagated from
 *           {@link ProcedureSyncWait#submitAndWaitProcedure}
 */
public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
  // TODO: should we use getRegionStateNode?
  final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
  final TransitRegionStateProcedure assignProc;
  regionNode.lock();
  try {
    preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
    assignProc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn);
    regionNode.setProcedure(assignProc);
  } finally {
    regionNode.unlock();
  }
  ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), assignProc);
  return assignProc.getProcId();
}

/**
 * Same as {@link #assign(RegionInfo, ServerName)} with no explicit target server.
 * @return the id of the submitted procedure
 */
public long assign(RegionInfo regionInfo) throws IOException {
  final ServerName noTarget = null;
  return assign(regionInfo, noTarget);
}
/**
 * Unassigns {@code regionInfo} and blocks until the unassign procedure finishes.
 * @return the id of the submitted procedure
 * @throws UnknownRegionException if there is no RegionStateNode for the region
 * @throws IOException if the pre-transition check fails, or as propagated from
 *           {@link ProcedureSyncWait#submitAndWaitProcedure}
 */
public long unassign(RegionInfo regionInfo) throws IOException {
  final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
  if (regionNode == null) {
    throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
  }
  final TransitRegionStateProcedure unassignProc;
  regionNode.lock();
  try {
    preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
    unassignProc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
    regionNode.setProcedure(unassignProc);
  } finally {
    regionNode.unlock();
  }
  ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), unassignProc);
  return unassignProc.getProcId();
}
/**
 * Creates a move procedure for {@code regionInfo} and attaches it to the region's node. The
 * procedure is not submitted here.
 * @param targetServer destination server, or {@code null} to let the balancer choose
 * @return the attached, not yet submitted, procedure
 * @throws UnknownRegionException if there is no RegionStateNode for the region
 * @throws HBaseIOException if the pre-transition check or {@code checkOnline} fails
 */
public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
    ServerName targetServer) throws HBaseIOException {
  final RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
  if (regionNode == null) {
    throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
  }
  regionNode.lock();
  try {
    preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
    regionNode.checkOnline();
    final TransitRegionStateProcedure moveProc =
        TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
    regionNode.setProcedure(moveProc);
    return moveProc;
  } finally {
    regionNode.unlock();
  }
}
/**
 * Moves {@code regionInfo} to a balancer-chosen server and blocks until the move finishes.
 */
public void move(RegionInfo regionInfo) throws IOException {
  final TransitRegionStateProcedure moveProc = createMoveRegionProcedure(regionInfo, null);
  ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), moveProc);
}

/**
 * Submits a move of the plan's region to the plan's destination without waiting for it.
 * @return a future for the submitted procedure
 */
public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
  final RegionInfo region = regionPlan.getRegionInfo();
  final ServerName destination = regionPlan.getDestination();
  final TransitRegionStateProcedure moveProc = createMoveRegionProcedure(region, destination);
  return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), moveProc);
}
// ============================================================================================
//  RegionTransition procedures helpers
// ============================================================================================

/**
 * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
 * <p/>
 * All regions go to the balancer in one call, so it can balance with the full picture. If exactly
 * one region server is online, {@code serversToExclude} is ignored so the regions can still be
 * placed. If the balancer call fails, procedures without targets are created instead; at assign
 * time the AssignProcedure will ask the balancer for a target.
 * @param hris regions to assign
 * @param serversToExclude servers the balancer must not pick; may be {@code null}
 * @return one assign procedure per region (an empty array for empty {@code hris})
 */
public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
    List<ServerName> serversToExclude) {
  if (hris.isEmpty()) {
    return new TransitRegionStateProcedure[0];
  }
  List<ServerName> excluded = serversToExclude;
  if (excluded != null && this.master.getServerManager().getOnlineServersList().size() == 1) {
    LOG.debug("Only one region server found and hence going ahead with the assignment");
    excluded = null;
  }
  try {
    final Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(
        hris, this.master.getServerManager().createDestinationServersList(excluded));
    return createAssignProcedures(assignments);
  } catch (HBaseIOException hioe) {
    LOG.warn("Failed roundRobinAssignment", hioe);
    // Last resort: plain assigns with no pre-selected target.
    return createAssignProcedures(hris);
  }
}

/**
 * Same as {@link #createRoundRobinAssignProcedures(List, List)} with no servers excluded.
 */
public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
  final List<ServerName> noExclusions = null;
  return createRoundRobinAssignProcedures(hris, noExclusions);
}
/**
 * Orders transit procedures by region priority: meta regions first, then other system-table
 * regions, then user regions. Within the same group, {@link RegionInfo#COMPARATOR} decides.
 * @return -1 or +1 when the groups differ, otherwise the comparator's result
 */
@VisibleForTesting
static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
  final int leftRank = transitOrderRank(left.getRegion());
  final int rightRank = transitOrderRank(right.getRegion());
  if (leftRank == rightRank) {
    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
  }
  return leftRank < rightRank ? -1 : +1;
}

/** Sort rank of a region for {@link #compare}: 0 for meta, 1 for other system tables, 2 otherwise. */
private static int transitOrderRank(RegionInfo region) {
  if (region.isMetaRegion()) {
    return 0;
  }
  return region.getTable().isSystemTable() ? 1 : 2;
}
/**
 * Creates an assign procedure for the region behind <code>regionNode</code> and attaches it to
 * that node. All of this happens while holding the node's lock.
 * @param regionNode node of the region to assign
 * @param targetServer destination to pass to the procedure; callers in this class pass null
 *          when no explicit target is wanted
 * @param override when true, any procedure already attached to the node is detached first
 * @return the newly created and attached procedure
 */
private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
    ServerName targetServer, boolean override) {
  regionNode.lock();
  try {
    if (override) {
      TransitRegionStateProcedure attached = regionNode.getProcedure();
      if (attached != null) {
        regionNode.unsetProcedure(attached);
      }
    }
    // Without override, callers must guarantee the node has no procedure attached.
    assert regionNode.getProcedure() == null;
    TransitRegionStateProcedure assignProc = TransitRegionStateProcedure.assign(
      getProcedureEnvironment(), regionNode.getRegionInfo(), targetServer);
    regionNode.setProcedure(assignProc);
    return assignProc;
  } finally {
    regionNode.unlock();
  }
}
/**
 * Creates an unassign procedure for the region behind <code>regionNode</code> and attaches it to
 * that node. All of this happens while holding the node's lock.
 * @param regionNode node of the region to unassign
 * @param override when true, any procedure already attached to the node is detached first
 * @return the newly created and attached procedure
 */
private TransitRegionStateProcedure createUnassignProcedure(RegionStateNode regionNode,
    boolean override) {
  regionNode.lock();
  try {
    if (override) {
      TransitRegionStateProcedure attached = regionNode.getProcedure();
      if (attached != null) {
        regionNode.unsetProcedure(attached);
      }
    }
    // Without override, callers must guarantee the node has no procedure attached.
    assert regionNode.getProcedure() == null;
    TransitRegionStateProcedure unassignProc = TransitRegionStateProcedure.unassign(
      getProcedureEnvironment(), regionNode.getRegionInfo());
    regionNode.setProcedure(unassignProc);
    return unassignProc;
  } finally {
    regionNode.unlock();
  }
}
/**
 * Builds a single TransitRegionStateProcedure that assigns <code>hri</code> with no explicit
 * target server. This entry point exists for HBCK2.
 * @param hri region to assign; its node is obtained via getOrCreateRegionStateNode
 * @param override whether to detach a procedure already attached to the region's node
 * @return the new assign procedure
 */
public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo hri, boolean override) {
  return createAssignProcedure(regionStates.getOrCreateRegionStateNode(hri), null, override);
}
/**
 * Builds a single TransitRegionStateProcedure that unassigns <code>hri</code>. This entry point
 * exists for HBCK2.
 * @param hri region to unassign; its node is obtained via getOrCreateRegionStateNode
 * @param override whether to detach a procedure already attached to the region's node
 * @return the new unassign procedure
 */
public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo hri, boolean override) {
  return createUnassignProcedure(regionStates.getOrCreateRegionStateNode(hri), override);
}
/**
 * Builds one assign procedure per region, none of them with an explicit target server.
 * <p/>
 * With no target, the assign step tries the region's former location when there is one. This is
 * how a region 'retains' its old location across a server restart.
 * <p/>
 * Call this only when nothing else can touch these regions, e.g. while creating a table.
 * @param hris the regions to assign
 * @return the procedures, ordered with meta first, then system-table regions, then user regions
 */
public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
  return hris.stream()
    .map(hri -> createAssignProcedure(regionStates.getOrCreateRegionStateNode(hri), null, false))
    .sorted(AssignmentManager::compare)
    .toArray(TransitRegionStateProcedure[]::new);
}
/**
 * Turns a server-to-regions assignment plan into assign procedures. Each region targets the
 * server it is listed under.
 * @param assignments map from target server to the regions planned for it
 * @return the procedures, ordered with meta first, then system-table regions, then user regions
 */
private TransitRegionStateProcedure[] createAssignProcedures(
    Map<ServerName, List<RegionInfo>> assignments) {
  return assignments.entrySet().stream()
    .flatMap(entry -> {
      ServerName target = entry.getKey();
      return entry.getValue().stream().map(
        hri -> createAssignProcedure(regionStates.getOrCreateRegionStateNode(hri), target, false));
    })
    .sorted(AssignmentManager::compare)
    .toArray(TransitRegionStateProcedure[]::new);
}
/**
 * Builds unassign procedures for every region of <code>tableName</code> that still needs closing.
 * DisableTableProcedure calls this.
 * <p/>
 * A region is skipped when regionStates.include(node, false) rejects it or when it is already
 * offline. Any procedure already attached to a remaining region's node is replaced.
 * @param tableName the table being disabled
 * @return the unassign procedures, in the order the table's region nodes are listed
 */
public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
  return regionStates.getTableRegionStateNodes(tableName).stream()
    .map(regionNode -> {
      regionNode.lock();
      try {
        boolean nothingToClose = !regionStates.include(regionNode, false) ||
          regionStates.isRegionOffline(regionNode.getRegionInfo());
        if (nothingToClose) {
          return null;
        }
        // DisableTableProcedure holds the exclusive table lock, and a TRSP holds the shared table
        // lock for its whole run. So any procedure attached here cannot have started executing
        // yet. We detach it; when it eventually runs, it sees that the node's procedure is no
        // longer itself and exits immediately.
        TransitRegionStateProcedure stale = regionNode.getProcedure();
        if (stale != null) {
          regionNode.unsetProcedure(stale);
        }
        TransitRegionStateProcedure unassignProc = TransitRegionStateProcedure
          .unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
        regionNode.setProcedure(unassignProc);
        return unassignProc;
      } finally {
        regionNode.unlock();
      }
    })
    .filter(proc -> proc != null)
    .toArray(TransitRegionStateProcedure[]::new);
}
/**
 * Creates a procedure that splits <code>regionToSplit</code> at <code>splitKey</code>.
 * @param regionToSplit the region to split
 * @param splitKey the row key at which to split
 * @return the new split procedure
 * @throws IOException if the SplitTableRegionProcedure constructor throws
 */
public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
    final byte[] splitKey) throws IOException {
  final SplitTableRegionProcedure splitProc =
    new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
  return splitProc;
}
/**
 * Creates a procedure that merges the given regions.
 * @param ris the regions to merge
 * @return the new merge procedure
 * @throws IOException if the MergeTableRegionsProcedure constructor throws
 */
public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
  // NOTE(review): the trailing 'false' presumably is the constructor's "forcible" flag; confirm.
  final MergeTableRegionsProcedure mergeProc =
    new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
  return mergeProc;
}
/**
 * Removes the region states of every region of <code>tableName</code>. "DeleteTable" calls this.
 * <p/>
 * The whole list is first handed to regionStateStore.deleteRegions. Then each region is dropped
 * from the offline set and from regionStates.
 * @param tableName the table being deleted
 * @throws IOException if regionStateStore.deleteRegions fails
 */
public void deleteTable(final TableName tableName) throws IOException {
  final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
  regionStateStore.deleteRegions(regions);
  for (final RegionInfo regionInfo : regions) {
    // Every region is expected to be offline by now.
    regionStates.removeFromOfflineRegions(regionInfo);
    regionStates.deleteRegion(regionInfo);
  }
}
// ============================================================================================
// RS Region Transition Report helpers
// ============================================================================================
/**
 * Dispatches each transition reported by <code>serverName</code> to the matching handler, based
 * on its transition code.
 * <p/>
 * Codes not listed in the switch are ignored. NOTE(review): <code>builder</code> is not used by
 * this method.
 * @throws IOException propagated from the per-transition update handlers
 */
private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
    ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
  for (RegionStateTransition t : transitionList) {
    switch (t.getTransitionCode()) {
      case OPENED:
      case FAILED_OPEN:
      case CLOSED: {
        // Open/close reports carry exactly one region.
        assert t.getRegionInfoCount() == 1 : t;
        RegionInfo region = ProtobufUtil.toRegionInfo(t.getRegionInfo(0));
        long procId = t.getProcIdCount() > 0 ? t.getProcId(0) : Procedure.NO_PROC_ID;
        long openSeqNum = t.hasOpenSeqNum() ? t.getOpenSeqNum() : HConstants.NO_SEQNUM;
        updateRegionTransition(serverName, t.getTransitionCode(), region, openSeqNum, procId);
        break;
      }
      case READY_TO_SPLIT:
      case SPLIT:
      case SPLIT_REVERTED: {
        // Index 0 is passed as the parent; 1 and 2 are passed as the two split halves.
        assert t.getRegionInfoCount() == 3 : t;
        updateRegionSplitTransition(serverName, t.getTransitionCode(),
          ProtobufUtil.toRegionInfo(t.getRegionInfo(0)),
          ProtobufUtil.toRegionInfo(t.getRegionInfo(1)),
          ProtobufUtil.toRegionInfo(t.getRegionInfo(2)));
        break;
      }
      case READY_TO_MERGE:
      case MERGED:
      case MERGE_REVERTED: {
        // Index 0 is passed as the merged region; 1 and 2 are passed as the merge inputs.
        assert t.getRegionInfoCount() == 3 : t;
        updateRegionMergeTransition(serverName, t.getTransitionCode(),
          ProtobufUtil.toRegionInfo(t.getRegionInfo(0)),
          ProtobufUtil.toRegionInfo(t.getRegionInfo(1)),
          ProtobufUtil.toRegionInfo(t.getRegionInfo(2)));
        break;
      }
    }
  }
}
/**
 * Handles a region server's batch of region state transition reports.
 * <p/>
 * Reports are processed only while the reporting server is in state ONLINE. Otherwise the
 * response carries the error message "You are dead". Failures other than PleaseHoldException are
 * turned into an error message on the response rather than thrown.
 * @param req the report request from the region server
 * @return the response, possibly carrying an error message
 * @throws PleaseHoldException rethrown from transition processing so the caller can retry later
 */
public ReportRegionStateTransitionResponse reportRegionStateTransition(
    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
  final ReportRegionStateTransitionResponse.Builder builder =
    ReportRegionStateTransitionResponse.newBuilder();
  final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
  final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
  // A read lock is used deliberately, not an exclusive one. Concurrent reports from the same
  // region server must not block each other. Beyond performance, this avoids a deadlock: if meta
  // is hosted on this same server, an exclusive holder could block meta's own report while
  // itself waiting for meta to come online.
  serverNode.readLock().lock();
  try {
    if (!serverNode.isInState(ServerState.ONLINE)) {
      // Only online servers may report; see submitServerCrash and HBASE-21508 for the reasoning.
      LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
        serverName);
      builder.setErrorMessage("You are dead");
    } else {
      try {
        reportRegionStateTransition(builder, serverName, req.getTransitionList());
      } catch (PleaseHoldException e) {
        LOG.trace("Failed transition ", e);
        throw e;
      } catch (UnsupportedOperationException | IOException e) {
        // TODO: a single error message is returned for the whole batch; the RS aborts if the
        // master reports that any one of the transitions failed.
        LOG.warn("Failed transition", e);
        builder.setErrorMessage("Failed transition " + e.getMessage());
      }
    }
  } finally {
    serverNode.readLock().unlock();
  }
  return builder.build();
}
/**
 * Forwards a single-region open/close transition report to the procedure attached to the
 * region's node, if there is one.
 * <p/>
 * When no procedure is attached, the report is only logged. It is logged at INFO for CLOSED
 * during cluster shutdown and at WARN otherwise.
 * @throws UnexpectedStateException if the region has no state node (it was removed)
 * @throws IOException propagated from checkMetaLoaded or from the attached procedure
 */
private void updateRegionTransition(ServerName serverName, TransitionCode state,
    RegionInfo regionInfo, long seqId, long procId) throws IOException {
  checkMetaLoaded(regionInfo);
  final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
  if (regionNode == null) {
    // The table or region no longer exists, e.g. after a delete, split or merge.
    throw new UnexpectedStateException(String.format(
      "Server %s was trying to transition region %s to %s. but the region was removed.",
      serverName, regionInfo, state));
  }
  LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
    regionNode, state);
  final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
  regionNode.lock();
  try {
    final boolean handled = reportTransition(regionNode, serverNode, state, seqId, procId);
    if (handled) {
      return;
    }
    // During cluster shutdown, region servers close their own regions without a master-driven
    // procedure. That is the one case where close is not run by the Master, so "no matching
    // procedure" for CLOSED is expected then. It is downgraded to INFO to avoid noisy WARNs like
    // "No matching procedure found for rit=OPEN, ... transition to CLOSED". Consider having the
    // Master run all shutdowns instead.
    final boolean closedDuringShutdown =
      this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED);
    if (closedDuringShutdown) {
      LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
    } else {
      LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
    }
  } finally {
    regionNode.unlock();
  }
}
/**
 * Hands a transition report to the procedure currently attached to <code>regionNode</code>.
 * @return true if a procedure was attached and received the report; false if none was attached
 * @throws IOException propagated from the procedure's reportTransition
 */
private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
    TransitionCode state, long seqId, long procId) throws IOException {
  final ServerName reportingServer = serverNode.getServerName();
  final TransitRegionStateProcedure attached = regionNode.getProcedure();
  if (attached != null) {
    attached.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
      reportingServer, state, seqId, procId);
    return true;
  }
  // No procedure owns this region right now; the caller decides how to log that.
  return false;
}
private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
throws IOException {
checkMetaLoaded(parent);
if (state != TransitionCode.READY_TO_SPLIT) {
throw new UnexpectedStateException("unsupported split regionState=" + state +
" for parent region " + parent +
" maybe an old RS (< 2.0) had the operation in progress");
}
// sanity check on the request