Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/com/digitalpebble/storm/crawler/bolt/ParserBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.Map;
import java.util.Set;

import backtype.storm.Config;
import com.digitalpebble.storm.crawler.protocol.Protocol;
import com.digitalpebble.storm.crawler.protocol.ProtocolFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.html.dom.HTMLDocumentImpl;
import org.apache.tika.Tika;
Expand Down Expand Up @@ -89,9 +92,14 @@ public class ParserBolt extends BaseRichBolt {
private boolean upperCaseElementNames = true;
private Class HTMLMapperClass = IdentityHtmlMapper.class;

private ProtocolFactory protocolFactory;

public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {

Config config = new Config();
config.putAll(conf);

String urlconfigfile = ConfUtils.getString(conf,
"urlfilters.config.file", "urlfilters.json");

Expand Down Expand Up @@ -150,6 +158,8 @@ public void prepare(Map conf, TopologyContext context,

LOG.debug("Tika loaded in " + (end - start) + " msec");

this.protocolFactory = new ProtocolFactory(config);

this.collector = collector;

this.eventMeters = context.registerMetric("parser-meter",
Expand Down Expand Up @@ -264,6 +274,15 @@ public void execute(Tuple tuple) {

List<Link> links = linkHandler.getLinks();
Set<String> slinks = new HashSet<String>(links.size());

Protocol protocol = protocolFactory.getProtocol(url_);

// TODO This is a method call with non-explicit side effects...yuck
// Calling getRobotsRules will seed the cache with the rules for this
// URL, if not already present, ensuring they'll be available downstream
// to the robots url filter. There's got to be a better way to do this.
protocol.getRobotRules(url);

for (Link l : links) {
if (StringUtils.isBlank(l.getUri()))
continue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.digitalpebble.storm.crawler.filtering;

import com.digitalpebble.storm.crawler.protocol.MemoryRobotsCache;
import com.digitalpebble.storm.crawler.protocol.RobotsCache;
import com.fasterxml.jackson.databind.JsonNode;
import crawlercommons.robots.BaseRobotRules;

import java.net.MalformedURLException;
import java.net.URL;

/**
* This {@link com.digitalpebble.storm.crawler.filtering.URLFilter} filters outlinks
* using the robots rules for the domain. If the rules for the domain aren't found
* in the cache, the outlink will pass the filter.
*/
public class RobotsURLFilter implements URLFilter {

    private RobotsCache cache;

    /**
     * Filters the given outlink against the cached robots rules for its
     * protocol/host/port. On a cache miss the URL passes through unchanged,
     * since the rules may simply not have been fetched yet.
     *
     * @param urlString the candidate outlink to check
     * @return the URL if it is allowed (or if no rules are cached for its
     *         host), or null if the rules forbid it or it is malformed
     */
    public String filter(String urlString) {
        try {
            URL url = new URL(urlString);
            BaseRobotRules rules = cache.get(cache.getCacheKey(url));
            if (rules == null) {
                // Cache miss: let the URL through rather than dropping it.
                return urlString;
            }
            return rules.isAllowed(urlString) ? urlString : null;
        } catch (MalformedURLException e) {
            // A URL we cannot parse cannot be checked against any rules; drop it.
            return null;
        }
    }

    public void configure(JsonNode paramNode) {
        // TODO Specify the cache implementation in the config
        this.cache = MemoryRobotsCache.getInstance();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.digitalpebble.storm.crawler.protocol;

import org.apache.storm.guava.cache.Cache;
import org.apache.storm.guava.cache.CacheBuilder;
import org.apache.storm.guava.cache.CacheLoader;
import org.apache.storm.guava.cache.LoadingCache;
import crawlercommons.robots.BaseRobotRules;

import java.net.URL;

/**
* Provides an in-memory, singleton, thread-safe cache for robots rules.
*/
public class MemoryRobotsCache implements RobotsCache {

    private static final long MAX_SIZE = 1000;

    private static final MemoryRobotsCache INSTANCE = new MemoryRobotsCache();

    // Instance field, final: the original declared this static and non-final
    // yet assigned it from the instance constructor, which only worked by
    // accident of the singleton pattern. Guava caches are thread-safe; the
    // size bound prevents unbounded growth.
    private final Cache<String, BaseRobotRules> cache;

    public static MemoryRobotsCache getInstance() {
        return INSTANCE;
    }

    private MemoryRobotsCache() {
        cache = CacheBuilder.newBuilder().maximumSize(MAX_SIZE).build();
    }

    /**
     * @param key cache key, as produced by {@link #getCacheKey(URL)}
     * @return the cached rules for the key, or null on a cache miss
     */
    public BaseRobotRules get(String key) {
        return cache.getIfPresent(key);
    }

    /**
     * @param key cache key, as produced by {@link #getCacheKey(URL)}
     * @param rules robots rules to associate with the key
     */
    public void put(String key, BaseRobotRules rules) {
        cache.put(key, rules);
    }

    /**
     * Composes a unique key to store and access robot rules in the cache for
     * the given URL. The key is {@code protocol:host:port}, lower-cased, with
     * the protocol's default port substituted when none is explicit.
     */
    public String getCacheKey(URL url) {
        String protocol = url.getProtocol().toLowerCase(); // normalize to lower case
        String host = url.getHost().toLowerCase(); // normalize to lower case
        int port = url.getPort();
        if (port == -1) {
            port = url.getDefaultPort();
        }
        /*
         * Robot rules apply only to the host, protocol, and port where
         * robots.txt is hosted (cf. NUTCH-1752).
         */
        return protocol + ":" + host + ":" + port;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class RobotRulesParser {
public static final Logger LOG = LoggerFactory
.getLogger(RobotRulesParser.class);

protected static final Hashtable<String, BaseRobotRules> CACHE = new Hashtable<String, BaseRobotRules>();
protected RobotsCache cache;

/**
* A {@link BaseRobotRules} object appropriate for use when the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.digitalpebble.storm.crawler.protocol;

import crawlercommons.robots.BaseRobotRules;

import java.net.URL;

/**
* This interface defines the methods that must be implemented by a cache for Robots rules.
*/
public interface RobotsCache {

    /**
     * Looks up the robots rules stored under the given key.
     *
     * @param key cache key, as produced by {@link #getCacheKey(URL)}
     * @return the cached robots rules, or null on a cache miss
     */
    BaseRobotRules get(String key);

    /**
     * Stores robots rules under the given key.
     *
     * @param key cache key, as produced by {@link #getCacheKey(URL)}
     * @param rules robots rules to associate with the key
     */
    void put(String key, BaseRobotRules rules);

    /**
     * Derives the cache key identifying the robots-rules scope for a URL.
     *
     * @param url the URL to derive a key for
     * @return the cache key for the URL
     */
    String getCacheKey(URL url);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@

package com.digitalpebble.storm.crawler.protocol.http;

import java.net.URL;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;

import com.digitalpebble.storm.crawler.protocol.Protocol;
import com.digitalpebble.storm.crawler.protocol.ProtocolResponse;
import com.digitalpebble.storm.crawler.protocol.RobotRulesParser;
import com.digitalpebble.storm.crawler.protocol.*;
import com.digitalpebble.storm.crawler.util.ConfUtils;
import com.digitalpebble.storm.crawler.util.KeyValues;

import crawlercommons.robots.BaseRobotRules;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.util.Collections;

/**
* This class is used for parsing robots for urls belonging to HTTP protocol. It
Expand All @@ -44,10 +39,27 @@ public class HttpRobotRulesParser extends RobotRulesParser {
.getLogger(HttpRobotRulesParser.class);
protected boolean allowForbidden = false;

HttpRobotRulesParser() {
}
private RobotsCache cache;

/**
* @param conf The topology {@link backtype.storm.Config}.
* Default constructor uses an in-memory robots rules cache,
* {@link com.digitalpebble.storm.crawler.protocol.MemoryRobotsCache}
*/
public HttpRobotRulesParser(Config conf) {
this.cache = MemoryRobotsCache.getInstance();
setConf(conf);
}

/**
*
* @param conf The topology {@link backtype.storm.Config}.
* @param cache The {@link com.digitalpebble.storm.crawler.protocol.RobotsCache}
* to use for the parser.
*/

public HttpRobotRulesParser(Config conf, RobotsCache cache) {
this.cache = cache;
setConf(conf);
}

Expand All @@ -57,43 +69,21 @@ public void setConf(Config conf) {
true);
}

/**
* Compose unique key to store and access robot rules in cache for given URL
*/
protected static String getCacheKey(URL url) {
String protocol = url.getProtocol().toLowerCase(); // normalize to lower
// case
String host = url.getHost().toLowerCase(); // normalize to lower case
int port = url.getPort();
if (port == -1) {
port = url.getDefaultPort();
}
/*
* Robot rules apply only to host, protocol, and port where robots.txt
* is hosted (cf. NUTCH-1752). Consequently
*/
String cacheKey = protocol + ":" + host + ":" + port;
return cacheKey;
}

/**
* Get the rules from robots.txt which applies for the given {@code url}.
* Robot rules are cached for a unique combination of host, protocol, and
* port. If no rules are found in the cache, a HTTP request is send to fetch
* {{protocol://host:port/robots.txt}}. The robots.txt is then parsed and
* the rules are cached to avoid re-fetching and re-parsing it again.
*
* @param http
* The {@link Protocol} object
* @param url
* URL robots.txt applies to
*
* @param http The {@link Protocol} object
* @param url URL robots.txt applies to
* @return {@link BaseRobotRules} holding the rules from robots.txt
*/
public BaseRobotRules getRobotRulesSet(Protocol http, URL url) {

String cacheKey = getCacheKey(url);
BaseRobotRules robotRules = CACHE.get(cacheKey);
String cacheKey = cache.getCacheKey(url);
BaseRobotRules robotRules = cache.get(cacheKey);

boolean cacheRule = true;

Expand All @@ -105,7 +95,7 @@ public BaseRobotRules getRobotRulesSet(Protocol http, URL url) {
try {
ProtocolResponse response = http.getProtocolOutput(new URL(url,
"/robots.txt").toString(), Collections
.<String, String[]> emptyMap());
.<String, String[]>emptyMap());

// try one level of redirection ?
if (response.getStatusCode() == 301
Expand All @@ -127,7 +117,7 @@ public BaseRobotRules getRobotRulesSet(Protocol http, URL url) {
redir = new URL(redirection);
}
response = http.getProtocolOutput(redir.toString(),
Collections.<String, String[]> emptyMap());
Collections.<String, String[]>emptyMap());
}
}

Expand Down Expand Up @@ -155,11 +145,12 @@ else if (response.getStatusCode() >= 500) {
}

if (cacheRule) {
CACHE.put(cacheKey, robotRules); // cache rules for host
cache.put(cacheKey, robotRules); // cache rules for host
if (redir != null
&& !redir.getHost().equalsIgnoreCase(url.getHost())) {
// cache also for the redirected host
CACHE.put(getCacheKey(redir), robotRules);
String redirKey = cache.getCacheKey(redir);
cache.put(redirKey, robotRules);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/urlfilters.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
"params": {
"regexFilterFile": "default-regex-filters.txt"
}
},
{
"class": "com.digitalpebble.storm.crawler.filtering.RobotsURLFilter",
"name": "RobotsURLFilter",
"params": {}
}

]
Expand Down