This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Publish AnalyzedItems to Cassandra #28

Merged: 19 commits, Jun 23, 2017
3 changes: 3 additions & 0 deletions README.md
@@ -17,6 +17,9 @@ Run it via:
# set up all the requisite environment variables
export FORTIS_APPINSIGHTS_IKEY="..."
export FORTIS_FEATURE_SERVICE_HOST="..."
+export FORTIS_CASSANDRA_HOST="..."
+export FORTIS_CASSANDRA_USER="..."
+export FORTIS_CASSANDRA_PASSWORD="..."
export INSTAGRAM_AUTH_TOKEN="..."
export OXFORD_VISION_TOKEN="..."
export OXFORD_SPEECH_TOKEN="..."
2 changes: 2 additions & 0 deletions build.sbt
@@ -17,6 +17,7 @@ val sparkVersion = "2.1.0"
// Dependencies provided by the Spark distro
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion
).map(_ % "provided")

@@ -33,6 +34,7 @@ libraryDependencies ++= Seq(
"org.apache.commons" % "commons-collections4" % "4.1",
"com.microsoft.azure" %% "spark-streaming-eventhubs" % "2.0.5",
"com.esotericsoftware.kryo" % "kryo" % "2.24.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2",
"net.liftweb" %% "lift-json" % "3.0.1",
"org.scalaj" %% "scalaj-http" % "2.3.0",
"net.lingala.zip4j" % "zip4j" % "1.3.2",
ProjectFortis.scala
@@ -2,6 +2,7 @@ package com.microsoft.partnercatalyst.fortis.spark

import com.microsoft.partnercatalyst.fortis.spark.logging.AppInsights
import com.microsoft.partnercatalyst.fortis.spark.pipeline._
+import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.CassandraSink
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.ConnectorConfig
import com.microsoft.partnercatalyst.fortis.spark.transforms.image.{ImageAnalysisAuth, ImageAnalyzer}
import com.microsoft.partnercatalyst.fortis.spark.transforms.language.{LanguageDetector, LanguageDetectorAuth}
@@ -88,12 +89,8 @@ object ProjectFortis extends App {

// Attach each pipeline (aka code path)
// 'fortisEvents' is the stream of analyzed data aggregated (union) from all pipelines
-val fortisEvents = pipelines.flatMap(
-  pipeline => pipeline(streamProvider, streamRegistry, ssc, TransformContext)
-).reduceOption(_.union(_))
-
-// TODO: other computations and save to DB
-fortisEvents.foreach(_.print())
+val fortisEvents = pipelines.flatMap(pipeline => pipeline(streamProvider, streamRegistry, ssc, TransformContext)).reduceOption(_.union(_))
+CassandraSink(fortisEvents, "fortistest", "events") // todo: fill in real values

ssc.checkpoint(Settings.progressDir)
ssc
AnalyzedItem.scala
@@ -1,6 +1,9 @@
package com.microsoft.partnercatalyst.fortis.spark.dto

+import java.util.UUID

case class AnalyzedItem(
+  id: UUID,
createdAtEpoch: Long,
body: String,
title: String,
BingPipeline.scala
@@ -1,6 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now
+import java.util.UUID.randomUUID

import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
@@ -17,6 +18,7 @@ object BingPipeline extends Pipeline {

private def convertToSchema(stream: DStream[BingPost], transformContext: TransformContext): DStream[AnalyzedItem] = {
stream.map(post => AnalyzedItem(
+  id = randomUUID(),
createdAtEpoch = now.getEpochSecond,
body = post.snippet,
title = post.name,
FacebookPipeline.scala
@@ -1,6 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now
+import java.util.UUID.randomUUID

import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
@@ -19,6 +20,7 @@ object FacebookPipeline extends Pipeline {
import transformContext._

stream.map(post => AnalyzedItem(
+  id = randomUUID(),
createdAtEpoch = now.getEpochSecond,
body = post.post.getMessage,
title = "",
InstagramPipeline.scala
@@ -1,6 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now
+import java.util.UUID.randomUUID

import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
@@ -18,6 +19,7 @@ object InstagramPipeline extends Pipeline {
// do computer vision analysis
val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url)
AnalyzedItem(
+  id = randomUUID(),

Contributor: This should be called via the uuid() function in cassandra.

createdAtEpoch = now.getEpochSecond,
body = analysis.summary.getOrElse(""),
title = instagram.caption.text,
RadioPipeline.scala
@@ -1,6 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now
+import java.util.UUID.randomUUID

import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -17,6 +18,7 @@ object RadioPipeline extends Pipeline {

private def convertToSchema(stream: DStream[RadioTranscription], transformContext: TransformContext): DStream[AnalyzedItem] = {
stream.map(transcription => AnalyzedItem(
+  id = randomUUID(),

Contributor: This should be called via the uuid() function in cassandra.

createdAtEpoch = now.getEpochSecond,
body = transcription.text,
title = "",
TadawebPipeline.scala
@@ -1,6 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now
+import java.util.UUID.randomUUID

import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -19,6 +20,7 @@ object TadawebPipeline extends Pipeline {
import transformContext._

stream.map(tada => AnalyzedItem(
+  id = randomUUID(),

Contributor: This should be called via the uuid() function in cassandra.

Contributor (author): The advantage of creating the id early is that we have a way to track every event through the pipeline (e.g. useful when logging). Is this benefit worth explicitly creating the UUID?

(A sketch of the uuid() alternative follows this hunk.)

createdAtEpoch = now.getEpochSecond,
body = tada.text,
title = tada.title,
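
For reference, here is a minimal sketch of the server-side alternative raised in the thread above: letting CQL's uuid() function mint the id at insert time instead of calling randomUUID() in the pipeline. This is not code from the PR; the keyspace, table, and column names are assumptions taken from the placeholders and the Event case class elsewhere in this diff, and this write path bypasses saveToCassandra.

import com.datastax.spark.connector.cql.CassandraConnector
import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem

object ServerSideIdSketch {
  // Sketch only: insert a row whose id is generated by Cassandra via uuid().
  // Column names are assumed from the Event case class in this PR.
  def insert(connector: CassandraConnector, item: AnalyzedItem): Unit = {
    connector.withSessionDo { session =>
      session.execute(
        "INSERT INTO fortistest.events (id, event_time, messagebody, title) VALUES (uuid(), ?, ?, ?)",
        Long.box(item.createdAtEpoch), item.body, item.title)
    }
  }
}

The trade-off the author raises stands either way: with a server-generated id there is no stable identifier for tracing an event through the pipeline before it reaches Cassandra.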
TwitterPipeline.scala
@@ -1,6 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now
+import java.util.UUID.randomUUID

import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -19,6 +20,7 @@ object TwitterPipeline extends Pipeline {
import transformContext._

stream.map(tweet => AnalyzedItem(
+  id = randomUUID(),
createdAtEpoch = now.getEpochSecond,
body = tweet.getText,
title = "",
CassandraConfig.scala (new file)
@@ -0,0 +1,15 @@
package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra

import scala.util.Properties.envOrElse

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration

object CassandraConfig {
def init(conf: SparkConf, batchDuration: Duration): SparkConf = {
conf.setIfMissing("spark.cassandra.connection.host", envOrElse("FORTIS_CASSANDRA_HOST", ""))
.setIfMissing("spark.cassandra.auth.username", envOrElse("FORTIS_CASSANDRA_USER", ""))
.setIfMissing("spark.cassandra.auth.password", envOrElse("FORTIS_CASSANDRA_PASSWORD", ""))
.setIfMissing("spark.cassandra.connection.keep_alive_ms", (batchDuration.milliseconds * 2).toString)
}
}
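
For context, a minimal wiring sketch (assumed, not part of this diff) of how CassandraConfig.init would be applied when the StreamingContext is built, so the connector settings are in place before any save runs. The batch interval and app name are illustrative.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.CassandraConfig

object StreamingContextWiring {
  // Assumed batch interval; the real value would come from the app's settings.
  val batchDuration = Seconds(30)

  def create(): StreamingContext = {
    val conf = CassandraConfig.init(new SparkConf().setAppName("fortis"), batchDuration)
    new StreamingContext(conf, batchDuration)
  }
}

Passing the batch duration lets init size spark.cassandra.connection.keep_alive_ms to twice the batch interval, keeping the connection pool warm between consecutive micro-batches.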
CassandraSchema.scala (new file)
@@ -0,0 +1,98 @@
package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra

import java.time.Instant.now
import java.util.UUID

import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.Utils.{mean, rescale}
import com.microsoft.partnercatalyst.fortis.spark.transforms.gender.GenderDetector.{Female, Male}
import com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment.SentimentDetector.Neutral

case class Sentiment(
pos_avg: Float,
neg_avg: Float)

case class Gender(
male_mentions: Int,
female_mentions: Int)

case class Entities(
name: String,
externalsource: String,
externalrefid: String,
count: Float)

case class Features(
mentions: Int,
sentiment: Sentiment,
gender: Gender,
entities: Set[Entities])

case class Event(
pipeline: String,
externalid: String,
computedfeatures: Features,
detectedkeywords: Set[String],
detectedplaceids: Set[String],
event_time: Long,
eventlangcode: String,
id: UUID,
insertion_time: Long,
messagebody: String,
sourceid: String,
sourceurl: String,
title: String)

object CassandraSchema {
def apply(item: AnalyzedItem): Event = {
Event(
pipeline = item.publisher,
externalid = "", // todo
computedfeatures = getFeature(item),
detectedkeywords = item.analysis.keywords.map(_.name).toSet,
detectedplaceids = item.analysis.locations.map(_.wofId).toSet,
event_time = item.createdAtEpoch,
eventlangcode = item.analysis.language.orNull,
id = item.id,
insertion_time = now.getEpochSecond,
messagebody = item.body,
sourceid = "", // todo
sourceurl = item.sourceUrl,
title = item.title)
}

private def getFeature(item: AnalyzedItem): Features = {
val genderCounts = item.analysis.genders.map(_.name).groupBy(identity).mapValues(_.size)
val entityCounts = item.analysis.entities.map(_.name).groupBy(identity).mapValues(_.size)
val positiveSentiments = item.analysis.sentiments.filter(_ > Neutral)
val negativeSentiments = item.analysis.sentiments.filter(_ < Neutral)
Features(
mentions = -1,
sentiment = Sentiment(
pos_avg = if (positiveSentiments.nonEmpty) mean(rescale(positiveSentiments, 0, 1)).toFloat else -1,
neg_avg = if (negativeSentiments.nonEmpty) mean(rescale(negativeSentiments, 0, 1)).toFloat else -1),
gender = Gender(
male_mentions = genderCounts.getOrElse(Male, -1),
female_mentions = genderCounts.getOrElse(Female, -1)),
entities = entityCounts.map(kv => Entities(
name = kv._1,
count = kv._2,
externalsource = "", // todo
externalrefid = "" // todo
)).toSet)
}
}

object Utils {
def mean(items: List[Double]): Double = {
items.sum / items.length

Contributor: What if the length of items is 0? Wouldn't this cause an error?

Contributor (author, @c-w, Jun 23, 2017): Improved error handling in 37c1983.

}

/** @see https://stats.stackexchange.com/a/25897 */
def rescale(items: List[Double], min_new: Double, max_new: Double): List[Double] = {
val min_old = items.min
val max_old = items.max
val coef = (max_new - min_new) / (max_old - min_old)

Contributor: If max_old == min_old, the denominator would be 0. Maybe put a check for this?

Contributor: +1

Contributor (author): Added error handling in 37c1983.

items.map(v => coef * (v - max_old) + max_new)
}
}
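
A minimal sketch of the guarded variants the reviewers ask for above; the actual fix landed in 37c1983, which is not part of this diff, so the names and Option-based signatures here are illustrative.

object SafeUtils {
  // Returns None instead of dividing by a zero length.
  def meanOpt(items: List[Double]): Option[Double] =
    if (items.isEmpty) None else Some(items.sum / items.length)

  def rescaleOpt(items: List[Double], minNew: Double, maxNew: Double): Option[List[Double]] = {
    if (items.isEmpty) return None
    val minOld = items.min
    val maxOld = items.max
    if (maxOld == minOld) return None // constant input: coef below would divide by zero
    val coef = (maxNew - minNew) / (maxOld - minOld)
    // Example: items = List(2.0, 4.0) with minNew = 0, maxNew = 1 gives coef = 0.5,
    // mapping 2.0 -> 0.5 * (2.0 - 4.0) + 1 = 0.0 and 4.0 -> 1.0.
    Some(items.map(v => coef * (v - maxOld) + maxNew))
  }
}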
CassandraSink.scala (new file)
@@ -0,0 +1,13 @@
package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra

import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
import org.apache.spark.streaming.dstream.DStream
import com.datastax.spark.connector.streaming._

object CassandraSink {
def apply(dstream: Option[DStream[AnalyzedItem]], keyspaceName: String, tableName: String): Unit = {
if (dstream.isDefined) {
dstream.get.map(CassandraSchema(_)).saveToCassandra(keyspaceName, tableName)
}
}
}
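
As a side note, the Option handling above can be collapsed with foreach; a behavior-equivalent sketch, not part of this PR (CassandraSchema is assumed to be in the same package, as in the PR's file):

import com.datastax.spark.connector.streaming._
import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
import org.apache.spark.streaming.dstream.DStream

object CassandraSinkAlt {
  // Same behavior as CassandraSink, using Option.foreach instead of isDefined/get.
  def apply(dstream: Option[DStream[AnalyzedItem]], keyspaceName: String, tableName: String): Unit =
    dstream.foreach(_.map(CassandraSchema(_)).saveToCassandra(keyspaceName, tableName))
}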
GenderDetector.scala (new file)
@@ -0,0 +1,6 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.gender

object GenderDetector {

Contributor: May I suggest adding extends Enumeration here?

Contributor (author): Sweet. Done in 8162fe3.

val Male = "M"
val Female = "F"
}
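
For reference, a sketch of the Enumeration shape the review suggests; the actual change landed in 8162fe3 and is not shown in this diff.

object GenderDetector extends Enumeration {
  type Gender = Value
  val Male = Value("M")
  val Female = Value("F")
}

Note that callers which group plain strings, as CassandraSchema.getFeature does with genders.map(_.name), would then need to look up Male.toString rather than the Value itself.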
IntegrationTestSpec.scala
@@ -1,11 +1,13 @@
package com.microsoft.partnercatalyst.fortis.spark

+import scala.util.Properties.{envOrElse, envOrNone}

import org.scalatest.FlatSpec

class IntegrationTestSpec extends FlatSpec {
protected def checkIfShouldRunWithLocalModels(): Option[String] = {
-val runIntegrationTests = Option(System.getenv("FORTIS_INTEGRATION_TESTS")).getOrElse("false").toBoolean
-val localModels = Option(System.getenv("FORTIS_MODELS_DIRECTORY"))
+val runIntegrationTests = envOrElse("FORTIS_INTEGRATION_TESTS", "false").toBoolean
+val localModels = envOrNone("FORTIS_MODELS_DIRECTORY")
if (!runIntegrationTests && localModels.isEmpty) {
cancel("Integration tests disabled and no local models available")
}