This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Implement production cassandra schema
c-w committed Jun 23, 2017
1 parent fb867d9 commit e13eef1
Showing 10 changed files with 125 additions and 9 deletions.
@@ -1,6 +1,9 @@
 package com.microsoft.partnercatalyst.fortis.spark.dto
 
+import java.util.UUID
+
 case class AnalyzedItem(
+  id: UUID,
   createdAtEpoch: Long,
   body: String,
   title: String,
@@ -1,6 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline
 
 import java.time.Instant.now
+import java.util.UUID.randomUUID
 
 import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
@@ -17,6 +18,7 @@ object BingPipeline extends Pipeline {
 
   private def convertToSchema(stream: DStream[BingPost], transformContext: TransformContext): DStream[AnalyzedItem] = {
     stream.map(post => AnalyzedItem(
+      id = randomUUID(),
       createdAtEpoch = now.getEpochSecond,
       body = post.snippet,
       title = post.name,
@@ -1,6 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline
 
 import java.time.Instant.now
+import java.util.UUID.randomUUID
 
 import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
@@ -19,6 +20,7 @@ object FacebookPipeline extends Pipeline {
     import transformContext._
 
     stream.map(post => AnalyzedItem(
+      id = randomUUID(),
      createdAtEpoch = now.getEpochSecond,
      body = post.post.getMessage,
      title = "",
@@ -1,6 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline
 
 import java.time.Instant.now
+import java.util.UUID.randomUUID
 
 import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
 import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
@@ -18,6 +19,7 @@ object InstagramPipeline extends Pipeline {
       // do computer vision analysis
       val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url)
       AnalyzedItem(
+        id = randomUUID(),
        createdAtEpoch = now.getEpochSecond,
        body = analysis.summary.getOrElse(""),
        title = instagram.caption.text,
@@ -1,6 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline
 
 import java.time.Instant.now
+import java.util.UUID.randomUUID
 
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -17,6 +18,7 @@ object RadioPipeline extends Pipeline {
 
   private def convertToSchema(stream: DStream[RadioTranscription], transformContext: TransformContext): DStream[AnalyzedItem] = {
     stream.map(transcription => AnalyzedItem(
+      id = randomUUID(),
       createdAtEpoch = now.getEpochSecond,
       body = transcription.text,
       title = "",
@@ -1,6 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline
 
 import java.time.Instant.now
+import java.util.UUID.randomUUID
 
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -19,6 +20,7 @@ object TadawebPipeline extends Pipeline {
     import transformContext._
 
     stream.map(tada => AnalyzedItem(
+      id = randomUUID(),
      createdAtEpoch = now.getEpochSecond,
      body = tada.text,
      title = tada.title,
@@ -1,6 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline
 
 import java.time.Instant.now
+import java.util.UUID.randomUUID
 
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -19,6 +20,7 @@ object TwitterPipeline extends Pipeline {
     import transformContext._
 
     stream.map(tweet => AnalyzedItem(
+      id = randomUUID(),
      createdAtEpoch = now.getEpochSecond,
      body = tweet.getText,
      title = "",
@@ -1,17 +1,95 @@
 package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra
 
+import java.time.Instant.now
+import java.util.UUID
+
 import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
+import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.Utils.{mean, rescale}
+import com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment.SentimentDetector.Neutral
 
+case class Sentiment(
+  pos_avg: Float,
+  neg_avg: Float)
+
+case class Gender(
+  male_mentions: Int,
+  female_mentions: Int)
+
-// todo: update with real schema once defined on the cassandra db
-case class CassandraRow(
-  created_at: Long,
-  pipeline: String)
+case class Entities(
+  name: String,
+  externalsource: String,
+  externalrefid: String,
+  count: Float)
+
+case class Feature(
+  mentions: Int,
+  sentiment: Set[Sentiment],
+  gender: Set[Gender],
+  entities: Set[Entities])
+
+case class Event(
+  pipeline: String,
+  externalid: String,
+  computedfeatures: Set[Feature],
+  detectedkeywords: Set[String],
+  detectedplaceids: Set[String],
+  event_time: Long,
+  eventlangcode: String,
+  id: UUID,
+  insertion_time: Long,
+  messagebody: String,
+  sourceid: String,
+  sourceurl: String,
+  title: String)
 
 object CassandraSchema {
-  def apply(item: AnalyzedItem): CassandraRow = {
-    CassandraRow(
-      created_at = item.createdAtEpoch,
-      pipeline = item.publisher
-    )
+  def apply(item: AnalyzedItem): Event = {
+    Event(
+      pipeline = item.publisher,
+      externalid = null, // todo
+      computedfeatures = Set(getFeature(item)),
+      detectedkeywords = item.analysis.keywords.map(_.name).toSet,
+      detectedplaceids = item.analysis.locations.map(_.wofId).toSet,
+      event_time = item.createdAtEpoch,
+      eventlangcode = item.analysis.language.orNull,
+      id = item.id,
+      insertion_time = now.getEpochSecond,
+      messagebody = item.body,
+      sourceid = null, // todo
+      sourceurl = item.sourceUrl,
+      title = item.title)
   }
+
+  private def getFeature(item: AnalyzedItem): Feature = {
+    val genderCounts = item.analysis.genders.map(_.name).groupBy(identity).mapValues(_.size)
+    val entityCounts = item.analysis.entities.map(_.name).groupBy(identity).mapValues(_.size)
+    val (positiveSentiments, negativeSentiments) = item.analysis.sentiments.partition(_ > Neutral)
+    Feature(
+      mentions = -1,
+      sentiment = Set(Sentiment(
+        pos_avg = mean(rescale(positiveSentiments, 0, 1)).toFloat,
+        neg_avg = mean(rescale(negativeSentiments, 0, 1)).toFloat)),
+      gender = Set(Gender(
+        male_mentions = genderCounts("M"),
+        female_mentions = genderCounts("F"))),
+      entities = entityCounts.map(kv => Entities(
+        name = kv._1,
+        count = kv._2,
+        externalsource = null,
+        externalrefid = null)).toSet)
+  }
 }
+
+object Utils {
+  def mean(items: List[Double]): Double = {
+    items.sum / items.length
+  }
+
+  /** @see https://stats.stackexchange.com/a/25897 */
+  def rescale(items: List[Double], min_new: Double, max_new: Double): List[Double] = {
+    val min_old = items.min
+    val max_old = items.max
+    val coef = (max_new - min_new) / (max_old - min_old)
+    items.map(v => coef * (v - max_old) + max_new)
+  }
+}
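
The rescale helper introduced above is standard min-max normalization: with coef = (max_new - min_new) / (max_old - min_old), each value v maps to coef * (v - max_old) + max_new, so max_old lands exactly on max_new and min_old on min_new. getFeature uses it to squash the positive and negative sentiment scores (split around Neutral) onto [0, 1] before averaging them into pos_avg and neg_avg. A minimal standalone sketch of the same transform follows; the names RescaleSketch and rescaleSafe are illustrative and not part of this commit, and unlike the committed helper it guards the two inputs the raw formula cannot handle: an empty list (items.min throws) and a constant list (division by zero).

object RescaleSketch {
  // Min-max rescaling: linearly maps [min_old, max_old] onto [min_new, max_new].
  // Returns None for the degenerate inputs: an empty list (no min/max exists)
  // and a constant list (max_old - min_old == 0 makes coef divide by zero).
  def rescaleSafe(items: List[Double], minNew: Double, maxNew: Double): Option[List[Double]] =
    if (items.isEmpty || items.min == items.max) None
    else {
      val coef = (maxNew - minNew) / (items.max - items.min)
      Some(items.map(v => coef * (v - items.max) + maxNew))
    }

  def main(args: Array[String]): Unit = {
    println(rescaleSafe(List(10, 20, 30), 1, 3)) // Some(List(1.0, 2.0, 3.0))
    println(rescaleSafe(Nil, 0, 1))              // None
  }
}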
@@ -1,6 +1,8 @@
 package com.microsoft.partnercatalyst.fortis.spark
 
 import java.time.Instant
+import java.util.UUID
+import java.util.UUID.randomUUID
 
 import com.datastax.spark.connector._
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
@@ -128,6 +130,7 @@ class SparkSpec extends FlatSpec with BeforeAndAfter {
     val rdds = mutable.Queue[RDD[AnalyzedItem]]()
     rdds += sc.makeRDD(Seq(
       AnalyzedItem(
+        id = randomUUID(),
        createdAtEpoch = Instant.now.getEpochSecond,
        body = "body-1",
        title = "title-1",
@@ -136,13 +139,15 @@ class SparkSpec extends FlatSpec with BeforeAndAfter {
        analysis = Analysis())))
     rdds += sc.makeRDD(Seq(
       AnalyzedItem(
+        id = randomUUID(),
        createdAtEpoch = Instant.now.getEpochSecond,
        body = "body-2",
        title = "title-2",
        publisher = "publisher-2",
        sourceUrl = "sourceUrl-2",
        analysis = Analysis()),
       AnalyzedItem(
+        id = randomUUID(),
        createdAtEpoch = Instant.now.getEpochSecond,
        body = "body-3",
        title = "title-3",
@@ -0,0 +1,18 @@
+package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra
+
+import java.text.DecimalFormat
+
+import org.scalatest.FlatSpec
+
+class UtilsSpec extends FlatSpec {
+  "The mean function" should "compute the mean" in {
+    assert(Utils.mean(List(1, 2, 3, 4)) == 2.5)
+    assert(Utils.mean(List(1.2, -0.2, 1.0, 2)) == 1.0)
+  }
+
+  "The rescale function" should "rescale numbers" in {
+    val formatter = new DecimalFormat("#.#")
+    assert(Utils.rescale(List(10, 20, 30), 1, 3) == List(1, 2, 3))
+    assert(Utils.rescale(List(0.2, 0.4, 0.6), 0, 1).map(formatter.format) == List("0", "0.5", "1"))
+  }
+}
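
Spot-checking the expected values by hand: for rescale(List(10, 20, 30), 1, 3), coef = (3 - 1) / (30 - 10) = 0.1, so 10 maps to 0.1 * (10 - 30) + 3 = 1, 20 to 2, and 30 to 3. For rescale(List(0.2, 0.4, 0.6), 0, 1), coef = (1 - 0) / (0.6 - 0.2) = 2.5, giving 0, 0.5, and 1 before the DecimalFormat rounding.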
