Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public Spider scheduler(Scheduler scheduler) {
*/
public Spider setScheduler(Scheduler updateScheduler) {
checkIfRunning();
SpiderScheduler oldScheduler = this.scheduler;
Scheduler oldScheduler = scheduler.getScheduler();
scheduler.setScheduler(updateScheduler);
if (oldScheduler != null) {
Request request;
Expand Down Expand Up @@ -458,7 +458,6 @@ private void onDownloadSuccess(Request request, Page page) {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
sleep(site.getSleepTime());
return;
}

private void onDownloaderFail(Request request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,13 @@
package us.codecraft.webmagic.scheduler;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand All @@ -32,7 +16,7 @@
* @author code4crafter@gmail.com <br>
* @since 0.2.0
*/
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable {

private String filePath = System.getProperty("java.io.tmpdir");

Expand All @@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement

private BlockingQueue<Request> queue;

private Set<String> urls;

private ScheduledExecutorService flushThreadPool;

public FileCacheQueueScheduler(String filePath) {
Expand Down Expand Up @@ -83,36 +65,13 @@ private void init(Task task) {
}

private void initDuplicateRemover() {
setDuplicateRemover(
new DuplicateRemover() {
@Override
public boolean isDuplicate(Request request, Task task) {
if (!inited.get()) {
init(task);
}
return !urls.add(request.getUrl());
}

@Override
public void resetDuplicateCheck(Task task) {
urls.clear();
}

@Override
public int getTotalRequestsCount(Task task) {
return urls.size();
}
});
BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(this.filePath.hashCode());
setDuplicateRemover(bloomFilterDuplicateRemover);
}

private void initFlushThread() {
flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
flush();
}
}, 10, 10, TimeUnit.SECONDS);
flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS);
}

private void initWriter() {
Expand All @@ -127,7 +86,6 @@ private void initWriter() {
private void readFile() {
try {
queue = new LinkedBlockingQueue<Request>();
urls = new LinkedHashSet<String>();
readCursorFile();
readUrlFile();
// initDuplicateRemover();
Expand All @@ -140,46 +98,43 @@ private void readFile() {
}

private void readUrlFile() throws IOException {
String line;
BufferedReader fileUrlReader = null;
try {
fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
try (BufferedReader fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)))) {
String line;
int lineReaded = 0;
while ((line = fileUrlReader.readLine()) != null) {
urls.add(line.trim());
Request request = deserializeRequest(line);
this.getDuplicateRemover().isDuplicate(request, null);
lineReaded++;
if (lineReaded > cursor.get()) {
queue.add(deserializeRequest(line));
queue.add(request);
}
}
} finally {
if (fileUrlReader != null) {
IOUtils.closeQuietly(fileUrlReader);
}
}
}

private void readCursorFile() throws IOException {
BufferedReader fileCursorReader = null;
try {
fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
String fileName = getFileName(fileCursor);
try (BufferedReader fileCursorReader = new BufferedReader(new FileReader(fileName))) {
String line;
String lastLine = null;
//read the last number
while ((line = fileCursorReader.readLine()) != null) {
cursor = new AtomicInteger(NumberUtils.toInt(line));
line = line.trim();
if (!line.isEmpty()) {
lastLine = line;
}
}
} finally {
if (fileCursorReader != null) {
IOUtils.closeQuietly(fileCursorReader);
if (lastLine != null) {
cursor.set(NumberUtils.toInt(line));
}
}
}

public void close() throws IOException {
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}

private String getFileName(String filename) {
return filePath + task.getUUID() + filename;
Expand Down