Add streaming metrics (#95)
* Add streaming metrics

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* Register source on spark conf

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* Validate that rowsAfterValidation is not empty

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>
khorshuheng committed Oct 4, 2021
1 parent f347686 commit e55a145
Showing 6 changed files with 198 additions and 14 deletions.
10 changes: 10 additions & 0 deletions spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
@@ -77,6 +77,16 @@ object BasePipeline {
case None => ()
}

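// When metrics are enabled and the job runs in online (streaming) mode, register the
// custom streaming metric source through Spark's metrics configuration so the
// MetricsSystem instantiates and reports it.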
(jobConfig.metrics, jobConfig.mode) match {
case (Some(_), Modes.Online) =>
conf
.set(
"spark.metrics.conf.*.source.jvm.class",
"org.apache.spark.metrics.source.StreamingMetricSource"
)
case (_, _) => ()
}

jobConfig.stencilURL match {
case Some(url: String) =>
conf
@@ -16,10 +16,7 @@
*/
package feast.ingestion

import java.io.File
import java.util.concurrent.TimeUnit

import feast.ingestion.metrics.IngestionPipelineMetrics
import feast.ingestion.metrics.{IngestionPipelineMetrics, StreamingMetrics}
import feast.ingestion.registry.proto.ProtoRegistryFactory
import feast.ingestion.utils.ProtoReflection
import feast.ingestion.utils.testing.MemoryStreamingSource
@@ -32,11 +29,15 @@ import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
import org.apache.spark.sql.functions.{expr, lit, struct, udf, coalesce}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryListener}
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.{SparkEnv, SparkFiles}

import java.io.File
import java.sql.Timestamp
import java.util.concurrent.TimeUnit

/**
* Streaming pipeline (currently in micro-batches mode only, since we need to have multiple sinks: redis & deadletters).
* Flow:
@@ -56,8 +57,20 @@ object StreamingPipeline extends BasePipeline with Serializable {
val featureTable = config.featureTable
val projection =
BasePipeline.inputProjection(config.source, featureTable.features, featureTable.entities)
val rowValidator = new RowValidator(featureTable, config.source.eventTimestampColumn)
val metrics = new IngestionPipelineMetrics
val rowValidator = new RowValidator(featureTable, config.source.eventTimestampColumn)
val metrics = new IngestionPipelineMetrics
val streamingMetrics = new StreamingMetrics

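// Listen for micro-batch progress and forward batch duration and row throughput
// to the streaming metric source after every completed batch.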
sparkSession.streams.addListener(new StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
streamingMetrics.updateStreamingProgress(event.progress)
}

override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
})

val validationUDF = createValidationUDF(sparkSession, config)

val input = config.source match {
@@ -71,20 +84,29 @@
source.read
}

val parsed = config.source.asInstanceOf[StreamingSource].format match {
val featureStruct = config.source.asInstanceOf[StreamingSource].format match {
case ProtoFormat(classPath) =>
val parser = protoParser(sparkSession, classPath)
input.withColumn("features", parser($"value"))
parser($"value")
case AvroFormat(schemaJson) =>
input.select(from_avro($"value", schemaJson).alias("features"))
from_avro($"value", schemaJson)
case _ =>
val columns = input.columns.map(input(_))
input.select(struct(columns: _*).alias("features"))
struct(columns: _*)
}

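// Kafka sources expose the message timestamp as a metadata column; it is carried
// through the pipeline so the latest consumed timestamp can be reported, and is
// dropped again before the write.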
val metadata: Array[Column] = config.source match {
case _: KafkaSource =>
Array(col("timestamp"))
case _ => Array()
}

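// Flatten the parsed feature struct and keep any metadata columns alongside the projected features.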
val parsed = input
.withColumn("features", featureStruct)
.select(metadata :+ col("features.*"): _*)

val projected = parsed
.select("features.*")
.select(projection: _*)
.select(projection ++ metadata: _*)

TypeCheck.allTypesMatch(projected.schema, featureTable) match {
case Some(error) =>
@@ -107,9 +129,12 @@

implicit val rowEncoder: Encoder[Row] = RowEncoder(rowsAfterValidation.schema)

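// Metadata columns are only needed for metrics, so drop them before writing to the store.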
val metadataColName: Array[String] = metadata.map(_.toString)

rowsAfterValidation
.map(metrics.incrementRead)
.filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks)
.drop(metadataColName: _*)
.write
.format(config.store match {
case _: RedisConfig => "feast.ingestion.stores.redis"
@@ -124,6 +149,24 @@
.option("entity_repartition", "false")
.save()

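// Track the latest Kafka message timestamp consumed in this micro-batch;
// skip the aggregation entirely when the batch is empty.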
config.source match {
case _: KafkaSource =>
val timestamp: Option[Timestamp] = if (rowsAfterValidation.isEmpty) {
None
} else {
Option(
rowsAfterValidation
.agg(max("timestamp") as "latest_timestamp")
.collect()(0)
.getTimestamp(0)
)
}
timestamp.foreach { t =>
streamingMetrics.updateKafkaTimestamp(t.getTime)
}
case _ => ()
}

config.deadLetterPath match {
case Some(path) =>
rowsAfterValidation
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.metrics

import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.StreamingMetricSource
import org.apache.spark.sql.streaming.StreamingQueryProgress

class StreamingMetrics extends Serializable {

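// Resolve the StreamingMetricSource that BasePipeline registered on the Spark metrics
// config; when it is absent (e.g. metrics disabled), all updates become no-ops.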
private val metricSource: Option[StreamingMetricSource] = {
val metricsSystem = SparkEnv.get.metricsSystem

metricsSystem.getSourcesByName(StreamingMetricSource.sourceName) match {
case Seq(head) => Some(head.asInstanceOf[StreamingMetricSource])
case _ => None
}
}

def updateStreamingProgress(
progress: StreamingQueryProgress
): Unit = {
metricSource.foreach(_.updateStreamingProgress(progress))
}

def updateKafkaTimestamp(timestamp: Long): Unit = {
metricSource.foreach(_.updateKafkaTimestamp(timestamp))
}
}

private object StreamingMetricsLock
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics

import com.codahale.metrics.Gauge

import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

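// Dropwizard gauges backed by atomic values, so they can be updated from listener
// callbacks and read concurrently by the metrics reporter.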
class AtomicLongGauge(initialValue: Long = 0L) extends Gauge[Long] {
val value = new AtomicLong(initialValue)
override def getValue: Long = value.get()
}

class AtomicIntegerGauge(initialValue: Int = 0) extends Gauge[Int] {
val value = new AtomicInteger(initialValue)
override def getValue: Int = value.get()
}
@@ -47,4 +47,12 @@ class BaseMetricSource extends Source {
s"$name#$metricLabels"
}
}

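// Append the configured metric labels to a gauge name, mirroring the existing helper above.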
protected def gaugeWithLabels(name: String) = {
if (metricLabels.isEmpty) {
name
} else {
s"$name#$metricLabels"
}
}
}
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics.source

import org.apache.spark.metrics.AtomicLongGauge
import org.apache.spark.sql.streaming.StreamingQueryProgress

class StreamingMetricSource extends BaseMetricSource {
override val sourceName: String = StreamingMetricSource.sourceName

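// Gauges exported through Spark's metrics system; values are pushed from the
// StreamingQueryListener in StreamingPipeline and from the Kafka timestamp check.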
private val BATCH_DURATION_GAUGE =
metricRegistry.register(gaugeWithLabels("batch_duration_ms"), new AtomicLongGauge())
private val PROCESSED_ROWS_PER_SECOND_GAUGE =
metricRegistry.register(gaugeWithLabels("processed_rows_per_second"), new AtomicLongGauge())
private val INPUT_ROWS_PER_SECOND_GAUGE =
metricRegistry.register(gaugeWithLabels("input_rows_per_second"), new AtomicLongGauge())
private val LAST_CONSUMED_KAFKA_TIMESTAMP_GAUGE =
metricRegistry.register(gaugeWithLabels("last_consumed_kafka_timestamp"), new AtomicLongGauge())

def updateStreamingProgress(progress: StreamingQueryProgress): Unit = {
BATCH_DURATION_GAUGE.value.set(progress.batchDuration)
INPUT_ROWS_PER_SECOND_GAUGE.value.set(progress.inputRowsPerSecond.toLong)
PROCESSED_ROWS_PER_SECOND_GAUGE.value.set(progress.processedRowsPerSecond.toLong)
}

def updateKafkaTimestamp(timestamp: Long): Unit = {
LAST_CONSUMED_KAFKA_TIMESTAMP_GAUGE.value.set(timestamp)
}
}

object StreamingMetricSource {
val sourceName = "streaming"
}
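
For reference, a minimal sketch (not part of this commit) of how the registered streaming metric source could be paired with Spark's bundled StatsD sink so the new gauges are actually reported somewhere. The MetricsConfSketch object, host, and port below are hypothetical placeholders, and whether a deployment wires the sink this way is an assumption.

import org.apache.spark.SparkConf

object MetricsConfSketch {
  // Hypothetical helper: registers the streaming metric source (as BasePipeline does above)
  // and points Spark's StatsD sink at a local agent. Host and port are placeholders.
  def withStreamingMetrics(conf: SparkConf): SparkConf =
    conf
      .set(
        "spark.metrics.conf.*.source.jvm.class",
        "org.apache.spark.metrics.source.StreamingMetricSource"
      )
      .set("spark.metrics.conf.*.sink.statsd.class", "org.apache.spark.metrics.sink.StatsdSink")
      .set("spark.metrics.conf.*.sink.statsd.host", "localhost")
      .set("spark.metrics.conf.*.sink.statsd.port", "8125")
}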
