Skip to content
Browse files

CorpusGenerator can unpack zip files, implements Tool, uses CLI for parsing params
  • Loading branch information...
1 parent ac8d891 commit 3ae6bbd16d22f80a252b5e4348510a1012610d56 @jnioche jnioche committed Mar 21, 2012
View
2 LICENSE.txt
@@ -1,4 +1,4 @@
-Copyright 2010 DigitalPebble Ltd
+Copyright 2012 DigitalPebble Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
View
5 core/pom.xml
@@ -53,6 +53,11 @@
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.3</version>
+</dependency>
</dependencies>
</project>
View
187 core/src/main/java/com/digitalpebble/behemoth/util/CorpusGenerator.java
@@ -17,8 +17,22 @@
package com.digitalpebble.behemoth.util;
-import com.digitalpebble.behemoth.BehemothDocument;
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -28,31 +42,39 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import com.digitalpebble.behemoth.BehemothConfiguration;
+import com.digitalpebble.behemoth.BehemothDocument;
/**
* Generates a SequenceFile containing BehemothDocuments given a local
* directory. The BehemothDocument gets its byte content and URL. The detection
* of MIME-type and text extraction can be done later using the TikaProcessor.
*/
-public class CorpusGenerator {
+public class CorpusGenerator extends Configured implements Tool {
private transient static Logger log = LoggerFactory
.getLogger(CorpusGenerator.class);
private Path input, output;
+
private Reporter reporter;
+ private static String unpackParamName = "CorpusGenerator-unpack";
+
public enum Counters {
DOC_COUNT
};
/**
 * Creates a generator without input or output paths. main() hands such an
 * instance to ToolRunner, and run() then sets both paths from the command
 * line options.
 */
+ public CorpusGenerator() {
+ }
+
/**
 * Creates a generator for the given input and output paths.
 *
 * @param input path stored as the input of this generator
 * @param output path stored as the output of this generator; generate()
 *            creates the SequenceFile writer on it
 */
public CorpusGenerator(Path input, Path output) {
- this.input = input;
- this.output = output;
+ setInput(input);
+ setOutput(output);
}
public CorpusGenerator(Path input, Path output, Reporter reporter) {
@@ -61,6 +83,14 @@ public CorpusGenerator(Path input, Path output, Reporter reporter) {
this.reporter = reporter;
}
/**
 * Sets the input path of this generator (run() passes the value of the
 * "input file or directory" option).
 *
 * @param input the input path
 */
+ public void setInput(Path input) {
+ this.input = input;
+ }
+
/**
 * Sets the output path of this generator; generate() creates its
 * SequenceFile writer on this path.
 *
 * @param output the output path
 */
+ public void setOutput(Path output) {
+ this.output = output;
+ }
+
public long generate(boolean recurse) throws IOException {
long result = 0;
// read from input path
@@ -69,7 +99,7 @@ public long generate(boolean recurse) throws IOException {
BehemothDocument value = new BehemothDocument();
SequenceFile.Writer writer = null;
try {
- Configuration conf = new Configuration();
+ Configuration conf = getConf();
FileSystem fs = output.getFileSystem(conf);
writer = SequenceFile.createWriter(fs, conf, output,
key.getClass(), value.getClass());
@@ -84,29 +114,63 @@ public long generate(boolean recurse) throws IOException {
return result;
}
- public static void main(String argv[]) throws Exception {
/**
 * Command line entry point: runs a new CorpusGenerator through ToolRunner
 * with a configuration obtained from BehemothConfiguration.create() and
 * exits the JVM with the status ToolRunner.run returns.
 */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(BehemothConfiguration.create(),
+ new CorpusGenerator(), args);
+ System.exit(res);
+ }
+
+ /**
+ * Parses the command line, stores the resulting settings on this generator
+ * and generates the corpus.
+ *
+ * @param args command line arguments; -i (input) and -o (output) are
+ * required, -r (recurse) and -u (unpack) default to true
+ * @return 0 after a successful run or when help was requested, -1 when the
+ * arguments cannot be parsed or a required option is missing
+ * @throws Exception if generating the corpus fails
+ */
+ public int run(String[] args) throws Exception {
- // Populate a SequenceFile with the content of a local directory
+ Options options = new Options();
+ // automatically generate the help statement
+ HelpFormatter formatter = new HelpFormatter();
+ // create the parser
+ CommandLineParser parser = new GnuParser();
- String usage = "Content localdir outputDFSDir [--recurse]";
+ options.addOption("h", "help", false, "print this message");
+ options.addOption("i", "input", true, "input file or directory");
+ options.addOption("o", "output", true, "output Behemoth corpus");
+ options.addOption("r", "recurse", true,
+ "processes directories recursively (default true)");
+ options.addOption("u", "unpack", true,
+ "unpack content of archives (default true)");
- if (argv.length < 2) {
- System.out.println("usage:" + usage);
- return;
+ // parse the command line arguments
+ CommandLine line;
+ try {
+ line = parser.parse(options, args);
+ } catch (ParseException e) {
+ // nothing usable was parsed: explain, show the usage and stop
+ // instead of dereferencing a null command line further down
+ System.err.println(e.getMessage());
+ formatter.printHelp("CorpusGenerator", options);
+ return -1;
+ }
+ if (line.hasOption("help")) {
+ formatter.printHelp("CorpusGenerator", options);
+ return 0;
+ }
+ String inputValue = line.getOptionValue("i");
+ String outputValue = line.getOptionValue("o");
+ // both paths are mandatory: a Path cannot be built from a null string
+ if (inputValue == null || outputValue == null) {
+ formatter.printHelp("CorpusGenerator", options);
+ return -1;
}
- Path inputDir = new Path(argv[0]);
+ // both flags default to true; only an explicit "false" disables them
+ boolean recurse = !"false".equalsIgnoreCase(line.getOptionValue("r"));
+ boolean unpack = !"false".equalsIgnoreCase(line.getOptionValue("u"));
- Path output = new Path(argv[1]);
+ getConf().setBoolean(unpackParamName, unpack);
- boolean recurse = false;
- if (argv.length > 2 && argv[2].equals("--recurse")) {
- recurse = true;
- }
+ Path inputDir = new Path(inputValue);
+ Path output = new Path(outputValue);
- CorpusGenerator generator = new CorpusGenerator(inputDir, output);
- long count = generator.generate(recurse);
+ setInput(inputDir);
+ setOutput(output);
+
+ long count = generate(recurse);
System.out.println(count + " docs converted");
+ return 0;
}
private static long processFiles(Configuration conf, Path input,
@@ -155,31 +219,70 @@ public PerformanceFileFilter(SequenceFile.Writer writer, Text key,
public boolean accept(Path file) {
try {
FileSystem fs = file.getFileSystem(conf);
+ boolean unpack = conf.getBoolean(unpackParamName, true);
+
if (defaultIgnores.accept(file)
&& fs.getFileStatus(file).isDir() == false) {
String URI = file.toUri().toString();
- // Hmm, kind of dangerous to do this
- byte[] fileBArray = new byte[(int) fs.getFileStatus(file)
- .getLen()];
- FSDataInputStream fis = null;
- try {
- fis = fs.open(file);
- fis.readFully(0, fileBArray);
- fis.close();
- key.set(URI);
- // fill the values for the content object
- value.setUrl(URI);
- value.setContent(fileBArray);
-
- writer.append(key, value);
- counter++;
- if (reporter != null) {
- reporter.incrCounter(Counters.DOC_COUNT, 1);
+
+ // detect whether a file is likely to be an archive
+ // TODO extend to other known types
+ if (unpack && URI.toLowerCase().endsWith(".zip")) {
+ FSDataInputStream fis = null;
+ try {
+ fis = fs.open(file);
+ ArchiveInputStream input = new ArchiveStreamFactory()
+ .createArchiveInputStream(new BufferedInputStream(
+ fis));
+ ArchiveEntry entry = null;
+ while ((entry = input.getNextEntry()) != null) {
+ String name = entry.getName();
+ long size = entry.getSize();
+ byte[] content = new byte[(int) size];
+ input.read(content);
+ key.set(name);
+ // fill the values for the content object
+ value.setUrl(name);
+ value.setContent(content);
+
+ writer.append(key, value);
+ counter++;
+ if (reporter != null) {
+ reporter.incrCounter(Counters.DOC_COUNT, 1);
+ }
+ }
+
+ } catch (ArchiveException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally {
+ fis.close();
+ }
+
+ } else {
+ // Hmm, kind of dangerous to do this
+ byte[] fileBArray = new byte[(int) fs.getFileStatus(
+ file).getLen()];
+ FSDataInputStream fis = null;
+ try {
+ fis = fs.open(file);
+ fis.readFully(0, fileBArray);
+ fis.close();
+ key.set(URI);
+ // fill the values for the content object
+ value.setUrl(URI);
+ value.setContent(fileBArray);
+
+ writer.append(key, value);
+ counter++;
+ if (reporter != null) {
+ reporter.incrCounter(Counters.DOC_COUNT, 1);
+ }
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- } catch (FileNotFoundException e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
}
}
// if it is a directory, accept it so we can possibly recurse on
View
2 core/src/main/java/com/digitalpebble/behemoth/util/CorpusReader.java
@@ -57,7 +57,7 @@ public int run(String[] args) throws Exception {
options.addOption("h", "help", false, "print this message");
options.addOption("i", "input", true, "input Behemoth corpus");
- options.addOption("showBinaryContent", false,
+ options.addOption("s", "showBinaryContent", false,
"display binary content in output");
// parse the command line arguments

0 comments on commit 3ae6bbd

Please sign in to comment.
Something went wrong with that request. Please try again.