Skip to content

Commit

Permalink
Merge 31a00eb into 2ae8f03
Browse files Browse the repository at this point in the history
  • Loading branch information
lwj5 committed Sep 12, 2019
2 parents 2ae8f03 + 31a00eb commit 2822a3f
Show file tree
Hide file tree
Showing 20 changed files with 295 additions and 500 deletions.
45 changes: 24 additions & 21 deletions pom.xml
Expand Up @@ -10,7 +10,7 @@

<groupId>ai.preferred</groupId>
<artifactId>venom</artifactId>
<version>4.2.2</version>
<version>4.2.3-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down Expand Up @@ -46,7 +46,7 @@
</developer>
<developer>
<name>LEE Ween Jiann</name>
<email>weenjiannlee -at- smu.edu.sg</email>
<email>wjlee.2019 -at- smu.edu.sg</email>
<organization>Preferred.AI</organization>
<organizationUrl>https://preferred.ai/</organizationUrl>
<timezone>+8</timezone>
Expand Down Expand Up @@ -104,19 +104,7 @@
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
<dependencies>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-surefire-provider</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.4.0</version>
</dependency>
</dependencies>
<version>2.22.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -194,7 +182,7 @@
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
</dependency>
</dependencies>
</plugin>
Expand Down Expand Up @@ -223,6 +211,15 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.7</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>display-dependency-updates</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

Expand All @@ -249,7 +246,7 @@
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>[1.11,1.12)</version>
<version>[1.12,1.13)</version>
</dependency>
<dependency>
<groupId>net.sourceforge.htmlunit</groupId>
Expand All @@ -264,12 +261,12 @@
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
<version>[63.0,64.0)</version>
<version>[64.0,65.0)</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
<version>28.1-jre</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down Expand Up @@ -304,13 +301,19 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.4.0</version>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-standalone</artifactId>
<version>[2.21,2.22)</version>
<version>[2.24,2.25)</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
58 changes: 40 additions & 18 deletions src/main/java/ai/preferred/venom/Crawler.java
Expand Up @@ -18,8 +18,7 @@

import ai.preferred.venom.fetcher.*;
import ai.preferred.venom.job.Job;
import ai.preferred.venom.job.PriorityQueueScheduler;
import ai.preferred.venom.job.QueueScheduler;
import ai.preferred.venom.job.PriorityJobQueue;
import ai.preferred.venom.job.Scheduler;
import ai.preferred.venom.request.CrawlerRequest;
import ai.preferred.venom.request.Request;
Expand Down Expand Up @@ -89,11 +88,17 @@ public final class Crawler implements Interruptible, AutoCloseable {
@Nullable
private final HandlerRouter router;

/**
* The job queue used.
*/
@NotNull
private final BlockingQueue<Job> jobQueue;

/**
* The scheduler used.
*/
@NotNull
private final QueueScheduler queueScheduler;
private final Scheduler scheduler;

/**
* The maximum number of simultaneous connections.
Expand Down Expand Up @@ -148,7 +153,8 @@ private Crawler(final Builder builder) {
maxTries = builder.maxTries;
propRetainProxy = builder.propRetainProxy;
router = builder.router;
queueScheduler = builder.queueScheduler;
jobQueue = builder.jobQueue;
scheduler = new Scheduler(jobQueue);
connections = new Semaphore(builder.maxConnections);
session = builder.session;
sleepScheduler = builder.sleepScheduler;
Expand Down Expand Up @@ -286,7 +292,7 @@ private void except(final Job job, final Throwable ex) {
jobsPending.decrementAndGet();
if (job.getTryCount() < maxTries) {
job.prepareRetry();
queueScheduler.add(job);
jobQueue.add(job);
LOGGER.debug("Job {} - {} re-queued.", Integer.toHexString(job.hashCode()), job.getRequest().getUrl());
} else {
LOGGER.error("Max retries reached for request: {}", job.getRequest().getUrl());
Expand All @@ -303,15 +309,15 @@ private void run() {
long lastRequestTime = 0;
while (!Thread.currentThread().isInterrupted() && !threadPool.isShutdown() && fatalHandlerExceptions.isEmpty()) {
try {
final Job job = queueScheduler.poll(100, TimeUnit.MILLISECONDS);
final Job job = jobQueue.poll(100, TimeUnit.MILLISECONDS);
if (job == null) {
if (jobsPending.get() > 0) {
continue;
}
// This should only run if pendingJob == 0 && job == null
synchronized (jobsPending) {
LOGGER.debug("({}) Checking for exit conditions.", crawlerThread.getName());
if (queueScheduler.peek() == null && jobsPending.get() <= 0 && exitWhenDone.get()) {
if (jobQueue.peek() == null && jobsPending.get() <= 0 && exitWhenDone.get()) {
break;
}
}
Expand Down Expand Up @@ -388,7 +394,7 @@ public void cancelled(final @NotNull Request request) {
* @return the instance of scheduler used.
*/
public Scheduler getScheduler() {
return queueScheduler.getScheduler();
return scheduler;
}

/**
Expand All @@ -404,7 +410,7 @@ public synchronized Crawler start() {

/**
* Starts the crawler by starting a new thread to poll for jobs and close it
* after the queue has reached 0.
* after the jobQueue has reached 0.
*
* @return the instance of Crawler used.
* @throws Exception if this resource cannot be closed.
Expand Down Expand Up @@ -556,9 +562,9 @@ public static final class Builder {
private HandlerRouter router;

/**
* The scheduler used.
* The job queue used.
*/
private QueueScheduler queueScheduler;
private BlockingQueue<Job> jobQueue;

/**
* The sleep scheduler used.
Expand All @@ -582,7 +588,7 @@ private Builder() {
workerManager = null;
propRetainProxy = 0.05;
router = null;
queueScheduler = new PriorityQueueScheduler();
jobQueue = new PriorityJobQueue();
sleepScheduler = new SleepScheduler(250, 2000);
session = Session.EMPTY_SESSION;
}
Expand Down Expand Up @@ -644,16 +650,32 @@ public Builder setWorkerManager(final @NotNull WorkerManager workerManager) {
}

/**
* Sets the Scheduler to be used, if not set, default will be chosen.
* Sets the JobQueue to be used, if not set, default will be chosen.
* This method is deprecated; use {@link #setJobQueue(BlockingQueue)} instead.
*
* @param jobQueue the job queue to be used.
* @return this
*/
@Deprecated
public Builder setScheduler(final @NotNull BlockingQueue<Job> jobQueue) {
  // Deprecated entry point; setJobQueue is the replacement and performs the
  // same null check and assignment.
  if (jobQueue != null) {
    // Replace the builder's current queue with the caller's queue.
    this.jobQueue = jobQueue;
    return this;
  }
  // A missing queue is rejected rather than silently keeping the default.
  throw new IllegalStateException("Attribute 'jobQueue' cannot be null.");
}

/**
* Sets the JobQueue to be used, if not set, default will be chosen.
*
* @param queueScheduler scheduler to be used.
* @param jobQueue the job queue to be used.
* @return this
*/
public Builder setScheduler(final @NotNull QueueScheduler queueScheduler) {
if (queueScheduler == null) {
throw new IllegalStateException("Attribute 'queueScheduler' cannot be null.");
public Builder setJobQueue(final @NotNull BlockingQueue<Job> jobQueue) {
if (jobQueue == null) {
throw new IllegalStateException("Attribute 'jobQueue' cannot be null.");
}
this.queueScheduler = queueScheduler;
this.jobQueue = jobQueue;
return this;
}

Expand Down
Expand Up @@ -27,31 +27,20 @@
* @author Maksim Tkachenko
*/
@SuppressWarnings("NullableProblems")
public abstract class AbstractQueueScheduler extends AbstractQueue<Job> implements QueueScheduler {
public abstract class AbstractJobQueue extends AbstractQueue<Job> implements BlockingQueue<Job> {

/**
* The queue used for this scheduler.
*/
private final BlockingQueue<Job> queue;

/**
* The adding part of the scheduler.
*/
private final Scheduler scheduler;

/**
* Constructs an instance of AbstractQueueScheduler.
* Constructs an instance of AbstractJobQueue.
*
* @param queue an instance of BlockingQueue
*/
protected AbstractQueueScheduler(final BlockingQueue<Job> queue) {
protected AbstractJobQueue(final BlockingQueue<Job> queue) {
this.queue = queue;
this.scheduler = new JobScheduler(this);
}

@Override
public final Scheduler getScheduler() {
return scheduler;
}

@Nonnull
Expand Down
Expand Up @@ -18,18 +18,19 @@

import javax.annotation.Nonnull;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Ween Jiann Lee
*/
public abstract class AbstractPriorityQueueScheduler extends AbstractQueueScheduler {
public abstract class AbstractPriorityJobQueue extends AbstractJobQueue implements BlockingQueue<Job> {

/**
* Constructs an instance of AbstractQueueScheduler.
* Constructs an instance of AbstractPriorityJobQueue.
*/
protected AbstractPriorityQueueScheduler() {
protected AbstractPriorityJobQueue() {
super(new PriorityBlockingQueue<>(11,
Comparator.comparing(o -> (o.getJobAttribute(PriorityJobAttribute.class)))));
}
Expand Down
66 changes: 66 additions & 0 deletions src/main/java/ai/preferred/venom/job/FIFOJobQueue.java
@@ -0,0 +1,66 @@
/*
* Copyright 2018 Preferred.AI
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.preferred.venom.job;

import javax.annotation.Nonnull;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * This class provides an implementation of a job queue with first in
 * first out ordering.
 * <p>
 * Jobs in the queue will be processed in order of insertion. The queue handed
 * to the superclass is a {@link LinkedBlockingQueue} created with its
 * no-argument constructor.
 * </p>
 *
 * @author Ween Jiann Lee
 */
public class FIFOJobQueue extends AbstractJobQueue {

  /**
   * Constructs an instance of FIFOJobQueue, passing a new, empty
   * {@link LinkedBlockingQueue} to the superclass constructor.
   */
  public FIFOJobQueue() {
    super(new LinkedBlockingQueue<>());
  }

  /**
   * Inserts the job by calling {@code put} on the queue returned by
   * {@code getQueue()} (presumably the queue supplied at construction).
   *
   * @param job the job to add, must not be null
   * @throws InterruptedException if the underlying {@code put} is interrupted
   */
  @Override
  public final void put(final @Nonnull Job job) throws InterruptedException {
    getQueue().put(job);
  }

  /**
   * Inserts the job by calling the timed {@code offer} on the queue returned
   * by {@code getQueue()}.
   *
   * @param job     the job to add, must not be null
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the time unit of {@code timeout}
   * @return the result of the underlying {@code offer} call
   * @throws InterruptedException if the underlying {@code offer} is interrupted
   */
  @Override
  public final boolean offer(final @Nonnull Job job, final long timeout, final @Nonnull TimeUnit unit)
      throws InterruptedException {
    // job is annotated @Nonnull to match put(Job) and offer(Job).
    return getQueue().offer(job, timeout, unit);
  }

  /**
   * Inserts the job by calling {@code offer} on the queue returned by
   * {@code getQueue()}.
   *
   * @param job the job to add, must not be null
   * @return the result of the underlying {@code offer} call
   */
  @Override
  public final boolean offer(final @Nonnull Job job) {
    return getQueue().offer(job);
  }

  /**
   * Retrieves a job by calling the timed {@code poll} on the queue returned
   * by {@code getQueue()}.
   *
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the time unit of {@code timeout}
   * @return the result of the underlying {@code poll} call
   * @throws InterruptedException if the underlying {@code poll} is interrupted
   */
  @Override
  public final Job poll(final long timeout, final @Nonnull TimeUnit unit) throws InterruptedException {
    return getQueue().poll(timeout, unit);
  }

  /**
   * Retrieves a job by calling {@code poll} on the queue returned by
   * {@code getQueue()}.
   *
   * @return the result of the underlying {@code poll} call
   */
  @Override
  public final Job poll() {
    return getQueue().poll();
  }
}

0 comments on commit 2822a3f

Please sign in to comment.