Skip to content

Commit

Permalink
Revert "Merge pull request #1 from seralf/scala-version"
Browse files Browse the repository at this point in the history
This reverts commit a7d4a9b, reversing
changes made to 5978baf.
  • Loading branch information
fhopf committed Jan 11, 2013
1 parent a7d4a9b commit c074168
Show file tree
Hide file tree
Showing 46 changed files with 977 additions and 773 deletions.
13 changes: 2 additions & 11 deletions README.md
@@ -1,9 +1,4 @@
# Simple Producer Consumer example for Akka in scala

This is a Scala rewrite of the original project by Florian Hopf:
http://blog.florian-hopf.de/2012/08/getting-rid-of-synchronized-using-akka.html
https://github.com/fhopf/akka-crawler-example

# Simple Producer Consumer example for Akka in Java

This repository contains 3 examples of a simple web crawler:
* A sequential example
Expand All @@ -16,8 +11,4 @@ To start the simple actor execution run gradle runActors

To start the parallel page fetching run gradle runParallelActors

The code is only meant as an example on how to implement a producer consumer example in Akka.
For more information visit http://blog.florian-hopf.de/2012/08/getting-rid-of-synchronized-using-akka.html


TODO (Scala): refactor the code to use idiomatic Scala constructs.
The code is only meant as an example of how to implement the producer-consumer pattern in Akka. For more information visit http://blog.florian-hopf.de/2012/08/getting-rid-of-synchronized-using-akka.html
36 changes: 36 additions & 0 deletions build.gradle
@@ -0,0 +1,36 @@
// Build script for the crawler examples: compiles the Java sources and
// defines one JavaExec task per example entry point.
apply plugin: 'java'
apply plugin: 'maven'

// Dependencies are resolved from Maven Central plus the Typesafe releases repository.
repositories {
mavenCentral()
maven {
url "http://repo.typesafe.com/typesafe/releases/"
}
}

dependencies {
compile group: 'org.apache.lucene', name: 'lucene-core', version: '3.6.0'
compile group: 'com.typesafe.akka', name: 'akka-actor', version: '2.0'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.6.4'
compile group: 'org.htmlparser', name: 'htmlparser', version: '1.6'
compile group: 'org.perf4j', name: 'perf4j', version: '0.9.16'
// The SLF4J binding is on the runtime configuration only; code compiles against slf4j-api.
runtime group: 'org.slf4j', name: 'slf4j-simple', version: '1.6.4'
testCompile group: 'junit', name: 'junit', version: '4.10'
}

// "gradle runSequential" launches the sequential example's main class.
task runSequential(type: JavaExec) {
main = 'de.fhopf.akka.sequential.SequentialExecution'
classpath = sourceSets.main.runtimeClasspath
}

// "gradle runActors" launches the simple actor example's main class.
task runActors(type: JavaExec) {
main = 'de.fhopf.akka.actor.SimpleActorExecution'
classpath = sourceSets.main.runtimeClasspath
}

// "gradle runParallelActors" launches the parallel page fetching example's main class.
task runParallelActors(type: JavaExec) {
main = 'de.fhopf.akka.actor.parallel.FetchInParallelExecution'
classpath = sourceSets.main.runtimeClasspath
}


21 changes: 0 additions & 21 deletions build.sbt

This file was deleted.

18 changes: 18 additions & 0 deletions nb-configuration.xml
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project-shared-configuration>
<!--
This file contains additional configuration written by modules in the NetBeans IDE.
The configuration is intended to be shared among all the users of project and
therefore it is assumed to be part of version control checkout.
Without this configuration present, some functionality in the IDE may be limited or fail altogether.
-->
<properties xmlns="http://www.netbeans.org/ns/maven-properties-data/1">
<!--
Properties that influence various parts of the IDE, especially code formatting and the like.
You can copy and paste the single properties, into the pom.xml file and the IDE will pick them up.
That way multiple projects can share the same settings (useful for formatting rules for example).
Any value defined here will override the pom.xml file value but is only applicable to the current project.
-->
<netbeans.compile.on.save>none</netbeans.compile.on.save>
</properties>
</project-shared-configuration>
1 change: 0 additions & 1 deletion project/sbteclipse.sbt

This file was deleted.

13 changes: 13 additions & 0 deletions src/main/java/de/fhopf/akka/Execution.java
@@ -0,0 +1,13 @@
package de.fhopf.akka;

import org.apache.lucene.index.IndexWriter;

/**
 * A unit of work that {@link Executor} runs and times against a freshly opened
 * index writer.
 *
 * @author Florian Hopf, http://www.florian-hopf.de
 */
public interface Execution {

    /**
     * Performs the work of this execution.
     *
     * @param path   the value passed to {@code Executor.execute(String)};
     *               presumably the location to start crawling from
     *               (TODO confirm against the implementations)
     * @param writer the open index writer supplied by the caller; {@code Executor}
     *               searches the same index directory once this method returns
     */
    void downloadAndIndex(String path, IndexWriter writer);
}
109 changes: 109 additions & 0 deletions src/main/java/de/fhopf/akka/Executor.java
@@ -0,0 +1,109 @@
package de.fhopf.akka;

import java.io.File;
import java.io.IOException;
import java.util.logging.Level;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Version;
import org.perf4j.LoggingStopWatch;
import org.perf4j.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs an {@link Execution} against a new index, logs how long it took and
 * logs the number of documents found in the index afterwards. The concrete
 * work is supplied by the {@link Execution} passed to the constructor.
 *
 * @author Florian Hopf, http://www.florian-hopf.de
 */
public class Executor {

    private static final Logger logger = LoggerFactory.getLogger(Executor.class);

    private final Execution execution;

    /**
     * @param execution the work to run and time in {@link #execute(String)}
     */
    public Executor(Execution execution) {
        this.execution = execution;
    }

    /**
     * Opens a writer on a new directory {@code index-<currentTimeMillis>} below
     * {@code java.io.tmpdir}, runs the execution, then opens a searcher on the
     * same directory and logs the hit count of a match-all query (at most 100
     * hits are fetched; their "id" fields are logged at debug level).
     *
     * <p>No exception escapes this method: failures are logged and the writer
     * is rolled back. The writer, the searcher and the searcher's reader are
     * always closed before returning.
     *
     * @param path passed through unchanged to {@link Execution#downloadAndIndex}
     */
    public void execute(String path) {
        IndexWriter writer = null;
        IndexSearcher searcher = null;
        try {
            File indexDir = new File(System.getProperty("java.io.tmpdir"), "index-" + System.currentTimeMillis());
            writer = openWriter(indexDir);

            StopWatch stopWatch = new LoggingStopWatch();
            execution.downloadAndIndex(path, writer);
            stopWatch.stop(execution.getClass().getSimpleName());

            searcher = openSearcher(indexDir);
            TopDocs result = searcher.search(new MatchAllDocsQuery(), 100);

            logger.info("Found {} results", result.totalHits);

            for (ScoreDoc scoreDoc : result.scoreDocs) {
                Document doc = searcher.doc(scoreDoc.doc);
                logger.debug(doc.get("id"));
            }
        } catch (Exception ex) {
            // Boundary of the example run: log everything and discard uncommitted changes.
            logger.error(ex.getMessage(), ex);
            rollback(writer);
        } finally {
            closeWriter(writer);
            closeSearcher(searcher);
        }
    }

    /** Rolls back the writer if one was opened, logging any failure. */
    private void rollback(IndexWriter writer) {
        if (writer == null) {
            return;
        }
        try {
            writer.rollback();
        } catch (IOException ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    /** Closes the writer if one was opened, logging any failure. */
    private void closeWriter(IndexWriter writer) {
        if (writer == null) {
            return;
        }
        try {
            writer.close();
        } catch (IOException ex) {
            // CorruptIndexException is an IOException, so one handler covers both.
            logger.error(ex.getMessage(), ex);
        }
    }

    /**
     * Closes the searcher and the reader it was built on, logging any failure.
     * The reader has to be closed explicitly: a searcher created from an
     * existing IndexReader does not close that reader in its own close().
     */
    private void closeSearcher(IndexSearcher searcher) {
        if (searcher == null) {
            return;
        }
        IndexReader reader = searcher.getIndexReader();
        try {
            searcher.close();
        } catch (IOException ex) {
            logger.error(ex.getMessage(), ex);
        }
        try {
            reader.close();
        } catch (IOException ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    /** Opens an index writer with a StandardAnalyzer on the given directory. */
    private IndexWriter openWriter(File indexDir) throws CorruptIndexException, LockObtainFailedException, IOException {

        Directory dir = FSDirectory.open(indexDir);

        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_35, new StandardAnalyzer(Version.LUCENE_35));

        return new IndexWriter(dir, config);
    }

    /**
     * Opens a searcher on a new reader for the given directory. The caller
     * owns both the searcher and its reader (see closeSearcher).
     */
    private IndexSearcher openSearcher(File indexDir) throws CorruptIndexException, IOException {

        Directory dir = FSDirectory.open(indexDir);
        IndexReader reader = IndexReader.open(dir);

        return new IndexSearcher(reader);
    }
}
87 changes: 87 additions & 0 deletions src/main/java/de/fhopf/akka/HtmlParserPageRetriever.java
@@ -0,0 +1,87 @@
package de.fhopf.akka;

import java.util.ArrayList;
import java.util.List;
import org.htmlparser.Parser;
import org.htmlparser.Tag;
import org.htmlparser.tags.BodyTag;
import org.htmlparser.tags.LinkTag;
import org.htmlparser.tags.TitleTag;
import org.htmlparser.util.ParserException;
import org.htmlparser.visitors.NodeVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PageRetriever} that parses a page with HtmlParser and collects its
 * title, body text and the links that should be followed.
 *
 * @author Florian Hopf, http://www.florian-hopf.de
 */
public class HtmlParserPageRetriever implements PageRetriever {

    private static final Logger logger = LoggerFactory.getLogger(HtmlParserPageRetriever.class);

    private final String baseUrl;

    /**
     * @param baseUrl prefix a link must start with to be collected for a later visit
     */
    public HtmlParserPageRetriever(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /**
     * Parses the page at the given URL and returns what the visitor collected.
     *
     * @param url the page to parse
     * @return the collected page content
     * @throws IllegalStateException wrapping any {@link ParserException}
     */
    @Override
    public PageContent fetchPageContent(String url) {
        logger.debug("Fetching {}", url);
        final PageContentVisitor collector;
        try {
            Parser pageParser = new Parser(url);
            collector = new PageContentVisitor(baseUrl, url);
            pageParser.visitAllNodesWith(collector);
        } catch (ParserException ex) {
            throw new IllegalStateException(ex);
        }
        return collector.getContent();
    }

    /**
     * Node visitor that remembers the title, the plain text of the body and
     * every link that starts with the base URL and looks like an HTML page.
     */
    private static class PageContentVisitor extends NodeVisitor {

        private final String baseUrl;
        private final String currentUrl;

        private List<String> linksToVisit = new ArrayList<String>();
        private String title;
        private String content;

        public PageContentVisitor(String baseUrl, String currentUrl) {
            super(true);
            this.baseUrl = baseUrl;
            this.currentUrl = currentUrl;
        }

        @Override
        public void visitTag(Tag tag) {
            if (tag instanceof LinkTag) {
                collectLink((LinkTag) tag);
                return;
            }
            if (tag instanceof TitleTag) {
                title = ((TitleTag) tag).getTitle();
                return;
            }
            if (tag instanceof BodyTag) {
                content = ((BodyTag) tag).toPlainTextString();
            }
        }

        /** Records the link target if it is below the base URL and looks like HTML. */
        private void collectLink(LinkTag linkTag) {
            String target = linkTag.getLink();
            boolean follow = target.startsWith(baseUrl) && isProbablyHtml(target);
            if (!follow) {
                logger.debug("Skipping link pointing to {}", target);
                return;
            }
            logger.debug("Using link pointing to {}", target);
            linksToVisit.add(target);
        }

        /** Bundles everything collected so far for the current URL. */
        public PageContent getContent() {
            return new PageContent(currentUrl, linksToVisit, title, content);
        }

        /** A link counts as HTML when it ends in ".html" or in a slash. */
        private boolean isProbablyHtml(String link) {
            if (link.endsWith(".html")) {
                return true;
            }
            return link.endsWith("/");
        }
    }
}
18 changes: 18 additions & 0 deletions src/main/java/de/fhopf/akka/Indexer.java
@@ -0,0 +1,18 @@
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package de.fhopf.akka;

/**
 * Write-side operations on the index: add a page, commit, close.
 *
 * @author Florian Hopf, http://www.florian-hopf.de
 */
public interface Indexer {

    /**
     * Adds the given page to the index.
     *
     * @param pageContent the page to add
     */
    void index(PageContent pageContent);

    /**
     * Commits the pages added so far.
     */
    void commit();

    /**
     * Closes the indexer; presumably it must not be used afterwards
     * (TODO confirm against the implementations).
     */
    void close();
}
63 changes: 63 additions & 0 deletions src/main/java/de/fhopf/akka/IndexerImpl.java
@@ -0,0 +1,63 @@
package de.fhopf.akka;

import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;

/**
 * {@link Indexer} implementation that contains the writer state: every
 * operation is delegated to the {@link IndexWriter} given at construction.
 * I/O failures are rethrown as {@link IllegalStateException} with the
 * original exception as cause.
 *
 * @author Florian Hopf, http://www.florian-hopf.de
 */
public class IndexerImpl implements Indexer {

    private final IndexWriter indexWriter;

    /**
     * @param indexWriter the writer all operations are delegated to
     */
    public IndexerImpl(IndexWriter indexWriter) {
        this.indexWriter = indexWriter;
    }

    /**
     * Converts the page to a Lucene document and adds it to the writer.
     *
     * @param pageContent the page to add
     * @throws IllegalStateException if the writer reports an I/O failure
     */
    @Override
    public void index(PageContent pageContent) {
        try {
            indexWriter.addDocument(toDocument(pageContent));
        } catch (IOException ex) {
            // CorruptIndexException is an IOException, so one handler covers both.
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Builds the document for a page: "id" (stored, not analyzed), "title"
     * (stored, analyzed) and "content" (analyzed, not stored).
     *
     * <p>Title and content are left out when they are null, because the Field
     * constructor rejects a null value with a NullPointerException. The
     * HtmlParser-based retriever only sets them when the page has a title or
     * body tag, so null is expected here for some pages. The id is still
     * required.
     */
    private Document toDocument(PageContent content) {
        Document doc = new Document();
        doc.add(new Field("id", content.getPath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
        addAnalyzedIfPresent(doc, "title", content.getTitle(), Field.Store.YES);
        addAnalyzedIfPresent(doc, "content", content.getContent(), Field.Store.NO);
        return doc;
    }

    /** Adds an analyzed field with the given store setting unless the value is null. */
    private static void addAnalyzedIfPresent(Document doc, String name, String value, Field.Store store) {
        if (value != null) {
            doc.add(new Field(name, value, store, Field.Index.ANALYZED));
        }
    }

    /**
     * Commits the writer.
     *
     * @throws IllegalStateException if the writer reports an I/O failure
     */
    @Override
    public void commit() {
        try {
            indexWriter.commit();
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Closes the writer via {@code close(true)}.
     *
     * @throws IllegalStateException if the writer reports an I/O failure
     */
    @Override
    public void close() {
        try {
            indexWriter.close(true);
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

}

0 comments on commit c074168

Please sign in to comment.