
[SPARK-23052][SS] Migrate ConsoleSink to data source V2 api.

## What changes were proposed in this pull request?

Migrate ConsoleSink to the data source V2 API.

Note that this also adds the piece missing from DataStreamWriter that is required to specify a data source V2 writer.
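
For reference, a minimal end-to-end use of the migrated sink through the standard `DataStreamWriter` API (a hedged sketch: the "rate" source is used only as a convenient built-in, and the option names match the defaults the new writer reads):

```scala
val df = spark.readStream.format("rate").load()

val query = df.writeStream
  .format("console")            // resolves to ConsoleSinkProvider
  .option("numRows", "25")      // rows printed per batch, default 20
  .option("truncate", "false")  // don't truncate long cell values, default true
  .outputMode("append")
  .start()
```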

Note also that I've removed the "Rerun batch" part of the sink, because as far as I can tell this would never have actually happened. A MicroBatchExecution object will only commit each batch once for its lifetime, and a new MicroBatchExecution object would have a new ConsoleSink object which doesn't know it's retrying a batch. So I think this represents an anti-feature rather than a weakness in the V2 API.

## How was this patch tested?

A new unit test.

Author: Jose Torres <jose@databricks.com>

Closes #20243 from jose-torres/console-sink.

(cherry picked from commit 1c76a91)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
jose-torres authored and tdas committed Jan 18, 2018
1 parent 3a80cc5 commit 2a87c3a77cbe40cbe5a8bdef41e3c37a660e2308
@@ -91,11 +91,14 @@ class MicroBatchExecution(
nextSourceId += 1
StreamingExecutionRelation(reader, output)(sparkSession)
})
case s @ StreamingRelationV2(_, _, _, output, v1Relation) =>
case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
assert(v1Relation.isDefined, "v2 execution didn't match but v1 was unavailable")
if (v1Relation.isEmpty) {
throw new UnsupportedOperationException(
s"Data source $sourceName does not support microbatch processing.")
}
val source = v1Relation.get.dataSource.createSource(metadataPath)
nextSourceId += 1
StreamingExecutionRelation(source, output)(sparkSession)
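
For context, a hedged sketch of what now hits this error path: a query with the default (micro-batch) trigger over a source that only supports continuous processing. `FakeReadContinuousOnly` is a test-only source registered at the end of this diff, standing in for any continuous-only V2 source.

```scala
// Illustration only; a real user would see the same error with any
// continuous-only data source.
val df = spark.readStream
  .format("org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly")
  .load()

// With the default (micro-batch) trigger, starting the query now fails with
// UnsupportedOperationException: "Data source ... does not support
// microbatch processing." instead of tripping the old assertion.
df.writeStream.format("console").start()
```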
@@ -17,58 +17,36 @@

package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
// Number of rows to display, by default 20 rows
private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20)

// Truncate the displayed data if it is too long, by default it is true
private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true)
import java.util.Optional

// Track the batch id
private var lastBatchId = -1L

override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
val batchIdStr = if (batchId <= lastBatchId) {
s"Rerun batch: $batchId"
} else {
lastBatchId = batchId
s"Batch: $batchId"
}

// scalastyle:off println
println("-------------------------------------------")
println(batchIdStr)
println("-------------------------------------------")
// scalastyle:off println
data.sparkSession.createDataFrame(
data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
.show(numRowsToShow, isTruncated)
}
import scala.collection.JavaConverters._

override def toString(): String = s"ConsoleSink[numRows=$numRowsToShow, truncate=$isTruncated]"
}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
extends BaseRelation {
override def schema: StructType = data.schema
}

class ConsoleSinkProvider extends StreamSinkProvider
class ConsoleSinkProvider extends DataSourceV2
with MicroBatchWriteSupport
with DataSourceRegister
with CreatableRelationProvider {
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new ConsoleSink(parameters)

override def createMicroBatchWriter(
queryId: String,
epochId: Long,
schema: StructType,
mode: OutputMode,
options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
Optional.of(new ConsoleWriter(epochId, schema, options))
}

def createRelation(
@@ -54,16 +54,13 @@ class ContinuousExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

@volatile protected var continuousSources: Seq[ContinuousReader] = _
@volatile protected var continuousSources: Seq[ContinuousReader] = Seq()
override protected def sources: Seq[BaseStreamingSource] = continuousSources

// For use only in test harnesses.
private[sql] var currentEpochCoordinatorId: String = _

override lazy val logicalPlan: LogicalPlan = {
assert(queryExecutionThread eq Thread.currentThread,
"logicalPlan must be initialized in StreamExecutionThread " +
s"but the current thread was ${Thread.currentThread}")
override val logicalPlan: LogicalPlan = {
val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
analyzedPlan.transform {
case r @ StreamingRelationV2(
@@ -72,7 +69,7 @@ class ContinuousExecution(
ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
})
case StreamingRelationV2(_, sourceName, _, _, _) =>
throw new AnalysisException(
throw new UnsupportedOperationException(
s"Data source $sourceName does not support continuous processing.")
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

/**
* A [[DataSourceV2Writer]] that collects results to the driver and prints them in the console.
* Generated by [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
*
* This sink should not be used for production, as it requires sending all rows to the driver
* and does not support recovery.
*/
class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
extends DataSourceV2Writer with Logging {
// Number of rows to display, by default 20 rows
private val numRowsToShow = options.getInt("numRows", 20)

// Truncate the displayed data if it is too long, by default it is true
private val isTruncated = options.getBoolean("truncate", true)

assert(SparkSession.getActiveSession.isDefined)
private val spark = SparkSession.getActiveSession.get

override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory

override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
val batch = messages.collect {
case PackedRowCommitMessage(rows) => rows
}.flatten

// scalastyle:off println
println("-------------------------------------------")
println(s"Batch: $batchId")
println("-------------------------------------------")
// scalastyle:on println
spark.createDataFrame(
spark.sparkContext.parallelize(batch), schema)
.show(numRowsToShow, isTruncated)
}

override def abort(messages: Array[WriterCommitMessage]): Unit = {}

override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
}
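
To make the control flow concrete, here is a hedged, driver-only sketch of the writer lifecycle this file implements. It assumes DataSourceV2Options wraps a java.util.Map of options and that an active SparkSession exists; in a real query the DataWriter runs on executors and only the commit messages travel to the driver.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Assumption: DataSourceV2Options is constructed from a java.util.Map, and an
// active SparkSession exists (ConsoleWriter needs one to build the DataFrame).
val options = new DataSourceV2Options(Map("numRows" -> "5").asJava)
val schema = StructType(StructField("value", IntegerType) :: Nil)

// Driver side: one writer per micro-batch.
val writer = new ConsoleWriter(batchId = 0L, schema = schema, options = options)

// Task side (normally on an executor): pack rows into a commit message.
val task = writer.createWriterFactory().createDataWriter(/* partitionId = */ 0, /* attemptNumber = */ 0)
Seq(Row(1), Row(2), Row(3)).foreach(task.write)

// Back on the driver: commit() gathers the packed rows and prints the batch.
writer.commit(Array[WriterCommitMessage](task.commit()))
```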
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources

import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

/**
* A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery
* to a [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver.
*
* Note that, because it sends all rows to the driver, this factory will generally be unsuitable
* for production-quality sinks. It's intended for use in tests.
*/
case object PackedRowWriterFactory extends DataWriterFactory[Row] {
def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
new PackedRowDataWriter()
}
}

/**
* Commit message for a [[PackedRowDataWriter]], containing all the rows written in the most
* recent interval.
*/
case class PackedRowCommitMessage(rows: Array[Row]) extends WriterCommitMessage

/**
* A simple [[DataWriter]] that just sends all the rows it's received as a commit message.
*/
class PackedRowDataWriter() extends DataWriter[Row] with Logging {
private val data = mutable.Buffer[Row]()

override def write(row: Row): Unit = data.append(row)

override def commit(): PackedRowCommitMessage = {
val msg = PackedRowCommitMessage(data.toArray)
data.clear()
msg
}

override def abort(): Unit = data.clear()
}
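
A hedged illustration of the task-side contract this class provides: commit() hands off everything written since the last commit and clears the buffer, while abort() simply drops whatever the current attempt wrote.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.PackedRowDataWriter

val w = new PackedRowDataWriter()
w.write(Row("a"))
w.write(Row("b"))
val msg = w.commit()   // PackedRowCommitMessage carrying Row("a"), Row("b")
w.write(Row("c"))
w.abort()              // Row("c") is discarded; nothing is delivered for this attempt
```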
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2}
import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport
import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}

/**
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -280,14 +280,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
useTempCheckpointLocation = true,
trigger = trigger)
} else {
val sink = trigger match {
case _: ContinuousTrigger =>
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
ds.newInstance() match {
case w: ContinuousWriteSupport => w
case _ => throw new AnalysisException(
s"Data source $source does not support continuous writing")
}
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
val sink = (ds.newInstance(), trigger) match {
case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
s"Data source $source does not support continuous writing")
case (w: MicroBatchWriteSupport, _) => w
case _ =>
val ds = DataSource(
df.sparkSession,
@@ -5,3 +5,11 @@ org.apache.spark.sql.sources.FakeSourceFour
org.apache.fakesource.FakeExternalSourceOne
org.apache.fakesource.FakeExternalSourceTwo
org.apache.fakesource.FakeExternalSourceThree
org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
org.apache.spark.sql.streaming.sources.FakeReadBothModes
org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
org.apache.spark.sql.streaming.sources.FakeWriteMicroBatchOnly
org.apache.spark.sql.streaming.sources.FakeWriteContinuousOnly
org.apache.spark.sql.streaming.sources.FakeWriteBothModes
org.apache.spark.sql.streaming.sources.FakeWriteNeitherMode
