Skip to content
Permalink
Browse files
fix resume test
  • Loading branch information
Ewocker committed Mar 4, 2022
1 parent 2e726a8 commit ff097accccac0c64c48c29f75dec07f62ab68d52
Showing 1 changed file with 6 additions and 4 deletions.
@@ -151,11 +151,11 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
private static final Logger log = LoggerFactory.getLogger(MultithreadedTraverseWithSortStrategy.class);
private final Charset charset = UTF_8;
private final boolean compressionEnabled;
private final NodeStateEntryWriter entryWriter;
/**
* Directory where sorted files will be created.
*/
private final File storeDir;
private final File mergeDir;
/**
* Comparator used for comparing node states for creating sorted files.
*/
@@ -233,11 +233,12 @@ public int getValue() {
BlobStore blobStore, File storeDir, List<File> existingDataDumpDirs,
boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold) throws IOException {
this.storeDir = storeDir;
this.mergeDir = new File(storeDir, "merge");
FileUtils.forceMkdir(mergeDir);
this.compressionEnabled = compressionEnabled;
this.sortedFiles = new ConcurrentLinkedQueue<>();
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.entryWriter = new NodeStateEntryWriter(blobStore);
taskQueue = new LinkedBlockingQueue<>();
phaser = new Phaser() {
@Override
@@ -269,6 +270,9 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
if (!existingSortWorkDir.isDirectory()) {
log.info("Not a directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
continue;
} else if (existingSortWorkDir.getName().equals("merge")) {
log.info("Intermediate Merge Directory. Skipping it.", existingSortWorkDir.getAbsolutePath());
continue;
}
boolean downloadCompleted = DirectoryHelper.hasCompleted(existingSortWorkDir);
if (!downloadCompleted && i == existingDataDumpDirs.size() - 1) {
@@ -405,14 +409,12 @@ private class MergeRunner implements Runnable {
private final ArrayList<File> intermediateMergedFiles = new ArrayList<File>();
private final ArrayList<File> mergedFiles = new ArrayList<File>();
private final File sortedFile;
private final File mergeDir = new File(storeDir, "merge");
private final int failureThreshold = 100;
private final int batchMergeSize = 64;
private int nextMergedLength = 0;


public MergeRunner(File sortedFile) throws IOException {
FileUtils.forceMkdir(mergeDir);
this.sortedFile = sortedFile;
}

0 comments on commit ff097ac

Please sign in to comment.