Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ ext.externalDependency = [
"curatorTest": "org.apache.curator:curator-test:2.8.0",
"hamcrest": "org.hamcrest:hamcrest-all:1.3",
"joptSimple": "net.sf.jopt-simple:jopt-simple:4.9",
"protobuf": "com.google.protobuf:protobuf-java:2.6.1",
"protobuf": "com.google.protobuf:protobuf-java:2.5.0",
"pegasus" : [
"data" : "com.linkedin.pegasus:data:" + pegasusVersion,
"generator" : "com.linkedin.pegasus:generator:" + pegasusVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class ConfigurationKeys {
public static final String JOB_TRACKING_URL_KEY = "job.tracking.url";
public static final String FORK_STATE_KEY = "fork.state";
public static final String JOB_STATE_FILE_PATH_KEY = "job.state.file.path";
public static final String JOB_STATE_DISTRIBUTED_CACHE_NAME = "job.state.distributed.cache.name";

/**
* Dataset-related configuration properties;
Expand Down Expand Up @@ -486,7 +487,7 @@ public class ConfigurationKeys {
* Kafka job configurations.
*/
public static final String KAFKA_BROKERS = "kafka.brokers";
public static final String KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS = "kafka.source.work.units.creation.threads";
public static final String KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS = "kafka.source.work.units.creation.threads";
public static final int KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT = 30;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package gobblin.runtime.mapreduce;

import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -335,6 +336,9 @@ static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration conf,
short jobStateRF = (short)conf.getInt("dfs.replication.max", 20);
SerializationUtils.serializeState(fs, jobStateFilePath, jobState, jobStateRF);
job.getConfiguration().set(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY, jobStateFilePath.toString());

DistributedCache.addCacheFile(jobStateFilePath.toUri(), job.getConfiguration());
job.getConfiguration().set(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME, jobStateFilePath.getName());
}

/**
Expand Down Expand Up @@ -509,14 +513,25 @@ public static class TaskRunner extends Mapper<LongWritable, Text, NullWritable,

@Override
protected void setup(Context context) {
try {
try(Closer closer = Closer.create()) {
this.fs = FileSystem.get(context.getConfiguration());
this.taskStateStore =
new FsStateStore<>(this.fs, FileOutputFormat.getOutputPath(context).toUri().getPath(),
TaskState.class);

Path jobStateFilePath = new Path(context.getConfiguration().get(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY));
SerializationUtils.deserializeState(this.fs, jobStateFilePath, this.jobState);
String jobStateFileName = context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME);
boolean foundStateFile = false;
for (Path dcPath : DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
if (dcPath.getName().equals(jobStateFileName)) {
SerializationUtils.deserializeStateFromInputStream(closer.register(new FileInputStream(dcPath.toUri().getPath())),
this.jobState);
foundStateFile = true;
break;
}
}
if (!foundStateFile) {
throw new IOException("Job state file not found.");
}
} catch (IOException ioe) {
throw new RuntimeException("Failed to setup the mapper task", ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,22 @@ public static <T extends State> void serializeState(FileSystem fs, Path jobState
*/
public static <T extends State> void deserializeState(FileSystem fs, Path jobStateFilePath, T state)
    throws IOException {
  // Open the serialized state file on the given FileSystem, then delegate the
  // actual decoding to the stream-based variant so both entry points share a
  // single code path. try-with-resources guarantees the stream is closed.
  try (InputStream stateInput = fs.open(jobStateFilePath)) {
    deserializeStateFromInputStream(stateInput, state);
  }
}

try (InputStream is = fs.open(jobStateFilePath); DataInputStream dis = (new DataInputStream(is))) {
/**
* Deserialize/read a {@link State} instance from a file.
*
* @param is {@link InputStream} containing the state.
* @param state an empty {@link State} instance to deserialize into
* @param <T> the {@link State} object type
* @throws IOException if it fails to deserialize the {@link State} instance
*/
public static <T extends State> void deserializeStateFromInputStream(InputStream is, T state)
throws IOException {
try (DataInputStream dis = (new DataInputStream(is))) {
state.readFields(dis);
}
}
Expand Down