-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
AbstractFSWAL.java
1406 lines (1260 loc) · 54.9 KB
/
AbstractFSWAL.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import com.lmax.disruptor.RingBuffer;
import io.opentelemetry.api.trace.Span;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
* WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
* This is done internal to the implementation.
* <p>
* As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
* WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
* A bunch of work in the below is done keeping account of these region sequence ids -- what is
* flushed out to hfiles, and what is yet in WAL and in memory only.
* <p>
* It is only practical to delete entire files. Thus, we delete an entire on-disk file
* <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
* (smaller) than the most-recent flush.
* <p>
* To read a WAL, call
* {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
* <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
* is now a lame duck; any more appends or syncs will fail also with the same original exception. If
* we have made successful appends to the WAL and we then are unable to sync them, our current
* semantic is to return error to the client that the appends failed but also to abort the current
* context, usually the hosting server. We need to replay the WALs. <br>
* TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
* that the append failed. <br>
* TODO: replication may pick up these last edits though they have been marked as failed append
* (Need to keep our own file lengths, not rely on HDFS).
*/
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
/**
* Config key: age, in seconds, after which a WAL file that still cannot be archived is reported
* together with the regions blocking it.
*/
private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec";
/** Default for {@link #SURVIVED_TOO_LONG_SEC_KEY}, in seconds. */
private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900;
/** Don't log blocking regions more frequently than this. */
private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
/** Config key: sync duration, in milliseconds, read into {@code slowSyncNs}. */
protected static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.wal.slowsync.ms";
protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
/** Config key: sync duration, in milliseconds, read into {@code rollOnSyncNs}. */
protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
/** Config key read into {@code slowSyncRollThreshold}. */
protected static final String SLOW_SYNC_ROLL_THRESHOLD =
"hbase.regionserver.wal.slowsync.roll.threshold";
protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
/** Config key read into {@code slowSyncCheckInterval}. */
protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
"hbase.regionserver.wal.slowsync.roll.interval.ms";
protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
/** Config key: WAL sync timeout, in milliseconds, read into {@code walSyncTimeoutNs}. */
protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
/**
* Config key: fraction of the WAL block size at which a roll is scheduled. The constructor uses
* 0.5 when it is not set.
*/
public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
/**
* Config key for {@code maxLogs}. When it is not set the constructor uses the larger of 32 and a
* value derived from the global memstore size.
*/
public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
/** Config key: requested number of preallocated ring buffer slots (16384 when not set). */
public static final String RING_BUFFER_SLOT_COUNT =
"hbase.regionserver.wal.disruptor.event.count";
/**
* File system instance that hosts the WAL directory and the WAL archive directory.
*/
protected final FileSystem fs;
/**
* WAL directory, where all WAL files would be placed.
*/
protected final Path walDir;
/**
* Directory path where old logs are kept.
*/
protected final Path walArchiveDir;
/**
* Matches just those wal files that belong to this wal instance.
*/
protected final PathFilter ourFiles;
/**
* Prefix of a WAL file, usually the region server name it is hosted on. It is stored URL-encoded;
* "wal" is used when no prefix is given.
*/
protected final String walFilePrefix;
/**
* Suffix included on generated wal file names. It is stored URL-encoded; empty when no suffix is
* given.
*/
protected final String walFileSuffix;
/**
* Prefix used when checking for wal membership: the string form of the path made of the WAL
* directory, the file prefix and the file name delimiter.
*/
protected final String prefixPathStr;
/** Coprocessor host created for this WAL; it is told about WAL rolls before and after they run. */
protected final WALCoprocessorHost coprocessorHost;
/**
* Configuration this WAL was created with.
*/
protected final Configuration conf;
/** May be null. When present, it is asked to abort if archiving a log file keeps failing. */
protected final Abortable abortable;
/** Listeners that are called on WAL events. */
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
/** Tracks the logs in the process of being closed. */
protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();
/**
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
* id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
* facility for answering questions such as "Is it safe to GC a WAL?".
*/
protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
/**
* The slow sync will be logged; the very slow sync will cause the WAL to be rolled. Both values
* are configured in milliseconds and stored in nanoseconds.
*/
protected final long slowSyncNs, rollOnSyncNs;
/** Value of {@link #SLOW_SYNC_ROLL_THRESHOLD}. */
protected final int slowSyncRollThreshold;
/** Value of {@link #SLOW_SYNC_ROLL_INTERVAL_MS}, in milliseconds. */
protected final int slowSyncCheckInterval;
// NOTE(review): presumably counts slow syncs against slowSyncRollThreshold -- confirm at use site.
protected final AtomicInteger slowSyncCount = new AtomicInteger();
/** Value of {@link #WAL_SYNC_TIMEOUT_MS}, converted to nanoseconds. */
private final long walSyncTimeoutNs;
/** Age, in nanoseconds, beyond which a not yet archived WAL file is considered too old. */
private final long walTooOldNs;
// If > than this size, roll the log.
protected final long logrollsize;
/**
* Block size to use writing files.
*/
protected final long blocksize;
/**
* If more than this many logs, force flush of oldest region so oldest edit goes to disk. If too
* many and we crash, then will take forever replaying. Keep the number of logs tidy.
*/
protected final int maxLogs;
/** Value passed as {@code forceSync} by the sync methods that do not take that argument. */
protected final boolean useHsync;
/**
* This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
* is held. We don't just use synchronized because that results in bogus and tedious findbugs
* warning when it thinks synchronized controls writer thread safety. It is held when we are
* actually rolling the log. It is checked when we are looking to see if we should roll the log or
* not.
*/
protected final ReentrantLock rollWriterLock = new ReentrantLock(true);
// The timestamp (in ms) when the log file was created; -1 until the first file number is set.
protected final AtomicLong filenum = new AtomicLong(-1);
// Number of transactions in the current Wal.
protected final AtomicInteger numEntries = new AtomicInteger(0);
/**
* The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
* WALEdit to background consumer thread, and the transaction id is the sequence number of the
* corresponding entry in queue.
*/
protected volatile long highestUnsyncedTxid = -1;
/**
* Updated to the transaction id of the last successful sync call. This can be less than
* {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
* for it.
*/
protected final AtomicLong highestSyncedTxid = new AtomicLong(0);
/**
* The total size of wal
*/
protected final AtomicLong totalLogSize = new AtomicLong(0);
/**
* Current log file.
*/
volatile W writer;
// Last time to check low replication on hlog's pipeline
private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
// Last time we asked to roll the log due to a slow sync
private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
// NOTE(review): closed and shutdown are not written in the visible part of this file; presumably
// they track the close/shutdown lifecycle -- confirm at their use sites.
protected volatile boolean closed = false;
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
// Earliest System.nanoTime() value at which the next "has not been archived" warning may be
// logged; pushed forward each time such a warning is written.
private long nextLogTooOldNs = System.nanoTime();
/**
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
* an IllegalArgumentException if used to compare paths from different wals.
*/
final Comparator<Path> LOG_NAME_COMPARATOR =
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
/** Bookkeeping kept for one WAL file; all fields are assigned once, in the constructor. */
private static final class WalProps {
  /**
   * The nanoTime of the log rolling, captured when this object is constructed; used to work out
   * how much time has passed since.
   */
  public final long rollTimeNs;

  /**
   * The log file size. Notice that the size may not be accurate if we do asynchronous close in
   * sub classes.
   */
  public final long logSize;

  /**
   * Maps each encoded region name to its highest sequence id.
   * <p>
   * Contains all the regions the log file has an entry for.
   */
  public final Map<byte[], Long> encodedName2HighestSequenceId;

  public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
    this.rollTimeNs = System.nanoTime();
    this.logSize = logSize;
    this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
  }
}
/**
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp
* (contained in the log file name).
*/
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
/**
* A cache of sync futures reused by threads.
*/
protected final SyncFutureCache syncFutureCache;
/**
* The class name of the runtime implementation, used as prefix for logging/tracing.
* <p>
* Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
* refer to HBASE-17676 for more details
* </p>
*/
protected final String implClassName;
// NOTE(review): not used in the visible part of this file; presumably set while a roll request
// is pending -- confirm at its use sites.
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
/** Single daemon thread ("WAL-Archiver-%d") on which log files are archived asynchronously. */
private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archiver-%d").build());
/** Number of retries allowed when archiving a log file fails; 0 when not configured. */
private final int archiveRetries;
/**
* Returns the current value of {@link #filenum}, which starts out as -1.
* @return the file number of the current WAL file
*/
public long getFilenum() {
return this.filenum.get();
}
/**
 * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
 * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
 * the filename is created with the {@link #computeFilename(long filenum)} method: whatever sits
 * between the WAL path prefix and the WAL file suffix is parsed as a long.
 * @param fileName path of a log file belonging to this WAL; must not be null
 * @return timestamp, as in the log file name.
 * @throws IllegalArgumentException if the path is not accepted by this WAL's file filter
 */
protected long getFileNumFromFileName(Path fileName) {
  checkNotNull(fileName, "file name can't be null");
  if (ourFiles.accept(fileName)) {
    final String fullName = fileName.toString();
    final int numberStart = prefixPathStr.length();
    final int numberEnd = fullName.length() - walFileSuffix.length();
    return Long.parseLong(fullName.substring(numberStart, numberEnd));
  }
  throw new IllegalArgumentException(
    "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
}
/**
 * Derives a WAL file count from the global memstore size: twice that size divided by the roll
 * size, truncated to an int.
 * @param conf configuration the global memstore size is computed from
 * @param logRollSize size at which a WAL is rolled
 * @return the computed number of log files
 */
private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
  long globalMemstoreBytes = MemorySizeUtil.getGlobalMemStoreSize(conf).getFirst();
  return (int) ((globalMemstoreBytes * 2) / logRollSize);
}
/**
 * Returns the number of ring buffer slots to preallocate. The value is read from
 * {@link #RING_BUFFER_SLOT_COUNT} (16384 when not set) and rounded up to the next power of two,
 * with {@code 1 << 30} as the upper bound.
 * @return a positive power of two, at most {@code 1 << 30}
 * @throws IllegalArgumentException if the configured value is not strictly positive
 */
protected final int getPreallocatedEventCount() {
  // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
  // be stuck and make no progress if the buffer is filled with appends only and there is no
  // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
  // before they return.
  int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
  // Zero has to be rejected as well: Integer.highestOneBit(0) is 0, so a configured 0 would
  // otherwise be returned unchanged even though it is not a power of two.
  checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must > 0");
  int floor = Integer.highestOneBit(preallocatedEventCount);
  if (floor == preallocatedEventCount) {
    // Already a power of two.
    return floor;
  }
  // max capacity is 1 << 30
  if (floor >= 1 << 29) {
    return 1 << 30;
  }
  return floor << 1;
}
/**
* Creates a WAL without an {@link Abortable}: forwards every argument to the full constructor and
* passes {@code null} as the abortable.
*/
protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
final boolean failIfWALExists, final String prefix, final String suffix)
throws FailedLogCloseException, IOException {
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}
/**
* Sets up the WAL directories, the file naming scheme and all configuration-derived settings. No
* writer is created here; see {@link #init()}.
* @param fs file system that hosts the WAL and archive directories
* @param abortable stored as is; may be null
* @param rootDir parent directory of both {@code logDir} and {@code archiveDir}
* @param logDir name of the WAL directory under {@code rootDir}; created if missing
* @param archiveDir name of the archive directory under {@code rootDir}; created if missing
* @param conf configuration to read settings from
* @param listeners listeners to register; may be null
* @param failIfWALExists if true, fail when the WAL directory already holds files matching this
*          WAL's file filter
* @param prefix WAL file name prefix; "wal" is used when null or empty, otherwise the URL-encoded
*          value
* @param suffix WAL file name suffix; may be null or empty, otherwise it must start with
*          {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
* @throws IOException if a directory cannot be created, or a WAL already exists while
*           {@code failIfWALExists} is set
* @throws IllegalArgumentException if a non-empty suffix does not start with the delimiter
*/
protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
final String logDir, final String archiveDir, final Configuration conf,
final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
final String suffix)
throws FailedLogCloseException, IOException {
this.fs = fs;
this.walDir = new Path(rootDir, logDir);
this.walArchiveDir = new Path(rootDir, archiveDir);
this.conf = conf;
this.abortable = abortable;
// Make sure both the WAL directory and the archive directory exist.
if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
throw new IOException("Unable to mkdir " + walDir);
}
if (!fs.exists(this.walArchiveDir)) {
if (!fs.mkdirs(this.walArchiveDir)) {
throw new IOException("Unable to mkdir " + this.walArchiveDir);
}
}
// If prefix is null||empty then just name it wal
this.walFilePrefix =
prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
// we only correctly differentiate suffices when numeric ones start with '.'
if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
"' but instead was '" + suffix + "'");
}
// Now that it exists, set the storage policy for the entire directory of wal files related to
// this FSHLog instance
String storagePolicy =
conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
// Filter that recognizes the files of this WAL instance by path prefix and suffix.
this.ourFiles = new PathFilter() {
@Override
public boolean accept(final Path fileName) {
// The path should start with dir/<prefix> and end with our suffix
final String fileNameString = fileName.toString();
if (!fileNameString.startsWith(prefixPathStr)) {
return false;
}
if (walFileSuffix.isEmpty()) {
// in the case of the null suffix, we need to ensure the filename ends with a timestamp.
return org.apache.commons.lang3.StringUtils
.isNumeric(fileNameString.substring(prefixPathStr.length()));
} else if (!fileNameString.endsWith(walFileSuffix)) {
return false;
}
return true;
}
};
if (failIfWALExists) {
final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
if (null != walFiles && 0 != walFiles.length) {
throw new IOException("Target WAL already exists within directory " + walDir);
}
}
// Register listeners. TODO: Should this exist anymore? We have CPs?
if (listeners != null) {
for (WALActionsListener i : listeners) {
registerWALActionsListener(i);
}
}
this.coprocessorHost = new WALCoprocessorHost(this, conf);
// Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
// size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
// some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
// the block size but experience from the field has it that this was not enough time for the
// roll to happen before end-of-block. So the new accounting makes WALs of about the same
// size as those made in hbase-1 (to prevent surprise), we now have default block size as
// 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
// make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
this.logrollsize = (long) (this.blocksize * multiplier);
// An explicitly configured MAX_LOGS wins; otherwise use at least 32.
this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir +
", maxLogs=" + this.maxLogs);
// Time-based settings are configured in milliseconds (or seconds) and kept in nanoseconds.
this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
DEFAULT_SLOW_SYNC_TIME_MS));
this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
DEFAULT_ROLL_ON_SYNC_TIME_MS));
this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
DEFAULT_WAL_SYNC_TIMEOUT_MS));
this.syncFutureCache = new SyncFutureCache(conf);
this.implClassName = getClass().getSimpleName();
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
}
/**
* Used to initialize the WAL. This implementation just calls {@link #rollWriter()}, which creates
* the first log writer.
* @throws IOException if rolling the writer fails
*/
public void init() throws IOException {
rollWriter();
}
/** Adds the given listener to {@link #listeners}. */
@Override
public void registerWALActionsListener(WALActionsListener listener) {
this.listeners.add(listener);
}
/**
* Removes the given listener from {@link #listeners}.
* @return the result of the list's {@code remove} call, i.e. whether the listener was present
*/
@Override
public boolean unregisterWALActionsListener(WALActionsListener listener) {
return this.listeners.remove(listener);
}
/** Returns the coprocessor host created for this WAL in the constructor. */
@Override
public WALCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}
/** Forwards to {@link #sequenceIdAccounting}'s {@code startCacheFlush} for the given families. */
@Override
public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
}
/**
* Forwards to {@link #sequenceIdAccounting}'s {@code startCacheFlush} with a family to sequence
* id map.
*/
@Override
public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
}
/** Forwards to {@link #sequenceIdAccounting}'s {@code completeCacheFlush}. */
@Override
public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
}
/** Forwards to {@link #sequenceIdAccounting}'s {@code abortCacheFlush}. */
@Override
public void abortCacheFlush(byte[] encodedRegionName) {
this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
}
/**
* Returns the lowest sequence id that {@link #sequenceIdAccounting} reports for the given region.
*/
@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
// Used by tests. Deprecated as too subtle for general usage.
return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
}
/**
* Returns the lowest sequence id that {@link #sequenceIdAccounting} reports for the given region
* and column family.
*/
@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
// This method is used by tests and for figuring if we should flush or not because our
// sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
// figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
// from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
// currently flushing sequence ids, and if anything found there, it is returning these. This is
// the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
// we crash during the flush. For figuring what to flush, we might get requeued if our sequence
// id is old even though we are currently flushing. This may mean we do too much flushing.
return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
}
/** Same as calling {@code rollWriter(false)}. */
@Override
public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
return rollWriter(false);
}
/** Syncs, passing the configured {@link #useHsync} value as {@code forceSync}. */
@Override
public final void sync() throws IOException {
sync(useHsync);
}
/**
* Syncs for the given transaction id, passing the configured {@link #useHsync} value as
* {@code forceSync}.
*/
@Override
public final void sync(long txid) throws IOException {
sync(txid, useHsync);
}
/**
* Runs {@link #doSync(boolean)} through {@code TraceUtil.trace}, using a span created with the
* name "WAL.sync".
*/
@Override
public final void sync(boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(forceSync);
return null;
}, () -> createSpan("WAL.sync"));
}
/**
* Runs {@link #doSync(long, boolean)} through {@code TraceUtil.trace}, using a span created with
* the name "WAL.sync".
*/
@Override
public final void sync(long txid, boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(txid, forceSync);
return null;
}, () -> createSpan("WAL.sync"));
}
/** Implementation hook behind {@link #sync(boolean)}; invoked inside the "WAL.sync" span. */
protected abstract void doSync(boolean forceSync) throws IOException;
/** Implementation hook behind {@link #sync(long, boolean)}; invoked inside the "WAL.sync" span. */
protected abstract void doSync(long txid, boolean forceSync) throws IOException;
/**
 * Builds the path of the WAL file with the given file number: the WAL directory plus a file name
 * made of the file prefix, the name delimiter, the number and the file suffix.
 * @param filenum file number to use; must not be negative
 * @return Path of the WAL file for that number
 * @throws RuntimeException if {@code filenum} is negative
 */
protected Path computeFilename(final long filenum) {
  if (filenum >= 0) {
    return new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix);
  }
  throw new RuntimeException("WAL file number can't be < 0");
}
/**
* This is a convenience method that computes the filename for the current WAL file-number, i.e.
* the current value of {@link #filenum}.
* @return Path
*/
public Path getCurrentFileName() {
return computeFilename(this.filenum.get());
}
/**
 * Picks the next path to write to. The internal filenum is set to the current time and then
 * incremented for as long as a file with the resulting name already exists.
 * @return a path for which the file system reported no existing file
 */
private Path getNewPath() throws IOException {
  filenum.set(EnvironmentEdgeManager.currentTime());
  Path candidate = getCurrentFileName();
  // Each collision bumps the file number by one before the name is recomputed.
  for (; fs.exists(candidate); candidate = getCurrentFileName()) {
    filenum.incrementAndGet();
  }
  return candidate;
}
/**
 * Returns the path for the current file number, or null while the file number is not positive.
 * A non-positive number presumably means no file has been created yet.
 */
Path getOldPath() {
  final long currentFilenum = filenum.get();
  // computeFilename builds the complete name, including prefix and suffix.
  return currentFilenum > 0 ? computeFilename(currentFilenum) : null;
}
/**
 * Announces an upcoming log roll: the coprocessor host is told first, then every registered
 * listener in turn.
 */
private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
  throws IOException {
  coprocessorHost.preWALRoll(oldPath, newPath);
  // Iterating an empty listener list is a no-op, so no explicit emptiness check is needed.
  for (WALActionsListener listener : listeners) {
    listener.preLogRoll(oldPath, newPath);
  }
}
/**
 * Announces a finished log roll: every registered listener is told in turn, and the coprocessor
 * host last.
 */
private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
  throws IOException {
  // Iterating an empty listener list is a no-op, so no explicit emptiness check is needed.
  for (WALActionsListener listener : listeners) {
    listener.postLogRoll(oldPath, newPath);
  }
  coprocessorHost.postWALRoll(oldPath, newPath);
}
// public only until class moves to o.a.h.h.wal
/** @return the number of rolled log files, i.e. the number of entries in walFile2Props */
public int getNumRolledLogFiles() {
return walFile2Props.size();
}
// public only until class moves to o.a.h.h.wal
/** @return the number of log files in use: the rolled log files plus the one being written */
public int getNumLogFiles() {
// +1 for current use log
return getNumRolledLogFiles() + 1;
}
/**
 * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the
 * first (oldest) WAL, and return those regions which should be flushed so that it can be
 * let-go/'archived'. The selection is also written to the log at INFO level.
 * @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file, or
 *         null when there is nothing to force-flush
 */
Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
  final int logCount = getNumRolledLogFiles();
  if (logCount <= this.maxLogs || logCount <= 0) {
    return null;
  }
  final Map.Entry<Path, WalProps> oldestWal = walFile2Props.firstEntry();
  final Map<byte[], List<byte[]>> regions =
    sequenceIdAccounting.findLower(oldestWal.getValue().encodedName2HighestSequenceId);
  if (regions == null) {
    return null;
  }
  // One "region[family,family,...]" item per region, for the log line only.
  final List<String> listForPrint = new ArrayList<>();
  for (Map.Entry<byte[], List<byte[]>> regionEntry : regions.entrySet()) {
    final StringBuilder families = new StringBuilder();
    String separator = "";
    for (byte[] family : regionEntry.getValue()) {
      families.append(separator).append(Bytes.toString(family));
      separator = ",";
    }
    listForPrint.add(Bytes.toStringBinary(regionEntry.getKey()) + "[" + families.toString() + "]");
  }
  LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
    "; forcing (partial) flush of " + regions.size() + " region(s): " +
    StringUtils.join(",", listForPrint));
  return regions;
}
/**
 * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
 * Eligible files are handed to the archive executor and dropped from the tracking map. A file
 * that has been around for longer than the configured limit and is still not eligible is
 * reported with a warning that names the regions blocking it.
 */
private void cleanOldLogs() throws IOException {
  final long now = System.nanoTime();
  // Decided once per pass, so every overdue WAL met in this pass may be reported.
  final boolean mayLogTooOld = nextLogTooOldNs <= now;
  List<Pair<Path, Long>> logsToArchive = null;
  // Scratch list shared by the overdue WALs of this pass; emptied after each warning.
  ArrayList<byte[]> regionsBlockingWal = null;
  // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
  // are older than what is currently in memory, the WAL can be GC'd.
  for (Map.Entry<Path, WalProps> entry : walFile2Props.entrySet()) {
    final Path log = entry.getKey();
    final WalProps props = entry.getValue();
    final long ageNs = now - props.rollTimeNs;
    ArrayList<byte[]> blockers = null;
    if (ageNs > walTooOldNs) {
      if (mayLogTooOld && regionsBlockingWal == null) {
        regionsBlockingWal = new ArrayList<>();
      }
      blockers = regionsBlockingWal;
    }
    if (sequenceIdAccounting.areAllLower(props.encodedName2HighestSequenceId, blockers)) {
      if (logsToArchive == null) {
        logsToArchive = new ArrayList<>();
      }
      logsToArchive.add(Pair.newPair(log, props.logSize));
      if (LOG.isTraceEnabled()) {
        LOG.trace("WAL file ready for archiving " + log);
      }
      continue;
    }
    if (blockers == null) {
      // Not eligible, but nothing to report for this file.
      continue;
    }
    final StringBuilder message = new StringBuilder(log.toString());
    message.append(" has not been archived for ");
    message.append(TimeUnit.NANOSECONDS.toSeconds(ageNs));
    message.append(" seconds; blocked by: ");
    String separator = "";
    for (byte[] region : blockers) {
      message.append(separator).append(Bytes.toString(region));
      separator = "; ";
    }
    LOG.warn(message.toString());
    nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;
    blockers.clear();
  }
  if (logsToArchive == null) {
    return;
  }
  // Archiving itself runs asynchronously; the file stops being tracked right away.
  for (Pair<Path, Long> log : logsToArchive) {
    logArchiveExecutor.execute(() -> archive(log));
    walFile2Props.remove(log.getFirst());
  }
}
/**
 * Archives a single WAL file, retrying on failure.
 * <p>
 * The first attempt plus up to {@code archiveRetries} retries are made. On success the file's
 * size is subtracted from {@code totalLogSize}. Once the retries are exhausted the failure is
 * logged, the {@code abortable} (if one is configured) is asked to abort, and the method
 * returns. Previously the loop only terminated when an {@code abortable} was present; with a
 * null {@code abortable} it retried forever without any backoff.
 * @param log pair of the WAL file path and its recorded size
 */
protected void archive(final Pair<Path, Long> log) {
  int retry = 1;
  while (true) {
    try {
      archiveLogFile(log.getFirst());
      totalLogSize.addAndGet(-log.getSecond());
      // successful
      break;
    } catch (Throwable e) {
      if (retry > archiveRetries) {
        LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
        if (this.abortable != null) {
          this.abortable.abort("Failed log archiving", e);
        }
        // Retries are exhausted: always stop here, even when there is nobody to abort,
        // otherwise this thread would spin on the failing archive indefinitely.
        break;
      }
      LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry,
        e);
      retry++;
    }
  }
}
/**
 * Builds the archived location of a WAL file: the file keeps its name and is placed directly
 * under {@code archiveDir}. Only public so WALSplitter can use.
 * @param archiveDir directory that holds archived WAL files
 * @param p path of the WAL file
 * @return archived location of a WAL file with the given path p
 */
public static Path getWALArchivePath(Path archiveDir, Path p) {
  final String walFileName = p.getName();
  return new Path(archiveDir, walFileName);
}
/**
 * Moves the given WAL file to its location under {@code walArchiveDir}, notifying every
 * registered {@link WALActionsListener} before and after the move.
 * @param p path of the WAL file to archive
 * @throws IOException if the rename reports failure
 */
protected void archiveLogFile(final Path p) throws IOException {
  final Path archivedPath = getWALArchivePath(this.walArchiveDir, p);
  // Tell our listeners that a log is going to be archived.
  for (WALActionsListener listener : this.listeners) {
    listener.preLogArchive(p, archivedPath);
  }
  LOG.info("Archiving " + p + " to " + archivedPath);
  final boolean renamed = CommonFSUtils.renameAndSetModifyTime(this.fs, p, archivedPath);
  if (!renamed) {
    throw new IOException("Unable to rename " + p + " to " + archivedPath);
  }
  // Tell our listeners that a log has been archived.
  for (WALActionsListener listener : this.listeners) {
    listener.postLogArchive(p, archivedPath);
  }
}
/**
 * Book-keeping performed on a log roll: resets the entry counter and, when an old WAL file
 * exists, records its properties in {@code walFile2Props}, adds its length to
 * {@code totalLogSize} and logs the roll.
 * @param oldPath path of the WAL that was rolled; null when there is no previous WAL
 * @param newPath path of the new WAL; may be null
 * @param oldFileLen length of the old WAL file
 */
protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
  final int entriesInOldWal = this.numEntries.getAndSet(0);
  final String newPathString = (newPath == null) ? null : CommonFSUtils.getPath(newPath);
  if (oldPath == null) {
    // No previous file, so there is nothing to record.
    LOG.info("New WAL {}", newPathString);
    return;
  }
  final WalProps oldWalProps =
    new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen);
  this.walFile2Props.put(oldPath, oldWalProps);
  this.totalLogSize.addAndGet(oldFileLen);
  LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
    CommonFSUtils.getPath(oldPath), entriesInOldWal, StringUtils.byteDesc(oldFileLen),
    newPathString);
}
/**
 * Creates a span with the given name and tags it with this WAL's implementation class name.
 * @param name name of the span
 * @return the span, carrying the {@code TraceUtil.WAL_IMPL} attribute
 */
private Span createSpan(String name) {
  final Span span = TraceUtil.createSpan(name);
  return span.setAttribute(TraceUtil.WAL_IMPL, implClassName);
}
/**
 * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
 * The actual work is delegated to {@code doReplaceWriter} and runs inside a
 * {@code "WAL.replaceWriter"} trace span.
 * <p>
 * <ul>
 * <li>In the case of creating a new WAL, oldPath will be null.</li>
 * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
 * </li>
 * <li>In the case of closing out this WAL with no further use newPath and nextWriter will be
 * null.</li>
 * </ul>
 * @param oldPath may be null
 * @param newPath may be null
 * @param nextWriter may be null
 * @return the passed in <code>newPath</code>
 * @throws IOException if there is a problem flushing or closing the underlying FS
 */
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
return TraceUtil.trace(() -> {
doReplaceWriter(oldPath, newPath, nextWriter);
return newPath;
}, () -> createSpan("WAL.replaceWriter"));
}
/**
 * Blocks the calling thread until the given sync future completes or the sync timeout
 * ({@code walSyncTimeoutNs}) elapses. Does nothing when {@code syncFuture} is null.
 * @param syncFuture future to wait on; may be null
 * @throws IOException if the WAL has been closed, the wait times out or is interrupted, or the
 *           sync itself failed
 */
protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
  if (syncFuture == null) {
    return;
  }
  if (closed) {
    throw new IOException("WAL has been closed");
  }
  // Now we have published the ringbuffer, halt the current thread until we get an answer back.
  try {
    syncFuture.get(walSyncTimeoutNs);
  } catch (TimeoutIOException tioe) {
    throw tioe;
  } catch (InterruptedException ie) {
    LOG.warn("Interrupted", ie);
    throw convertInterruptedExceptionToIOException(ie);
  } catch (ExecutionException e) {
    // Surface the underlying failure rather than the wrapper.
    throw ensureIOException(e.getCause());
  }
}
/**
 * Returns the given throwable as an {@link IOException}: as-is when it already is one,
 * otherwise wrapped as the cause of a new {@link IOException}.
 * @param t throwable to convert
 * @return {@code t} itself or a new IOException whose cause is {@code t}
 */
private static IOException ensureIOException(final Throwable t) {
  if (t instanceof IOException) {
    return (IOException) t;
  }
  return new IOException(t);
}
/**
 * Re-asserts the current thread's interrupt flag and wraps the given exception in an
 * {@link InterruptedIOException}.
 * @param ie the interruption to convert
 * @return a new InterruptedIOException whose cause is {@code ie}
 */
private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
  // Restore the interrupt status so code further up the stack can still observe it.
  Thread.currentThread().interrupt();
  final InterruptedIOException wrapped = new InterruptedIOException();
  wrapped.initCause(ie);
  return wrapped;
}
/**
 * Rolls the WAL writer while holding {@code rollWriterLock}: creates the next writer, swaps it
 * in via {@code replaceWriter}, notifies listeners before and after the swap, resets the slow
 * sync tracking and, when rolled files exist, archives what can be archived and works out which
 * regions need flushing.
 * @param force when false, the roll is skipped if a writer exists and no entries were counted
 * @return the regions (and their stores) to flush as returned by
 *         {@code findRegionsToForceFlush()}, or null when the roll was skipped or no rolled files
 *         exist
 * @throws WALClosedException if this WAL has already been closed
 * @throws IOException on failure to roll, including when the file system lacks a required stream
 *           capability
 */
private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
rollWriterLock.lock();
try {
if (this.closed) {
throw new WALClosedException("WAL has been closed");
}
// Nothing to roll: not forced, a writer is already in place and no entries were counted for it.
if (!force && this.writer != null && this.numEntries.get() <= 0) {
return null;
}
Map<byte[], List<byte[]>> regionsToFlush = null;
try {
Path oldPath = getOldPath();
Path newPath = getNewPath();
// Any exception from here on is catastrophic, non-recoverable so we currently abort.
W nextWriter = this.createWriterInstance(newPath);
tellListenersAboutPreLogRoll(oldPath, newPath);
// NewPath could be equal to oldPath if replaceWriter fails.
newPath = replaceWriter(oldPath, newPath, nextWriter);
tellListenersAboutPostLogRoll(oldPath, newPath);
if (LOG.isDebugEnabled()) {
LOG.debug("Create new " + implClassName + " writer with pipeline: " +
Arrays.toString(getPipeline()));
}
// We got a new writer, so reset the slow sync count
lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
slowSyncCount.set(0);
// Can we delete any of the old log files?
if (getNumRolledLogFiles() > 0) {
// Archive first, then decide which regions to flush based on what is still left.
cleanOldLogs();
regionsToFlush = findRegionsToForceFlush();
}
} catch (CommonFSUtils.StreamLacksCapabilityException exception) {
// If the underlying FileSystem can't do what we ask, treat as IO failure so
// we'll abort.
throw new IOException(
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception);
}
return regionsToFlush;
} finally {
rollWriterLock.unlock();
}
}
/**
 * Rolls the WAL writer inside a {@code "WAL.rollWriter"} trace span; the work itself is done by
 * {@code rollWriterInternal(boolean)}.
 * @param force passed through to {@code rollWriterInternal}
 * @return whatever {@code rollWriterInternal} returns: the regions to flush, or null
 * @throws IOException if the roll fails
 */
@Override
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
}
// public only until class moves to o.a.h.h.wal
/**
 * Reads the current value of the {@code totalLogSize} counter.
 * @return the size of log files in use
 */
public long getLogFileSize() {
  final long currentTotal = this.totalLogSize.get();
  return currentTotal;
}
// public only until class moves to o.a.h.h.wal
/**
 * Requests a log roll by delegating to {@code requestLogRoll(ERROR)}.
 * NOTE(review): {@code ERROR} is presumably the roll-request reason constant; confirm against its
 * declaration.
 */
public void requestLogRoll() {
requestLogRoll(ERROR);
}
/**
 * Get the backing files associated with this WAL, i.e. the entries of {@code walDir} accepted by
 * the {@code ourFiles} filter.
 * @return may be null if there are no files.
 */
FileStatus[] getFiles() throws IOException {
  final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
  return walFiles;
}
/**
 * Shuts this WAL down. Only the first call has any effect; later calls return immediately.
 * Marks the WAL closed, tells the listeners that a close was requested and then, while holding
 * {@code rollWriterLock}, runs {@code doShutdown()}, clears the sync future cache and stops the
 * log archive executor.
 * @throws IOException if {@code doShutdown()} fails
 */
@Override
public void shutdown() throws IOException {
  final boolean firstCall = shutdown.compareAndSet(false, true);
  if (!firstCall) {
    return;
  }
  closed = true;
  // Tell our listeners that the log is closing
  for (WALActionsListener listener : this.listeners) {
    listener.logCloseRequested();
  }
  rollWriterLock.lock();
  try {
    doShutdown();
    if (syncFutureCache != null) {
      syncFutureCache.clear();
    }
    if (logArchiveExecutor != null) {
      logArchiveExecutor.shutdownNow();
    }
  } finally {
    rollWriterLock.unlock();
  }
}
@Override
public void close() throws IOException {
shutdown();
final FileStatus[] files = getFiles();
if (null != files && 0 != files.length) {