Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #112 from CatalystCode/more-logging
Browse files Browse the repository at this point in the history
Add granularity to sub-metrics, add overall metric
  • Loading branch information
c-w committed Aug 25, 2017
2 parents c27122c + 444e13a commit 221c653
Showing 1 changed file with 37 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,47 @@ object CassandraEventsSink{
registerUDFs(sparkSession)

dstream.foreachRDD { (eventsRDD, time: Time) => {
Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
eventsRDD.cache()
}

if (!eventsRDD.isEmpty) {
val batchSize = eventsRDD.count()
val batchid = UUID.randomUUID().toString
val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid))

Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) {
fortisEventsRDD.cache()
Timer.time(Telemetry.logSinkPhase("all", _, _, -1)) {
Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
eventsRDD.cache()
}

Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) {
writeFortisEvents(fortisEventsRDD)
}
if (!eventsRDD.isEmpty) {
val batchSize = eventsRDD.count()
val batchid = UUID.randomUUID().toString
val fortisEventsRDD = eventsRDD.map(CassandraEventSchema(_, batchid))

val aggregators = Seq(
new ConjunctiveTopicsAggregator,
new PopularPlacesAggregator,
new PopularTopicAggregator,
new ComputedTilesAggregator
)
Timer.time(Telemetry.logSinkPhase("fortisEventsRDD.cache", _, _, -1)) {
fortisEventsRDD.cache()
}

val eventBatchDF = Timer.time(Telemetry.logSinkPhase("fetchEventsByBatchId", _, _, batchSize)) {
fetchEventBatch(batchid, fortisEventsRDD, sparkSession)
}
Timer.time(Telemetry.logSinkPhase("writeEvents", _, _, batchSize)) {
writeFortisEvents(fortisEventsRDD)
}

Timer.time(Telemetry.logSinkPhase("writeTagTables", _, _, batchSize)) {
writeEventBatchToEventTagTables(eventBatchDF, sparkSession)
}
val aggregators = Seq(
new ConjunctiveTopicsAggregator,
new PopularPlacesAggregator,
new PopularTopicAggregator,
new ComputedTilesAggregator
)

aggregators.foreach(aggregator => {
val eventName = aggregator.FortisTargetTablename
val eventBatchDF = Timer.time(Telemetry.logSinkPhase("fetchEventsByBatchId", _, _, batchSize)) {
fetchEventBatch(batchid, fortisEventsRDD, sparkSession)
}

Timer.time(Telemetry.logSinkPhase(s"aggregate_$eventName", _, _, batchSize)) {
aggregateEventBatch(eventBatchDF, sparkSession, aggregator)
Timer.time(Telemetry.logSinkPhase("writeTagTables", _, _, batchSize)) {
writeEventBatchToEventTagTables(eventBatchDF, sparkSession)
}
})

aggregators.foreach(aggregator => {
val eventName = aggregator.FortisTargetTablename

Timer.time(Telemetry.logSinkPhase(s"aggregate_$eventName", _, _, batchSize)) {
aggregateEventBatch(eventBatchDF, sparkSession, aggregator)
}
})
}
}
}}

Expand Down Expand Up @@ -120,24 +122,24 @@ object CassandraEventsSink{
}

def aggregateEventBatch(eventDS: Dataset[Event], session: SparkSession, aggregator: FortisAggregator): Unit = {
val flattenedDF = Timer.time(Telemetry.logSinkPhase("flattenedDF", _, _, -1)) {
val flattenedDF = Timer.time(Telemetry.logSinkPhase(s"flattenedDF-${aggregator.FortisTargetTablename}", _, _, -1)) {
val flattenedDF = aggregator.flattenEvents(session, eventDS)
flattenedDF.createOrReplaceTempView(aggregator.DfTableNameFlattenedEvents)
flattenedDF
}

val aggregatedDF = Timer.time(Telemetry.logSinkPhase("aggregatedDF", _, _, -1)) {
val aggregatedDF = Timer.time(Telemetry.logSinkPhase(s"aggregatedDF-${aggregator.FortisTargetTablename}", _, _, -1)) {
val aggregatedDF = aggregator.AggregateEventBatches(session, flattenedDF)
aggregatedDF.createOrReplaceTempView(aggregator.DfTableNameComputedAggregates)
aggregatedDF
}

val incrementallyUpdatedDF = Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF", _, _, -1)) {
val incrementallyUpdatedDF = Timer.time(Telemetry.logSinkPhase(s"incrementallyUpdatedDF-${aggregator.FortisTargetTablename}", _, _, -1)) {
val incrementallyUpdatedDF = aggregator.IncrementalUpdate(session, aggregatedDF)
incrementallyUpdatedDF
}

Timer.time(Telemetry.logSinkPhase("incrementallyUpdatedDF.write", _, _, -1)) {
Timer.time(Telemetry.logSinkPhase(s"incrementallyUpdatedDF.write-${aggregator.FortisTargetTablename}", _, _, -1)) {
incrementallyUpdatedDF.write
.format(CassandraFormat)
.mode(SaveMode.Append)
Expand Down

0 comments on commit 221c653

Please sign in to comment.