diff --git a/build.gradle b/build.gradle index 467758c50c1..439e1b726f3 100644 --- a/build.gradle +++ b/build.gradle @@ -208,7 +208,7 @@ ext.externalDependency = [ "curatorTest": "org.apache.curator:curator-test:2.8.0", "hamcrest": "org.hamcrest:hamcrest-all:1.3", "joptSimple": "net.sf.jopt-simple:jopt-simple:4.9", - "protobuf": "com.google.protobuf:protobuf-java:2.6.1", + "protobuf": "com.google.protobuf:protobuf-java:2.5.0", "pegasus" : [ "data" : "com.linkedin.pegasus:data:" + pegasusVersion, "generator" : "com.linkedin.pegasus:generator:" + pegasusVersion, diff --git a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java index 489bb2ffcf6..dfa59b7f7c5 100644 --- a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java @@ -126,6 +126,7 @@ public class ConfigurationKeys { public static final String JOB_TRACKING_URL_KEY = "job.tracking.url"; public static final String FORK_STATE_KEY = "fork.state"; public static final String JOB_STATE_FILE_PATH_KEY = "job.state.file.path"; + public static final String JOB_STATE_DISTRIBUTED_CACHE_NAME = "job.state.distributed.cache.name"; /** * Dataset-related configuration properties; @@ -486,7 +487,7 @@ public class ConfigurationKeys { * Kafka job configurations. 
*/ public static final String KAFKA_BROKERS = "kafka.brokers"; - public static final String KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS = "kafka.source.work.units.creation.threads"; + public static final String KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS = "kafka.source.work.units.creation.threads"; public static final int KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT = 30; /** diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java index a648c97f244..1045167a8c3 100644 --- a/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -13,6 +13,7 @@ package gobblin.runtime.mapreduce; import java.io.BufferedWriter; +import java.io.FileInputStream; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -335,6 +336,9 @@ static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration conf, short jobStateRF = (short)conf.getInt("dfs.replication.max", 20); SerializationUtils.serializeState(fs, jobStateFilePath, jobState, jobStateRF); job.getConfiguration().set(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY, jobStateFilePath.toString()); + + DistributedCache.addCacheFile(jobStateFilePath.toUri(), job.getConfiguration()); + job.getConfiguration().set(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME, jobStateFilePath.getName()); } /** @@ -509,14 +513,25 @@ public static class TaskRunner extends Mapper(this.fs, FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class); - Path jobStateFilePath = new Path(context.getConfiguration().get(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY)); - SerializationUtils.deserializeState(this.fs, jobStateFilePath, this.jobState); + String jobStateFileName = context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME); + boolean foundStateFile = false; + 
for (Path dcPath : DistributedCache.getLocalCacheFiles(context.getConfiguration())) { + if (dcPath.getName().equals(jobStateFileName)) { + SerializationUtils.deserializeStateFromInputStream(closer.register(new FileInputStream(dcPath.toUri().getPath())), + this.jobState); + foundStateFile = true; + break; + } + } + if (!foundStateFile) { + throw new IOException("Job state file not found."); + } } catch (IOException ioe) { throw new RuntimeException("Failed to setup the mapper task", ioe); } diff --git a/gobblin-utility/src/main/java/gobblin/util/SerializationUtils.java b/gobblin-utility/src/main/java/gobblin/util/SerializationUtils.java index bf2c6a269f1..5bf3a6a046c 100644 --- a/gobblin-utility/src/main/java/gobblin/util/SerializationUtils.java +++ b/gobblin-utility/src/main/java/gobblin/util/SerializationUtils.java @@ -146,8 +146,22 @@ public static void serializeState(FileSystem fs, Path jobState */ public static void deserializeState(FileSystem fs, Path jobStateFilePath, T state) throws IOException { + try (InputStream is = fs.open(jobStateFilePath)) { + deserializeStateFromInputStream(is, state); + } + } - try (InputStream is = fs.open(jobStateFilePath); DataInputStream dis = (new DataInputStream(is))) { + /** + * Deserialize/read a {@link State} instance from an {@link InputStream}. + * + * @param is {@link InputStream} containing the state. + * @param state an empty {@link State} instance to deserialize into + * @param <T> the {@link State} object type + * @throws IOException if it fails to deserialize the {@link State} instance + */ + public static void deserializeStateFromInputStream(InputStream is, T state) + throws IOException { + try (DataInputStream dis = (new DataInputStream(is))) { state.readFields(dis); } }