forked from apache/hbase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HRegionServer.java
3636 lines (3302 loc) · 146 KB
/
HRegionServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
* are many HRegionServers in a single HBase deployment.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
implements RegionServerServices, LastSequenceId {
private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
/**
* For testing only! Set to true to skip notifying region assignment to master.
*/
@InterfaceAudience.Private
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
/**
* A map from RegionName to current action in progress. Boolean value indicates: true if an open
* region action is in progress, false if a close region action is in progress.
*/
private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
/**
* Used to cache the open/close region procedures which already submitted. See
* {@link #submitRegionProcedure(long)}.
*/
private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
/**
* Used to cache the open/close region procedures which already executed. See
* {@link #submitRegionProcedure(long)}.
*/
private final Cache<Long, Long> executedRegionProcedures =
CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
/**
* Used to cache the moved-out regions
*/
private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
.expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
private MemStoreFlusher cacheFlusher;
private HeapMemoryManager hMemManager;
// Replication services. If no replication, this handler will be null.
private ReplicationSourceService replicationSourceHandler;
private ReplicationSinkService replicationSinkHandler;
private boolean sameReplicationSourceAndSink;
// Compactions
private CompactSplit compactSplitThread;
/**
* Map of regions currently being served by this region server. Key is the encoded region name.
* All access should be synchronized.
*/
private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
/**
* Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
* need to be a ConcurrentHashMap?
*/
private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
/**
* Map of encoded region names to the DataNode locations they should be hosted on. We store the
* value as Address since InetSocketAddress is required by the HDFS API (create() that takes
* favored nodes as hints for placing file blocks). We could have used ServerName here as the
* value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
* call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
* here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
* conversion on demand from Address to InetSocketAddress will guarantee the resolution results
* will be fresh when we need it.
*/
private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
private LeaseManager leaseManager;
private volatile boolean dataFsOk;
static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
// Default abort timeout is 1200 seconds (1200000 ms), to be safe
private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
// Will run this task when abort timeout
static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
// A state before we go into stopped state. At this stage we're closing user
// space regions.
private boolean stopping = false;
private volatile boolean killed = false;
private final int threadWakeFrequency;
private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
private final int compactionCheckFrequency;
private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
private final int flushCheckFrequency;
// Stub to do region server status calls against the master.
private volatile RegionServerStatusService.BlockingInterface rssStub;
private volatile LockService.BlockingInterface lockStub;
// RPC client. Used to make the stub above that does region server status checking.
private RpcClient rpcClient;
private UncaughtExceptionHandler uncaughtExceptionHandler;
private JvmPauseMonitor pauseMonitor;
private RSSnapshotVerifier rsSnapshotVerifier;
/** region server process name */
public static final String REGIONSERVER = "regionserver";
private MetricsRegionServer metricsRegionServer;
MetricsRegionServerWrapperImpl metricsRegionServerImpl;
/**
* Check for compactions requests.
*/
private ScheduledChore compactionChecker;
/**
* Check for flushes
*/
private ScheduledChore periodicFlusher;
private volatile WALFactory walFactory;
private LogRoller walRoller;
// A thread which calls reportProcedureDone
private RemoteProcedureResultReporter procedureResultReporter;
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
// master address tracker
private final MasterAddressTracker masterAddressTracker;
// Log Splitting Worker
private SplitLogWorker splitLogWorker;
private final int shortOperationTimeout;
// Time to pause if master says 'please hold'
private final long retryPauseTime;
private final RegionServerAccounting regionServerAccounting;
private NamedQueueServiceChore namedQueueServiceChore = null;
// Block cache
private BlockCache blockCache;
// The cache for mob files
private MobFileCache mobFileCache;
/** The health check chore. */
private HealthCheckChore healthCheckChore;
/** The Executor status collect chore. */
private ExecutorStatusChore executorStatusChore;
/** The nonce manager chore. */
private ScheduledChore nonceManagerChore;
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
/**
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
* {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
*/
@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
"hbase.regionserver.hostname.disable.master.reversedns";
/**
* HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
* Exception will be thrown if both are used.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
"hbase.unsafe.regionserver.hostname.disable.master.reversedns";
/**
* Unique identifier for the cluster we are a part of.
*/
private String clusterId;
// chore for refreshing store files for secondary regions
private StorefileRefresherChore storefileRefresher;
private volatile RegionServerCoprocessorHost rsHost;
private RegionServerProcedureManagerHost rspmHost;
private RegionServerRpcQuotaManager rsQuotaManager;
private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
/**
* Nonce manager. Nonces are used to make operations like increment and append idempotent in the
* case where client doesn't receive the response from a successful operation and retries. We
* track the successful ops for some time via a nonce sent by client and handle duplicate
* operations (currently, by failing them; in future we might use MVCC to return result). Nonces
* are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are: - WAL
* recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past
* records. If we don't read the records, we don't read and recover the nonces. Some WALs within
* nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL
* recovery during normal region move, so nonces will not be transferred. We can have separate
* additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path
* - because WAL itself also contains nonces, if we only flush it before memstore flush, for a
* given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part
* of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't
* matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be
* recovered during move.
*/
final ServerNonceManager nonceManager;
private BrokenStoreFileCleaner brokenStoreFileCleaner;
private RSMobFileCleanerChore rsMobFileCleanerChore;
@InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger;
private volatile ThroughputController flushThroughputController;
private SecureBulkLoadManager secureBulkLoadManager;
private FileSystemUtilizationChore fsUtilizationChore;
private BootstrapNodeManager bootstrapNodeManager;
/**
* True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
* just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
* other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
* Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
* {@link #TEST_SKIP_REPORTING_TRANSITION} ?
*/
private final boolean masterless;
private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
/** regionserver codec list **/
private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
// A timer to shutdown the process if abort takes too long
private Timer abortMonitor;
private RegionReplicationBufferManager regionReplicationBufferManager;
/*
* Chore that creates replication marker rows.
*/
private ReplicationMarkerChore replicationMarkerChore;
/**
 * Starts a HRegionServer at the default location.
 * <p>
 * Don't start any services or managers in here in the Constructor. Defer till after we register
 * with the Master as much as possible. See {@link #startServices}.
 * <p>
 * The body runs inside a tracing span named "HRegionServer.cxtor". Any failure is recorded on the
 * span, logged, and rethrown; the span is always ended.
 * @param conf configuration handed to the superclass and read for the settings below
 * @throws IOException if construction fails, e.g. {@link #checkCodecs} rejects a configured
 *                     compression codec
 */
public HRegionServer(final Configuration conf) throws IOException {
super(conf, "RegionServer"); // thread name
final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
try (Scope ignored = span.makeCurrent()) {
this.dataFsOk = true;
// Masterless is simply the negation of clusterMode().
this.masterless = !clusterMode();
// Up-front configuration checks. checkCodecs throws IOException for an unsupported codec;
// NOTE(review): the other two checks presumably also fail fast on bad config -- confirm.
MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
HFile.checkHFileVersion(this.conf);
checkCodecs(this.conf);
FSUtils.setupShortCircuitRead(this.conf);
// Disable usage of meta replicas in the regionserver
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
// Config'ed params
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
// Compaction and flush check periods fall back to the thread wake frequency when unset.
this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
// nonceManager is left null when nonces are disabled by configuration.
boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
regionServerAccounting = new RegionServerAccounting(conf);
blockCache = BlockCacheFactory.createBlockCache(conf);
mobFileCache = new MobFileCache(conf);
rsSnapshotVerifier = new RSSnapshotVerifier(conf);
// The handler aborts this server, naming the thread that raised the exception.
uncaughtExceptionHandler =
(t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
// If no master in cluster, skip trying to track one or look for a cluster status.
if (!this.masterless) {
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
masterAddressTracker.start();
} else {
masterAddressTracker = null;
}
// NOTE(review): rpcServices and zooKeeper are not declared in this file's visible part;
// presumably inherited from HBaseServerBase -- confirm.
this.rpcServices.start(zooKeeper);
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
// Make sure we log the exception. HRegionServer is often started via reflection and the
// cause of failed startup is lost.
TraceUtil.setError(span, t);
LOG.error("Failed construction RegionServer", t);
throw t;
} finally {
// End the span on both the success and the failure path.
span.end();
}
}
/**
 * Decides which hostname, if any, should be used in place of the default one.
 * <p>
 * Reads the value stored under {@code UNSAFE_RS_HOSTNAME_KEY} and the boolean flag
 * {@code UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY}; the two settings are mutually
 * exclusive. HMaster should override this method to load the specific config for master.
 * @param conf configuration both settings are read from
 * @return the value configured under {@code UNSAFE_RS_HOSTNAME_KEY} when the flag is off;
 *         otherwise the host name of the RPC services' socket address
 * @throws IOException if the flag is on while a non-blank hostname is configured as well
 */
@Override
protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
  final String configuredHostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
  final boolean reverseDnsDisabled =
    conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false);

  // Flag off: hand back whatever was configured, untouched.
  if (!reverseDnsDisabled) {
    return configuredHostname;
  }

  // Flag on and no explicit hostname: fall back to the RPC socket address.
  if (StringUtils.isBlank(configuredHostname)) {
    return rpcServices.getSocketAddress().getHostName();
  }

  // Flag on together with an explicit hostname is a configuration conflict.
  throw new IOException(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
    + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
    + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
    + UNSAFE_RS_HOSTNAME_KEY + " is used");
}
/**
 * Logs in through the given {@link UserProvider}, passing
 * {@code SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE},
 * {@code SecurityConstants.REGIONSERVER_KRB_PRINCIPAL} and the supplied host.
 * @param user provider whose {@code login} method is invoked
 * @param host host name forwarded to the login call
 * @throws IOException if the delegated login call throws it
 */
@Override
protected void login(UserProvider user, String host) throws IOException {
user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
}
/**
 * Returns the process name of this server.
 * @return the {@link #REGIONSERVER} constant, i.e. {@code "regionserver"}
 */
@Override
protected String getProcessName() {
return REGIONSERVER;
}
/**
 * Tells whether this server may create the base znode.
 * @return the negation of {@code clusterMode()}; this is the same expression the constructor
 *         uses to derive the {@code masterless} flag
 */
@Override
protected boolean canCreateBaseZNode() {
return !clusterMode();
}
/**
 * Tells whether this server may update table descriptors.
 * @return always {@code false} for a region server
 */
@Override
protected boolean canUpdateTableDescriptor() {
return false;
}
/**
 * Tells whether table descriptors should be cached by this server.
 * @return always {@code false} for a region server
 */
@Override
protected boolean cacheTableDescriptor() {
return false;
}
/**
 * Creates the RPC services object for this region server.
 * <p>
 * NOTE(review): presumably this implements a factory hook declared in HBaseServerBase; if so,
 * an {@code @Override} annotation would be appropriate -- confirm against the base class.
 * @return a new {@link RSRpcServices} bound to this server
 * @throws IOException if the {@link RSRpcServices} constructor throws it
 */
protected RSRpcServices createRpcServices() throws IOException {
return new RSRpcServices(this);
}
/**
 * Configures the info server for a region server: registers {@link RSStatusServlet} as an
 * unprivileged servlet named "rs-status" at path "/rs-status", and stores this server instance
 * as an attribute under the {@link #REGIONSERVER} key.
 * @param infoServer the info server to configure
 */
@Override
protected void configureInfoServer(InfoServer infoServer) {
infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
infoServer.setAttribute(REGIONSERVER, this);
}
/**
 * Returns the servlet class used for debug dumps of this server.
 * @return {@link RSDumpServlet}{@code .class}
 */
@Override
protected Class<? extends HttpServlet> getDumpServlet() {
return RSDumpServlet.class;
}
/**
 * Used by {@link RSDumpServlet} to generate debugging information.
 * <p>
 * Writes one line per locked row of every region returned by {@code getRegions()}, in the form
 * {@code <table name>,<encoded region name>,<row lock context>}. Regions without locked rows
 * produce no output.
 * @param out writer the lines are printed to
 */
public void dumpRowLocks(final PrintWriter out) {
  for (HRegion region : getRegions()) {
    // Nothing to report for a region that holds no row locks.
    if (region.getLockedRows().isEmpty()) {
      continue;
    }
    for (HRegion.RowLockContext lockContext : region.getLockedRows().values()) {
      final String line = region.getTableDescriptor().getTableName() + ","
        + region.getRegionInfo().getEncodedName() + "," + lockContext.toString();
      out.println(line);
    }
  }
}
/**
 * Registers a coprocessor {@link Service} under the service name derived from its descriptor.
 * <p>
 * No stacking of instances is allowed for a single executorService name: if a handler is already
 * registered under the same name the request is rejected and the existing handler is kept.
 * @param instance the service to register
 * @return {@code true} if the service was registered, {@code false} if a service with the same
 *         name was already registered
 */
@Override
public boolean registerService(Service instance) {
  ServiceDescriptor serviceDesc = instance.getDescriptorForType();
  String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
  // Single lookup: insert only when the name is free, otherwise leave the existing handler.
  if (coprocessorServiceHandlers.putIfAbsent(serviceName, instance) != null) {
    LOG.error("Coprocessor executorService {} already registered, rejecting request from {}",
      serviceName, instance);
    return false;
  }
  // Parameterized logging defers message construction, so no isDebugEnabled() guard is needed.
  LOG.debug("Registered regionserver coprocessor executorService: executorService={}",
    serviceName);
  return true;
}
/**
 * Runs {@code CompressionTest.testCompression} on every codec listed under
 * {@code REGIONSERVER_CODEC} to make sure supporting libs are in place. Does nothing when that
 * setting is absent.
 * @param c configuration to read the codec list from
 * @throws IOException naming the first codec whose compression test fails
 */
private static void checkCodecs(final Configuration c) throws IOException {
  final String[] configuredCodecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
  if (configuredCodecs != null) {
    for (int i = 0; i < configuredCodecs.length; i++) {
      final String codecName = configuredCodecs[i];
      if (CompressionTest.testCompression(codecName)) {
        continue;
      }
      throw new IOException(
        "Compression codec " + codecName + " not supported, aborting RS construction");
    }
  }
}
/**
 * Returns the cluster id currently held by this server.
 * @return the value of the {@code clusterId} field; may be {@code null} if it has not been set
 */
public String getClusterId() {
return this.clusterId;
}
/**
 * All initialization needed before we go register with Master.
 * <p>
 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.
 * <p>
 * In order, this runs {@code initializeZooKeeper()} and {@code setupClusterConnection()}, creates
 * the {@code BootstrapNodeManager} and {@code RegionReplicationBufferManager}, and takes the RPC
 * client from the async cluster connection. The whole sequence runs inside a tracing span named
 * {@code HRegionServer.preRegistrationInitialization}.
 * <p>
 * Any {@link Throwable} raised along the way is recorded on the span, the RPC services are
 * stopped and {@code abort} is called.
 */
private void preRegistrationInitialization() {
final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
// The scope makes the span current for the duration of the try block.
try (Scope ignored = span.makeCurrent()) {
initializeZooKeeper();
setupClusterConnection();
// Both managers below are built from state prepared by the two calls above.
bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
regionReplicationBufferManager = new RegionReplicationBufferManager(this);
// Setup RPC client for master communication
this.rpcClient = asyncClusterConnection.getRpcClient();
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
// Call stop if error or process will stick around for ever since server
// puts up non-daemon threads.
TraceUtil.setError(span, t);
this.rpcServices.stop();
abort("Initialization of RS failed. Hence aborting RS.", t);
} finally {
// The span is ended on both the success and the failure path.
span.end();
}
}
/**
 * Waits on the ZooKeeper-tracked state this server needs before it can register: first until a
 * master address is available, then until the cluster 'up' flag has been set. This is the order
 * in which master does things.
 * <p>
 * Afterwards the cluster id is read from ZooKeeper if it is not already known, and, unless the
 * server has been stopped or aborted in the meantime, the procedure manager host is created,
 * loaded from the configuration and initialized.
 * <p>
 * Does nothing when running masterless.
 * @throws IOException if the server is stopped while waiting on a tracker
 * @throws InterruptedException if the waiting thread is interrupted
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification = "cluster Id znode read would give us correct response")
private void initializeZooKeeper() throws IOException, InterruptedException {
  if (this.masterless) {
    // No Master in the mix, so there is nothing to wait for.
    return;
  }

  // No point in starting up if no master is running: block until one is available.
  blockAndCheckIfStopped(this.masterAddressTracker);

  // Master sets the cluster-up flag in ZooKeeper when ready; wait for it.
  blockAndCheckIfStopped(this.clusterStatusTracker);

  // If we are HMaster then the cluster id should have already been set.
  if (clusterId == null) {
    // Cluster status is up by now, so HMaster should already have published the id.
    try {
      clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
      if (clusterId == null) {
        this.abort("Cluster ID has not been set");
      }
      LOG.info("ClusterId : " + clusterId);
    } catch (KeeperException e) {
      this.abort("Failed to retrieve Cluster ID", e);
    }
  }

  // Further initialization is pointless once the server is stopped or aborted.
  if (!isStopped() && !isAborted()) {
    // Watch for snapshots and other procedures.
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }
}
/**
 * Utility method to wait indefinitely on a znode becoming available while checking whether the
 * region server has been stopped. Each wait lasts at most {@code msgInterval}; after every
 * unsuccessful wait the stopped flag is checked before waiting again.
 * @param tracker znode tracker to use
 * @throws IOException any IO exception, plus if the RS is stopped
 * @throws InterruptedException if the waiting thread is interrupted
 */
private void blockAndCheckIfStopped(ZKNodeTracker tracker)
  throws IOException, InterruptedException {
  for (;;) {
    if (tracker.blockUntilAvailable(this.msgInterval, false) != null) {
      return;
    }
    if (this.stopped) {
      throw new IOException("Received the shutdown message while waiting.");
    }
  }
}
/**
 * Returns True if the cluster is up. A masterless server always counts as up; otherwise the
 * answer comes from the cluster status tracker, and is {@code false} while no tracker is set.
 */
@Override
public boolean isClusterUp() {
  if (this.masterless) {
    return true;
  }
  return this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp();
}
/**
 * Creates the {@code ReplicationMarkerChore} when the setting under
 * {@code REPLICATION_MARKER_ENABLED_KEY} is on, using the period configured under
 * {@code REPLICATION_MARKER_CHORE_DURATION_KEY}. Otherwise nothing is created.
 */
private void initializeReplicationMarkerChore() {
  if (!conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT)) {
    // Replication marker is not enabled: return immediately.
    return;
  }
  final int chorePeriod = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
    REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
  replicationMarkerChore = new ReplicationMarkerChore(this, this, chorePeriod, conf);
}
/**
* The HRegionServer sticks in this loop until closed.
*/
@Override
public void run() {
if (isStopped()) {
LOG.info("Skipping run; stopped");
return;
}
try {
// Do pre-registration initializations; zookeeper, lease threads, etc.
preRegistrationInitialization();
} catch (Throwable e) {
abort("Fatal exception during initialization", e);
}
try {
if (!isStopped() && !isAborted()) {
installShutdownHook();
// Initialize the RegionServerCoprocessorHost now that our ephemeral
// node was created, in case any coprocessors want to use ZooKeeper
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
// Try and register with the Master; tell it we are here. Break if server is stopped or
// the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
// start up all Services. Use RetryCounter to get backoff in case Master is struggling to
// come up.
LOG.debug("About to register with Master.");
TraceUtil.trace(() -> {
RetryCounterFactory rcf =
new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
RetryCounter rc = rcf.create();
while (keepLooping()) {
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
this.sleeper.sleep(sleepTime);
} else {
handleReportForDutyResponse(w);
break;
}
}
}, "HRegionServer.registerWithMaster");
}
if (!isStopped() && isHealthy()) {
TraceUtil.trace(() -> {
// start the snapshot handler and other procedure handlers,
// since the server is ready to run
if (this.rspmHost != null) {
this.rspmHost.start();
}
// Start the Quota Manager
if (this.rsQuotaManager != null) {
rsQuotaManager.start(getRpcServer().getScheduler());
}
if (this.rsSpaceQuotaManager != null) {
this.rsSpaceQuotaManager.start();
}
}, "HRegionServer.startup");
}
// We registered with the Master. Go into run mode.
long lastMsg = EnvironmentEdgeManager.currentTime();
long oldRequestCount = -1;
// The main run loop.
while (!isStopped() && isHealthy()) {
if (!isClusterUp()) {
if (onlineRegions.isEmpty()) {
stop("Exiting; cluster shutdown set and not carrying any regions");
} else if (!this.stopping) {
this.stopping = true;
LOG.info("Closing user regions");
closeUserRegions(isAborted());
} else {
boolean allUserRegionsOffline = areAllUserRegionsOffline();
if (allUserRegionsOffline) {
// Set stopped if no more write requests to meta tables
// since last time we went around the loop. Any open
// meta regions will be closed on our way out.
if (oldRequestCount == getWriteRequestCount()) {
stop("Stopped; only catalog regions remaining online");
break;
}
oldRequestCount = getWriteRequestCount();
} else {
// Make sure all regions have been closed -- some regions may
// have not got it because we were splitting at the time of
// the call to closeUserRegions.
closeUserRegions(this.abortRequested.get());
}
LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
}
}
long now = EnvironmentEdgeManager.currentTime();
if ((now - lastMsg) >= msgInterval) {
tryRegionServerReport(lastMsg, now);
lastMsg = EnvironmentEdgeManager.currentTime();
}
if (!isStopped() && !isAborted()) {
this.sleeper.sleep();
}
} // for
} catch (Throwable t) {
if (!rpcServices.checkOOME(t)) {
String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
abort(prefix + t.getMessage(), t);
}
}
final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
try (Scope ignored = span.makeCurrent()) {
if (this.leaseManager != null) {
this.leaseManager.closeAfterLeasesExpire();
}
if (this.splitLogWorker != null) {
splitLogWorker.stop();
}
stopInfoServer();
// Send cache a shutdown.
if (blockCache != null) {
blockCache.shutdown();
}
if (mobFileCache != null) {
mobFileCache.shutdown();
}
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if (this.hMemManager != null) {
this.hMemManager.stop();
}
if (this.cacheFlusher != null) {
this.cacheFlusher.interruptIfNecessary();
}
if (this.compactSplitThread != null) {
this.compactSplitThread.interruptIfNecessary();
}
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
rspmHost.stop(this.abortRequested.get() || this.killed);
}
if (this.killed) {
// Just skip out w/o closing regions. Used when testing.
} else if (abortRequested.get()) {
if (this.dataFsOk) {
closeUserRegions(abortRequested.get()); // Don't leave any open file handles
}
LOG.info("aborting server " + this.serverName);
} else {
closeUserRegions(abortRequested.get());
LOG.info("stopping server " + this.serverName);
}
regionReplicationBufferManager.stop();
closeClusterConnection();
// Closing the compactSplit thread before closing meta regions
if (!this.killed && containsMetaTableRegions()) {
if (!abortRequested.get() || this.dataFsOk) {
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
this.compactSplitThread = null;
}
closeMetaTableRegions(abortRequested.get());
}
}
if (!this.killed && this.dataFsOk) {
waitOnAllRegionsToClose(abortRequested.get());
LOG.info("stopping server " + this.serverName + "; all regions closed.");
}
// Stop the quota manager
if (rsQuotaManager != null) {
rsQuotaManager.stop();
}
if (rsSpaceQuotaManager != null) {
rsSpaceQuotaManager.stop();
rsSpaceQuotaManager = null;
}
// flag may be changed when closing regions throws exception.
if (this.dataFsOk) {
shutdownWAL(!abortRequested.get());
}
// Make sure the proxy is down.
if (this.rssStub != null) {
this.rssStub = null;
}
if (this.lockStub != null) {
this.lockStub = null;
}
if (this.rpcClient != null) {
this.rpcClient.close();
}
if (this.leaseManager != null) {
this.leaseManager.close();
}
if (this.pauseMonitor != null) {
this.pauseMonitor.stop();
}
if (!killed) {
stopServiceThreads();
}
if (this.rpcServices != null) {
this.rpcServices.stop();
}
try {
deleteMyEphemeralNode();
} catch (KeeperException.NoNodeException nn) {