Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.digitalpebble.storm.crawler.bolt.IndexerBolt;
import com.digitalpebble.storm.crawler.bolt.ParserBolt;
import com.digitalpebble.storm.crawler.bolt.PrinterBolt;
import com.digitalpebble.storm.crawler.bolt.SiteMapParserBolt;
import com.digitalpebble.storm.crawler.bolt.URLPartitionerBolt;
import com.digitalpebble.storm.crawler.spout.RandomURLSpout;
import com.digitalpebble.storm.metrics.DebugMetricsConsumer;
Expand All @@ -49,14 +50,18 @@ protected int run(String[] args) {
builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping(
"partitioner", new Fields("key"));

builder.setBolt("sitemap", new SiteMapParserBolt())
.localOrShuffleGrouping("fetch");

builder.setBolt("parse", new ParserBolt()).localOrShuffleGrouping(
"fetch");
"sitemap");

builder.setBolt("index", new IndexerBolt()).localOrShuffleGrouping(
"parse");

builder.setBolt("status", new PrinterBolt())
.localOrShuffleGrouping("fetch", Constants.StatusStreamName)
.localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
.localOrShuffleGrouping("parse", Constants.StatusStreamName);

conf.registerMetricsConsumer(DebugMetricsConsumer.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/**
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.digitalpebble.storm.crawler.bolt;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.digitalpebble.storm.crawler.Constants;
import com.digitalpebble.storm.crawler.persistence.Status;
import com.digitalpebble.storm.crawler.protocol.HttpHeaders;
import com.digitalpebble.storm.crawler.util.KeyValues;

import crawlercommons.sitemaps.AbstractSiteMap;
import crawlercommons.sitemaps.SiteMap;
import crawlercommons.sitemaps.SiteMapIndex;
import crawlercommons.sitemaps.SiteMapURL;
import crawlercommons.sitemaps.SiteMapURL.ChangeFrequency;

/**
 * Extracts URLs from sitemap files. The parsing is triggered by the presence
 * of 'isSitemap=true' in the metadata of the incoming tuple.
 * <ul>
 * <li>A tuple which does not have this key/value in its metadata is passed on
 * unchanged to the default stream.</li>
 * <li>For a sitemap tuple, every URL extracted from the document is emitted on
 * the status stream with the status DISCOVERED, followed by the URL of the
 * sitemap itself with the status FETCHED. Nothing is emitted on the default
 * stream in that case.</li>
 * </ul>
 * All emitted tuples are anchored to the input tuple so that a failure
 * downstream is reported back for the tuple that was being processed.
 **/
public class SiteMapParserBolt extends BaseRichBolt {

    /** Metadata key flagging a document as a sitemap (value "true"). */
    public static final String isSitemapKey = "isSitemap";

    private static final org.slf4j.Logger LOG = LoggerFactory
            .getLogger(SiteMapParserBolt.class);

    /** Passed as-is to the crawler-commons sitemap parser. */
    private boolean strictMode = false;

    private OutputCollector collector;

    /**
     * Forwards non-sitemap tuples to the default stream; parses sitemap tuples
     * and reports the URLs they contain on the status stream.
     *
     * @param tuple
     *            input with the fields "url", "content" and "metadata"
     */
    @Override
    public void execute(Tuple tuple) {
        @SuppressWarnings("unchecked")
        HashMap<String, String[]> metadata = (HashMap<String, String[]>) tuple
                .getValueByField("metadata");

        // TODO check that we have the right number of fields ?
        String isSitemap = KeyValues.getValue(isSitemapKey, metadata);
        if (!Boolean.parseBoolean(isSitemap)) {
            // not a sitemap : just pass it on, anchored to the input
            collector.emit(tuple, tuple.getValues());
            collector.ack(tuple);
            return;
        }

        // it does have the right key/value
        byte[] content = tuple.getBinaryByField("content");
        String url = tuple.getStringByField("url");
        String ct = KeyValues.getValue(HttpHeaders.CONTENT_TYPE, metadata);

        // send the extracted URLs to the status stream
        for (Values ol : parseSiteMap(url, content, ct)) {
            collector.emit(Constants.StatusStreamName, tuple, ol);
        }

        // marking the main URL as successfully fetched
        // regardless of whether we got a parse exception or not
        collector.emit(Constants.StatusStreamName, tuple, new Values(url,
                metadata, Status.FETCHED));

        collector.ack(tuple);
    }

    /**
     * Parses the content of a sitemap or sitemap index.
     *
     * @param url
     *            URL the content was fetched from
     * @param content
     *            raw bytes of the document
     * @param contentType
     *            value of the content-type metadata, handed to the parser
     * @return one (url, metadata, DISCOVERED) entry per URL found; an empty
     *         list if the parser threw an exception
     */
    private List<Values> parseSiteMap(String url, byte[] content,
            String contentType) {

        crawlercommons.sitemaps.SiteMapParser parser = new crawlercommons.sitemaps.SiteMapParser(
                strictMode);

        AbstractSiteMap siteMap;
        try {
            siteMap = parser.parseSiteMap(contentType, content, new URL(url));
        } catch (Exception e) {
            // best effort : a broken sitemap simply yields no outlinks
            LOG.error("Exception while parsing sitemap", e);
            return Collections.emptyList();
        }

        List<Values> links = new ArrayList<Values>();

        if (siteMap.isIndex()) {
            // keep the subsitemaps as outlinks
            // they will be fetched and parsed in the following steps
            for (AbstractSiteMap sub : ((SiteMapIndex) siteMap).getSitemaps()) {
                // TODO apply filtering to outlinks
                // TODO configure which metadata gets inherited from parent
                links.add(newOutlink(url, sub.getUrl().toExternalForm(), true));
            }
        }
        // sitemap files
        else {
            // TODO see what we can do with the LastModified info
            for (SiteMapURL smurl : ((SiteMap) siteMap).getSiteMapUrls()) {
                // TODO handle priority in metadata
                // TODO convert the frequency into a numerical value and handle
                // it in metadata
                // TODO configure which metadata gets inherited from parent
                // TODO apply filtering to outlinks
                links.add(newOutlink(url, smurl.getUrl().toExternalForm(),
                        false));
            }
        }

        return links;
    }

    /**
     * Builds the status-stream values for a newly discovered URL.
     *
     * @param parentUrl
     *            URL of the sitemap the link was found in (used for logging)
     * @param targetUrl
     *            the discovered URL
     * @param targetIsSitemap
     *            whether the discovered URL is itself a sitemap
     * @return (targetUrl, fresh metadata holding the isSitemap flag,
     *         DISCOVERED)
     */
    private static Values newOutlink(String parentUrl, String targetUrl,
            boolean targetIsSitemap) {
        HashMap<String, String[]> metadata = KeyValues.newInstance();
        KeyValues.setValue(isSitemapKey, metadata,
                String.valueOf(targetIsSitemap));
        LOG.debug("{} : [sitemap] {}", parentUrl, targetUrl);
        return new Values(targetUrl, metadata, Status.DISCOVERED);
    }

    /** Keeps a reference to the collector used by {@link #execute(Tuple)}. */
    @Override
    public void prepare(Map conf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the default stream (url, content, metadata) and the status
     * stream (url, metadata, status).
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "content", "metadata"));
        declarer.declareStream(Constants.StatusStreamName, new Fields("url",
                "metadata", "status"));
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package com.digitalpebble.storm.crawler.parse.filter;
package com.digitalpebble.storm.crawler;

import java.util.ArrayList;
import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package com.digitalpebble.storm.crawler.parse.filter;
package com.digitalpebble.storm.crawler;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.digitalpebble.storm.crawler.bolt;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.digitalpebble.storm.crawler.parse.filter.ParsingTester;
import com.digitalpebble.storm.crawler.protocol.HttpHeaders;
import com.digitalpebble.storm.crawler.util.KeyValues;

/**
 * Tests for {@link SiteMapParserBolt}, run through the shared
 * {@link ParsingTester} harness: one document flagged as a sitemap and one
 * regular document which must simply be passed through.
 */
public class SiteMapParserBoltTest extends ParsingTester {

    /** Parse filter configuration handed to the harness by every test. */
    private static final String PARSE_FILTERS = "test.parsefilters.json";

    /** Installs a fresh bolt and output collector before each test. */
    @Before
    public void setupParserBolt() {
        setupParserBolt(new SiteMapParserBolt());
    }

    // TODO add a test for a sitemap containing links
    // to other sitemap files

    /**
     * A document flagged as a sitemap yields six tuples of three fields each.
     */
    @Test
    public void testSitemapParsing() throws IOException {

        prepareParserBolt(PARSE_FILTERS);

        parse("http://www.digitalpebble.com/sitemap.xml",
                "digitalpebble.sitemap.xml", sitemapMetadata());

        Assert.assertEquals(6, output.getEmitted().size());
        // TODO test that the new links have the right metadata
        List<Object> firstEmitted = output.getEmitted().get(0);
        Assert.assertEquals(3, firstEmitted.size());
    }

    /** A document without the sitemap flag yields exactly one tuple. */
    @Test
    public void testNonSitemapParsing() throws IOException {

        prepareParserBolt(PARSE_FILTERS);

        // empty metadata : nothing says that this is a sitemap file
        Map<String, String[]> plainMetadata = KeyValues.newInstance();
        parse("http://www.digitalpebble.com", "digitalpebble.com.html",
                plainMetadata);

        Assert.assertEquals(1, output.getEmitted().size());
    }

    /** Metadata flagging a document as a sitemap, with an XML mime-type. */
    private static Map<String, String[]> sitemapMetadata() {
        Map<String, String[]> md = KeyValues.newInstance();
        // specify that it is a sitemap file
        KeyValues.setValue(SiteMapParserBolt.isSitemapKey, md, "true");
        // and its mime-type
        KeyValues.setValue(HttpHeaders.CONTENT_TYPE, md, "application/xml");
        return md;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@

import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.digitalpebble.storm.crawler.bolt.ParserBolt;
import com.digitalpebble.storm.crawler.TestOutputCollector;
import com.digitalpebble.storm.crawler.TestUtil;

public class ParsingTester {
protected ParserBolt bolt;
protected BaseRichBolt bolt;
protected TestOutputCollector output;

@Before
public void setupParserBolt() {
bolt = new ParserBolt();
protected void setupParserBolt(BaseRichBolt bolt) {
this.bolt = bolt;
output = new TestOutputCollector();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,20 @@
import java.util.Map;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.digitalpebble.storm.crawler.bolt.ParserBolt;
import com.digitalpebble.storm.crawler.util.KeyValues;

/** **/

public class XPathFilterTest extends ParsingTester {

@Before
public void setupParserBolt() {
bolt = new ParserBolt();
setupParserBolt(bolt);
}

@Test
public void testBasicExtraction() throws IOException {

Expand Down
39 changes: 39 additions & 0 deletions src/test/resources/digitalpebble.sitemap.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<urlset
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.sitemaps.org/schemas/sitemap/0.9
http://www.sitemaps.org/schemas/sitemap/0.9/sitemap.xsd">
<!-- created with Free Online Sitemap Generator www.xml-sitemaps.com -->

<url>
<loc>http://digitalpebble.com/</loc>
<lastmod>2012-12-05T10:59:04+00:00</lastmod>
<changefreq>monthly</changefreq>
<priority>1.00</priority>
</url>
<url>
<loc>http://digitalpebble.com/index.html</loc>
<lastmod>2012-12-05T10:59:04+00:00</lastmod>
<changefreq>monthly</changefreq>
<priority>0.80</priority>
</url>
<url>
<loc>http://digitalpebble.com/solutions.html</loc>
<lastmod>2012-09-06T16:53:04+00:00</lastmod>
<changefreq>monthly</changefreq>
<priority>0.80</priority>
</url>
<url>
<loc>http://digitalpebble.com/references.html</loc>
<lastmod>2014-04-16T14:40:10+00:00</lastmod>
<changefreq>monthly</changefreq>
<priority>0.80</priority>
</url>
<url>
<loc>http://digitalpebble.com/contact.html</loc>
<lastmod>2012-12-05T10:59:00+00:00</lastmod>
<changefreq>monthly</changefreq>
<priority>0.80</priority>
</url>
</urlset>