[SPARK-26649][SS] Add DSv2 noop sink
## What changes were proposed in this pull request?

The noop data source for batch was added in [#23471](#23471).
This PR adds the streaming part.
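
For context, a minimal usage sketch (not part of the patch) of the new streaming noop sink, e.g. to exercise a source without writing anywhere; the local session setup, object name, and rate-source options below are illustrative:

```scala
import org.apache.spark.sql.SparkSession

object NoopSinkExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; any existing SparkSession works the same way.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("noop-sink-example")
      .getOrCreate()

    // Built-in rate source as a stand-in for a real streaming source.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // The new noop sink: every row is consumed and discarded, nothing is persisted.
    // No checkpointLocation is required (see the DataStreamWriter change below).
    val query = df.writeStream
      .format("noop")
      .start()

    query.awaitTermination(10000)  // run for ~10 seconds
    query.stop()
    spark.stop()
  }
}
```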

## How was this patch tested?

Additional unit tests.

Closes #23631 from gaborgsomogyi/SPARK-26649.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
gaborgsomogyi authored and dongjoon-hyun committed Jan 25, 2019
1 parent f5b9370 · commit 9452e05
Showing 3 changed files with 120 additions and 2 deletions.
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -31,10 +33,16 @@ import org.apache.spark.sql.types.StructType
 class NoopDataSource
   extends DataSourceV2
   with TableProvider
-  with DataSourceRegister {
+  with DataSourceRegister
+  with StreamingWriteSupportProvider {
 
   override def shortName(): String = "noop"
   override def getTable(options: DataSourceOptions): Table = NoopTable
+  override def createStreamingWriteSupport(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamingWriteSupport = NoopStreamingWriteSupport
 }
 
 private[noop] object NoopTable extends Table with SupportsBatchWrite {
@@ -64,3 +72,17 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
   override def abort(): Unit = {}
 }
+
+private[noop] object NoopStreamingWriteSupport extends StreamingWriteSupport {
+  override def createStreamingWriterFactory(): StreamingDataWriterFactory =
+    NoopStreamingDataWriterFactory
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+private[noop] object NoopStreamingDataWriterFactory extends StreamingDataWriterFactory {
+  override def createWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] = NoopWriter
+}

@@ -330,7 +330,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       options,
       sink,
       outputMode,
-      useTempCheckpointLocation = source == "console",
+      useTempCheckpointLocation = source == "console" || source == "noop",
       recoverFromCheckpointLocation = true,
       trigger = trigger)
   }
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.noop
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamTest, Trigger}
+
+class NoopStreamSuite extends StreamTest {
+  import testImplicits._
+
+  test("microbatch") {
+    val input = MemoryStream[Int]
+    val query = input.toDF().writeStream.format("noop").start()
+    testMicroBatchQuery(query, input)
+  }
+
+  test("microbatch restart with checkpoint") {
+    val input = MemoryStream[Int]
+    withTempDir { checkpointDir =>
+      def testWithCheckpoint(): Unit = {
+        val query = input.toDF().writeStream
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .format("noop")
+          .start()
+        testMicroBatchQuery(query, input)
+      }
+      testWithCheckpoint()
+      testWithCheckpoint()
+    }
+  }
+
+  private def testMicroBatchQuery(
+      query: StreamingQuery,
+      input: MemoryStream[Int],
+      data: Int*): Unit = {
+    assert(query.isActive)
+    try {
+      input.addData(1, 2, 3)
+      eventually(timeout(streamingTimeout)) {
+        assert(query.recentProgress.map(_.numInputRows).sum == 3)
+      }
+    } finally {
+      query.stop()
+    }
+  }
+
+  test("continuous") {
+    val input = getRateDataFrame()
+    val query = input.writeStream.format("noop").trigger(Trigger.Continuous(200)).start()
+    assert(query.isActive)
+    query.stop()
+  }
+
+  test("continuous restart with checkpoint") {
+    withTempDir { checkpointDir =>
+      def testWithCheckpoint(): Unit = {
+        val input = getRateDataFrame()
+        val query = input.writeStream
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .format("noop")
+          .trigger(Trigger.Continuous(200))
+          .start()
+        assert(query.isActive)
+        query.stop()
+      }
+      testWithCheckpoint()
+      testWithCheckpoint()
+    }
+  }
+
+  private def getRateDataFrame(): DataFrame = {
+    spark.readStream
+      .format("rate")
+      .option("numPartitions", "1")
+      .option("rowsPerSecond", "5")
+      .load()
+      .select('value)
+  }
+}
