diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f92659de6aab9..cbdc08c2e4219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1891,14 +1891,10 @@ class Dataset[T] private[sql](
    * [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
    *
    * {{{
-   * // Observe row count (rc) and error row count (erc) in the streaming Dataset
-   * val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
-   * observed_ds.writeStream.format("...").start()
-   *
    * // Monitor the metrics using a listener.
    * spark.streams.addListener(new StreamingQueryListener() {
    *   override def onQueryProgress(event: QueryProgressEvent): Unit = {
-   *     event.progress.observedMetrics.get("my_event").foreach { row =>
+   *     event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
    *       // Trigger if the number of errors exceeds 5 percent
    *       val num_rows = row.getAs[Long]("rc")
    *       val num_error_rows = row.getAs[Long]("erc")
@@ -1908,7 +1904,12 @@ class Dataset[T] private[sql](
    *       }
    *     }
    *   }
+   *   def onQueryStarted(event: QueryStartedEvent): Unit = {}
+   *   def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
    * })
+   * // Observe row count (rc) and error row count (erc) in the streaming Dataset
+   * val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
+   * observed_ds.writeStream.format("...").start()
    * }}}
    *
    * @group typedrel