diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
index a35af70af..a71166421 100644
--- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
@@ -187,7 +187,7 @@ public Spider scheduler(Scheduler scheduler) {
*/
public Spider setScheduler(Scheduler updateScheduler) {
checkIfRunning();
- SpiderScheduler oldScheduler = this.scheduler;
+ Scheduler oldScheduler = scheduler.getScheduler();
scheduler.setScheduler(updateScheduler);
if (oldScheduler != null) {
Request request;
@@ -458,7 +458,6 @@ private void onDownloadSuccess(Request request, Page page) {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
sleep(site.getSleepTime());
- return;
}
private void onDownloaderFail(Request request) {
diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java
index fec3c1db9..0dabdd954 100644
--- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java
+++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java
@@ -1,29 +1,13 @@
package us.codecraft.webmagic.scheduler;
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
-
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
-import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
+
+import java.io.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -32,7 +16,7 @@
* @author code4crafter@gmail.com
* @since 0.2.0
*/
-public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
+public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable {
private String filePath = System.getProperty("java.io.tmpdir");
@@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement
private BlockingQueue queue;
- private Set urls;
-
private ScheduledExecutorService flushThreadPool;
public FileCacheQueueScheduler(String filePath) {
@@ -83,36 +65,13 @@ private void init(Task task) {
}
private void initDuplicateRemover() {
- setDuplicateRemover(
- new DuplicateRemover() {
- @Override
- public boolean isDuplicate(Request request, Task task) {
- if (!inited.get()) {
- init(task);
- }
- return !urls.add(request.getUrl());
- }
-
- @Override
- public void resetDuplicateCheck(Task task) {
- urls.clear();
- }
-
- @Override
- public int getTotalRequestsCount(Task task) {
- return urls.size();
- }
- });
+ BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(this.filePath.hashCode());
+ setDuplicateRemover(bloomFilterDuplicateRemover);
}
private void initFlushThread() {
- flushThreadPool = Executors.newScheduledThreadPool(1);
- flushThreadPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- flush();
- }
- }, 10, 10, TimeUnit.SECONDS);
+ flushThreadPool = Executors.newScheduledThreadPool(1);
+ flushThreadPool.scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS);
}
private void initWriter() {
@@ -127,7 +86,6 @@ private void initWriter() {
private void readFile() {
try {
queue = new LinkedBlockingQueue();
- urls = new LinkedHashSet();
readCursorFile();
readUrlFile();
// initDuplicateRemover();
@@ -140,46 +98,43 @@ private void readFile() {
}
private void readUrlFile() throws IOException {
- String line;
- BufferedReader fileUrlReader = null;
- try {
- fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
+ try (BufferedReader fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)))) {
+ String line;
int lineReaded = 0;
while ((line = fileUrlReader.readLine()) != null) {
- urls.add(line.trim());
+ Request request = deserializeRequest(line);
+ this.getDuplicateRemover().isDuplicate(request, null);
lineReaded++;
if (lineReaded > cursor.get()) {
- queue.add(deserializeRequest(line));
+ queue.add(request);
}
}
- } finally {
- if (fileUrlReader != null) {
- IOUtils.closeQuietly(fileUrlReader);
- }
}
}
private void readCursorFile() throws IOException {
- BufferedReader fileCursorReader = null;
- try {
- fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
+ String fileName = getFileName(fileCursor);
+ try (BufferedReader fileCursorReader = new BufferedReader(new FileReader(fileName))) {
String line;
+ String lastLine = null;
//read the last number
while ((line = fileCursorReader.readLine()) != null) {
- cursor = new AtomicInteger(NumberUtils.toInt(line));
+ line = line.trim();
+ if (!line.isEmpty()) {
+ lastLine = line;
+ }
}
- } finally {
- if (fileCursorReader != null) {
- IOUtils.closeQuietly(fileCursorReader);
+ if (lastLine != null) {
+ cursor.set(NumberUtils.toInt(line));
}
}
}
-
+
public void close() throws IOException {
- flushThreadPool.shutdown();
- fileUrlWriter.close();
- fileCursorWriter.close();
- }
+ flushThreadPool.shutdown();
+ fileUrlWriter.close();
+ fileCursorWriter.close();
+ }
private String getFileName(String filename) {
return filePath + task.getUUID() + filename;