[SPARK-23288][SS] Fix output metrics with parquet sink
## What changes were proposed in this pull request?

Output metrics were not filled in when the parquet sink was used.

This PR fixes the problem by passing a `BasicWriteJobStatsTracker` from `FileStreamSink` to `FileFormatWriter`.
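
Condensed from the diff below, the change is two small pieces of wiring inside `FileStreamSink` (shown here only as a sketch; `hadoopConf` is the sink's existing Hadoop configuration field):

```scala
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

// Inside class FileStreamSink: build a job-level stats tracker. The Hadoop conf is wrapped
// in SerializableConfiguration because the tracker's task-side half runs on executors.
private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
  val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
  new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
}

// addBatch then hands the tracker to the writer instead of the previous empty list:
// FileFormatWriter.write(..., statsTrackers = Seq(basicWriteJobStatsTracker), options = options)
```

`BasicWriteJobStatsTracker.metrics` is the new shared metric map (also reused by `DataWritingCommand`), so batch and streaming writes now report the same `numFiles`/`numOutputBytes`/`numOutputRows`/`numParts` SQL metrics.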

## How was this patch tested?

Additional unit test added.
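
The test (part of the diff below) streams a `MemoryStream` into each file format and sums the task-level output metrics delivered to a `SparkListener`. For readers who want to poke at the same counters outside the suite, here is a minimal, self-contained sketch; the `local[2]` master, the `/tmp` path, and the batch parquet write are illustrative assumptions, not part of this PR:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

// Illustrative sketch (not the committed test): accumulate per-task output metrics
// for a file write, the same signal the new FileStreamSinkSuite test asserts on.
object OutputMetricsProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("output-metrics-probe").getOrCreate()

    var recordsWritten = 0L
    var bytesWritten = 0L
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val out = taskEnd.taskMetrics.outputMetrics
        recordsWritten += out.recordsWritten
        bytesWritten += out.bytesWritten
      }
    })

    // Any file write works; the committed test drives the streaming parquet/orc/text/json sinks.
    spark.range(5).write.mode("overwrite").parquet("/tmp/output-metrics-probe")

    // Listener events arrive asynchronously; the test flushes them via the internal
    // listenerBus.waitUntilEmpty, while this sketch just waits briefly.
    Thread.sleep(2000)
    println(s"recordsWritten=$recordsWritten bytesWritten=$bytesWritten")
    spark.stop()
  }
}
```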

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20745 from gaborgsomogyi/SPARK-23288.

(cherry picked from commit 918c7e9)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
gaborgsomogyi authored and cloud-fan committed Mar 21, 2018
1 parent 1e552b3 commit 4b9f33f
Showing 4 changed files with 82 additions and 16 deletions.
DataWritingCommand:

@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -45,15 +44,7 @@ trait DataWritingCommand extends Command {
   // Output columns of the analyzed input query plan
   def outputColumns: Seq[Attribute]
 
-  lazy val metrics: Map[String, SQLMetric] = {
-    val sparkContext = SparkContext.getActive.get
-    Map(
-      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
-      "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
-      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
-    )
-  }
+  lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
   def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
BasicWriteJobStatsTracker:

@@ -153,12 +153,29 @@ class BasicWriteJobStatsTracker(
       totalNumOutput += summary.numRows
     }
 
-    metrics("numFiles").add(numFiles)
-    metrics("numOutputBytes").add(totalNumBytes)
-    metrics("numOutputRows").add(totalNumOutput)
-    metrics("numParts").add(numPartitions)
+    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
+    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
   }
 }
+
+object BasicWriteJobStatsTracker {
+  private val NUM_FILES_KEY = "numFiles"
+  private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
+  private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
+  private val NUM_PARTS_KEY = "numParts"
+
+  def metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+      NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+    )
+  }
+}
FileStreamSink:

@@ -26,7 +26,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
+import org.apache.spark.util.SerializableConfiguration
 
 object FileStreamSink extends Logging {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -97,6 +98,11 @@ class FileStreamSink(
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
+  private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
+    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+    new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
+  }
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
@@ -131,7 +137,7 @@ class FileStreamSink(
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = None,
-        statsTrackers = Nil,
+        statsTrackers = Seq(basicWriteJobStatsTracker),
         options = options)
     }
   }
FileStreamSinkSuite:

@@ -21,6 +21,7 @@ import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+
+          var query: StreamingQuery = null
+
+          var numTasks = 0
+          var recordsWritten: Long = 0L
+          var bytesWritten: Long = 0L
+          try {
+            spark.sparkContext.addSparkListener(new SparkListener() {
+              override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+                val outputMetrics = taskEnd.taskMetrics.outputMetrics
+                recordsWritten += outputMetrics.recordsWritten
+                bytesWritten += outputMetrics.bytesWritten
+                numTasks += 1
+              }
+            })
+
+            query =
+              df.writeStream
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .format(format)
+                .start(outputDir.getCanonicalPath)
+
+            inputData.addData("1", "2", "3")
+            inputData.addData("4", "5")
+
+            failAfter(streamingTimeout) {
+              query.processAllAvailable()
+            }
+            spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
+
+            assert(numTasks > 0)
+            assert(recordsWritten === 5)
+            // This is heavily file type/version specific but should be filled
+            assert(bytesWritten > 0)
+          } finally {
+            if (query != null) {
+              query.stop()
+            }
+          }
+        }
+      }
+    }
+  }
 }
