diff --git a/src/main/java/com/digitalpebble/storm/crawler/bolt/ParserBolt.java b/src/main/java/com/digitalpebble/storm/crawler/bolt/ParserBolt.java
index 5be36ba99..35348a3d6 100644
--- a/src/main/java/com/digitalpebble/storm/crawler/bolt/ParserBolt.java
+++ b/src/main/java/com/digitalpebble/storm/crawler/bolt/ParserBolt.java
@@ -27,6 +27,9 @@
import java.util.Map;
import java.util.Set;
+import backtype.storm.Config;
+import com.digitalpebble.storm.crawler.protocol.Protocol;
+import com.digitalpebble.storm.crawler.protocol.ProtocolFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.html.dom.HTMLDocumentImpl;
import org.apache.tika.Tika;
@@ -89,9 +92,14 @@ public class ParserBolt extends BaseRichBolt {
private boolean upperCaseElementNames = true;
private Class HTMLMapperClass = IdentityHtmlMapper.class;
+ private ProtocolFactory protocolFactory;
+
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
+ Config config = new Config();
+ config.putAll(conf);
+
String urlconfigfile = ConfUtils.getString(conf,
"urlfilters.config.file", "urlfilters.json");
@@ -150,6 +158,8 @@ public void prepare(Map conf, TopologyContext context,
LOG.debug("Tika loaded in " + (end - start) + " msec");
+ this.protocolFactory = new ProtocolFactory(config);
+
this.collector = collector;
this.eventMeters = context.registerMetric("parser-meter",
@@ -264,6 +274,15 @@ public void execute(Tuple tuple) {
List links = linkHandler.getLinks();
Set slinks = new HashSet(links.size());
+
+ Protocol protocol = protocolFactory.getProtocol(url_);
+
+ // TODO This is a method call with non-explicit side effects...yuck
+ // Calling getRobotRules will seed the cache with the rules for this
+ // URL, if not already present, ensuring they'll be available downstream
+ // to the robots url filter. There's got to be a better way to do this.
+ protocol.getRobotRules(url);
+
for (Link l : links) {
if (StringUtils.isBlank(l.getUri()))
continue;
diff --git a/src/main/java/com/digitalpebble/storm/crawler/filtering/RobotsURLFilter.java b/src/main/java/com/digitalpebble/storm/crawler/filtering/RobotsURLFilter.java
new file mode 100644
index 000000000..d79b782f6
--- /dev/null
+++ b/src/main/java/com/digitalpebble/storm/crawler/filtering/RobotsURLFilter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to DigitalPebble Ltd under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * DigitalPebble licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.digitalpebble.storm.crawler.filtering;
+
+import com.digitalpebble.storm.crawler.protocol.MemoryRobotsCache;
+import com.digitalpebble.storm.crawler.protocol.RobotsCache;
+import com.fasterxml.jackson.databind.JsonNode;
+import crawlercommons.robots.BaseRobotRules;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * This {@link com.digitalpebble.storm.crawler.filtering.URLFilter} filters outlinks
+ * using the robots rules for the domain. If the rules for the domain aren't found
+ * in the cache, the outlink will pass the filter.
+ */
+public class RobotsURLFilter implements URLFilter {
+
+    private RobotsCache cache;
+
+    @Override
+    public String filter(String url) {
+        try {
+            URL parsed = new URL(url);
+            String key = cache.getCacheKey(parsed);
+            BaseRobotRules rules = cache.get(key);
+            // Cache miss: let the URL pass rather than blocking it.
+            if (rules == null || rules.isAllowed(url)) {
+                return url;
+            }
+            return null;
+        } catch (MalformedURLException e) {
+            // Unparseable URLs are filtered out.
+            return null;
+        }
+    }
+
+    @Override
+    public void configure(JsonNode paramNode) {
+        // TODO Specify the cache implementation in the config
+        this.cache = MemoryRobotsCache.getInstance();
+    }
+
+}
diff --git a/src/main/java/com/digitalpebble/storm/crawler/protocol/MemoryRobotsCache.java b/src/main/java/com/digitalpebble/storm/crawler/protocol/MemoryRobotsCache.java
new file mode 100644
index 000000000..32bd5de26
--- /dev/null
+++ b/src/main/java/com/digitalpebble/storm/crawler/protocol/MemoryRobotsCache.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to DigitalPebble Ltd under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * DigitalPebble licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.digitalpebble.storm.crawler.protocol;
+
+import org.apache.storm.guava.cache.Cache;
+import org.apache.storm.guava.cache.CacheBuilder;
+import org.apache.storm.guava.cache.CacheLoader;
+import org.apache.storm.guava.cache.LoadingCache;
+import crawlercommons.robots.BaseRobotRules;
+
+import java.net.URL;
+
+/**
+ * Provides an in-memory, singleton, thread-safe cache for robots rules.
+ */
+public class MemoryRobotsCache implements RobotsCache {
+
+    private static final long MAX_SIZE = 1000;
+
+    private static final MemoryRobotsCache INSTANCE = new MemoryRobotsCache();
+
+    // Final instance field owned by the singleton; the generic type avoids
+    // the raw-type Object -> BaseRobotRules conversion on getIfPresent().
+    private final Cache<String, BaseRobotRules> cache;
+
+    public static MemoryRobotsCache getInstance() {
+        return INSTANCE;
+    }
+
+    private MemoryRobotsCache() {
+        cache = CacheBuilder.newBuilder().maximumSize(MAX_SIZE).build();
+    }
+
+    @Override
+    public BaseRobotRules get(String key) {
+        return cache.getIfPresent(key);
+    }
+
+    @Override
+    public void put(String key, BaseRobotRules rules) {
+        cache.put(key, rules);
+    }
+
+    /**
+     * Compose unique key to store and access robot rules in cache for given
+     * URL. Robot rules apply only to the host, protocol, and port where
+     * robots.txt is hosted (cf. NUTCH-1752).
+     */
+    @Override
+    public String getCacheKey(URL url) {
+        String protocol = url.getProtocol().toLowerCase(); // normalize case
+        String host = url.getHost().toLowerCase(); // normalize case
+        int port = url.getPort();
+        if (port == -1) {
+            port = url.getDefaultPort();
+        }
+        return protocol + ":" + host + ":" + port;
+    }
+
+}
diff --git a/src/main/java/com/digitalpebble/storm/crawler/protocol/RobotRulesParser.java b/src/main/java/com/digitalpebble/storm/crawler/protocol/RobotRulesParser.java
index 2b35113e1..076835e45 100644
--- a/src/main/java/com/digitalpebble/storm/crawler/protocol/RobotRulesParser.java
+++ b/src/main/java/com/digitalpebble/storm/crawler/protocol/RobotRulesParser.java
@@ -46,7 +46,7 @@ public abstract class RobotRulesParser {
public static final Logger LOG = LoggerFactory
.getLogger(RobotRulesParser.class);
- protected static final Hashtable CACHE = new Hashtable();
+ protected RobotsCache cache;
/**
* A {@link BaseRobotRules} object appropriate for use when the
diff --git a/src/main/java/com/digitalpebble/storm/crawler/protocol/RobotsCache.java b/src/main/java/com/digitalpebble/storm/crawler/protocol/RobotsCache.java
new file mode 100644
index 000000000..39a70c186
--- /dev/null
+++ b/src/main/java/com/digitalpebble/storm/crawler/protocol/RobotsCache.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to DigitalPebble Ltd under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * DigitalPebble licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.digitalpebble.storm.crawler.protocol;
+
+import crawlercommons.robots.BaseRobotRules;
+
+import java.net.URL;
+
+/**
+ * This interface defines the methods that must be implemented by a cache for Robots rules.
+ */
+public interface RobotsCache {
+
+    /**
+     * Looks up cached robots rules.
+     * @param key Cache key
+     * @return The robots rules for the key, or null on a cache miss.
+     */
+    BaseRobotRules get(String key);
+
+    /**
+     * @param key Cache key
+     * @param rules Robots rules to associate with the key
+     */
+    void put(String key, BaseRobotRules rules);
+
+    /** Composes a unique cache key (protocol, host, and port) for a URL. */
+    String getCacheKey(URL url);
+}
diff --git a/src/main/java/com/digitalpebble/storm/crawler/protocol/http/HttpRobotRulesParser.java b/src/main/java/com/digitalpebble/storm/crawler/protocol/http/HttpRobotRulesParser.java
index c8c587fde..3452ec51f 100644
--- a/src/main/java/com/digitalpebble/storm/crawler/protocol/http/HttpRobotRulesParser.java
+++ b/src/main/java/com/digitalpebble/storm/crawler/protocol/http/HttpRobotRulesParser.java
@@ -17,21 +17,16 @@
package com.digitalpebble.storm.crawler.protocol.http;
-import java.net.URL;
-import java.util.Collections;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
-
-import com.digitalpebble.storm.crawler.protocol.Protocol;
-import com.digitalpebble.storm.crawler.protocol.ProtocolResponse;
-import com.digitalpebble.storm.crawler.protocol.RobotRulesParser;
+import com.digitalpebble.storm.crawler.protocol.*;
import com.digitalpebble.storm.crawler.util.ConfUtils;
import com.digitalpebble.storm.crawler.util.KeyValues;
-
import crawlercommons.robots.BaseRobotRules;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
/**
* This class is used for parsing robots for urls belonging to HTTP protocol. It
@@ -44,10 +39,27 @@ public class HttpRobotRulesParser extends RobotRulesParser {
        .getLogger(HttpRobotRulesParser.class);
    protected boolean allowForbidden = false;
-    HttpRobotRulesParser() {
-    }
+    // Uses the protected 'cache' field inherited from RobotRulesParser
+    /**
+     * @param conf The topology {@link backtype.storm.Config}.
+     * Default constructor uses an in-memory robots rules cache,
+     * {@link com.digitalpebble.storm.crawler.protocol.MemoryRobotsCache}
+     */
    public HttpRobotRulesParser(Config conf) {
+        this.cache = MemoryRobotsCache.getInstance();
+        setConf(conf);
+    }
+
+    /**
+     *
+     * @param conf The topology {@link backtype.storm.Config}.
+     * @param cache The {@link com.digitalpebble.storm.crawler.protocol.RobotsCache}
+     *      to use for the parser.
+     */
+
+    public HttpRobotRulesParser(Config conf, RobotsCache cache) {
+        this.cache = cache;
+        setConf(conf);
    }
@@ -57,25 +69,6 @@ public void setConf(Config conf) {
true);
}
- /**
- * Compose unique key to store and access robot rules in cache for given URL
- */
- protected static String getCacheKey(URL url) {
- String protocol = url.getProtocol().toLowerCase(); // normalize to lower
- // case
- String host = url.getHost().toLowerCase(); // normalize to lower case
- int port = url.getPort();
- if (port == -1) {
- port = url.getDefaultPort();
- }
- /*
- * Robot rules apply only to host, protocol, and port where robots.txt
- * is hosted (cf. NUTCH-1752). Consequently
- */
- String cacheKey = protocol + ":" + host + ":" + port;
- return cacheKey;
- }
-
/**
* Get the rules from robots.txt which applies for the given {@code url}.
* Robot rules are cached for a unique combination of host, protocol, and
@@ -83,17 +76,14 @@ protected static String getCacheKey(URL url) {
* {{protocol://host:port/robots.txt}}. The robots.txt is then parsed and
* the rules are cached to avoid re-fetching and re-parsing it again.
*
- * @param http
- * The {@link Protocol} object
- * @param url
- * URL robots.txt applies to
- *
+ * @param http The {@link Protocol} object
+ * @param url URL robots.txt applies to
* @return {@link BaseRobotRules} holding the rules from robots.txt
*/
public BaseRobotRules getRobotRulesSet(Protocol http, URL url) {
- String cacheKey = getCacheKey(url);
- BaseRobotRules robotRules = CACHE.get(cacheKey);
+ String cacheKey = cache.getCacheKey(url);
+ BaseRobotRules robotRules = cache.get(cacheKey);
boolean cacheRule = true;
@@ -105,7 +95,7 @@ public BaseRobotRules getRobotRulesSet(Protocol http, URL url) {
try {
ProtocolResponse response = http.getProtocolOutput(new URL(url,
"/robots.txt").toString(), Collections
- . emptyMap());
+ .emptyMap());
// try one level of redirection ?
if (response.getStatusCode() == 301
@@ -127,7 +117,7 @@ public BaseRobotRules getRobotRulesSet(Protocol http, URL url) {
redir = new URL(redirection);
}
response = http.getProtocolOutput(redir.toString(),
- Collections. emptyMap());
+ Collections.emptyMap());
}
}
@@ -155,11 +145,12 @@ else if (response.getStatusCode() >= 500) {
}
if (cacheRule) {
- CACHE.put(cacheKey, robotRules); // cache rules for host
+ cache.put(cacheKey, robotRules); // cache rules for host
if (redir != null
&& !redir.getHost().equalsIgnoreCase(url.getHost())) {
// cache also for the redirected host
- CACHE.put(getCacheKey(redir), robotRules);
+ String redirKey = cache.getCacheKey(redir);
+ cache.put(redirKey, robotRules);
}
}
}
diff --git a/src/main/resources/urlfilters.json b/src/main/resources/urlfilters.json
index 670dae37d..7a5e1fc7a 100644
--- a/src/main/resources/urlfilters.json
+++ b/src/main/resources/urlfilters.json
@@ -20,6 +20,11 @@
"params": {
"regexFilterFile": "default-regex-filters.txt"
}
+ },
+ {
+ "class": "com.digitalpebble.storm.crawler.filtering.RobotsURLFilter",
+ "name": "RobotsURLFilter",
+ "params": {}
}
]