Initial commit
Smerity committed Mar 19, 2014
0 parents commit 421b395
Showing 9 changed files with 346 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
*.class

data/
bin/
Binary file added lib/webarchive-commons-jar-with-dependencies.jar
53 changes: 53 additions & 0 deletions src/org/commoncrawl/examples/S3ReaderTest.java
@@ -0,0 +1,53 @@
package org.commoncrawl.examples;

import java.io.IOException;

import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;

public class S3ReaderTest {
/**
* @param args
* @throws IOException
* @throws S3ServiceException
*/
public static void main(String[] args) throws IOException, S3ServiceException {
        // We're accessing a publicly available bucket, so we don't need to supply any credentials
S3Service s3s = new RestS3Service(null);

// Let's grab a file out of the CommonCrawl S3 bucket
String fn = "common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz";
        // (the trailing nulls are optional request constraints: modified-date
        // checks, ETag matches, and byte ranges)
        S3Object f = s3s.getObject("aws-publicdatasets", fn, null, null, null, null, null, null);

        // The file name lets the factory pick the right ArchiveReader and decide
        // whether the stream needs decompressing
ArchiveReader ar = WARCReaderFactory.get(fn, f.getDataInputStream(), true);

// Once we have an ArchiveReader, we can work through each of the records it contains
int i = 0;
for(ArchiveRecord r : ar) {
            // The record header contains information such as the type of record, size, creation time, and URL
System.out.println("Header: " + r.getHeader());
System.out.println("URL: " + r.getHeader().getUrl());
System.out.println();

// If we want to read the contents of the record, we can use the ArchiveRecord as an InputStream
            // Create a byte array as long as the record's stated length
            byte[] rawData = new byte[r.available()];
            // read() may return fewer bytes than requested, so loop until the
            // buffer is full or the stream ends
            int offset = 0;
            while (offset < rawData.length) {
                int bytesRead = r.read(rawData, offset, rawData.length - offset);
                if (bytesRead == -1) break;
                offset += bytesRead;
            }
            // Note: a potential optimization would be to allocate one large buffer only once

// Why don't we convert it to a string and print the start of it? Let's hope it's text!
String content = new String(rawData);
System.out.println(content.substring(0, Math.min(500, content.length())));
System.out.println((content.length() > 500 ? "..." : ""));

// Pretty printing to make the output more readable
System.out.println("=-=-=-=-=-=-=-=-=");
if (i++ > 4) break;
}
}
}
69 changes: 69 additions & 0 deletions src/org/commoncrawl/examples/WARCMapReduceTest.java
@@ -0,0 +1,69 @@
package org.commoncrawl.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.commoncrawl.warc.WARCFileInputFormat;

public class WARCMapReduceTest extends Configured implements Tool {
private static final Logger LOG = Logger.getLogger(WARCMapReduceTest.class);

/**
* Main entry point that uses the {@link ToolRunner} class to run the Hadoop job.
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WARCMapReduceTest(), args);
System.exit(res);
}

/**
* Builds and runs the Hadoop job.
* @return 0 if the Hadoop job completes successfully and 1 otherwise.
*/
@Override
public int run(String[] arg0) throws Exception {
        // Build the Hadoop job from the current configuration
Configuration conf = getConf();
Job job = new Job(conf);
job.setJarByClass(WARCMapReduceTest.class);
job.setNumReduceTasks(1);

String inputPath = "data/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz";
        LOG.info("Setting input path to '" + inputPath + "'");
FileInputFormat.addInputPath(job, new Path(inputPath));

String outputPath = "/tmp/cc/";
        FileSystem fs = FileSystem.newInstance(conf);
        if (fs.exists(new Path(outputPath))) {
            // Clear output from any previous run; delete the directory recursively
            fs.delete(new Path(outputPath), true);
        }
FileOutputFormat.setOutputPath(job, new Path(outputPath));

job.setInputFormatClass(WARCFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

job.setMapperClass(WordCounter.WordCountMapper.class);
job.setReducerClass(LongSumReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
}
}
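
The input and output paths above are hard-coded. A hedged sketch of a run() variant that takes them from the command line instead — hypothetical, not part of this commit; the usage string and argument order are assumptions:

// Hypothetical variant, not part of this commit: read the input and output
// paths from the command-line arguments passed through ToolRunner
@Override
public int run(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: WARCMapReduceTest <input path> <output path>");
        return 2;
    }
    Job job = new Job(getConf());
    job.setJarByClass(WARCMapReduceTest.class);
    job.setNumReduceTasks(1);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setInputFormatClass(WARCFileInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(WordCounter.WordCountMapper.class);
    job.setReducerClass(LongSumReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
}
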
46 changes: 46 additions & 0 deletions src/org/commoncrawl/examples/WARCReaderTest.java
@@ -0,0 +1,46 @@
package org.commoncrawl.examples;

import java.io.FileInputStream;
import java.io.IOException;

import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;

public class WARCReaderTest {
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// Set up the local compressed WARC file for reading
String fn = "data/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz";
FileInputStream is = new FileInputStream(fn);
        // The file name lets the factory pick the right ArchiveReader and decide
        // whether the stream needs decompressing
ArchiveReader ar = WARCReaderFactory.get(fn, is, true);

// Once we have an ArchiveReader, we can work through each of the records it contains
int i = 0;
for(ArchiveRecord r : ar) {
            // The record header contains information such as the type of record, size, creation time, and URL
System.out.println(r.getHeader());
System.out.println(r.getHeader().getUrl());
System.out.println();

// If we want to read the contents of the record, we can use the ArchiveRecord as an InputStream
            // Create a byte array as long as the record's stated length
            byte[] rawData = new byte[r.available()];
            // read() may return fewer bytes than requested, so loop until the
            // buffer is full or the stream ends
            int offset = 0;
            while (offset < rawData.length) {
                int bytesRead = r.read(rawData, offset, rawData.length - offset);
                if (bytesRead == -1) break;
                offset += bytesRead;
            }
            // Note: a potential optimization would be to allocate one large buffer only once

// Why don't we convert it to a string and print the start of it? Let's hope it's text!
String content = new String(rawData);
System.out.println(content.substring(0, Math.min(500, content.length())));
System.out.println((content.length() > 500 ? "..." : ""));

// Pretty printing to make the output more readable
System.out.println("=-=-=-=-=-=-=-=-=");
if (i++ > 4) break;
}
}
}
23 changes: 23 additions & 0 deletions src/org/commoncrawl/examples/WARCResponseTest.java
@@ -0,0 +1,23 @@
package org.commoncrawl.examples;

import java.io.FileInputStream;
import java.io.IOException;

import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;

public class WARCResponseTest {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        // Read a record out of a local compressed WARC file so we have some raw
        // response data to work with
        String fn = "data/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz";
        FileInputStream is = new FileInputStream(fn);
        ArchiveReader ar = WARCReaderFactory.get(fn, is, true);
        // Note: a WARC file usually starts with a 'warcinfo' record; real code
        // would skip ahead until it reaches a 'response' record
        ArchiveRecord r = ar.iterator().next();
        byte[] rawData = new byte[r.available()];
        // read() may return fewer bytes than requested, so loop until the buffer is full
        int offset = 0;
        while (offset < rawData.length) {
            int bytesRead = r.read(rawData, offset, rawData.length - offset);
            if (bytesRead == -1) break;
            offset += bytesRead;
        }

        String txt = new String(rawData);
        // This gives us the HTTP headers and body
        // If you want to parse the headers, look towards Apache HttpComponents
        // http://hc.apache.org/index.html
        // We're just interested in the body, so skip past the blank line
        // (<CR><LF><CR><LF>) that ends the headers
        System.out.println(rawData.length);
        int bodyStart = txt.indexOf("\r\n\r\n");
        System.out.println(bodyStart);
        // Guard against records that contain no header/body separator
        String body = bodyStart == -1 ? "" : txt.substring(bodyStart + 4);
        System.out.println("________________");
        System.out.println(body.substring(0, Math.min(body.length(), 500)));
    }

}
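
The comments above point to Apache HttpComponents for proper header parsing. As a lighter-weight illustration, here is a hedged sketch — a hypothetical helper class, not part of this commit — that pulls the status line and header fields apart with plain string handling:

package org.commoncrawl.examples;

// Hypothetical helper, not part of this commit: prints the status line and
// header fields of a raw HTTP response using plain string handling
public class HttpHeaderSketch {
    public static void printHeaders(String rawResponse) {
        int end = rawResponse.indexOf("\r\n\r\n");
        String headerBlock = end == -1 ? rawResponse : rawResponse.substring(0, end);
        String[] lines = headerBlock.split("\r\n");
        // The first line is the status line, e.g. "HTTP/1.1 200 OK"
        System.out.println("Status line: " + lines[0]);
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                System.out.println(lines[i].substring(0, colon).trim() + " => "
                        + lines[i].substring(colon + 1).trim());
            }
        }
    }
}
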
54 changes: 54 additions & 0 deletions src/org/commoncrawl/examples/WordCounter.java
@@ -0,0 +1,54 @@
package org.commoncrawl.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.archive.io.ArchiveRecord;

public class WordCounter {
private static final Logger LOG = Logger.getLogger(WordCounter.class);
    protected enum MAPPERCOUNTER {
RECORDS_IN,
EMPTY_PAGE_TEXT,
EXCEPTIONS
}

protected static class WordCountMapper extends Mapper<Text, ArchiveRecord, Text, LongWritable> {
private StringTokenizer tokenizer;
private Text outKey = new Text();
private LongWritable outVal = new LongWritable(1);

@Override
public void map(Text key, ArchiveRecord value, Context context) throws IOException {
context.getCounter(MAPPERCOUNTER.RECORDS_IN).increment(1);

try {
System.out.println(key + " -- " + value.available());
                byte[] rawData = new byte[value.available()];
                // read() may return fewer bytes than requested, so loop until
                // the buffer is full or the stream ends
                int offset = 0;
                while (offset < rawData.length) {
                    int bytesRead = value.read(rawData, offset, rawData.length - offset);
                    if (bytesRead == -1) break;
                    offset += bytesRead;
                }
String content = new String(rawData);
System.out.println(content.substring(0, Math.min(50, content.length())));
System.out.println(content.length());
System.out.println();

tokenizer = new StringTokenizer(content);
if (!tokenizer.hasMoreTokens()) {
context.getCounter(MAPPERCOUNTER.EMPTY_PAGE_TEXT).increment(1);
} else {
while (tokenizer.hasMoreTokens()) {
outKey.set(tokenizer.nextToken());
context.write(outKey, outVal);
}
}
}
catch (Exception ex) {
LOG.error("Caught Exception", ex);
context.getCounter(MAPPERCOUNTER.EXCEPTIONS).increment(1);
}
}
}
}
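
The mapper counts raw whitespace-separated tokens, so "Word," and "word" are tallied separately. Below is a hedged sketch of a drop-in replacement for the token loop that normalizes tokens first — hypothetical, not part of this commit:

// Hypothetical refinement, not part of this commit: lower-case each token and
// strip surrounding punctuation before counting, so "Word," and "word" merge
while (tokenizer.hasMoreTokens()) {
    String token = tokenizer.nextToken().toLowerCase().replaceAll("^\\W+|\\W+$", "");
    if (token.isEmpty()) {
        continue;
    }
    outKey.set(token);
    context.write(outKey, outVal);
}
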
26 changes: 26 additions & 0 deletions src/org/commoncrawl/warc/WARCFileInputFormat.java
@@ -0,0 +1,26 @@
package org.commoncrawl.warc;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.archive.io.ArchiveRecord;


public class WARCFileInputFormat extends FileInputFormat<Text, ArchiveRecord> {

@Override
public RecordReader<Text, ArchiveRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new WARCFileRecordReader();
}

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Gzipped WARC files can't be split at arbitrary byte offsets, so each
        // file must be read sequentially by a single mapper
        return false;
    }
}
71 changes: 71 additions & 0 deletions src/org/commoncrawl/warc/WARCFileRecordReader.java
@@ -0,0 +1,71 @@
package org.commoncrawl.warc;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;


public class WARCFileRecordReader extends RecordReader<Text, ArchiveRecord> {
private ArchiveReader ar;
private FSDataInputStream fsin;
private Iterator<ArchiveRecord> iter;
private ArchiveRecord currentRecord;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit split = (FileSplit) inputSplit;
Configuration conf = context.getConfiguration();
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
fsin = fs.open(path);
ar = WARCReaderFactory.get(path.getName(), fsin, true);
iter = ar.iterator();
        // Note: don't advance the iterator here; the framework calls
        // nextKeyValue() once before requesting the first key/value pair
}

@Override
    public void close() throws IOException {
        // currentRecord may be null if the split contained no records
        if (currentRecord != null) {
            currentRecord.close();
        }
        fsin.close();
        ar.close();
    }

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
if (currentRecord != null) {
return new Text(currentRecord.getHeader().getUrl());
}
return null;
}

@Override
public ArchiveRecord getCurrentValue() throws IOException, InterruptedException {
return currentRecord;
}

@Override
    public float getProgress() throws IOException, InterruptedException {
        // Coarse-grained progress: report 0 until the iterator is exhausted, then 1
        return iter.hasNext() ? 0 : 1;
    }

@Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Advance to the next record and remember it so that getCurrentKey()
        // and getCurrentValue() can return it
        if (!iter.hasNext()) {
            currentRecord = null;
            return false;
        }
        currentRecord = iter.next();
        return true;
    }
}
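
getProgress() above is all-or-nothing. A hedged sketch of byte-based progress reporting — hypothetical, not part of this commit; it assumes initialize() stored split.getLength() in a splitLength field:

// Hypothetical refinement, not part of this commit: report progress as the
// fraction of the split's bytes consumed so far. Assumes initialize() saved
// split.getLength() into a 'splitLength' field
@Override
public float getProgress() throws IOException, InterruptedException {
    if (splitLength <= 0) {
        return 1.0f;
    }
    return Math.min(1.0f, fsin.getPos() / (float) splitLength);
}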
