diff --git a/build.gradle b/build.gradle index 7a833b3b3..725eab5d1 100644 --- a/build.gradle +++ b/build.gradle @@ -39,6 +39,7 @@ dependencies { compile 'org.apache.commons:commons-compress:1.9' compile 'org.apache.httpcomponents:httpclient:4.4.1' compile 'org.apache.tika:tika-parsers:1.9' + compile 'com.github.crawler-commons:crawler-commons:0.6' testCompile 'junit:junit:4.12' testCompile 'org.hamcrest:hamcrest-all:1.3' diff --git a/config/sample_config/ache.yml b/config/sample_config/ache.yml index de0a05de4..2fc94a833 100644 --- a/config/sample_config/ache.yml +++ b/config/sample_config/ache.yml @@ -59,7 +59,7 @@ target_storage.english_language_detection_enabled: true # Configurations for target storage's server target_storage.server.host: localhost -target_storage.server.port : 1987 +target_storage.server.port: 1987 # # Configurations for Link Storage @@ -80,7 +80,7 @@ link_storage.link_strategy.backlinks: false # - LinkClassifierAuthority: link strategy for the bipartite crawling link_storage.link_classifier.type: LinkClassifierBaseline #link_storage.link_classifier.type: LinkClassifierImpl -#link_storage.link_classifier.parameters.class_values: ["0", "1", "2"] #CLASS_VALUES 0 1 2 +#link_storage.link_classifier.parameters.class_values: ["0", "1", "2"] # Retrain link classifiers on-the-fly link_storage.online_learning.enabled: false @@ -124,15 +124,12 @@ link_storage.backsurfer.pattern_end_title: "\",\"uu\":" # # Configurations for Crawler Manager # -crawler_manager.robot_mananger.thread_group: crawler_group -crawler_manager.robot_mananger.resting_time: 10 -crawler_manager.robot_mananger.check_time: 10000 -crawler_manager.robot_mananger.max_time: 10000 -crawler_manager.robot_mananger.robot_error_sleep_time: 5000 -crawler_manager.robot_mananger.thread_factor: 10 -crawler_manager.robot_mananger.quantity: 5 - -crawler_manager.downloader.max_blocked_threads: 200000 - -#crawler_manager.downloader.user_agent: "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT 4.0)" -#crawler_manager.downloader.request_accept: "Accept | */*" +crawler_manager.scheduler.host_min_access_interval: 5000 +crawler_manager.scheduler.max_links: 10000 +crawler_manager.downloader.download_thread_pool_size: 100 +crawler_manager.downloader.max_retry_count: 2 +crawler_manager.downloader.user_agent.name: ACHE +crawler_manager.downloader.user_agent.url: https://github.com/ViDA-NYU/ache +crawler_manager.downloader.valid_mime_types: + - text/html + - text/plain \ No newline at end of file diff --git a/src/main/java/focusedCrawler/Main.java b/src/main/java/focusedCrawler/Main.java index 16a7a468c..911a1bb26 100644 --- a/src/main/java/focusedCrawler/Main.java +++ b/src/main/java/focusedCrawler/Main.java @@ -17,8 +17,7 @@ import org.slf4j.LoggerFactory; import focusedCrawler.config.ConfigService; -import focusedCrawler.crawler.CrawlerManager; -import focusedCrawler.crawler.CrawlerManagerException; +import focusedCrawler.crawler.async.AsyncCrawler; import focusedCrawler.link.LinkStorage; import focusedCrawler.link.classifier.LinkClassifierFactoryException; import focusedCrawler.link.frontier.AddSeeds; @@ -265,7 +264,8 @@ private static void startTargetStorage(CommandLine cmd) throws MissingArgumentEx private static void startCrawlManager(final String configPath) { try { ConfigService config = new ConfigService(Paths.get(configPath, "ache.yml").toString()); - CrawlerManager.run(config); + AsyncCrawler.run(config); + } catch (Throwable t) { logger.error("Something bad happened to CrawlManager :(", t); } @@ -288,19 +288,17 @@ private 
static void startCrawl(CommandLine cmd) throws MissingArgumentException dataOutputPath, modelPath, config.getLinkStorageConfig()); // start target storage - Storage targetStorage = TargetStorage.createTargetStorage(configPath, - modelPath, dataOutputPath, elasticIndexName, + Storage targetStorage = TargetStorage.createTargetStorage( + configPath, modelPath, dataOutputPath, elasticIndexName, config.getTargetStorageConfig(), linkStorage); - + + AsyncCrawler.Config crawlerConfig = config.getCrawlerConfig(); + // start crawl manager - CrawlerManager manager = CrawlerManager.createCrawlerManager( - config.getCrawlerManagerConfig(), linkStorage, targetStorage); - manager.start(); + AsyncCrawler crawler = new AsyncCrawler(targetStorage, (LinkStorage) linkStorage, crawlerConfig); + crawler.run(); } - catch (CrawlerManagerException e) { - logger.error("Problem while creating CrawlerManager", e); - } catch (LinkClassifierFactoryException | FrontierPersistentException e) { logger.error("Problem while creating LinkStorage", e); } diff --git a/src/main/java/focusedCrawler/config/ConfigService.java b/src/main/java/focusedCrawler/config/ConfigService.java index c9c2ec058..f667831ed 100644 --- a/src/main/java/focusedCrawler/config/ConfigService.java +++ b/src/main/java/focusedCrawler/config/ConfigService.java @@ -9,7 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import focusedCrawler.crawler.CrawlerManagerConfig; +import focusedCrawler.crawler.async.AsyncCrawler; import focusedCrawler.link.LinkStorageConfig; public class ConfigService { @@ -21,8 +21,7 @@ public class ConfigService { private TargetStorageConfig targetStorageConfig; private LinkStorageConfig linkStorageConfig; - private CrawlerManagerConfig crawlerManagerConfig; - + private AsyncCrawler.Config crawlerConfig; public ConfigService(String configFilePath) { this(Paths.get(configFilePath)); @@ -33,7 +32,7 @@ public ConfigService(Path configFilePath) { JsonNode config = yamlMapper.readTree(configFilePath.toFile()); this.targetStorageConfig = new TargetStorageConfig(config, yamlMapper); this.linkStorageConfig = new LinkStorageConfig(config, yamlMapper); - this.crawlerManagerConfig = new CrawlerManagerConfig(config, yamlMapper); + this.crawlerConfig = new AsyncCrawler.Config(config, yamlMapper); } catch (IOException e) { throw new IllegalArgumentException("Could not read settings from file: "+configFilePath, e); } @@ -47,8 +46,8 @@ public LinkStorageConfig getLinkStorageConfig() { return linkStorageConfig; } - public CrawlerManagerConfig getCrawlerManagerConfig() { - return crawlerManagerConfig; + public AsyncCrawler.Config getCrawlerConfig() { + return crawlerConfig; } } diff --git a/src/main/java/focusedCrawler/config/TargetStorageConfig.java b/src/main/java/focusedCrawler/config/TargetStorageConfig.java index 861d71ef2..2ec6a3bb4 100644 --- a/src/main/java/focusedCrawler/config/TargetStorageConfig.java +++ b/src/main/java/focusedCrawler/config/TargetStorageConfig.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import focusedCrawler.target.elasticsearch.ElasticSearchConfig; -import focusedCrawler.util.ParameterFile; import focusedCrawler.util.storage.StorageConfig; public class TargetStorageConfig { @@ -63,41 +62,6 @@ public static class MonitorConfig { private final StorageConfig serverConfig; - @Deprecated - public TargetStorageConfig(String filename) { - this(new ParameterFile(filename)); - } - - @Deprecated - public 
TargetStorageConfig(ParameterFile params) { - this.useClassifier = params.getParamBoolean("USE_CLASSIFIER"); - this.targetStorageDirectory = params.getParam("TARGET_STORAGE_DIRECTORY"); - this.negativeStorageDirectory = params.getParam("NEGATIVE_STORAGE_DIRECTORY"); - this.dataFormat = params.getParamOrDefault("DATA_FORMAT", "FILE"); - - this.monitor = new MonitorConfig(); - this.monitor.sync = params.getParamBoolean("REFRESH_SYNC"); - this.monitor.frequency = params.getParamInt("SYNC_REFRESH_FREQUENCY"); - this.monitor.frequencyCrawled = params.getParamInt("CRAWLED_REFRESH_FREQUENCY"); - this.monitor.frequencyRelevant = params.getParamInt("RELEVANT_REFRESH_FREQUENCY"); - this.monitor.frequencyHarvestInfo = params.getParamInt("HARVESTINFO_REFRESH_FREQUENCY"); - - this.hashFileName = params.getParamBooleanOrDefault("HASH_FILE_NAME", false); - this.compressData = params.getParamBooleanOrDefault("COMPRESS_DATA", false); - this.relevanceThreshold = params.getParamFloat("RELEVANCE_THRESHOLD"); - this.visitedPageLimit = params.getParamInt("VISITED_PAGE_LIMIT"); - this.hardFocus = params.getParamBoolean("HARD_FOCUS"); - this.bipartite = params.getParamBoolean("BIPARTITE"); - this.saveNegativePages = params.getParamBoolean("SAVE_NEGATIVE_PAGES"); - this.englishLanguageDetectionEnabled = params.getParamBooleanOrDefault("ENGLISH_LANGUAGE_DETECTION_ENABLED", true); - - String elasticSearchHost = params.getParamOrDefault("ELASTICSEARCH_HOST", "localhost"); - int elasticSearchPort = params.getParamIntOrDefault("ELASTICSEARCH_PORT", 9300); - String clusterName = params.getParamOrDefault("ELASTICSEARCH_CLUSTERNAME", "elasticsearch"); - this.elasticSearchConfig = new ElasticSearchConfig(elasticSearchHost, elasticSearchPort, clusterName); - this.serverConfig = new StorageConfig(params); - } - public TargetStorageConfig(JsonNode config, ObjectMapper objectMapper) throws IOException { objectMapper.readerForUpdating(this).readValue(config); this.serverConfig = StorageConfig.create(config, "target_storage.server."); diff --git a/src/main/java/focusedCrawler/crawler/Crawler.java b/src/main/java/focusedCrawler/crawler/Crawler.java deleted file mode 100644 index 0ee8828a9..000000000 --- a/src/main/java/focusedCrawler/crawler/Crawler.java +++ /dev/null @@ -1,397 +0,0 @@ -/* -############################################################################ -## -## Copyright (C) 2006-2009 University of Utah. All rights reserved. -## -## This file is part of DeepPeep. -## -## This file may be used under the terms of the GNU General Public -## License version 2.0 as published by the Free Software Foundation -## and appearing in the file LICENSE.GPL included in the packaging of -## this file. Please review the following to ensure GNU General Public -## Licensing requirements will be met: -## http://www.opensource.org/licenses/gpl-license.php -## -## If you are unsure which license is appropriate for your use (for -## instance, you are interested in developing a commercial derivative -## of DeepPeep), please contact us at deeppeep@sci.utah.edu. -## -## This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE -## WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. -## -############################################################################ -*/ -package focusedCrawler.crawler; - -//crawler -import java.net.URL; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - *
Description: This abstract class implements the partial behavior - * of a webcrawler - * - * Copyright: Copyright (c) 2004
- * - * @author Luciano Barbosa - * @version 1.0 - */ - - -public abstract class Crawler extends Thread { - - //possible states - public static final int INIT = 0; - public static final int SELECT_URL = INIT + 1; - public static final int CHECK_URL = SELECT_URL + 1; - public static final int DOWNLOAD_URL = CHECK_URL + 1; - public static final int PROCESS_DATA = DOWNLOAD_URL + 1; - public static final int CHECK_DATA = PROCESS_DATA + 1; - public static final int SEND_DATA = CHECK_DATA + 1; - public static final int END = SEND_DATA + 1; - public static final int SLEEPING = END + 1; - public static final int[] STATES = new int[]{INIT, - SELECT_URL,CHECK_URL,DOWNLOAD_URL, - PROCESS_DATA,CHECK_DATA,SEND_DATA, - END,SLEEPING}; - public static final int DEAD = SLEEPING + 1; - - - private static final Logger logger = LoggerFactory.getLogger(Crawler.class); - - private int status; - - private boolean stop; - - private long restingTime; - - private long sleepTime; - - private boolean jump; - - private long startCicleTime; - - private URL url; - - private long totalCicleTime; - - private long[] partitionTime = new long[STATES.length]; - - private String message; - - private int selectedLinks = 0; - - private boolean shutdown; - - private CrawlerException lastException; - - public Crawler() { - } - - - public Crawler(String name) { - super(name); - defaults(); - } - public Crawler(ThreadGroup g, String name) { - super(g,name); - defaults(); - } - protected void defaults() { - setShutdown(false); - setStop(false); - setRestingTime(0); - setSleepTime(0); - setUrl(null); - setJump(false); - setStartCicleTime(System.currentTimeMillis()); - } - - public void setStatus(int newStatus) { - status = newStatus; - } - public int getStatus() { - return status; - } - public void setStop(boolean newStop) { - stop = newStop; - } - public boolean isStop() { - return stop; - } - public void setRestingTime(long newRestingTime) { - restingTime = newRestingTime; - } - public long getRestingTime() { - return restingTime; - } - public void setSleepTime(long newSleepTime) { - sleepTime = newSleepTime; - } - public long getSleepTime() { - return sleepTime; - } - public void setJump(boolean newJump) { - jump = newJump; - } - public boolean isJump() { - return jump; - } - public void setJump(boolean newJump,String message) { - logger.info(message); - setJump(newJump); - } - - public void setStartCicleTime(long newStartCicleTime) { - startCicleTime = newStartCicleTime; - } - public long getStartCicleTime() { - return startCicleTime; - } - public void setUrl(URL newUrl) { - url = newUrl; - } - public URL getUrl() { - return url; - } - public long getCicleTime() { - return System.currentTimeMillis()-getStartCicleTime(); - } - public void setTotalCicleTime(long newTotalCicleTime) { - totalCicleTime = newTotalCicleTime; - } - public long getTotalCicleTime() { - return totalCicleTime; - } - public void setPartitionTime(int index, long time) { - partitionTime[index] = time; - } - public long getPartitionTime(int index) { - return partitionTime[index]; - } - public void setMessage(String newMessage) { - message = newMessage; - } - public String getMessage() { - return message; - } - - /** Getter for property selectedLinks. - * @return Value of property selectedLinks. - */ - public int getSelectedLinks() { - return selectedLinks; - } - - /** Setter for property selectedLinks. - * @param selectedLinks New value of property selectedLinks. 
- */ - public void setSelectedLinks(int selectedLinks) { - this.selectedLinks = selectedLinks; - } - - public void setShutdown(boolean newShutdown) { - shutdown = newShutdown; - } - public boolean isShutdown() { - return shutdown; - } - - public void setLastException(CrawlerException newLastException) { - lastException = newLastException; - } - - public CrawlerException getLastException() { - return lastException; - } - /** - * This method implements the main loop of the crawler, where the crawler - * accomplishes all the steps needed to retrieve Web pages. - */ - - public void run() { - long time = System.currentTimeMillis(); - while(!stop) { - setStartCicleTime(System.currentTimeMillis()); - try { - setStatus(INIT); - setPartitionTime(INIT,System.currentTimeMillis()-time); - if( jump ) { - setJump(false); - cleanup(); - continue; - } - - setStatus(SELECT_URL); - time = System.currentTimeMillis(); - selectUrl(); - setPartitionTime(SELECT_URL,System.currentTimeMillis()-time); - if( jump ) { - setJump(false); - cleanup(); - continue; - } - logger.trace(getName()+">after request url"); - - setStatus(CHECK_URL); - time = System.currentTimeMillis(); - checkUrl(); - setPartitionTime(CHECK_URL,System.currentTimeMillis()-time); - if( jump ) { - setJump(false); - cleanup(); - continue; - } - logger.trace(getName()+">after check url"); - - setStatus(DOWNLOAD_URL); - time = System.currentTimeMillis(); - downloadUrl(); - setPartitionTime(DOWNLOAD_URL,System.currentTimeMillis()-time); - if( jump ) { - setJump(false); - cleanup(); - continue; - } - logger.trace(getName()+">after download data"); - - setStatus(PROCESS_DATA); - time = System.currentTimeMillis(); - processData(); - setPartitionTime(PROCESS_DATA,System.currentTimeMillis()-time); - if( jump ) { - setJump(false); - cleanup(); - continue; - } - logger.trace(getName()+">after process data"); - - setStatus(CHECK_DATA); - time = System.currentTimeMillis(); - checkData(); - setPartitionTime(CHECK_DATA,System.currentTimeMillis()-time); - if( jump ) { - setJump(false); - cleanup(); - continue; - } - logger.trace(getName()+">after check data"); - - setStatus(SEND_DATA); - time = System.currentTimeMillis(); - sendData(); - setPartitionTime(SEND_DATA,System.currentTimeMillis()-time); - logger.trace(getName()+">after send data"); - - setLastException(null); - setStatus(END); - time = System.currentTimeMillis(); - restingSleep(); - } - catch(CrawlerException re) { - logger.error(re.getMessage(), re); - if( re.detail != null ) { - re.detail.printStackTrace(); - } - setLastException(re); - try { - setStatus(SLEEPING); - time = System.currentTimeMillis(); - if( !stop ) { - logger.info("Sleeping "+sleepTime+" mls due to last error."); - sleep(sleepTime); - } - setPartitionTime(SLEEPING,System.currentTimeMillis()-time); - } - catch( InterruptedException ie ) { - logger.error("Sleeping interrupted.", ie); - } - } - finally { - try { - cleanup(); - } - catch(Exception exc) { - logger.info("Problem while executing cleanup.", exc); - } - setPartitionTime(END,System.currentTimeMillis()-time); - setTotalCicleTime(System.currentTimeMillis() - getStartCicleTime()); - } - String parts = ""; - for(int i = 0; i < STATES.length; i++) { - parts += (i==0?""+getPartitionTime(i):","+getPartitionTime(i)); - } - logger.info("Total time is "+getTotalCicleTime()+" mls ["+parts+"]"); - } - try { - logger.info("Thread dead, calling cleanup()."); - setStatus(DEAD); - cleanup(); - logger.info("Thread dead cleanup() done."); - } - catch(Exception exc) { - logger.info("Problem while 
finishing crawler thread.", exc); - } - } - - public void restingSleep() { - try { - sleep(restingTime); - } - catch(InterruptedException exc) { - logger.info("Sleeping interrupted.", exc); - } - } - - /** - * This method gets the next URL to be processed. - * @throws CrawlerException - */ - abstract protected void selectUrl() throws CrawlerException; - - /** - * It checks if there is any constraint about the given URL - * - * @throws CrawlerException - */ - abstract protected void checkUrl() throws CrawlerException; - - /** - * This method dowloads the given URL. - * - * @throws CrawlerException - */ - abstract protected void downloadUrl() throws CrawlerException; - - /** - * This method processes the URL content - * - * @throws CrawlerException - */ - abstract protected void processData() throws CrawlerException; - - /** - * It checks if there is any constraint about the processed data. - * - * @throws CrawlerException - */ - abstract protected void checkData() throws CrawlerException; - - /** - * This method sends data already processed. - * - * @throws CrawlerException - */ - abstract protected void sendData() throws CrawlerException; - - /** - * This method cleans up any temporary attribute/variable - * - * @throws CrawlerException - */ - abstract protected void cleanup() throws CrawlerException; - - -} diff --git a/src/main/java/focusedCrawler/crawler/CrawlerException.java b/src/main/java/focusedCrawler/crawler/CrawlerException.java deleted file mode 100644 index e1692a7e5..000000000 --- a/src/main/java/focusedCrawler/crawler/CrawlerException.java +++ /dev/null @@ -1,54 +0,0 @@ -/* -############################################################################ -## -## Copyright (C) 2006-2009 University of Utah. All rights reserved. -## -## This file is part of DeepPeep. -## -## This file may be used under the terms of the GNU General Public -## License version 2.0 as published by the Free Software Foundation -## and appearing in the file LICENSE.GPL included in the packaging of -## this file. Please review the following to ensure GNU General Public -## Licensing requirements will be met: -## http://www.opensource.org/licenses/gpl-license.php -## -## If you are unsure which license is appropriate for your use (for -## instance, you are interested in developing a commercial derivative -## of DeepPeep), please contact us at deeppeep@sci.utah.edu. -## -## This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE -## WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. -## -############################################################################ -*/ -package focusedCrawler.crawler; - -/** - *
Description: Exception thrown by the crawler - * - * Copyright: Copyright (c) 2004 - * - * Company:
- * - * @author not attributable - * @version 1.0 - */ -@SuppressWarnings("serial") -public class CrawlerException extends Exception { - - public Throwable detail; - - public CrawlerException() { - super(); - } - - public CrawlerException(String message) { - super(message); - } - - public CrawlerException(String message, Throwable detail) { - super(message); - this.detail = detail; - } - -} diff --git a/src/main/java/focusedCrawler/crawler/CrawlerImpl.java b/src/main/java/focusedCrawler/crawler/CrawlerImpl.java deleted file mode 100644 index 3db525863..000000000 --- a/src/main/java/focusedCrawler/crawler/CrawlerImpl.java +++ /dev/null @@ -1,229 +0,0 @@ -/* -############################################################################ -## -## Copyright (C) 2006-2009 University of Utah. All rights reserved. -## -## This file is part of DeepPeep. -## -## This file may be used under the terms of the GNU General Public -## License version 2.0 as published by the Free Software Foundation -## and appearing in the file LICENSE.GPL included in the packaging of -## this file. Please review the following to ensure GNU General Public -## Licensing requirements will be met: -## http://www.opensource.org/licenses/gpl-license.php -## -## If you are unsure which license is appropriate for your use (for -## instance, you are interested in developing a commercial derivative -## of DeepPeep), please contact us at deeppeep@sci.utah.edu. -## -## This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE -## WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. -## -############################################################################ -*/ -package focusedCrawler.crawler; - -import java.net.URL; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import focusedCrawler.util.DataNotFoundException; -import focusedCrawler.util.LinkRelevance; -import focusedCrawler.util.Page; -import focusedCrawler.util.distribution.CommunicationException; -import focusedCrawler.util.parser.PaginaURL; -import focusedCrawler.util.storage.Storage; -import focusedCrawler.util.storage.StorageException; - -/** - * This class implements a crawler - * @author lbarbosa - * - */ - -public class CrawlerImpl extends Crawler { - - private static final Logger logger = LoggerFactory.getLogger(CrawlerImpl.class); - - private Storage linkStorage; - - private Storage targetStorage; - - protected URL initialUrl; - - protected URL currentUrl; - - protected URL urlFinal; - - protected static long DATE_UNKNOWN = -1; - - protected int length; - - protected double relevance; - - protected Page page; - - protected int bufferSize; - - protected String source; - - protected Downloader urlDownloader; - - public CrawlerImpl(ThreadGroup tg, String name, Storage linkStorage, Storage targetStorage) { - super(tg,name); - this.linkStorage = linkStorage; - this.targetStorage = targetStorage; - - } - - /** - * This method selects the next URL to be downloaded by the crawler - */ - @Override - protected void selectUrl() throws CrawlerException { - - try { - long t1 = System.currentTimeMillis(); - setMessage("selectUrl() linkStorage."); - - LinkRelevance lr = ((LinkRelevance)linkStorage.select(null)); - - initialUrl = lr.getURL(); - String host = lr.getURL().getHost(); - - host = host.substring(0,host.indexOf(".")); - boolean number = false; - - try{ - Integer.parseInt(host); - number = true; - } catch(Exception ex) { } - - relevance = lr.getRelevance(); - - t1 = System.currentTimeMillis()-t1; - - logger.info("Selected 
next URL to download (time: "+t1+"): "+initialUrl); - - if( initialUrl == null || number || lr.getURL().getHost().contains("fc2.com")) { - throw new CrawlerException(getName()+": LinkStorage sent null!"); - } - - setSelectedLinks(getSelectedLinks() +1); - currentUrl = initialUrl; - setUrl(currentUrl); - - setMessage(""); - } - catch(DataNotFoundException dnfe) { - throw new CrawlerException(getName()+":"+dnfe.getMessage(),dnfe.detail); - } - catch(StorageException se) { - throw new CrawlerException(getName()+":"+se.getMessage(),se.detail); - } - catch(CommunicationException ce) { - throw new CrawlerException(getName()+":"+ce.getMessage(),ce.detail); - } - } - - @Override - protected void checkUrl() throws CrawlerException { - - } - - /** - * This method downloads the URL selected in the selectURL method. - */ - @Override - protected void downloadUrl() throws CrawlerException { - urlFinal = getUrl(); - urlDownloader = new Downloader(urlFinal); - source = urlDownloader.getContent(); - } - - protected void handleNotFound() throws Exception { - setJump(true, "Url(insert) '" + getUrl() + "' not found."); - } - - protected void handleRedirect() throws Exception { - logger.info(getUrl() + " redirected to " + urlFinal + "."); - } - - @Override - protected void processData() throws CrawlerException { - setMessage("URL "+getUrl()); - try { - - if(urlDownloader.isRedirection()) { - page = new Page(getUrl(), source, - urlDownloader.getResponseHeaders(), - urlDownloader.getRedirectionUrl()); - } else { - page = new Page(getUrl(), source, urlDownloader.getResponseHeaders()); - } - page.setFetchTime(System.currentTimeMillis()); - - PaginaURL pageParser = new PaginaURL(page.getURL(),page.getContent()); - page.setPageURL(pageParser); - if(relevance > LinkRelevance.DEFAULT_HUB_RELEVANCE && relevance < LinkRelevance.DEFAULT_AUTH_RELEVANCE){ - page.setHub(true); - } - page.setRelevance(relevance); - } catch (Exception e) { - logger.error("Problem while processing data.", e); - } - - setMessage(null); - } - - @Override - protected void checkData() throws CrawlerException { - - } - - /** - * In this method, the crawler sends a downloaded page to the Form Storage. - */ - @Override - protected void sendData() throws CrawlerException { - - try { - - logger.info("Sending page [ "+page.getURL()+" ] to TargetStorage."); - - targetStorage.insert(page); - -// linkStorage.insert(page); - - } - catch( StorageException se ) { - logger.error("Problem while sending page to storage.", se); - throw new CrawlerException(getName()+":"+se.getMessage(),se); - } - catch(CommunicationException ce) { - logger.error("Communication problem while sending page to storage.", ce); - throw new CrawlerException(getName()+":"+ce.getMessage(),ce); - } - - } - - /** - * This cleans all the temporary variables. - */ - @Override - protected synchronized void cleanup() throws CrawlerException { - setUrl(null); - initialUrl = null; - currentUrl = null; - urlFinal = null; - length = 0; - //file.setLength(0); - page = null; - for(int i = getStatus(); i < STATES.length; i++) { - setPartitionTime(i,0); - } - } - -} - diff --git a/src/main/java/focusedCrawler/crawler/CrawlerManager.java b/src/main/java/focusedCrawler/crawler/CrawlerManager.java deleted file mode 100644 index 922ebb960..000000000 --- a/src/main/java/focusedCrawler/crawler/CrawlerManager.java +++ /dev/null @@ -1,362 +0,0 @@ -/* -############################################################################ -## -## Copyright (C) 2006-2009 University of Utah. All rights reserved. 
-## -## This file is part of DeepPeep. -## -## This file may be used under the terms of the GNU General Public -## License version 2.0 as published by the Free Software Foundation -## and appearing in the file LICENSE.GPL included in the packaging of -## this file. Please review the following to ensure GNU General Public -## Licensing requirements will be met: -## http://www.opensource.org/licenses/gpl-license.php -## -## If you are unsure which license is appropriate for your use (for -## instance, you are interested in developing a commercial derivative -## of DeepPeep), please contact us at deeppeep@sci.utah.edu. -## -## This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE -## WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. -## -############################################################################ -*/ -package focusedCrawler.crawler; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import focusedCrawler.config.ConfigService; -import focusedCrawler.util.storage.Storage; -import focusedCrawler.util.storage.StorageConfig; -import focusedCrawler.util.storage.StorageFactoryException; -import focusedCrawler.util.storage.distribution.StorageCreator; - - -/** - *
Description: This class manages the crawlers - * - * Copyright: Copyright (c) 2004
- * - * @author Luciano Barbosa - * @version 1.0 - */ - -public class CrawlerManager extends Thread { - - public static final Logger logger = LoggerFactory.getLogger(CrawlerManager.class); - - private CrawlerManagerConfig config; - - private Storage linkStorage; - - private Storage formStorage; - - private boolean stop; - - private ThreadGroup crawlerThreadGroup; - - private Crawler[] crawlers; - - private int[] life; - - - public CrawlerManager(CrawlerManagerConfig config, - Storage linkStorage, - Storage formStorage) throws CrawlerManagerException { - - this.config = config; - this.linkStorage = linkStorage; - this.formStorage = formStorage; - - this.crawlerThreadGroup = new ThreadGroup(config.getRobotThreadGroup()); - this.crawlers = new Crawler[config.getRobotQuantity()]; - this.life = new int[crawlers.length]; - - for (int i = 0; i < this.crawlers.length; i++) { - this.crawlers[i] = createCrawler(this.crawlerThreadGroup, i, this.life[0]); - } - - } - - /** - * This method monitors the crawlers' behavior - */ - public void run() { - - setPriority( getPriority() + 1 ); - - //Start the Robots - - for( int i = 0; i < crawlers.length; i++ ) { - - crawlers[i].start(); - - } - - - stop = false; - - while(!stop) { - - boolean isShutdown = false; - - for ( int i = 0 ; i < crawlers.length ; i++ ) { - - try { - - Crawler crawler = crawlers[i]; - - logger.info("RM>"+crawlerThreadGroup.getName()+">"+crawler.getName()+">Time("+crawler.getCicleTime()+"):"+statusCrawlerString(crawler)+(crawler.getMessage()==null?"":":"+crawler.getMessage())); - - if(crawler.getCicleTime() > config.getRobotManagerMaxTime() ) { - - stopCrawler(i); - - life[i]++; - - crawlers[i] = createCrawler(crawlerThreadGroup, i, life[i]); - crawlers[i].start(); - } - } - - catch(CrawlerManagerException rme) { - logger.error("Problem:"+rme.getMessage(), rme); - } - catch(Exception e) { - logger.error("Problem:"+e.getMessage(), e); - } - - int numThreadsEst = crawlerThreadGroup.activeCount(); - - Thread[] threads = new Thread[numThreadsEst * 2]; - - int numThreads = crawlerThreadGroup.enumerate(threads, false); - - int maxThreads = 2 * config.getRobotManagerRobotThreadFactor() * crawlers.length; - - if (numThreads > maxThreads) { - - logger.warn("Threads limit " + maxThreads + " exceeded '" + numThreads + "'"); - -// stopManager(); - - sleepExit(1 * 60 * 1000); - - } else if ( isShutdown ) { - - logger.info("Shutdown fired."); - - stopManager(); - - sleepExit(1 * 60 * 1000); - - } - } - - try { - sleep(config.getRobotManagerSleepCheckTime()); - } - catch(InterruptedException ie) { - logger.error("Sleeping for "+config.getRobotManagerSleepCheckTime()+" interrupted.", ie); - } - - } - - logger.info("RM>"+getName()+">Stop requested. 
Giving a max of 60s to stop all robots before halt."); - - stopManagerExit(1 * 60 * 1000); - - } - - private void sleepExit(long time) { - try { - logger.info("Waiting " + time + " mls to die."); - sleep(time); - logger.info("System.exit()"); -// System.exit(1); - } catch (InterruptedException exc) { - logger.error("Interruped while waiting mls to exit.", exc); - } - } - - public Crawler createCrawler(ThreadGroup tg, int index, int life) throws CrawlerManagerException { - try { - String name = tg.getName() + "_" + index + "_" + life; - - CrawlerImpl crawler = new CrawlerImpl(tg, name, linkStorage, formStorage); - crawler.setRestingTime(config.getRobotManagerRestingTime()); - crawler.setSleepTime(config.getRobotManagerRobotErrorTime()); - crawler.setPriority(Thread.NORM_PRIORITY); - - return crawler; - } catch (Exception exc) { - throw new CrawlerManagerException(exc.getMessage(), exc); - } - } - - /** - * This method stops the crawler n - */ - public void stopCrawler(int n) { - Crawler r = crawlers[n]; - logger.info("Killing crawler" + r + " : " + statusCrawlerString(r)); - r.setStop(true); - } - - /** - * Stop all crawlers - */ - public void stopAllCrawlers() { - for(int i = 0; i < crawlers.length; i++) { - crawlers[i].setStop(true); - } - } - - /** - * This method stops all the robots and the manager as well. - */ - public void stopManager() { - stopAllCrawlers(); - stop = true; - } - - public void stopManagerExit(long time) { - - stopManager(); - - try { - - long MAX_JOIN = 5 * 60 * 1000; //5minutos cada robot. - - for (int i = 0; i < crawlers.length; i++) { - - Crawler crawler = crawlers[i]; - - logger.info("RM> Waiting "+crawler.getName()+". Max join time is "+MAX_JOIN); - - crawler.join(MAX_JOIN); - - logger.info("RM>"+crawler.getName()+" joined."); - - } - - for (int i = 0; i < crawlers.length; i++) { - - Crawler crawler = crawlers[i]; - - logger.info("RM>"+crawler.getName()+">Time("+crawler.getCicleTime()+"):"+statusCrawlerString(crawler)); - - } - - } - catch (InterruptedException ex) { - logger.error("Error while stoping manager. ", ex); - } - - System.exit(0); - - } - - public String statusCrawlerString(Crawler crawler) { - - String result = null; - - switch( crawler.getStatus() ) { - - case Crawler.INIT : - - result = "At the begin of the main loop."; - - break; - - case Crawler.SELECT_URL : - - result = "Asking for a new url to work on."; - - break; - - case Crawler.DOWNLOAD_URL : - - result = "Downloading "+crawler.getUrl()+"."; - - break; - - case Crawler.PROCESS_DATA : - - result = "Processing data retrieved. URL:"+crawler.getUrl(); - - break; - - case Crawler.CHECK_DATA : - - result = "Checking the processed data. 
URL:"+crawler.getUrl(); - - break; - - case Crawler.SEND_DATA : - - result = "Sending data of '"+crawler.getUrl()+"' to the page storage."; - - break; - - case Crawler.END : - - result = "At the end of the main loop"; - - break; - - case Crawler.SLEEPING : - - result = "Sleeping as a consequence of this problem: '"+crawler.getLastException().getMessage()+"'"; - - break; - - case Crawler.DEAD : - - result = "Dead."; - - break; - - default : - - result = "Unknown."; - - } - - return result; - - } - - public static CrawlerManager createCrawlerManager(CrawlerManagerConfig crawlerManagerConfig, - Storage linkStorage, - Storage formStorage) - throws CrawlerManagerException { - - return new CrawlerManager(crawlerManagerConfig, linkStorage, formStorage); - } - - public static void run(ConfigService config) throws IOException, NumberFormatException { - logger.info("Starting CrawlerManager..."); - try { - StorageConfig linkStorageServerConfig = config.getLinkStorageConfig().getStorageServerConfig(); - Storage linkStorage = new StorageCreator(linkStorageServerConfig).produce(); - - StorageConfig targetServerConfig = config.getTargetStorageConfig().getStorageServerConfig(); - Storage formStorage = new StorageCreator(targetServerConfig).produce(); - - CrawlerManager manager = createCrawlerManager(config.getCrawlerManagerConfig(), - linkStorage, formStorage); - manager.start(); - - } catch (CrawlerManagerException ex) { - logger.error("An error occurred while starting CrawlerManager. ", ex); - } catch (StorageFactoryException ex) { - logger.error("An error occurred while starting CrawlerManager. ", ex); - } - } - -} - diff --git a/src/main/java/focusedCrawler/crawler/CrawlerManagerConfig.java b/src/main/java/focusedCrawler/crawler/CrawlerManagerConfig.java deleted file mode 100644 index b8e73eda8..000000000 --- a/src/main/java/focusedCrawler/crawler/CrawlerManagerConfig.java +++ /dev/null @@ -1,78 +0,0 @@ -package focusedCrawler.crawler; - -import java.io.IOException; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - -import focusedCrawler.util.ParameterFile; - -public class CrawlerManagerConfig { - - @JsonProperty("crawler_manager.robot_mananger.thread_group") - private String robotThreadGroup = "crawler_group"; - @JsonProperty("crawler_manager.robot_mananger.resting_time") - private long robotManagerRestingTime = 10; - @JsonProperty("crawler_manager.robot_mananger.quantity") - private int robotQuantity = 5; - @JsonProperty("crawler_manager.robot_mananger.check_time") - private long robotManagerCheckTime = 30000; - @JsonProperty("crawler_manager.robot_mananger.max_time") - private long robotManagerMaxTime = 30000; - @JsonProperty("crawler_manager.robot_mananger.robot_error_sleep_time") - private long robotManagerRobotErrorSleepTime = 5000; - @JsonProperty("crawler_manager.robot_mananger.thread_factor") - private int robotManagerRobotThreadFactor = 10; - @JsonProperty("crawler_manager.downloader.max_blocked_threads") - private long downloaderMaxBlockedThreads = 20000000; - - public CrawlerManagerConfig(String filename) { - ParameterFile params = new ParameterFile(filename); - this.robotThreadGroup = params.getParam("ROBOT_THREAD_GROUP"); - this.robotQuantity = params.getParamIntOrDefault("ROBOT_QUANTITY", 5); - this.robotManagerRestingTime = params.getParamLongOrDefault("ROBOT_MANAGER_RESTINGTIME", 10); - this.robotManagerCheckTime = params.getParamLongOrDefault("ROBOT_MANAGER_CHECKTIME", 
30000); - this.robotManagerMaxTime = params.getParamLongOrDefault("ROBOT_MANAGER_MAXTIME", 30000); - this.robotManagerRobotErrorSleepTime = params.getParamLongOrDefault("ROBOT_MANAGER_ROBOT_ERROR_SLEEP_TIME", 5000); - this.robotManagerRobotThreadFactor = params.getParamIntOrDefault("ROBOT_MANAGER_ROBOT_THREAD_FACTOR", 10); - this.downloaderMaxBlockedThreads = params.getParamLongOrDefault("DOWNLOADER_MAX_BLOCKED_THREADS", 20000000); - } - - public CrawlerManagerConfig(JsonNode config, ObjectMapper objectMapper) throws IOException { - objectMapper.readerForUpdating(this).readValue(config); - } - - public String getRobotThreadGroup() { - return robotThreadGroup; - } - - public int getRobotQuantity() { - return robotQuantity; - } - - public long getRobotManagerRestingTime() { - return robotManagerRestingTime; - } - - public long getRobotManagerSleepCheckTime() { - return robotManagerCheckTime; - } - - public long getRobotManagerMaxTime() { - return robotManagerMaxTime; - } - - public long getRobotManagerRobotErrorTime() { - return robotManagerRobotErrorSleepTime; - } - - public int getRobotManagerRobotThreadFactor() { - return robotManagerRobotThreadFactor; - } - - public long getDownloaderMaxBlockedThreads() { - return downloaderMaxBlockedThreads; - } - -} \ No newline at end of file diff --git a/src/main/java/focusedCrawler/crawler/CrawlerManagerException.java b/src/main/java/focusedCrawler/crawler/CrawlerManagerException.java deleted file mode 100644 index b570d2e7c..000000000 --- a/src/main/java/focusedCrawler/crawler/CrawlerManagerException.java +++ /dev/null @@ -1,53 +0,0 @@ -/* -############################################################################ -## -## Copyright (C) 2006-2009 University of Utah. All rights reserved. -## -## This file is part of DeepPeep. -## -## This file may be used under the terms of the GNU General Public -## License version 2.0 as published by the Free Software Foundation -## and appearing in the file LICENSE.GPL included in the packaging of -## this file. Please review the following to ensure GNU General Public -## Licensing requirements will be met: -## http://www.opensource.org/licenses/gpl-license.php -## -## If you are unsure which license is appropriate for your use (for -## instance, you are interested in developing a commercial derivative -## of DeepPeep), please contact us at deeppeep@sci.utah.edu. -## -## This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE -## WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. -## -############################################################################ -*/ -package focusedCrawler.crawler; -/** - *
Description: Exception thrown by the crawler manager - * - * Copyright: Copyright (c) 2004 - * - * Company:
- * - * @author not attributable - * @version 1.0 - */ -@SuppressWarnings("serial") -public class CrawlerManagerException extends Exception { - - public Throwable detail; - - public CrawlerManagerException() { - super(); - } - - public CrawlerManagerException(String message) { - super(message); - } - - public CrawlerManagerException(String message, Throwable detail) { - super(message); - this.detail = detail; - } - -} diff --git a/src/main/java/focusedCrawler/crawler/Downloader.java b/src/main/java/focusedCrawler/crawler/Downloader.java deleted file mode 100644 index 1c600dae5..000000000 --- a/src/main/java/focusedCrawler/crawler/Downloader.java +++ /dev/null @@ -1,127 +0,0 @@ -package focusedCrawler.crawler; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLConnection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class Downloader { - - public static final Logger logger = LoggerFactory.getLogger(Downloader.class); - - private URL originalURL; - private URL redirectionURL; - private boolean isURLRedirecting = false; - private Map> responseHeaders; - private String mimeType; - private String content; - - public Downloader(String url) throws MalformedURLException, CrawlerException { - this(new URL(url)); - } - - public Downloader(URL url) throws CrawlerException { - try { - URLConnection connection = url.openConnection(); - connection.connect(); - - this.originalURL = url; - this.mimeType = connection.getContentType(); - this.responseHeaders = connection.getHeaderFields(); - if(this.responseHeaders != null) { - // URLConnection.getHeaderFields() doesn't parse headers correctly. - // HTTP protocol line is parsed as field and stored in the map with - // a null key, e.g: null=[HTTP/1.1 301 Moved Permanently] - // So, here we remove null keys from the map. 
- if(this.responseHeaders.containsKey(null)) { - this.responseHeaders = new HashMap<>(this.responseHeaders); - this.responseHeaders.remove(null); - } - } - - if(connection instanceof HttpURLConnection){ - processRedirection((HttpURLConnection) connection, responseHeaders); - } - - this.content = readContent(connection); - - } catch (IOException e) { - throw new CrawlerException("Failed to donwload URL: "+url, e); - } - - } - - private String readContent(URLConnection connection) throws IOException { - InputStream in = connection.getInputStream(); - BufferedReader bin = new BufferedReader(new InputStreamReader(in)); - StringBuffer buffer = new StringBuffer(); - try { - String inputLine; - while ((inputLine = bin.readLine()) != null) { - buffer.append(inputLine).append("\n"); - } - return buffer.toString(); - } finally { - bin.close(); - } - } - - private void processRedirection(HttpURLConnection httpConnection, - Map> responseHeaders) - throws IOException { - - int responseCode = httpConnection.getResponseCode(); - boolean isRedirectionCode = responseCode >= 301 && responseCode <= 307; - - if (isRedirectionCode && responseHeaders.keySet() != null) { - for (String headerKey : responseHeaders.keySet()) { - - if(headerKey == null) { - continue; - } - - if (headerKey.toLowerCase().equals("location")) { - // we have a redirecting URL - String location = httpConnection.getHeaderField(headerKey); - this.redirectionURL = new URL(originalURL, location); - this.isURLRedirecting = true; - } - } - } - } - - protected Map> getResponseHeaders() { - return responseHeaders; - } - - public boolean isRedirection() { - return isURLRedirecting; - } - - public URL getOriginalUrl() { - return originalURL; - } - - public URL getRedirectionUrl() { - return redirectionURL; - } - - public String getContent() { - return content; - } - - public String getMimeType() { - return mimeType; - } - -} \ No newline at end of file diff --git a/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java b/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java new file mode 100644 index 000000000..1b3840845 --- /dev/null +++ b/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java @@ -0,0 +1,160 @@ +package focusedCrawler.crawler.async; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonUnwrapped; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import focusedCrawler.config.ConfigService; +import focusedCrawler.link.DownloadScheduler; +import focusedCrawler.link.LinkStorage; +import focusedCrawler.util.DataNotFoundException; +import focusedCrawler.util.LinkRelevance; +import focusedCrawler.util.storage.Storage; +import focusedCrawler.util.storage.StorageConfig; +import focusedCrawler.util.storage.StorageException; +import focusedCrawler.util.storage.StorageFactoryException; +import focusedCrawler.util.storage.distribution.StorageCreator; + +public class AsyncCrawler { + + private static final Logger logger = LoggerFactory.getLogger(AsyncCrawler.class); + + public static class Config { + + @JsonProperty("crawler_manager.scheduler.host_min_access_interval") + private int hostMinAccessInterval = 5000; + + @JsonProperty("crawler_manager.scheduler.max_links") + private int maxLinksInScheduler = 10000; + + @JsonUnwrapped + private HttpDownloader.Config 
downloaderConfig = new HttpDownloader.Config(); + + public Config(JsonNode config, ObjectMapper objectMapper) throws JsonProcessingException, IOException { + objectMapper.readerForUpdating(this).readValue(config); + } + + public int getHostMinAccessInterval() { + return hostMinAccessInterval; + } + + public int getMaxLinksInScheduler() { + return maxLinksInScheduler; + } + + public HttpDownloader.Config getDownloaderConfig() { + return downloaderConfig; + } + + } + + private final LinkStorage linkStorage; + private final HttpDownloader downloader; + private final FetchedResultHandler resultHandler; + private final DownloadScheduler downloadScheduler; + + private boolean shouldStop = false; + + public AsyncCrawler(Storage targetStorage, LinkStorage linkStorage, Config crawlerConfig) { + this.linkStorage = linkStorage; + this.downloader = new HttpDownloader(crawlerConfig.getDownloaderConfig()); + this.resultHandler = new FetchedResultHandler(targetStorage); + this.downloadScheduler = new DownloadScheduler( + crawlerConfig.getHostMinAccessInterval(), + crawlerConfig.getMaxLinksInScheduler()); + } + + private class DownloadDispatcher extends Thread { + public DownloadDispatcher() { + setName("download-dispatcher"); + } + @Override + public void run() { + while(!shouldStop) { + LinkRelevance linkRelevance = downloadScheduler.nextLink(); + if(linkRelevance != null) { + downloader.dipatchDownload(linkRelevance, resultHandler); + } else { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + logger.error("LinkDispatcher was interrupted.", e); + } + } + } + } + } + + public void run() { + try { + DownloadDispatcher linkDispatcher = new DownloadDispatcher(); + linkDispatcher.start(); + while(!this.shouldStop) { + try { + LinkRelevance link = (LinkRelevance) linkStorage.select(null); + if(link != null) { + downloadScheduler.addLink(link); + } + } + catch (DataNotFoundException e) { + // There are no more links available in the frontier right now + if(downloader.hasPendingDownloads() || downloadScheduler.hasPendingLinks()) { + // If there are still pending downloads, new links + // may be found in these pages, so we should wait some + // time until more links are available and try again + try { + logger.info("Waiting for links from pages being downloaded..."); + Thread.sleep(1000); + } catch (InterruptedException ie) { } + continue; + } + // There are no more pending downloads and there are no + // more links available in the frontier, so stop crawler + logger.info("LinkStorage ran out of links, stopping crawler."); + this.shouldStop = true; + break; + } catch (StorageException e) { + logger.error("Problem when selecting link from LinkStorage.", e); + } catch (Exception e) { + logger.error("An unexpected error happened.", e); + } + } + downloader.await(); + } finally { + logger.info("Shutting down crawler..."); + downloader.close(); + logger.info("Done."); + } + } + + public void stop() { + this.shouldStop = true; + } + + public static void run(ConfigService config) throws IOException, NumberFormatException { + logger.info("Starting CrawlerManager..."); + try { + StorageConfig linkStorageServerConfig = config.getLinkStorageConfig().getStorageServerConfig(); + Storage linkStorage = new StorageCreator(linkStorageServerConfig).produce(); + + StorageConfig targetServerConfig = config.getTargetStorageConfig().getStorageServerConfig(); + Storage targetStorage = new StorageCreator(targetServerConfig).produce(); + + AsyncCrawler.Config crawlerConfig = config.getCrawlerConfig(); + + AsyncCrawler 
crawler = new AsyncCrawler(targetStorage, (LinkStorage) linkStorage, crawlerConfig); + crawler.run(); + + } catch (StorageFactoryException ex) { + logger.error("An error occurred while starting CrawlerManager. ", ex); + } + } + +} diff --git a/src/main/java/focusedCrawler/crawler/async/FetchedResultHandler.java b/src/main/java/focusedCrawler/crawler/async/FetchedResultHandler.java new file mode 100644 index 000000000..210830f7c --- /dev/null +++ b/src/main/java/focusedCrawler/crawler/async/FetchedResultHandler.java @@ -0,0 +1,104 @@ +package focusedCrawler.crawler.async; + +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.tika.metadata.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import crawlercommons.fetcher.AbortedFetchException; +import crawlercommons.fetcher.FetchedResult; +import focusedCrawler.util.LinkRelevance; +import focusedCrawler.util.Page; +import focusedCrawler.util.parser.PaginaURL; +import focusedCrawler.util.storage.Storage; + +public class FetchedResultHandler implements HttpDownloader.Callback { + + private static final Logger logger = LoggerFactory.getLogger(FetchedResultHandler.class); + + private Storage targetStorage; + + public FetchedResultHandler(Storage targetStorage) { + this.targetStorage = targetStorage; + } + + @Override + public void completed(final FetchedResult response) { + + int statusCode = response.getStatusCode(); + if(statusCode >= 200 && statusCode < 300) { + logger.info("Successfully downloaded URL=["+response.getBaseUrl()+"] HTTP-Response-Code="+statusCode); + processData(response); + } else { + // TODO: Update metadata about page visits in link storage + logger.info("Server returned bad code for URL=["+response.getBaseUrl()+"] HTTP-Response-Code="+statusCode); + } + } + + @Override + public void failed(String url, final Exception e) { + if(e instanceof AbortedFetchException) { + AbortedFetchException afe = (AbortedFetchException) e; + logger.info("Download aborted: \n>URL: {}\n>Reason: {}", url, afe.getAbortReason()); + } else { + logger.info("Failed to download URL: "+url, e.getMessage()); + } + } + + private void processData(FetchedResult response) { + try { + Page page; + if(response.getNumRedirects() == 0) { + page = new Page( + new URL(response.getBaseUrl()), + new String(response.getContent()), + parseResponseHeaders(response.getHeaders()) + ); + } else { + page = new Page( + new URL(response.getBaseUrl()), + new String(response.getContent()), + parseResponseHeaders(response.getHeaders()), + new URL(response.getFetchedUrl()) + ); + } + page.setFetchTime(response.getFetchTime()); + + PaginaURL pageParser = new PaginaURL(page.getURL(), page.getContent()); + page.setPageURL(pageParser); + + LinkRelevance link = (LinkRelevance) response.getPayload().get(HttpDownloader.PAYLOAD_KEY); + + final double relevance = link.getRelevance(); + if(relevance > LinkRelevance.DEFAULT_HUB_RELEVANCE && + relevance < LinkRelevance.DEFAULT_AUTH_RELEVANCE){ + page.setHub(true); + } + + page.setRelevance(relevance); + + logger.info(relevance + " Sending page to TargetStorage: "+ response.getFetchedUrl()); + targetStorage.insert(page); + + } catch (Exception e) { + logger.error("Problem while processing data.", e); + } + } + + private Map> parseResponseHeaders(Metadata headerAsMetadata) { + Map> responseHeaders = new HashMap<>(); + String[] names = headerAsMetadata.names(); + if(names != null && names.length > 0) { + for(String name : names) { + 
responseHeaders.put(name, Arrays.asList(headerAsMetadata.getValues(name))); + } + } + return responseHeaders; + } + +} diff --git a/src/main/java/focusedCrawler/crawler/async/HttpDownloader.java b/src/main/java/focusedCrawler/crawler/async/HttpDownloader.java new file mode 100644 index 000000000..7a84d8abc --- /dev/null +++ b/src/main/java/focusedCrawler/crawler/async/HttpDownloader.java @@ -0,0 +1,262 @@ +package focusedCrawler.crawler.async; + +import java.io.Closeable; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.http.client.methods.HttpGet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import crawlercommons.fetcher.BaseFetchException; +import crawlercommons.fetcher.FetchedResult; +import crawlercommons.fetcher.Payload; +import crawlercommons.fetcher.http.SimpleHttpFetcher; +import crawlercommons.fetcher.http.UserAgent; +import focusedCrawler.util.LinkRelevance; + +public class HttpDownloader implements Closeable { + + private static Logger logger = LoggerFactory.getLogger(HttpDownloader.class); + + public static final String PAYLOAD_KEY = "link-relevance"; + private static final int CPU_CORES = Runtime.getRuntime().availableProcessors(); + + static public class Config { + + @JsonProperty("crawler_manager.downloader.download_thread_pool_size") + private int downloadThreadPoolSize = 100; + + @JsonProperty("crawler_manager.downloader.max_retry_count") + private int maxRetryCount = 2; + + @JsonProperty("crawler_manager.downloader.valid_mime_types") + private String[] validMimeTypes = { + "text/html", + "application/x-asp", + "application/xhtml+xml", + "application/vnd.wap.xhtml+xml" + }; + + @JsonProperty("crawler_manager.downloader.user_agent.name") + private String userAgentName = "ACHE"; + + @JsonProperty("crawler_manager.downloader.user_agent.url") + private String userAgentUrl = "https://github.com/ViDA-NYU/ache"; + + public Config() { + } + + public Config(JsonNode config, ObjectMapper objectMapper) throws IOException { + objectMapper.readerForUpdating(this).readValue(config); + } + + public int getDownloadThreadPoolSize() { + return this.downloadThreadPoolSize; + } + + public int getMaxRetryCount() { + return this.maxRetryCount; + } + + public String getUserAgentName() { + return this.userAgentName; + } + + public String getUserAgentUrl() { + return this.userAgentUrl; + } + + public String[] getValidMimeTypes() { + return this.validMimeTypes; + } + } + + private final SimpleHttpFetcher fetcher; + private final ExecutorService downloadThreadPool; + private final ExecutorService distpatchThreadPool; + private final LinkedBlockingQueue downloadQueue; + private final LinkedBlockingQueue dispatchQueue; + private final AtomicInteger numberOfDownloads = new AtomicInteger(0); + private final int downloadQueueMaxSize; + + public HttpDownloader() { + this(new Config()); + } + + public HttpDownloader(Config config) { + + ThreadFactory 
downloadThreadFactory = new ThreadFactoryBuilder().setNameFormat("downloader-%d").build(); + ThreadFactory dispatcherThreadFactory = new ThreadFactoryBuilder().setNameFormat("dispatcher-%d").build(); + + this.downloadQueue = new LinkedBlockingQueue(); + this.dispatchQueue = new LinkedBlockingQueue(); + + int threadPoolSize = config.getDownloadThreadPoolSize(); + this.downloadThreadPool = new ThreadPoolExecutor(threadPoolSize , threadPoolSize, + 0L, TimeUnit.MILLISECONDS, this.downloadQueue, downloadThreadFactory); + + this.distpatchThreadPool = new ThreadPoolExecutor(CPU_CORES, CPU_CORES, + 0L, TimeUnit.MILLISECONDS, this.dispatchQueue, dispatcherThreadFactory); + + this.downloadQueueMaxSize = threadPoolSize * 2; + + // Adding some extra connections for URLs that have redirects + // and thus creates more connections + int connectionPoolSize = (int) (threadPoolSize * 2); + UserAgent userAgent = new UserAgent(config.getUserAgentName(), "", config.getUserAgentUrl()); + + this.fetcher = new SimpleHttpFetcher(connectionPoolSize, userAgent); + this.fetcher.setSocketTimeout(30*1000); + this.fetcher.setMaxConnectionsPerHost(1); + this.fetcher.setConnectionTimeout(5*60*1000); + this.fetcher.setMaxRetryCount(config.getMaxRetryCount()); + this.fetcher.setDefaultMaxContentSize(10*1024*1024); + if(config.getValidMimeTypes() != null) { + for (String mimeTypes : config.getValidMimeTypes()) { + this.fetcher.addValidMimeType(mimeTypes); + } + } + } + + public Future dipatchDownload(String url) { + try { + URL urlObj = new URL(url); + Future future = dipatchDownload(urlObj, null); + return future; + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid URL provided: "+url, e); + } + } + + public Future dipatchDownload(URL url, Callback callback) { + LinkRelevance link = new LinkRelevance(url, 0d); + return dipatchDownload(link, callback); + } + + public Future dipatchDownload(LinkRelevance link, Callback callback) { + try { + while(downloadQueue.size() > downloadQueueMaxSize) { + Thread.sleep(100); + } + } catch (InterruptedException e) { + // ok, just finish execution + } + Future future = downloadThreadPool.submit(new RequestTask(link, callback)); + numberOfDownloads.incrementAndGet(); + return future; + } + + @Override + public void close() { + fetcher.abort(); + downloadThreadPool.shutdownNow(); + distpatchThreadPool.shutdownNow(); + try { + downloadThreadPool.awaitTermination(10, TimeUnit.SECONDS); + distpatchThreadPool.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Failed to shutdown downloader threads.", e); + } + } + + public void await() { + logger.info("Waiting downloader to finish..."); + try { + downloadThreadPool.shutdown(); + distpatchThreadPool.shutdown(); + downloadThreadPool.awaitTermination(1, TimeUnit.MINUTES); + distpatchThreadPool.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException("Thread interrupted while waiting downloader threads finalize.", e); + } + logger.info("Done."); + } + + public boolean hasPendingDownloads() { + if(numberOfDownloads.get() > 0) { + return true; + } else { + return false; + } + } + + public interface Callback { + + public void completed(FetchedResult result); + + public void failed(String url, Exception e); + + } + + private class RequestTask implements Callable { + + private final Callback callback; + private LinkRelevance link; +// private long startTime; +// private long finishTime; + + public 
RequestTask(LinkRelevance url, Callback callback) { + this.link = url; + this.callback = callback; + } + + @Override + public FetchedResult call() { + + final Payload payload = new Payload(); // Payload is used as a temporary storage + payload.put(PAYLOAD_KEY, link); + + try { + FetchedResult result = fetcher.fetch(new HttpGet(), link.getURL().toString(), payload); + distpatchThreadPool.submit(new FetchFinishedHandler(result, callback, null)); + return result; + } catch (BaseFetchException e) { + distpatchThreadPool.submit(new FetchFinishedHandler(null, callback, e)); + return null; + } + } + + } + + private final class FetchFinishedHandler implements Runnable { + + private FetchedResult response; + private Callback callback; + private BaseFetchException exception; + + public FetchFinishedHandler(FetchedResult response, Callback callback, BaseFetchException exception) { + this.response = response; + this.callback = callback; + this.exception = exception; + } + + @Override + public void run() { + if(callback != null) { + if(exception != null) { + callback.failed(exception.getUrl(), exception); + } else { + callback.completed(response); + } + } + numberOfDownloads.decrementAndGet(); + } + + } + +} diff --git a/src/main/java/focusedCrawler/link/DownloadScheduler.java b/src/main/java/focusedCrawler/link/DownloadScheduler.java new file mode 100644 index 000000000..0272882b0 --- /dev/null +++ b/src/main/java/focusedCrawler/link/DownloadScheduler.java @@ -0,0 +1,166 @@ +package focusedCrawler.link; + +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import focusedCrawler.util.LinkRelevance; + +public class DownloadScheduler { + + private static class DomainNode { + + final String domainName; + final Deque links; + long lastAccessTime; + + public DomainNode(String domainName, long lastAccessTime) { + this.domainName = domainName; + this.links = new LinkedList<>(); + this.lastAccessTime = lastAccessTime; + } + + } + + private final PriorityQueue domainsQueue; + private final PriorityQueue emptyDomainsQueue; + private final Map domains; + private final long minimumAccessTime; + private final int maxLinksInScheduler; + + private AtomicInteger numberOfLinks = new AtomicInteger(0); + + public DownloadScheduler(int minimumAccessTimeInterval, int maxLinksInScheduler) { + this.minimumAccessTime = minimumAccessTimeInterval; + this.maxLinksInScheduler = maxLinksInScheduler; + this.domains = new HashMap<>(); + this.emptyDomainsQueue = createDomainPriorityQueue(); + this.domainsQueue = createDomainPriorityQueue(); + } + + private PriorityQueue createDomainPriorityQueue() { + int initialCapacity = 10; + return new PriorityQueue(initialCapacity, new Comparator() { + @Override + public int compare(DomainNode o1, DomainNode o2) { + return Long.compare(o1.lastAccessTime, o2.lastAccessTime); + } + }); + } + + public void addLink(LinkRelevance link) { + numberOfLinks.incrementAndGet(); + + removeExpiredNodes(); + + while(numberOfLinks() > maxLinksInScheduler) { + // block until number of links is lower than max number of links + try { + Thread.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while adding link.", e); + } + } + + String domainName = link.getTopLevelDomainName(); + + synchronized(this) { + DomainNode domainNode = domains.get(domainName); + if(domainNode == null) { + domainNode = new 
DomainNode(domainName, 0l); + domains.put(domainName, domainNode); + } + + if(domainNode.links.isEmpty()) { + emptyDomainsQueue.remove(domainNode); + domainsQueue.add(domainNode); + } + + domainNode.links.addLast(link); + } + + } + + private synchronized void removeExpiredNodes() { + while(true) { + DomainNode node = emptyDomainsQueue.peek(); + if(node == null) { + break; + } + + long expirationTime = node.lastAccessTime + minimumAccessTime; + if(System.currentTimeMillis() > expirationTime) { + emptyDomainsQueue.poll(); + domains.remove(node.domainName); + } else { + break; + } + } + } + + public LinkRelevance nextLink() { + + long expirationTime; + LinkRelevance linkRelevance; + long waitTime = 0; + + synchronized(this) { + DomainNode domainNode = domainsQueue.poll(); + if(domainNode == null) { + return null; + } + + linkRelevance = domainNode.links.removeFirst(); + + if(domainNode.links.isEmpty()) { + emptyDomainsQueue.add(domainNode); + } else { + domainsQueue.add(domainNode); + } + + expirationTime = domainNode.lastAccessTime + minimumAccessTime; + long now = System.currentTimeMillis(); + waitTime = expirationTime - now; + + if(waitTime > 0) { + domainNode.lastAccessTime = now + waitTime; + } else { + domainNode.lastAccessTime = now; + } + } + + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + throw new RuntimeException(getClass()+" interrupted.", e); + } + } + + numberOfLinks.decrementAndGet(); + + return linkRelevance; + } + + public int numberOfNonExpiredDomains() { + removeExpiredNodes(); + return domains.size(); + } + + public int numberOfEmptyDomains() { + return emptyDomainsQueue.size(); + } + + public int numberOfLinks() { + return numberOfLinks.get(); + } + + public boolean hasPendingLinks() { + return numberOfLinks() > 0; + } + +} diff --git a/src/main/java/focusedCrawler/link/LinkStorage.java b/src/main/java/focusedCrawler/link/LinkStorage.java index d9d3c8f6d..9a8578d0b 100644 --- a/src/main/java/focusedCrawler/link/LinkStorage.java +++ b/src/main/java/focusedCrawler/link/LinkStorage.java @@ -21,8 +21,8 @@ ## ############################################################################ */ -package focusedCrawler.link; - +package focusedCrawler.link; + import java.io.IOException; import org.slf4j.Logger; @@ -40,6 +40,8 @@ import focusedCrawler.link.frontier.FrontierManager; import focusedCrawler.link.frontier.FrontierManagerFactory; import focusedCrawler.link.frontier.FrontierPersistentException; +import focusedCrawler.util.DataNotFoundException; +import focusedCrawler.util.LinkRelevance; import focusedCrawler.util.Page; import focusedCrawler.util.parser.SimpleWrapper; import focusedCrawler.util.storage.Storage; @@ -174,10 +176,15 @@ public synchronized Object insert(Object obj) throws StorageException { /** * This method sends a link to crawler + * @throws DataNotFoundException */ - public synchronized Object select(Object obj) throws StorageException { + public synchronized Object select(Object obj) throws StorageException, DataNotFoundException { try { - return frontierManager.nextURL(); + LinkRelevance nextURL = frontierManager.nextURL(); + if(nextURL == null) { + throw new DataNotFoundException("Frontier run out of links."); + } + return nextURL; } catch (FrontierPersistentException e) { throw new StorageException(e.getMessage(), e); } diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 6638b63d5..582460528 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -30,6 
+30,9 @@ --> + + + diff --git a/src/test/java/focusedCrawler/config/ConfigServiceTest.java b/src/test/java/focusedCrawler/config/ConfigServiceTest.java index 03f735047..566588208 100644 --- a/src/test/java/focusedCrawler/config/ConfigServiceTest.java +++ b/src/test/java/focusedCrawler/config/ConfigServiceTest.java @@ -10,7 +10,7 @@ import org.junit.Before; import org.junit.Test; -import focusedCrawler.crawler.CrawlerManagerConfig; +import focusedCrawler.crawler.async.AsyncCrawler; import focusedCrawler.link.LinkStorageConfig; public class ConfigServiceTest { @@ -96,26 +96,23 @@ public void shouldReadLinkStorageConfig() throws IOException { } @Test - public void shouldReadCrawlerManagerConfig() throws IOException { + public void shouldReadCrawlerConfig() throws IOException { // given ConfigService configService = new ConfigService(configFilePath); // when - CrawlerManagerConfig config = configService.getCrawlerManagerConfig(); + AsyncCrawler.Config config = configService.getCrawlerConfig(); // then assertThat(config, is(notNullValue())); - assertThat(config.getRobotThreadGroup(), is("crawler_group_test")); - assertThat(config.getRobotManagerRestingTime(), is(11111L)); - assertThat(config.getRobotManagerSleepCheckTime(), is(11111L)); - assertThat(config.getRobotManagerMaxTime(), is(11111L)); - - assertThat(config.getRobotManagerRobotErrorTime(), is(222222L)); - assertThat(config.getRobotManagerRobotThreadFactor(), is(222222)); - assertThat(config.getRobotQuantity(), is(222222)); - assertThat(config.getDownloaderMaxBlockedThreads(), is(222222L)); - + assertThat(config.getHostMinAccessInterval(), is(123)); + assertThat(config.getMaxLinksInScheduler(), is(234)); + assertThat(config.getDownloaderConfig().getDownloadThreadPoolSize(), is(333)); + assertThat(config.getDownloaderConfig().getMaxRetryCount(), is(444)); + assertThat(config.getDownloaderConfig().getUserAgentName(), is("TestAgent")); + assertThat(config.getDownloaderConfig().getUserAgentUrl(), is("http://www.test-agent-crawler-example.com/robot")); + assertThat(config.getDownloaderConfig().getValidMimeTypes()[0], is("test/mimetype")); } } diff --git a/src/test/java/focusedCrawler/crawler/DownloaderTest.java b/src/test/java/focusedCrawler/crawler/DownloaderTest.java deleted file mode 100644 index 7c3076092..000000000 --- a/src/test/java/focusedCrawler/crawler/DownloaderTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package focusedCrawler.crawler; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.junit.Assert.assertThat; - -import java.net.URL; - -import org.junit.Test; - -public class DownloaderTest { - - @Test - public void shouldReturnOriginalAndRedirectedUrl() throws Exception { - // given - URL originalUrl = new URL("http://onesource.thomsonreuters.com/"); - URL expectedRedirectedUrl = new URL("https://tax.thomsonreuters.com/products/brands/onesource"); - - // when - Downloader downloader = new Downloader(originalUrl); - - // then - assertThat(downloader.isRedirection(), is(true)); - assertThat(downloader.getOriginalUrl(), is(originalUrl)); - assertThat(downloader.getRedirectionUrl(), is(expectedRedirectedUrl)); - assertThat(downloader.getMimeType(), containsString("text/html")); - } - - @Test - public void shouldWorkWhenRedirectionsDoesntHappen() throws Exception { - // given - URL originalUrl = new URL("http://www.nyu.edu"); - - // when - Downloader downloader = new Downloader(originalUrl); - - // then - 
assertThat(downloader.isRedirection(), is(false)); - assertThat(downloader.getOriginalUrl(), is(originalUrl)); - assertThat(downloader.getRedirectionUrl(), is(nullValue())); - assertThat(downloader.getMimeType(), containsString("text/html")); - } - - @Test - public void shouldExtractMimeTypeWhenAvailable() throws Exception { - Downloader downloader = new Downloader("http://www.youtube.com/user/SamaritansPurseVideo"); - // Downloader downloader = new Downloader("http://www.nyu.edu"); - assertThat(downloader.getMimeType(), containsString("text/html")); - } - -} \ No newline at end of file diff --git a/src/test/java/focusedCrawler/crawler/async/HttpDownloaderTest.java b/src/test/java/focusedCrawler/crawler/async/HttpDownloaderTest.java new file mode 100644 index 000000000..254b98dcd --- /dev/null +++ b/src/test/java/focusedCrawler/crawler/async/HttpDownloaderTest.java @@ -0,0 +1,202 @@ +package focusedCrawler.crawler.async; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; + +import crawlercommons.fetcher.FetchedResult; + +public class HttpDownloaderTest { + + static class TestWebServerBuilder { + + private static final int port = 8345; + private static final String address = "http://localhost:"+port; + private HttpServer server; + + public TestWebServerBuilder() throws IOException { + server = HttpServer.create(new InetSocketAddress("localhost", port), 0); + } + + public TestWebServerBuilder withHandler(String path, HttpHandler handler) { + server.createContext(path, handler); + return this; + } + + private HttpServer start() { + server.setExecutor(null); // creates a default executor + server.start(); + return server; + } + } + + static class OkHandler implements HttpHandler { + + private final String responseContent; + + public OkHandler(String responseContent) { + this.responseContent = responseContent; + } + + @Override + public void handle(HttpExchange t) throws IOException { + t.getResponseHeaders().add("Content-Type", "text/html; charset=utf-8"); + t.sendResponseHeaders(HttpURLConnection.HTTP_OK, responseContent.getBytes().length); + OutputStream os = t.getResponseBody(); + os.write(responseContent.getBytes()); + os.close(); + t.close(); + } + } + + static class RedirectionHandler implements HttpHandler { + + private String newLocation; + + public RedirectionHandler(String newLocation) { + this.newLocation = newLocation; + } + + @Override + public void handle(HttpExchange t) throws IOException { + t.getResponseHeaders().add("Location", newLocation); + t.sendResponseHeaders(HttpURLConnection.HTTP_MOVED_PERM, 0); + t.close(); + } + } + + private HttpDownloader downloader; + + @Before + public void setUp() { + this.downloader = new HttpDownloader(); + } + + @Test + public void shouldFollowRedirections() throws Exception { + // given + HttpServer httpServer = new TestWebServerBuilder() + .withHandler("/index.html", new RedirectionHandler("/new/location.html")) + .withHandler("/new/location.html", new OkHandler("Hello 
world!")) + .start(); + + String originalUrl = TestWebServerBuilder.address+"/index.html"; + String expectedRedirectedUrl = TestWebServerBuilder.address+"/new/location.html"; + + // when + FetchedResult result = downloader.dipatchDownload(originalUrl).get(); + + // then + assertThat(result.getNumRedirects(), is(1)); + assertThat(result.getBaseUrl(), is(originalUrl)); + assertThat(result.getFetchedUrl(), is(expectedRedirectedUrl)); + assertThat(result.getNewBaseUrl(), is(expectedRedirectedUrl)); + assertThat(result.getStatusCode(), is(200)); + assertThat(result.getReasonPhrase(), is("OK")); + assertThat(result.getContentType(), is("text/html; charset=utf-8")); + assertThat(result.getContent(), is("Hello world!".getBytes())); + + httpServer.stop(0); + } + + @Test + public void shouldDownloadPageContentAndMetadata() throws Exception { + // given + String responseContent = "Hello world!"; + String originalUrl = TestWebServerBuilder.address+"/index.html"; + HttpServer httpServer = new TestWebServerBuilder() + .withHandler("/index.html", new OkHandler(responseContent)) + .start(); + + // when + FetchedResult result = downloader.dipatchDownload(originalUrl).get(); + + // then + assertThat(result.getNumRedirects(), is(0)); + assertThat(result.getBaseUrl(), is(originalUrl)); + assertThat(result.getFetchedUrl(), is(originalUrl)); + assertThat(result.getNewBaseUrl(), is(nullValue())); + assertThat(result.getStatusCode(), is(200)); + assertThat(result.getReasonPhrase(), is("OK")); + assertThat(result.getContentType(), is("text/html; charset=utf-8")); + assertThat(result.getContent(), is(responseContent.getBytes())); + + httpServer.stop(0); + } + + + @Test + public void shouldDownloadMultipleUrlsInParallel() throws Exception { + // given + String originalUrl = TestWebServerBuilder.address+"/index.html"; + HttpServer httpServer = new TestWebServerBuilder() + .withHandler("/index.html", new OkHandler("Hello world!")) + .start(); + + // when + List> results = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + Future futureResult = downloader.dipatchDownload(originalUrl); + results.add(futureResult); + } + + // then + for (Future future : results) { + assertThat(future.get().getStatusCode(), is(200)); + } + + httpServer.stop(0); + } + + @Test + public void shouldCallCompletedCallbackAfterDownloadFinishes() throws Exception { + // given + String originalUrl = TestWebServerBuilder.address+"/index.html"; + HttpServer httpServer = new TestWebServerBuilder() + .withHandler("/index.html", new OkHandler("Hello world!")) + .start(); + final int numberOfRequests = 5; + final AtomicInteger requestsFinished = new AtomicInteger(0); + + // when + for (int i = 0; i < numberOfRequests; i++) { + downloader.dipatchDownload(new URL(originalUrl), new HttpDownloader.Callback() { + @Override + public void failed(String url, Exception e) { + } + @Override + public void completed(FetchedResult result) { + // increment counter when download finishes + requestsFinished.incrementAndGet(); + } + }); + } + while(downloader.hasPendingDownloads()) { + // wait until all downloads are finished + Thread.sleep(5); + } + + // then + assertThat(requestsFinished.get(), is(numberOfRequests)); + + httpServer.stop(0); + } + +} \ No newline at end of file diff --git a/src/test/java/focusedCrawler/link/DownloadSchedulerTest.java b/src/test/java/focusedCrawler/link/DownloadSchedulerTest.java new file mode 100644 index 000000000..f91e60665 --- /dev/null +++ b/src/test/java/focusedCrawler/link/DownloadSchedulerTest.java @@ -0,0 +1,140 @@ +package 
focusedCrawler.link; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +import focusedCrawler.util.LinkRelevance; + +public class DownloadSchedulerTest { + + @Test + public void shouldSelectLinksBasedOnPolitenes() throws Exception { + LinkRelevance l1 = new LinkRelevance("http://ex1.com/", 1); + LinkRelevance l2 = new LinkRelevance("http://ex2.com/", 2); + + LinkRelevance l3 = new LinkRelevance("http://ex1.com/3", 3); + LinkRelevance l4 = new LinkRelevance("http://ex2.com/4", 4); + LinkRelevance l5 = new LinkRelevance("http://ex3.com/5", 5); + + int minimumAccessTime = 500; + int maxLinksInScheduler = 100; + DownloadScheduler scheduler = new DownloadScheduler(minimumAccessTime, maxLinksInScheduler); + + scheduler.addLink(l1); + + assertThat(scheduler.numberOfLinks(), is(1)); + assertThat(scheduler.nextLink(), is(l1)); + + assertThat(scheduler.numberOfLinks(), is(0)); + assertThat(scheduler.nextLink(), is(nullValue())); + + assertThat(scheduler.numberOfNonExpiredDomains(), is(1)); + assertThat(scheduler.numberOfEmptyDomains(), is(1)); + + scheduler.addLink(l2); + + assertThat(scheduler.numberOfLinks(), is(1)); + + assertThat(scheduler.nextLink(), is(l2)); + + assertThat(scheduler.numberOfLinks(), is(0)); + assertThat(scheduler.nextLink(), is(nullValue())); + + assertThat(scheduler.numberOfNonExpiredDomains(), is(2)); + assertThat(scheduler.numberOfEmptyDomains(), is(2)); + + scheduler.addLink(l3); + scheduler.addLink(l4); + scheduler.addLink(l5); + + assertThat(scheduler.numberOfEmptyDomains(), is(0)); + assertThat(scheduler.numberOfNonExpiredDomains(), is(3)); + + assertThat(scheduler.nextLink(), is(l5)); + assertThat(scheduler.nextLink(), is(l3)); + assertThat(scheduler.nextLink(), is(l4)); + assertThat(scheduler.numberOfEmptyDomains(), is(3)); + + // should remove expired domains automatically + Thread.sleep(minimumAccessTime+10); + assertThat(scheduler.numberOfNonExpiredDomains(), is(0)); + + scheduler.addLink(l1); + + assertThat(scheduler.nextLink(), is(l1)); + assertThat(scheduler.numberOfNonExpiredDomains(), is(1)); + assertThat(scheduler.numberOfEmptyDomains(), is(1)); + + assertThat(scheduler.nextLink(), is(nullValue())); + assertThat(scheduler.numberOfEmptyDomains(), is(1)); + } + + @Test + public void addLinksShouldBlockWhenMaxNumberOfLinksIsReached() throws Exception { + LinkRelevance l1 = new LinkRelevance("http://ex1.com/", 1); + LinkRelevance l2 = new LinkRelevance("http://ex2.com/", 2); + + int minimumAccessTime = 100; + int maxLinksInScheduler = 1; + + final DownloadScheduler scheduler = new DownloadScheduler(minimumAccessTime, maxLinksInScheduler); + final int removeAfterTime = 500; + + Thread removeThread = new Thread() { + public void run() { + try { + Thread.sleep(removeAfterTime); + scheduler.nextLink(); + } catch (InterruptedException e) { + throw new RuntimeException("Test interrupted."); + } + }; + }; + + long t0 = System.currentTimeMillis(); + scheduler.addLink(l1); + long t1 = System.currentTimeMillis(); + removeThread.start(); + scheduler.addLink(l2); + long t2 = System.currentTimeMillis(); + + boolean addTime1 = t1 - t0 < removeAfterTime; + assertThat(addTime1, is(true)); + + boolean addTime2 = t2 - t1 > removeAfterTime; + assertThat(addTime2, is(true)); + } + + @Test + public void nextLinksShouldWaitMinimumAccessTimeOfDomain() throws Exception { + // given + LinkRelevance l1 = new LinkRelevance("http://ex1.com/1", 1); + LinkRelevance l2 = new 
LinkRelevance("http://ex1.com/2", 2); + + int minimumAccessTime = 500; + int maxLinksInScheduler = 10; + + final DownloadScheduler scheduler = new DownloadScheduler(minimumAccessTime, maxLinksInScheduler); + scheduler.addLink(l1); + scheduler.addLink(l2); + + // when + long t0 = System.currentTimeMillis(); + scheduler.nextLink(); + long t1 = System.currentTimeMillis(); + scheduler.nextLink(); + long t2 = System.currentTimeMillis(); + + // then + boolean nextLinkTime1 = t1 - t0 < minimumAccessTime; + assertThat(nextLinkTime1, is(true)); + + int recordTimeErrorMargin = 5; // sometimes the recorded times outside the scheduler + // can be slightly different from actual ones + boolean nextLinkTime2 = t2 - t1 > (minimumAccessTime - recordTimeErrorMargin); + assertThat(nextLinkTime2, is(true)); + } +} diff --git a/src/test/resources/focusedCrawler/config/ache.yml b/src/test/resources/focusedCrawler/config/ache.yml index 18974749a..49b229116 100644 --- a/src/test/resources/focusedCrawler/config/ache.yml +++ b/src/test/resources/focusedCrawler/config/ache.yml @@ -126,15 +126,12 @@ link_storage.backsurfer.pattern_end_title: "\",\"uu\":" # Configurations for Crawler Manager # -crawler_manager.robot_mananger.thread_group: crawler_group_test -crawler_manager.robot_mananger.resting_time: 11111 -crawler_manager.robot_mananger.check_time: 11111 -crawler_manager.robot_mananger.max_time: 11111 -crawler_manager.robot_mananger.robot_error_sleep_time: 222222 -crawler_manager.robot_mananger.thread_factor: 222222 -crawler_manager.robot_mananger.quantity: 222222 - -crawler_manager.downloader.max_blocked_threads: 222222 - -#crawler_manager.downloader.user_agent: "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT 4.0)" -#crawler_manager.downloader.request_accept: "Accept | */*" +crawler_manager.scheduler.host_min_access_interval: 123 +crawler_manager.scheduler.max_links: 234 +crawler_manager.downloader.download_thread_pool_size: 333 +crawler_manager.downloader.max_retry_count: 444 +crawler_manager.downloader.user_agent.name: TestAgent +crawler_manager.downloader.user_agent.url: http://www.test-agent-crawler-example.com/robot +crawler_manager.downloader.valid_mime_types: + - test/mimetype + - text/html