Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AdaptiveScheduler to set last-modified time in metadata, fixes #777 #812

Merged
merged 1 commit into from Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,6 +17,7 @@

package com.digitalpebble.stormcrawler.persistence;

import java.time.Duration;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
Expand Down Expand Up @@ -62,11 +63,15 @@
* <dt>fetchInterval</dt>
* <dd>current fetch interval</dd>
* <dt>signatureChangeDate</dt>
* <dd>date when the signature has changed</dd>
* <dd>date when the signature has changed (ISO-8601 date time format)</dd>
* <dt>last-modified</dt>
* <dd>last-modified time used to send If-Modified-Since HTTP requests, only
* written if <code>scheduler.adaptive.setLastModified</code> is true. Same date
* string as set in &quot;signatureChangeDate&quot;.</dd>
* string as set in &quot;signatureChangeDate&quot;. Note that the metadata
* field `last-modified` is assumed to be written only by the scheduler. In
* particular, the property `protocol.md.prefix` should not be empty;
* otherwise `last-modified` could be filled with an incorrect or ill-formed
* date taken from the HTTP header.</dd>
* </p>
*
* <h2>Configuration</h2>
Expand Down Expand Up @@ -232,6 +237,15 @@ public Date schedule(Status status, Metadata metadata) {
metadata.remove(SIGNATURE_KEY);
metadata.remove(SIGNATURE_OLD_KEY);

if (status == Status.ERROR) {
/*
* remove last-modified for permanent errors so that no
* if-modified-since request is sent: the content is needed
* again to be parsed and indexed
*/
metadata.remove(HttpHeaders.LAST_MODIFIED);
}

// fall-back to DefaultScheduler
return super.schedule(status, metadata);
}
Expand All @@ -249,14 +263,25 @@ public Date schedule(Status status, Metadata metadata) {
// HTTP 304 Not Modified
// - no new signature calculated because no content fetched
// - do not compare persisted signatures
// - leave last-modified time unchanged
} else if (signature == null || oldSignature == null) {
// no decision possible by signature comparison if
// - document not parsed (intentionally or not) or
// - signature not generated or
// - old signature not copied
// fall-back to DefaultScheduler
LOG.debug("No signature for FETCHED page: {}", metadata);
return super.schedule(status, metadata);
if (setLastModified && signature != null && oldSignature == null) {
// set last-modified time for first fetch
metadata.setValue(HttpHeaders.LAST_MODIFIED, modifiedTimeString);
}
Date nextFetch = super.schedule(status, metadata);
long fetchIntervalMinutes = Duration
.between(now.toInstant(), nextFetch.toInstant())
.toMinutes();
metadata.setValue(FETCH_INTERVAL_KEY,
Long.toString(fetchIntervalMinutes));
return nextFetch;
} else if (signature.equals(oldSignature)) {
// unchanged
} else {
Expand Down
Expand Up @@ -200,7 +200,7 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md)
request = new HttpHead(url);
}

String lastModified = md.getFirstValue("last-modified");
String lastModified = md.getFirstValue(HttpHeaders.LAST_MODIFIED);
if (StringUtils.isNotBlank(lastModified)) {
request.addHeader("If-Modified-Since",
HttpHeaders.formatHttpDate(lastModified));
Expand Down
Expand Up @@ -231,7 +231,8 @@ public ProtocolResponse getProtocolOutput(String url, final Metadata metadata) t
});

if (metadata != null) {
String lastModified = metadata.getFirstValue("last-modified");
String lastModified = metadata
.getFirstValue(HttpHeaders.LAST_MODIFIED);
if (StringUtils.isNotBlank(lastModified)) {
rb.header("If-Modified-Since",
HttpHeaders.formatHttpDate(lastModified));
Expand Down
@@ -0,0 +1,150 @@
/**
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.digitalpebble.stormcrawler.persistence;

import java.net.MalformedURLException;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.time.DateUtils;
import org.junit.Assert;
import org.junit.Test;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.protocol.HttpHeaders;

/**
 * Unit tests for {@link AdaptiveScheduler}: checks the next fetch date, the
 * fetch interval and the last-modified time written to the metadata for a
 * sequence of fetches (first fetch, unchanged signature, HTTP 304, changed
 * signature).
 */
public class AdaptiveSchedulerTest {

    /** MD5 digest of empty content. */
    private static final String MD5SUM_EMPTY_CONTENT = "d41d8cd98f00b204e9800998ecf8427e";

    /** MD5 digest of content consisting of a single space character. */
    private static final String MD5SUM_SPACE_CONTENT = "7215ee9c7d9dc229d2921a40e899ec5f";

    /**
     * Slack (in seconds) applied on both sides when comparing a time produced
     * by the scheduler with the wall-clock time sampled by the test. It
     * absorbs sub-second truncation or rounding in formatted date strings.
     */
    private static final long TOLERANCE_SECONDS = 1;

    /**
     * Builds the scheduler configuration shared by all tests: custom
     * intervals for metadata <code>testKey=someValue</code> (6 for status
     * FETCHED, 8 otherwise), a default interval of 5, adaptive interval
     * bounds of 2 and 10, and setting of the last-modified time enabled.
     *
     * @return a fresh, mutable configuration map
     */
    private static Map<String, Object> getConf() {
        Map<String, Object> stormConf = new HashMap<>();
        stormConf.put("fetchInterval.FETCHED.testKey=someValue", 6);
        stormConf.put("fetchInterval.testKey=someValue", 8);
        stormConf.put("scheduler.adaptive.setLastModified", true);
        stormConf.put("scheduler.adaptive.fetchInterval.min", 2);
        stormConf.put("fetchInterval.default", 5);
        stormConf.put("scheduler.adaptive.fetchInterval.max", 10);
        stormConf.put("protocol.md.prefix", "protocol.");
        return stormConf;
    }

    /**
     * Asserts that <code>actual</code> lies between <code>earliest</code> and
     * <code>latest</code>, widened by {@link #TOLERANCE_SECONDS} on each side.
     * Comparing against a window instead of two independently rounded
     * timestamps keeps the test stable when a second boundary is crossed
     * between the call under test and the sampling of the clock.
     *
     * @param earliest
     *            lower bound, typically sampled right before the call
     * @param latest
     *            upper bound, typically sampled right after the call
     * @param actual
     *            the time to verify
     */
    private static void assertWithin(Instant earliest, Instant latest,
            Instant actual) {
        Instant lower = earliest.minusSeconds(TOLERANCE_SECONDS);
        Instant upper = latest.plusSeconds(TOLERANCE_SECONDS);
        Assert.assertTrue("expected a time within [" + lower + ", " + upper
                + "] but was " + actual,
                !actual.isBefore(lower) && !actual.isAfter(upper));
    }

    /**
     * Parses a date string in ISO-8601 offset date-time format.
     *
     * @param isoDateTime
     *            the date string, e.g. the value of the last-modified
     *            metadata field
     * @return the parsed point in time
     */
    private static Instant parseIsoDateTime(String isoDateTime) {
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME
                .parse(isoDateTime, ZonedDateTime::from).toInstant();
    }

    /**
     * Verify setting the initial fetch interval by metadata and fetch status
     * implemented in DefaultScheduler
     */
    @Test
    public void testSchedulerInitialInterval() throws MalformedURLException {
        Scheduler scheduler = new AdaptiveScheduler();
        scheduler.init(getConf());

        Metadata metadata = new Metadata();
        metadata.addValue("testKey", "someValue");
        metadata.addValue("fetch.statusCode", "200");

        /* status FETCHED: the status-specific interval of 6 minutes applies */
        Instant before = Instant.now();
        Date nextFetch = scheduler.schedule(Status.FETCHED, metadata);
        Instant after = Instant.now();
        assertWithin(before.plusSeconds(6 * 60L), after.plusSeconds(6 * 60L),
                nextFetch.toInstant());

        /* any other status: the generic interval of 8 minutes applies */
        before = Instant.now();
        nextFetch = scheduler.schedule(Status.ERROR, metadata);
        after = Instant.now();
        assertWithin(before.plusSeconds(8 * 60L), after.plusSeconds(8 * 60L),
                nextFetch.toInstant());
    }

    /**
     * Verify the fetch interval and the last-modified time over a sequence of
     * fetches: first fetch, unchanged signature, HTTP 304 and finally a
     * changed signature.
     */
    @Test
    public void testSchedule() throws MalformedURLException {
        Scheduler scheduler = new AdaptiveScheduler();
        scheduler.init(getConf());

        Metadata metadata = new Metadata();
        metadata.addValue("fetch.statusCode", "200");
        metadata.addValue(AdaptiveScheduler.SIGNATURE_KEY,
                MD5SUM_EMPTY_CONTENT);
        Instant before = Instant.now();
        scheduler.schedule(Status.FETCHED, metadata);
        Instant after = Instant.now();

        /* verify initial fetch interval and last-modified time */
        String lastModified = metadata
                .getFirstValue(HttpHeaders.LAST_MODIFIED);
        Assert.assertNotNull(lastModified);
        assertWithin(before, after, parseIsoDateTime(lastModified));
        String fetchInterval = metadata
                .getFirstValue(AdaptiveScheduler.FETCH_INTERVAL_KEY);
        Assert.assertNotNull(fetchInterval);
        /* initial interval is the default interval */
        Assert.assertEquals(5, Integer.parseInt(fetchInterval));

        /* test with signature not modified */
        metadata.setValue(AdaptiveScheduler.SIGNATURE_OLD_KEY,
                MD5SUM_EMPTY_CONTENT);
        scheduler.schedule(Status.FETCHED, metadata);
        fetchInterval = metadata
                .getFirstValue(AdaptiveScheduler.FETCH_INTERVAL_KEY);
        Assert.assertNotNull(fetchInterval);
        /* interval should be bigger than initial interval */
        int fi1 = Integer.parseInt(fetchInterval);
        Assert.assertTrue(5 < fi1);
        /* last-modified time should be unchanged */
        Assert.assertEquals(lastModified,
                metadata.getFirstValue(HttpHeaders.LAST_MODIFIED));

        /* test with HTTP 304 "not modified" */
        metadata.setValue("fetch.statusCode", "304");
        scheduler.schedule(Status.FETCHED, metadata);
        fetchInterval = metadata
                .getFirstValue(AdaptiveScheduler.FETCH_INTERVAL_KEY);
        Assert.assertNotNull(fetchInterval);
        /*
         * interval should be bigger than the initial interval and the
         * interval from the last step
         */
        int fi2 = Integer.parseInt(fetchInterval);
        Assert.assertTrue(5 < fi2);
        Assert.assertTrue(fi1 < fi2);
        /* last-modified time should be unchanged */
        Assert.assertEquals(lastModified,
                metadata.getFirstValue(HttpHeaders.LAST_MODIFIED));

        /*
         * test with a changed signature: replace (not append to) the current
         * signature and provide the previous one explicitly, because the
         * scheduler reads only the first value of each key and skips the
         * signature comparison if either signature is missing.
         * NOTE(review): presumably addValue appends a further value to an
         * existing key - confirm against Metadata.
         */
        metadata.setValue("fetch.statusCode", "200");
        metadata.setValue(AdaptiveScheduler.SIGNATURE_OLD_KEY,
                MD5SUM_EMPTY_CONTENT);
        metadata.setValue(AdaptiveScheduler.SIGNATURE_KEY,
                MD5SUM_SPACE_CONTENT);
        before = Instant.now();
        scheduler.schedule(Status.FETCHED, metadata);
        after = Instant.now();
        fetchInterval = metadata
                .getFirstValue(AdaptiveScheduler.FETCH_INTERVAL_KEY);
        Assert.assertNotNull(fetchInterval);
        /* interval should now shrink */
        int fi3 = Integer.parseInt(fetchInterval);
        Assert.assertTrue(fi2 > fi3);
        /* last-modified time should be the time of the last fetch */
        lastModified = metadata.getFirstValue(HttpHeaders.LAST_MODIFIED);
        Assert.assertNotNull(lastModified);
        assertWithin(before, after, parseIsoDateTime(lastModified));
    }
}