Permalink
Browse files

added inmapper-combining option unit tests, updated driver file, adde…

…d sample weather dataset
  • Loading branch information...
1 parent 69bf86c commit c7457a71ad892599b465e3307fba50b2548f7986 @bbejeck committed Oct 11, 2012
View
Binary file not shown.
View
@@ -0,0 +1,58 @@
+package bbejeck.mapred.aggregation;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * User: Bill Bejeck
+ * Date: 10/8/12
+ * Time: 9:55 PM
+ */
+public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {
+
+ private static final int MISSING = 9999;
+ private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>();
+
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String line = value.toString();
+ String yearMonth = line.substring(15, 21);
+
+ int tempStartPosition = 87;
+
+ if (line.charAt(tempStartPosition) == '+') {
+ tempStartPosition += 1;
+ }
+
+ int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
+
+ if (temp != MISSING) {
+ TemperatureAveragingPair pair = pairMap.get(yearMonth);
+ if(pair == null){
+ pair = new TemperatureAveragingPair();
+ pairMap.put(yearMonth,pair);
+ }
+ int temps = pair.getTemp().get() + temp;
+ int count = pair.getCount().get() + 1;
+ pair.set(temps,count);
+ }
+ }
+
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ Set<String> keys = pairMap.keySet();
+ Text keyText = new Text();
+ for (String key : keys) {
+ keyText.set(key);
+ context.write(keyText,pairMap.get(key));
+ }
+ }
+}
@@ -18,28 +18,35 @@ public static void main(String[] args) throws Exception {
Job job = new Job();
job.setJarByClass(BasicMapRedDriver.class);
if (args[0].equalsIgnoreCase("per")) {
- doMapReduce(job, args[1], PerTokenMapper.class, "per-token", "PerTokenCount");
+ doMapReduce(job, args[1], PerTokenMapper.class, "per-token", "PerTokenCount", TokenCountReducer.class, IntWritable.class);
}else if(args[0].equalsIgnoreCase("doc")){
- doMapReduce(job,args[1],PerDocumentMapper.class,"per-document","PerDocCount");
+ doMapReduce(job,args[1],PerDocumentMapper.class,"per-document","PerDocCount", TokenCountReducer.class, IntWritable.class);
}else if(args[0].equalsIgnoreCase("all")){
- doMapReduce(job,args[1],AllDocumentMapper.class,"all-document","AllDocCount");
+ doMapReduce(job,args[1],AllDocumentMapper.class,"all-document","AllDocCount", TokenCountReducer.class, IntWritable.class);
}else if(args[0].equalsIgnoreCase("comb")){
job.setCombinerClass(TokenCountReducer.class);
- doMapReduce(job,args[1],PerTokenMapper.class,"combiner","CombinerCount");
+ doMapReduce(job,args[1],PerTokenMapper.class,"combiner","CombinerCount", TokenCountReducer.class, IntWritable.class);
+ }else if(args[0].equalsIgnoreCase("ave")){
+ doMapReduce(job,args[1],AverageTemperatureMapper.class,"simple-average","SimpleAverage", AverageTemperatureReducer.class, TemperatureAveragingPair.class);
+ }else if(args[0].equalsIgnoreCase("avcomb")){
+ job.setCombinerClass(AverageTemperatureCombiner.class);
+ doMapReduce(job, args[1], AverageTemperatureMapper.class, "combiner-average", "CombinerAverage", AverageTemperatureReducer.class, TemperatureAveragingPair.class);
+ }else if(args[0].equalsIgnoreCase("avinmap")){
+ doMapReduce(job,args[1],AverageTemperatureCombiningMapper.class,"inmapper-average","InMapperAverage", AverageTemperatureReducer.class, TemperatureAveragingPair.class);
}
}
- private static void doMapReduce(Job job, String path, Class mapperClass, String outPath, String jobName) throws Exception {
+ private static void doMapReduce(Job job, String path, Class mapperClass, String outPath, String jobName, Class reducerClass, Class outputClass) throws Exception {
job.setJobName(jobName);
FileInputFormat.addInputPath(job, new Path(path));
FileOutputFormat.setOutputPath(job, new Path(outPath));
job.setMapperClass(mapperClass);
- job.setReducerClass(TokenCountReducer.class);
+ job.setReducerClass(reducerClass);
job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
+ job.setOutputValueClass(outputClass);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
@@ -0,0 +1,51 @@
+package bbejeck.mapred.aggregation;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Test;
+
+/**
+ * User: Bill Bejeck
+ * Date: 10/10/12
+ * Time: 9:52 PM
+ */
+public class AverageTemperatureCombiningMapperTest {
+
+ String[] temps = new String[]{"0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999",
+ "0029029070999991901010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9+00721+99999102001ADDGF104991999999999999999999",
+ "0029029070999991901010120004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9+99991+99999102001ADDGF108991999999999999999999",
+ "0029029070999991901010206004+64333+023450FM-12+000599999V0201801N008219999999N0000001N9-00611+99999101831ADDGF108991999999999999999999",
+ "0029029070999991901010213004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00561+99999101761ADDGF108991999999999999999999",
+ "0029029070999991901010220004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00281+99999101751ADDGF108991999999999999999999",
+ "0029029070999991901010306004+64333+023450FM-12+000599999V0202001N009819999999N0000001N9-00671+99999101701ADDGF106991999999999999999999",
+ "0029029070999991901010313004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00331+99999101741ADDGF108991999999999999999999",
+ "0029029070999991901010320004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00281+99999101741ADDGF108991999999999999999999",
+ "0029029070999991901010406004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00331+99999102311ADDGF108991999999999999999999"};
+
+
+ @Test
+ public void testCombiningMapper() throws Exception {
+ new MapDriver<LongWritable,Text,Text,TemperatureAveragingPair>()
+ .withMapper(new AverageTemperatureCombiningMapper())
+ .withInput(new LongWritable(4),new Text(temps[3]))
+ .withOutput(new Text("190101"),new TemperatureAveragingPair(-61,1)).runTest();
+ }
+
+ @Test
+ public void testMapLeadingPlus() {
+ new MapDriver<LongWritable, Text, Text, TemperatureAveragingPair>()
+ .withMapper(new AverageTemperatureCombiningMapper())
+ .withInput(new LongWritable(1), new Text(temps[1]))
+ .withOutput(new Text("190101"), new TemperatureAveragingPair(72, 1))
+ .runTest();
+ }
+
+ @Test
+ public void testMapNoValue() {
+ new MapDriver<LongWritable, Text, Text, TemperatureAveragingPair>()
+ .withMapper(new AverageTemperatureCombiningMapper())
+ .withInput(new LongWritable(1), new Text(temps[2]))
+ .runTest();
+ }
+}

0 comments on commit c7457a7

Please sign in to comment.