[SPARK-24933][SS] Report numOutputRows in SinkProgress
## What changes were proposed in this pull request?

SinkProgress should report properties similar to SourceProgress whenever they are available for a given Sink. The count of written rows is a metric available for all Sinks. Since the relevant progress information concerns committed rows, the natural object to carry it is the WriterCommitMessage. For brevity, the implementation focuses only on Data Source V2 Sinks in Micro-Batch mode; an implementation for Continuous mode will be provided at a later date.
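
For intuition, a minimal self-contained sketch of the approach (illustrative names only, not the actual Spark classes; the real implementation is in WriteToDataSourceV2Exec and DataWritingSparkTask in the diff below): each write task reports the number of rows it committed alongside its commit message, and the driver sums these per-partition counts into the sink progress.

```scala
// Simplified sketch with illustrative types, not the actual Spark API.
case class TaskResult(numRows: Long, commitMessage: String)

case class SinkProgressSketch(description: String, numOutputRows: Long)

def aggregateSinkProgress(description: String, results: Seq[TaskResult]): SinkProgressSketch = {
  // Sum the per-partition row counts reported with each commit message.
  SinkProgressSketch(description, results.map(_.numRows).sum)
}
```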

### Before
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317"}
```

### After
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317","numOutputRows":5000}
```
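
For illustration only (not part of this patch), the new field can be read from a running query's progress; the value is -1 when the sink does not report it (Continuous mode, or a V1 sink):

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch: log the per-trigger sink row count from the most recent progress, if any.
def logSinkRows(query: StreamingQuery): Unit = {
  Option(query.lastProgress).foreach { p =>
    println(s"${p.sink.description}: numOutputRows=${p.sink.numOutputRows}")
  }
}
```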

### This PR is related to:
- https://issues.apache.org/jira/browse/SPARK-24647
- https://issues.apache.org/jira/browse/SPARK-21313

## How was this patch tested?

Existing and new unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21919 from vackosar/feature/SPARK-24933-numOutputRows.

Lead-authored-by: Vaclav Kosar <admin@vaclavkosar.com>
Co-authored-by: Kosar, Vaclav: Functions Transformation <Vaclav.Kosar@barclayscapital.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2 people authored and gatorsmile committed Dec 17, 2018
1 parent 5a116e6 commit 81d377d
Showing 7 changed files with 88 additions and 16 deletions.
@@ -232,6 +232,27 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
}
}

test("streaming - sink progress is produced") {
/* ensure sink progress is correctly produced. */
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)

val writer = createKafkaWriter(
input.toDF(),
withTopic = Some(topic),
withOutputMode = Some(OutputMode.Update()))()

try {
input.addData("1", "2", "3")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
assert(writer.lastProgress.sink.numOutputRows == 3L)
} finally {
writer.stop()
}
}

test("streaming - write data with bad schema") {
val input = MemoryStream[String]
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.util.Utils
import org.apache.spark.util.{LongAccumulator, Utils}

/**
* Deprecated logical plan for writing data into data source v2. This is being replaced by more
@@ -47,6 +47,8 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl
case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan)
extends UnaryExecNode {

var commitProgress: Option[StreamWriterCommitProgress] = None

override def child: SparkPlan = query
override def output: Seq[Attribute] = Nil

@@ -55,6 +57,7 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
val useCommitCoordinator = writeSupport.useCommitCoordinator
val rdd = query.execute()
val messages = new Array[WriterCommitMessage](rdd.partitions.length)
val totalNumRowsAccumulator = new LongAccumulator()

logInfo(s"Start processing data source write support: $writeSupport. " +
s"The input RDD has ${messages.length} partitions.")
@@ -65,15 +68,18 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator),
rdd.partitions.indices,
(index, message: WriterCommitMessage) => {
messages(index) = message
writeSupport.onDataWriterCommit(message)
(index, result: DataWritingSparkTaskResult) => {
val commitMessage = result.writerCommitMessage
messages(index) = commitMessage
totalNumRowsAccumulator.add(result.numRows)
writeSupport.onDataWriterCommit(commitMessage)
}
)

logInfo(s"Data source write support $writeSupport is committing.")
writeSupport.commit(messages)
logInfo(s"Data source write support $writeSupport committed.")
commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
} catch {
case cause: Throwable =>
logError(s"Data source write support $writeSupport is aborting.")
@@ -102,17 +108,20 @@ object DataWritingSparkTask extends Logging {
writerFactory: DataWriterFactory,
context: TaskContext,
iter: Iterator[InternalRow],
useCommitCoordinator: Boolean): WriterCommitMessage = {
useCommitCoordinator: Boolean): DataWritingSparkTaskResult = {
val stageId = context.stageId()
val stageAttempt = context.stageAttemptNumber()
val partId = context.partitionId()
val taskId = context.taskAttemptId()
val attemptId = context.attemptNumber()
val dataWriter = writerFactory.createWriter(partId, taskId)

var count = 0L
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
while (iter.hasNext) {
// Count rows as they are written so the total can be reported with the commit message.
count += 1
dataWriter.write(iter.next())
}

@@ -139,7 +148,7 @@
logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId" +
s"stage $stageId.$stageAttempt)")

msg
DataWritingSparkTaskResult(count, msg)

})(catchBlock = {
// If there is an error, abort this writer
@@ -151,3 +160,12 @@
})
}
}

private[v2] case class DataWritingSparkTaskResult(
numRows: Long,
writerCommitMessage: WriterCommitMessage)

/**
* Sink progress information collected after commit.
*/
private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp,
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
@@ -246,6 +246,7 @@ class MicroBatchExecution(
* DONE
*/
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
sinkCommitProgress = None
offsetLog.getLatest() match {
case Some((latestBatchId, nextOffsets)) =>
/* First assume that we are re-executing the latest known batch
@@ -537,18 +538,24 @@ class MicroBatchExecution(
val nextBatch =
new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))

reportTimeTaken("addBatch") {
val batchSinkProgress: Option[StreamWriterCommitProgress] =
reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink match {
case s: Sink => s.addBatch(currentBatchId, nextBatch)
case _: StreamingWriteSupportProvider =>
// This doesn't accumulate any data - it just forces execution of the microbatch writer.
nextBatch.collect()
}
lastExecution.executedPlan match {
case w: WriteToDataSourceV2Exec => w.commitProgress
case _ => None
}
}
}

withProgressLocked {
sinkCommitProgress = batchSinkProgress
watermarkTracker.updateWatermark(lastExecution.executedPlan)
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
committedOffsets ++= availableOffsets
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamWriterCommitProgress}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -56,6 +56,7 @@ trait ProgressReporter extends Logging {
protected def logicalPlan: LogicalPlan
protected def lastExecution: QueryExecution
protected def newData: Map[BaseStreamingSource, LogicalPlan]
protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
protected def sources: Seq[BaseStreamingSource]
protected def sink: BaseStreamingSink
protected def offsetSeqMetadata: OffsetSeqMetadata
@@ -167,7 +168,9 @@ trait ProgressReporter extends Logging {
)
}

val sinkProgress = new SinkProgress(sink.toString)
val sinkProgress = SinkProgress(
sink.toString,
sinkCommitProgress.map(_.numOutputRows))

val newProgress = new StreamingQueryProgress(
id = id,
@@ -36,6 +36,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -114,6 +115,9 @@ abstract class StreamExecution(
@volatile
var availableOffsets = new StreamProgress

@volatile
var sinkCommitProgress: Option[StreamWriterCommitProgress] = None

/** The current batchId or -1 if execution has not yet been initialized. */
protected var currentBatchId: Long = -1

@@ -30,6 +30,7 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS

/**
* Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
@@ -207,11 +208,19 @@ class SourceProgress protected[sql](
* during a trigger. See [[StreamingQueryProgress]] for more information.
*
* @param description Description of the source corresponding to this status.
* @param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily)
* or Sink V1 (until decommissioned).
* @since 2.1.0
*/
@Evolving
class SinkProgress protected[sql](
val description: String) extends Serializable {
val description: String,
val numOutputRows: Long) extends Serializable {

/** SinkProgress without custom metrics. */
protected[sql] def this(description: String) {
this(description, DEFAULT_NUM_OUTPUT_ROWS)
}

/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
@@ -222,6 +231,14 @@ class SinkProgress protected[sql](
override def toString: String = prettyJson

private[sql] def jsonValue: JValue = {
("description" -> JString(description))
("description" -> JString(description)) ~
("numOutputRows" -> JInt(numOutputRows))
}
}

private[sql] object SinkProgress {
val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L

def apply(description: String, numOutputRows: Option[Long]): SinkProgress =
new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS))
}
@@ -73,7 +73,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "inputRowsPerSecond" : 10.0
| } ],
| "sink" : {
| "description" : "sink"
| "description" : "sink",
| "numOutputRows" : -1
| }
|}
""".stripMargin.trim)
@@ -105,7 +106,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "numInputRows" : 678
| } ],
| "sink" : {
| "description" : "sink"
| "description" : "sink",
| "numOutputRows" : -1
| }
|}
""".stripMargin.trim)
@@ -250,7 +252,7 @@ object StreamingQueryStatusAndProgressSuite {
processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
)
),
sink = new SinkProgress("sink")
sink = SinkProgress("sink", None)
)

val testProgress2 = new StreamingQueryProgress(
@@ -274,7 +276,7 @@
processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json
)
),
sink = new SinkProgress("sink")
sink = SinkProgress("sink", None)
)

val testStatus = new StreamingQueryStatus("active", true, false)