Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #26 from CatalystCode/filter-keywords
Browse files (browse the repository at this point in the history)
Filter out events that don't match any keywords
  • Loading branch information
c-w committed Jun 22, 2017
2 parents 3217caf + 8e95135 commit 6dc0dcd
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ object ProjectFortis extends App {
Logger.getLogger("liblocations").setLevel(Level.DEBUG)

private object TransformContext extends TransformContext {
val targetKeywords = Set("Ariana")
val geofence = Geofence(north = 49.6185146245, west = -124.9578052195, south = 46.8691952854, east = -121.0945042053)
val placeRecognizer = new PlaceRecognizer(Settings.modelsDir)
val peopleRecognizer = new PeopleRecognizer(Settings.modelsDir)
val featureServiceClient = new FeatureServiceClient(Settings.featureServiceHost)
val locationsExtractor = new LocationsExtractor(featureServiceClient, geofence, Some(placeRecognizer)).buildLookup()
val keywordExtractor = new KeywordExtractor(List("Ariana"))
val keywordExtractor = new KeywordExtractor(targetKeywords)
val imageAnalyzer = new ImageAnalyzer(ImageAnalysisAuth(Settings.oxfordVisionToken), featureServiceClient)
val languageDetector = new LanguageDetector(LanguageDetectorAuth(Settings.oxfordLanguageToken))
val sentimentDetector = new SentimentDetector(SentimentDetectorAuth(Settings.oxfordLanguageToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ object InstagramPipeline extends Pipeline {
streamProvider.buildStream[InstagramItem](ssc, streamRegistry("instagram")).map(_
.map(instagram => {
// do computer vision analysis
val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url)
AnalyzedItem(
body = instagram.caption.text,
title = "",
body = analysis.summary.getOrElse(""),
title = instagram.caption.text,
sharedLocations = instagram.location match {
case Some(location) => locationsExtractor.fetch(location.latitude, location.longitude).toList
case None => List()},
analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url),
analysis = analysis.copy(
keywords = analysis.keywords.filter(tag => targetKeywords.contains(tag.name))),
source = instagram.link)
})
.map(analyzedItem => {
// keyword extraction
val keywords = keywordExtractor.extractKeywords(analyzedItem.body)
val keywords = keywordExtractor.extractKeywords(analyzedItem.title) ::: keywordExtractor.extractKeywords(analyzedItem.body)
analyzedItem.copy(analysis = analyzedItem.analysis.copy(keywords = keywords ::: analyzedItem.analysis.keywords))
}))
})
.filter(_.analysis.keywords.nonEmpty))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ object TextPipeline {
.map(inferLanguage(_, transformContext))
.filter(isLanguageSupported(_, transformContext))
.map(extractKeywords(_, transformContext))
.filter(hasKeywords(_, transformContext))
.map(extractEntities(_, transformContext))
.map(analyzeSentiment(_, transformContext))
.map(extractLocations(_, transformContext))
Expand Down Expand Up @@ -47,6 +48,10 @@ object TextPipeline {
}
}

/**
 * Predicate for the pipeline's keyword filter step: keeps an item only if its
 * analysis holds at least one keyword.
 *
 * @param analyzedItem     item whose `analysis.keywords` collection is inspected
 * @param transformContext not read by this method
 *                         (NOTE(review): presumably kept so the signature lines up
 *                         with the other pipeline steps — confirm)
 * @return `true` when `analyzedItem.analysis.keywords` is not empty, `false` otherwise
 */
private def hasKeywords(analyzedItem: AnalyzedItem, transformContext: TransformContext): Boolean =
!analyzedItem.analysis.keywords.isEmpty

private def extractEntities(analyzedItem: AnalyzedItem, transformContext: TransformContext): AnalyzedItem = {
import transformContext.peopleRecognizer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment.Sentiment
import com.microsoft.partnercatalyst.fortis.spark.transforms.topic.KeywordExtractor

trait TransformContext {
val targetKeywords: Set[String]
val geofence: Geofence
val placeRecognizer: PlaceRecognizer
val peopleRecognizer: PeopleRecognizer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.apache.commons.collections4.trie.PatriciaTrie
import scala.collection.mutable.ListBuffer

@SerialVersionUID(100L)
class KeywordExtractor(keywords: Seq[String]) extends Serializable {
class KeywordExtractor(keywords: Iterable[String]) extends Serializable {
@transient private lazy val keywordTrie = initializeTrie(keywords)

def extractKeywords(text: String): List[Tag] = {
Expand All @@ -34,7 +34,7 @@ class KeywordExtractor(keywords: Seq[String]) extends Serializable {
tokens.tails.flatMap(findMatches(_).map(Tag(_, confidence = None))).toList
}

private def initializeTrie(keywords: Seq[String]): PatriciaTrie[String] = {
private def initializeTrie(keywords: Iterable[String]): PatriciaTrie[String] = {
val trie = new PatriciaTrie[String]()
keywords.foreach(k => trie.put(k.toLowerCase, k))

Expand Down

0 comments on commit 6dc0dcd

Please sign in to comment.