package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra

import java.util.UUID

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.writer.WriteConf
import com.microsoft.partnercatalyst.fortis.spark.dba.ConfigurationManager
import com.microsoft.partnercatalyst.fortis.spark.dto.FortisEvent
import com.microsoft.partnercatalyst.fortis.spark.logging.FortisTelemetry.{get => Telemetry}
import com.microsoft.partnercatalyst.fortis.spark.logging.{Loggable, Timer}
import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.aggregators._
import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.dto._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream
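
/**
  * Spark Streaming sink that writes Fortis events to Cassandra and refreshes
  * the derived tag tables and offline aggregations for every micro-batch.
  *
  * Minimal usage sketch (the stream, session, and configuration manager are
  * assumed to be set up by the caller):
  *
  *   CassandraEventsSink(eventsStream, sparkSession, configurationManager)
  */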
object CassandraEventsSink extends Loggable {
  private val KeyspaceName = "fortis"
  private val TableEvent = "events"
  private val TableEventTopics = "eventtopics"
  private val TableEventPlaces = "eventplaces"
  private val TableEventBatches = "eventbatches"
  private val CassandraFormat = "org.apache.spark.sql.cassandra"
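
  /**
    * De-duplicates each event's analysis annotations, then registers a
    * per-batch action that persists the batch and its derived views.
    */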
  def apply(dstream: DStream[FortisEvent], sparkSession: SparkSession, configurationManager: ConfigurationManager): Unit = {
    implicit lazy val connector: CassandraConnector = CassandraConnector(sparkSession.sparkContext)
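
    // Drop duplicate keywords, locations, and entities from each event's
    // analysis before anything is written downstream.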
    dstream
      .map(event => event.copy(analysis = event.analysis.copy(
        keywords = event.analysis.keywords.distinct,
        locations = event.analysis.locations.distinct,
        entities = event.analysis.entities.distinct
      )))
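      // For each micro-batch: cache the events, tag them with a fresh batch
      // id, write them to Cassandra, then update tag tables and aggregations.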
      .foreachRDD { (eventsRDD, _: Time) => {
        Timer.time(Telemetry.logSinkPhase("all", _, _, -1)) {
          Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
            eventsRDD.cache()
          }

          if (!eventsRDD.isEmpty) {
            val batchSize = eventsRDD.count()
            val batchid = UUID.randomUUID().toString
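            // Stamp every event with this batch's id so that the rows written
            // below can be looked up again via the eventbatches table.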
            val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid))

            Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) {
              fortisEventsRDD.cache()
            }

            Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) {
              writeFortisEvents(fortisEventsRDD)
            }
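
            // Aggregators that rebuild the derived views (conjunctive topics,
            // popular places, heatmaps) from this batch's events.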
            val offlineAggregators = Seq[OfflineAggregator[_]](
              new ConjunctiveTopicsOffineAggregator(configurationManager),
              new PopularPlacesOfflineAggregator(configurationManager),
              new HeatmapOfflineAggregator(sparkSession, configurationManager)
            )
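
            // Read back the rows that the eventbatches table lists for this
            // batch id; since the events write above uses ifNotExists, this
            // presumably excludes events that already existed in Cassandra.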
            val eventBatchDF = Timer.time(Telemetry.logSinkPhase("fetchEventsByBatchId", _, _, batchSize)) {
              fetchEventBatch(batchid, fortisEventsRDD, sparkSession)
            }

            Timer.time(Telemetry.logSinkPhase("writeTagTables", _, _, batchSize)) {
              writeEventBatchToEventTagTables(eventBatchDF, sparkSession)
            }
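
            // Run the aggregators one by one, logging failures instead of
            // failing the whole batch.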
            offlineAggregators.foreach(aggregator => {
              val aggregatorName = aggregator.getClass.getSimpleName
              Timer.time(Telemetry.logSinkPhase(s"offlineAggregators.$aggregatorName", _, _, -1)) {
                try {
                  aggregator.aggregateAndSave(fortisEventsRDD, KeyspaceName)
                } catch {
                  case e: Exception =>
                    logError(s"Failed performing offline aggregation $aggregatorName", e)
                }
              }
            })
          }
        }
      }}
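
    /** Saves the batch to the events table; ifNotExists ensures that replayed
      * events do not overwrite rows that are already present. */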
    def writeFortisEvents(events: RDD[Event]): Unit = {
      events.saveToCassandra(KeyspaceName, TableEvent, writeConf = WriteConf(ifNotExists = true))
    }
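
    /**
      * Returns the subset of `events` that the eventbatches table records for
      * `batchid`, joined on (eventid, pipelinekey). Interpolating `batchid`
      * into the SQL below is safe here because it is a UUID generated above.
      */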
    def fetchEventBatch(batchid: String, events: RDD[Event], session: SparkSession): Dataset[Event] = {
      import session.implicits._

      Timer.time(Telemetry.logSinkPhase("addedEventsDF", _, _, -1)) {
        val addedEventsDF = session.read.format(CassandraFormat)
          .options(Map("keyspace" -> KeyspaceName, "table" -> TableEventBatches))
          .load()
        addedEventsDF.createOrReplaceTempView(TableEventBatches)
        addedEventsDF
      }
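
      // Keep only the events whose (eventid, pipelinekey) pairs appear under
      // this batch id.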
      val filteredEvents = Timer.time(Telemetry.logSinkPhase("filteredEvents", _, _, -1)) {
        val ds = session.sql(s"select eventid, pipelinekey from $TableEventBatches where batchid = '$batchid'")
        val eventsDS = events.toDF().as[Event]
        val filteredEvents = eventsDS.join(ds, Seq("eventid", "pipelinekey")).as[Event]
        filteredEvents
      }

      Timer.time(Telemetry.logSinkPhase("filteredEvents.cache", _, _, -1)) {
        filteredEvents.cache()
        filteredEvents
      }
    }
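
    /**
      * Fans each event out into its per-topic and per-place rows and saves
      * them to the eventtopics and eventplaces tables, using the site's
      * default zoom level for the places rows.
      */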
    def writeEventBatchToEventTagTables(eventDS: Dataset[Event], session: SparkSession): Unit = {
      import session.implicits._

      val defaultZoom = configurationManager.fetchSiteSettings(session.sparkContext).defaultzoom

      Timer.time(Telemetry.logSinkPhase(s"saveToCassandra-$TableEventTopics", _, _, -1)) {
        eventDS.flatMap(CassandraEventTopicSchema(_)).rdd.saveToCassandra(KeyspaceName, TableEventTopics)
      }

      Timer.time(Telemetry.logSinkPhase(s"saveToCassandra-$TableEventPlaces", _, _, -1)) {
        eventDS.flatMap(CassandraEventPlacesSchema(_, defaultZoom)).rdd.saveToCassandra(KeyspaceName, TableEventPlaces)
      }
    }
  }
}