Refactor crawler and add special logic for some platforms
* Break apart CrawlerRetreiver
* Break apart HttpFetcher into an interface and impl for testing sanity
* Add special logic for Lemmy, Mediawiki and Discourse to not waste requests on paths that aren't interesting.
vlofgren committed Jun 27, 2023
1 parent 5abaf13 commit ed373ee
Showing 22 changed files with 5,094 additions and 270 deletions.
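The platform-specific logic mentioned in the commit message lives in a LinkFilterSelector class that is referenced further down in this diff but whose source is not part of this excerpt. As a rough illustration only, a selector along these lines could inspect the fetched index page and hand the crawl frontier a Predicate<EdgeUrl> that admits only the interesting paths; the detection heuristic and the path prefixes for Lemmy, Discourse and Mediawiki below are assumptions, not the rules actually used by the commit.

// Illustrative sketch only -- not the code from this commit. Assumes EdgeUrl
// exposes its path and that the platform can be recognised from a generator meta tag.
import java.util.function.Predicate;

import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

public class LinkFilterSelector {
    public Predicate<EdgeUrl> selectFilter(CrawledDocument indexPage) {
        if (indexPage.documentBody == null)
            return url -> true;

        Document parsed = Jsoup.parse(indexPage.documentBody.decode());
        String generator = parsed.select("meta[name=generator]").attr("content").toLowerCase();

        if (generator.contains("lemmy")) {
            // Lemmy: posts and community listings are the paths worth fetching
            return url -> url.path.startsWith("/post/") || url.path.startsWith("/c/");
        }
        if (generator.contains("discourse")) {
            // Discourse: topics live under /t/
            return url -> url.path.startsWith("/t/");
        }
        if (generator.contains("mediawiki")) {
            // Mediawiki: skip special pages, edit views, histories and the like
            return url -> !url.path.contains("Special:") && !url.toString().contains("action=");
        }

        return url -> true; // unknown platform: crawl everything as before
    }
}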
@@ -2,13 +2,14 @@

import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.process.log.WorkLog;
import plan.CrawlPlanLoader;
import plan.CrawlPlan;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.internal.Util;
@@ -102,8 +103,8 @@ private void fetchDomain(CrawlingSpecification specification) {
if (workLog.isJobFinished(specification.id))
return;

HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);

HttpFetcher fetcher = new HttpFetcher(userAgent.uaString(), dispatcher, connectionPool);
try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification.domain, specification.id)) {
var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);

@@ -3,11 +3,12 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.fetcher.FetchResult;
import nu.marginalia.crawl.retreival.fetcher.FetchResultState;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.crawling.model.*;
import nu.marginalia.ip_blocklist.GeoIpBlocklist;
import nu.marginalia.ip_blocklist.IpBlockList;
import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
@@ -20,10 +21,9 @@
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -32,16 +32,18 @@ public class CrawlerRetreiver {
private static final long DEFAULT_CRAWL_DELAY_MIN_MS = Long.getLong("defaultCrawlDelay", 1000);
private static final long DEFAULT_CRAWL_DELAY_MAX_MS = Long.getLong("defaultCrawlDelaySlow", 2500);

private static final int MAX_ERRORS = 10;
private static final int MAX_ERRORS = 20;

private final LinkedList<EdgeUrl> queue = new LinkedList<>();
private final HttpFetcher fetcher;

private final HashSet<String> visited;
private final HashSet<String> known;

/** Flag to indicate that the crawler should slow down, e.g. from 429s */
private boolean slowDown = false;

private final int depth;

/** Testing flag to disable crawl delay (otherwise crawler tests take several minutes) */
private boolean testFlagIgnoreDelay = false;

private final String id;
private final String domain;
private final Consumer<SerializableCrawlData> crawledDomainWriter;
@@ -50,118 +52,120 @@ public class CrawlerRetreiver {
private static final Logger logger = LoggerFactory.getLogger(CrawlerRetreiver.class);

private static final HashFunction hashMethod = Hashing.murmur3_128(0);
private static final IpBlockList ipBlocklist;
private static final UrlBlocklist urlBlocklist = new UrlBlocklist();
private static final LinkFilterSelector linkFilterSelector = new LinkFilterSelector();

int errorCount = 0;
private static final DomainProber domainProber = new DomainProber();
private final DomainCrawlFrontier crawlFrontier;

static {
try {
ipBlocklist = new IpBlockList(new GeoIpBlocklist());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

int errorCount = 0;

public CrawlerRetreiver(HttpFetcher fetcher, CrawlingSpecification specs, Consumer<SerializableCrawlData> writer) {
this.fetcher = fetcher;
visited = new HashSet<>((int)(specs.urls.size() * 1.5));
known = new HashSet<>(specs.urls.size() * 10);

depth = specs.crawlDepth;
id = specs.id;
domain = specs.domain;

crawledDomainWriter = writer;

for (String urlStr : specs.urls) {
EdgeUrl.parse(urlStr).ifPresent(this::addToQueue);
}
this.crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), specs.urls, specs.crawlDepth);

if (queue.peek() != null) {
var fst = queue.peek();
var fst = crawlFrontier.peek();
if (fst != null) {

// Ensure the index page is always crawled
var root = fst.withPathAndParam("/", null);
if (known.add(root.toString()))
queue.addFirst(root);
if (crawlFrontier.addKnown(root))
crawlFrontier.addFirst(root);
}
else {
addToQueue(new EdgeUrl("http", new EdgeDomain(domain), null, "/", null));
addToQueue(new EdgeUrl("https", new EdgeDomain(domain), null, "/", null));
// We know nothing about this domain, so we'll start with the index, trying both HTTP and HTTPS
crawlFrontier.addToQueue(new EdgeUrl("http", new EdgeDomain(domain), null, "/", null));
crawlFrontier.addToQueue(new EdgeUrl("https", new EdgeDomain(domain), null, "/", null));
}
}

public CrawlerRetreiver withNoDelay() {
testFlagIgnoreDelay = true;
return this;
}

public int fetch() {
Optional<CrawledDomain> probeResult = probeDomainForProblems(domain);
final DomainProber.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, crawlFrontier.peek());

if (probeResult.isPresent()) {
crawledDomainWriter.accept(probeResult.get());
return 1;
}
else {
if (probeResult instanceof DomainProber.ProbeResultOk) {
return crawlDomain();
}
}

private Optional<CrawledDomain> probeDomainForProblems(String domain) {
EdgeUrl fst = queue.peek();

// handle error cases for probe

if (fst == null) {
logger.warn("No URLs for domain {}", domain);
var ip = findIp(domain);

return Optional.of(CrawledDomain.builder()
.crawlerStatus(CrawlerDomainStatus.ERROR.name())
.crawlerStatusDesc("No known URLs")
.id(id)
.domain(domain)
.build());
if (probeResult instanceof DomainProber.ProbeResultError err) {
crawledDomainWriter.accept(
CrawledDomain.builder()
.crawlerStatus(err.status().name())
.crawlerStatusDesc(err.desc())
.id(id)
.domain(domain)
.ip(ip)
.build()
);
return 1;
}

if (!ipBlocklist.isAllowed(fst.domain)) {
return Optional.of(CrawledDomain.builder()
.crawlerStatus(CrawlerDomainStatus.BLOCKED.name())
.id(id)
.domain(domain)
.ip(findIp(domain))
.build());
if (probeResult instanceof DomainProber.ProbeResultRedirect redirect) {
crawledDomainWriter.accept(
CrawledDomain.builder()
.crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
.crawlerStatusDesc("Redirected to different domain")
.redirectDomain(redirect.domain().toString())
.id(id)
.domain(domain)
.ip(ip)
.build()
);
return 1;
}

var fetchResult = fetcher.probeDomain(fst.withPathAndParam("/", null));
if (!fetchResult.ok()) {
logger.debug("Bad status on {}", domain);
return Optional.of(createErrorPostFromStatus(fetchResult));
}
return Optional.empty();
}
throw new IllegalStateException("Unknown probe result: " + probeResult);
};

private int crawlDomain() {
String ip = findIp(domain);

assert !queue.isEmpty();
assert !crawlFrontier.isEmpty();

var robotsRules = fetcher.fetchRobotRules(queue.peek().domain);
var robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
long crawlDelay = robotsRules.getCrawlDelay();

CrawledDomain ret = new CrawledDomain(id, domain, null, CrawlerDomainStatus.OK.name(), null, ip, new ArrayList<>(), null);

int fetchedCount = 0;

while (!queue.isEmpty() && visited.size() < depth && errorCount < MAX_ERRORS ) {
var top = queue.removeFirst();
configureLinkFilter();

while (!crawlFrontier.isEmpty()
&& !crawlFrontier.isCrawlDepthReached()
&& errorCount < MAX_ERRORS)
{
var top = crawlFrontier.takeNextUrl();

if (!robotsRules.isAllowed(top.toString())) {
crawledDomainWriter.accept(createRobotsError(top));
continue;
}

if (!crawlFrontier.filterLink(top))
continue;
if (urlBlocklist.isUrlBlocked(top))
continue;
if (!isAllowedProtocol(top.proto))
continue;
if (top.toString().length() > 255)
continue;
if (!visited.add(top.toString()))
if (!crawlFrontier.addVisited(top))
continue;

if (fetchDocument(top, crawlDelay)) {
Expand All @@ -176,8 +180,22 @@ private int crawlDomain() {
return fetchedCount;
}

private void configureLinkFilter() {
try {
logger.info("Configuring link filter");

fetchUrl(crawlFrontier.peek())
.map(linkFilterSelector::selectFilter)
.ifPresent(crawlFrontier::setLinkFilter);
}
catch (Exception ex) {
logger.error("Error configuring link filter", ex);
}
}

private boolean fetchDocument(EdgeUrl top, long crawlDelay) {
logger.debug("Fetching {}", top);

long startTime = System.currentTimeMillis();

var doc = fetchUrl(top);
@@ -186,10 +204,10 @@ private boolean fetchDocument(EdgeUrl top, long crawlDelay) {
crawledDomainWriter.accept(d);

if (d.url != null) {
EdgeUrl.parse(d.url).map(EdgeUrl::toString).ifPresent(visited::add);
EdgeUrl.parse(d.url).ifPresent(crawlFrontier::addVisited);
}

if ("ERROR".equals(d.crawlerStatus)) {
if ("ERROR".equals(d.crawlerStatus) && d.httpStatus != 404) {
errorCount++;
}

@@ -211,7 +229,6 @@ private Optional<CrawledDocument> fetchUrl(EdgeUrl top) {
var doc = fetchContent(top);

if (doc.documentBody != null) {

doc.documentBodyHash = createHash(doc.documentBody.decode());

Optional<Document> parsedDoc = parseDoc(doc);
@@ -260,37 +277,23 @@ private Optional<Document> parseDoc(CrawledDocument doc) {
return Optional.of(Jsoup.parse(doc.documentBody.decode()));
}

public boolean isSameDomain(EdgeUrl url) {
return domain.equalsIgnoreCase(url.domain.toString());
}

private void findLinks(EdgeUrl baseUrl, Document parsed) {
baseUrl = linkParser.getBaseLink(parsed, baseUrl);

for (var link : parsed.getElementsByTag("a")) {
linkParser.parseLink(baseUrl, link).ifPresent(this::addToQueue);
linkParser.parseLink(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
for (var link : parsed.getElementsByTag("frame")) {
linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue);
linkParser.parseFrame(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
for (var link : parsed.getElementsByTag("iframe")) {
linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue);
linkParser.parseFrame(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
}

private void addToQueue(EdgeUrl url) {
if (!isSameDomain(url))
return;
if (urlBlocklist.isUrlBlocked(url))
return;
if (urlBlocklist.isMailingListLink(url))
return;
// reduce memory usage by not growing queue huge when crawling large sites
if (queue.size() + visited.size() >= depth + 100)
return;

if (known.add(url.toString())) {
queue.addLast(url);
for (var link : parsed.getElementsByTag("link")) {
String rel = link.attr("rel");
if (rel.equalsIgnoreCase("next") || rel.equalsIgnoreCase("prev")) {
linkParser.parseLink(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
}
}

Expand All @@ -314,6 +317,9 @@ private String findIp(String domain) {

@SneakyThrows
private void delay(long sleepTime, long spentTime) {
if (testFlagIgnoreDelay)
return;

if (sleepTime >= 1) {
if (spentTime > sleepTime)
return;
@@ -355,17 +361,17 @@ private CrawledDocument createRetryError(EdgeUrl url) {
.crawlerStatus(CrawlerDocumentStatus.ERROR.name())
.build();
}
private CrawledDomain createErrorPostFromStatus(HttpFetcher.FetchResult ret) {
private CrawledDomain createErrorPostFromStatus(FetchResult ret) {
String ip = findIp(domain);

if (ret.state == HttpFetcher.FetchResultState.ERROR) {
if (ret.state == FetchResultState.ERROR) {
return CrawledDomain.builder()
.crawlerStatus(CrawlerDomainStatus.ERROR.name())
.id(id).domain(domain)
.ip(ip)
.build();
}
if (ret.state == HttpFetcher.FetchResultState.REDIRECT) {
if (ret.state == FetchResultState.REDIRECT) {
return CrawledDomain.builder()
.crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
.id(id)
@@ -377,4 +383,5 @@ private CrawledDomain createErrorPostFromStatus(HttpFetcher.FetchResult ret) {
throw new AssertionError("Unexpected case");
}


}
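The DomainCrawlFrontier that CrawlerRetreiver now delegates its queue, visited-set and link-filter bookkeeping to is likewise not included in this excerpt. Reconstructed only from the calls visible above (peek, takeNextUrl, addKnown, addFirst, addVisited, addToQueue, filterLink, setLinkFilter, isCrawlDepthReached), a minimal sketch could look like this; the field layout, the depth + 100 growth cap and the case-insensitive same-domain check are carried over from the removed CrawlerRetreiver code, and blocklist checks are left out since the retriever still applies them in its crawl loop.

// Minimal sketch reconstructed from the calls above -- not the actual
// nu.marginalia.crawl.retreival.DomainCrawlFrontier.
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.function.Predicate;

import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;

public class DomainCrawlFrontier {
    private final LinkedList<EdgeUrl> queue = new LinkedList<>();
    private final HashSet<String> visited;
    private final HashSet<String> known;

    private final EdgeDomain thisDomain;
    private final int depth;
    private Predicate<EdgeUrl> linkFilter = url -> true;

    public DomainCrawlFrontier(EdgeDomain thisDomain, Collection<String> urls, int depth) {
        this.thisDomain = thisDomain;
        this.depth = depth;

        visited = new HashSet<>((int) (urls.size() * 1.5));
        known = new HashSet<>(urls.size() * 10);

        for (String urlStr : urls) {
            EdgeUrl.parse(urlStr).ifPresent(this::addToQueue);
        }
    }

    public void setLinkFilter(Predicate<EdgeUrl> linkFilter) { this.linkFilter = linkFilter; }
    public boolean filterLink(EdgeUrl url) { return linkFilter.test(url); }

    public EdgeUrl peek() { return queue.peek(); }
    public EdgeUrl takeNextUrl() { return queue.removeFirst(); }

    public boolean isEmpty() { return queue.isEmpty(); }
    public boolean isCrawlDepthReached() { return visited.size() >= depth; }

    /** Returns true if the URL had not been seen before. */
    public boolean addKnown(EdgeUrl url) { return known.add(url.toString()); }
    public boolean addVisited(EdgeUrl url) { return visited.add(url.toString()); }

    public void addFirst(EdgeUrl url) { queue.addFirst(url); }

    public void addToQueue(EdgeUrl url) {
        if (!thisDomain.toString().equalsIgnoreCase(url.domain.toString()))
            return;
        // keep memory bounded on very large sites, as the old CrawlerRetreiver did
        if (queue.size() + visited.size() >= depth + 100)
            return;

        if (known.add(url.toString()))
            queue.addLast(url);
    }
}

The HttpFetcher/HttpFetcherImpl split mentioned in the commit message follows the same pattern: CrawlerRetreiver and the calling code now depend on the fetcher interface in nu.marginalia.crawl.retreival.fetcher (visible in the import changes at the top of this diff), so tests can substitute a stub for the OkHttp-backed HttpFetcherImpl.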
