Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #9 from CatalystCode/more-sentiments
Browse files Browse the repository at this point in the history
Add sentiment analysis capability for 68 more languages
  • Loading branch information
c-w committed Jun 10, 2017
2 parents e6bffa3 + e89573a commit d0d9835
Show file tree
Hide file tree
Showing 19 changed files with 453 additions and 116 deletions.
10 changes: 5 additions & 5 deletions src/main/scala/DemoFortis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import com.microsoft.partnercatalyst.fortis.spark.tadaweb.dto.TadawebEvent
import com.microsoft.partnercatalyst.fortis.spark.transforms.image.{ImageAnalysisAuth, ImageAnalyzer}
import com.microsoft.partnercatalyst.fortis.spark.transforms.language.{LanguageDetector, LanguageDetectorAuth}
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.client.FeatureServiceClient
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.nlp.PlaceRecognizer
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.{Geofence, LocationsExtractor}
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.{Geofence, LocationsExtractor, PlaceRecognizer}
import com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment.SentimentDetector.{Negative, Neutral, Positive}
import com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment.{SentimentDetector, SentimentDetectorAuth}
import com.microsoft.partnercatalyst.fortis.spark.transforms.topic.KeywordExtractor
import com.microsoft.partnercatalyst.fortis.spark.transforms.{Analysis, AnalyzedItem}
Expand Down Expand Up @@ -231,9 +231,9 @@ object DemoFortis {
.map(fortisEvent => {
val language = languageDetection.detectLanguage(fortisEvent.originalItem.text)
val sentiment: Option[Double] = fortisEvent.originalItem.sentiment match {
case "negative" => Some(0)
case "neutral" => Some(0.6)
case "positive" => Some(1)
case "negative" => Some(Negative)
case "neutral" => Some(Neutral)
case "positive" => Some(Positive)
case _ => language match {
case Some(lang) =>
sentimentDetection.detectSentiment(fortisEvent.originalItem.text, lang)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.locations
package com.microsoft.partnercatalyst.fortis.spark.logging

import org.apache.log4j.LogManager

trait Logger {
@transient private lazy val log = LogManager.getLogger("liblocations")
trait Loggable {
@transient private lazy val log = LogManager.getLogger(getClass.getName)

def logDebug(message: String): Unit = log.debug(message)
def logInfo(message: String): Unit = log.info(message)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,25 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.locations.nlp
package com.microsoft.partnercatalyst.fortis.spark.transforms

import java.io.{File, FileNotFoundException, IOError}
import java.io.{File, FileNotFoundException}
import java.net.URL
import java.nio.file.Files
import java.util.concurrent.ConcurrentHashMap

import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.Logger
import ixa.kaflib.Entity
import com.microsoft.partnercatalyst.fortis.spark.logging.Loggable
import net.lingala.zip4j.core.ZipFile

import scala.collection.JavaConversions._
import scala.sys.process._

@SerialVersionUID(100L)
class PlaceRecognizer(
modelsSource: Option[String] = None,
enabledLanguages: Set[String] = Set("de", "en", "es", "eu", "it", "nl")
) extends Serializable with Logger {
class ZipModelsProvider(
modelsUrlFromLanguage: String => String,
modelsSource: Option[String] = None
) extends Serializable with Loggable {

@volatile private lazy val modelDirectories = new ConcurrentHashMap[String, String]

def extractPlaces(text: String, language: String): Iterable[String] = {
if (!enabledLanguages.contains(language)) {
return Set()
}

try {
val resourcesDirectory = ensureModelsAreDownloaded(language)

val kaf = OpeNER.tokAnnotate(resourcesDirectory, text, language)
OpeNER.posAnnotate(resourcesDirectory, language, kaf)
OpeNER.nerAnnotate(resourcesDirectory, language, kaf)

logDebug(s"Analyzed text $text in language $language: $kaf")

kaf.getEntities.toList.filter(entityIsPlace).map(_.getStr).toSet
} catch {
case ex @ (_ : NullPointerException | _ : IOError) =>
logError(s"Unable to extract places for language $language", ex)
Set()
}
}

private def entityIsPlace(entity: Entity) = {
val entityType = Option(entity.getType).getOrElse("").toLowerCase
entityType == "location" || entityType == "gpe"
}

private def ensureModelsAreDownloaded(language: String): String = {
def ensureModelsAreDownloaded(language: String): String = {
val localPath = modelsSource.getOrElse("")
if (hasModelFiles(localPath, language)) {
logDebug(s"Using locally provided model files from $localPath")
Expand All @@ -61,7 +33,7 @@ class PlaceRecognizer(
return previouslyDownloadedPath
}

val remotePath = modelsSource.getOrElse(s"https://fortismodels.blob.core.windows.net/public/opener-$language.zip")
val remotePath = modelsSource.getOrElse(modelsUrlFromLanguage(language))
if ((!remotePath.startsWith("http://") && !remotePath.startsWith("https://")) || !remotePath.endsWith(".zip")) {
throw new FileNotFoundException(s"Unable to process $remotePath, should be http(s) link to zip file")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.locations

import com.microsoft.partnercatalyst.fortis.spark.logging.Loggable
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.client.FeatureServiceClient
import com.microsoft.partnercatalyst.fortis.spark.transforms.Location
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.nlp.PlaceRecognizer

import scala.collection.mutable

Expand All @@ -14,7 +14,7 @@ class LocationsExtractor(
geofence: Geofence,
placeRecognizer: Option[PlaceRecognizer] = None,
ngrams: Int = 3
) extends Serializable with Logger {
) extends Serializable with Loggable {

protected var lookup: Map[String, Set[String]] = _

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.locations

import java.io.{IOError, IOException}

import com.microsoft.partnercatalyst.fortis.spark.logging.Loggable
import com.microsoft.partnercatalyst.fortis.spark.transforms.ZipModelsProvider
import com.microsoft.partnercatalyst.fortis.spark.transforms.nlp.OpeNER
import ixa.kaflib.Entity

import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}

@SerialVersionUID(100L)
class PlaceRecognizer(
  modelsSource: Option[String] = None,
  enabledLanguages: Set[String] = Set("de", "en", "es", "eu", "it", "nl")
) extends Serializable with Loggable {

  // Lazily built so the (potentially large) model download machinery is only
  // created on first use; @volatile for safe publication across threads.
  @volatile private lazy val modelsProvider = createModelsProvider()

  /** Extracts place-like named entities from `text`.
    *
    * @param text     the raw text to analyze
    * @param language two-letter language code; must be in `enabledLanguages`
    * @return the set of place mentions, or the empty set when the language is
    *         unsupported or the models cannot be loaded
    */
  def extractPlaces(text: String, language: String): Set[String] = {
    if (enabledLanguages.contains(language)) {
      Try(modelsProvider.ensureModelsAreDownloaded(language)) match {
        case Success(resourcesDirectory) =>
          extractPlacesUsingModels(text, language, resourcesDirectory)
        case Failure(ex) =>
          logError(s"Unable to load models for language $language", ex)
          Set()
      }
    } else {
      Set()
    }
  }

  // Runs the OpeNER tok/pos/ner pipeline and collects entities typed as places.
  // Known OpeNER failure modes (NPE, IOError, IOException) degrade to an empty
  // result instead of failing the stream; anything else propagates.
  private def extractPlacesUsingModels(text: String, language: String, resourcesDirectory: String): Set[String] = {
    try {
      val kaf = OpeNER.tokAnnotate(resourcesDirectory, text, language)
      OpeNER.posAnnotate(resourcesDirectory, language, kaf)
      OpeNER.nerAnnotate(resourcesDirectory, language, kaf)

      logDebug(s"Analyzed text $text in language $language: $kaf")

      val places = for (entity <- kaf.getEntities.toList if (isPlaceEntity(entity))) yield entity.getStr
      places.toSet
    } catch {
      case ex @ (_ : NullPointerException | _ : IOError | _ : IOException) =>
        logError(s"Unable to extract places for language $language", ex)
        Set()
    }
  }

  // An entity counts as a place when OpeNER tags it "location" or "gpe"
  // (geo-political entity); a null/unknown type does not.
  private def isPlaceEntity(entity: Entity) = {
    Option(entity.getType).map(_.toLowerCase) match {
      case Some("location") | Some("gpe") => true
      case _ => false
    }
  }

  // Overridable (e.g. by tests) factory for the model-zip downloader.
  protected def createModelsProvider(): ZipModelsProvider =
    new ZipModelsProvider(
      lang => s"https://fortismodels.blob.core.windows.net/public/opener-$lang.zip",
      modelsSource)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.locations.client

import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.{Geofence, Logger}
import com.microsoft.partnercatalyst.fortis.spark.logging.Loggable
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.Geofence
import com.microsoft.partnercatalyst.fortis.spark.transforms.locations.dto.{FeatureServiceFeature, FeatureServiceResponse}
import net.liftweb.json

import scala.io.Source
import scala.util.{Failure, Success, Try}

@SerialVersionUID(100L)
class FeatureServiceClient(host: String) extends Serializable with Logger {
class FeatureServiceClient(host: String) extends Serializable with Loggable {
def bbox(geofence: Geofence): Iterable[FeatureServiceFeature] = {
unpack(fetchBboxResponse(geofence), "bbox")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.locations.nlp
package com.microsoft.partnercatalyst.fortis.spark.transforms.nlp

import java.io.{BufferedReader, ByteArrayInputStream, File, InputStreamReader}
import java.util.Properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.nlp

/** Splits sentences into tokens at word boundaries. */
object Tokenizer {
  // Zero-width word-boundary pattern: splitting on it yields the words AND the
  // separator runs (spaces, punctuation) between them as distinct tokens.
  @transient private lazy val boundaryPattern = """\b""".r

  /** @return the sentence's word and separator tokens, in original order */
  def apply(sentence: String): Seq[String] =
    boundaryPattern.split(sentence).toSeq
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment

import net.liftweb.json

import scalaj.http.Http

/** Credentials for the Cognitive Services Text Analytics endpoint.
  *
  * @param key     subscription key, sent as the Ocp-Apim-Subscription-Key header
  * @param apiHost regional API host used to build the request URL
  */
case class SentimentDetectorAuth(key: String, apiHost: String = "westus.api.cognitive.microsoft.com")

@SerialVersionUID(100L)
class CognitiveServicesSentimentDetector(
  auth: SentimentDetectorAuth
) extends DetectsSentiment {

  /** Scores the sentiment of `text` via the Text Analytics v2.0 sentiment API.
    *
    * @param text     the text to score
    * @param language language code forwarded to the API
    * @return the document's sentiment score, or None when the API reports an
    *         error for the submitted document
    */
  def detectSentiment(text: String, language: String): Option[Double] = {
    // Single-document request, so any fixed id works; "0" is used throughout.
    val documentId = "0"
    val body = buildRequestBody(text, documentId, language)
    parseResponse(callCognitiveServices(body), documentId)
  }

  // Posts the JSON request and returns the raw response body. Overridable so
  // tests can stub out the network call.
  protected def callCognitiveServices(requestBody: String): String = {
    val request = Http(s"https://${auth.apiHost}/text/analytics/v2.0/sentiment")
      .headers(
        "Content-Type" -> "application/json",
        "Ocp-Apim-Subscription-Key" -> auth.key)
      .postData(requestBody)
    request.asString.body
  }

  // Serializes a single-document sentiment request to the API's JSON shape.
  protected def buildRequestBody(text: String, textId: String, language: String): String = {
    implicit val formats = json.DefaultFormats
    val document = dto.JsonSentimentDetectionRequestItem(
      id = textId,
      language = language,
      text = text)
    val requestBody = dto.JsonSentimentDetectionRequest(documents = List(document))
    json.compactRender(json.Extraction.decompose(requestBody))
  }

  // Extracts the score for `textId`, treating a per-document API error as None.
  protected def parseResponse(apiResponse: String, textId: String): Option[Double] = {
    implicit val formats = json.DefaultFormats
    val response = json.parse(apiResponse).extract[dto.JsonSentimentDetectionResponse]
    val documentHasError = response.errors.exists(_.id == textId)
    if (documentHasError) None
    else response.documents.find(_.id == textId).map(_.score)
  }
}
Original file line number Diff line number Diff line change
@@ -1,49 +1,43 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment

import net.liftweb.json
import com.microsoft.partnercatalyst.fortis.spark.logging.Loggable

import scalaj.http.Http

case class SentimentDetectorAuth(key: String, apiHost: String = "westus.api.cognitive.microsoft.com")
import scala.util.{Failure, Success, Try}

@SerialVersionUID(100L)
class SentimentDetector(
auth: SentimentDetectorAuth
) extends Serializable {
) extends DetectsSentiment {

private lazy val detectors = initializeDetectors()

def detectSentiment(text: String, language: String): Option[Double] = {
val textId = "0"
val requestBody = buildRequestBody(text, textId, language)
val response = callCognitiveServices(requestBody)
parseResponse(response, textId)
detectors.view.map(detector => {
Try(detector.detectSentiment(text, language)) match {
case Success(Some(sentimentScore)) =>
logDebug(s"Computed sentiment via ${detector.getClass}")
Some(sentimentScore)
case Success(None) | Failure(_) =>
logDebug(s"Unable to compute sentiment via ${detector.getClass}")
None
}
})
.find(_.isDefined)
.getOrElse(None)
}

protected def callCognitiveServices(requestBody: String): String = {
Http(s"https://${auth.apiHost}/text/analytics/v2.0/sentiment")
.headers(
"Content-Type" -> "application/json",
"Ocp-Apim-Subscription-Key" -> auth.key)
.postData(requestBody)
.asString
.body
protected def initializeDetectors(): Seq[DetectsSentiment] = {
Seq(new CognitiveServicesSentimentDetector(auth),
new WordListSentimentDetector())
}
}

protected def buildRequestBody(text: String, textId: String, language: String): String = {
implicit val formats = json.DefaultFormats
val requestBody = dto.JsonSentimentDetectionRequest(documents = List(dto.JsonSentimentDetectionRequestItem(
id = textId,
language = language,
text = text)))
json.compactRender(json.Extraction.decompose(requestBody))
}
/** Canonical sentiment score constants shared by detectors and consumers
  * (e.g. mapping source-provided "positive"/"neutral"/"negative" labels
  * onto the numeric scale used by the sentiment APIs).
  */
object SentimentDetector {
  val Positive: Double = 1.0
  // NOTE(review): neutral is 0.6 rather than 0.5 — presumably chosen to match
  // the upstream sentiment API's score distribution; confirm before changing.
  val Neutral: Double = 0.6
  val Negative: Double = 0.0
}

protected def parseResponse(apiResponse: String, textId: String): Option[Double] = {
implicit val formats = json.DefaultFormats
val response = json.parse(apiResponse).extract[dto.JsonSentimentDetectionResponse]
if (response.errors.exists(_.id == textId)) {
None
} else {
response.documents.find(_.id == textId).map(_.score)
}
}
/** Contract implemented by sentiment analysis backends.
  * Serializable so instances can be shipped to Spark executors; Loggable
  * gives implementations the shared logging helpers.
  */
trait DetectsSentiment extends Serializable with Loggable {
  /** @return a sentiment score for `text` in the given language, or None
    *         when this detector cannot produce one
    */
  def detectSentiment(text: String, language: String): Option[Double]
}
Loading

0 comments on commit d0d9835

Please sign in to comment.