From e13eef1295cbfc56b5cb1b5f5cf26f3345ce33af Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Fri, 23 Jun 2017 11:37:56 -0700 Subject: [PATCH] Implement production cassandra schema --- .../fortis/spark/dto/AnalyzedItem.scala | 3 + .../fortis/spark/pipeline/BingPipeline.scala | 2 + .../spark/pipeline/FacebookPipeline.scala | 2 + .../spark/pipeline/InstagramPipeline.scala | 2 + .../fortis/spark/pipeline/RadioPipeline.scala | 2 + .../spark/pipeline/TadawebPipeline.scala | 2 + .../spark/pipeline/TwitterPipeline.scala | 2 + .../sinks/cassandra/CassandraSchema.scala | 96 +++++++++++++++++-- .../fortis/spark/SparkSpec.scala | 5 + .../spark/sinks/cassandra/UtilsSpec.scala | 18 ++++ 10 files changed, 125 insertions(+), 9 deletions(-) create mode 100644 src/test/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/UtilsSpec.scala diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala index 0657f10..9f5f7c7 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala @@ -1,6 +1,9 @@ package com.microsoft.partnercatalyst.fortis.spark.dto +import java.util.UUID + case class AnalyzedItem( + id: UUID, createdAtEpoch: Long, body: String, title: String, diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala index fd72e60..b37dba2 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala @@ -1,6 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline import java.time.Instant.now +import java.util.UUID.randomUUID import 
com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} @@ -17,6 +18,7 @@ object BingPipeline extends Pipeline { private def convertToSchema(stream: DStream[BingPost], transformContext: TransformContext): DStream[AnalyzedItem] = { stream.map(post => AnalyzedItem( + id = randomUUID(), createdAtEpoch = now.getEpochSecond, body = post.snippet, title = post.name, diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala index 976aae2..d661cee 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala @@ -1,6 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline import java.time.Instant.now +import java.util.UUID.randomUUID import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} @@ -19,6 +20,7 @@ object FacebookPipeline extends Pipeline { import transformContext._ stream.map(post => AnalyzedItem( + id = randomUUID(), createdAtEpoch = now.getEpochSecond, body = post.post.getMessage, title = "", diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala index fadf6bb..6e0f492 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala @@ -1,6 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline import java.time.Instant.now +import java.util.UUID.randomUUID import 
com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem @@ -18,6 +19,7 @@ object InstagramPipeline extends Pipeline { // do computer vision analysis val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url) AnalyzedItem( + id = randomUUID(), createdAtEpoch = now.getEpochSecond, body = analysis.summary.getOrElse(""), title = instagram.caption.text, diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala index 9faff3c..66ef82d 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala @@ -1,6 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline import java.time.Instant.now +import java.util.UUID.randomUUID import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} @@ -17,6 +18,7 @@ object RadioPipeline extends Pipeline { private def convertToSchema(stream: DStream[RadioTranscription], transformContext: TransformContext): DStream[AnalyzedItem] = { stream.map(transcription => AnalyzedItem( + id = randomUUID(), createdAtEpoch = now.getEpochSecond, body = transcription.text, title = "", diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala index 8bb0e80..d7917f0 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala @@ -1,6 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline import 
java.time.Instant.now +import java.util.UUID.randomUUID import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} @@ -19,6 +20,7 @@ object TadawebPipeline extends Pipeline { import transformContext._ stream.map(tada => AnalyzedItem( + id = randomUUID(), createdAtEpoch = now.getEpochSecond, body = tada.text, title = tada.title, diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala index 777016f..9cc8bb0 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala @@ -1,6 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline import java.time.Instant.now +import java.util.UUID.randomUUID import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} @@ -19,6 +20,7 @@ object TwitterPipeline extends Pipeline { import transformContext._ stream.map(tweet => AnalyzedItem( + id = randomUUID(), createdAtEpoch = now.getEpochSecond, body = tweet.getText, title = "", diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraSchema.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraSchema.scala index c682dbb..b29c16d 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraSchema.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraSchema.scala @@ -1,17 +1,95 @@ package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra +import java.time.Instant.now +import java.util.UUID + import 
import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.Utils.{mean, rescale}
import com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment.SentimentDetector.Neutral

// Row model for the Cassandra events table: one Event per analyzed item.
// Field names deliberately match the Cassandra column names (lowercase /
// underscored, not Scala camelCase) because the Spark Cassandra connector
// maps case-class fields to columns by name.

case class Sentiment(
  pos_avg: Float,
  neg_avg: Float)

case class Gender(
  male_mentions: Int,
  female_mentions: Int)

case class Entities(
  name: String,
  externalsource: String,
  externalrefid: String,
  count: Float)

case class Feature(
  mentions: Int,
  sentiment: Set[Sentiment],
  gender: Set[Gender],
  entities: Set[Entities])

case class Event(
  pipeline: String,
  externalid: String,
  computedfeatures: Set[Feature],
  detectedkeywords: Set[String],
  detectedplaceids: Set[String],
  event_time: Long,
  eventlangcode: String,
  id: UUID,
  insertion_time: Long,
  messagebody: String,
  sourceid: String,
  sourceurl: String,
  title: String)

object CassandraSchema {
  /** Converts a pipeline item into its Cassandra `Event` row representation. */
  def apply(item: AnalyzedItem): Event = {
    Event(
      pipeline = item.publisher,
      externalid = null, // todo: populate once sources expose an external id
      computedfeatures = Set(getFeature(item)),
      detectedkeywords = item.analysis.keywords.map(_.name).toSet,
      detectedplaceids = item.analysis.locations.map(_.wofId).toSet,
      event_time = item.createdAtEpoch,
      eventlangcode = item.analysis.language.orNull,
      id = item.id,
      insertion_time = now.getEpochSecond,
      messagebody = item.body,
      sourceid = null, // todo: populate once sources expose a source id
      sourceurl = item.sourceUrl,
      title = item.title)
  }

  /** Aggregates the item's analysis (sentiment, genders, entities) into a Feature. */
  private def getFeature(item: AnalyzedItem): Feature = {
    val genderCounts = item.analysis.genders.map(_.name).groupBy(identity).mapValues(_.size)
    val entityCounts = item.analysis.entities.map(_.name).groupBy(identity).mapValues(_.size)
    // NOTE(review): scores equal to Neutral land in the negative bucket — confirm intended.
    val (positiveSentiments, negativeSentiments) = item.analysis.sentiments.partition(_ > Neutral)
    Feature(
      mentions = -1, // todo: real mention count once available
      sentiment = Set(Sentiment(
        pos_avg = meanOrZero(positiveSentiments),
        neg_avg = meanOrZero(negativeSentiments))),
      gender = Set(Gender(
        // getOrElse: most items mention neither gender; `genderCounts("M")` would
        // throw NoSuchElementException in that (common) case
        male_mentions = genderCounts.getOrElse("M", 0),
        female_mentions = genderCounts.getOrElse("F", 0))),
      entities = entityCounts.map(kv => Entities(
        name = kv._1,
        count = kv._2,
        externalsource = null, // todo
        externalrefid = null)).toSet)
  }

  /**
   * Rescales the scores to [0, 1] and averages them.
   * Returns 0 when there are no scores: `rescale` of an empty list would
   * otherwise crash on `min`, and `mean` of an empty list is NaN.
   */
  private def meanOrZero(sentiments: List[Double]): Float =
    if (sentiments.isEmpty) 0f
    else mean(rescale(sentiments, 0, 1)).toFloat
}
object Utils {
  /**
   * Arithmetic mean of `items`.
   *
   * Returns 0.0 for an empty list: the original `sum / length` evaluated to
   * 0.0 / 0 = NaN, which would silently poison any downstream aggregate.
   */
  def mean(items: List[Double]): Double = {
    if (items.isEmpty) 0.0
    else items.sum / items.length
  }

  /**
   * Linearly rescales `items` into the range [min_new, max_new].
   *
   * Edge cases the original crashed or misbehaved on:
   *  - empty input: returns an empty list (`items.min` throws on Nil);
   *  - all-equal input: every value maps to the midpoint of the new range
   *    (the scale coefficient would otherwise be a division by zero).
   *
   * @see https://stats.stackexchange.com/a/25897
   */
  def rescale(items: List[Double], min_new: Double, max_new: Double): List[Double] = {
    if (items.isEmpty) {
      Nil
    } else {
      val min_old = items.min
      val max_old = items.max
      if (min_old == max_old) {
        items.map(_ => (min_new + max_new) / 2)
      } else {
        val coef = (max_new - min_new) / (max_old - min_old)
        items.map(v => coef * (v - max_old) + max_new)
      }
    }
  }
}
package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra

import java.text.DecimalFormat

import org.scalatest.FlatSpec

/** Unit tests for the numeric helpers backing the Cassandra sink. */
class UtilsSpec extends FlatSpec {
  "The mean function" should "compute the mean" in {
    assert(Utils.mean(List(1, 2, 3, 4)) == 2.5)
    assert(Utils.mean(List(1.2, -0.2, 1.0, 2)) == 1.0)
  }

  "The rescale function" should "rescale numbers" in {
    // Integer inputs rescale exactly, so the result compares directly.
    assert(Utils.rescale(List(10, 20, 30), 1, 3) == List(1, 2, 3))

    // Fractional results are compared after formatting to one decimal place
    // to sidestep floating-point representation noise.
    val oneDecimal = new DecimalFormat("#.#")
    val rescaled = Utils.rescale(List(0.2, 0.4, 0.6), 0, 1).map(oneDecimal.format)
    assert(rescaled == List("0", "0.5", "1"))
  }
}