Skip to content

Commit

Permalink
Merge 9f17723 into 029b155
Browse files Browse the repository at this point in the history
  • Loading branch information
lwj5 committed Jul 29, 2019
2 parents 029b155 + 9f17723 commit 45a07ae
Show file tree
Hide file tree
Showing 33 changed files with 1,259 additions and 524 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -10,7 +10,7 @@

<groupId>ai.preferred</groupId>
<artifactId>venom</artifactId>
<version>4.1.4-SNAPSHOT</version>
<version>4.2.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/ai/preferred/venom/Crawler.java
Expand Up @@ -89,7 +89,7 @@ public final class Crawler implements Interruptible {
* The scheduler used.
*/
@NotNull
private final QueueScheduler<? extends Job> queueScheduler;
private final QueueScheduler queueScheduler;

/**
* The maximum number of simultaneous connections.
Expand Down Expand Up @@ -281,7 +281,9 @@ private void except(final Job job, final Throwable ex) {
synchronized (pendingJobs) { // Synchronisation required to prevent crawler stopping incorrectly.
pendingJobs.remove(job);
if (job.getTryCount() < maxTries) {
job.reQueue();
job.prepareRetry();
queueScheduler.removeAndAdd(job);
LOGGER.debug("Job {} - {} re-queued.", Integer.toHexString(job.hashCode()), job.getRequest().getUrl());
} else {
LOGGER.error("Max retries reached for request: {}", job.getRequest().getUrl());
}
Expand Down Expand Up @@ -558,7 +560,7 @@ public static final class Builder {
/**
* The scheduler used.
*/
private QueueScheduler<? extends Job> queueScheduler;
private QueueScheduler queueScheduler;

/**
* The sleep scheduler used.
Expand Down Expand Up @@ -649,7 +651,7 @@ public Builder setWorkerManager(final @NotNull WorkerManager workerManager) {
* @param queueScheduler scheduler to be used.
* @return this
*/
public Builder setScheduler(final @NotNull QueueScheduler<? extends Job> queueScheduler) {
public Builder setScheduler(final @NotNull QueueScheduler queueScheduler) {
if (queueScheduler == null) {
throw new IllegalStateException("Attribute 'queueScheduler' cannot be null.");
}
Expand Down
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2019 Preferred.AI
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.preferred.venom.job;

import javax.annotation.Nonnull;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Ween Jiann Lee
*/
public abstract class AbstractPriorityQueueScheduler extends AbstractQueueScheduler {

/**
* Constructs an instance of AbstractQueueScheduler.
*/
protected AbstractPriorityQueueScheduler() {
super(new PriorityBlockingQueue<>(11,
Comparator.comparing(o -> (o.getJobAttribute(PriorityJobAttribute.class)))));
}

/**
* Check the job for {@see PriorityJobAttribute}, if missing,
* adds it to the job.
*
* @param job the job to check.
* @return the input job.
*/
private Job ensurePriorityJobAttribute(final Job job) {
if (job.getJobAttribute(PriorityJobAttribute.class) == null) {
job.addJobAttribute(new PriorityJobAttribute());
}
return job;
}

@Override
public final void put(final @Nonnull Job job) throws InterruptedException {
getQueue().put(ensurePriorityJobAttribute(job));
}

@Override
public final boolean offer(final Job job, final long timeout, final @Nonnull TimeUnit unit)
throws InterruptedException {
return getQueue().offer(ensurePriorityJobAttribute(job), timeout, unit);
}

@Override
public final boolean offer(final @Nonnull Job job) {
return getQueue().offer(ensurePriorityJobAttribute(job));
}


}
72 changes: 3 additions & 69 deletions src/main/java/ai/preferred/venom/job/AbstractQueueScheduler.java
Expand Up @@ -16,11 +16,6 @@

package ai.preferred.venom.job;

import ai.preferred.venom.Handler;
import ai.preferred.venom.request.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.util.AbstractQueue;
import java.util.Collection;
Expand All @@ -31,7 +26,8 @@
* @author Ween Jiann Lee
* @author Maksim Tkachenko
*/
public abstract class AbstractQueueScheduler extends AbstractQueue<Job> implements QueueScheduler<Job> {
@SuppressWarnings("NullableProblems")
public abstract class AbstractQueueScheduler extends AbstractQueue<Job> implements QueueScheduler {

/**
* The queue used for this scheduler.
Expand All @@ -50,7 +46,7 @@ public abstract class AbstractQueueScheduler extends AbstractQueue<Job> implemen
*/
protected AbstractQueueScheduler(final BlockingQueue<Job> queue) {
this.queue = queue;
this.scheduler = new JobScheduler(queue);
this.scheduler = new JobScheduler(this);
}

@Override
Expand Down Expand Up @@ -90,11 +86,6 @@ public final int drainTo(final @Nonnull Collection<? super Job> c, final int max
return queue.drainTo(c, maxElements);
}

@Override
public final boolean offer(final @Nonnull Job job) {
return queue.offer(job);
}

@Override
public final Job peek() {
return queue.peek();
Expand All @@ -109,61 +100,4 @@ protected final BlockingQueue<Job> getQueue() {
return queue;
}

/**
* An implementation of ai.preferred.venom.job.Scheduler using BasicJob.
*/
public static class JobScheduler implements Scheduler {

/**
* Logger.
*/
private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class);

/**
* The queue used for this scheduler.
*/
private final BlockingQueue<Job> queue;

/**
* Constructs an instance of JobScheduler.
*
* @param queue an instance of BlockingQueue
*/
public JobScheduler(final BlockingQueue<Job> queue) {
this.queue = queue;
}

@Override
public final void add(final Request r, final Handler h, final Priority p, final Priority pf) {
final Job job = new BasicJob(r, h, p, pf, queue);
queue.add(job);
LOGGER.debug("Added job {} - {} to queue.", Integer.toHexString(job.hashCode()), r.getUrl());
}

@Override
public final void add(final Request r, final Handler h, final Priority p) {
add(r, h, p, Priority.FLOOR);
}

@Override
public final void add(final Request r, final Handler h) {
add(r, h, Priority.DEFAULT);
}

@Override
public final void add(final Request r, final Priority p, final Priority pf) {
add(r, null, p, pf);
}

@Override
public final void add(final Request r, final Priority p) {
add(r, null, p, Priority.FLOOR);
}

@Override
public final void add(final Request r) {
add(r, null, Priority.DEFAULT, Priority.FLOOR);
}

}
}
120 changes: 0 additions & 120 deletions src/main/java/ai/preferred/venom/job/BasicJob.java

This file was deleted.

5 changes: 5 additions & 0 deletions src/main/java/ai/preferred/venom/job/FIFOQueueScheduler.java
Expand Up @@ -49,6 +49,11 @@ public final boolean offer(final Job job, final long timeout, final @Nonnull Tim
return getQueue().offer(job, timeout, unit);
}

@Override
public final boolean offer(final @Nonnull Job job) {
return getQueue().offer(job);
}

@Override
public final Job poll(final long timeout, final @Nonnull TimeUnit unit) throws InterruptedException {
return getQueue().poll(timeout, unit);
Expand Down

0 comments on commit 45a07ae

Please sign in to comment.