diff --git a/README.md b/README.md index 99823fe..bd13964 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,4 @@ -# Simple Producer Consumer example for Akka in scala - -This is a rewrite in scala of the original project by Florian Hopf: -http://blog.florian-hopf.de/2012/08/getting-rid-of-synchronized-using-akka.html -https://github.com/fhopf/akka-crawler-example - +# Simple Producer Consumer example for Akka in Java This repository contains 3 examples of a simple web crawler: * A sequential example @@ -16,8 +11,4 @@ To start the simple actor execution run gradle runActors To start the parallel page fetching run gradle runParallelActors -The code is only meant as an example on how to implement a producer consumer example in Akka. -For more information visit http://blog.florian-hopf.de/2012/08/getting-rid-of-synchronized-using-akka.html - - -TODO (SCALA): refactoring code in order to use the best scala constructs. \ No newline at end of file +The code is only meant as an example on how to implement a producer consumer example in Akka. For more information visit http://blog.florian-hopf.de/2012/08/getting-rid-of-synchronized-using-akka.html diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..1ab037b --- /dev/null +++ b/build.gradle @@ -0,0 +1,36 @@ +apply plugin: 'java' +apply plugin: 'maven' + +repositories { + mavenCentral() + maven { + url "http://repo.typesafe.com/typesafe/releases/" + } +} + +dependencies { + compile group: 'org.apache.lucene', name: 'lucene-core', version: '3.6.0' + compile group: 'com.typesafe.akka', name: 'akka-actor', version: '2.0' + compile group: 'org.slf4j', name: 'slf4j-api', version: '1.6.4' + compile group: 'org.htmlparser', name: 'htmlparser', version: '1.6' + compile group: 'org.perf4j', name: 'perf4j', version: '0.9.16' + runtime group: 'org.slf4j', name: 'slf4j-simple', version: '1.6.4' + testCompile group: 'junit', name: 'junit', version: '4.10' +} + +task runSequential(type: JavaExec) { + main = 'de.fhopf.akka.sequential.SequentialExecution' + classpath = sourceSets.main.runtimeClasspath +} + +task runActors(type: JavaExec) { + main = 'de.fhopf.akka.actor.SimpleActorExecution' + classpath = sourceSets.main.runtimeClasspath +} + +task runParallelActors(type: JavaExec) { + main = 'de.fhopf.akka.actor.parallel.FetchInParallelExecution' + classpath = sourceSets.main.runtimeClasspath +} + + diff --git a/build.sbt b/build.sbt deleted file mode 100644 index dc8bc64..0000000 --- a/build.sbt +++ /dev/null @@ -1,21 +0,0 @@ -name := "akka-crawler-example-scala" - -version := "1.0" - -scalaVersion := "2.9.2" - -retrieveManaged := true - -libraryDependencies += "org.apache.lucene" % "lucene-core" % "3.6.1" - -libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10.0-RC5" % "2.1.0-RC6" - -libraryDependencies += "org.slf4j" % "slf4j-api" % "1.6.4" - -libraryDependencies += "org.htmlparser" % "htmlparser" % "2.1" - -libraryDependencies += "org.perf4j" % "perf4j" % "0.9.16" - -libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4" - -libraryDependencies += "junit" % "junit" % "4.10" diff --git a/nb-configuration.xml b/nb-configuration.xml new file mode 100644 index 0000000..ec4540c --- /dev/null +++ b/nb-configuration.xml @@ -0,0 +1,18 @@ + + + + + + none + + diff --git a/project/sbteclipse.sbt b/project/sbteclipse.sbt deleted file mode 100644 index 50fa738..0000000 --- a/project/sbteclipse.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.0.0") \ No newline at end of file diff --git a/src/main/java/de/fhopf/akka/Execution.java b/src/main/java/de/fhopf/akka/Execution.java new file mode 100644 index 0000000..ca11dcd --- /dev/null +++ b/src/main/java/de/fhopf/akka/Execution.java @@ -0,0 +1,13 @@ +package de.fhopf.akka; + +import org.apache.lucene.index.IndexWriter; + +/** + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public interface Execution { + + public void downloadAndIndex(String path, IndexWriter writer); + +} diff --git a/src/main/java/de/fhopf/akka/Executor.java b/src/main/java/de/fhopf/akka/Executor.java new file mode 100644 index 0000000..36a9b1f --- /dev/null +++ b/src/main/java/de/fhopf/akka/Executor.java @@ -0,0 +1,109 @@ +package de.fhopf.akka; + +import java.io.File; +import java.io.IOException; +import java.util.logging.Level; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.util.Version; +import org.perf4j.LoggingStopWatch; +import org.perf4j.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class that provides the index writer, logs the duration and checks the + * document count. Concrete + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class Executor { + + private final Execution execution; + private final Logger logger = LoggerFactory.getLogger(Executor.class); + + public Executor(Execution execution) { + this.execution = execution; + } + + public void execute(String path) { + IndexWriter writer = null; + IndexSearcher searcher = null; + try { + File indexDir = new File(System.getProperty("java.io.tmpdir"), "index-" + System.currentTimeMillis()); + writer = openWriter(indexDir); + + StopWatch stopWatch = new LoggingStopWatch(); + execution.downloadAndIndex(path, writer); + stopWatch.stop(execution.getClass().getSimpleName()); + + searcher = openSearcher(indexDir); + TopDocs result = searcher.search(new MatchAllDocsQuery(), 100); + + logger.info("Found {} results", result.totalHits); + + for(ScoreDoc scoreDoc: result.scoreDocs) { + Document doc = searcher.doc(scoreDoc.doc); + logger.debug(doc.get("id")); + } + + searcher.close(); + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + if (writer != null) { + try { + writer.rollback(); + } catch (IOException ex1) { + logger.error(ex1.getMessage(), ex1); + } + } + } finally { + if (writer != null) { + try { + writer.close(); + } catch (CorruptIndexException ex) { + logger.error(ex.getMessage(), ex); + } catch (IOException ex) { + logger.error(ex.getMessage(), ex); + } + } + if (searcher != null) { + try { + searcher.close(); + } catch (IOException ex) { + logger.error(ex.getMessage(), ex); + } + } + } + + + } + + private IndexWriter openWriter(File indexDir) throws CorruptIndexException, LockObtainFailedException, IOException { + + Directory dir = FSDirectory.open(indexDir); + + IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_35, new StandardAnalyzer(Version.LUCENE_35)); + + return new IndexWriter(dir, config); + } + + private IndexSearcher openSearcher(File indexDir) throws CorruptIndexException, IOException { + + Directory dir = FSDirectory.open(indexDir); + IndexReader reader = IndexReader.open(dir); + + return new IndexSearcher(reader); + } +} diff --git a/src/main/java/de/fhopf/akka/HtmlParserPageRetriever.java b/src/main/java/de/fhopf/akka/HtmlParserPageRetriever.java new file mode 100644 index 0000000..52f43ec --- /dev/null +++ b/src/main/java/de/fhopf/akka/HtmlParserPageRetriever.java @@ -0,0 +1,87 @@ +package de.fhopf.akka; + +import java.util.ArrayList; +import java.util.List; +import org.htmlparser.Parser; +import org.htmlparser.Tag; +import org.htmlparser.tags.BodyTag; +import org.htmlparser.tags.LinkTag; +import org.htmlparser.tags.TitleTag; +import org.htmlparser.util.ParserException; +import org.htmlparser.visitors.NodeVisitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retrieves the content using HtmlParser. + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class HtmlParserPageRetriever implements PageRetriever { + + private final String baseUrl; + private static final Logger logger = LoggerFactory.getLogger(HtmlParserPageRetriever.class); + + public HtmlParserPageRetriever(String baseUrl) { + this.baseUrl = baseUrl; + } + + @Override + public PageContent fetchPageContent(String url) { + logger.debug("Fetching {}", url); + try { + Parser parser = new Parser(url); + PageContentVisitor visitor = new PageContentVisitor(baseUrl, url); + parser.visitAllNodesWith(visitor); + + return visitor.getContent(); + } catch (ParserException ex) { + throw new IllegalStateException(ex); + } + } + + + private static class PageContentVisitor extends NodeVisitor { + + private List linksToVisit = new ArrayList(); + private String content; + private String title; + + private final String baseUrl; + private final String currentUrl; + + public PageContentVisitor(String baseUrl, String currentUrl) { + super(true); + this.baseUrl = baseUrl; + this.currentUrl = currentUrl; + + } + + @Override + public void visitTag(Tag tag) { + if (tag instanceof LinkTag) { + LinkTag linkTag = (LinkTag) tag; + if (linkTag.getLink().startsWith(baseUrl) && isProbablyHtml(linkTag.getLink())) { + logger.debug("Using link pointing to {}", linkTag.getLink()); + linksToVisit.add(linkTag.getLink()); + } else { + logger.debug("Skipping link pointing to {}", linkTag.getLink()); + } + } else if (tag instanceof TitleTag) { + TitleTag titleTag = (TitleTag) tag; + title = titleTag.getTitle(); + } else if (tag instanceof BodyTag) { + BodyTag bodyTag = (BodyTag) tag; + content = bodyTag.toPlainTextString(); + } + } + + public PageContent getContent() { + return new PageContent(currentUrl, linksToVisit, title, content); + } + + private boolean isProbablyHtml(String link) { + return link.endsWith(".html") || link.endsWith("/"); + } + } +} diff --git a/src/main/java/de/fhopf/akka/Indexer.java b/src/main/java/de/fhopf/akka/Indexer.java new file mode 100644 index 0000000..31166fe --- /dev/null +++ b/src/main/java/de/fhopf/akka/Indexer.java @@ -0,0 +1,18 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package de.fhopf.akka; + +/** + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public interface Indexer { + + void commit(); + + void index(PageContent pageContent); + + void close(); +} diff --git a/src/main/java/de/fhopf/akka/IndexerImpl.java b/src/main/java/de/fhopf/akka/IndexerImpl.java new file mode 100644 index 0000000..4b70eff --- /dev/null +++ b/src/main/java/de/fhopf/akka/IndexerImpl.java @@ -0,0 +1,63 @@ +package de.fhopf.akka; + +import java.io.IOException; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexWriter; + +/** + * Indexer Impl, contaijns writer state. + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class IndexerImpl implements Indexer { + + private final IndexWriter indexWriter; + + public IndexerImpl(IndexWriter indexWriter) { + this.indexWriter = indexWriter; + } + + @Override + public void index(PageContent pageContent) { + try { + indexWriter.addDocument(toDocument(pageContent)); + } catch (CorruptIndexException ex) { + throw new IllegalStateException(ex); + } catch (IOException ex) { + throw new IllegalStateException(ex); + } + } + + private Document toDocument(PageContent content) { + Document doc = new Document(); + doc.add(new Field("id", content.getPath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); + doc.add(new Field("title", content.getTitle(), Field.Store.YES, Field.Index.ANALYZED)); + doc.add(new Field("content", content.getContent(), Field.Store.NO, Field.Index.ANALYZED)); + return doc; + } + + @Override + public void commit() { + try { + indexWriter.commit(); + } catch (CorruptIndexException ex) { + throw new IllegalStateException(ex); + } catch (IOException ex) { + throw new IllegalStateException(ex); + } + } + + @Override + public void close() { + try { + indexWriter.close(true); + } catch (CorruptIndexException ex) { + throw new IllegalStateException(ex); + } catch (IOException ex) { + throw new IllegalStateException(ex); + } + + } + +} diff --git a/src/main/java/de/fhopf/akka/PageContent.java b/src/main/java/de/fhopf/akka/PageContent.java new file mode 100644 index 0000000..72946b5 --- /dev/null +++ b/src/main/java/de/fhopf/akka/PageContent.java @@ -0,0 +1,45 @@ +package de.fhopf.akka; + +import java.util.List; + +/** + * The content for a certain page. + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class PageContent { + + private final List linksToFollow; + private final String title; + private final String content; + private final String path; + + public PageContent(String path, List linksToFollow, String title, String content) { + this.path = path; + this.linksToFollow = linksToFollow; + this.title = title; + this.content = content; + } + + public String getContent() { + return content; + } + + public List getLinksToFollow() { + return linksToFollow; + } + + public String getTitle() { + return title; + } + + public String getPath() { + return path; + } + + @Override + public String toString() { + return "PageContent{title=" + title + ", content=" + content + ", linksToFollow=" + linksToFollow + "}"; + } + + +} diff --git a/src/main/java/de/fhopf/akka/PageRetriever.java b/src/main/java/de/fhopf/akka/PageRetriever.java new file mode 100644 index 0000000..27e5c4b --- /dev/null +++ b/src/main/java/de/fhopf/akka/PageRetriever.java @@ -0,0 +1,15 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package de.fhopf.akka; + +/** + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public interface PageRetriever { + + PageContent fetchPageContent(String url); + +} diff --git a/src/main/java/de/fhopf/akka/VisitedPageStore.java b/src/main/java/de/fhopf/akka/VisitedPageStore.java new file mode 100644 index 0000000..5b58be0 --- /dev/null +++ b/src/main/java/de/fhopf/akka/VisitedPageStore.java @@ -0,0 +1,56 @@ +package de.fhopf.akka; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * Stores the information on which pages have already been retrieved. + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class VisitedPageStore { + + private Set pagesToVisit = new HashSet(); + private Set allPages = new HashSet(); + private Set inProgress = new HashSet(); + + public void add(String page) { + if (!allPages.contains(page)) { + pagesToVisit.add(page); + allPages.add(page); + } + } + + public void addAll(Collection pages) { + for (String page: pages) { + add(page); + } + } + + public void finished(String page) { + inProgress.remove(page); + } + + public String getNext() { + if (pagesToVisit.isEmpty()) { + return null; + } else { + String next = pagesToVisit.iterator().next(); + pagesToVisit.remove(next); + inProgress.add(next); + return next; + } + } + + public Collection getNextBatch() { + Set pages = new HashSet(); + pages.addAll(pagesToVisit); + pagesToVisit.clear(); + inProgress.addAll(pages); + return pages; + } + + public boolean isFinished() { + return pagesToVisit.isEmpty() && inProgress.isEmpty(); + } +} diff --git a/src/main/java/de/fhopf/akka/actor/IndexedMessage.java b/src/main/java/de/fhopf/akka/actor/IndexedMessage.java new file mode 100644 index 0000000..2d505ab --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/IndexedMessage.java @@ -0,0 +1,15 @@ +package de.fhopf.akka.actor; + +/** + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public final class IndexedMessage { + + public final String path; + + public IndexedMessage(String path) { + this.path = path; + } + +} diff --git a/src/main/java/de/fhopf/akka/actor/IndexingActor.java b/src/main/java/de/fhopf/akka/actor/IndexingActor.java new file mode 100644 index 0000000..3980e7c --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/IndexingActor.java @@ -0,0 +1,36 @@ +package de.fhopf.akka.actor; + +import akka.actor.UntypedActor; +import de.fhopf.akka.Indexer; +import de.fhopf.akka.PageContent; + +/** + * Indexes pages in Lucene. + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class IndexingActor extends UntypedActor { + + public static final Object COMMIT_MESSAGE = new Object(); + public static final Object COMMITTED_MESSAGE = new Object(); + + private final Indexer indexer; + + public IndexingActor(Indexer indexer) { + this.indexer = indexer; + } + + @Override + public void onReceive(Object o) throws Exception { + if (o instanceof PageContent) { + PageContent content = (PageContent) o; + indexer.index(content); + getSender().tell(new IndexedMessage(content.getPath()), getSelf()); + } else if (COMMIT_MESSAGE == o) { + indexer.commit(); + getSender().tell(COMMITTED_MESSAGE, getSelf()); + } else { + unhandled(o); + } + } + +} diff --git a/src/main/java/de/fhopf/akka/actor/Master.java b/src/main/java/de/fhopf/akka/actor/Master.java new file mode 100644 index 0000000..ce1d6f0 --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/Master.java @@ -0,0 +1,70 @@ +package de.fhopf.akka.actor; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; + +import de.fhopf.akka.PageContent; +import de.fhopf.akka.VisitedPageStore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; + + +/** + * Master actor that coordinates the two actors. + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public abstract class Master extends UntypedActor { + + private final Logger logger = LoggerFactory.getLogger(Master.class); + private final VisitedPageStore visitedPageStore = new VisitedPageStore(); + private final CountDownLatch countDownLatch; + + protected Master(final CountDownLatch latch) { + + this.countDownLatch = latch; + } + + @Override + public void onReceive(Object message) throws Exception { + + if (message instanceof String) { + // start + String start = (String) message; + visitedPageStore.add(start); + getParser().tell(visitedPageStore.getNext(), getSelf()); + } else if (message instanceof PageContent) { + PageContent content = (PageContent) message; + getIndexer().tell(content, getSelf()); + visitedPageStore.addAll(content.getLinksToFollow()); + + if (visitedPageStore.isFinished()) { + getIndexer().tell(IndexingActor.COMMIT_MESSAGE, getSelf()); + } else { + for (String page : visitedPageStore.getNextBatch()) { + getParser().tell(page, getSelf()); + } + } + } else if (message instanceof IndexedMessage) { + IndexedMessage indexedMessage = (IndexedMessage) message; + visitedPageStore.finished(indexedMessage.path); + + if (visitedPageStore.isFinished()) { + getIndexer().tell(IndexingActor.COMMIT_MESSAGE, getSelf()); + } + } else if (message == IndexingActor.COMMITTED_MESSAGE) { + logger.info("Shutting down, finished"); + getContext().system().shutdown(); + countDownLatch.countDown(); + } + } + + + protected abstract ActorRef getIndexer(); + + + protected abstract ActorRef getParser(); +} diff --git a/src/main/java/de/fhopf/akka/actor/PageParsingActor.java b/src/main/java/de/fhopf/akka/actor/PageParsingActor.java new file mode 100644 index 0000000..9c53747 --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/PageParsingActor.java @@ -0,0 +1,33 @@ +package de.fhopf.akka.actor; + +import akka.actor.UntypedActor; +import de.fhopf.akka.PageContent; +import de.fhopf.akka.PageRetriever; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Downloads pages and passes these on to the next actor. + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class PageParsingActor extends UntypedActor { + + private final PageRetriever pageRetriever; + + private final Logger logger = LoggerFactory.getLogger(PageParsingActor.class); + + public PageParsingActor(PageRetriever pageRetriever) { + this.pageRetriever = pageRetriever; + } + + @Override + public void onReceive(Object o) throws Exception { + if (o instanceof String) { + PageContent content = pageRetriever.fetchPageContent((String) o); + getSender().tell(content, getSelf()); + } else { + // fail on any message we don't expect + unhandled(o); + } + } +} diff --git a/src/main/java/de/fhopf/akka/actor/SimpleActorExecution.java b/src/main/java/de/fhopf/akka/actor/SimpleActorExecution.java new file mode 100644 index 0000000..c7914d6 --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/SimpleActorExecution.java @@ -0,0 +1,43 @@ +package de.fhopf.akka.actor; + +import akka.actor.*; +import de.fhopf.akka.Execution; +import de.fhopf.akka.Executor; +import de.fhopf.akka.HtmlParserPageRetriever; +import java.util.concurrent.CountDownLatch; +import org.apache.lucene.index.IndexWriter; + +/** + * Runs the example with one actor of each. + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class SimpleActorExecution implements Execution { + + @Override + public void downloadAndIndex(final String path, final IndexWriter writer) { + ActorSystem actorSystem = ActorSystem.create(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActorRef master = actorSystem.actorOf(new Props(new UntypedActorFactory() { + + @Override + public Actor create() { + return new SimpleActorMaster(new HtmlParserPageRetriever(path), writer, countDownLatch); + } + })); + + master.tell(path); + try { + countDownLatch.await(); + actorSystem.shutdown(); + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + } + + public static void main(String[] args) { + SimpleActorExecution execution = new SimpleActorExecution(); + Executor exec = new Executor(execution); + exec.execute("http://www.synyx.de/"); + } +} diff --git a/src/main/java/de/fhopf/akka/actor/SimpleActorMaster.java b/src/main/java/de/fhopf/akka/actor/SimpleActorMaster.java new file mode 100644 index 0000000..90af4cf --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/SimpleActorMaster.java @@ -0,0 +1,60 @@ +package de.fhopf.akka.actor; + +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActorFactory; + +import de.fhopf.akka.IndexerImpl; +import de.fhopf.akka.PageRetriever; + +import org.apache.lucene.index.IndexWriter; + +import java.util.concurrent.CountDownLatch; + + +/** + * Works with one indexer and one page parser. + * @author Florian Hopf, http://www.florian-hopf.de + */ +class SimpleActorMaster extends Master { + + private final ActorRef indexer; + private final ActorRef parser; + + public SimpleActorMaster(final PageRetriever pageRetriever, final IndexWriter indexWriter, + final CountDownLatch latch) { + + super(latch); + this.indexer = getContext().actorOf(new Props(new UntypedActorFactory() { + + @Override + public Actor create() { + + return new IndexingActor(new IndexerImpl(indexWriter)); + } + })); + + this.parser = getContext().actorOf(new Props(new UntypedActorFactory() { + + @Override + public Actor create() { + + return new PageParsingActor(pageRetriever); + } + })); + } + + @Override + protected ActorRef getIndexer() { + + return indexer; + } + + + @Override + protected ActorRef getParser() { + + return parser; + } +} diff --git a/src/main/java/de/fhopf/akka/actor/parallel/FetchInParallelExecution.java b/src/main/java/de/fhopf/akka/actor/parallel/FetchInParallelExecution.java new file mode 100644 index 0000000..d5a70a8 --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/parallel/FetchInParallelExecution.java @@ -0,0 +1,42 @@ +package de.fhopf.akka.actor.parallel; + +import akka.actor.*; +import de.fhopf.akka.Execution; +import de.fhopf.akka.Executor; +import de.fhopf.akka.HtmlParserPageRetriever; +import java.util.concurrent.CountDownLatch; +import org.apache.lucene.index.IndexWriter; + +/** + * Uses multiple actors for fetching and parsing pages. + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class FetchInParallelExecution implements Execution { + + @Override + public void downloadAndIndex(final String path, final IndexWriter writer) { + ActorSystem actorSystem = ActorSystem.create(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActorRef master = actorSystem.actorOf(new Props(new UntypedActorFactory() { + + @Override + public Actor create() { + return new ParallelMaster(writer, new HtmlParserPageRetriever(path), countDownLatch); + } + })); + master.tell(path); + try { + countDownLatch.await(); + actorSystem.shutdown(); + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + } + + public static void main(String[] args) { + FetchInParallelExecution execution = new FetchInParallelExecution(); + Executor exec = new Executor(execution); + exec.execute("http://www.synyx.de/"); + } + +} diff --git a/src/main/java/de/fhopf/akka/actor/parallel/ParallelMaster.java b/src/main/java/de/fhopf/akka/actor/parallel/ParallelMaster.java new file mode 100644 index 0000000..6fb1011 --- /dev/null +++ b/src/main/java/de/fhopf/akka/actor/parallel/ParallelMaster.java @@ -0,0 +1,70 @@ +package de.fhopf.akka.actor.parallel; + +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActorFactory; + +import akka.routing.RoundRobinRouter; + +import de.fhopf.akka.IndexerImpl; +import de.fhopf.akka.PageRetriever; +import de.fhopf.akka.actor.IndexingActor; +import de.fhopf.akka.actor.Master; +import de.fhopf.akka.actor.PageParsingActor; + +import org.apache.lucene.index.IndexWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; + + +/** + * Fetches and parses pages in parallel. + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +class ParallelMaster extends Master { + + private final Logger logger = LoggerFactory.getLogger(ParallelMaster.class); + + private final ActorRef parser; + private final ActorRef indexer; + + public ParallelMaster(final IndexWriter indexWriter, final PageRetriever pageRetriever, + final CountDownLatch latch) { + + super(latch); + parser = getContext().actorOf(new Props(new UntypedActorFactory() { + + @Override + public Actor create() { + + return new PageParsingActor(pageRetriever); + } + }).withRouter(new RoundRobinRouter(10))); + indexer = getContext().actorOf(new Props(new UntypedActorFactory() { + + @Override + public Actor create() { + + return new IndexingActor(new IndexerImpl(indexWriter)); + } + })); + } + + @Override + protected ActorRef getIndexer() { + + return indexer; + } + + + @Override + protected ActorRef getParser() { + + return parser; + } +} diff --git a/src/main/java/de/fhopf/akka/sequential/SequentialExecution.java b/src/main/java/de/fhopf/akka/sequential/SequentialExecution.java new file mode 100644 index 0000000..74ec2f7 --- /dev/null +++ b/src/main/java/de/fhopf/akka/sequential/SequentialExecution.java @@ -0,0 +1,39 @@ +package de.fhopf.akka.sequential; + +import de.fhopf.akka.*; +import de.fhopf.akka.Execution; +import de.fhopf.akka.Executor; +import org.apache.lucene.index.IndexWriter; + +/** + * Indexes pages sequentially. + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class SequentialExecution implements Execution { + + @Override + public void downloadAndIndex(String path, IndexWriter writer) { + VisitedPageStore pageStore = new VisitedPageStore(); + pageStore.add(path); + + Indexer indexer = new IndexerImpl(writer); + PageRetriever retriever = new HtmlParserPageRetriever(path); + + String page; + while ((page = pageStore.getNext()) != null) { + PageContent pageContent = retriever.fetchPageContent(page); + pageStore.addAll(pageContent.getLinksToFollow()); + indexer.index(pageContent); + pageStore.finished(page); + } + + indexer.commit(); + } + + public static void main(String[] args) { + SequentialExecution execution = new SequentialExecution(); + Executor exec = new Executor(execution); + exec.execute("http://www.synyx.de/"); + } + +} diff --git a/src/main/scala/de/fhopf/akka/Execution.scala b/src/main/scala/de/fhopf/akka/Execution.scala deleted file mode 100644 index ab32cda..0000000 --- a/src/main/scala/de/fhopf/akka/Execution.scala +++ /dev/null @@ -1,11 +0,0 @@ -package de.fhopf.akka - -import org.apache.lucene.index.IndexWriter - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -abstract class Execution { - def downloadAndIndex(path: String, writer: IndexWriter) -} \ No newline at end of file diff --git a/src/main/scala/de/fhopf/akka/Executor.scala b/src/main/scala/de/fhopf/akka/Executor.scala deleted file mode 100644 index 2dce494..0000000 --- a/src/main/scala/de/fhopf/akka/Executor.scala +++ /dev/null @@ -1,98 +0,0 @@ -package de.fhopf.akka - -import java.io.File -import java.io.IOException -import java.util.logging.Level -import org.apache.lucene.analysis.standard.StandardAnalyzer -import org.apache.lucene.document.Document -import org.apache.lucene.index.CorruptIndexException -import org.apache.lucene.index.IndexReader -import org.apache.lucene.index.IndexWriter -import org.apache.lucene.index.IndexWriterConfig -import org.apache.lucene.search.IndexSearcher -import org.apache.lucene.search.MatchAllDocsQuery -import org.apache.lucene.search.ScoreDoc -import org.apache.lucene.search.TopDocs -import org.apache.lucene.store.Directory -import org.apache.lucene.store.FSDirectory -import org.apache.lucene.store.LockObtainFailedException -import org.apache.lucene.util.Version -import org.perf4j.LoggingStopWatch -import org.perf4j.StopWatch -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class Executor(execution: Execution) { - - private var logger: Logger = LoggerFactory.getLogger(classOf[Executor]) - - def execute(path: String) = { - var writer: IndexWriter = null - var searcher: IndexSearcher = null - try { - var indexDir: File = new File(System.getProperty("java.io.tmpdir"), "index-" + System.currentTimeMillis()) - writer = openWriter(indexDir) - - val stopWatch: StopWatch = new LoggingStopWatch() - execution.downloadAndIndex(path, writer) - stopWatch.stop(execution.getClass().getSimpleName()) - - searcher = openSearcher(indexDir) - val result: TopDocs = searcher.search(new MatchAllDocsQuery(), 100) - - logger.info("Found {} results", result.totalHits) - - for (scoreDoc <- result.scoreDocs) { - val doc: Document = searcher.doc(scoreDoc.doc) - logger.debug(doc.get("id")) - } - - searcher.close() - - } catch { - case ex => - logger.error(ex.getMessage(), ex) - if (writer != null) { - try { - writer.rollback() - } catch { - case ex1: IOException => - logger.error(ex1.getMessage(), ex1) - } - } - } finally { - if (writer != null) { - try { - writer.close() - } catch { - case ex: CorruptIndexException => logger.error(ex.getMessage(), ex) - case ex: IOException => logger.error(ex.getMessage(), ex) - } - } - if (searcher != null) { - try { - searcher.close() - } catch { - case ex: IOException => logger.error(ex.getMessage(), ex) - } - } - } - - } - - private def openWriter(indexDir: File): IndexWriter = { - val dir: Directory = FSDirectory.open(indexDir) - val config: IndexWriterConfig = new IndexWriterConfig(Version.LUCENE_35, new StandardAnalyzer(Version.LUCENE_35)) - new IndexWriter(dir, config) - } - - private def openSearcher(indexDir: File): IndexSearcher = { - val dir: Directory = FSDirectory.open(indexDir) - val reader: IndexReader = IndexReader.open(dir) - new IndexSearcher(reader) - } -} diff --git a/src/main/scala/de/fhopf/akka/HtmlParserPageRetriever.scala b/src/main/scala/de/fhopf/akka/HtmlParserPageRetriever.scala deleted file mode 100644 index cabd93b..0000000 --- a/src/main/scala/de/fhopf/akka/HtmlParserPageRetriever.scala +++ /dev/null @@ -1,74 +0,0 @@ -package de.fhopf.akka - -import java.util.ArrayList -import java.util.List -import org.htmlparser.Parser -import org.htmlparser.Tag -import org.htmlparser.tags.BodyTag -import org.htmlparser.tags.LinkTag -import org.htmlparser.tags.TitleTag -import org.htmlparser.util.ParserException -import org.htmlparser.visitors.NodeVisitor -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class HtmlParserPageRetriever(baseUrl: String) extends PageRetriever { - - private val logger: Logger = LoggerFactory.getLogger(classOf[HtmlParserPageRetriever]) - - override def fetchPageContent(url: String): PageContent = { - logger.debug("Fetching {}", url) - try { - val parser: Parser = new Parser(url) - val visitor: PageContentVisitor = new PageContentVisitor(baseUrl, url) - parser.visitAllNodesWith(visitor) - visitor.getContent() - } catch { - case ex: ParserException => throw new IllegalStateException(ex) - } - } - - class PageContentVisitor(recursive: Boolean) extends NodeVisitor(recursive) { - - private val linksToVisit: List[String] = new ArrayList[String]() - private var content: String = "" - private var title: String = "" - - private var baseUrl: String = "" - private var currentUrl: String = "" - - def this(baseUrl: String, currentUrl: String) { - this(true) - this.baseUrl = baseUrl - this.currentUrl = currentUrl - - } - - override def visitTag(tag: Tag) = { - tag match { - case linkTag: LinkTag => { - if (linkTag.getLink().startsWith(baseUrl) && isProbablyHtml(linkTag.getLink())) { - logger.debug("Using link pointing to {}", linkTag.getLink()) - linksToVisit.add(linkTag.getLink()) - } else { - logger.debug("Skipping link pointing to {}", linkTag.getLink()) - } - } - case titleTag: TitleTag => { - title = titleTag.getTitle() - } - case bodyTag: BodyTag => { - content = bodyTag.toPlainTextString() - } - } - } - - def getContent(): PageContent = new PageContent(currentUrl, linksToVisit, title, content) - - private def isProbablyHtml(link: String): Boolean = link.endsWith(".html") || link.endsWith("/") - } -} diff --git a/src/main/scala/de/fhopf/akka/Indexer.scala b/src/main/scala/de/fhopf/akka/Indexer.scala deleted file mode 100644 index 103cfd8..0000000 --- a/src/main/scala/de/fhopf/akka/Indexer.scala +++ /dev/null @@ -1,11 +0,0 @@ -package de.fhopf.akka; - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -abstract class Indexer { - def commit() - def index(pageContent: PageContent) - def close() -} diff --git a/src/main/scala/de/fhopf/akka/IndexerImpl.scala b/src/main/scala/de/fhopf/akka/IndexerImpl.scala deleted file mode 100644 index ec149cd..0000000 --- a/src/main/scala/de/fhopf/akka/IndexerImpl.scala +++ /dev/null @@ -1,50 +0,0 @@ -package de.fhopf.akka - -import java.io.IOException -import org.apache.lucene.document.Document -import org.apache.lucene.document.Field -import org.apache.lucene.index.CorruptIndexException -import org.apache.lucene.index.IndexWriter - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class IndexerImpl(indexWriter: IndexWriter) extends Indexer { - - override def index(pageContent: PageContent) = { - try { - indexWriter.addDocument(toDocument(pageContent)) - } catch { - case ex: CorruptIndexException => throw new IllegalStateException(ex) - case ex: IOException => throw new IllegalStateException(ex) - } - } - - private def toDocument(content: PageContent): Document = { - val doc: Document = new Document() - doc.add(new Field("id", content.getPath(), Field.Store.YES, Field.Index.NOT_ANALYZED)) - doc.add(new Field("title", content.getTitle(), Field.Store.YES, Field.Index.ANALYZED)) - doc.add(new Field("content", content.getContent(), Field.Store.NO, Field.Index.ANALYZED)) - doc - } - - override def commit() { - try { - indexWriter.commit() - } catch { - case ex: CorruptIndexException => throw new IllegalStateException(ex) - case ex: IOException => throw new IllegalStateException(ex) - } - } - - override def close() { - try { - indexWriter.close(true) - } catch { - case ex: CorruptIndexException => throw new IllegalStateException(ex) - case ex: IOException => throw new IllegalStateException(ex) - } - } - -} diff --git a/src/main/scala/de/fhopf/akka/PageContent.scala b/src/main/scala/de/fhopf/akka/PageContent.scala deleted file mode 100644 index 5697dd4..0000000 --- a/src/main/scala/de/fhopf/akka/PageContent.scala +++ /dev/null @@ -1,21 +0,0 @@ -package de.fhopf.akka - -import java.util.List - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class PageContent(path: String, linksToFollow: List[String], title: String, content: String) { - - def getContent(): String = content - - def getLinksToFollow(): List[String] = linksToFollow - - def getTitle(): String = title - - def getPath(): String = path - - override def toString() = "PageContent{title=" + title + ", content=" + content + ", linksToFollow=" + linksToFollow + "}" - -} diff --git a/src/main/scala/de/fhopf/akka/PageRetriever.scala b/src/main/scala/de/fhopf/akka/PageRetriever.scala deleted file mode 100644 index b755e9f..0000000 --- a/src/main/scala/de/fhopf/akka/PageRetriever.scala +++ /dev/null @@ -1,9 +0,0 @@ -package de.fhopf.akka - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -abstract class PageRetriever { - def fetchPageContent(url: String): PageContent -} diff --git a/src/main/scala/de/fhopf/akka/VisitedPageStore.scala b/src/main/scala/de/fhopf/akka/VisitedPageStore.scala deleted file mode 100644 index 1ae8a1d..0000000 --- a/src/main/scala/de/fhopf/akka/VisitedPageStore.scala +++ /dev/null @@ -1,56 +0,0 @@ -package de.fhopf.akka - -import java.util.Collection -import java.util.HashSet -import java.util.Set -import collection.JavaConversions._ - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class VisitedPageStore { - - private var pagesToVisit: Set[String] = new HashSet[String]() - private var allPages: Set[String] = new HashSet[String]() - private var inProgress: Set[String] = new HashSet[String]() - - def add(page: String) = { - if (!allPages.contains(page)) { - pagesToVisit.add(page) - allPages.add(page) - } - } - - def addAll(pages: Collection[String]) = { - for (page <- pages) { - add(page) - } - } - - def finished(page: String) = { - inProgress.remove(page) - } - - def getNext() = { - if (pagesToVisit.isEmpty()) { - null - } else { - val next: String = pagesToVisit.iterator().next() - pagesToVisit.remove(next) - inProgress.add(next) - next - } - } - - def getNextBatch(): Collection[String] = { - val pages: Set[String] = new HashSet[String]() - pages.addAll(pagesToVisit) - pagesToVisit.clear() - inProgress.addAll(pages) - pages - } - - def isFinished(): Boolean = pagesToVisit.isEmpty() && inProgress.isEmpty() - -} diff --git a/src/main/scala/de/fhopf/akka/actor/COMMITTED_MESSAGE.scala b/src/main/scala/de/fhopf/akka/actor/COMMITTED_MESSAGE.scala deleted file mode 100644 index 180e991..0000000 --- a/src/main/scala/de/fhopf/akka/actor/COMMITTED_MESSAGE.scala +++ /dev/null @@ -1,9 +0,0 @@ -package de.fhopf.akka.actor - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class COMMITTED_MESSAGE { - -} \ No newline at end of file diff --git a/src/main/scala/de/fhopf/akka/actor/COMMIT_MESSAGE.scala b/src/main/scala/de/fhopf/akka/actor/COMMIT_MESSAGE.scala deleted file mode 100644 index 275f0d4..0000000 --- a/src/main/scala/de/fhopf/akka/actor/COMMIT_MESSAGE.scala +++ /dev/null @@ -1,9 +0,0 @@ -package de.fhopf.akka.actor - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class COMMIT_MESSAGE { - -} \ No newline at end of file diff --git a/src/main/scala/de/fhopf/akka/actor/IndexedMessage.scala b/src/main/scala/de/fhopf/akka/actor/IndexedMessage.scala deleted file mode 100644 index caa5904..0000000 --- a/src/main/scala/de/fhopf/akka/actor/IndexedMessage.scala +++ /dev/null @@ -1,9 +0,0 @@ -package de.fhopf.akka.actor - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class IndexedMessage(path: String) { - def getPath = path -} diff --git a/src/main/scala/de/fhopf/akka/actor/IndexingActor.scala b/src/main/scala/de/fhopf/akka/actor/IndexingActor.scala deleted file mode 100644 index 7939253..0000000 --- a/src/main/scala/de/fhopf/akka/actor/IndexingActor.scala +++ /dev/null @@ -1,28 +0,0 @@ -package de.fhopf.akka.actor - -import akka.actor.UntypedActor -import de.fhopf.akka.Indexer -import de.fhopf.akka.PageContent - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class IndexingActor(indexer: Indexer) extends UntypedActor { - - override def onReceive(message: Any) = { - message match { - case content: PageContent => { - indexer.index(content) - this.getSender().tell(new IndexedMessage(content.getPath()), this.getSelf()) - } - case _: COMMIT_MESSAGE => { - indexer.commit() - this.getSender().tell(new COMMITTED_MESSAGE, this.getSelf()) - } - case _ => { - this.unhandled(message) - } - } - } -} diff --git a/src/main/scala/de/fhopf/akka/actor/Master.scala b/src/main/scala/de/fhopf/akka/actor/Master.scala deleted file mode 100644 index 8ae332a..0000000 --- a/src/main/scala/de/fhopf/akka/actor/Master.scala +++ /dev/null @@ -1,57 +0,0 @@ -package de.fhopf.akka.actor - -import akka.actor.ActorRef -import akka.actor.UntypedActor -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import java.util.concurrent.CountDownLatch -import de.fhopf.akka._ - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -abstract class Master(latch: CountDownLatch) extends UntypedActor { - - private var logger: Logger = LoggerFactory.getLogger(classOf[Master]) - private var visitedPageStore: VisitedPageStore = new VisitedPageStore() - - override def onReceive(message: Any) = { - - message match { - case start: String => { - visitedPageStore.add(start) - getParser().tell(visitedPageStore.getNext(), getSelf()) - } - case content: PageContent => { - getIndexer().tell(content, getSelf()) - visitedPageStore.addAll(content.getLinksToFollow()) - if (visitedPageStore.isFinished()) { - getIndexer().tell(new COMMIT_MESSAGE(), getSelf()) - } else { - // TODO - // for (String page : visitedPageStore.getNextBatch()) { - // getParser().tell(page, getSelf()) - // } - } - - } - case indexedMessage: IndexedMessage => { - visitedPageStore.finished(indexedMessage.getPath) - if (visitedPageStore.isFinished()) - getIndexer().tell(new COMMIT_MESSAGE(), getSelf()) - - } - case _: COMMITTED_MESSAGE => { - logger.info("Shutting down, finished") - getContext().system.shutdown() - latch.countDown() - } - } - - } - - protected def getIndexer(): ActorRef - - protected def getParser(): ActorRef -} diff --git a/src/main/scala/de/fhopf/akka/actor/PageParsingActor.scala b/src/main/scala/de/fhopf/akka/actor/PageParsingActor.scala deleted file mode 100644 index 3af077a..0000000 --- a/src/main/scala/de/fhopf/akka/actor/PageParsingActor.scala +++ /dev/null @@ -1,28 +0,0 @@ -package de.fhopf.akka.actor - -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import akka.actor.UntypedActor -import akka.actor.actorRef2Scala -import de.fhopf.akka.PageContent -import de.fhopf.akka.PageRetriever - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class PageParsingActor(pageRetriever: PageRetriever) extends UntypedActor { - - private val logger: Logger = LoggerFactory.getLogger(classOf[PageParsingActor]) - - override def onReceive(message: Any) = { - message match { - case msg: String => { - val content: PageContent = pageRetriever.fetchPageContent(msg) - sender ! (content, self) - } - case _ => unhandled(message) - } - } -} diff --git a/src/main/scala/de/fhopf/akka/actor/SimpleActorMaster.scala b/src/main/scala/de/fhopf/akka/actor/SimpleActorMaster.scala deleted file mode 100644 index e3a6142..0000000 --- a/src/main/scala/de/fhopf/akka/actor/SimpleActorMaster.scala +++ /dev/null @@ -1,46 +0,0 @@ -package de.fhopf.akka.actor - -import java.util.concurrent.CountDownLatch -import akka.actor.ActorRef -import de.fhopf.akka.PageRetriever -import org.apache.lucene.index.IndexWriter -import akka.actor.Props -import akka.actor.UntypedActorFactory -import scala.actors.Actor -import de.fhopf.akka.IndexerImpl - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - * @param pageRetriever - * @param indexWriter - * @param latch - */ -class SimpleActorMaster(latch: CountDownLatch) extends Master(latch) { - - private var indexer: ActorRef = null - private var parser: ActorRef = null - - def this() { this(null) } - - def this(pageRetriever: PageRetriever, indexWriter: IndexWriter, latch: CountDownLatch) = { - - this(latch) - - this.indexer = getContext().actorOf(new Props(new UntypedActorFactory() { - override def create() = new IndexingActor(new IndexerImpl(indexWriter)) - })) - - this.parser = getContext().actorOf(new Props(new UntypedActorFactory() { - override def create() = new PageParsingActor(pageRetriever) - })) - - } - - protected def getIndexer(): ActorRef = indexer - - protected def getParser(): ActorRef = parser - - // def onReceive(arg0: Object): Unit = { } - -} \ No newline at end of file diff --git a/src/main/scala/de/fhopf/akka/actor/parallel/ParallelMaster.scala b/src/main/scala/de/fhopf/akka/actor/parallel/ParallelMaster.scala deleted file mode 100644 index 82430c3..0000000 --- a/src/main/scala/de/fhopf/akka/actor/parallel/ParallelMaster.scala +++ /dev/null @@ -1,49 +0,0 @@ -package de.fhopf.akka.actor.parallel - -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.UntypedActorFactory - -import akka.routing.RoundRobinRouter - -import de.fhopf.akka.IndexerImpl -import de.fhopf.akka.PageRetriever -import de.fhopf.akka.actor.IndexingActor -import de.fhopf.akka.actor.Master -import de.fhopf.akka.actor.PageParsingActor - -import org.apache.lucene.index.IndexWriter - -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import java.util.concurrent.CountDownLatch - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class ParallelMaster(latch: CountDownLatch) extends Master(latch) { - - private val logger: Logger = LoggerFactory.getLogger(classOf[ParallelMaster]) - - private var parser: ActorRef = null - private var indexer: ActorRef = null - - def this(indexWriter: IndexWriter, pageRetriever: PageRetriever, latch: CountDownLatch) = { - - this(latch) - parser = getContext().actorOf(new Props(new UntypedActorFactory() { - override def create(): Actor = new PageParsingActor(pageRetriever) - }).withRouter(new RoundRobinRouter(10))) - indexer = getContext().actorOf(new Props(new UntypedActorFactory() { - override def create(): Actor = new IndexingActor(new IndexerImpl(indexWriter)) - })) - } - - protected override def getIndexer(): ActorRef = indexer - - protected override def getParser(): ActorRef = return parser - -} diff --git a/src/main/scala/de/fhopf/akka/parallel/FetchInParallelExecution.scala b/src/main/scala/de/fhopf/akka/parallel/FetchInParallelExecution.scala deleted file mode 100644 index 698bc4a..0000000 --- a/src/main/scala/de/fhopf/akka/parallel/FetchInParallelExecution.scala +++ /dev/null @@ -1,37 +0,0 @@ -package de.fhopf.akka.actor.parallel - -import akka.actor._ -import de.fhopf.akka.Execution -import de.fhopf.akka.Executor -import de.fhopf.akka.HtmlParserPageRetriever -import java.util.concurrent.CountDownLatch -import org.apache.lucene.index.IndexWriter - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class FetchInParallelExecution extends Execution { - - override def downloadAndIndex(path: String, writer: IndexWriter) = { - val actorSystem: ActorSystem = ActorSystem.create() - val countDownLatch: CountDownLatch = new CountDownLatch(1) - val master: ActorRef = actorSystem.actorOf(new Props(new UntypedActorFactory() { - override def create(): Actor = new ParallelMaster(writer, new HtmlParserPageRetriever(path), countDownLatch) - })) - master.tell(path) - try { - countDownLatch.await() - actorSystem.shutdown() - } catch { - case ex: InterruptedException => throw new IllegalStateException(ex) - } - } - - def main(args: Array[String]) { - val execution: FetchInParallelExecution = new FetchInParallelExecution() - val exec: Executor = new Executor(execution) - exec.execute("http://www.synyx.de/") - } - -} diff --git a/src/main/scala/de/fhopf/akka/sequential/SequentialExecution.scala b/src/main/scala/de/fhopf/akka/sequential/SequentialExecution.scala deleted file mode 100644 index b02b32c..0000000 --- a/src/main/scala/de/fhopf/akka/sequential/SequentialExecution.scala +++ /dev/null @@ -1,39 +0,0 @@ -package de.fhopf.akka.sequential - -import de.fhopf.akka._ -import de.fhopf.akka.Execution -import de.fhopf.akka.Executor -import org.apache.lucene.index.IndexWriter - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class SequentialExecution extends Execution { - - override def downloadAndIndex(path: String, writer: IndexWriter) = { - - val pageStore: VisitedPageStore = new VisitedPageStore() - pageStore.add(path) - val indexer: Indexer = new IndexerImpl(writer) - val retriever: PageRetriever = new HtmlParserPageRetriever(path) - - var page = "" - // TODO: check the correct values here - while ((page = pageStore.getNext()) != None) { - val pageContent: PageContent = retriever.fetchPageContent(page) - pageStore.addAll(pageContent.getLinksToFollow()) - indexer.index(pageContent) - pageStore.finished(page) - } - - indexer.commit() - } - - def main(args: Array[String]) { - val execution: SequentialExecution = new SequentialExecution() - val exec: Executor = new Executor(execution) - exec.execute("http://www.synyx.de/") - } - -} diff --git a/src/test/java/de/fhopf/akka/HtmlParserPageRetrieverTest.java b/src/test/java/de/fhopf/akka/HtmlParserPageRetrieverTest.java new file mode 100644 index 0000000..c8f2d9a --- /dev/null +++ b/src/test/java/de/fhopf/akka/HtmlParserPageRetrieverTest.java @@ -0,0 +1,31 @@ +package de.fhopf.akka; + +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class HtmlParserPageRetrieverTest { + + private HtmlParserPageRetriever pageRetriever = new HtmlParserPageRetriever("http://www.synyx.de"); + + private final Logger logger = LoggerFactory.getLogger(HtmlParserPageRetrieverTest.class); + + @Test + public void contentIsExtracted() { + PageContent content = pageRetriever.fetchPageContent("http://www.synyx.de/unternehmen/"); + + assertNotNull(content); + assertNotNull(content.getContent()); + assertNotNull(content.getTitle()); + // TODO add more meaningful assertions + logger.info(content.toString()); + //System.out.println(content); + } + +} diff --git a/src/test/java/de/fhopf/akka/HtmlParserTest.java b/src/test/java/de/fhopf/akka/HtmlParserTest.java new file mode 100644 index 0000000..1278bf9 --- /dev/null +++ b/src/test/java/de/fhopf/akka/HtmlParserTest.java @@ -0,0 +1,33 @@ +package de.fhopf.akka; + +import org.htmlparser.Node; +import org.htmlparser.Parser; +import org.htmlparser.tags.LinkTag; +import org.htmlparser.util.ParserException; +import org.htmlparser.visitors.ObjectFindingVisitor; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * Experiment with some of the html parser functionality. + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class HtmlParserTest { + + @Test + public void testLinkExtraction() throws ParserException { + Parser parser = new Parser("http://synyx.de"); + ObjectFindingVisitor visitor = new ObjectFindingVisitor(LinkTag.class); + parser.visitAllNodesWith(visitor); + Node[] links = visitor.getTags(); + // TODO this could use some more meaningful assertions + assertTrue(links.length > 0); + for (int i = 0; i < links.length; i++) { + LinkTag linkTag = (LinkTag) links[i]; + System.out.print("\"" + linkTag.getLinkText() + "\" => "); + System.out.println(linkTag.getLink()); + } + } +} diff --git a/src/test/java/de/fhopf/akka/IndexerImplTest.java b/src/test/java/de/fhopf/akka/IndexerImplTest.java new file mode 100644 index 0000000..b665aa4 --- /dev/null +++ b/src/test/java/de/fhopf/akka/IndexerImplTest.java @@ -0,0 +1,43 @@ +package de.fhopf.akka; + +import java.io.IOException; +import java.util.ArrayList; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.*; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Version; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * + * @author Florian Hopf, http://www.florian-hopf.de + */ +public class IndexerImplTest { + + @Test + public void pageContentIsFoundAfterCommit() throws CorruptIndexException, LockObtainFailedException, IOException { + Directory index = new RAMDirectory(); + IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_35, new StandardAnalyzer(Version.LUCENE_35)); + IndexWriter writer = new IndexWriter(index, config); + IndexerImpl indexerImpl = new IndexerImpl(writer); + PageContent content = new PageContent("http://path", new ArrayList(), "This is the title", "This is the content"); + indexerImpl.index(content); + indexerImpl.commit(); + + IndexReader reader = IndexReader.open(index); + IndexSearcher searcher = new IndexSearcher(reader); + + TermQuery query = new TermQuery(new Term("content", "content")); + TopDocs result = searcher.search(query, 10); + + assertEquals(1, result.totalHits); + } + +} diff --git a/src/test/scala/de/fhopf/akka/HtmlParserPageRetrieverTest.scala b/src/test/scala/de/fhopf/akka/HtmlParserPageRetrieverTest.scala deleted file mode 100644 index 9cc5cc8..0000000 --- a/src/test/scala/de/fhopf/akka/HtmlParserPageRetrieverTest.scala +++ /dev/null @@ -1,27 +0,0 @@ -package de.fhopf.akka - -import org.junit.Test - -import org.junit.Assert.assertNotNull -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class HtmlParserPageRetrieverTest { - - private val pageRetriever: HtmlParserPageRetriever = new HtmlParserPageRetriever("http://www.synyx.de") - private val logger: Logger = LoggerFactory.getLogger(classOf[HtmlParserPageRetrieverTest]) - - @Test def contentIsExtracted() { - val content: PageContent = pageRetriever.fetchPageContent("http://www.synyx.de/unternehmen/") - assertNotNull(content) - assertNotNull(content.getContent()) - assertNotNull(content.getTitle()) - logger.info(content.toString()) - println(content) - } - -} diff --git a/src/test/scala/de/fhopf/akka/HtmlParserTest.scala b/src/test/scala/de/fhopf/akka/HtmlParserTest.scala deleted file mode 100644 index ecb04c3..0000000 --- a/src/test/scala/de/fhopf/akka/HtmlParserTest.scala +++ /dev/null @@ -1,33 +0,0 @@ -package de.fhopf.akka - -import org.htmlparser.Node -import org.htmlparser.Parser -import org.htmlparser.tags.LinkTag -import org.htmlparser.util.ParserException -import org.htmlparser.visitors.ObjectFindingVisitor -import org.junit.Test -import org.junit.Assert.assertTrue - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class HtmlParserTest { - - @Test - def testLinkExtraction() = { - val parser: Parser = new Parser("http://synyx.de") - val visitor: ObjectFindingVisitor = new ObjectFindingVisitor(classOf[LinkTag]) - parser.visitAllNodesWith(visitor) - val links: Array[Node] = visitor.getTags() - assertTrue(links.length > 0) - - for (i <- 0 until links.length) { - val linkTag = links { i }.asInstanceOf[LinkTag] - print("\"" + linkTag.getLinkText() + "\" => ") - println(linkTag.getLink()) - } - - } - -} diff --git a/src/test/scala/de/fhopf/akka/IndexerImplTest.scala b/src/test/scala/de/fhopf/akka/IndexerImplTest.scala deleted file mode 100644 index d28d800..0000000 --- a/src/test/scala/de/fhopf/akka/IndexerImplTest.scala +++ /dev/null @@ -1,39 +0,0 @@ -package de.fhopf.akka -import java.util.ArrayList -import org.apache.lucene.analysis.standard.StandardAnalyzer -import org.apache.lucene.index._ -import org.apache.lucene.search.IndexSearcher -import org.apache.lucene.search.TermQuery -import org.apache.lucene.search.TopDocs -import org.apache.lucene.store.Directory -import org.apache.lucene.store.RAMDirectory -import org.apache.lucene.util.Version -import org.junit.Test -import org.junit.Assert.assertEquals -import org.junit.Test - -/** - * @author Florian Hopf, http://www.florian-hopf.de - * @author Alfredo Serafini, http://www.seralf.it - */ -class IndexerImplTest { - - @Test - def pageContentIsFoundAfterCommit() = { - - val index: Directory = new RAMDirectory() - val config: IndexWriterConfig = new IndexWriterConfig(Version.LUCENE_35, new StandardAnalyzer(Version.LUCENE_35)) - val writer: IndexWriter = new IndexWriter(index, config) - val indexerImpl: IndexerImpl = new IndexerImpl(writer) - val content: PageContent = new PageContent("http://path", new ArrayList[String](), "This is the title", "This is the content") - indexerImpl.index(content) - indexerImpl.commit() - val reader: IndexReader = IndexReader.open(index) - val searcher: IndexSearcher = new IndexSearcher(reader) - val query: TermQuery = new TermQuery(new Term("content", "content")) - val result: TopDocs = searcher.search(query, 10) - assertEquals(1, result.totalHits) - - } - -}