Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SimpleFetcherBolt handles 304 correctly #280

Merged
merged 2 commits into from Apr 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/pom.xml
@@ -1,5 +1,6 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -26,6 +27,7 @@
<httpclient.version>4.4.1</httpclient.version>
<snakeyaml.version>1.16</snakeyaml.version>
<commons.lang.version>2.6</commons.lang.version>
<wiremock.version>1.57</wiremock.version>
</properties>

<build>
Expand Down Expand Up @@ -141,6 +143,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.crawler-commons</groupId>
<artifactId>crawler-commons</artifactId>
Expand Down
Expand Up @@ -411,7 +411,7 @@ public void run() {

activeThreads.incrementAndGet(); // count threads

LOG.info(
LOG.debug(
"[Fetcher #{}] {} => activeThreads={}, spinWaiting={}, queueID={}",
taskID, getName(), activeThreads, spinWaiting,
fit.queueID);
Expand Down
Expand Up @@ -346,15 +346,26 @@ input, new Values(urlString, metadata,
response.getMetadata().putAll(metadata);

// determine the status based on the status code
Status status = Status.fromHTTPCode(response.getStatusCode());
final Status status = Status.fromHTTPCode(response.getStatusCode());

// used when sending to status stream
final Values values4status = new Values(urlString,
response.getMetadata(), status);

// if the status is OK emit on default stream
if (status.equals(Status.FETCHED)) {
_collector.emit(
Utils.DEFAULT_STREAM_ID,
input,
new Values(urlString, response.getContent(), response
.getMetadata()));
if (response.getStatusCode() == 304) {
// mark this URL as fetched so that it gets
// rescheduled
// but do not try to parse or index
_collector
.emit(com.digitalpebble.storm.crawler.Constants.StatusStreamName,
input, values4status);
} else {
_collector.emit(Utils.DEFAULT_STREAM_ID, input,
new Values(urlString, response.getContent(),
response.getMetadata()));
}
} else if (status.equals(Status.REDIRECTION)) {

// find the URL it redirects to
Expand All @@ -368,24 +379,19 @@ input, new Values(urlString, metadata,
response.getMetadata().setValue("_redirTo", redirection);
}

// Mark URL as redirected
_collector
.emit(com.digitalpebble.storm.crawler.Constants.StatusStreamName,
input,
new Values(urlString, response.getMetadata(),
status));

if (allowRedirs && StringUtils.isNotBlank(redirection)) {
handleOutlink(input, url, redirection,
response.getMetadata());
}
// Mark URL as redirected
_collector
.emit(com.digitalpebble.storm.crawler.Constants.StatusStreamName,
input, values4status);
} else {
// Error
_collector
.emit(com.digitalpebble.storm.crawler.Constants.StatusStreamName,
input,
new Values(urlString, response.getMetadata(),
status));
input, values4status);
}

} catch (Exception exece) {
Expand Down
Expand Up @@ -173,7 +173,10 @@ public ProtocolResponse handleResponse(HttpResponse response)

private static final byte[] toByteArray(final HttpEntity entity,
int maxContent, MutableBoolean trimmed) throws IOException {
Args.notNull(entity, "Entity");

if (entity == null)
return new byte[] {};

final InputStream instream = entity.getContent();
if (instream == null) {
return null;
Expand Down
Expand Up @@ -28,14 +28,14 @@
import org.junit.Assert;
import org.junit.Test;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.digitalpebble.storm.crawler.Constants;
import com.digitalpebble.storm.crawler.TestOutputCollector;
import com.digitalpebble.storm.crawler.TestUtil;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public abstract class AbstractFetcherBoltTest {

BaseRichBolt bolt;
Expand Down
Expand Up @@ -17,13 +17,85 @@

package com.digitalpebble.storm.crawler.bolt;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import com.digitalpebble.storm.crawler.Constants;
import com.digitalpebble.storm.crawler.TestOutputCollector;
import com.digitalpebble.storm.crawler.TestUtil;
import com.github.tomakehurst.wiremock.junit.WireMockRule;

import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;

/**
 * Tests {@link SimpleFetcherBolt} against a local WireMock HTTP server, in
 * addition to the checks inherited from {@link AbstractFetcherBoltTest}.
 */
public class SimpleFetcherBoltTest extends AbstractFetcherBoltTest {

    /** Port the WireMock server listens on for the duration of each test. */
    private final static int port = 8089;

    /** Starts and stops the mock HTTP server around every test method. */
    @Rule
    public WireMockRule wireMockRule = new WireMockRule(port);

    /** Provides the bolt under test to the inherited and local test methods. */
    @Before
    public void setUpContext() throws Exception {
        bolt = new SimpleFetcherBolt();
    }

    /**
     * A 304 (Not Modified) response must produce exactly one tuple on the
     * status stream and nothing on the default stream, since there is no
     * content to parse or index.
     */
    @Test
    public void test304() {

        // every GET on the mock server answers 304
        stubFor(get(urlMatching(".+")).willReturn(aResponse().withStatus(304)));

        TestOutputCollector output = new TestOutputCollector();

        Map<String, Object> config = new HashMap<String, Object>();
        config.put("http.agent.name", "this is only a test");

        bolt.prepare(config, TestUtil.getMockedTopologyContext(),
                new OutputCollector(output));

        Tuple tuple = mock(Tuple.class);
        when(tuple.getSourceComponent()).thenReturn("source");
        when(tuple.getStringByField("url")).thenReturn(
                "http://localhost:" + port + "/");
        when(tuple.getValueByField("metadata")).thenReturn(null);
        bolt.execute(tuple);

        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // keep the interrupt visible to the test runner instead of
            // silently dropping it
            Thread.currentThread().interrupt();
        }

        boolean acked = output.getAckedTuples().contains(tuple);
        // look at the failed tuples here, not the acked ones a second time
        boolean failed = output.getFailedTuples().contains(tuple);

        // should be acked or failed
        Assert.assertTrue(acked || failed);

        List<List<Object>> statusTuples = output
                .getEmitted(Constants.StatusStreamName);

        // we should get one tuple on the status stream
        // to notify that the URL has been fetched
        Assert.assertEquals(1, statusTuples.size());

        // and none on the default stream as there is nothing to parse and/or
        // index
        Assert.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID)
                .size());
    }

}