From 656b50395a03a0d59c136f77c9d9da8540a8e7fc Mon Sep 17 00:00:00 2001 From: "Kosar, Vaclav: Functions Transformation" Date: Tue, 26 Jun 2018 11:41:25 +0200 Subject: [PATCH 1/2] [SPARK-24933][SS] Report numOutputRows in SinkProgress. --- .../sql/kafka010/KafkaStreamWriter.scala | 2 +- .../spark/sql/kafka010/KafkaSinkSuite.scala | 21 +++++++++++++ .../v2/WriteToDataSourceV2Exec.scala | 31 +++++++++++++++---- .../streaming/MicroBatchExecution.scala | 14 +++++++-- .../streaming/ProgressReporter.scala | 9 ++++-- .../execution/streaming/StreamExecution.scala | 4 +++ .../apache/spark/sql/streaming/progress.scala | 20 +++++++++--- ...StreamingQueryStatusAndProgressSuite.scala | 10 +++--- 8 files changed, 90 insertions(+), 21 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala index 5f0802b466039..0c76e27cb79ad 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala @@ -44,7 +44,7 @@ class KafkaStreamWriter( topic: Option[String], producerParams: Map[String, String], schema: StructType) extends StreamWriter { - validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic) + validateQuery(schema.toAttributes, producerParams.asInstanceOf[Map[String, Object]].asJava, topic) override def createWriterFactory(): KafkaStreamWriterFactory = KafkaStreamWriterFactory(topic, producerParams, schema) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 70ffd7dee89d7..a6d6e1327cdd6 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -229,6 +229,27 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } } + test("streaming - sink progress is produced") { + /* ensure sink progress is correctly produced. 
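+       Three rows are written to the topic, so the last progress must report numOutputRows == 3.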
*/ + val input = MemoryStream[String] + val topic = newTopic() + testUtils.createTopic(topic) + + val writer = createKafkaWriter( + input.toDF(), + withTopic = Some(topic), + withOutputMode = Some(OutputMode.Update()))() + + try { + input.addData("1", "2", "3") + failAfter(streamingTimeout) { + writer.processAllAvailable() + } + assert(writer.lastProgress.sink.numOutputRows == 3L) + } finally { + writer.stop() + } + } test("streaming - write data with bad schema") { val input = MemoryStream[String] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 59ebb9bc5431b..7b444e55bb55d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming.MicroBatchExecution import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.types.StructType -import org.apache.spark.util.Utils +import org.apache.spark.util.{LongAccumulator, Utils} /** * Deprecated logical plan for writing data into data source v2. This is being replaced by more @@ -48,6 +48,9 @@ case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) ext * The physical plan for writing data into data source v2. */ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan { + + var commitProgress: Option[StreamWriterCommitProgress] = None + override def children: Seq[SparkPlan] = Seq(query) override def output: Seq[Attribute] = Nil @@ -56,6 +59,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e val useCommitCoordinator = writer.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) + val totalNumRowsAccumulator = new LongAccumulator() logInfo(s"Start processing data source writer: $writer. 
" + s"The input RDD has ${messages.length} partitions.") @@ -66,15 +70,18 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e (context: TaskContext, iter: Iterator[InternalRow]) => DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator), rdd.partitions.indices, - (index, message: WriterCommitMessage) => { - messages(index) = message - writer.onDataWriterCommit(message) + (index, result: DataWritingSparkTaskResult) => { + val commitMessage = result.writerCommitMessage + messages(index) = commitMessage + totalNumRowsAccumulator.add(result.numRows) + writer.onDataWriterCommit(commitMessage) } ) logInfo(s"Data source writer $writer is committing.") writer.commit(messages) logInfo(s"Data source writer $writer committed.") + commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value)) } catch { case cause: Throwable => logError(s"Data source writer $writer is aborting.") @@ -103,7 +110,7 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow], - useCommitCoordinator: Boolean): WriterCommitMessage = { + useCommitCoordinator: Boolean): DataWritingSparkTaskResult = { val stageId = context.stageId() val stageAttempt = context.stageAttemptNumber() val partId = context.partitionId() @@ -112,9 +119,12 @@ object DataWritingSparkTask extends Logging { val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0") val dataWriter = writeTask.createDataWriter(partId, taskId, epochId.toLong) + var count = 0L // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { while (iter.hasNext) { + // Count is here. + count += 1 dataWriter.write(iter.next()) } @@ -141,7 +151,7 @@ object DataWritingSparkTask extends Logging { logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId" + s"stage $stageId.$stageAttempt)") - msg + DataWritingSparkTaskResult(count, msg) })(catchBlock = { // If there is an error, abort this writer @@ -153,3 +163,12 @@ object DataWritingSparkTask extends Logging { }) } } + +private[v2] case class DataWritingSparkTaskResult( + numRows: Long, + writerCommitMessage: WriterCommitMessage) + +/** + * Sink progress information collected after commit. 
+ */ +private[sql] case class StreamWriterCommitProgress(numOutputRows: Long) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index b1c91ac94b268..a57c66d8e5e76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} @@ -238,6 +238,7 @@ class MicroBatchExecution( * DONE */ private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = { + sinkCommitProgress = None offsetLog.getLatest() match { case Some((latestBatchId, nextOffsets)) => /* First assume that we are re-executing the latest known batch @@ -522,18 +523,25 @@ class MicroBatchExecution( val nextBatch = new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema)) - reportTimeTaken("addBatch") { + val batchSinkProgress: Option[StreamWriterCommitProgress] = + reportTimeTaken("addBatch") { SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) { sink match { - case s: Sink => s.addBatch(currentBatchId, nextBatch) + case s: Sink => + s.addBatch(currentBatchId, nextBatch) case _: StreamWriteSupport => // This doesn't accumulate any data - it just forces execution of the microbatch writer. 
nextBatch.collect() } + lastExecution.executedPlan match { + case w: WriteToDataSourceV2Exec => w.commitProgress + case _ => None + } } } withProgressLocked { + sinkCommitProgress = batchSinkProgress watermarkTracker.updateWatermark(lastExecution.executedPlan) commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)) committedOffsets ++= availableOffsets diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index ae1bfa2e499bb..806f6819cd5d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal -import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods.parse import org.apache.spark.internal.Logging @@ -32,7 +31,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter import org.apache.spark.sql.sources.v2.CustomMetrics import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, SupportsCustomReaderMetrics} @@ -64,6 +63,7 @@ trait ProgressReporter extends Logging { protected def logicalPlan: LogicalPlan protected def lastExecution: QueryExecution protected def newData: Map[BaseStreamingSource, LogicalPlan] + protected def sinkCommitProgress: Option[StreamWriterCommitProgress] protected def sources: Seq[BaseStreamingSource] protected def sink: BaseStreamingSink protected def offsetSeqMetadata: OffsetSeqMetadata @@ -208,7 +208,10 @@ trait ProgressReporter extends Logging { case _ => None } - val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull) + val sinkProgress = SinkProgress( + sink.toString, + customWriterMetrics.orNull, + sinkCommitProgress.map(_.numOutputRows)) val newProgress = new StreamingQueryProgress( id = id, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 290de873c5cfb..97de287c5e5eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand +import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} @@ -112,6 +113,9 @@ abstract class StreamExecution( @volatile var availableOffsets = new StreamProgress + @volatile + var sinkCommitProgress: 
Option[StreamWriterCommitProgress] = None + /** The current batchId or -1 if execution has not yet been initialized. */ protected var currentBatchId: Long = -1 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 2fb87960ccb04..4ea6a6e4ed4e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -24,6 +24,7 @@ import java.util.UUID import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import SinkProgress.DEFAULT_NUM_OUTPUT_ROWS import org.json4s._ import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ @@ -224,16 +225,19 @@ class SourceProgress protected[sql]( * during a trigger. See [[StreamingQueryProgress]] for more information. * * @param description Description of the source corresponding to this status. + * @param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily) + * or Sink V1 (until decommissioned). * @since 2.1.0 */ @InterfaceStability.Evolving class SinkProgress protected[sql]( - val description: String, - val customMetrics: String) extends Serializable { + val description: String, + val customMetrics: String, + val numOutputRows: Long) extends Serializable { /** SinkProgress without custom metrics. */ protected[sql] def this(description: String) { - this(description, null) + this(description, null, DEFAULT_NUM_OUTPUT_ROWS) } /** The compact JSON representation of this progress. */ @@ -245,7 +249,8 @@ class SinkProgress protected[sql]( override def toString: String = prettyJson private[sql] def jsonValue: JValue = { - val jsonVal = ("description" -> JString(description)) + val jsonVal = ("description" -> JString(description)) ~ + ("numOutputRows" -> JInt(numOutputRows)) if (customMetrics != null) { jsonVal ~ ("customMetrics" -> parse(customMetrics)) @@ -254,3 +259,10 @@ class SinkProgress protected[sql]( } } } + +private[sql] object SinkProgress { + val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L + + def apply(description: String, customMetrics: String, numOutputRows: Option[Long]): SinkProgress = + new SinkProgress(description, customMetrics, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS)) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 79bb827e0de93..43604b076f963 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -68,7 +68,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "inputRowsPerSecond" : 10.0 | } ], | "sink" : { - | "description" : "sink" + | "description" : "sink", + | "numOutputRows" : -1 | } |} """.stripMargin.trim) @@ -100,7 +101,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "numInputRows" : 678 | } ], | "sink" : { - | "description" : "sink" + | "description" : "sink", + | "numOutputRows" : -1 | } |} """.stripMargin.trim) @@ -241,7 +243,7 @@ object StreamingQueryStatusAndProgressSuite { processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json ) ), - sink = new SinkProgress("sink") + sink = SinkProgress("sink", null, None) ) val testProgress2 = new 
StreamingQueryProgress( @@ -265,7 +267,7 @@ object StreamingQueryStatusAndProgressSuite { processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json ) ), - sink = new SinkProgress("sink") + sink = SinkProgress("sink", null, None) ) val testStatus = new StreamingQueryStatus("active", true, false) From cd07a53544209749a6077005e21de6c6041d08e3 Mon Sep 17 00:00:00 2001 From: Vaclav Kosar Date: Fri, 2 Nov 2018 17:26:34 +0100 Subject: [PATCH 2/2] [SPARK-24933][SS] Resolve conflicts --- .../spark/sql/execution/streaming/MicroBatchExecution.scala | 2 +- .../spark/sql/execution/streaming/ProgressReporter.scala | 2 +- .../main/scala/org/apache/spark/sql/streaming/progress.scala | 3 ++- .../sql/streaming/StreamingQueryStatusAndProgressSuite.scala | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index c12d5f06a4d42..d395bf1dfe115 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport} import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 5bf4cf2b244fe..ba994f5ca1cb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, StreamWriterCommitProgress} import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index ee0c891f1b0ee..a73dc1d2dc7fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -30,6 +30,7 @@ import 
org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.InterfaceStability +import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS /** * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. @@ -218,7 +219,7 @@ class SinkProgress protected[sql]( /** SinkProgress without custom metrics. */ protected[sql] def this(description: String) { - this(description, null, DEFAULT_NUM_OUTPUT_ROWS) + this(description, DEFAULT_NUM_OUTPUT_ROWS) } /** The compact JSON representation of this progress. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 2a0cd1258e786..2f460b044b237 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -252,7 +252,7 @@ object StreamingQueryStatusAndProgressSuite { processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json ) ), - sink = SinkProgress("sink", null, None) + sink = SinkProgress("sink", None) ) val testProgress2 = new StreamingQueryProgress( @@ -276,7 +276,7 @@ object StreamingQueryStatusAndProgressSuite { processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json ) ), - sink = SinkProgress("sink", null, None) + sink = SinkProgress("sink", None) ) val testStatus = new StreamingQueryStatus("active", true, false)
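
Note (illustrative sketch, not part of the patch): with this change applied, the new
numOutputRows field can be read from StreamingQueryProgress, for example through a
StreamingQueryListener. The snippet assumes an existing SparkSession named `spark`;
sinks that do not collect the metric (Sink V1, continuous mode) report -1.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // -1 means the sink did not report the metric (Sink V1 or continuous mode).
    val rows = event.progress.sink.numOutputRows
    if (rows >= 0) {
      println(s"Batch ${event.progress.batchId} wrote $rows rows to the sink.")
    }
  }
})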