Made DB tokenizer and spotter parallel to avoid OpenNLP threading NullPointers and improve tokenization performance.

1 parent 9f2276b · commit d7d8009ca925e64dd201c98f646f905cdeea4f9b · @jodaiber committed Feb 6, 2013
core/pom.xml (6 changed lines)
@@ -254,6 +254,12 @@
<artifactId>breeze-learn_2.9.2</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor</artifactId>
+ </dependency>
+
</dependencies>
</project>
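No <version> element is given here because it is inherited from the <dependencyManagement> entry added to the root pom.xml below (akka-actor 2.0.5).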
core/src/main/scala/org/dbpedia/spotlight/db/DefaultTokenizer.scala (40 changed lines)
@@ -21,35 +21,31 @@ class DefaultTokenizer(
) extends Tokenizer {
def tokenize(text: Text): List[Token] = {
- this.synchronized {
-   val sentences: Array[Span] = sentenceDetector.sentPosDetect(text.text)
-   sentences.map{ sentencePos: Span =>
-     val sentence = text.text.substring(sentencePos.getStart, sentencePos.getEnd)
-     val sentenceTokens = tokenizer.tokenize(sentence)
-     val sentenceTokenPos = tokenizer.tokenizePos(sentence)
-     val posTags = if(posTagger != null) posTagger.tag(sentenceTokens) else Array[String]()
-     (0 to sentenceTokens.size-1).map{ i: Int =>
-       val token = if (stopWords contains sentenceTokens(i)) {
-         new Token(sentenceTokens(i), sentencePos.getStart + sentenceTokenPos(i).getStart, TokenType.STOPWORD)
-       } else {
-         new Token(sentenceTokens(i), sentencePos.getStart + sentenceTokenPos(i).getStart, getStemmedTokenType(sentenceTokens(i)))
-       }
-       if(posTagger != null)
-         token.setFeature(new Feature("pos", posTags(i)))
-       if(i == sentenceTokens.size-1)
-         token.setFeature(new Feature("end-of-sentence", true))
-
-       token
-     }
-   }.flatten.toList
- }
+ sentenceDetector.sentPosDetect(text.text).map{ sentencePos: Span =>
+   val sentence = text.text.substring(sentencePos.getStart, sentencePos.getEnd)
+   val sentenceTokens = tokenizer.tokenize(sentence)
+   val sentenceTokenPos = tokenizer.tokenizePos(sentence)
+   val posTags = if(posTagger != null) posTagger.tag(sentenceTokens) else Array[String]()
+   (0 to sentenceTokens.size-1).map{ i: Int =>
+     val token = if (stopWords contains sentenceTokens(i)) {
+       new Token(sentenceTokens(i), sentencePos.getStart + sentenceTokenPos(i).getStart, TokenType.STOPWORD)
+     } else {
+       new Token(sentenceTokens(i), sentencePos.getStart + sentenceTokenPos(i).getStart, getStemmedTokenType(sentenceTokens(i)))
+     }
+     if(posTagger != null)
+       token.setFeature(new Feature("pos", posTags(i)))
+     if(i == sentenceTokens.size-1)
+       token.setFeature(new Feature("end-of-sentence", true))
+     token
+   }
+ }.flatten.toList
}
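The this.synchronized wrapper could be dropped because concurrency now lives one level up: each DefaultTokenizer, together with its non-thread-safe OpenNLP TokenizerME, SentenceDetectorME and POSTaggerME instances, is owned by a single actor that handles one request at a time. A minimal sketch of that thread-confinement idea, separate from this commit's classes (model path and thread count are made up):

import java.io.FileInputStream
import opennlp.tools.tokenize.{TokenizerME, TokenizerModel}

object ThreadConfinementSketch extends App {
  // The immutable model can safely be shared across threads...
  val model = new TokenizerModel(new FileInputStream("opennlp/token.bin"))

  val threads = (1 to 4).map { _ =>
    new Thread(new Runnable {
      // ...but each thread owns its own (non-thread-safe) TokenizerME.
      val tokenizer = new TokenizerME(model)
      def run() {
        println(tokenizer.tokenize("Tokenize this sentence.").mkString(" | "))
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
}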
core/src/main/scala/org/dbpedia/spotlight/db/SpotlightModel.scala (58 changed lines)
@@ -1,5 +1,6 @@
package org.dbpedia.spotlight.db
+import concurrent.{TokenizerWrapper, SpotterWrapper}
import memory.MemoryStore
import model._
import org.tartarus.snowball.SnowballProgram
@@ -17,6 +18,8 @@ import org.dbpedia.spotlight.spot.Spotter
import java.io.{File, FileInputStream}
import java.util.Properties
import breeze.linalg.DenseVector
+import opennlp.tools.chunker.ChunkerModel
+import opennlp.tools.namefind.TokenNameFinderModel
class SpotlightModel(val tokenizer: Tokenizer,
val spotters: java.util.Map[SpotterPolicy, Spotter],
@@ -50,14 +53,25 @@ object SpotlightModel {
//Create the tokenizer:
val posTagger = new File(modelFolder, "opennlp/pos-maxent.bin")
- val tokenizer: Tokenizer = new DefaultTokenizer(
- new TokenizerME(new TokenizerModel(new FileInputStream(new File(modelFolder, "opennlp/token.bin")))),
- stopwords,
- stemmer,
- new SentenceDetectorME(new SentenceModel(new FileInputStream(new File(modelFolder, "opennlp/sent.bin")))),
- if (posTagger.exists()) new POSTaggerME(new POSModel(new FileInputStream(posTagger))) else null,
- tokenTypeStore
- )
+ val posModel = if (posTagger.exists()) new POSModel(new FileInputStream(posTagger)) else null
+ val tokenModel = new TokenizerModel(new FileInputStream(new File(modelFolder, "opennlp/token.bin")))
+ val sentenceModel = new SentenceModel(new FileInputStream(new File(modelFolder, "opennlp/sent.bin")))
+
+ val cores = (1 to Runtime.getRuntime.availableProcessors())
+
+ val tokenizer: Tokenizer = new TokenizerWrapper(
+ cores.map(_ =>
+ new DefaultTokenizer(
+ new TokenizerME(tokenModel),
+ stopwords,
+ stemmer,
+ new SentenceDetectorME(sentenceModel),
+ if (posTagger.exists()) new POSTaggerME(posModel) else null,
+ tokenTypeStore
+ )
+ )
+ ).asInstanceOf[Tokenizer]
val searcher = new DBCandidateSearcher(resStore, sfStore, candMapStore)
val disambiguator = new ParagraphDisambiguatorJ(new DBTwoStepDisambiguator(
@@ -71,18 +85,26 @@ object SpotlightModel {
))
val nerModels = new File(modelFolder, "opennlp").list().filter(_.startsWith("ner-")).map { f: String =>
- new FileInputStream(new File(new File(modelFolder, "opennlp"), f))
+ new TokenNameFinderModel(new FileInputStream(new File(new File(modelFolder, "opennlp"), f)))
}.toList
- val chunker = new File(modelFolder, "opennlp/chunker.bin")
-
- val spotter = new OpenNLPSpotter(
- if (chunker.exists()) Some(new FileInputStream(chunker)) else None,
- nerModels,
- sfStore,
- stopwords,
- Some(loadSpotterThresholds(new File(modelFolder, "opennlp_chunker_thresholds.txt"))),
- Set("NP", "MWU", "PP"), "N"
+ val chunkerFile = new File(modelFolder, "opennlp/chunker.bin")
+ val chunkerModel = if (chunkerFile.exists())
+ Some(new ChunkerModel(new FileInputStream(chunkerFile)))
+ else
+ None
+
+ val spotter = new SpotterWrapper(
+ cores.map(_ =>
+ new OpenNLPSpotter(
+ chunkerModel,
+ nerModels,
+ sfStore,
+ stopwords,
+ Some(loadSpotterThresholds(new File(modelFolder, "opennlp_chunker_thresholds.txt"))),
+ Set("NP", "MWU", "PP"), "N"
+ ).asInstanceOf[Spotter]
+ )
).asInstanceOf[Spotter]
val spotters: java.util.Map[SpotterPolicy, Spotter] = Map(SpotterPolicy.Default -> spotter).asJava
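Note the split this pattern relies on: the OpenNLP model objects (TokenizerModel, SentenceModel, POSModel) are loaded from disk exactly once and shared by all per-core DefaultTokenizer instances, while the lightweight TokenizerME, SentenceDetectorME and POSTaggerME wrappers, which keep mutable per-call state and are not thread-safe, are duplicated once per available processor.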
core/src/main/scala/org/dbpedia/spotlight/db/concurrent/SpotterWrapper.scala (70 changed lines)
@@ -0,0 +1,70 @@
+package org.dbpedia.spotlight.db.concurrent
+
+import java.io.IOException
+import org.dbpedia.spotlight.model.{SurfaceFormOccurrence, Text}
+import akka.actor.{OneForOneStrategy, Props, ActorSystem, Actor}
+import akka.routing.SmallestMailboxRouter
+import akka.actor.SupervisorStrategy.Restart
+import org.dbpedia.spotlight.spot.Spotter
+import akka.dispatch.Await
+import akka.util.duration._
+import akka.pattern.ask
+import akka.util
+
+/**
+ * A Wrapper for Spotter workers.
+ *
+ * @author Joachim Daiber
+ */
+
+class SpotterWrapper(val spotters: Seq[Spotter]) extends Spotter {
+
+ var requestTimeout = 60
+
+ val system = ActorSystem()
+ val workers = spotters.map { spotter: Spotter =>
+ system.actorOf(Props(new SpotterActor(spotter)))
+ }
+
+ def size: Int = spotters.size
+
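+ // Route each request to the worker with the fewest queued messages;
+ // a worker that fails with an IOException is restarted (up to 10 times).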
+ val router = system.actorOf(Props[SpotterActor].withRouter(
+ SmallestMailboxRouter(routees = workers).withSupervisorStrategy(
+ OneForOneStrategy(maxNrOfRetries = 10) {
+ case _: IOException => Restart
+ })
+ )
+ )
+
+ implicit val timeout = util.Timeout(requestTimeout seconds)
+
+ def extract(text: Text): java.util.List[SurfaceFormOccurrence] = {
+ val futureResult = router ? SpotterRequest(text)
+ Await.result(futureResult, timeout.duration).asInstanceOf[java.util.List[SurfaceFormOccurrence]]
+ }
+
+ def close() {
+ system.shutdown()
+ }
+
+ def getName: String = "SpotterWrapper[%s]".format(spotters.head.getClass.getSimpleName)
+
+ def setName(name: String) {}
+}
+
+class SpotterActor(val spotter: Spotter) extends Actor {
+
+ def receive = {
+ case SpotterRequest(text) => {
+ try {
+ sender ! spotter.extract(text)
+
+ } catch {
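+ // OpenNLP tools sporadically throw NullPointerExceptions when shared or
+ // stressed; rethrowing as IOException lets the supervisor strategy above
+ // match the failure and restart this worker.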
+ case e: NullPointerException => throw new IOException("Could not spot.")
+ }
+ }
+ }
+
+}
+
+case class SpotterRequest(text: Text)
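A hedged usage sketch (spotterPool stands in for the per-core Seq[Spotter] built in SpotlightModel above):

import scala.collection.JavaConversions._
import org.dbpedia.spotlight.model.Text

// Calls may come from any thread; the router hands each request to the
// least busy single-threaded worker.
val spotter = new SpotterWrapper(spotterPool)
val occurrences = spotter.extract(new Text("First documented in the 13th century, Berlin was the capital of Prussia."))
occurrences.foreach(println)
spotter.close() // shuts down the wrapper's ActorSystem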
core/src/main/scala/org/dbpedia/spotlight/db/concurrent/TokenizerWrapper.scala (73 changed lines)
@@ -0,0 +1,73 @@
+package org.dbpedia.spotlight.db.concurrent
+
+import java.io.IOException
+import org.dbpedia.spotlight.db.model.Tokenizer
+import org.dbpedia.spotlight.model.{Token, Text}
+import akka.actor.{OneForOneStrategy, Props, ActorSystem, Actor}
+import akka.routing.SmallestMailboxRouter
+import akka.actor.SupervisorStrategy.Restart
+import akka.dispatch.Await
+import akka.util
+import akka.util.duration._
+import akka.pattern.ask
+
+/**
+ * A Wrapper for Tokenizer workers.
+ *
+ * @author Joachim Daiber
+ */
+
+class TokenizerWrapper(val tokenizers: Seq[Tokenizer]) extends Tokenizer {
+
+ var requestTimeout = 60
+
+ val system = ActorSystem()
+ val workers = tokenizers.map { case tokenizer:Tokenizer =>
+ system.actorOf(Props(new TokenizerActor(tokenizer)))
+ }.seq
+
+ def size: Int = tokenizers.size
+
+ val router = system.actorOf(Props[TokenizerActor].withRouter(
+ SmallestMailboxRouter(routees = workers).withSupervisorStrategy(
+ OneForOneStrategy(maxNrOfRetries = 10) {
+ case _: IOException => Restart
+ })
+ )
+ )
+
+ implicit val timeout = util.Timeout(requestTimeout seconds)
+
+ override def tokenizeMaybe(text: Text) {
+ val futureResult = router ? TokenizerRequest(text)
+ Await.result(futureResult, timeout.duration)
+ }
+
+ override def tokenize(text: Text): List[Token] = {
+ tokenizeMaybe(text)
+ text.featureValue[List[Token]]("tokens").get
+ }
+
+ def close() {
+ system.shutdown()
+ }
+
+}
+
+class TokenizerActor(val tokenizer: Tokenizer) extends Actor {
+
+ def receive = {
+ case TokenizerRequest(text) => {
+ try {
+ sender ! tokenizer.tokenizeMaybe(text)
+
+ } catch {
+ case e: NullPointerException => throw new IOException("Could not tokenize.")
+ }
+ }
+ }
+
+}
+
+
+case class TokenizerRequest(text: Text)
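Unlike SpotterWrapper, the primary entry point here is side-effecting: tokenizeMaybe attaches the token list to the Text object as its "tokens" feature, and tokenize simply reads that feature back. A hedged usage sketch (tokenizerPool stands in for the per-core Seq[Tokenizer] built in SpotlightModel above):

import org.dbpedia.spotlight.model.{Token, Text}

val tokenizer = new TokenizerWrapper(tokenizerPool)
val text = new Text("Berlin is the capital of Germany.")
tokenizer.tokenizeMaybe(text) // blocks until a worker has processed the text
val tokens = text.featureValue[List[Token]]("tokens").get
tokenizer.close()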
core/src/main/scala/org/dbpedia/spotlight/spot/opennlp/OpenNLPSpotter.scala (10 changed lines)
@@ -28,8 +28,8 @@ import scala.Some
*/
class OpenNLPSpotter(
- chunkerModel: Option[InputStream],
- nerModels: List[InputStream],
+ chunkerModel: Option[ChunkerModel],
+ nerModels: List[TokenNameFinderModel],
surfaceFormStore: SurfaceFormStore,
stopwords: Set[String],
spotFeatureWeights: Option[Seq[Double]],
@@ -43,12 +43,12 @@ class OpenNLPSpotter(
}
val chunker = chunkerModel match {
- case Some(m) => Some(new ChunkerME(new ChunkerModel(m)))
+ case Some(m) => Some(new ChunkerME(m))
case None => None
}
- val ners = nerModels.map{ m: InputStream =>
- new NameFinderME(new TokenNameFinderModel(m))
+ val ners = nerModels.map{ m: TokenNameFinderModel =>
+ new NameFinderME(m)
}
val uppercaseFinder = new RegexNameFinder(
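Taking pre-loaded ChunkerModel and TokenNameFinderModel objects instead of InputStreams is what makes the per-core pooling in SpotlightModel possible: a stream can only be consumed once, whereas the immutable models can back a separate ChunkerME and NameFinderME in every spotter instance.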
index/src/main/scala/org/dbpedia/spotlight/db/CreateSpotlightModel.scala (3 changed lines)
@@ -13,6 +13,7 @@ import opennlp.tools.tokenize.{TokenizerModel, TokenizerME}
import opennlp.tools.sentdetect.{SentenceModel, SentenceDetectorME}
import opennlp.tools.postag.{POSModel, POSTaggerME}
import org.dbpedia.spotlight.spot.opennlp.OpenNLPSpotter
+import opennlp.tools.chunker.ChunkerModel
/**
* This script creates a Spotlight model folder from the results of
@@ -185,7 +186,7 @@ object CreateSpotlightModel {
val spotter = new OpenNLPSpotter(
- Some(new FileInputStream(new File(opennlpOut, "chunker.bin"))),
+ Some(new ChunkerModel(new FileInputStream(new File(opennlpOut, "chunker.bin")))),
List(),
sfStore,
stopwords,
pom.xml (12 changed lines)
@@ -524,6 +524,12 @@
<version>0.1</version>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor</artifactId>
+ <version>2.0.5</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
@@ -578,6 +584,12 @@
<id>sonatype</id>
<url>http://repository.sonatype.org/content/groups/public/</url>
</repository>
+
+ <repository>
+ <id>akka</id>
+ <url>http://repo.akka.io/releases/</url>
+ </repository>
+
</repositories>
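The extra repository is added because Akka artifacts of this era were distributed from the Typesafe/Akka repository; depending on the mirrors in use, akka-actor 2.0.5 may not resolve from Maven Central alone.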
