Split Generator into two MR jobs, so that MR tasks are idempotent, and task failures do not cause Verify.verify() to fail

enis committed Jun 19, 2012
1 parent 9023922 commit c320c50
Showing 2 changed files with 153 additions and 104 deletions.
213 changes: 129 additions & 84 deletions src/main/java/goraci/Generator.java
@@ -31,9 +31,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.util.GoraException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -44,106 +44,116 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* A Map only job that generates random linked lists and stores them using Gora.
*/
public class Generator extends Configured implements Tool {

private static final Log LOG = LogFactory.getLog(Generator.class);

private static final int WIDTH = 1000000;
private static final int WRAP = WIDTH * 25;

static class GeneratorInputFormat extends InputFormat<LongWritable,NullWritable> {

static class GeneratorInputSplit extends InputSplit implements Writable {

@Override
public long getLength() throws IOException, InterruptedException {
return 1;
}

@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[0];
}

@Override
public void readFields(DataInput arg0) throws IOException {
// TODO Auto-generated method stub

}

@Override
public void write(DataOutput arg0) throws IOException {
// TODO Auto-generated method stub

}
}
}

static class GeneratorRecordReader extends RecordReader<LongWritable,NullWritable> {

private long count;
private long numNodes;
private boolean hasNext = true;
private Random rand;

@Override
public void close() throws IOException {

}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return new LongWritable(numNodes);
return new LongWritable(Math.abs(rand.nextLong()));
}

@Override
public NullWritable getCurrentValue() throws IOException, InterruptedException {
return NullWritable.get();
}

@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
return count / (float)numNodes;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext context) throws IOException, InterruptedException {
numNodes = context.getConfiguration().getLong("goraci.generator.nodes", 1000000);
rand = new Random();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
boolean hasnext = this.hasNext;
this.hasNext = false;
return hasnext;
return count++ < numNodes;
}

}

@Override
public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
GeneratorRecordReader rr = new GeneratorRecordReader();
rr.initialize(split, context);
return rr;
}

@Override
public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
int numMappers = job.getConfiguration().getInt("goraci.generator.mappers", 1);

ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);

for (int i = 0; i < numMappers; i++) {
splits.add(new GeneratorInputSplit());
}

return splits;
}


}

/** Ensure output files from prev-job go to map inputs for current job */
static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}

/**
Expand All @@ -170,63 +180,63 @@ public List<InputSplit> getSplits(JobContext job) throws IOException, Interrupte
* | |--------||
* |___________________________|
*/

static class GeneratorMapper extends Mapper<LongWritable,NullWritable,NullWritable,NullWritable> {


Random rand = new Random();

long[] first = null;
long[] prev = null;
long[] current = new long[WIDTH];
DataStore<Long,CINode> store;
Utf8 id;
long count = 0;
int i;

protected void setup(Context context) throws IOException, InterruptedException {
id = new Utf8(UUID.randomUUID().toString());
store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
};

protected void cleanup(Context context) throws IOException ,InterruptedException {
store.close();
};

@Override
protected void map(LongWritable key, NullWritable value, Context output) throws IOException {
long num = key.get();
System.out.println("num" + num);

Utf8 id = new Utf8(UUID.randomUUID().toString());

DataStore<Long,CINode> store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());

Random rand = new Random();

long[] first = null;
long[] prev = null;
long[] current = new long[WIDTH];

long count = 0;
while (count < num) {
for (int i = 0; i < current.length; i++)
current[i] = Math.abs(rand.nextLong());

current[i++] = Math.abs(key.get());

if (i == current.length) {
persist(output, store, count, prev, current, id);

i = 0;

if (first == null)
first = current;
prev = current;
current = new long[WIDTH];

count += current.length;
output.setStatus("Count " + count);

if (count % WRAP == 0) {
// this block of code turns the 1 million linked lists of length 25 into one giant circular linked list of 25 million nodes

circularLeftShift(first);

updatePrev(store, first, prev);

first = null;
prev = null;
}

}

store.close();

}

private static void circularLeftShift(long[] first) {
long ez = first[0];
for (int i = 0; i < first.length - 1; i++)
first[i] = first[i + 1];
first[first.length - 1] = ez;
}

private static void persist(Context output, DataStore<Long,CINode> store, long count, long[] prev, long[] current, Utf8 id) throws IOException {
for (int i = 0; i < current.length; i++) {
CINode node = store.newPersistent();
@@ -236,75 +246,110 @@ private static void persist(Context output, DataStore<Long,CINode> store, long c
else
node.setPrev(-1);
node.setClient(id);

store.put(current[i], node);
if (i % 1000 == 0) {
// Tickle progress every so often else maprunner will think us hung
output.progress();
}
}

store.flush();
}

private static void updatePrev(DataStore<Long,CINode> store, long[] first, long[] current) throws IOException {
for (int i = 0; i < current.length; i++) {
CINode node = store.newPersistent();
node.setPrev(current[i]);
store.put(first[i], node);
}

store.flush();
}
}


@Override
public int run(String[] args) throws Exception {
if (args.length == 0) {
System.out.println("Usage : " + Generator.class.getSimpleName() + " <num mappers> <num nodes per map>");
if (args.length < 3) {
System.out.println("Usage : " + Generator.class.getSimpleName() + " <num mappers> <num nodes per map> <tmp output dir>");
return 0;
}

int numMappers = Integer.parseInt(args[0]);
long numNodes = Long.parseLong(args[1]);
return run(numMappers, numNodes);
Path tmpOutput = new Path(args[2]);
return run(numMappers, numNodes, tmpOutput);
}

protected void createSchema() throws IOException {
DataStore<Long,CINode> store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
store.createSchema();
}

public int run(int numMappers, long numNodes) throws Exception {
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput) throws Exception {
LOG.info("Running RandomInputGenerator with numMappers=" + numMappers +", numNodes=" + numNodes);
Job job = new Job(getConf());

job.setJobName("Random Input Generator");
job.setNumReduceTasks(0);
job.setJarByClass(getClass());

job.setInputFormatClass(GeneratorInputFormat.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);

job.getConfiguration().setInt("goraci.generator.mappers", numMappers);
job.getConfiguration().setLong("goraci.generator.nodes", numNodes);

job.setMapperClass(Mapper.class); //identity mapper

FileOutputFormat.setOutputPath(job, tmpOutput);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

boolean success = job.waitForCompletion(true);

return success ? 0 : 1;
}

public int runGenerator(int numMappers, long numNodes, Path tmpOutput) throws Exception {
LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);

createSchema();

Job job = new Job(getConf());

job.setJobName("Link Generator");
job.setNumReduceTasks(0);
job.setJarByClass(getClass());

job.setInputFormatClass(GeneratorInputFormat.class);

FileInputFormat.setInputPaths(job, tmpOutput);
job.setInputFormatClass(OneFilePerMapperSFIF.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);

job.getConfiguration().setInt("goraci.generator.mappers", numMappers);
job.getConfiguration().setLong("goraci.generator.nodes", numNodes);

job.setMapperClass(GeneratorMapper.class);

job.setOutputFormatClass(NullOutputFormat.class);

job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);

boolean success = job.waitForCompletion(true);

return success ? 0 : 1;
}


public int run(int numMappers, long numNodes, Path tmpOutput) throws Exception {
int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput);
if (ret > 0) {
return ret;
}

return runGenerator(numMappers, numNodes, tmpOutput);
}

public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new Generator(), args);
System.exit(ret);

5 comments on commit c320c50

@keith-turner

I was just looking at this change and the comment. The original Generator map reduce job should be able to fail at any point without causing undefined nodes, because only flushed nodes are referenced. Undefined nodes indicate data loss. It's possible that in some cases this two mapper approach could cover up data loss by rewriting data that was lost.

An alternative change would be to modify the loop verify step so that it does not fail if unreferenced > 0. Unreferenced nodes are ok; these are nodes that nothing points to. Unreferenced nodes do not indicate data loss.
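
A minimal sketch of that relaxed check, assuming Verify exposes its counts through a counter enum; the enum and counter names below are assumptions, not goraci's actual API:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class RelaxedVerifySketch {
  // Assumed counter enum; the real Verify job may organize its counters differently.
  enum Counts { REFERENCED, UNREFERENCED, UNDEFINED }

  static boolean verifyCounts(Job verifyJob) throws Exception {
    Counters counters = verifyJob.getCounters();
    long undefined = counters.findCounter(Counts.UNDEFINED).getValue();
    long unreferenced = counters.findCounter(Counts.UNREFERENCED).getValue();

    // Undefined nodes (something points to them, but they were never written)
    // always indicate data loss, so they fail the run.
    if (undefined > 0) {
      return false;
    }

    // Unreferenced nodes (written, but nothing points to them yet) are expected
    // when a map task dies before its lists are linked, so log them instead of failing.
    if (unreferenced > 0) {
      System.out.println("Tolerating " + unreferenced + " unreferenced nodes");
    }
    return true;
  }
}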

@enis enis commented on c320c50 Jun 21, 2012

You are right that repeated execution of the same map input might rewrite data that was actually lost, but task failures are supposed to be relatively uncommon, although they do happen and cause our nightlies to fail.

The original Generate / Verify is fine, but for Loop, we check in Verify.verify() the total number of referenced and unreferenced nodes. We can relax that, and as you suggested, just check for undefined nodes, but somehow I want to keep those tighter checks. Wdyt?

@keith-turner

If a machine went down and a mapper, tabletserver/regionserver, and datanode all died and data loss occurred, wouldn't you like to know about that? It's possible that the event that causes a mapper to die may also cause data loss.

If you could obtain the # of failed map tasks, then you could bound the # of unreferenced nodes. I think the following should be true.

#unreferenced <= num_failed_map_task * 1000000

I can understand the desire to get nice clean counts. Any of the counts being off indicates a problem. For example, if the referenced count were too high, that would indicate a problem: where the heck did the extra data come from? For me the main focus of this test has always been to detect data loss, so I am very happy when undefined==0, although the idea of detecting extra unexpected data is interesting.
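
A sketch of how that bound could be checked after the generator job, assuming the failed-map count is read from Hadoop's JobCounter.NUM_FAILED_MAPS (the counter and how this cluster exposes it are assumptions for this sketch):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;

public class UnreferencedBoundSketch {
  private static final int WIDTH = 1000000; // same width as Generator

  // generatorJob: the completed generator job; unreferenced: the count reported by Verify.
  static boolean withinBound(Job generatorJob, long unreferenced) throws Exception {
    long failedMaps = generatorJob.getCounters()
        .findCounter(JobCounter.NUM_FAILED_MAPS).getValue();

    // A failed map task should leave at most one flushed but not-yet-linked batch of
    // WIDTH nodes behind, hence: #unreferenced <= num_failed_map_tasks * WIDTH.
    return unreferenced <= failedMaps * WIDTH;
  }
}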

@enis enis commented on c320c50 Jun 22, 2012

> If a machine went down and a mapper, tabletserver/regionserver, and datanode all died and data loss occurred, wouldn't you like to know about that? It's possible that the event that causes a mapper to die may also cause data loss.

We should not care if the DN/TT dies. In our nightly test environment running on 5-10 nodes, we see failed tasks for various reasons from time to time, but these should not affect the ingestion test. For RS failures, we should not care either; they should not cause data loss. In fact, we deliberately kill RSs during the test.

There is another solution: add a column to each row holding the map task id, and revert to the original implementation. In the verify step, we would only count rows from successfully finished map tasks of the previous jobs. I believe at the start of the verify job we can obtain the failed / successfully finished task ids and either do the filtering, or just delete the rows from failed tasks before verifying.
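
A rough sketch of that idea, under the assumption that a new task-id column would be added to CINode (no such field exists in the current schema; only the client id does):

import org.apache.avro.util.Utf8;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: derive a per-attempt identifier in setup(), analogous to the per-mapper
// client id the mapper already writes via node.setClient(id).
public class TaskIdTaggingSketch
    extends Mapper<LongWritable,NullWritable,NullWritable,NullWritable> {

  private Utf8 taskId;

  @Override
  protected void setup(Context context) {
    // e.g. "attempt_201206190000_0001_m_000003_0"; distinguishes retries of the same task.
    taskId = new Utf8(context.getTaskAttemptID().toString());
    // Every node written by this attempt would carry taskId in a new (hypothetical)
    // CINode column; Verify would then count only rows whose taskId belongs to a
    // successfully finished attempt, or delete rows from failed attempts up front.
  }
}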

@keith-turner

Personally, I would still want to examine the data generated by failed map tasks for lost data, because there should not be any lost data.
