
[SPARK-23196] Unify continuous and microbatch V2 sinks

## What changes were proposed in this pull request?

Replace the streaming V2 sink interfaces with a unified StreamWriteSupport interface, along with a shim that adapts it for microbatch execution.
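
For orientation, the shim works by pinning a `StreamWriter` to a single batch ID so that it can stand in for a plain `DataSourceV2Writer` during microbatch execution. A minimal sketch of the idea (signatures taken from the interfaces in this diff; the actual code is the new `MicroBatchWriter.scala` below, and the class name here is illustrative):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}

// Sketch: adapt an epoch-aware StreamWriter to the epoch-less
// DataSourceV2Writer API by fixing it to one batch/epoch ID.
class BatchIdPinnedWriter(batchId: Long, writer: StreamWriter) extends DataSourceV2Writer {
  override def createWriterFactory(): DataWriterFactory[Row] = writer.createWriterFactory()
  override def commit(messages: Array[WriterCommitMessage]): Unit = writer.commit(batchId, messages)
  override def abort(messages: Array[WriterCommitMessage]): Unit = writer.abort(batchId, messages)
}
```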

Add a new SQL config for disabling V2 sinks on a per-source basis, falling back to the V1 sink implementation.
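
As a usage sketch, falling back to the V1 path for a given source means listing its fully qualified register class name in the new config (the Kafka class name below is taken from this diff; `spark` is assumed to be an active SparkSession):

```scala
// Sketch: disable the V2 streaming write path for the Kafka source,
// so DataStreamWriter falls back to the V1 Sink implementation.
spark.conf.set(
  "spark.sql.streaming.disabledV2Writers",
  "org.apache.spark.sql.kafka010.KafkaSourceProvider")
```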

## How was this patch tested?

Existing tests; in the case of Kafka (the only existing continuous V2 sink), these now exercise the V2 path for microbatch execution as well.

Author: Jose Torres <jose@databricks.com>

Closes #20369 from jose-torres/streaming-sink.

(cherry picked from commit 49b0207)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jose-torres authored and cloud-fan committed Jan 29, 2018
1 parent 7ca2cd4 commit 588b9694c1967ff45774431441e84081ee6eb515
Showing with 265 additions and 297 deletions.
  1. +8 −8 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
  2. +13 −17 .../main/scala/org/apache/spark/sql/kafka010/{KafkaContinuousWriter.scala → KafkaStreamWriter.scala}
  3. +3 −5 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
  4. +9 −5 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
  5. +3 −5 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
  6. +9 −0 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
  7. +0 −60 sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
  8. +5 −7 ...org/apache/spark/sql/sources/v2/streaming/{ContinuousWriteSupport.java → StreamWriteSupport.java}
  9. +31 −3 ...java/org/apache/spark/sql/sources/v2/streaming/writer/{ContinuousWriter.java → StreamWriter.java}
  10. +1 −3 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
  11. +6 −5 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
  12. +12 −7 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
  13. +7 −20 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
  14. +9 −10 ...core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
  15. +3 −6 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
  16. +13 −46 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
  17. +54 −0 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
  18. +9 −20 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
  19. +4 −6 sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
  20. +3 −6 sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
  21. +3 −4 sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  22. +1 −1 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
  23. +59 −53 sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -28,11 +28,11 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -46,7 +46,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with ContinuousWriteSupport
with StreamWriteSupport
with ContinuousReadSupport
with Logging {
import KafkaSourceProvider._
@@ -223,11 +223,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}

override def createContinuousWriter(
override def createStreamWriter(
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceV2Options): Optional[ContinuousWriter] = {
options: DataSourceV2Options): StreamWriter = {
import scala.collection.JavaConverters._

val spark = SparkSession.getActiveSession.get
@@ -238,7 +238,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
KafkaWriter.validateQuery(
schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)

Optional.of(new KafkaContinuousWriter(topic, producerParams, schema))
new KafkaStreamWriter(topic, producerParams, schema)
}

private def strategy(caseInsensitiveParams: Map[String, String]) =
@@ -17,19 +17,14 @@

package org.apache.spark.sql.kafka010

import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
import org.apache.spark.sql.types.StructType

/**
* Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
@@ -38,23 +33,24 @@ import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
case object KafkaWriterCommitMessage extends WriterCommitMessage

/**
* A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory.
* A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory.
*
* @param topic The topic this writer is responsible for. If None, topic will be inferred from
* a `topic` field in the incoming data.
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
class KafkaContinuousWriter(
class KafkaStreamWriter(
topic: Option[String], producerParams: Map[String, String], schema: StructType)
extends ContinuousWriter with SupportsWriteInternalRow {
extends StreamWriter with SupportsWriteInternalRow {

validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)

override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory =
KafkaContinuousWriterFactory(topic, producerParams, schema)
override def createInternalRowWriterFactory(): KafkaStreamWriterFactory =
KafkaStreamWriterFactory(topic, producerParams, schema)

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
override def abort(messages: Array[WriterCommitMessage]): Unit = {}
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}

/**
@@ -65,12 +61,12 @@ class KafkaContinuousWriter(
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
case class KafkaContinuousWriterFactory(
case class KafkaStreamWriterFactory(
topic: Option[String], producerParams: Map[String, String], schema: StructType)
extends DataWriterFactory[InternalRow] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes)
new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
}
}

@@ -83,7 +79,7 @@ case class KafkaContinuousWriterFactory(
* @param producerParams Parameters to use for the Kafka producer.
* @param inputSchema The attributes in the input data.
*/
class KafkaContinuousDataWriter(
class KafkaStreamDataWriter(
targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
import scala.collection.JavaConverters._
@@ -18,16 +18,14 @@
package org.apache.spark.sql.kafka010

import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._
import scala.collection.JavaConverters._

import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.spark.util.Utils
@@ -362,7 +360,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}

test("streaming - exception on config serializer") {
@@ -424,7 +422,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
val inputSchema = Seq(AttributeReference("value", BinaryType)())
val data = new Array[Byte](15000) // large value
val writeTask = new KafkaContinuousDataWriter(Some(topic), options.asScala.toMap, inputSchema)
val writeTask = new KafkaStreamDataWriter(Some(topic), options.asScala.toMap, inputSchema)
try {
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
@@ -336,27 +336,31 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}

test("streaming - exception on config serializer") {
val input = MemoryStream[String]
var writer: StreamingQuery = null
var ex: Exception = null
ex = intercept[IllegalArgumentException] {
ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
input.addData("1")
writer.processAllAvailable()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))

ex = intercept[IllegalArgumentException] {
ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
input.addData("1")
writer.processAllAvailable()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
}

@@ -29,19 +29,17 @@ import scala.util.Random

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2Exec}
import org.apache.spark.sql.{Dataset, ForeachWriter}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryWriter
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest, Trigger}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
import org.apache.spark.util.Utils
@@ -1127,6 +1127,13 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)

val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
.internal()
.doc("A comma-separated list of fully qualified data source register class names for which" +
" StreamWriteSupport is disabled. Writes to these sources will fail back to the V1 Sink.")
.stringConf
.createWithDefault("")

object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}
@@ -1494,6 +1501,8 @@ class SQLConf extends Serializable with Logging {
def continuousStreamingExecutorPollIntervalMs: Long =
getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)

def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)

def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)

def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)

This file (MicroBatchWriteSupport.java) was deleted.

@@ -17,26 +17,24 @@

package org.apache.spark.sql.sources.v2.streaming;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data writing ability for continuous stream processing.
* provide data writing ability for structured streaming.
*/
@InterfaceStability.Evolving
public interface ContinuousWriteSupport extends BaseStreamingSink {
public interface StreamWriteSupport extends BaseStreamingSink {

/**
* Creates an optional {@link ContinuousWriter} to save the data to this data source. Data
* Creates an optional {@link StreamWriter} to save the data to this data source. Data
* sources can return None if there is no writing needed to be done.
*
* @param queryId A unique string for the writing query. It's possible that there are many
@@ -48,7 +46,7 @@
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
*/
Optional<ContinuousWriter> createContinuousWriter(
StreamWriter createStreamWriter(
String queryId,
StructType schema,
OutputMode mode,
@@ -23,22 +23,50 @@
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

/**
* A {@link DataSourceV2Writer} for use with continuous stream processing.
* A {@link DataSourceV2Writer} for use with structured streaming. This writer handles commits and
* aborts relative to an epoch ID determined by the execution engine.
*
* {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
* and so must reset any internal state after a successful commit.
*/
@InterfaceStability.Evolving
public interface ContinuousWriter extends DataSourceV2Writer {
public interface StreamWriter extends DataSourceV2Writer {
/**
* Commits this writing job for the specified epoch with a list of commit messages. The commit
* messages are collected from successful data writers and are produced by
* {@link DataWriter#commit()}.
*
* If this method fails (by throwing an exception), this writing job is considered to have
* failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
*
* To support exactly-once processing, writer implementations should ensure that this method is
* idempotent. The execution engine may call commit() multiple times for the same epoch
* in some circumstances.
*/
void commit(long epochId, WriterCommitMessage[] messages);

/**
* Aborts this writing job because some data writers failed and kept failing on retry, the
* Spark job failed for an unknown reason, or {@link #commit(WriterCommitMessage[])} failed.
*
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
*
* Unless the abort is triggered by the failure of commit, the given messages should have some
* null slots, as there may be only a few data writers that are committed before the abort
* happens, or some data writers were committed but their commit messages haven't reached the
* driver when the abort is triggered. So this is just a "best effort" for data sources to
* clean up the data left by data writers.
*/
void abort(long epochId, WriterCommitMessage[] messages);

default void commit(WriterCommitMessage[] messages) {
throw new UnsupportedOperationException(
"Commit without epoch should not be called with ContinuousWriter");
"Commit without epoch should not be called with StreamWriter");
}

default void abort(WriterCommitMessage[] messages) {
throw new UnsupportedOperationException(
"Abort without epoch should not be called with StreamWriter");
}
}
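
For reference, a hypothetical `StreamWriter` implementation honoring the contract above would key its work off the epoch ID and keep `commit` idempotent, since the engine may replay the same epoch. The class and field names below are illustrative, not part of this patch:

```scala
import scala.collection.mutable

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}

// Sketch: an epoch-aware writer whose commit is idempotent per epoch.
class ExampleStreamWriter(factory: DataWriterFactory[Row]) extends StreamWriter {
  private val committedEpochs = mutable.Set[Long]()

  override def createWriterFactory(): DataWriterFactory[Row] = factory

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = synchronized {
    // The engine may call commit() twice for the same epoch; only apply it once.
    if (committedEpochs.add(epochId)) {
      // ... durably apply the results described by `messages` ...
    }
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Best-effort cleanup; some slots in `messages` may be null.
  }
}
```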
