Skip to content

Commit

Permalink
Merge 3e803c0 into 336c8e2
Browse files Browse the repository at this point in the history
  • Loading branch information
lwj5 committed Mar 6, 2019
2 parents 336c8e2 + 3e803c0 commit 93e04f9
Show file tree
Hide file tree
Showing 19 changed files with 268 additions and 265 deletions.
30 changes: 15 additions & 15 deletions src/main/java/ai/preferred/venom/Crawler.java
Expand Up @@ -17,7 +17,7 @@
package ai.preferred.venom;

import ai.preferred.venom.fetcher.*;
import ai.preferred.venom.job.AbstractQueueScheduler;
import ai.preferred.venom.job.QueueScheduler;
import ai.preferred.venom.job.Job;
import ai.preferred.venom.job.PriorityQueueScheduler;
import ai.preferred.venom.job.Scheduler;
Expand Down Expand Up @@ -89,7 +89,7 @@ public final class Crawler implements Interruptible {
* The scheduler used.
*/
@NotNull
private final AbstractQueueScheduler scheduler;
private final QueueScheduler<? extends Job> queueScheduler;

/**
* The maximum number of simultaneous connections.
Expand Down Expand Up @@ -144,7 +144,7 @@ private Crawler(final Builder builder) {
maxTries = builder.maxTries;
propRetainProxy = builder.propRetainProxy;
router = builder.router;
scheduler = builder.scheduler;
queueScheduler = builder.queueScheduler;
connections = new Semaphore(builder.maxConnections);
session = builder.session;
sleepScheduler = builder.sleepScheduler;
Expand Down Expand Up @@ -243,15 +243,15 @@ private void run() {
long lastRequestTime = 0;
while (!Thread.currentThread().isInterrupted() && !threadPool.isShutdown() && handlerExceptions.isEmpty()) {
try {
final Job job = scheduler.poll(100, TimeUnit.MILLISECONDS);
final Job job = queueScheduler.poll(100, TimeUnit.MILLISECONDS);
if (job == null) {
if (pendingJobs.size() != 0) {
continue;
}
// This should only run if pendingJob == 0 && job == null
synchronized (pendingJobs) {
LOGGER.debug("({}) Checking for exit conditions.", crawlerThread.getName());
if (scheduler.peek() == null && pendingJobs.size() == 0 && exitWhenDone.get()) {
if (queueScheduler.peek() == null && pendingJobs.size() == 0 && exitWhenDone.get()) {
break;
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ private void run() {
* @return the instance of scheduler used.
*/
public Scheduler getScheduler() {
return scheduler;
return queueScheduler.getScheduler();
}

/**
Expand Down Expand Up @@ -467,7 +467,7 @@ public static final class Builder {
/**
* The scheduler used.
*/
private AbstractQueueScheduler scheduler;
private QueueScheduler<? extends Job> queueScheduler;

/**
* The sleep scheduler used.
Expand All @@ -491,7 +491,7 @@ private Builder() {
workerManager = null;
propRetainProxy = 0.05;
router = null;
scheduler = new PriorityQueueScheduler();
queueScheduler = new PriorityQueueScheduler();
sleepScheduler = new SleepScheduler(250, 2000);
session = Session.EMPTY_SESSION;
}
Expand Down Expand Up @@ -547,11 +547,11 @@ public Builder setWorkerManager(final @NotNull WorkerManager workerManager) {
/**
* Sets the scheduler to be used; if not set, a default will be chosen.
*
* @param scheduler scheduler to be used.
* @param queueScheduler scheduler to be used.
* @return this
*/
public Builder setScheduler(final @NotNull AbstractQueueScheduler scheduler) {
this.scheduler = scheduler;
public Builder setScheduler(final @NotNull QueueScheduler<? extends Job> queueScheduler) {
this.queueScheduler = queueScheduler;
return this;
}

Expand Down Expand Up @@ -674,13 +674,13 @@ public void completed(final Request request, final Response response) {
crawler.threadPool.execute(() -> {
try {
if (job.getHandler() != null) {
job.getHandler().handle(job.getRequest(), new VResponse(response), crawler.scheduler, crawler.session,
crawler.workerManager.getWorker());
job.getHandler().handle(job.getRequest(), new VResponse(response), crawler.queueScheduler.getScheduler(),
crawler.session, crawler.workerManager.getWorker());
} else if (crawler.router != null) {
final Handler routedHandler = crawler.router.getHandler(job.getRequest());
if (routedHandler != null) {
routedHandler.handle(job.getRequest(), new VResponse(response), crawler.scheduler, crawler.session,
crawler.workerManager.getWorker());
routedHandler.handle(job.getRequest(), new VResponse(response), crawler.queueScheduler.getScheduler(),
crawler.session, crawler.workerManager.getWorker());
}
} else {
LOGGER.error("No handler to handle request {}.", job.getRequest().getUrl());
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/ai/preferred/venom/FatalHandlerException.java
@@ -1,3 +1,19 @@
/*
* Copyright 2018 Preferred.AI
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.preferred.venom;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/ai/preferred/venom/HandlerRouter.java
Expand Up @@ -22,7 +22,7 @@
import javax.validation.constraints.NotNull;

/**
* This interface allows the user to map request to handleable.
* This interface allows the user to map request to handler.
*
* @author Maksim Tkachenko
*/
Expand Down
59 changes: 1 addition & 58 deletions src/main/java/ai/preferred/venom/ProxyProvider.java
Expand Up @@ -21,9 +21,6 @@

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* This interface allows the user to define proxies to be used for requests.
Expand All @@ -37,61 +34,7 @@ public interface ProxyProvider {
/**
* An instance of proxy provider without any proxies.
*/
ProxyProvider EMPTY_PROXY_PROVIDER = new ProxyProvider() {
@Override
public List<HttpHost> getProxyList() {
return Collections.emptyList();
}

@Override
public void add(final HttpHost proxy) {
throw new UnsupportedOperationException();
}

@Override
public void addAll(final Collection<HttpHost> proxies) {
throw new UnsupportedOperationException();
}

@Override
public void remove(final HttpHost proxy) {
throw new UnsupportedOperationException();
}

@Override
public HttpHost get(final Request request) {
return null;
}
};

/**
* Returns a list of all proxies.
*
* @return list of proxies
*/
@NotNull
List<HttpHost> getProxyList();

/**
* Add a proxy to the list.
*
* @param proxy the proxy to be added
*/
void add(@NotNull HttpHost proxy);

/**
* Add a list of proxies to the list.
*
* @param proxies the list of proxies to be added
*/
void addAll(@NotNull Collection<HttpHost> proxies);

/**
* Remove a proxy from the list.
*
* @param proxy the proxy to be removed
*/
void remove(@NotNull HttpHost proxy);
ProxyProvider EMPTY_PROXY_PROVIDER = request -> null;

/**
* Returns a proxy from the list.
Expand Down
47 changes: 39 additions & 8 deletions src/main/java/ai/preferred/venom/UrlRouter.java
Expand Up @@ -21,6 +21,7 @@

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;

/**
Expand All @@ -47,6 +48,16 @@ public class UrlRouter implements HandlerRouter, ValidatorRouter {
*/
private final Map<Pattern, Validator> validatorRules = new LinkedHashMap<>();

/**
* A read write lock for handler rules.
*/
private final ReentrantReadWriteLock handlerRulesLock = new ReentrantReadWriteLock();

/**
* A read write lock for validator rules.
*/
private final ReentrantReadWriteLock validatorRulesLock = new ReentrantReadWriteLock();

/**
* Constructs a url router without default handler.
*/
Expand Down Expand Up @@ -74,7 +85,12 @@ public UrlRouter(final Handler defaultHandler) {
* @return this.
*/
public final UrlRouter register(final Pattern urlPattern, final Handler handler) {
handlerRules.put(urlPattern, handler);
handlerRulesLock.writeLock().lock();
try {
handlerRules.put(urlPattern, handler);
} finally {
handlerRulesLock.writeLock().unlock();
}
return this;
}

Expand All @@ -89,7 +105,12 @@ public final UrlRouter register(final Pattern urlPattern, final Handler handler)
* @return this.
*/
public final UrlRouter register(final Pattern urlPattern, final Validator validator) {
validatorRules.put(urlPattern, validator);
validatorRulesLock.writeLock().lock();
try {
validatorRules.put(urlPattern, validator);
} finally {
validatorRulesLock.writeLock().unlock();
}
return this;
}

Expand All @@ -112,10 +133,15 @@ public final UrlRouter register(final Pattern urlPattern, final Handler handler,

@Override
public final Handler getHandler(final Request request) {
for (final Map.Entry<Pattern, Handler> rule : handlerRules.entrySet()) {
if (rule.getKey().matcher(request.getUrl()).matches()) {
return rule.getValue();
handlerRulesLock.readLock().lock();
try {
for (final Map.Entry<Pattern, Handler> rule : handlerRules.entrySet()) {
if (rule.getKey().matcher(request.getUrl()).matches()) {
return rule.getValue();
}
}
} finally {
handlerRulesLock.readLock().unlock();
}

if (defaultHandler != null) {
Expand All @@ -127,10 +153,15 @@ public final Handler getHandler(final Request request) {

@Override
public final Validator getValidator(final Request request) {
for (final Map.Entry<Pattern, Validator> rule : validatorRules.entrySet()) {
if (rule.getKey().matcher(request.getUrl()).matches()) {
return rule.getValue();
validatorRulesLock.readLock().lock();
try {
for (final Map.Entry<Pattern, Validator> rule : validatorRules.entrySet()) {
if (rule.getKey().matcher(request.getUrl()).matches()) {
return rule.getValue();
}
}
} finally {
validatorRulesLock.readLock().unlock();
}

return Validator.ALWAYS_VALID;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/ai/preferred/venom/ValidatorRouter.java
Expand Up @@ -22,7 +22,7 @@
import javax.validation.constraints.NotNull;

/**
* This interface allows the user to map request to handleable.
* This interface allows the user to map request to validator.
*
* @author Maksim Tkachenko
* @author Ween Jiann Lee
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java
Expand Up @@ -133,7 +133,7 @@ public final class AsyncFetcher implements Fetcher {
private final boolean compressed;

/**
* Constructs an instance of async fetcher.
* Constructs an instance of AsyncFetcher.
*
* @param builder An instance of builder
*/
Expand Down Expand Up @@ -186,7 +186,7 @@ private AsyncFetcher(final Builder builder) {
}

/**
* Create an instance of async fetcher with default options.
* Create an instance of AsyncFetcher with default options.
*
* @return A new instance of async fetcher
*/
Expand All @@ -205,10 +205,10 @@ public static Builder builder() {

/**
* Check if request is an instance of http fetcher request and return it
* if true, otherwise wrap it with http fetcher request and return that.
* if true, otherwise wrap it with HttpFetcherRequest and return that.
*
* @param request An instance of request
* @return An instance of http fetcher request
* @return An instance of HttpFetcherRequest
*/
private HttpFetcherRequest normalizeRequest(final Request request) {
if (request instanceof HttpFetcherRequest) {
Expand All @@ -221,7 +221,7 @@ private HttpFetcherRequest normalizeRequest(final Request request) {
* Prepare fetcher request by prepending headers and set appropriate proxy.
*
* @param request An instance of request
* @return An instance of http fetcher request
* @return An instance of HttpFetcherRequest
*/
private HttpFetcherRequest prepareFetcherRequest(final Request request) {
HttpFetcherRequest httpFetcherRequest = normalizeRequest(request);
Expand Down

0 comments on commit 93e04f9

Please sign in to comment.