NUTCH-2683 DeduplicationJob: add option to prefer https:// over http://
- add optional value "httpsOverHttp" to the -compareOrder argument:
  https:// is preferred over http:// if this value is listed before "urlLength"
  and neither "score" nor "fetchTime" takes precedence (see the usage sketch below)
- code improvements: remove nested loop, sort imports, add @Override
  annotations where applicable
sebastian-nagel committed Jan 7, 2019
1 parent d0a4abf commit 3958d0c
Showing 1 changed file with 85 additions and 77 deletions.
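
As a minimal usage sketch (not part of this commit), the new "httpsOverHttp" value can be combined with the existing comparison values when launching the job via ToolRunner; the crawldb path and group mode below are placeholders:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.DeduplicationJob;
import org.apache.nutch.util.NutchConfiguration;

public class DedupHttpsPreferred {
  public static void main(String[] args) throws Exception {
    // Keep the higher score, then the later fetch time, then https:// over
    // http://, and finally the shorter URL; "crawl/crawldb" is a placeholder.
    int res = ToolRunner.run(NutchConfiguration.create(), new DeduplicationJob(),
        new String[] { "crawl/crawldb", "-group", "host",
            "-compareOrder", "score,fetchTime,httpsOverHttp,urlLength" });
    System.exit(res);
  }
}

Assuming the standard bin/nutch launcher script, this corresponds roughly to:
bin/nutch dedup crawl/crawldb -group host -compareOrder score,fetchTime,httpsOverHttp,urlLength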
162 changes: 85 additions & 77 deletions src/java/org/apache/nutch/crawl/DeduplicationJob.java
@@ -24,26 +24,24 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
@@ -76,14 +74,13 @@ public static class DBFilter extends

private String groupMode;

@Override
public void setup(Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum>.Context context) {
Configuration arg0 = context.getConfiguration();
groupMode = arg0.get(DEDUPLICATION_GROUP_MODE);
}

public void close() throws IOException {
}

@Override
public void map(Text key, CrawlDatum value,
Context context)
throws IOException, InterruptedException {
@@ -118,7 +115,7 @@ public void map(Text key, CrawlDatum value,
}
// add the URL as a temporary MD
value.getMetaData().put(urlKey, key);
// reduce on the signature optionall grouped on host or domain or not at all
// reduce on the signature optionally grouped on host or domain or not at all
context.write(sig, value);
}
}
@@ -129,9 +126,10 @@ public static class DedupReducer extends

private String[] compareOrder;

@Override
public void setup(Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum>.Context context) {
Configuration arg0 = context.getConfiguration();
compareOrder = arg0.get(DEDUPLICATION_COMPARE_ORDER).split(",");
Configuration conf = context.getConfiguration();
compareOrder = conf.get(DEDUPLICATION_COMPARE_ORDER).split(",");
}

private void writeOutAsDuplicate(CrawlDatum datum,
@@ -144,95 +142,105 @@ private void writeOutAsDuplicate(CrawlDatum datum,
context.write(key, datum);
}

@Override
public void reduce(BytesWritable key, Iterable<CrawlDatum> values,
Context context)
throws IOException, InterruptedException {
Context context) throws IOException, InterruptedException {
CrawlDatum existingDoc = null;

outerloop:
for (CrawlDatum newDoc : values) {
if (existingDoc == null) {
existingDoc = new CrawlDatum();
existingDoc.set(newDoc);
continue;
}

for (int i = 0; i < compareOrder.length; i++) {
switch (compareOrder[i]) {
case "score":
// compare based on score
if (existingDoc.getScore() < newDoc.getScore()) {
writeOutAsDuplicate(existingDoc, context);
existingDoc = new CrawlDatum();
existingDoc.set(newDoc);
continue outerloop;
} else if (existingDoc.getScore() > newDoc.getScore()) {
// mark new one as duplicate
writeOutAsDuplicate(newDoc, context);
continue outerloop;
}
break;
case "fetchTime":
// same score? delete the one which is oldest
if (existingDoc.getFetchTime() > newDoc.getFetchTime()) {
// mark new one as duplicate
writeOutAsDuplicate(newDoc, context);
continue outerloop;
} else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) {
// mark existing one as duplicate
writeOutAsDuplicate(existingDoc, context);
existingDoc = new CrawlDatum();
existingDoc.set(newDoc);
continue outerloop;
}
break;
case "urlLength":
// same time? keep the one which has the shortest URL
String urlExisting;
String urlnewDoc;
try {
urlExisting = URLDecoder.decode(existingDoc.getMetaData().get(urlKey).toString(), "UTF8");
urlnewDoc = URLDecoder.decode(newDoc.getMetaData().get(urlKey).toString(), "UTF8");
} catch (UnsupportedEncodingException e) {
LOG.error("Error decoding: " + urlKey);
throw new IOException("UnsupportedEncodingException for " + urlKey);
}
if (urlExisting.length() < urlnewDoc.length()) {
// mark new one as duplicate
writeOutAsDuplicate(newDoc, context);
continue outerloop;
} else if (urlExisting.length() > urlnewDoc.length()) {
// mark existing one as duplicate
writeOutAsDuplicate(existingDoc, context);
existingDoc = new CrawlDatum();
existingDoc.set(newDoc);
continue outerloop;
}
break;
CrawlDatum duplicate = getDuplicate(existingDoc, newDoc);
if (duplicate != null) {
writeOutAsDuplicate(duplicate, context);
if (duplicate == existingDoc) {
// keep new
existingDoc.set(newDoc);
}
}

}
}

public void close() throws IOException {

private CrawlDatum getDuplicate(CrawlDatum existingDoc, CrawlDatum newDoc)
throws IOException {
for (int i = 0; i < compareOrder.length; i++) {
switch (compareOrder[i]) {
case "score":
// compare based on score
if (existingDoc.getScore() < newDoc.getScore()) {
return existingDoc;
} else if (existingDoc.getScore() > newDoc.getScore()) {
// mark new one as duplicate
return newDoc;
}
break;
case "fetchTime":
// same score? delete the one which is oldest
if (existingDoc.getFetchTime() > newDoc.getFetchTime()) {
// mark new one as duplicate
return newDoc;
} else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) {
// mark existing one as duplicate
return existingDoc;
}
break;
case "httpsOverHttp":
// prefer https:// over http:// if URLs are identical except for the
// protocol
String url1 = existingDoc.getMetaData().get(urlKey).toString();
String url2 = newDoc.getMetaData().get(urlKey).toString();
if (url1.startsWith("https://") && url2.startsWith("http://")
&& url1.substring(8).equals(url2.substring(7))) {
// existingDoc with https://, mark newDoc as duplicate
return newDoc;
} else if (url2.startsWith("https://") && url1.startsWith("http://")
&& url2.substring(8).equals(url1.substring(7))) {
// newDoc with https://, mark existingDoc as duplicate
return existingDoc;
}
break;
case "urlLength":
// same time? keep the one which has the shortest URL
String urlExisting;
String urlnewDoc;
try {
urlExisting = URLDecoder.decode(
existingDoc.getMetaData().get(urlKey).toString(), "UTF8");
urlnewDoc = URLDecoder
.decode(newDoc.getMetaData().get(urlKey).toString(), "UTF8");
} catch (UnsupportedEncodingException e) {
LOG.error("Error decoding: " + urlKey);
throw new IOException("UnsupportedEncodingException for " + urlKey);
}
if (urlExisting.length() < urlnewDoc.length()) {
// mark new one as duplicate
return newDoc;
} else if (urlExisting.length() > urlnewDoc.length()) {
// mark existing one as duplicate
return existingDoc;
}
break;
}
}
return null; // no decision possible
}
}

/** Combine multiple new entries for a url. */
public static class StatusUpdateReducer extends
Reducer<Text, CrawlDatum, Text, CrawlDatum> {

@Override
public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
}

public void close() {
}

private CrawlDatum old = new CrawlDatum();
private CrawlDatum duplicate = new CrawlDatum();

@Override
public void reduce(Text key, Iterable<CrawlDatum> values,
Context context)
throws IOException, InterruptedException {
@@ -260,7 +268,7 @@ public void reduce(Text key, Iterable<CrawlDatum> values,

public int run(String[] args) throws IOException {
if (args.length < 1) {
System.err.println("Usage: DeduplicationJob <crawldb> [-group <none|host|domain>] [-compareOrder <score>,<fetchTime>,<urlLength>]");
System.err.println("Usage: DeduplicationJob <crawldb> [-group <none|host|domain>] [-compareOrder <score>,<fetchTime>,<httpsOverHttp>,<urlLength>]");
return 1;
}

Expand Down
