Skip to content

Commit

Permalink
Improved WARC export of Fetcher:
Browse files Browse the repository at this point in the history
- optionally deduplicate records (duplicates caused by redirects)
- set WARC capture time to fetch start/end time
  • Loading branch information
sebastian-nagel committed Jun 22, 2017
1 parent 3ac2744 commit ccc558a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 22 deletions.
6 changes: 5 additions & 1 deletion src/java/org/apache/nutch/fetcher/Fetcher.java
Expand Up @@ -678,7 +678,6 @@ public FetcherThread(Configuration conf) {
outlinksIgnoreExternal = conf.getBoolean("fetcher.follow.outlinks.ignore.external", false);
maxOutlinkDepthNumLinks = conf.getInt("fetcher.follow.outlinks.num.links", 4);
outlinksDepthDivisor = conf.getInt("fetcher.follow.outlinks.depth.divisor", 2);
storingWarc = conf.getBoolean("fetcher.store.warc", false);
storing404s = conf.getBoolean("fetcher.store.404s", false);

if (conf.getBoolean("fetcher.store.robotstxt", false)) {
Expand Down Expand Up @@ -1253,6 +1252,7 @@ public void configure(JobConf job) {
this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
this.storingContent = isStoringContent(job);
this.parsing = isParsing(job);
this.storingWarc = isStoringWarc(job);

// if (job.getBoolean("fetcher.verbose", false)) {
// LOG.setLevel(Level.FINE);
Expand All @@ -1269,6 +1269,10 @@ public static boolean isStoringContent(Configuration conf) {
return conf.getBoolean("fetcher.store.content", true);
}

public static boolean isStoringWarc(Configuration conf) {
return conf.getBoolean("fetcher.store.warc", true);
}

public void run(RecordReader<Text, CrawlDatum> input,
OutputCollector<Text, NutchWritable> output,
Reporter reporter) throws IOException {
Expand Down
26 changes: 23 additions & 3 deletions src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
Expand Up @@ -18,6 +18,10 @@
package org.apache.nutch.fetcher;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
Expand All @@ -44,6 +48,7 @@
import org.commoncrawl.warc.WarcCompleteData;
import org.commoncrawl.warc.WarcOutputFormat;


/** Splits FetcherOutput entries into multiple map files. */
public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> {

Expand Down Expand Up @@ -93,9 +98,24 @@ public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
}

if (true) { // TODO: writeWarc
Path warc = new Path(
new Path(FileOutputFormat.getOutputPath(job), "warc"), name);
if (Fetcher.isStoringWarc(job)) {
Path warc = new Path(
new Path(FileOutputFormat.getOutputPath(job), "warc"), name);
// set start and end time of WARC capture
long timelimit = job.getLong("fetcher.timelimit", -1);
long timelimitMins = job.getLong("fetcher.timelimit.mins", -1);
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis();
if (timelimitMins > 0) {
startTime = timelimit - (timelimitMins * 60 * 1000);
if (endTime > timelimit) {
endTime = timelimit;
}
}
SimpleDateFormat fileDate = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US);
fileDate.setTimeZone(TimeZone.getTimeZone("GMT"));
job.set("warc.export.date", fileDate.format(new Date(startTime)));
job.set("warc.export.date.end", fileDate.format(new Date(endTime)));
warcOut = new WarcOutputFormat().getRecordWriter(job, warc);
}
}
Expand Down
10 changes: 0 additions & 10 deletions src/java/org/commoncrawl/tools/WarcExport.java
Expand Up @@ -109,23 +109,13 @@ public static class ParseDataCombinedInputFormat extends CombineSequenceFileInpu
public static class CrawlDatumCombinedInputFormat extends CombineSequenceFileInputFormat<Text, CrawlDatum> {
}

public String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.warn("Failed to get hostname: {}", e.getMessage());
}
return "localhost";
}

public void export(Path outputDir, List<Path> segments,
boolean generateCrawlDiagnostics, boolean generateRobotsTxt, Path cdxPath)
throws IOException {
Configuration conf = getConf();

// We compress ourselves, so this isn't necessary
conf.setBoolean(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, false);
conf.set("warc.export.hostname", getHostname());

conf.setBoolean("warc.export.crawldiagnostics", generateCrawlDiagnostics);
conf.setBoolean("warc.export.robotstxt", generateRobotsTxt);
Expand Down
41 changes: 33 additions & 8 deletions src/java/org/commoncrawl/warc/WarcOutputFormat.java
Expand Up @@ -3,8 +3,10 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -69,20 +71,23 @@ protected static class WarcRecordWriter extends RecordWriter<Text, WarcCompleteD
private boolean generateCrawlDiagnostics;
private boolean generateRobotsTxt;
private boolean generateCdx;
private String lastURL = ""; // for deduplication
private boolean deduplicate;

public WarcRecordWriter(Configuration conf, Path outputPath,
String filename, String hostname,
String publisher, String operator, String software, String isPartOf,
String description,
String filename, String hostname, String publisher, String operator,
String software, String isPartOf, String description,
boolean generateCrawlDiagnostics, boolean generateRobotsTxt,
boolean generateCdx, Path cdxPath, Date captureStartDate)
throws IOException {
boolean generateCdx, Path cdxPath, Date captureStartDate,
boolean deduplicate) throws IOException {

FileSystem fs = outputPath.getFileSystem(conf);

Path warcPath = new Path(new Path(outputPath, "warc"), filename);
warcOut = fs.create(warcPath);

this.deduplicate = deduplicate;

this.generateCdx = generateCdx;
if (generateCdx) {
cdxOut = openCdxOutputStream(new Path(cdxPath, "warc"), filename, conf);
Expand Down Expand Up @@ -169,8 +174,9 @@ protected static DataOutputStream openCdxOutputStream(Path cdxPath,
public synchronized void write(Text key, WarcCompleteData value) throws IOException {
URI targetUri;

String url = value.url.toString();
try {
targetUri = new URI(value.url.toString());
targetUri = new URI(url);
} catch (URISyntaxException e) {
LOG.error("Cannot write WARC record, invalid URI: {}", value.url);
return;
Expand All @@ -181,6 +187,14 @@ public synchronized void write(Text key, WarcCompleteData value) throws IOExcept
return;
}

if (deduplicate) {
if (lastURL.equals(url)) {
LOG.info("Skipping duplicate record: {}", value.url);
return;
}
lastURL = url;
}

String ip = "0.0.0.0";
Date date = null;
boolean notModified = false;
Expand Down Expand Up @@ -456,7 +470,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
e.getMessage());
}

String hostname = conf.get("warc.export.hostname", "localhost");
String hostname = conf.get("warc.export.hostname", getHostname());
String publisher = conf.get("warc.export.publisher", null);
String operator = conf.get("warc.export.operator", null);
String software = conf.get("warc.export.software", null);
Expand All @@ -465,6 +479,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
boolean generateCrawlDiagnostics = conf.getBoolean("warc.export.crawldiagnostics", false);
boolean generateRobotsTxt = conf.getBoolean("warc.export.robotstxt", false);
boolean generateCdx= conf.getBoolean("warc.export.cdx", false);
boolean deduplicate = conf.getBoolean("warc.deduplicate", false);

// WARC recommends - Prefix-Timestamp-Serial-Crawlhost.warc.gz
// https://github.com/iipc/warc-specifications/blob/gh-pages/specifications/warc-format/warc-1.1/index.md#annex-b-informative-warc-file-size-and-name-recommendations
Expand All @@ -482,7 +497,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(

return new WarcRecordWriter(conf, outputPath, filename, hostname, publisher,
operator, software, isPartOf, description, generateCrawlDiagnostics,
generateRobotsTxt, generateCdx, cdxPath, captureStartDate);
generateRobotsTxt, generateCdx, cdxPath, captureStartDate, deduplicate);
}

@Override
Expand Down Expand Up @@ -512,4 +527,14 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, job.getConfiguration());
}

public String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.warn("Failed to get hostname: {}", e.getMessage());
}
return "localhost";
}

}

0 comments on commit ccc558a

Please sign in to comment.