This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Add more logging to cassandra sink #110

Merged 1 commit on Aug 25, 2017.
```diff
@@ -29,15 +29,19 @@ object CassandraEventsSink{
 
     registerUDFs(sparkSession)
 
-    dstream.foreachRDD{ (eventsRDD, time: Time) => {
-      eventsRDD.cache()
+    dstream.foreachRDD { (eventsRDD, time: Time) => {
+      Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
+        eventsRDD.cache()
+      }
 
       if (!eventsRDD.isEmpty) {
         val batchSize = eventsRDD.count()
         val batchid = UUID.randomUUID().toString
         val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid))
 
-        fortisEventsRDD.cache()
+        Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) {
+          fortisEventsRDD.cache()
+        }
 
         Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) {
           writeFortisEvents(fortisEventsRDD)
```
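The diff leans on two helpers whose definitions are not part of this change: `Timer.time`, which runs a block and reports how long it took, and `Telemetry.logSinkPhase`, which is partially applied (the two `_` placeholders) to produce the callback that `Timer.time` invokes. The sketch below shows one shape that makes these calls type-check; it is an assumption for illustration, not the repository's actual implementation, and in particular the order and meaning of the two placeholder parameters (taken here as elapsed milliseconds and a success flag) are guesses.

```scala
// Minimal sketch of the helpers this diff relies on; the real definitions
// live elsewhere in the repository. The order and meaning of the two
// placeholder parameters (here: elapsed millis, success flag) are assumptions.
object Timer {
  // Runs `block`, measures wall-clock duration, reports it via `onComplete`,
  // and returns the block's result so callers can keep the wrapped value.
  def time[T](onComplete: (Long, Boolean) => Unit)(block: => T): T = {
    val start = System.currentTimeMillis()
    try {
      val result = block
      onComplete(System.currentTimeMillis() - start, true)
      result
    } catch {
      case ex: Throwable =>
        onComplete(System.currentTimeMillis() - start, false)
        throw ex
    }
  }
}

object Telemetry {
  // Logs one named sink phase; batchSize is -1 for phases with no
  // associated record count, as in the cache phases above.
  def logSinkPhase(phase: String, elapsedMillis: Long, success: Boolean, batchSize: Long): Unit =
    println(s"sinkPhase=$phase elapsedMillis=$elapsedMillis success=$success batchSize=$batchSize")
}
```

With these signatures, `Telemetry.logSinkPhase("writeEvents", _, _, batchSize)` eta-expands to the `(Long, Boolean) => Unit` callback that `Timer.time` expects, and `-1` serves as the sentinel for phases with no meaningful batch size.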
```diff
@@ -82,37 +86,63 @@ object CassandraEventsSink{
   def fetchEventBatch(batchid: String, events: RDD[Event], session: SparkSession): Dataset[Event] = {
     import session.implicits._
 
-    val addedEventsDF = session.read.format(CassandraFormat)
-      .options(Map("keyspace" -> KeyspaceName, "table" -> TableEventBatches))
-      .load()
+    Timer.time(Telemetry.logSinkPhase("addedEventsDF", _, _, -1)) {
+      val addedEventsDF = session.read.format(CassandraFormat)
+        .options(Map("keyspace" -> KeyspaceName, "table" -> TableEventBatches))
+        .load()
+      addedEventsDF.createOrReplaceTempView(TableEventBatches)
+      addedEventsDF
+    }
 
-    addedEventsDF.createOrReplaceTempView(TableEventBatches)
-    val ds = session.sql(s"select eventid, pipelinekey from $TableEventBatches where batchid = '$batchid'")
-    val eventsDS = events.toDF().as[Event]
-    val filteredEvents = eventsDS.join(ds, Seq("eventid", "pipelinekey")).as[Event]
+    val filteredEvents = Timer.time(Telemetry.logSinkPhase("filteredEvents", _, _, -1)) {
+      val ds = session.sql(s"select eventid, pipelinekey from $TableEventBatches where batchid = '$batchid'")
+      val eventsDS = events.toDF().as[Event]
+      val filteredEvents = eventsDS.join(ds, Seq("eventid", "pipelinekey")).as[Event]
+      filteredEvents
+    }
 
-    filteredEvents.cache()
-    filteredEvents
+    Timer.time(Telemetry.logSinkPhase("filteredEvents.cache", _, _, -1)) {
+      filteredEvents.cache()
+      filteredEvents
+    }
   }
 
   def writeEventBatchToEventTagTables(eventDS: Dataset[Event], session: SparkSession): Unit = {
     import session.implicits._
-    eventDS.flatMap(CassandraEventTopicSchema(_)).rdd.saveToCassandra(KeyspaceName, TableEventTopics)
-    eventDS.flatMap(CassandraEventPlacesSchema(_)).rdd.saveToCassandra(KeyspaceName, TableEventPlaces)
+
+    Timer.time(Telemetry.logSinkPhase(s"saveToCassandra-$TableEventTopics", _, _, -1)) {
+      eventDS.flatMap(CassandraEventTopicSchema(_)).rdd.saveToCassandra(KeyspaceName, TableEventTopics)
+    }
+
+    Timer.time(Telemetry.logSinkPhase(s"saveToCassandra-$TableEventPlaces", _, _, -1)) {
+      eventDS.flatMap(CassandraEventPlacesSchema(_)).rdd.saveToCassandra(KeyspaceName, TableEventPlaces)
+    }
   }
 
   def aggregateEventBatch(eventDS: Dataset[Event], session: SparkSession, aggregator: FortisAggregator): Unit = {
-    val flattenedDF = aggregator.flattenEvents(session, eventDS)
-    flattenedDF.createOrReplaceTempView(aggregator.DfTableNameFlattenedEvents)
+    val flattenedDF = Timer.time(Telemetry.logSinkPhase("flattenedDF", _, _, -1)) {
+      val flattenedDF = aggregator.flattenEvents(session, eventDS)
+      flattenedDF.createOrReplaceTempView(aggregator.DfTableNameFlattenedEvents)
+      flattenedDF
+    }
 
-    val aggregatedDF = aggregator.AggregateEventBatches(session, flattenedDF)
-    aggregatedDF.createOrReplaceTempView(aggregator.DfTableNameComputedAggregates)
+    val aggregatedDF = Timer.time(Telemetry.logSinkPhase("aggregatedDF", _, _, -1)) {
+      val aggregatedDF = aggregator.AggregateEventBatches(session, flattenedDF)
+      aggregatedDF.createOrReplaceTempView(aggregator.DfTableNameComputedAggregates)
+      aggregatedDF
+    }
 
-    val incrementallyUpdatedDF = aggregator.IncrementalUpdate(session, aggregatedDF)
-    incrementallyUpdatedDF.write
-      .format(CassandraFormat)
-      .mode(SaveMode.Append)
-      .options(Map("keyspace" -> KeyspaceName, "table" -> aggregator.FortisTargetTablename)).save
+    val incrementallyUpdatedDF = Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF", _, _, -1)) {
+      val incrementallyUpdatedDF = aggregator.IncrementalUpdate(session, aggregatedDF)
+      incrementallyUpdatedDF
+    }
+
+    Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF.write", _, _, -1)) {
+      incrementallyUpdatedDF.write
+        .format(CassandraFormat)
+        .mode(SaveMode.Append)
+        .options(Map("keyspace" -> KeyspaceName, "table" -> aggregator.FortisTargetTablename)).save
+    }
   }
 }
}
```
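Note the recurring wrap-and-return idiom in the second hunk: each `Timer.time` block ends with the value the surrounding code needs, so the timing wrapper adds a logging side effect without changing what downstream code sees. A hypothetical standalone example of the pattern, reusing the sketched helpers above:

```scala
// Hypothetical illustration of the wrap-and-return idiom from this PR:
// the block's last expression becomes Timer.time's return value, so
// `doubled` is usable downstream exactly as before the wrapper was added.
val doubled: Seq[Int] = Timer.time(Telemetry.logSinkPhase("double", _, _, -1)) {
  val doubled = Seq(1, 2, 3).map(_ * 2) // inner val shadows the outer name, mirroring the diff's style
  doubled
}
```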