
Transferred content

1 parent 029a241 · commit 3c0589c730b92b8b4ebe5e21cf520f45aa27cc45 · @laserson committed Jan 2, 2013
@@ -0,0 +1,62 @@
+*.py[co]
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+
+# Translations
+*.mo
+
+# Mr Developer
+.mr.developer.cfg
+
+# Java
+*.class
+*.jar
+*.war
+*.ear
+
+*.pydevproject
+.project
+.metadata
+bin/**
+tmp/**
+tmp/**/*
+*.tmp
+*.bak
+*.swp
+*~.nib
+local.properties
+.classpath
+.settings/
+.loadpath
+
+# External tool builders
+.externalToolBuilders/
+
+# Locally stored "Eclipse launch configurations"
+*.launch
+
+# CDT-specific
+.cproject
+
+# PDT-specific
+.buildpath
+
+target/
@@ -0,0 +1,41 @@
+import os
+import re
+
+class NgramMapper(object):
+
+ def __init__(self):
+ # determine value of n in the current block of ngrams
+ input_file = os.environ['map_input_file']
+ self.expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])
+
+ def __call__(self, key, value):
+ data = value.split('\t')
+
+ if len(data) < 3:
+ return
+
+ ngram = data[0].split()
+ year = data[1]
+ count = int(data[2])
+
+ if len(ngram) != self.expected_tokens:
+ return
+
+ pair = sorted([ngram[0], ngram[self.expected_tokens - 1]])
+ k = pair + [year]
+
+ yield (k, count)
+
+def combiner(key, values):
+ yield (key, sum(values))
+
+def reducer(key, values):
+ yield "%s\t%s\t%s" % tuple(key), str(sum(values))
+
+
+if __name__ == '__main__':
+ import dumbo
+ # import pdb
+ # pdb.set_trace()
+ # dumbo.run(NgramMapper, reducer, combiner=combiner)
+ dumbo.run(NgramMapper, reducer)
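
The script above is the dumbo version of the job: dumbo.run wires the class-based NgramMapper and the reducer into a Hadoop Streaming job. As a hedged usage note (the flags here are an assumption based on dumbo's usual launcher, not part of this commit), such a script is typically started with something like dumbo start ngrams.py -hadoop /usr -input /ngrams -output /output-dumbo, with the exact options depending on the dumbo and CDH versions installed.
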
@@ -0,0 +1,11 @@
+from hadoopy import launch_frozen
+
+input_path = 'hdfs://laserson-1.ent.cloudera.com/ngrams'
+output_path = 'hdfs://laserson-1.ent.cloudera.com/output-hadoopy-frozen'
+
+launch_frozen(input_path,
+ output_path,
+ 'ngrams.py',
+ use_seqoutput=False,
+ num_reducers=10,
+ hstreaming='/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.2.jar')
@@ -0,0 +1,11 @@
+from hadoopy import launch
+
+input_path = 'hdfs://laserson-1.ent.cloudera.com/ngrams'
+output_path = 'hdfs://laserson-1.ent.cloudera.com/output-hadoopy'
+
+launch(input_path,
+ output_path,
+ 'ngrams.py',
+ use_seqoutput=False,
+ num_reducers=10,
+ hstreaming='/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.2.jar')
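
The two hadoopy launch scripts above differ mainly in how the job code reaches the cluster: launch assumes Python and the hadoopy package are already installed on every task node, while launch_frozen first freezes ngrams.py into a self-contained executable (via PyInstaller) so the nodes need no matching Python environment, at the cost of extra job-launch time.
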
@@ -0,0 +1,38 @@
+import os
+import re
+
+import hadoopy
+
+class Mapper(object):
+
+ def __init__(self):
+ # determine value of n in the current block of ngrams
+ input_file = os.environ['map_input_file']
+ self.expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])
+
+ def map(self, key, value):
+ data = value.split('\t')
+
+ if len(data) < 3:
+ return
+
+ ngram = data[0].split()
+ year = data[1]
+ count = int(data[2])
+
+ if len(ngram) != self.expected_tokens:
+ return
+
+ pair = sorted([ngram[0], ngram[self.expected_tokens - 1]])
+ k = pair + [year]
+
+ yield (k, count)
+
+def combiner(key, values):
+ yield (key, sum(values))
+
+def reducer(key, values):
+ yield "%s\t%s\t%s" % tuple(key), str(sum(values))
+
+if __name__ == '__main__':
+ hadoopy.run(Mapper, reducer, combiner)
@@ -0,0 +1,55 @@
+#! /usr/bin/env python
+
+import os
+import re
+
+from mrjob.job import MRJob
+from mrjob.protocol import RawProtocol, ReprProtocol
+
+class NgramNeighbors(MRJob):
+
+ # mrjob allows you to specify input/intermediate/output serialization
+ # default output protocol is JSON; here we set it to text
+ OUTPUT_PROTOCOL = RawProtocol
+
+ def mapper_init(self):
+ # determine value of n in the current block of ngrams by parsing filename
+ input_file = os.environ['map_input_file']
+ self.expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])
+
+ def mapper(self, key, line):
+ data = line.split('\t')
+
+ # error checking
+ if len(data) < 3:
+ return
+
+ # unpack data
+ ngram = data[0].split()
+ year = data[1]
+ count = int(data[2])
+
+ # more error checking
+ if len(ngram) != self.expected_tokens:
+ return
+
+ # generate key
+ pair = sorted([ngram[0], ngram[self.expected_tokens - 1]])
+ k = pair + [year]
+
+ # note that the key is an object (a list in this case)
+ # that mrjob will serialize as JSON text
+ yield (k, count)
+
+ def combiner(self, key, counts):
+ # the combiner must be separate from the reducer because the input
+ # and output must both be JSON
+ yield (key, sum(counts))
+
+ def reducer(self, key, counts):
+ # the final output is encoded as text
+ yield "%s\t%s\t%s" % tuple(key), str(sum(counts))
+
+if __name__ == '__main__':
+ # sets up a runner, based on command line options
+ NgramNeighbors.run()
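
A hedged usage note for the mrjob script above (the exact flags depend on the installed mrjob version and are not part of this commit): running python ngrams.py <input> executes the job locally with the default inline runner, while adding -r hadoop plus an -o/--output-dir destination submits the same script to the cluster, with mrjob handling the streaming plumbing and the JSON/raw protocol conversions declared in the class.
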
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.cloudera</groupId>
+ <artifactId>NgramsComparison</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>Ngrams Comparison</name>
+
+ <properties>
+ <hadoop.version>2.0.0-mr1-cdh4.0.1</hadoop.version>
+ <!-- <hadoop.version>1.0.4</hadoop.version> -->
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>cloudera-releases</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ </dependencies>
+
+</project>
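
A hedged build-and-run sketch for the Java version (the jar name follows from the artifactId and version declared above; the command lines themselves are assumptions, not part of this commit): mvn clean package produces target/NgramsComparison-0.0.1-SNAPSHOT.jar, which can then be submitted with hadoop jar target/NgramsComparison-0.0.1-SNAPSHOT.jar NgramsDriver <input path> <output path>.
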
@@ -0,0 +1,36 @@
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class NgramsDriver extends Configured implements Tool {
+
+ public int run(String[] args) throws Exception {
+ Job job = new Job(getConf());
+ job.setJarByClass(getClass());
+
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.setMapperClass(NgramsMapper.class);
+ job.setCombinerClass(NgramsReducer.class);
+ job.setReducerClass(NgramsReducer.class);
+
+ job.setOutputKeyClass(TextTriple.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setNumReduceTasks(10);
+
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new NgramsDriver(), args);
+ System.exit(exitCode);
+ }
+}
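
Note that NgramsReducer is reused as the combiner in the driver above; this is safe because the reduce operation is a plain integer sum, which is associative and commutative, and the reducer's input and output types (TextTriple, IntWritable) are identical.
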
@@ -0,0 +1,60 @@
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+
+
+public class NgramsMapper extends Mapper<LongWritable, Text, TextTriple, IntWritable> {
+
+ private Logger LOG = Logger.getLogger(getClass());
+
+ private int expectedTokens;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ String inputFile = ((FileSplit) context.getInputSplit()).getPath().getName();
+ LOG.info("inputFile: " + inputFile);
+ Pattern c = Pattern.compile("([\\d]+)gram");
+ Matcher m = c.matcher(inputFile);
+ m.find();
+ expectedTokens = Integer.parseInt(m.group(1));
+ return;
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] data = value.toString().split("\\t");
+
+ if (data.length < 3) {
+ return;
+ }
+
+ String[] ngram = data[0].split("\\s+");
+ String year = data[1];
+ IntWritable count = new IntWritable(Integer.parseInt(data[2]));
+
+ if (ngram.length != this.expectedTokens) {
+ return;
+ }
+
+ // build keyOut
+ List<String> triple = new ArrayList<String>(3);
+ triple.add(ngram[0]);
+ triple.add(ngram[expectedTokens - 1]);
+ Collections.sort(triple);
+ triple.add(year);
+ TextTriple keyOut = new TextTriple(triple);
+
+ context.write(keyOut, count);
+ }
+}
@@ -0,0 +1,18 @@
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+
+public class NgramsReducer extends Reducer<TextTriple, IntWritable, TextTriple, IntWritable> {
+
+ @Override
+ protected void reduce(TextTriple key, Iterable<IntWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int sum = 0;
+ for (IntWritable value : values) {
+ sum += value.get();
+ }
+ context.write(key, new IntWritable(sum));
+ }
+}
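
The driver, mapper, and reducer above all reference a TextTriple key class that is not shown in this commit view. Below is a minimal sketch of what such a class would need to provide for the job to compile, assuming it simply wraps three Text fields, compares them field by field, and prints tab-separated text; the field names, hash function, and toString format are assumptions, not the author's code.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical sketch of the TextTriple key referenced above: a
// WritableComparable holding three strings (two words and a year).
public class TextTriple implements WritableComparable<TextTriple> {

    private Text first = new Text();
    private Text second = new Text();
    private Text third = new Text();

    // Hadoop requires a no-arg constructor for deserialization
    public TextTriple() {
    }

    // matches the mapper's "new TextTriple(triple)" call on a List<String>
    public TextTriple(List<String> values) {
        first.set(values.get(0));
        second.set(values.get(1));
        third.set(values.get(2));
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
        third.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
        third.readFields(in);
    }

    // field-by-field lexicographic comparison for sorting and grouping
    public int compareTo(TextTriple other) {
        int cmp = first.compareTo(other.first);
        if (cmp != 0) {
            return cmp;
        }
        cmp = second.compareTo(other.second);
        if (cmp != 0) {
            return cmp;
        }
        return third.compareTo(other.third);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TextTriple)) {
            return false;
        }
        TextTriple t = (TextTriple) o;
        return first.equals(t.first) && second.equals(t.second) && third.equals(t.third);
    }

    @Override
    public int hashCode() {
        // hashCode matters because the default HashPartitioner uses it
        return first.hashCode() * 163 + second.hashCode() * 31 + third.hashCode();
    }

    @Override
    public String toString() {
        // tab-separated so the reducer's text output matches the
        // "word1 <tab> word2 <tab> year" keys of the Python versions
        return first + "\t" + second + "\t" + third;
    }
}
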