Permalink
Browse files

Add word count test of AvroTrevniOutputFormat.

  • Loading branch information...
1 parent e2dfe29 commit 3df6fce5529d07424851d97805181453166e6fa6 @cutting committed Apr 17, 2012
@@ -19,6 +19,7 @@
package org.apache.trevni.avro;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Map;
import org.apache.hadoop.io.NullWritable;
@@ -83,8 +84,12 @@ public static void setMeta(JobConf job, String key, String value) {
new AvroColumnWriter<T>(schema, meta, ReflectData.get());
private void flush() throws IOException {
- if (writer != null)
- writer.writeTo(fs.create(new Path(dir, "part-"+(part++)+EXT)));
+ OutputStream out = fs.create(new Path(dir, "part-"+(part++)+EXT));
+ try {
+ writer.writeTo(out);
+ } finally {
+ out.close();
+ }
writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
}
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+import java.io.File;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.mapred.AvroMapper;
+import org.apache.avro.mapred.AvroReducer;
+import org.apache.avro.mapred.AvroCollector;
+
+import org.apache.avro.Schema;
+
+import org.junit.Test;
+
+public class TestWordCount {
+
+ public static class MapImpl extends AvroMapper<String, Pair<String, Long> > {
+ @Override
+ public void map(String text, AvroCollector<Pair<String,Long>> collector,
+ Reporter reporter) throws IOException {
+ StringTokenizer tokens = new StringTokenizer(text.toString());
+ while (tokens.hasMoreTokens())
+ collector.collect(new Pair<String,Long>(tokens.nextToken(),1L));
+ }
+ }
+
+ public static class ReduceImpl
+ extends AvroReducer<String, Long, Pair<String, Long> > {
+ @Override
+ public void reduce(String word, Iterable<Long> counts,
+ AvroCollector<Pair<String,Long>> collector,
+ Reporter reporter) throws IOException {
+ long sum = 0;
+ for (long count : counts)
+ sum += count;
+ collector.collect(new Pair<String,Long>(word, sum));
+ }
+ }
+
+ @Test public void runTestsInOrder() throws Exception {
+ testJob();
+ }
+
+ private static final Schema STRING = Schema.create(Schema.Type.STRING);
+ static { GenericData.setStringType(STRING, GenericData.StringType.String); }
+ private static final Schema LONG = Schema.create(Schema.Type.LONG);
+
+ @SuppressWarnings("deprecation")
+ public void testJob() throws Exception {
+ JobConf job = new JobConf();
+ File dir = WordCountUtil.DIR;
+
+ WordCountUtil.writeLinesFile();
+
+ AvroJob.setInputSchema(job, STRING);
+ AvroJob.setOutputSchema(job, Pair.getPairSchema(STRING,LONG));
+
+ AvroJob.setMapperClass(job, MapImpl.class);
+ AvroJob.setCombinerClass(job, ReduceImpl.class);
+ AvroJob.setReducerClass(job, ReduceImpl.class);
+
+ FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+ FileOutputFormat.setOutputPath(job, new Path(dir + "/out"));
+ FileOutputFormat.setCompressOutput(job, true);
+
+ job.setOutputFormat(AvroTrevniOutputFormat.class);
+
+ JobClient.runJob(job);
+
+ WordCountUtil.validateCountsFile();
+ }
+
+}
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.mapred.Pair;
+
+public class WordCountUtil {
+
+ static final File DIR = new File("target", "wc");
+ public static final File LINES_FILE
+ = new File(new File(DIR, "in"), "lines.avro");
+ static final File COUNTS_FILE
+ = new File(new File(DIR, "out"), "part-00000/part-0.trv");
+
+ public static final String[] LINES = new String[] {
+ "the quick brown fox jumps over the lazy dog",
+ "the cow jumps over the moon",
+ "the rain in spain falls mainly on the plains"
+ };
+
+ public static final Map<String,Long> COUNTS =
+ new TreeMap<String,Long>();
+ static {
+ for (String line : LINES) {
+ StringTokenizer tokens = new StringTokenizer(line);
+ while (tokens.hasMoreTokens()) {
+ String word = tokens.nextToken();
+ long count = COUNTS.containsKey(word) ? COUNTS.get(word) : 0L;
+ count++;
+ COUNTS.put(word, count);
+ }
+ }
+ }
+
+ public static void writeLinesFile() throws IOException {
+ FileUtil.fullyDelete(DIR);
+ DatumWriter<String> writer = new GenericDatumWriter<String>();
+ DataFileWriter<String> out = new DataFileWriter<String>(writer);
+ LINES_FILE.getParentFile().mkdirs();
+ out.create(Schema.create(Schema.Type.STRING), LINES_FILE);
+ for (String line : LINES)
+ out.append(line);
+ out.close();
+ }
+
+ public static void validateCountsFile() throws Exception {
+ AvroColumnReader<Pair<String,Long>> reader =
+ new AvroColumnReader<Pair<String,Long>>
+ (new AvroColumnReader.Params(COUNTS_FILE).setModel(SpecificData.get()));
+ int numWords = 0;
+ for (Pair<String,Long> wc : reader) {
+ assertEquals(wc.key(), COUNTS.get(wc.key()), wc.value());
+ numWords++;
+ }
+ reader.close();
+ assertEquals(COUNTS.size(), numWords);
+ }
+
+}

0 comments on commit 3df6fce

Please sign in to comment.