@@ -30,45 +30,47 @@ object CassandraEventsSink{
     registerUDFs(sparkSession)
 
     dstream.foreachRDD { (eventsRDD, time: Time) => {
-      Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
-        eventsRDD.cache()
-      }
-
-      if (!eventsRDD.isEmpty) {
-        val batchSize = eventsRDD.count()
-        val batchid = UUID.randomUUID().toString
-        val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid))
-
-        Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) {
-          fortisEventsRDD.cache()
-        }
-
-        Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) {
-          writeFortisEvents(fortisEventsRDD)
-        }
-
-        val aggregators = Seq(
-          new ConjunctiveTopicsAggregator,
-          new PopularPlacesAggregator,
-          new PopularTopicAggregator,
-          new ComputedTilesAggregator
-        )
-
-        val eventBatchDF = Timer.time(Telemetry.logSinkPhase("fetchEventsByBatchId", _, _, batchSize)) {
-          fetchEventBatch(batchid, fortisEventsRDD, sparkSession)
-        }
-
-        Timer.time(Telemetry.logSinkPhase("writeTagTables", _, _, batchSize)) {
-          writeEventBatchToEventTagTables(eventBatchDF, sparkSession)
-        }
-
-        aggregators.foreach(aggregator => {
-          val eventName = aggregator.FortisTargetTablename
-
-          Timer.time(Telemetry.logSinkPhase(s"aggregate_$eventName", _, _, batchSize)) {
-            aggregateEventBatch(eventBatchDF, sparkSession, aggregator)
-          }
-        })
+      Timer.time(Telemetry.logSinkPhase("all", _, _, -1)) {
+        Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
+          eventsRDD.cache()
+        }
+
+        if (!eventsRDD.isEmpty) {
+          val batchSize = eventsRDD.count()
+          val batchid = UUID.randomUUID().toString
+          val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid))
+
+          Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) {
+            fortisEventsRDD.cache()
+          }
+
+          Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) {
+            writeFortisEvents(fortisEventsRDD)
+          }
+
+          val aggregators = Seq(
+            new ConjunctiveTopicsAggregator,
+            new PopularPlacesAggregator,
+            new PopularTopicAggregator,
+            new ComputedTilesAggregator
+          )
+
+          val eventBatchDF = Timer.time(Telemetry.logSinkPhase("fetchEventsByBatchId", _, _, batchSize)) {
+            fetchEventBatch(batchid, fortisEventsRDD, sparkSession)
+          }
+
+          Timer.time(Telemetry.logSinkPhase("writeTagTables", _, _, batchSize)) {
+            writeEventBatchToEventTagTables(eventBatchDF, sparkSession)
+          }
+
+          aggregators.foreach(aggregator => {
+            val eventName = aggregator.FortisTargetTablename
+
+            Timer.time(Telemetry.logSinkPhase(s"aggregate_$eventName", _, _, batchSize)) {
+              aggregateEventBatch(eventBatchDF, sparkSession, aggregator)
+            }
+          })
+        }
       }
     }}
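
A note on the instrumentation pattern above: Timer.time(Telemetry.logSinkPhase(phase, _, _, batchSize)) partially applies logSinkPhase, leaving two holes that Timer.time fills with its timing measurements, and the new outer "all" phase times the entire foreachRDD body. A minimal sketch of plausible signatures, assuming the callback receives a start timestamp and an elapsed duration (the real fortis implementations may differ):

// Sketch only: inferred shapes for Timer.time and Telemetry.logSinkPhase.
object Timer {
  // Runs the by-name block, passes (startEpochSeconds, elapsedSeconds) to
  // the report callback, then returns the block's result.
  def time[T](report: (Long, Double) => Unit)(block: => T): T = {
    val startEpochSeconds = System.currentTimeMillis() / 1000
    val startNanos = System.nanoTime()
    val result = block
    report(startEpochSeconds, (System.nanoTime() - startNanos) / 1e9)
    result
  }
}

object Telemetry {
  // Emits one log line per sink phase; batchSize is -1 for phases where no
  // event count applies.
  def logSinkPhase(phase: String, startEpochSeconds: Long, elapsedSeconds: Double, batchSize: Long): Unit =
    println(s"sinkPhase=$phase start=$startEpochSeconds elapsed=$elapsedSeconds batchSize=$batchSize")
}

Under these assumed signatures, Timer.time(Telemetry.logSinkPhase("all", _, _, -1)) { body } expands the two underscores into the callback's timing arguments, so each nested phase reports its own duration while "all" reports the total for the batch.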

@@ -120,24 +122,24 @@ object CassandraEventsSink{
   }
 
   def aggregateEventBatch(eventDS: Dataset[Event], session: SparkSession, aggregator: FortisAggregator): Unit = {
-    val flattenedDF = Timer.time(Telemetry.logSinkPhase("flattenedDF", _, _, -1)) {
+    val flattenedDF = Timer.time(Telemetry.logSinkPhase(s"flattenedDF-${aggregator.FortisTargetTablename}", _, _, -1)) {
Contributor:
nit: we're using _ for aggregate above
       val flattenedDF = aggregator.flattenEvents(session, eventDS)
       flattenedDF.createOrReplaceTempView(aggregator.DfTableNameFlattenedEvents)
       flattenedDF
     }
 
-    val aggregatedDF = Timer.time(Telemetry.logSinkPhase("aggregatedDF", _, _, -1)) {
+    val aggregatedDF = Timer.time(Telemetry.logSinkPhase(s"aggregatedDF-${aggregator.FortisTargetTablename}", _, _, -1)) {
       val aggregatedDF = aggregator.AggregateEventBatches(session, flattenedDF)
       aggregatedDF.createOrReplaceTempView(aggregator.DfTableNameComputedAggregates)
       aggregatedDF
     }
 
-    val incrementallyUpdatedDF = Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF", _, _, -1)) {
+    val incrementallyUpdatedDF = Timer.time(Telemetry.logSinkPhase(s"incrementallyUpdatedDF-${aggregator.FortisTargetTablename}", _, _, -1)) {
       val incrementallyUpdatedDF = aggregator.IncrementalUpdate(session, aggregatedDF)
       incrementallyUpdatedDF
     }
 
-    Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF.write", _, _, -1)) {
+    Timer.time(Telemetry.logSinkPhase(s"incrementallyUpdatedDF.write-${aggregator.FortisTargetTablename}", _, _, -1)) {
       incrementallyUpdatedDF.write
         .format(CassandraFormat)
         .mode(SaveMode.Append)
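
The diff is truncated at the write chain. For orientation only, a hedged sketch of how such an append to Cassandra typically completes with the spark-cassandra-connector; the keyspace name "fortis" and the option wiring are illustrative assumptions, not code from this PR, with CassandraFormat presumably the connector's "org.apache.spark.sql.cassandra" source:

// Sketch only: a typical spark-cassandra-connector append; the keyspace
// and option values below are illustrative assumptions.
incrementallyUpdatedDF.write
  .format("org.apache.spark.sql.cassandra") // i.e. CassandraFormat
  .mode(SaveMode.Append)
  .options(Map("keyspace" -> "fortis", "table" -> aggregator.FortisTargetTablename))
  .save()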