Skip to content

Commit

Permalink
[SPARK-29348][SQL][FOLLOWUP] Fix slight bug on streaming example for …
Browse files Browse the repository at this point in the history
…Dataset.observe

### What changes were proposed in this pull request?

This patch fixes a small bug in the example of streaming query, as the type of observable metrics is Java Map instead of Scala Map, so to use foreach it should be converted first.

### Why are the changes needed?

Described above.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Ran below query via `spark-shell`:

**Streaming**

```scala
import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
        println(s"alert! error ratio: $ratio")
      }
    }
  }

  def onQueryStarted(event: QueryStartedEvent): Unit = {}
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})

val rates = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load

val rand = new Random()
val df = rates.map { row => (row.getLong(1), if (row.getLong(1) % 2 == 0) "error" else null) }.toDF
val ds = df.selectExpr("_1 AS id", "_2 AS error")
// Observe row count (rc) and error row count (erc) in the batch Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("console").start()
```

Closes #27046 from HeartSaVioR/SPARK-29348-FOLLOWUP.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
HeartSaVioR authored and HyukjinKwon committed Dec 30, 2019
1 parent 7079e87 commit e054a0a
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Expand Up @@ -1891,14 +1891,10 @@ class Dataset[T] private[sql](
* [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
*
* {{{
* // Observe row count (rc) and error row count (erc) in the streaming Dataset
* val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
* observed_ds.writeStream.format("...").start()
*
* // Monitor the metrics using a listener.
* spark.streams.addListener(new StreamingQueryListener() {
* override def onQueryProgress(event: QueryProgressEvent): Unit = {
* event.progress.observedMetrics.get("my_event").foreach { row =>
* event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
* // Trigger if the number of errors exceeds 5 percent
* val num_rows = row.getAs[Long]("rc")
* val num_error_rows = row.getAs[Long]("erc")
Expand All @@ -1908,7 +1904,12 @@ class Dataset[T] private[sql](
* }
* }
* }
* def onQueryStarted(event: QueryStartedEvent): Unit = {}
* def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
* })
* // Observe row count (rc) and error row count (erc) in the streaming Dataset
* val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
* observed_ds.writeStream.format("...").start()
* }}}
*
* @group typedrel
Expand Down

0 comments on commit e054a0a

Please sign in to comment.