Remove empty avro files during compaction #2158

Closed · wants to merge 4 commits
@@ -40,10 +40,13 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
@@ -83,30 +86,51 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException
boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);

// Obtain record count from input file names
// We are not getting record count from map-reduce counter because in next run, the threshold (delta record)
// calculation is based on the input file names.
Job job = this.configurator.getConfiguredJob();
List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);

long newTotalRecords = 0;
long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));

// Filter out all bad paths caused by speculative execution.
// The problem happens when a speculative task attempt is initialized but then killed in the middle of processing.
// A partial file may be generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro
// without being committed to its final destination at {tmp_output}/part-m-xxxx.avro.
// We scan here to remove all these bad files before they are moved to the compaction output folder.
List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
List<Path> goodPaths = new ArrayList<>();
for (Path filePath: allFilePaths) {
if (isFailedPath(filePath, failedEvents)) {
this.fs.delete(filePath, false);
log.error("{} is a bad path so it was deleted", filePath);
} else {
goodPaths.add(filePath);
}
}

if (appendDeltaOutput) {
FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission);
// append files under mr output to destination
List<Path> paths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
for (Path path: paths) {
String fileName = path.getName();
log.info(String.format("Adding %s to %s", path.toString(), dstPath));
for (Path filePath: goodPaths) {
String fileName = filePath.getName();
log.info(String.format("Adding %s to %s", filePath.toString(), dstPath));
Path outPath = new Path (dstPath, fileName);

if (!this.fs.rename(path, outPath)) {
if (!this.fs.rename(filePath, outPath)) {
throw new IOException(
String.format("Unable to move %s to %s", path.toString(), outPath.toString()));
String.format("Unable to move %s to %s", filePath.toString(), outPath.toString()));
}
}

// Obtain record count from input file names.
// We don't get the record count from the map-reduce counter because in the next run, the threshold (delta record)
// calculation is based on the input file names. By pre-defining which input folders are involved in the
// MR execution, it is easy to track how many files have been involved in MR so far, which makes it possible to
// calculate the total number of records (all previous runs + the current run).
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
this.fs.delete(dstPath, true);
@@ -120,15 +144,18 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException
String.format("Unable to move %s to %s", tmpPath, dstPath));
}

// get record count from map reduce job counter
Job job = this.configurator.getConfiguredJob();
// Obtain record count from the map-reduce job counter.
// We don't get the record count from file names because tracking which files are actually involved in the MR execution
// can be hard. This is because new minutely data is rolled up into the hourly folder, but from the daily compaction
// perspective we cannot tell which files are newly added (we simply pass all hourly folders to the MR job instead of
// individual files).
Counter counter = job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}

State compactState = helper.loadState(new Path (result.getDstAbsoluteDir()));
compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);

log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
@@ -138,12 +165,19 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException
Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
}
}
}

private boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
return failedEvents.stream()
.filter(event -> path.toString().contains(event.getTaskAttemptId().toString()))
.collect(Collectors.toList()).size() > 0;
}

Reviewer comment (Contributor): Only care about one match, so limit(1) can be added. Also, should put the path separator before and after the attempt id to avoid an incorrect match if the attempt id is a prefix of another attempt id. Like if attempt abc_10 passes, we don't want failed attempt abc_1 to match it.
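
A minimal sketch of how the suggested check might look (illustrative only, not code from this PR): anyMatch stops at the first hit, which covers the limit(1) point, and wrapping the attempt id in path separators prevents a failed attempt id that is a prefix of another one (abc_1 vs abc_10) from matching. It assumes the attempt id always appears as a full directory component of the temporary path.

    // Hedged sketch of the reviewer's suggestion; assumes the attempt id is a directory component.
    private boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
      return failedEvents.stream()
          .anyMatch(event -> path.toString()
              .contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
    }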

public void addEventSubmitter(EventSubmitter eventSubmitter) {
this.eventSubmitter = eventSubmitter;
}
@@ -53,6 +53,7 @@ public class CompactionSlaEventHelper {
public static final String NEED_RECOMPACT = "needRecompact";
public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
public static final String MR_JOB_ID = "mrJobId";
public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
public static final String RENAME_DIR_PATHS = "renameDirPaths";
@@ -45,13 +45,18 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A configurator that focuses on creating the avro compaction map-reduce job
@@ -331,5 +336,29 @@ protected Collection<Path> getGranularInputPaths (Path path) throws IOException

return uncompacted;
}

public static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) {
List<TaskCompletionEvent> completionEvents = new LinkedList<>();

while (true) {
try {
// getTaskCompletionEvents returns events in batches starting from the given offset;
// keep fetching until an empty batch indicates we have them all.
TaskCompletionEvent[] bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size());
if (bunchOfEvents == null || bunchOfEvents.length == 0) {
break;
}
completionEvents.addAll(Arrays.asList(bunchOfEvents));
} catch (IOException e) {
// if further events cannot be fetched, return what we have collected so far
break;
}
}

return completionEvents;
}

public static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job completedJob) {
return getAllTaskCompletionEvent(completedJob).stream()
.filter(te -> te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED)
.collect(Collectors.toList());
}
}
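
For context, a minimal usage sketch of the two new static helpers. The wrapper class and method names below are hypothetical, and the sketch assumes CompactionAvroJobConfigurator is on the classpath.

    import java.util.List;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCompletionEvent;

    public class FailedAttemptInspector {
      // Logs every task attempt of a finished job that did not succeed.
      public static void logUnsuccessfulAttempts(Job completedJob) {
        List<TaskCompletionEvent> failed =
            CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(completedJob);
        for (TaskCompletionEvent event : failed) {
          System.out.println(event.getTaskAttemptId() + " finished with status " + event.getStatus());
        }
      }
    }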

@@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
@@ -40,6 +42,7 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
@@ -321,9 +324,24 @@ public void run() {
this.configureJob(job);
this.submitAndWait(job);
if (shouldPublishData(compactionTimestamp)) {
// remove all invalid empty files due to speculative task execution
List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(this.tmpFs, this.dataset.outputTmpPath(), Lists.newArrayList("avro"));
List<Path> goodPaths = new ArrayList<>();
for (Path filePath: allFilePaths) {
if (isFailedPath(filePath, failedEvents)) {
this.tmpFs.delete(filePath, false);
LOG.error("{} is a bad path so it was deleted", filePath);
} else {
LOG.info("{} is a good path so it was kept", filePath);
goodPaths.add(filePath);
}
}

Reviewer comment (Contributor): This code shows up in CompactionCompleteFileOperationAction.java too. Can you make a common method for this?

if (!this.recompactAllData && this.recompactFromDestPaths) {
// append new files without deleting output directory
addFilesInTmpPathToOutputPath();
addGoodFilesToOutputPath(goodPaths);
// clean up late data from outputLateDirectory, which has been set to inputPath
deleteFilesByPaths(this.dataset.inputPaths());
} else {
@@ -352,6 +370,11 @@ public void run() {
}
}

private boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
return failedEvents.stream()
.filter(event -> path.toString().contains(event.getTaskAttemptId().toString()))
.collect(Collectors.toList()).size() > 0;
}

Reviewer comment (Contributor): This code is also in CompactionCompleteFileOperationAction.java.
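
In the spirit of this de-duplication request, a hedged sketch of what a shared helper could look like. The class name, method name, and placement are hypothetical, not part of this PR.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskCompletionEvent;

    public class SpeculativeAttemptCleanupUtil {
      // Deletes files written by unsuccessful (e.g. killed speculative) task attempts and
      // returns the surviving paths, so both call sites can share one implementation.
      public static List<Path> removeFailedAttemptFiles(FileSystem fs, List<Path> candidates,
          List<TaskCompletionEvent> failedEvents) throws IOException {
        List<Path> goodPaths = new ArrayList<>();
        for (Path path : candidates) {
          boolean failed = failedEvents.stream()
              .anyMatch(event -> path.toString().contains(event.getTaskAttemptId().toString()));
          if (failed) {
            fs.delete(path, false);   // drop the partial output of the failed attempt
          } else {
            goodPaths.add(path);
          }
        }
        return goodPaths;
      }
    }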

/**
* For regular compactions, compaction timestamp is the time the compaction job starts.
@@ -603,9 +626,8 @@ private void moveTmpPathToOutputPath() throws IOException {
HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs, this.dataset.outputTmpPath(), FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ;
}

private void addFilesInTmpPathToOutputPath () throws IOException {
List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs);
for (Path path: paths) {
private void addGoodFilesToOutputPath (List<Path> goodPaths) throws IOException {
for (Path path: goodPaths) {
String fileName = path.getName();
LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath()));
Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,