Merge pull request #1993 from treff7es/eventual_consistent_fs_support
abti committed Sep 8, 2017
2 parents 6b616d4 + 0e87095 commit 344d6d3c68b23ba451cabbf3bacb389e38e27f11
Showing 18 changed files with 231 additions and 70 deletions.
@@ -37,7 +37,7 @@ public abstract class RecordCountProvider {
* Convert a {@link Path} from another {@link RecordCountProvider} so that it can be used
* in {@link #getRecordCount(Path)} of this {@link RecordCountProvider}.
*/
public Path convertPath(Path path, RecordCountProvider src) {
public Path convertPath(Path path, String extension, RecordCountProvider src) {
if (this.getClass().equals(src.getClass())) {
return path;
}
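For illustration, a hedged sketch of how a caller supplies the new extension argument (mirroring the call site updated later in this commit in MRCompactorJobRunner; variable names are illustrative):
Path converted = outputRecordCountProvider.convertPath(
    LateFileRecordCountProvider.restoreFilePath(filePath), // strip any late-file suffix first
    outputExtension,                                        // e.g. ".avro", read from COMPACTION_FILE_EXTENSION
    inputRecordCountProvider);                              // the provider whose naming scheme the path currently follows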
@@ -25,6 +25,7 @@
import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -255,6 +257,8 @@ public class MRCompactor implements Compactor {
public static final String COMPACTION_TRACKING_EVENTS_NAMESPACE = COMPACTION_PREFIX + "tracking.events";

public static final String COMPACTION_INPUT_PATH_TIME = COMPACTION_PREFIX + "input.path.time";
public static final String COMPACTION_FILE_EXTENSION =
COMPACTION_PREFIX + "extension";

private static final long COMPACTION_JOB_WAIT_INTERVAL_SECONDS = 10;
private static final Map<Dataset, Job> RUNNING_MR_JOBS = Maps.newConcurrentMap();
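The new COMPACTION_FILE_EXTENSION key is read by MRCompactorJobRunner further down in this commit; a hedged sketch of the lookup (the ".avro" default comes from that change, the jobProps handle is illustrative):
// Per-job override of the compacted file extension; defaults to ".avro".
String outputExtension =
    dataset.jobProps().getProp(MRCompactor.COMPACTION_FILE_EXTENSION, ".avro");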
@@ -18,13 +18,16 @@
package org.apache.gobblin.compaction.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
@@ -41,23 +44,28 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.rholder.retry.Retryer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;

import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -68,8 +76,10 @@
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.util.recordcount.LateFileRecordCountProvider;
import org.apache.gobblin.util.retry.RetryerFactory;


import static org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_PREFIX;
import static org.apache.gobblin.util.retry.RetryerFactory.*;

/**
* This class is responsible for configuring and running a single MR job.
@@ -116,6 +126,8 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom

public static final String HADOOP_JOB_NAME = "Gobblin MR Compaction";
private static final long MR_JOB_CHECK_COMPLETE_INTERVAL_MS = 5000;
private final boolean isRetryEnabled;
private final String tmpFsUri;

public enum Policy {

@@ -137,6 +149,7 @@ public enum Status {

protected final Dataset dataset;
protected final FileSystem fs;
protected final FileSystem tmpFs;
protected final FsPermission perm;
protected final boolean shouldDeduplicate;
protected final boolean outputDeduplicated;
@@ -151,11 +164,31 @@ public enum Status {
private final LateFileRecordCountProvider lateOutputRecordCountProvider;
private final DatasetHelper datasetHelper;
private final int copyLateDataThreadPoolSize;
private final String outputExtension;

private volatile Policy policy = Policy.DO_NOT_PUBLISH_DATA;
private volatile Status status = Status.RUNNING;
private final Cache<Path, List<Path>> applicablePathCache;

static final String COMPACTION_RETRY_PREFIX = COMPACTION_JOB_PREFIX + "retry.";
static final String COMPACTION_RETRY_ENABLED = COMPACTION_RETRY_PREFIX + "enabled";
static final String COMPACTION_TMP_FS = COMPACTION_PREFIX + "tmp.fs";

static final Config COMPACTION_RETRY_DEFAULTS;

static {
Map<String, Object> configMap =
ImmutableMap.<String, Object>builder()
.put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L)) // Retry for at most 2 minutes overall
.put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) // Start with a 5 second retry interval
.put(RETRY_MULTIPLIER, 2L) // Multiply the interval by 2 on every attempt
.put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
.build();
COMPACTION_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
}

protected final Config retrierConfig;

protected MRCompactorJobRunner(Dataset dataset, FileSystem fs) {
this.dataset = dataset;
this.fs = fs;
@@ -185,6 +218,20 @@ protected MRCompactorJobRunner(Dataset dataset, FileSystem fs) {
this.copyLateDataThreadPoolSize = this.dataset.jobProps().getPropAsInt(COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE,
DEFAULT_COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE);

this.tmpFsUri = this.dataset.jobProps().getProp(COMPACTION_TMP_FS, null);

try {
Log.info("Tmp fs uri:"+this.tmpFsUri);
if (this.tmpFsUri != null) {
this.tmpFs = FileSystem.get(new URI(this.tmpFsUri), new Configuration());
} else {
this.tmpFs = MRCompactorJobRunner.this.fs;
}
} catch (Exception e) {
throw new RuntimeException("Failed get Filesystem from tmp fs uri", e);
}

try {
this.inputRecordCountProvider = (RecordCountProvider) Class
.forName(this.dataset.jobProps().getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
@@ -196,13 +243,29 @@ protected MRCompactorJobRunner(Dataset dataset, FileSystem fs) {
.newInstance();
this.lateInputRecordCountProvider = new LateFileRecordCountProvider(this.inputRecordCountProvider);
this.lateOutputRecordCountProvider = new LateFileRecordCountProvider(this.outputRecordCountProvider);
this.isRetryEnabled = this.dataset.jobProps().getPropAsBoolean(COMPACTION_RETRY_ENABLED, false);
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate RecordCountProvider", e);
}

this.applicablePathCache = CacheBuilder.newBuilder().maximumSize(2000).build();
this.datasetHelper = new DatasetHelper(this.dataset, this.fs, this.getApplicableFileExtensions());

this.outputExtension = this.dataset.jobProps().getProp(MRCompactor.COMPACTION_FILE_EXTENSION, ".avro");

if (this.isRetryEnabled) {
this.retrierConfig = ConfigBuilder.create()
.loadProps(this.dataset.jobProps().getProperties(), COMPACTION_RETRY_PREFIX)
.build()
.withFallback(COMPACTION_RETRY_DEFAULTS);

LOG.info("Retry enabled for compaction publish :"+ retrierConfig.root().render(ConfigRenderOptions.concise()));
} else {
this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
LOG.info("Retry disabled for compaction");
}

}

@Override
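A hedged sketch of how the retry pieces above fit together: job properties under COMPACTION_RETRY_PREFIX override COMPACTION_RETRY_DEFAULTS, and RetryerFactory (used in moveTmpPathToOutputPath below) turns the merged Config into a Retryer:
// Sketch based on the constructor and publish changes in this commit.
Config retryConfig = ConfigBuilder.create()
    .loadProps(dataset.jobProps().getProperties(), COMPACTION_RETRY_PREFIX) // job-level overrides
    .build()
    .withFallback(COMPACTION_RETRY_DEFAULTS);                               // exponential-backoff defaults
Retryer<Void> retryer = RetryerFactory.newInstance(retryConfig);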
@@ -325,6 +388,7 @@ private void copyDataFiles(final Path outputDirectory, List<Path> inputFilePaths
public Void call() throws Exception {
Path convertedFilePath = MRCompactorJobRunner.this.outputRecordCountProvider.convertPath(
LateFileRecordCountProvider.restoreFilePath(filePath),
MRCompactorJobRunner.this.outputExtension,
MRCompactorJobRunner.this.inputRecordCountProvider);
String targetFileName = convertedFilePath.getName();
Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(targetFileName,
@@ -364,7 +428,7 @@ private void addJars(Configuration conf) throws IOException {
}

protected void configureJob(Job job) throws IOException {
job.setJobName(HADOOP_JOB_NAME);
job.setJobName(HADOOP_JOB_NAME + " (" + this.dataset.getDatasetName() + ")");
configureInputAndOutputPaths(job);
configureMapper(job);
configureReducer(job);
@@ -514,29 +578,41 @@ private void markOutputDirAsCompleted(DateTime jobStartTime) throws IOException
}

private void moveTmpPathToOutputPath() throws IOException {
Retryer<Void> retryer = RetryerFactory.newInstance(this.retrierConfig);

LOG.info(String.format("Moving %s to %s", this.dataset.outputTmpPath(), this.dataset.outputPath()));

this.fs.delete(this.dataset.outputPath(), true);

WriterUtils.mkdirsWithRecursivePermission(this.fs, this.dataset.outputPath().getParent(), this.perm);
if (!this.fs.rename(this.dataset.outputTmpPath(), this.dataset.outputPath())) {
throw new IOException(
String.format("Unable to move %s to %s", this.dataset.outputTmpPath(), this.dataset.outputPath()));
if (this.isRetryEnabled) {
try {
retryer.call(() -> {
if (fs.exists(this.dataset.outputPath())) {
throw new IOException("Path " + this.dataset.outputPath() + " exists however it should not. Will wait more.");
}
return null;
});
} catch (Exception e) {
throw new IOException(e);
}
}

WriterUtils.mkdirsWithRecursivePermissionWithRetry(MRCompactorJobRunner.this.fs, this.dataset.outputPath().getParent(), this.perm, this.retrierConfig);

Log.info("Moving from fs: ("+MRCompactorJobRunner.this.tmpFs.getUri()+") path: "+ this.dataset.outputTmpPath() + " to "+ "fs: ("+ FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()).getUri()+") output path: " + this.dataset.outputPath());
HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs, this.dataset.outputTmpPath(), FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ;
}

private void addFilesInTmpPathToOutputPath() throws IOException {
List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath());
List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs);
for (Path path: paths) {
String fileName = path.getName();
LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath()));
Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,
MRCompactorJobRunner.this.fs, this.dataset.outputPath());

if (!this.fs.rename(path, outPath)) {
throw new IOException(
String.format("Unable to move %s to %s", path.toString(), outPath.toString()));
}
HadoopUtils.movePath(MRCompactorJobRunner.this.tmpFs, path,
FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), outPath, false, this.fs.getConf());
}
}
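Taken together, the publish path now tolerates eventually consistent filesystems: after deleting the old output it waits, via the retryer, until the path stops being reported, then moves data from the (possibly separate) tmp filesystem to the output filesystem. A condensed, hedged sketch of that flow:
// Condensed sketch of moveTmpPathToOutputPath above; local variables are illustrative.
fs.delete(outputPath, true);
retryer.call(() -> {
  if (fs.exists(outputPath)) {                    // listings may lag behind the delete
    throw new IOException(outputPath + " is still visible; retrying.");
  }
  return null;
});
WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, outputPath.getParent(), perm, retrierConfig);
HadoopUtils.movePath(tmpFs, outputTmpPath,        // tmp data may live on a different filesystem
    FileSystem.get(outputPath.getParent().toUri(), conf), outputPath, false, conf);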

@@ -575,7 +651,7 @@ public int compareTo(MRCompactorJobRunner o) {
* Get the list of file {@link Path}s in the given dataDir, which satisfy the extension requirements
* of {@link #getApplicableFileExtensions()}.
*/
private List<Path> getApplicableFilePaths(final Path dataDir) throws IOException {
private List<Path> getApplicableFilePaths(final Path dataDir, final FileSystem fs) throws IOException {
try {
return applicablePathCache.get(dataDir, new Callable<List<Path>>() {

@@ -585,7 +661,7 @@ public List<Path> call() throws Exception {
return Lists.newArrayList();
}
List<Path> paths = Lists.newArrayList();
for (FileStatus fileStatus : FileListUtils.listFilesRecursively(MRCompactorJobRunner.this.fs, dataDir,
for (FileStatus fileStatus : FileListUtils.listFilesRecursively(fs, dataDir,
new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -619,15 +695,15 @@ private void submitSlaEvent(Job job) {
.additionalMetadata(
CompactionSlaEventHelper.LATE_RECORD_COUNT,
Long.toString(this.lateOutputRecordCountProvider.getRecordCount(this.getApplicableFilePaths(this.dataset
.outputLatePath()))))
.outputLatePath(), this.fs))))
.additionalMetadata(
CompactionSlaEventHelper.REGULAR_RECORD_COUNT,
Long.toString(this.outputRecordCountProvider.getRecordCount(this.getApplicableFilePaths(this.dataset
.outputPath()))))
.outputPath(), this.fs))))
.additionalMetadata(CompactionSlaEventHelper.RECOMPATED_METADATA_NAME,
Boolean.toString(this.dataset.needToRecompact())).build().submit();
} catch (Throwable e) {
LOG.warn("Failed to submit compcation completed event:" + e, e);
LOG.warn("Failed to submit compaction completed event:" + e, e);
}
}
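Since getApplicableFilePaths now takes the filesystem to list from, tmp-path listings go through tmpFs while output and late-output listings keep using fs; a hedged sketch of the two call patterns from this commit:
List<Path> tmpFiles  = getApplicableFilePaths(dataset.outputTmpPath(), tmpFs); // listed on the tmp filesystem
List<Path> lateFiles = getApplicableFilePaths(dataset.outputLatePath(), fs);   // listed on the output filesystem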

@@ -40,12 +40,12 @@
import com.typesafe.config.ConfigFactory;

import javax.annotation.Nonnull;

import lombok.Getter;
import lombok.Setter;

import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
@@ -57,8 +57,6 @@
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.writer.exception.NonTransientException;


/**
* A Data Writer to use as a base for writing async writers.
@@ -53,7 +53,6 @@ dependencies {
compile externalDependency.scala
compile externalDependency.lombok
compile externalDependency.typesafeConfig
compile externalDependency.guavaretrying
compile externalDependency.findBugsAnnotations
compile externalDependency.oltu
compile externalDependency.opencsv
