diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraEventsSink.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraEventsSink.scala index 1793490..253644d 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraEventsSink.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraEventsSink.scala @@ -30,45 +30,47 @@ object CassandraEventsSink{ registerUDFs(sparkSession) dstream.foreachRDD { (eventsRDD, time: Time) => { - Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) { - eventsRDD.cache() - } - - if (!eventsRDD.isEmpty) { - val batchSize = eventsRDD.count() - val batchid = UUID.randomUUID().toString - val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid)) - - Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) { - fortisEventsRDD.cache() + Timer.time(Telemetry.logSinkPhase("all", _, _, -1)) { + Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) { + eventsRDD.cache() } - Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) { - writeFortisEvents(fortisEventsRDD) - } + if (!eventsRDD.isEmpty) { + val batchSize = eventsRDD.count() + val batchid = UUID.randomUUID().toString + val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid)) - val aggregators = Seq( - new ConjunctiveTopicsAggregator, - new PopularPlacesAggregator, - new PopularTopicAggregator, - new ComputedTilesAggregator - ) + Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) { + fortisEventsRDD.cache() + } - val eventBatchDF = Timer.time(Telemetry.logSinkPhase("fetchEventsByBatchId", _, _, batchSize)) { - fetchEventBatch(batchid, fortisEventsRDD, sparkSession) - } + Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) { + writeFortisEvents(fortisEventsRDD) + } - Timer.time(Telemetry.logSinkPhase("writeTagTables", _, _, batchSize)) { - writeEventBatchToEventTagTables(eventBatchDF, sparkSession) - } + val aggregators = Seq( + new ConjunctiveTopicsAggregator, + new PopularPlacesAggregator, + new PopularTopicAggregator, + new ComputedTilesAggregator + ) - aggregators.foreach(aggregator => { - val eventName = aggregator.FortisTargetTablename + val eventBatchDF = Timer.time(Telemetry.logSinkPhase("fetchEventsByBatchId", _, _, batchSize)) { + fetchEventBatch(batchid, fortisEventsRDD, sparkSession) + } - Timer.time(Telemetry.logSinkPhase(s"aggregate_$eventName", _, _, batchSize)) { - aggregateEventBatch(eventBatchDF, sparkSession, aggregator) + Timer.time(Telemetry.logSinkPhase("writeTagTables", _, _, batchSize)) { + writeEventBatchToEventTagTables(eventBatchDF, sparkSession) } - }) + + aggregators.foreach(aggregator => { + val eventName = aggregator.FortisTargetTablename + + Timer.time(Telemetry.logSinkPhase(s"aggregate_$eventName", _, _, batchSize)) { + aggregateEventBatch(eventBatchDF, sparkSession, aggregator) + } + }) + } } }} @@ -120,24 +122,24 @@ object CassandraEventsSink{ } def aggregateEventBatch(eventDS: Dataset[Event], session: SparkSession, aggregator: FortisAggregator): Unit = { - val flattenedDF = Timer.time(Telemetry.logSinkPhase("flattenedDF", _, _, -1)) { + val flattenedDF = Timer.time(Telemetry.logSinkPhase(s"flattenedDF-${aggregator.FortisTargetTablename}", _, _, -1)) { val flattenedDF = aggregator.flattenEvents(session, eventDS) flattenedDF.createOrReplaceTempView(aggregator.DfTableNameFlattenedEvents) flattenedDF } - val aggregatedDF = Timer.time(Telemetry.logSinkPhase("aggregatedDF", _, _, -1)) { + val aggregatedDF = Timer.time(Telemetry.logSinkPhase(s"aggregatedDF-${aggregator.FortisTargetTablename}", _, _, -1)) { val aggregatedDF = aggregator.AggregateEventBatches(session, flattenedDF) aggregatedDF.createOrReplaceTempView(aggregator.DfTableNameComputedAggregates) aggregatedDF } - val incrementallyUpdatedDF = Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF", _, _, -1)) { + val incrementallyUpdatedDF = Timer.time(Telemetry.logSinkPhase(s"incrementallyUpdatedDF-${aggregator.FortisTargetTablename}", _, _, -1)) { val incrementallyUpdatedDF = aggregator.IncrementalUpdate(session, aggregatedDF) incrementallyUpdatedDF } - Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF.write", _, _, -1)) { + Timer.time(Telemetry.logSinkPhase(s"incrementallyUpdatedDF.write-${aggregator.FortisTargetTablename}", _, _, -1)) { incrementallyUpdatedDF.write .format(CassandraFormat) .mode(SaveMode.Append)