/
HRegion.java
8871 lines (8073 loc) · 363 KB
/
HRegion.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ROW_LOCK_READ_LOCK_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.RandomAccess;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
import org.apache.hadoop.hbase.regionserver.metrics.MetricsTableRequests;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
* Regions store data for a certain region of a table. It stores all columns for each row. A given
* table consists of one or more Regions.
* <p>
 * A Region is defined by its table and its key extent.
* <p>
* Locking at the Region level serves only one purpose: preventing the region from being closed (and
* consequently split) while other operations are ongoing. Each row level operation obtains both a
* row lock and a region read lock for the duration of the operation. While a scanner is being
* constructed, getScanner holds a read lock. If the scanner is successfully constructed, it holds a
* read lock until it is closed. A close takes out a write lock and consequently will block for
* ongoing operations and will block new operations from starting while the close is in progress.
*/
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
private static final Logger LOG = LoggerFactory.getLogger(HRegion.class);
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
"hbase.hregion.scan.loadColumnFamiliesOnDemand";
public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize";
public static final int DEFAULT_MAX_CELL_SIZE = 10485760;
public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE =
"hbase.regionserver.minibatch.size";
public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000;
public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync";
public static final boolean DEFAULT_WAL_HSYNC = false;
/** Parameter name for compaction after bulkload */
public static final String COMPACTION_AFTER_BULKLOAD_ENABLE =
"hbase.compaction.after.bulkload.enable";
/** Config for allow split when file count greater than the configured blocking file count */
public static final String SPLIT_IGNORE_BLOCKING_ENABLED_KEY =
"hbase.hregion.split.ignore.blocking.enabled";
public static final String REGION_STORAGE_POLICY_KEY = "hbase.hregion.block.storage.policy";
public static final String DEFAULT_REGION_STORAGE_POLICY = "NONE";
/**
 * This is for using HRegion as a local storage, where we may put the recovered edits in a
* special place. Once this is set, we will only replay the recovered edits under this directory
* and ignore the original replay directory configs.
*/
public static final String SPECIAL_RECOVERED_EDITS_DIR =
"hbase.hregion.special.recovered.edits.dir";
/**
* Mainly used for master local region, where we will replay the WAL file directly without
* splitting, so it is possible to have WAL files which are not closed cleanly, in this way,
* hitting EOF is expected so should not consider it as a critical problem.
*/
public static final String RECOVERED_EDITS_IGNORE_EOF =
"hbase.hregion.recovered.edits.ignore.eof";
/**
* Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
* master local region.
*/
public static final String USE_META_CELL_COMPARATOR = "hbase.region.use.meta.cell.comparator";
public static final boolean DEFAULT_USE_META_CELL_COMPARATOR = false;
final AtomicBoolean closed = new AtomicBoolean(false);
/*
* Closing can take some time; use the closing flag if there is stuff we don't want to do while in
* closing state; e.g. like offer this region up to the master as a region to close if the
* carrying regionserver is overloaded. Once set, it is never cleared.
*/
final AtomicBoolean closing = new AtomicBoolean(false);
/**
* The max sequence id of flushed data on this region. There is no edit in memory that is less
 * than this sequence id.
*/
private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
/**
* Record the sequence id of last flush operation. Can be in advance of {@link #maxFlushedSeqId}
* when flushing a single column family. In this case, {@link #maxFlushedSeqId} will be older than
* the oldest edit in memory.
*/
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
/**
* The sequence id of the last replayed open region event from the primary region. This is used to
* skip entries before this due to the possibility of replay edits coming out of order from
* replication.
*/
protected volatile long lastReplayedOpenRegionSeqId = -1L;
protected volatile long lastReplayedCompactionSeqId = -1L;
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
// map from a locked row to the context for that lock including:
// - CountDownLatch for threads waiting on that row
// - the thread that owns the lock (allow reentrancy)
// - reference count of (reentrant) locks held by the thread
// - the row itself
private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
new ConcurrentHashMap<>();
protected final Map<byte[], HStore> stores =
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
// TODO: account for each registered handler in HeapSize computation
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
// Track data size in all memstores
private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
RegionServicesForStores regionServicesForStores;
// Debug possible data loss due to WAL off
final LongAdder numMutationsWithoutWAL = new LongAdder();
final LongAdder dataInMemoryWithoutWAL = new LongAdder();
// Debug why CAS operations are taking a while.
final LongAdder checkAndMutateChecksPassed = new LongAdder();
final LongAdder checkAndMutateChecksFailed = new LongAdder();
// Number of requests
// Count rows for scan
final LongAdder readRequestsCount = new LongAdder();
final LongAdder cpRequestsCount = new LongAdder();
final LongAdder filteredReadRequestsCount = new LongAdder();
// Count rows for multi row mutations
final LongAdder writeRequestsCount = new LongAdder();
// Number of requests blocked by memstore size.
private final LongAdder blockedRequestsCount = new LongAdder();
// Compaction LongAdders
final LongAdder compactionsFinished = new LongAdder();
final LongAdder compactionsFailed = new LongAdder();
final LongAdder compactionNumFilesCompacted = new LongAdder();
final LongAdder compactionNumBytesCompacted = new LongAdder();
final LongAdder compactionsQueued = new LongAdder();
final LongAdder flushesQueued = new LongAdder();
private BlockCache blockCache;
private MobFileCache mobFileCache;
private final WAL wal;
private final HRegionFileSystem fs;
protected final Configuration conf;
private final Configuration baseConf;
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
private Path regionWalDir;
private FileSystem walFS;
// set to true if the region is restored from snapshot for reading by ClientSideRegionScanner
private boolean isRestoredRegion = false;
/**
 * Marks whether this region was restored from a snapshot for reading (e.g. by
 * ClientSideRegionScanner).
 * @param restoredRegion true if this region is backed by a restored snapshot
 */
public void setRestoredRegion(boolean restoredRegion) {
  this.isRestoredRegion = restoredRegion;
}
/**
 * Returns the per-table request metrics holder for this region; may be null before the field
 * has been initialized.
 */
public MetricsTableRequests getMetricsTableRequests() {
  return this.metricsTableRequests;
}
// Handle table latency metrics
private MetricsTableRequests metricsTableRequests;
// The internal wait duration to acquire a lock before read/update
// from the region. It is not per row. The purpose of this wait time
// is to avoid waiting a long time while the region is busy, so that
// we can release the IPC handler soon enough to improve the
// availability of the region server. It can be adjusted by
// tuning configuration "hbase.busy.wait.duration".
final long busyWaitDuration;
static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
// If updating multiple rows in one call, wait longer,
// i.e. waiting for busyWaitDuration * # of rows. However,
// we can limit the max multiplier.
final int maxBusyWaitMultiplier;
// Max busy wait duration. There is no point to wait longer than the RPC
// purge timeout, when a RPC call will be terminated by the RPC engine.
final long maxBusyWaitDuration;
// Max cell size. If nonzero, the maximum allowed size for any given cell
// in bytes
final long maxCellSize;
// Number of mutations for minibatch processing.
private final int miniBatchSize;
final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
final ReadPointCalculationLock smallestReadPointCalcLock;
/**
 * The sequence ID that was encountered when this region was opened.
*/
private long openSeqNum = HConstants.NO_SEQNUM;
/**
* The default setting for whether to enable on-demand CF loading for scan requests to this
* region. Requests can override it.
*/
private boolean isLoadingCfsOnDemandDefault = false;
private final AtomicInteger majorInProgress = new AtomicInteger(0);
private final AtomicInteger minorInProgress = new AtomicInteger(0);
//
// Context: During replay we want to ensure that we do not lose any data. So, we
// have to be conservative in how we replay wals. For each store, we calculate
// the maxSeqId up to which the store was flushed. And, skip the edits which
// are equal to or lower than maxSeqId for each store.
// The following map is populated when opening the region
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
// lock used to protect the replay operation for secondary replicas, so the two fields below do
// not need to be volatile.
private Lock replayLock;
/** Saved state from replaying prepare flush cache */
private PrepareFlushResult prepareFlushResult = null;
private long lastReplayedSequenceId = HConstants.NO_SEQNUM;
private volatile ConfigurationManager configurationManager;
// Used for testing.
private volatile Long timeoutForWriteLock = null;
private final CellComparator cellComparator;
private final int minBlockSizeBytes;
/**
* @return The smallest mvcc readPoint across all the scanners in this region. Writes older than
* this readPoint, are included in every read operation.
*/
public long getSmallestReadPoint() {
  // Take the calculation lock so no new RegionScanner can register a read point we would
  // miss while we are computing the minimum.
  smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
  try {
    // Start from the mvcc read point, then lower it to the oldest scanner read point, if any.
    long smallest = mvcc.getReadPoint();
    for (Long scannerReadPoint : this.scannerReadPoints.values()) {
      if (scannerReadPoint < smallest) {
        smallest = scannerReadPoint;
      }
    }
    return smallest;
  } finally {
    smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
  }
}
/*
 * Data structure of write state flags used to coordinate flushes, compactions and closes.
*/
static class WriteState {
  // Set while a memstore flush is happening.
  volatile boolean flushing = false;
  // Set when a flush has been requested.
  volatile boolean flushRequested = false;
  // Number of compactions running.
  AtomicInteger compacting = new AtomicInteger(0);
  // Cleared (set to false) in close; once false, the region can no longer compact or flush.
  volatile boolean writesEnabled = true;
  // Set if region is read-only
  volatile boolean readOnly = false;
  // whether the reads are enabled. This is different than readOnly, because readOnly is
  // static in the lifetime of the region, while readsEnabled is dynamic
  volatile boolean readsEnabled = true;
  /**
   * Set flags that make this region read-only: readOnly is raised and writesEnabled lowered
   * together (or vice versa) under the monitor so the two flags stay consistent.
   * @param onOff flip value for region r/o setting
   */
  synchronized void setReadOnly(final boolean onOff) {
    this.writesEnabled = !onOff;
    this.readOnly = onOff;
  }
  /** Returns true if the region is read-only. */
  boolean isReadOnly() {
    return this.readOnly;
  }
  /** Returns true if a memstore flush has been requested. */
  boolean isFlushRequested() {
    return this.flushRequested;
  }
  /** Enables or disables reads for this region, independently of the readOnly flag. */
  void setReadsEnabled(boolean readsEnabled) {
    this.readsEnabled = readsEnabled;
  }
  // NOTE(review): sized for the object header plus the 5 boolean fields; the reference to the
  // AtomicInteger 'compacting' is not accounted for — confirm this omission is intentional.
  static final long HEAP_SIZE = ClassSize.align(ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
}
/**
 * Objects from this class are created when flushing to describe all the different states
 * the flush method ends up in. The Result enum describes those states. The sequence id should only be
* specified if the flush was successful, and the failure message should only be specified if it
* didn't flush.
*/
public static class FlushResultImpl implements FlushResult {
  // Outcome of the flush attempt; see the Result enum.
  final Result result;
  // Why the flush could not be performed; null when the flush succeeded.
  final String failureReason;
  // Sequence id generated right after the flushed edits; -1 when the memstores were not flushed.
  final long flushSequenceId;
  // Whether a flush marker was written to the WAL for this attempt.
  final boolean wroteFlushWalMarker;
  /**
   * Convenience constructor to use when the flush is successful, the failure message is set to
   * null.
   * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
   * @param flushSequenceId Generated sequence id that comes right after the edits in the
   * memstores.
   */
  FlushResultImpl(Result result, long flushSequenceId) {
    this(result, flushSequenceId, null, false);
    assert result == Result.FLUSHED_NO_COMPACTION_NEEDED
      || result == Result.FLUSHED_COMPACTION_NEEDED;
  }
  /**
   * Convenience constructor to use when we cannot flush.
   * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
   * @param failureReason Reason why we couldn't flush.
   */
  FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) {
    this(result, -1, failureReason, wroteFlushMarker);
    assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
  }
  /**
   * Constructor with all the parameters.
   * @param result Any of the Result.
   * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
   * @param failureReason Reason why we couldn't flush, or null.
   * @param wroteFlushMarker Whether a flush marker was written to the WAL.
   */
  FlushResultImpl(Result result, long flushSequenceId, String failureReason,
    boolean wroteFlushMarker) {
    this.result = result;
    this.flushSequenceId = flushSequenceId;
    this.failureReason = failureReason;
    this.wroteFlushWalMarker = wroteFlushMarker;
  }
  /**
   * Convenience method, the equivalent of checking if result is FLUSHED_NO_COMPACTION_NEEDED or
   * FLUSHED_COMPACTION_NEEDED.
   * @return true if the memstores were flushed, else false.
   */
  @Override
  public boolean isFlushSucceeded() {
    return result == Result.FLUSHED_NO_COMPACTION_NEEDED
      || result == Result.FLUSHED_COMPACTION_NEEDED;
  }
  /**
   * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
   * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
   */
  @Override
  public boolean isCompactionNeeded() {
    return result == Result.FLUSHED_COMPACTION_NEEDED;
  }
  @Override
  public String toString() {
    // NOTE(review): there is no separator between "flush seq id" and its value, so output reads
    // like "flush seq id123" — confirm nothing parses this string before adding a ':' here.
    return new StringBuilder().append("flush result:").append(result).append(", ")
      .append("failureReason:").append(failureReason).append(",").append("flush seq id")
      .append(flushSequenceId).toString();
  }
  @Override
  public Result getResult() {
    return result;
  }
}
/** A result object from prepare flush cache stage */
protected static class PrepareFlushResult {
  final FlushResultImpl result; // indicating a failure result from prepare; null on success
  // Per-store flush contexts created during prepare; keys are presumably column family names —
  // confirm against callers. Null for the early-exit case.
  final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
  // Per-store lists of files produced by the flush; null for the early-exit case.
  final TreeMap<byte[], List<Path>> committedFiles;
  // Per-store flushable memstore sizes captured at prepare time; null for the early-exit case.
  final TreeMap<byte[], MemStoreSize> storeFlushableSize;
  // When the prepare stage started; 0 for the early-exit case.
  final long startTime;
  // Sequence id assigned to the flush operation.
  final long flushOpSeqId;
  // Sequence id up to which the data is being flushed.
  final long flushedSeqId;
  // Total flushable size across all stores; MemStoreSizing.DUD for the early-exit case.
  final MemStoreSizing totalFlushableSize;
  /** Constructs an early exit case */
  PrepareFlushResult(FlushResultImpl result, long flushSeqId) {
    // Math.max clamps a negative flushSeqId to 0 so the early-exit case never records a
    // negative flush op sequence id.
    this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, MemStoreSizing.DUD);
  }
  /** Constructs a successful prepare flush result */
  PrepareFlushResult(TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
    TreeMap<byte[], List<Path>> committedFiles, TreeMap<byte[], MemStoreSize> storeFlushableSize,
    long startTime, long flushSeqId, long flushedSeqId, MemStoreSizing totalFlushableSize) {
    this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushSeqId,
      flushedSeqId, totalFlushableSize);
  }
  // All-argument constructor both public constructors delegate to.
  private PrepareFlushResult(FlushResultImpl result,
    TreeMap<byte[], StoreFlushContext> storeFlushCtxs, TreeMap<byte[], List<Path>> committedFiles,
    TreeMap<byte[], MemStoreSize> storeFlushableSize, long startTime, long flushSeqId,
    long flushedSeqId, MemStoreSizing totalFlushableSize) {
    this.result = result;
    this.storeFlushCtxs = storeFlushCtxs;
    this.committedFiles = committedFiles;
    this.storeFlushableSize = storeFlushableSize;
    this.startTime = startTime;
    this.flushOpSeqId = flushSeqId;
    this.flushedSeqId = flushedSeqId;
    this.totalFlushableSize = totalFlushableSize;
  }
  /** Returns the failure result from prepare, or null if prepare succeeded. */
  public FlushResult getResult() {
    return this.result;
  }
}
/**
* A class that tracks exceptions that have been observed in one batch. Not thread safe.
*/
static class ObservedExceptionsInBatch {
  // One flag per exception type; all start false and latch to true once observed.
  private boolean seenWrongRegion = false;
  private boolean seenFailedSanityCheck = false;
  private boolean seenNoSuchFamily = false;

  /** Returns If a {@link WrongRegionException} has been observed. */
  boolean hasSeenWrongRegion() {
    return seenWrongRegion;
  }

  /**
   * Records that a {@link WrongRegionException} has been observed.
   */
  void sawWrongRegion() {
    seenWrongRegion = true;
  }

  /** Returns If a {@link FailedSanityCheckException} has been observed. */
  boolean hasSeenFailedSanityCheck() {
    return seenFailedSanityCheck;
  }

  /**
   * Records that a {@link FailedSanityCheckException} has been observed.
   */
  void sawFailedSanityCheck() {
    seenFailedSanityCheck = true;
  }

  /** Returns If a {@link NoSuchColumnFamilyException} has been observed. */
  boolean hasSeenNoSuchFamily() {
    return seenNoSuchFamily;
  }

  /**
   * Records that a {@link NoSuchColumnFamilyException} has been observed.
   */
  void sawNoSuchFamily() {
    seenNoSuchFamily = true;
  }
}
// Tracks the region's write/flush state (see WriteState).
final WriteState writestate = new WriteState();
// Memstore size (bytes) at which a flush is requested; set in setHTableSpecificConf().
long memstoreFlushSize;
// Max allowed drift of a user-supplied timestamp past "now"; LATEST_TIMESTAMP disables the check.
final long timestampSlop;
// Last flush time for each Store. Useful when we are flushing for each column
private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
// Handle to hosting region server services; null when not running inside a regionserver.
protected RegionServerServices rsServices;
private RegionServerAccounting rsAccounting;
// Interval for periodic memstore flush checks (MEMSTORE_PERIODIC_FLUSH_INTERVAL).
private long flushCheckInterval;
// flushPerChanges is to prevent too many changes in memstore
private long flushPerChanges;
// Memstore size at which updates are blocked: memstoreFlushSize * block multiplier.
private long blockingMemStoreSize;
// Used to guard closes
final ReentrantReadWriteLock lock;
// Used to track interruptible holders of the region lock. Currently that is only RPC handler
// threads. Boolean value in map determines if lock holder can be interrupted, normally true,
// but may be false when thread is transiting a critical section.
final ConcurrentHashMap<Thread, Boolean> regionLockHolders;
// Stop updates lock
private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
// MVCC control for this region's reads/writes.
private final MultiVersionConcurrencyControl mvcc;
// Coprocessor host
private volatile RegionCoprocessorHost coprocessorHost;
private TableDescriptor htableDescriptor = null;
private RegionSplitPolicy splitPolicy;
private RegionSplitRestriction splitRestriction;
private FlushPolicy flushPolicy;
// Metrics objects; null when there is no hosting regionserver (see constructor).
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
// Effective durability for this region (table setting, or cluster default for USE_DEFAULT).
private final Durability regionDurability;
// Whether client-backpressure stats are tracked; always false for system tables.
private final boolean regionStatsEnabled;
// Stores the replication scope of the various column families of the table
// that has non-default scope
private final NavigableMap<byte[], Integer> replicationScope =
  new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final StoreHotnessProtector storeHotnessProtector;
protected Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
/**
 * HRegion constructor. This constructor should only be used for testing and extensions. Instances
 * of HRegion should be instantiated with the {@link HRegion#createHRegion} or
 * {@link HRegion#openHRegion} method.
 * @param tableDir   qualified path of directory where region should be located, usually the table
 *                   directory.
 * @param wal        The WAL is the outbound log for any updates to the HRegion. The wal file is a
 *                   logfile from the previous execution that's custom-computed for this HRegion.
 *                   The HRegionServer computes and sorts the appropriate wal info for this
 *                   HRegion. If there is a previous wal file (implying that the HRegion has been
 *                   written-to before), then read it from the supplied path.
 * @param fs         is the filesystem.
 * @param confParam  is global configuration settings.
 * @param regionInfo RegionInfo that describes the region to open.
 * @param htd        the table descriptor
 * @param rsServices reference to {@link RegionServerServices} or null
 * @deprecated Use other constructors.
 */
@Deprecated
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
  final Configuration confParam, final RegionInfo regionInfo, final TableDescriptor htd,
  final RegionServerServices rsServices) {
  // Wraps the raw filesystem/path pair and delegates to the primary constructor.
  this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), wal, confParam, htd,
    rsServices);
}
/**
 * HRegion constructor. This constructor should only be used for testing and extensions. Instances
 * of HRegion should be instantiated with the {@link HRegion#createHRegion} or
 * {@link HRegion#openHRegion} method.
 * @param fs         is the filesystem.
 * @param wal        The WAL is the outbound log for any updates to the HRegion. The wal file is a
 *                   logfile from the previous execution that's custom-computed for this HRegion.
 *                   The HRegionServer computes and sorts the appropriate wal info for this
 *                   HRegion. If there is a previous wal file (implying that the HRegion has been
 *                   written-to before), then read it from the supplied path.
 * @param confParam  is global configuration settings; must be the original base configuration,
 *                   not a {@link CompoundConfiguration}.
 * @param htd        the table descriptor; must not be null.
 * @param rsServices reference to {@link RegionServerServices} or null
 * @throws IllegalArgumentException if htd is null, confParam is a CompoundConfiguration, or
 *                                  hbase.hregion.memstore.flush.per.changes exceeds its maximum.
 */
public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
  final TableDescriptor htd, final RegionServerServices rsServices) {
  if (htd == null) {
    throw new IllegalArgumentException("Need table descriptor");
  }
  if (confParam instanceof CompoundConfiguration) {
    throw new IllegalArgumentException("Need original base configuration");
  }
  this.wal = wal;
  this.fs = fs;
  this.mvcc = new MultiVersionConcurrencyControl(getRegionInfo().getShortNameToLog());
  // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
  // this.conf layers the table descriptor's values on top of the base configuration.
  this.baseConf = confParam;
  this.conf = new CompoundConfiguration().add(confParam).addBytesMap(htd.getValues());
  // Meta tables (or an explicit override) use the meta comparator; everything else the default.
  this.cellComparator = htd.isMetaTable()
    || conf.getBoolean(USE_META_CELL_COMPARATOR, DEFAULT_USE_META_CELL_COMPARATOR)
      ? MetaCellComparator.META_COMPARATOR
      : CellComparatorImpl.COMPARATOR;
  this.lock = new ReentrantReadWriteLock(
    conf.getBoolean(FAIR_REENTRANT_CLOSE_LOCK, DEFAULT_FAIR_REENTRANT_CLOSE_LOCK));
  this.regionLockHolders = new ConcurrentHashMap<>();
  this.flushCheckInterval =
    conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, DEFAULT_CACHE_FLUSH_INTERVAL);
  this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
  if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
    throw new IllegalArgumentException(
      MEMSTORE_FLUSH_PER_CHANGES + " can not exceed " + MAX_FLUSH_PER_CHANGES);
  }
  // A non-positive row lock wait would make every row lock fail; clamp to 1ms instead.
  int tmpRowLockDuration =
    conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION);
  if (tmpRowLockDuration <= 0) {
    LOG.info("Found hbase.rowlock.wait.duration set to {}. values <= 0 will cause all row "
      + "locking to fail. Treating it as 1ms to avoid region failure.", tmpRowLockDuration);
    tmpRowLockDuration = 1;
  }
  this.rowLockWaitDuration = tmpRowLockDuration;
  this.smallestReadPointCalcLock = new ReadPointCalculationLock(conf);
  this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
  this.htableDescriptor = htd;
  Set<byte[]> families = this.htableDescriptor.getColumnFamilyNames();
  for (byte[] family : families) {
    if (!replicationScope.containsKey(family)) {
      int scope = htd.getColumnFamily(family).getScope();
      // Only store those families that has NON-DEFAULT scope
      if (scope != REPLICATION_SCOPE_LOCAL) {
        // Do a copy before storing it here.
        replicationScope.put(Bytes.copy(family), scope);
      }
    }
  }
  this.rsServices = rsServices;
  if (this.rsServices != null) {
    this.blockCache = rsServices.getBlockCache().orElse(null);
    this.mobFileCache = rsServices.getMobFileCache().orElse(null);
  }
  this.regionServicesForStores = new RegionServicesForStores(this, rsServices);
  setHTableSpecificConf();
  this.scannerReadPoints = new ConcurrentHashMap<>();
  this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
  this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
  // Reject configurations whose product is non-positive (including overflow to negative).
  if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
    throw new IllegalArgumentException("Invalid hbase.busy.wait.duration (" + busyWaitDuration
      + ") or hbase.busy.wait.multiplier.max (" + maxBusyWaitMultiplier
      + "). Their product should be positive");
  }
  this.maxBusyWaitDuration =
    conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  /*
   * timestamp.slop provides a server-side constraint on the timestamp. This assumes that you base
   * your TS around EnvironmentEdgeManager.currentTime(). In this case, throw an error to the user
   * if the user-specified TS is newer than now + slop. LATEST_TIMESTAMP == don't use this
   * functionality
   */
  this.timestampSlop =
    conf.getLong("hbase.hregion.keyvalue.timestamp.slop.millisecs", HConstants.LATEST_TIMESTAMP);
  this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
  boolean forceSync = conf.getBoolean(WAL_HSYNC_CONF_KEY, DEFAULT_WAL_HSYNC);
  /**
   * This is the global default value for durability. All tables/mutations not defining a
   * durability or using USE_DEFAULT will default to this value.
   */
  Durability defaultDurability = forceSync ? Durability.FSYNC_WAL : Durability.SYNC_WAL;
  this.regionDurability = this.htableDescriptor.getDurability() == Durability.USE_DEFAULT
    ? defaultDurability
    : this.htableDescriptor.getDurability();
  decorateRegionConfiguration(conf);
  if (rsServices != null) {
    this.rsAccounting = this.rsServices.getRegionServerAccounting();
    // don't initialize coprocessors if not running within a regionserver
    // TODO: revisit if coprocessors should load in other cases
    this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
    this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
    this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper, conf);
  } else {
    // No hosting regionserver: metrics are disabled (fields stay null).
    this.metricsRegionWrapper = null;
    this.metricsRegion = null;
  }
  if (LOG.isDebugEnabled()) {
    // Write out region name, its encoded name and storeHotnessProtector as string.
    LOG.debug("Instantiated " + this + "; " + storeHotnessProtector.toString());
  }
  configurationManager = null;
  // disable stats tracking system tables, but check the config for everything else
  this.regionStatsEnabled = htd.getTableName().getNamespaceAsString()
    .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)
      ? false
      : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
  this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE);
  this.miniBatchSize =
    conf.getInt(HBASE_REGIONSERVER_MINIBATCH_SIZE, DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE);
  // recover the metrics of read and write requests count if they were retained
  if (rsServices != null && rsServices.getRegionServerAccounting() != null) {
    Pair<Long, Long> retainedRWRequestsCnt = rsServices.getRegionServerAccounting()
      .getRetainedRegionRWRequestsCnt().get(getRegionInfo().getEncodedName());
    if (retainedRWRequestsCnt != null) {
      this.addReadRequestsCount(retainedRWRequestsCnt.getFirst());
      this.addWriteRequestsCount(retainedRWRequestsCnt.getSecond());
      // remove them since won't use again
      rsServices.getRegionServerAccounting().getRetainedRegionRWRequestsCnt()
        .remove(getRegionInfo().getEncodedName());
    }
  }
  // Smallest configured block size across all families; used as a lower bound elsewhere.
  minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies())
    .mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE);
}
/**
 * Applies per-table memstore sizing from the table descriptor, falling back to the cluster-wide
 * configuration: sets {@code memstoreFlushSize} and derives {@code blockingMemStoreSize} from it
 * via the block multiplier. No-op if the table descriptor is not yet set.
 */
private void setHTableSpecificConf() {
  if (this.htableDescriptor == null) {
    return;
  }
  long configuredFlushSize = this.htableDescriptor.getMemStoreFlushSize();
  if (configuredFlushSize <= 0) {
    // The table does not override the flush size; use the site-wide setting.
    configuredFlushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
      TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
  }
  this.memstoreFlushSize = configuredFlushSize;
  long blockMultiplier = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
    HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
  // Updates are blocked once the memstore grows past flushSize * multiplier.
  this.blockingMemStoreSize = this.memstoreFlushSize * blockMultiplier;
}
/**
 * Initialize this region. Used only by tests and SplitTransaction to reopen the region. You
 * should use createHRegion() or openHRegion()
 * @return What the next sequence (edit) id should be.
 * @throws IOException e
 * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
 */
@Deprecated
public long initialize() throws IOException {
  // Delegates to the full initialize with no progress reporter.
  return initialize(null);
}
/**
* Initialize this region.
* @param reporter Tickle every so often if initialize is taking a while.
* @return What the next sequence (edit) id should be.
*/
long initialize(final CancelableProgressable reporter) throws IOException {
// Refuse to open the region if there is no column family in the table
if (htableDescriptor.getColumnFamilyCount() == 0) {
throw new DoNotRetryIOException("Table " + htableDescriptor.getTableName().getNameAsString()
+ " should have at least one column family.");
}
MonitoredTask status =
TaskMonitor.get().createStatus("Initializing region " + this, false, true);
long nextSeqId = -1;
try {
nextSeqId = initializeRegionInternals(reporter, status);
return nextSeqId;
} catch (IOException e) {
LOG.warn("Failed initialize of region= {}, starting to roll back memstore",
getRegionInfo().getRegionNameAsString(), e);
// global memstore size will be decreased when dropping memstore
try {
// drop the memory used by memstore if open region fails
dropMemStoreContents();
} catch (IOException ioE) {
if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
LOG.warn(
"Failed drop memstore of region= {}, "
+ "some chunks may not released forever since MSLAB is enabled",
getRegionInfo().getRegionNameAsString());
}
}
if (metricsTableRequests != null) {
metricsTableRequests.removeRegistry();
}
throw e;
} finally {
// nextSeqid will be -1 if the initialization fails.
// At least it will be 0 otherwise.
if (nextSeqId == -1) {
status.abort("Exception during region " + getRegionInfo().getRegionNameAsString()
+ " initialization.");
}
if (LOG.isDebugEnabled()) {