This repository has been archived by the owner on Mar 7, 2018. It is now read-only.
TadawebPipeline.scala — 41 lines (36 loc) · 1.82 KB
package com.microsoft.partnercatalyst.fortis.spark.pipeline
import java.time.Instant.now
import java.util.UUID.randomUUID
import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
import com.microsoft.partnercatalyst.fortis.spark.tadaweb.dto.TadawebEvent
import com.microsoft.partnercatalyst.fortis.spark.transforms.sentiment.SentimentDetector
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
object TadawebPipeline extends Pipeline {
  /** Builds the Tadaweb branch of the ingestion pipeline.
   *
   *  Looks up the "tadaweb" connector configuration, asks the stream provider for a
   *  typed event stream, converts each event to the shared [[AnalyzedItem]] schema,
   *  and hands the result to the common [[TextPipeline]].
   *
   *  @return None when no "tadaweb" connector is registered or the provider yields
   *          no stream; otherwise the analyzed stream.
   */
  override def apply(streamProvider: StreamProvider, streamRegistry: Map[String, List[ConnectorConfig]], ssc: StreamingContext, transformContext: TransformContext): Option[DStream[AnalyzedItem]] = {
    // Use Map.get rather than apply so a deployment without a "tadaweb" connector
    // returns None instead of throwing NoSuchElementException.
    streamRegistry.get("tadaweb").flatMap(configs =>
      streamProvider.buildStream[TadawebEvent](ssc, configs).map(stream =>
        TextPipeline(convertToSchema(stream, transformContext), transformContext)))
  }

  /** Maps raw Tadaweb events onto the pipeline-wide AnalyzedItem schema. */
  private def convertToSchema(stream: DStream[TadawebEvent], transformContext: TransformContext): DStream[AnalyzedItem] = {
    import transformContext._
    stream.map(tada => AnalyzedItem(
      id = randomUUID(),
      createdAtEpoch = now.getEpochSecond,
      body = tada.text,
      title = tada.title,
      publisher = "TadaWeb",
      // NOTE(review): sourceUrl is populated from the tada *name*, not a URL — confirm intended.
      sourceUrl = tada.tada.name,
      // Only well-formed two-element [latitude, longitude] pairs yield locations;
      // malformed coordinate lists are silently dropped.
      sharedLocations = tada.cities.flatMap(city => city.coordinates match {
        case Seq(latitude, longitude) => locationsExtractor.fetch(latitude, longitude)
        case _ => None
      }).toList,
      analysis = Analysis(
        // Tadaweb reports sentiment as a free-form string; anything unrecognized
        // maps to an empty sentiment list rather than failing.
        sentiments = tada.sentiment match {
          case "negative" => List(SentimentDetector.Negative)
          case "neutral" => List(SentimentDetector.Neutral)
          case "positive" => List(SentimentDetector.Positive)
          case _ => List()
        }
      )))
  }
}