create StreamingWrite at the beginning of streaming execution
cloud-fan committed Mar 5, 2019
1 parent 4490fd0 commit 3261ed5
Showing 5 changed files with 80 additions and 17 deletions.
MicroBatchExecution.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchStream}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
@@ -122,7 +122,14 @@ class MicroBatchExecution(
case r: StreamingDataSourceV2Relation => r.stream
}
uniqueSources = sources.distinct
-    _logicalPlan
+
+    sink match {
+      case s: SupportsStreamingWrite =>
+        val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
+        WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
+
+      case _ => _logicalPlan
+    }
}

/**
@@ -513,9 +520,8 @@ class MicroBatchExecution(

val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
-      case s: SupportsStreamingWrite =>
-        val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
-        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
+      case _: SupportsStreamingWrite =>
+        newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}

StreamExecution.scala
@@ -585,7 +585,7 @@ abstract class StreamExecution(
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
-      .withQueryId(runId.toString)
+      .withQueryId(id.toString)
.withInputDataSchema(inputPlan.schema)
outputMode match {
case Append =>
ContinuousExecution.scala
@@ -61,7 +61,7 @@ class ContinuousExecution(
// Throwable that caused the execution to fail
private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)

-  override val logicalPlan: LogicalPlan = {
+  override val logicalPlan: WriteToContinuousDataSource = {
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
var nextSourceId = 0
val _logicalPlan = analyzedPlan.transform {
@@ -88,7 +88,8 @@
}
uniqueSources = sources.distinct

-    _logicalPlan
+    WriteToContinuousDataSource(
+      createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
}

private val triggerExecutor = trigger match {
@@ -178,13 +179,10 @@
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}

-    val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
-    val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
-
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionForQuery,
-        planWithSink,
+        withNewSources,
outputMode,
checkpointFile("state"),
id,
@@ -194,7 +192,7 @@
lastExecution.executedPlan // Force the lazy generation of execution plan
}

-    val stream = planWithSink.collect {
+    val stream = withNewSources.collect {
case relation: StreamingDataSourceV2Relation =>
relation.stream.asInstanceOf[ContinuousStream]
}.head
@@ -215,7 +213,13 @@

// Use the parent Spark session for the endpoint since it's where this query ID is registered.
val epochEndpoint = EpochCoordinatorRef.create(
-      streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+      logicalPlan.write,
+      stream,
+      this,
+      epochCoordinatorId,
+      currentBatchId,
+      sparkSession,
+      SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {
override def run: Unit = {
try {
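The continuous-mode change above boils down to: the StreamingWrite now lives inside the query's logical plan (logicalPlan.write) and is created exactly once, so the epoch coordinator simply commits that one shared object for every finished epoch. A minimal self-contained sketch of that shape (toy names only, not Spark's actual StreamingWrite or EpochCoordinator API):

// Toy stand-in for StreamingWrite: one instance per query, committed once per epoch.
trait ToyEpochWrite {
  def commit(epochId: Long): Unit
}

// Toy stand-in for the epoch coordinator: it receives the single write object that
// was created when the logical plan was built and commits each finished epoch on it.
class ToyEpochCoordinator(write: ToyEpochWrite) {
  def epochDone(epochId: Long): Unit = write.commit(epochId)
}

object ToyContinuousDemo extends App {
  val write = new ToyEpochWrite {
    def commit(epochId: Long): Unit = println(s"epoch $epochId committed")
  }
  // Analogous to passing logicalPlan.write into EpochCoordinatorRef.create: the
  // coordinator never builds a write itself; it reuses the one from the plan.
  val coordinator = new ToyEpochCoordinator(write)
  (0L to 2L).foreach(coordinator.epochDone)
}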
WriteToMicroBatchDataSource.scala (new file)
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite

/**
* The logical plan for writing data to a micro-batch stream.
*
* Note that this logical plan does not have a corresponding physical plan, as it will be converted
* to [[WriteToDataSourceV2]] with [[MicroBatchWrite]] before execution.
*/
case class WriteToMicroBatchDataSource(write: StreamingWrite, query: LogicalPlan)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil

def createPlan(batchId: Long): WriteToDataSourceV2 = {
WriteToDataSourceV2(new MicroBatchWrite(batchId, write), query)
}
}
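The doc comment of the new class can be made concrete with a small self-contained sketch (toy names, not Spark's actual classes): the StreamingWrite is created once when the logical plan is assembled, and each micro-batch only wraps that same object in a batch-scoped write via createPlan(batchId).

// Toy model of the pattern: create the streaming write once, derive per-batch plans from it.
trait ToyStreamingWrite {
  def commit(epochId: Long): Unit
}

// Analogous to MicroBatchWrite(batchId, streamingWrite): scopes the long-lived
// streaming write to a single micro-batch.
case class ToyMicroBatchWrite(batchId: Long, write: ToyStreamingWrite) {
  def commit(): Unit = write.commit(batchId)
}

// Analogous to WriteToMicroBatchDataSource: holds the write created at query start
// and specializes it for each batch.
case class ToyWriteToMicroBatchDataSource(write: ToyStreamingWrite) {
  def createPlan(batchId: Long): ToyMicroBatchWrite = ToyMicroBatchWrite(batchId, write)
}

object ToyMicroBatchDemo extends App {
  val streamingWrite = new ToyStreamingWrite {
    def commit(epochId: Long): Unit = println(s"committed batch $epochId")
  }
  val queryPlan = ToyWriteToMicroBatchDataSource(streamingWrite) // built once, at query start
  (0L to 2L).foreach(batchId => queryPlan.createPlan(batchId).commit()) // per-batch reuse
}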
StreamingDataSourceV2Suite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -59,6 +60,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
}

+class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
+  override def buildForStreaming(): StreamingWrite = this
+  override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+}
+
trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
@@ -75,7 +89,7 @@ trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    throw new IllegalStateException("fake sink - cannot actually write")
+    new FakeWriteBuilder
}
}
