Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
AVRO-1120. Let AvroMultipleOutput jobs use multiple schemas with map-…
…only jobs. Contributed by Ashish Nagavaram.

git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1356503 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
cutting committed Jul 2, 2012
1 parent f60833f commit 061fc31
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -21,6 +21,9 @@ Avro 1.7.1 (unreleased)

IMPROVEMENTS

AVRO-1120. Let AvroMultipleOutput jobs use multiple schemas with
map-only jobs. (Ashish Nagavaram via cutting)

BUG FIXES

AVRO-1114. Java: Update license headers for new mapreduce code. (cutting)
Expand Down
Expand Up @@ -70,7 +70,7 @@
* Usage pattern for job submission:
* <pre>
*
* Job job = new Job();
* JobConf job = new JobConf();
*
* FileInputFormat.setInputPath(job, inDir);
* FileOutputFormat.setOutputPath(job, outDir);
Expand Down Expand Up @@ -526,15 +526,19 @@ private static class InternalFileOutputFormat extends FileOutputFormat<Object, O
public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";

@SuppressWarnings({"unchecked"})
public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String baseFileName, Progressable arg3) throws IOException
{
public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String baseFileName, Progressable arg3) throws IOException {
String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
String fileName = getUniqueName(job, baseFileName);
Schema schema = schemaList.get(nameOutput+"_SCHEMA");
JobConf outputConf = new JobConf(job);
outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
if(schema!=null)
AvroJob.setOutputSchema(outputConf,schema);
boolean isMapOnly = job.getNumReduceTasks() == 0;
if (schema != null) {
if (isMapOnly)
AvroJob.setMapOutputSchema(outputConf, schema);
else
AvroJob.setOutputSchema(outputConf, schema);
}
OutputFormat outputFormat = outputConf.getOutputFormat();
return outputFormat.getRecordWriter(fs, outputConf, fileName, arg3);
}
Expand Down
Expand Up @@ -45,19 +45,31 @@

public class TestAvroMultipleOutputs {

private static final String UTF8 = "UTF-8";
private static final String UTF8 = "UTF-8";

public static class MapImpl extends AvroMapper<Utf8, Pair<Utf8, Long>> {


private AvroMultipleOutputs amos;

public void configure(JobConf Job) {
this.amos = new AvroMultipleOutputs(Job);
}

@Override
public void map(Utf8 text, AvroCollector<Pair<Utf8,Long>> collector,
Reporter reporter) throws IOException {
StringTokenizer tokens = new StringTokenizer(text.toString());
while (tokens.hasMoreTokens())
collector.collect(new Pair<Utf8,Long>(new Utf8(tokens.nextToken()),1L));
while (tokens.hasMoreTokens()) {
String tok = tokens.nextToken();
collector.collect(new Pair<Utf8,Long>(new Utf8(tok),1L));
amos.getCollector("myavro2",reporter)
.collect(new Pair<Utf8,Long>(new Utf8(tok),1L).toString());
}

}
public void close() throws IOException {
amos.close();
}

}

public static class ReduceImpl
Expand Down Expand Up @@ -91,6 +103,8 @@ public void close() throws IOException
testJob();
testProjection();
testProjection1();
testJob_noreducer();
testProjection_noreducer();
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -118,7 +132,7 @@ public void testJob() throws Exception {
FileOutputFormat.setCompressOutput(job, false);
AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroOutputFormat.class, new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());
AvroMultipleOutputs.addNamedOutput(job,"myavro1",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));

AvroMultipleOutputs.addNamedOutput(job,"myavro2",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
WordCountUtil.setMeta(job);


Expand Down Expand Up @@ -201,4 +215,49 @@ public void testProjection1() throws Exception {
}
Assert.assertEquals(sumOfCounts, actualSumOfCounts);
}

@SuppressWarnings("deprecation")
public void testJob_noreducer() throws Exception {
JobConf job = new JobConf();
job.setNumReduceTasks(0);
// private static final String UTF8 = "UTF-8";
String dir = System.getProperty("test.dir", ".") + "/mapred";
Path outputPath = new Path(dir + "/out");

outputPath.getFileSystem(job).delete(outputPath);
WordCountUtil.writeLinesFile();

job.setJobName("AvroMultipleOutputs_noreducer");

AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setOutputSchema(job,
new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());

AvroJob.setMapperClass(job, MapImpl.class);

FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
FileOutputFormat.setOutputPath(job, outputPath);
FileOutputFormat.setCompressOutput(job, false);
AvroMultipleOutputs.addNamedOutput(job,"myavro2",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
JobClient.runJob(job);
}

public void testProjection_noreducer() throws Exception {
JobConf job = new JobConf();
long onel = 1;
Schema readerSchema = Schema.create(Schema.Type.STRING);
AvroJob.setInputSchema(job, readerSchema);
String dir= System.getProperty("test.dir", ".") + "/mapred";
Path inputPath = new Path(dir + "/out" + "/myavro2-m-00000.avro");
FileStatus fileStatus = FileSystem.get(job).getFileStatus(inputPath);
FileSplit fileSplit = new FileSplit(inputPath, 0, fileStatus.getLen(), job);
AvroRecordReader<Utf8> recordReader_new = new AvroRecordReader<Utf8>(job, fileSplit);
AvroWrapper<Utf8> inputPair_new = new AvroWrapper<Utf8>(null);
NullWritable ignore = NullWritable.get();
long testl=0;
while(recordReader_new.next(inputPair_new, ignore)) {
testl=Long.parseLong(inputPair_new.datum().toString().split(":")[2].replace("}","").trim());
Assert.assertEquals(onel,testl);
}
}
}

0 comments on commit 061fc31

Please sign in to comment.