Changed to use sequence files instead of NLineInputFormat
eljefe6a committed Dec 29, 2012
1 parent 9f96b3a commit 5f46403
Showing 4 changed files with 63 additions and 69 deletions.
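In short: the per-iteration jobs stop reading line-oriented text through NLineInputFormat and instead read and write binary SequenceFiles, with Snappy block compression on the output. A minimal sketch of the new wiring, condensed from the BoggleDriver diff below (the class and method names in this sketch are illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileWiring {
	// Condensed restatement of the BoggleDriver change in this commit
	static Job configure(Configuration configuration) throws Exception {
		Job job = new Job(configuration, "boggle");

		// Before: job.setInputFormatClass(NLineInputFormat.class) split a
		// text file of serialized rolls across mappers by line count.
		// After: mappers receive typed (Text, RollGraphWritable) records.
		job.setInputFormatClass(SequenceFileInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		// Snappy-compress the output, a block of records at a time
		FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

		return job;
	}
}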
6 changes: 5 additions & 1 deletion .classpath
@@ -62,7 +62,11 @@
<classpathentry kind="lib" path="/usr/lib/hadoop/client-0.20/servlet-api-2.5.jar"/>
<classpathentry kind="lib" path="/usr/lib/hadoop/client-0.20/slf4j-api-1.6.1.jar"/>
<classpathentry kind="lib" path="/usr/lib/hadoop/client-0.20/slf4j-log4j12-1.6.1.jar"/>
-<classpathentry kind="lib" path="/usr/lib/hadoop/client-0.20/snappy-java-1.0.4.1.jar"/>
+<classpathentry kind="lib" path="/usr/lib/hadoop/client-0.20/snappy-java-1.0.4.1.jar">
+<attributes>
+<attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="/usr/lib/hadoop/lib/native"/>
+</attributes>
+</classpathentry>
<classpathentry kind="lib" path="/usr/lib/hadoop/client-0.20/xmlenc-0.52.jar"/>
<classpathentry kind="lib" path="/usr/lib/hadoop/client-0.20/zookeeper-3.4.3-cdh4.1.1.jar"/>
<classpathentry kind="lib" path="/usr/lib/hadoop/hadoop-annotations.jar"/>
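That new attribute points the Eclipse launcher at Hadoop's native library directory. Snappy compression in Hadoop is backed by native code, so this is presumably what lets the SnappyCodec configured in BoggleDriver below load when jobs are launched from the IDE.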
49 changes: 34 additions & 15 deletions src/BoggleDriver.java
@@ -4,11 +4,16 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
@@ -57,10 +62,6 @@ public int run(String[] args) throws Exception {
Logger.getRootLogger().setLevel(Level.ERROR);

Configuration configuration = getConf();
-// To change how the mappers are created to process the roll,
-// pass in -D mapreduce.input.lineinputformat.linespermap=0
-// or in code uncomment:
-configuration.set("mapreduce.input.lineinputformat.linespermap", "10240");

FileSystem fileSystem = FileSystem.get(configuration);

@@ -79,7 +80,7 @@
configuration.set(BLOOM_PARAM, bloomPath);
configuration.set(DICTIONARY_PARAM, dictionary);

-BoggleRoll roll = BoggleRoll.createRoll(configuration.getInt(ROLL_VERSION, BoggleRoll.NEW_VERSION));
+BoggleRoll roll = BoggleRoll.createRoll(configuration.getInt(ROLL_VERSION, BoggleRoll.BIG_BOGGLE_VERSION));
configuration.set(ROLL_PARAM, roll.serialize());

int iteration = traverseGraph(input, configuration, fileSystem, roll);
@@ -109,7 +110,7 @@ private int traverseGraph(String input, Configuration configuration, FileSystem
throws IOException, InterruptedException, ClassNotFoundException {
int iteration = 0;

-writeRollFile(input, fileSystem, roll, iteration);
+writeRollFile(input, fileSystem, configuration, roll, iteration);

long previousWordCount = 0;
long bloomSavings = 0;
@@ -123,8 +124,12 @@ private int traverseGraph(String input, Configuration configuration, FileSystem
FileInputFormat.setInputPaths(job, getPath(input, iteration));
FileOutputFormat.setOutputPath(job, getPath(input, iteration + 1));

-// Roll is broken in to x mappers per node
-job.setInputFormatClass(NLineInputFormat.class);
+job.setInputFormatClass(SequenceFileInputFormat.class);
+job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
+SequenceFileOutputFormat.setOutputCompressionType(job,
+		CompressionType.BLOCK);

job.setNumReduceTasks(0);

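Design note on the two compression calls above: BLOCK compression batches many key/value pairs into each compressed chunk, which generally compresses large numbers of small records like these adjacency entries better than per-record compression would, and Snappy favors speed over compression ratio, a reasonable trade for intermediate data that is rewritten on every iteration.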
@@ -187,6 +192,8 @@ private boolean findWords(String input, String output, Configuration configuration
FileInputFormat.setInputPaths(job, getPath(input, iteration));
FileOutputFormat.setOutputPath(job, new Path(output));

+job.setInputFormatClass(SequenceFileInputFormat.class);
+
job.setNumReduceTasks(1);

job.setMapperClass(BoggleWordMapper.class);
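Note that findWords only switches its input format; its output keeps the default text format, so the final word list stays directly readable. The intermediate per-iteration directories, by contrast, are now binary. hadoop fs -text (unlike hadoop fs -cat) can decode SequenceFiles for inspection, though it will need RollGraphWritable on the client classpath, for example via HADOOP_CLASSPATH, to print the values.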
@@ -217,8 +224,10 @@ private boolean findWords(String input, String output, Configuration configuration
* The iteration for the input
* @throws IOException
*/
-private void writeRollFile(String input, FileSystem fileSystem, BoggleRoll roll, int iteration) throws IOException {
-FSDataOutputStream outputStream = fileSystem.create(getPath(input, 0));
+private void writeRollFile(String input, FileSystem fileSystem, Configuration configuration, BoggleRoll roll,
+		int iteration) throws IOException {
+Path parent = getPath(input, iteration);
+fileSystem.mkdirs(parent);

for (int i = 0; i < roll.rollCharacters.length; i++) {
for (int j = 0; j < roll.rollCharacters[i].length; j++) {
@@ -227,13 +236,23 @@ private void writeRollFile(String input, FileSystem fileSystem, BoggleRoll roll,

RollGraphWritable graphWritable = new RollGraphWritable(nodes, false);

+Text text = new Text(roll.rollCharacters[i][j]);
+
+// Note:
+// Creating a file per starting character can leave one character's file
+// with very little work if it's a z or x or y.
+// You could work around this by rebalancing every so often.
+
-// Mimic the adjacency matrix written by the mapper to start things off
-String output = roll.rollCharacters[i][j] + " " + graphWritable.serialize() + "\n";
-outputStream.writeBytes(output);
+SequenceFile.Writer writer = null;
+
+writer = SequenceFile.createWriter(fileSystem, configuration, new Path(parent, i + "-" + j + ".txt"),
+		text.getClass(), graphWritable.getClass());
+writer.append(text, graphWritable);
+
+IOUtils.closeStream(writer);
}
}

-outputStream.close();
}

/**
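Since the seed files writeRollFile now produces are no longer plain text, a quick way to inspect one is with SequenceFile.Reader. This is a hypothetical verification snippet, not part of the commit; the path is a placeholder following the i + "-" + j + ".txt" naming above, and it assumes RollGraphWritable has the no-argument constructor that Writable implementations require:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeedFileDump {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		// "input/0/0-0.txt" stands in for one file under getPath(input, 0)
		SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("input/0/0-0.txt"), conf);

		Text key = new Text();
		RollGraphWritable value = new RollGraphWritable(); // assumes a no-arg constructor

		// Print every (key, value) pair the seed file contains
		while (reader.next(key, value)) {
			System.out.println(key + " -> " + value);
		}

		reader.close();
	}
}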
36 changes: 10 additions & 26 deletions src/BoggleMapper.java
@@ -4,14 +4,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.log4j.Logger;

-public class BoggleMapper extends Mapper<LongWritable, Text, Text, RollGraphWritable> {
+public class BoggleMapper extends Mapper<Text, RollGraphWritable, Text, RollGraphWritable> {
private static final Logger logger = Logger.getLogger("Boggle");

/** The Boggle Roll that is being processed */
@@ -35,29 +34,14 @@ public void setup(Context context) throws IOException {
}

@Override
-	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-		// Expected input:
-		// aaaaa [[0,0][1,1][2,2]] false
-		String line = value.toString();
-
-		String values[] = line.split("\\s");
-
-		if (values.length == 3) {
-			String charsSoFar = values[0];
-
-			RollGraphWritable rollGraph = RollGraphWritable.deserialize(values[1] + " " + values[2]);
-
-			if (!rollGraph.isFinal) {
-				processNonFinalNode(context, charsSoFar, rollGraph);
-			} else {
-				context.write(new Text(charsSoFar), rollGraph);
-
-				// Use counters to keep track of how many words were found so far
-				context.getCounter("boggle", "words").increment(1);
-			}
-		} else {
-			logger.warn("The input line had more spaces than were expected. Had " + values.length
-					+ " expected 3. The line was \"" + line + "\"");
-		}
-	}
+	public void map(Text key, RollGraphWritable value, Context context) throws IOException, InterruptedException {
+		if (!value.isFinal) {
+			processNonFinalNode(context, key.toString(), value);
+		} else {
+			context.write(key, value);
+
+			// Use counters to keep track of how many words were found so far
+			context.getCounter("boggle", "words").increment(1);
+		}
+	}

@@ -110,13 +94,13 @@ private void processNonFinalNode(Context context, String charsSoFar, RollGraphWritable
RollGraphWritable nextGraphWritable = new RollGraphWritable(nextNodeList, false);

context.write(new Text(newWord), nextGraphWritable);

// Use counters to keep track of how many words were found so far
context.getCounter("boggle", "words").increment(1);
} else {
// Use counters to keep track of how many words were thrown out by the Bloom Filter
context.getCounter("boggle", "bloom").increment(1);

if (logger.isDebugEnabled()) {
logger.debug("Throwing out " + newWord + " because it didn't pass membership test");
}
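Passing RollGraphWritable objects straight through as SequenceFile values (rather than serializing them into the line format the old map method parsed) relies on that class implementing Hadoop's Writable contract. The commit doesn't touch the class, so the skeleton below is only an illustration of the contract it must satisfy; the isFinal flag is real (both mappers test it), while the remaining details are assumptions:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Sketch of the contract only; the real class also carries the list of
// visited dice plus the serialize()/deserialize() helpers seen in the diff.
public class RollGraphWritable implements Writable {
	boolean isFinal; // tested by both mappers above

	public void write(DataOutput out) throws IOException {
		out.writeBoolean(isFinal);
		// ... also write the visited-node list, in a fixed order
	}

	public void readFields(DataInput in) throws IOException {
		isFinal = in.readBoolean();
		// ... read the visited-node list back in the same order
	}
}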
41 changes: 14 additions & 27 deletions src/BoggleWordMapper.java
@@ -7,20 +7,19 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

-public class BoggleWordMapper extends Mapper<LongWritable, Text, Text, RollGraphWritable> {
+public class BoggleWordMapper extends Mapper<Text, RollGraphWritable, Text, RollGraphWritable> {
private static final Logger logger = Logger.getLogger("Boggle");

/** All words from the dictionary */
private HashSet<String> words = new HashSet<String>();

/** The minimum size for a word to be output */
private int minimumWordSize = 0;

@Override
public void setup(Context context) throws IOException {
Configuration configuration = context.getConfiguration();
@@ -49,37 +48,25 @@ public void setup(Context context) throws IOException {
}

dict.close();

// Get the minimum word size from the configuration
-minimumWordSize = configuration.getInt(BoggleDriver.MINIMUM_WORD_SIZE_PARAM, BoggleDriver.MINIMUM_WORD_SIZE_DEFAULT);
+minimumWordSize = configuration.getInt(BoggleDriver.MINIMUM_WORD_SIZE_PARAM,
+		BoggleDriver.MINIMUM_WORD_SIZE_DEFAULT);
}

@Override
-	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-		// Expected input:
-		// aaaaa [[0,0][1,1][2,2]] false
-		String line = value.toString();
-
-		String values[] = line.split("\\s");
-
-		if (values.length == 3) {
-			String charsSoFar = values[0];
-
-			// See if the word is big enough to emit
-			if (charsSoFar.length() >= minimumWordSize) {
-				// See if the word actually appears in the dictionary
-				if (words.contains(charsSoFar)) {
-					// Word appears, emit
-					RollGraphWritable rollGraph = RollGraphWritable.deserialize(values[1] + " " + values[2]);
-
-					context.write(new Text(charsSoFar), rollGraph);
-
-					context.getCounter("boggle", "finalwords").increment(1);
-				}
-			}
-		} else {
-			logger.warn("The input line had more spaces than were expected. Had " + values.length
-					+ " expected 3. The line was \"" + line + "\"");
-		}
-	}
+	public void map(Text key, RollGraphWritable value, Context context) throws IOException, InterruptedException {
+		String charsSoFar = key.toString();
+
+		// See if the word is big enough to emit
+		if (charsSoFar.length() >= minimumWordSize) {
+			// See if the word actually appears in the dictionary
+			if (words.contains(charsSoFar)) {
+				// Word appears, emit
+				context.write(new Text(charsSoFar), value);
+
+				context.getCounter("boggle", "finalwords").increment(1);
+			}
+		}
+	}
}
