-
Notifications
You must be signed in to change notification settings - Fork 8.7k
/
DirectoryScanner.java
973 lines (881 loc) · 32.9 KB
/
DirectoryScanner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
/**
* Periodically scans the data directories for block and block metadata files.
* Reconciles the differences with block information maintained in the dataset.
*/
@InterfaceAudience.Private
public class DirectoryScanner implements Runnable {
  private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
  private static final int MILLIS_PER_SECOND = 1000;
  // Log templates filled in via String.format() in start().
  private static final String START_MESSAGE =
      "Periodic Directory Tree Verification scan"
      + " starting at %dms with interval of %dms";
  private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
      + " and throttle limit of %dms/s";

  // The dataset whose on-disk state is reconciled against its in-memory map.
  private final FsDatasetSpi<?> dataset;
  // Worker pool: one ReportCompiler task per volume during a scan.
  private final ExecutorService reportCompileThreadPool;
  // Single-threaded scheduler that periodically invokes run().
  private final ScheduledExecutorService masterThread;
  // Interval between scans, in milliseconds.
  private final long scanPeriodMsecs;
  // Max milliseconds of scanning work allowed per wall-clock second
  // (values >= 1000 effectively disable throttling).
  private final int throttleLimitMsPerSec;
  // Set by start(), cleared by shutdown(); checked at the top of run().
  private volatile boolean shouldRun = false;
  // When true, diffs/stats survive past the end of a scan (tests only).
  private boolean retainDiffs = false;
  private final DataNode datanode;

  /**
   * Total combined wall clock time (in milliseconds) spent by the report
   * compiler threads executing.  Used for testing purposes.
   */
  @VisibleForTesting
  final AtomicLong timeRunningMs = new AtomicLong(0L);

  /**
   * Total combined wall clock time (in milliseconds) spent by the report
   * compiler threads blocked by the throttle.  Used for testing purposes.
   */
  @VisibleForTesting
  final AtomicLong timeWaitingMs = new AtomicLong(0L);

  /**
   * The complete list of block differences indexed by block pool ID.
   */
  @VisibleForTesting
  final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();

  /**
   * Statistics about the block differences in each blockpool, indexed by
   * block pool ID.
   */
  @VisibleForTesting
  final Map<String, Stats> stats = new HashMap<String, Stats>();
/**
* Allow retaining diffs for unit test and analysis. Defaults to false (off)
* @param b whether to retain diffs
*/
@VisibleForTesting
void setRetainDiffs(boolean b) {
retainDiffs = b;
}
/**
* Stats tracked for reporting and testing, per blockpool
*/
@VisibleForTesting
static class Stats {
final String bpid;
long totalBlocks = 0;
long missingMetaFile = 0;
long missingBlockFile = 0;
long missingMemoryBlocks = 0;
long mismatchBlocks = 0;
long duplicateBlocks = 0;
/**
* Create a new Stats object for the given blockpool ID.
* @param bpid blockpool ID
*/
public Stats(String bpid) {
this.bpid = bpid;
}
@Override
public String toString() {
return "BlockPool " + bpid
+ " Total blocks: " + totalBlocks + ", missing metadata files:"
+ missingMetaFile + ", missing block files:" + missingBlockFile
+ ", missing blocks in memory:" + missingMemoryBlocks
+ ", mismatched blocks:" + mismatchBlocks;
}
}
/**
* Helper class for compiling block info reports from report compiler threads.
*/
static class ScanInfoPerBlockPool extends
HashMap<String, LinkedList<ScanInfo>> {
private static final long serialVersionUID = 1L;
/**
* Create a new info list.
*/
ScanInfoPerBlockPool() {super();}
/**
* Create a new info list initialized to the given expected size.
* See {@link java.util.HashMap#HashMap(int)}.
*
* @param sz initial expected size
*/
ScanInfoPerBlockPool(int sz) {super(sz);}
/**
* Merges {@code that} ScanInfoPerBlockPool into this one
*
* @param the ScanInfoPerBlockPool to merge
*/
public void addAll(ScanInfoPerBlockPool that) {
if (that == null) return;
for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> list = entry.getValue();
if (this.containsKey(bpid)) {
//merge that per-bpid linked list with this one
this.get(bpid).addAll(list);
} else {
//add that new bpid and its linked list to this
this.put(bpid, list);
}
}
}
/**
* Convert all the LinkedList values in this ScanInfoPerBlockPool map
* into sorted arrays, and return a new map of these arrays per blockpool
*
* @return a map of ScanInfo arrays per blockpool
*/
public Map<String, ScanInfo[]> toSortedArrays() {
Map<String, ScanInfo[]> result =
new HashMap<String, ScanInfo[]>(this.size());
for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> list = entry.getValue();
// convert list to array
ScanInfo[] record = list.toArray(new ScanInfo[list.size()]);
// Sort array based on blockId
Arrays.sort(record);
result.put(bpid, record);
}
return result;
}
}
/**
* Tracks the files and other information related to a block on the disk
* Missing file is indicated by setting the corresponding member
* to null.
*
* Because millions of these structures may be created, we try to save
* memory here. So instead of storing full paths, we store path suffixes.
* The block file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>
* So we don't need to store the volume path, since we already know what the
* volume is.
*
* The metadata file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>_<genstamp>.meta
* So if we have a block file, there isn't any need to store the block path
* again.
*
* The accessor functions take care of these manipulations.
*/
static class ScanInfo implements Comparable<ScanInfo> {
private final long blockId;
/**
* The block file path, relative to the volume's base directory.
* If there was no block file found, this may be null. If 'vol'
* is null, then this is the full path of the block file.
*/
private final String blockSuffix;
/**
* The suffix of the meta file path relative to the block file.
* If blockSuffix is null, then this will be the entire path relative
* to the volume base directory, or an absolute path if vol is also
* null.
*/
private final String metaSuffix;
private final FsVolumeSpi volume;
/**
* Get the file's length in async block scan
*/
private final long blockFileLength;
private final static Pattern CONDENSED_PATH_REGEX =
Pattern.compile("(?<!^)(\\\\|/){2,}");
private final static String QUOTED_FILE_SEPARATOR =
Matcher.quoteReplacement(File.separator);
/**
* Get the most condensed version of the path.
*
* For example, the condensed version of /foo//bar is /foo/bar
* Unlike {@link File#getCanonicalPath()}, this will never perform I/O
* on the filesystem.
*
* @param path the path to condense
* @return the condensed path
*/
private static String getCondensedPath(String path) {
return CONDENSED_PATH_REGEX.matcher(path).
replaceAll(QUOTED_FILE_SEPARATOR);
}
/**
* Get a path suffix.
*
* @param f The file to get the suffix for.
* @param prefix The prefix we're stripping off.
*
* @return A suffix such that prefix + suffix = path to f
*/
private static String getSuffix(File f, String prefix) {
String fullPath = getCondensedPath(f.getAbsolutePath());
if (fullPath.startsWith(prefix)) {
return fullPath.substring(prefix.length());
}
throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
*/
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
this.blockId = blockId;
String condensedVolPath = vol == null ? null :
getCondensedPath(vol.getBasePath());
this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath);
this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
if (metaFile == null) {
this.metaSuffix = null;
} else if (blockFile == null) {
this.metaSuffix = getSuffix(metaFile, condensedVolPath);
} else {
this.metaSuffix = getSuffix(metaFile,
condensedVolPath + blockSuffix);
}
this.volume = vol;
}
/**
* Returns the block data file.
*
* @return the block data file
*/
File getBlockFile() {
return (blockSuffix == null) ? null :
new File(volume.getBasePath(), blockSuffix);
}
/**
* Return the length of the data block. The length returned is the length
* cached when this object was created.
*
* @return the length of the data block
*/
long getBlockFileLength() {
return blockFileLength;
}
/**
* Returns the block meta data file or null if there isn't one.
*
* @return the block meta data file
*/
File getMetaFile() {
if (metaSuffix == null) {
return null;
} else if (blockSuffix == null) {
return new File(volume.getBasePath(), metaSuffix);
} else {
return new File(volume.getBasePath(), blockSuffix + metaSuffix);
}
}
/**
* Returns the block ID.
*
* @return the block ID
*/
long getBlockId() {
return blockId;
}
/**
* Returns the volume that contains the block that this object describes.
*
* @return the volume
*/
FsVolumeSpi getVolume() {
return volume;
}
@Override // Comparable
public int compareTo(ScanInfo b) {
if (blockId < b.blockId) {
return -1;
} else if (blockId == b.blockId) {
return 0;
} else {
return 1;
}
}
@Override // Object
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ScanInfo)) {
return false;
}
return blockId == ((ScanInfo) o).blockId;
}
@Override // Object
public int hashCode() {
return (int)(blockId^(blockId>>>32));
}
public long getGenStamp() {
return metaSuffix != null ? Block.getGenerationStamp(
getMetaFile().getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
}
}
/**
* Create a new directory scanner, but don't cycle it running yet.
*
* @param datanode the parent datanode
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
int throttle =
conf.getInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
if (throttle > MILLIS_PER_SECOND) {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value above 1000 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
} else {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value below 1 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
}
throttleLimitMsPerSec =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
} else {
throttleLimitMsPerSec = throttle;
}
int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
reportCompileThreadPool = Executors.newFixedThreadPool(threads,
new Daemon.DaemonFactory());
masterThread = new ScheduledThreadPoolExecutor(1,
new Daemon.DaemonFactory());
}
/**
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() {
shouldRun = true;
long offset = ThreadLocalRandom.current().nextInt(
(int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
long firstScanTime = Time.now() + offset;
String logMsg;
if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
logMsg = String.format(START_MESSAGE_WITH_THROTTLE, firstScanTime,
scanPeriodMsecs, throttleLimitMsPerSec);
} else {
logMsg = String.format(START_MESSAGE, firstScanTime, scanPeriodMsecs);
}
LOG.info(logMsg);
masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs,
TimeUnit.MILLISECONDS);
}
/**
* Return whether the scanner has been started.
*
* @return whether the scanner has been started
*/
@VisibleForTesting
boolean getRunStatus() {
return shouldRun;
}
/**
* Clear the current cache of diffs and statistics.
*/
private void clear() {
diffs.clear();
stats.clear();
}
/**
* Main program loop for DirectoryScanner. Runs {@link reconcile()}
* and handles any exceptions.
*/
@Override
public void run() {
try {
if (!shouldRun) {
//shutdown has been activated
LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated");
return;
}
//We're are okay to run - do it
reconcile();
} catch (Exception e) {
//Log and continue - allows Executor to run again next cycle
LOG.error("Exception during DirectoryScanner execution - will continue next cycle", e);
} catch (Error er) {
//Non-recoverable error - re-throw after logging the problem
LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", er);
throw er;
}
}
/**
* Stops the directory scanner. This method will wait for 1 minute for the
* main thread to exit and an additional 1 minute for the report compilation
* threads to exit. If a thread does not exit in that time period, it is
* left running, and an error is logged.
*/
void shutdown() {
if (!shouldRun) {
LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
} else {
LOG.warn("DirectoryScanner: shutdown has been called");
}
shouldRun = false;
if (masterThread != null) masterThread.shutdown();
if (reportCompileThreadPool != null) {
reportCompileThreadPool.shutdownNow();
}
if (masterThread != null) {
try {
masterThread.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("interrupted while waiting for masterThread to " +
"terminate", e);
}
}
if (reportCompileThreadPool != null) {
try {
reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("interrupted while waiting for reportCompileThreadPool to " +
"terminate", e);
}
}
if (!retainDiffs) clear();
}
/**
* Reconcile differences between disk and in-memory blocks
*/
@VisibleForTesting
void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> diff = entry.getValue();
for (ScanInfo info : diff) {
dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
info.getMetaFile(), info.getVolume());
}
}
if (!retainDiffs) clear();
}
  /**
   * Scan for the differences between disk and in-memory blocks
   * Scan only the "finalized blocks" lists of both disk and memory.
   *
   * Populates {@link #diffs} and {@link #stats} per blockpool by merging
   * the sorted on-disk report against the sorted in-memory replica list.
   */
  private void scan() {
    clear();
    Map<String, ScanInfo[]> diskReport = getDiskReport();

    // Hold FSDataset lock to prevent further changes to the block map
    synchronized(dataset) {
      for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
        String bpid = entry.getKey();
        ScanInfo[] blockpoolReport = entry.getValue();

        Stats statsRecord = new Stats(bpid);
        stats.put(bpid, statsRecord);
        LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
        diffs.put(bpid, diffRecord);

        statsRecord.totalBlocks = blockpoolReport.length;
        List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
        FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]);
        Arrays.sort(memReport); // Sort based on blockId

        // Two-pointer merge over the two blockId-sorted arrays, advancing
        // whichever side currently holds the smaller blockId.
        int d = 0; // index for blockpoolReport
        int m = 0; // index for memReport
        while (m < memReport.length && d < blockpoolReport.length) {
          FinalizedReplica memBlock = memReport[m];
          ScanInfo info = blockpoolReport[d];
          if (info.getBlockId() < memBlock.getBlockId()) {
            // On disk but not in memory; ignore it if it is already
            // queued for deletion (the deletion simply hasn't landed yet).
            if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
              // Block is missing in memory
              statsRecord.missingMemoryBlocks++;
              addDifference(diffRecord, statsRecord, info);
            }
            d++;
            continue;
          }
          if (info.getBlockId() > memBlock.getBlockId()) {
            // Block is missing on the disk
            addDifference(diffRecord, statsRecord,
                memBlock.getBlockId(), info.getVolume());
            m++;
            continue;
          }
          // Same blockId on both sides: compare the details.
          // Block file and/or metadata file exists on the disk
          // Block exists in memory
          if (info.getBlockFile() == null) {
            // Block metadata file exists and block file is missing
            addDifference(diffRecord, statsRecord, info);
          } else if (info.getGenStamp() != memBlock.getGenerationStamp()
              || info.getBlockFileLength() != memBlock.getNumBytes()) {
            // Block metadata file is missing or has wrong generation stamp,
            // or block file length is different than expected
            statsRecord.mismatchBlocks++;
            addDifference(diffRecord, statsRecord, info);
          } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
            // volumeMap record and on-disk files don't match.
            statsRecord.duplicateBlocks++;
            addDifference(diffRecord, statsRecord, info);
          }
          d++;

          if (d < blockpoolReport.length) {
            // There may be multiple on-disk records for the same block, don't increment
            // the memory record pointer if so.
            ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
            if (nextInfo.getBlockId() != info.blockId) {
              ++m;
            }
          } else {
            ++m;
          }
        }
        // Leftover in-memory replicas have no on-disk counterpart at all.
        while (m < memReport.length) {
          FinalizedReplica current = memReport[m++];
          addDifference(diffRecord, statsRecord,
              current.getBlockId(), current.getVolume());
        }
        // Leftover on-disk records are unknown to the in-memory map
        // (unless already queued for deletion).
        while (d < blockpoolReport.length) {
          if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
            statsRecord.missingMemoryBlocks++;
            addDifference(diffRecord, statsRecord, blockpoolReport[d]);
          }
          d++;
        }
        LOG.info(statsRecord.toString());
      } //end for
    } //end synchronized
  }
/**
* Add the ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is found on the disk,
* but the in-memory block is missing or does not match the block on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param info the differing info
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, ScanInfo info) {
statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
diffRecord.add(info);
}
/**
* Add a new ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is not found on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param blockId the id of the missing block
* @param vol the volume that contains the missing block
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, long blockId,
FsVolumeSpi vol) {
statsRecord.missingBlockFile++;
statsRecord.missingMetaFile++;
diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
  /**
   * Get the lists of blocks on the disks in the dataset, sorted by blockId.
   * The returned map contains one entry per blockpool, keyed by the blockpool
   * ID.
   *
   * Fans out one ReportCompiler task per volume, then collects the results.
   * A null result from any worker indicates that worker was interrupted, in
   * which case the whole run is abandoned and an empty report is returned.
   *
   * @return a map of sorted arrays of block information
   */
  private Map<String, ScanInfo[]> getDiskReport() {
    ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
    ScanInfoPerBlockPool[] dirReports = null;
    // First get list of data directories
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dataset.getFsVolumeReferences()) {

      // Use an array since the threads may return out of order and
      // compilersInProgress#keySet may return out of order as well.
      dirReports = new ScanInfoPerBlockPool[volumes.size()];

      Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
          new HashMap<Integer, Future<ScanInfoPerBlockPool>>();

      // Fan out: submit one scan task per volume.
      for (int i = 0; i < volumes.size(); i++) {
        ReportCompiler reportCompiler =
            new ReportCompiler(datanode, volumes.get(i));
        Future<ScanInfoPerBlockPool> result =
            reportCompileThreadPool.submit(reportCompiler);
        compilersInProgress.put(i, result);
      }

      // Fan in: block on each future; a failed volume is logged and
      // skipped, but an interrupted worker aborts the entire run.
      for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
          compilersInProgress.entrySet()) {
        Integer index = report.getKey();
        try {
          dirReports[index] = report.getValue().get();

          // If our compiler threads were interrupted, give up on this run
          if (dirReports[index] == null) {
            dirReports = null;
            break;
          }
        } catch (Exception ex) {
          FsVolumeSpi fsVolumeSpi = volumes.get(index);
          LOG.error("Error compiling report for the volume, StorageId: "
              + fsVolumeSpi.getStorageID(), ex);
          // Continue scanning the other volumes
        }
      }
    } catch (IOException e) {
      LOG.error("Unexpected IOException by closing FsVolumeReference", e);
    }
    if (dirReports != null) {
      // Compile consolidated report for all the volumes
      for (ScanInfoPerBlockPool report : dirReports) {
        if(report != null){
          list.addAll(report);
        }
      }
    }
    return list.toSortedArrays();
  }
/**
* Helper method to determine if a file name is consistent with a block.
* meta-data file
*
* @param blockId the block ID
* @param metaFile the file to check
* @return whether the file name is a block meta-data file name
*/
private static boolean isBlockMetaFile(String blockId, String metaFile) {
return metaFile.startsWith(blockId)
&& metaFile.endsWith(Block.METADATA_EXTENSION);
}
  /**
   * The ReportCompiler class encapsulates the process of searching a datanode's
   * disks for block information.  It operates by performing a DFS of the
   * volume to discover block information.
   *
   * When the ReportCompiler discovers block information, it create a new
   * ScanInfo object for it and adds that object to its report list.  The report
   * list is returned by the {@link #call()} method.
   */
  private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
    private final FsVolumeSpi volume;
    private final DataNode datanode;
    // Variable for tracking time spent running for throttling purposes
    private final StopWatch throttleTimer = new StopWatch();
    // Variable for tracking time spent running and waiting for testing
    // purposes
    private final StopWatch perfTimer = new StopWatch();

    /**
     * Create a report compiler for the given volume on the given datanode.
     *
     * @param datanode the target datanode
     * @param volume the target volume
     */
    public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
      this.datanode = datanode;
      this.volume = volume;
    }

    /**
     * Run this report compiler thread.
     *
     * @return the block info report list, or null if this thread was
     *         interrupted before all block pools were scanned
     * @throws IOException if the block pool isn't found
     */
    @Override
    public ScanInfoPerBlockPool call() throws IOException {
      String[] bpList = volume.getBlockPoolList();
      ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
      for (String bpid : bpList) {
        LinkedList<ScanInfo> report = new LinkedList<>();
        File bpFinalizedDir = volume.getFinalizedDir(bpid);
        // NOTE(review): these timers may still be running when the loop
        // reaches a second block pool; confirm that
        // o.a.h.util.StopWatch#start tolerates a running watch, otherwise
        // this could throw IllegalStateException on multi-blockpool volumes.
        perfTimer.start();
        throttleTimer.start();
        try {
          result.put(bpid,
              compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
        } catch (InterruptedException ex) {
          // Exit quickly and flag the scanner (via the null result) to do
          // the same.
          result = null;
          break;
        }
      }
      return result;
    }

    /**
     * Compile a list of {@link ScanInfo} for the blocks in the directory
     * given by {@code dir}, recursing into subdirectories.
     *
     * @param vol the volume that contains the directory to scan
     * @param bpFinalizedDir the root directory of the directory to scan
     * @param dir the directory to scan
     * @param report the list onto which blocks reports are placed
     * @return the same {@code report} list, for chaining
     * @throws InterruptedException if this thread is interrupted mid-scan
     */
    private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
        File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
        throws InterruptedException {

      // Honor the per-second scan-time budget before touching the disk.
      throttle();

      List <String> fileNames;
      try {
        fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
      } catch (IOException ioe) {
        LOG.warn("Exception occured while compiling report: ", ioe);
        // Initiate a check on disk failure.
        datanode.checkDiskErrorAsync();
        // Ignore this directory and proceed.
        return report;
      }
      Collections.sort(fileNames);

      /*
       * Assumption: In the sorted list of files block file appears immediately
       * before block metadata file. This is true for the current naming
       * convention for block file blk_<blockid> and meta file
       * blk_<blockid>_<genstamp>.meta
       */
      for (int i = 0; i < fileNames.size(); i++) {
        // Make sure this thread can make a timely exit. With a low throttle
        // rate, completing a run can take a looooong time.
        if (Thread.interrupted()) {
          throw new InterruptedException();
        }

        File file = new File(dir, fileNames.get(i));
        if (file.isDirectory()) {
          // Depth-first descent into the block subdirectory.
          compileReport(vol, bpFinalizedDir, file, report);
          continue;
        }
        if (!Block.isBlockFilename(file)) {
          // An orphaned meta file (no preceding block file) is still
          // reported, with a null block file.
          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
            long blockId = Block.getBlockId(file.getName());
            verifyFileLocation(file.getParentFile(), bpFinalizedDir,
                blockId);
            report.add(new ScanInfo(blockId, null, file, vol));
          }
          continue;
        }
        File blockFile = file;
        long blockId = Block.filename2id(file.getName());
        File metaFile = null;

        // Skip all the files that start with block name until
        // getting to the metafile for the block
        while (i + 1 < fileNames.size()) {
          File blkMetaFile = new File(dir, fileNames.get(i + 1));
          if (!(blkMetaFile.isFile()
              && blkMetaFile.getName().startsWith(blockFile.getName()))) {
            break;
          }
          i++;
          if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
            metaFile = blkMetaFile;
            break;
          }
        }
        verifyFileLocation(blockFile, bpFinalizedDir, blockId);
        report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
      }
      return report;
    }

    /**
     * Verify whether the actual directory location of block file has the
     * expected directory path computed using its block ID.
     * Only logs a warning on mismatch; it does not move the file.
     */
    private void verifyFileLocation(File actualBlockFile,
        File bpFinalizedDir, long blockId) {
      File blockDir = DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
      if (actualBlockFile.getParentFile().compareTo(blockDir) != 0) {
        File expBlockFile = new File(blockDir, actualBlockFile.getName());
        LOG.warn("Block: " + blockId
            + " has to be upgraded to block ID-based layout. "
            + "Actual block file path: " + actualBlockFile
            + ", expected block file path: " + expBlockFile);
      }
    }

    /**
     * Called by the thread before each potential disk scan so that a pause
     * can be optionally inserted to limit the number of scans per second.
     * The limit is controlled by
     * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
     *
     * Once the running time within the current second exceeds the limit,
     * sleeps for the remainder of that second before resuming.
     */
    private void throttle() throws InterruptedException {
      accumulateTimeRunning();

      // 1000 here mirrors MILLIS_PER_SECOND: limits >= one full second
      // mean throttling is disabled.
      if ((throttleLimitMsPerSec < 1000) &&
          (throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {

        Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
        throttleTimer.reset().start();
      }

      accumulateTimeWaiting();
    }

    /**
     * Helper method to measure time running.
     */
    private void accumulateTimeRunning() {
      timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
      perfTimer.reset().start();
    }

    /**
     * Helper method to measure time waiting.
     */
    private void accumulateTimeWaiting() {
      timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
      perfTimer.reset().start();
    }
  }
private enum BlockDirFilter implements FilenameFilter {
INSTANCE;
@Override
public boolean accept(File dir, String name) {
return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
|| name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
|| name.startsWith(Block.BLOCK_FILE_PREFIX);
}
}
}