Remove empty avro files during compaction #2158

Closed · wants to merge 4 commits
@@ -40,10 +40,13 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
@@ -83,30 +86,36 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOExcepti
boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);

// Obtain record count from input file names
// We are not getting record count from map-reduce counter because in next run, the threshold (delta record)
// calculation is based on the input file names.
Job job = this.configurator.getConfiguredJob();

long newTotalRecords = 0;
long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));

List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs);

if (appendDeltaOutput) {
FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission);
// append files under mr output to destination
List<Path> paths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
for (Path path: paths) {
String fileName = path.getName();
log.info(String.format("Adding %s to %s", path.toString(), dstPath));
for (Path filePath: goodPaths) {
String fileName = filePath.getName();
log.info(String.format("Adding %s to %s", filePath.toString(), dstPath));
Path outPath = new Path (dstPath, fileName);

if (!this.fs.rename(path, outPath)) {
if (!this.fs.rename(filePath, outPath)) {
throw new IOException(
String.format("Unable to move %s to %s", path.toString(), outPath.toString()));
String.format("Unable to move %s to %s", filePath.toString(), outPath.toString()));
}
}

// Obtain the record count from the input file names.
// We don't get the record count from the map-reduce counter because in the next run, the threshold (delta record)
// calculation is based on the input file names. By pre-defining which input folders are involved in the
// MR execution, it is easy to track how many files have been involved in MR so far, which makes it possible to
// calculate the total number of records (all previous runs + the current run).
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
this.fs.delete(dstPath, true);
@@ -120,15 +129,18 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOExcepti
String.format("Unable to move %s to %s", tmpPath, dstPath));
}

// get record count from map reduce job counter
Job job = this.configurator.getConfiguredJob();
// Obtain the record count from the map-reduce job counter.
// We don't get the record count from file names because tracking which files are actually involved in the MR
// execution can be hard: new minutely data is rolled up into hourly folders, so from the daily compaction
// perspective we cannot tell which files were newly added (we simply pass all hourly folders to the MR job
// instead of individual files).
Counter counter = job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}

State compactState = helper.loadState(new Path (result.getDstAbsoluteDir()));
compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);

log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
@@ -138,12 +150,15 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOExcepti
Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
}
}
}
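For context on where the RECORD_COUNT value read above comes from: it is populated on the map side of the compaction job. The standalone sketch below is illustrative only; the class name RecordCountingMapper, its local EVENT_COUNTER enum, and the map signature are hypothetical stand-ins for Gobblin's actual AvroKeyMapper, which is not shown in this diff.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical, simplified mapper: each record that passes through the map phase
// increments RECORD_COUNT, so the completed Job exposes the total via getCounters().
public class RecordCountingMapper
    extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, NullWritable> {

  public enum EVENT_COUNTER { RECORD_COUNT }

  @Override
  protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);  // one per input record
    context.write(key, value);                                    // pass the record through unchanged
  }
}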



public void addEventSubmitter(EventSubmitter eventSubmitter) {
this.eventSubmitter = eventSubmitter;
}
@@ -53,6 +53,7 @@ public class CompactionSlaEventHelper {
public static final String NEED_RECOMPACT = "needRecompact";
public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
public static final String MR_JOB_ID = "mrJobId";
public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
public static final String RENAME_DIR_PATHS = "renameDirPaths";
@@ -19,8 +19,11 @@

import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;

import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.mapreduce.avro.*;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
@@ -38,20 +41,27 @@
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.math3.primes.Primes;
import org.apache.gobblin.writer.WriterOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A configurator that focuses on creating the Avro compaction map-reduce job
@@ -331,5 +341,61 @@ protected Collection<Path> getGranularInputPaths (Path path) throws IOException

return uncompacted;
}

private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) {
List<TaskCompletionEvent> completionEvents = new LinkedList<>();

// Task completion events are returned in batches; keep fetching from the current offset until no more are returned.
while (true) {
try {
TaskCompletionEvent[] bunchOfEvents;
bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size());
if (bunchOfEvents == null || bunchOfEvents.length == 0) {
break;
}
completionEvents.addAll(Arrays.asList(bunchOfEvents));
} catch (IOException e) {
// Stop fetching on failure and work with the events collected so far.
break;
}
}

return completionEvents;
}

private static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job completedJob) {
return getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED).collect(
Collectors.toList());
}

private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
return failedEvents.stream()
.anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
}

/**
* Remove all bad paths caused by speculative execution.
* The problem happens when a speculative task attempt is initialized but then killed in the middle of processing.
* A partial file is generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro
* without being committed to its final destination at {tmp_output}/part-m-xxxx.avro.
*
* @param job Completed MR job
* @param tmpPath Temporary output directory of the MR job
* @param fs File system that hosts the temporary output
* @return all paths that were successfully committed
*/
public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException {
List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);

List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
List<Path> goodPaths = new ArrayList<>();
for (Path filePath: allFilePaths) {
if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
fs.delete(filePath, false);
log.error("{} is a bad path so it was deleted", filePath);
} else {
goodPaths.add(filePath);
}
}

return goodPaths;
}
}
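To make the failed-path check above concrete, here is a small self-contained example. The attempt id and directory layout are invented sample values following the {tmp_output}/_temporary/... pattern described in the Javadoc of removeFailedPaths; only the substring test mirrors what isFailedPath does.

import org.apache.hadoop.fs.Path;

public class FailedPathCheckExample {
  public static void main(String[] args) {
    // Partial output left behind by a killed speculative attempt (sample values).
    Path partial = new Path("/data/output/tmp/_temporary/1/_temporary/"
        + "attempt_1500000000000_0001_m_000003_1/part-m-00003.avro");

    // Attempt id as it would be reported by a non-SUCCEEDED TaskCompletionEvent.
    String failedAttemptId = "attempt_1500000000000_0001_m_000003_1";

    // The attempt id must appear as a full path segment, i.e. surrounded by separators.
    boolean isBad = partial.toString()
        .contains(Path.SEPARATOR + failedAttemptId + Path.SEPARATOR);

    System.out.println(isBad);  // true: this file would be deleted rather than published
  }
}

A committed file such as /data/output/tmp/part-m-00003.avro contains no attempt-id segment, so the same check returns false and the file is kept in goodPaths.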

@@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
@@ -40,6 +42,7 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
@@ -321,9 +324,12 @@ public void run() {
this.configureJob(job);
this.submitAndWait(job);
if (shouldPublishData(compactionTimestamp)) {
// Remove invalid empty files left behind by speculative task execution
List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs);

if (!this.recompactAllData && this.recompactFromDestPaths) {
// append new files without deleting output directory
addFilesInTmpPathToOutputPath();
addGoodFilesToOutputPath(goodPaths);
// clean up late data from outputLateDirectory, which has been set to inputPath
deleteFilesByPaths(this.dataset.inputPaths());
} else {
@@ -352,7 +358,6 @@ public void run() {
}
}


/**
* For regular compactions, compaction timestamp is the time the compaction job starts.
*
@@ -603,9 +608,8 @@ private void moveTmpPathToOutputPath() throws IOException {
HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs, this.dataset.outputTmpPath(), FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ;
}

private void addFilesInTmpPathToOutputPath () throws IOException {
List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs);
for (Path path: paths) {
private void addGoodFilesToOutputPath (List<Path> goodPaths) throws IOException {
for (Path path: goodPaths) {
String fileName = path.getName();
LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath()));
Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,