From 0e8d80da1350b950fd690ff7c762d07d0767eafd Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 21 Mar 2018 15:05:31 -0700 Subject: [PATCH 1/8] partial --- .../continuous/ContinuousWriteExec.scala | 83 +++++++++++++++++++ .../continuous/ContinuousWriteRDD.scala | 79 ++++++++++++++++++ 2 files changed, 162 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala new file mode 100644 index 0000000000000..9ed0bc9954445 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkException, TaskContext} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, SupportsWriteInternalRow, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter + +class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends SparkPlan { + override def children: Seq[SparkPlan] = Seq(query) + override def output: Seq[Attribute] = Nil + + override protected def doExecute(): RDD[InternalRow] = { + val writerFactory = writer match { + case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() + case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) + } + + val useCommitCoordinator = writer.useCommitCoordinator + val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) + val messages = new Array[WriterCommitMessage](rdd.partitions.length) + + logInfo(s"Start processing data source writer: $writer. 
" + + s"The input RDD has ${messages.length} partitions.") + EpochCoordinatorRef.get( + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) + .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) + + try { + // Force the RDD to run so continuous processing starts; no data is actually being collected + // to the driver, as ContinuousWriteRDD outputs nothing. + rdd.collect() + } catch { + case _: InterruptedException => + // Interruption is how continuous queries are ended, so accept and ignore the exception. + case cause: Throwable => + logError(s"Data source writer $writer is aborting.") + try { + writer.abort(0, messages) + } catch { + case t: Throwable => + logError(s"Data source writer $writer failed to abort.") + cause.addSuppressed(t) + throw new SparkException("Writing job failed.", cause) + } + logError(s"Data source writer $writer aborted.") + cause match { + // Do not wrap interruption exceptions that will be handled by streaming specially. + case _ if StreamExecution.isInterruptionException(cause) => throw cause + // Only wrap non fatal exceptions. + case NonFatal(e) => throw new SparkException("Writing job aborted.", e) + case _ => throw cause + } + } + + sparkContext.emptyRDD + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala new file mode 100644 index 0000000000000..c4cf2f294f2ed --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.util.Utils + +class ContinuousWriteRDD[T](var prev: RDD[T], writeTask: DataWriterFactory[InternalRow]) + extends RDD[T](prev) { + + override val partitioner = firstParent[T].partitioner + + override def getPartitions: Array[Partition] = firstParent[T].partitions + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + val epochCoordinator = EpochCoordinatorRef.get( + context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + SparkEnv.get) + val currentMsg: WriterCommitMessage = null + var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + do { + var dataWriter: DataWriter[InternalRow] = null + // write the data and commit this writer. + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { + try { + dataWriter = writeTask.createDataWriter( + context.partitionId(), context.attemptNumber(), currentEpoch) + val dataIterator = prev.compute(split, context) + while (dataIterator.hasNext) { + dataWriter.write(dataIterator.next()) + } + logInfo(s"Writer for partition ${context.partitionId()} is committing.") + val msg = dataWriter.commit() + logInfo(s"Writer for partition ${context.partitionId()} committed.") + epochCoordinator.send( + CommitPartitionEpoch(context.partitionId(), currentEpoch, msg) + ) + currentEpoch += 1 + } catch { + case _: InterruptedException => + // Continuous shutdown always involves an interrupt. Just finish the task. + } + })(catchBlock = { + // If there is an error, abort this writer. We enter this callback in the middle of + // rethrowing an exception, so runContinuous will stop executing at this point. 
+ logError(s"Writer for partition ${context.partitionId()} is aborting.") + if (dataWriter != null) dataWriter.abort() + logError(s"Writer for partition ${context.partitionId()} aborted.") + }) + } while (!context.isInterrupted()) + + currentMsg + } + + override def clearDependencies() { + super.clearDependencies() + prev = null + } +} From 270f8ffe062d76e44a726753afc07c78348c3cc6 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 21 Mar 2018 23:08:52 -0700 Subject: [PATCH 2/8] make ContinuousWriteExec work --- .../datasources/v2/DataSourceV2Strategy.scala | 5 +++++ .../continuous/ContinuousWriteExec.scala | 4 +--- .../continuous/ContinuousWriteRDD.scala | 16 ++++++++-------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 1ac9572de6412..2c5fe4168a554 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteExec +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { @@ -29,6 +31,9 @@ object DataSourceV2Strategy extends Strategy { case r: StreamingDataSourceV2Relation => DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil + case WriteToDataSourceV2(writer: StreamWriter, query) => + ContinuousWriteExec(writer, planLater(query)) :: Nil + case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala index 9ed0bc9954445..177ea3da4db20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming.continuous import scala.util.control.NonFatal import org.apache.spark.{SparkException, TaskContext} - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -30,7 +29,7 @@ import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, SupportsWriteInternalRow, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter -class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends SparkPlan { +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends SparkPlan { override def children: Seq[SparkPlan] = Seq(query) override def output: Seq[Attribute] = Nil @@ -40,7 +39,6 @@ class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends SparkP case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) } - val useCommitCoordinator = writer.useCommitCoordinator val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) val messages = new Array[WriterCommitMessage](rdd.partitions.length) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala index c4cf2f294f2ed..8d3749faff61b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala @@ -24,20 +24,21 @@ import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logEr import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} import org.apache.spark.util.Utils -class ContinuousWriteRDD[T](var prev: RDD[T], writeTask: DataWriterFactory[InternalRow]) - extends RDD[T](prev) { +class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow]) + extends RDD[Unit](prev) { - override val partitioner = firstParent[T].partitioner + override val partitioner = prev.partitioner - override def getPartitions: Array[Partition] = firstParent[T].partitions + override def getPartitions: Array[Partition] = prev.partitions - override def compute(split: Partition, context: TaskContext): Iterator[T] = { + override def compute(split: Partition, context: TaskContext): Iterator[Unit] = { val epochCoordinator = EpochCoordinatorRef.get( context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get) val currentMsg: WriterCommitMessage = null var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + val dataIterator = prev.compute(split, context) do { var dataWriter: DataWriter[InternalRow] = null // write the data and commit this writer. 
@@ -45,7 +46,6 @@ class ContinuousWriteRDD[T](var prev: RDD[T], writeTask: DataWriterFactory[Inter try { dataWriter = writeTask.createDataWriter( context.partitionId(), context.attemptNumber(), currentEpoch) - val dataIterator = prev.compute(split, context) while (dataIterator.hasNext) { dataWriter.write(dataIterator.next()) } @@ -67,9 +67,9 @@ class ContinuousWriteRDD[T](var prev: RDD[T], writeTask: DataWriterFactory[Inter if (dataWriter != null) dataWriter.abort() logError(s"Writer for partition ${context.partitionId()} aborted.") }) - } while (!context.isInterrupted()) + } while (!context.isInterrupted() && !context.isCompleted()) - currentMsg + Iterator() } override def clearDependencies() { From 0cfeaeb6c0e6a3b500341852db8c53359120a753 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 22 Mar 2018 19:44:10 -0700 Subject: [PATCH 3/8] fix docs --- .../streaming/continuous/ContinuousWriteRDD.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala index 8d3749faff61b..c2ada791219a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala @@ -35,7 +35,6 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactor val epochCoordinator = EpochCoordinatorRef.get( context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get) - val currentMsg: WriterCommitMessage = null var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong val dataIterator = prev.compute(split, context) @@ -49,12 +48,14 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactor while (dataIterator.hasNext) { dataWriter.write(dataIterator.next()) } - logInfo(s"Writer for partition ${context.partitionId()} is committing.") + logInfo(s"Writer for partition ${context.partitionId()} " + + s"in epoch $currentEpoch is committing.") val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") epochCoordinator.send( CommitPartitionEpoch(context.partitionId(), currentEpoch, msg) ) + logInfo(s"Writer for partition ${context.partitionId()} " + + s"in epoch $currentEpoch committed.") currentEpoch += 1 } catch { case _: InterruptedException => @@ -62,7 +63,7 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactor } })(catchBlock = { // If there is an error, abort this writer. We enter this callback in the middle of - // rethrowing an exception, so runContinuous will stop executing at this point. + // rethrowing an exception, so compute() will stop executing at this point. 
logError(s"Writer for partition ${context.partitionId()} is aborting.") if (dataWriter != null) dataWriter.abort() logError(s"Writer for partition ${context.partitionId()} aborted.") From 7c375339bac0704c99ba6d87ee671dc3b7c0f531 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 26 Mar 2018 09:47:16 -0700 Subject: [PATCH 4/8] remove old path --- .../datasources/v2/WriteToDataSourceV2.scala | 74 ++----------------- 1 file changed, 5 insertions(+), 69 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index e80b44c1cdc66..ea283ed77efda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -65,25 +65,10 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e s"The input RDD has ${messages.length} partitions.") try { - val runTask = writer match { - // This case means that we're doing continuous processing. In microbatch streaming, the - // StreamWriter is wrapped in a MicroBatchWriter, which is executed as a normal batch. - case w: StreamWriter => - EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - sparkContext.env) - .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) - - (context: TaskContext, iter: Iterator[InternalRow]) => - DataWritingSparkTask.runContinuous(writeTask, context, iter) - case _ => - (context: TaskContext, iter: Iterator[InternalRow]) => - DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator) - } - sparkContext.runJob( rdd, - runTask, + (context: TaskContext, iter: Iterator[InternalRow]) => + DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator), rdd.partitions.indices, (index, message: WriterCommitMessage) => { messages(index) = message @@ -91,14 +76,10 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e } ) - if (!writer.isInstanceOf[StreamWriter]) { - logInfo(s"Data source writer $writer is committing.") - writer.commit(messages) - logInfo(s"Data source writer $writer committed.") - } + logInfo(s"Data source writer $writer is committing.") + writer.commit(messages) + logInfo(s"Data source writer $writer committed.") } catch { - case _: InterruptedException if writer.isInstanceOf[StreamWriter] => - // Interruption is how continuous queries are ended, so accept and ignore the exception. case cause: Throwable => logError(s"Data source writer $writer is aborting.") try { @@ -111,8 +92,6 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e } logError(s"Data source writer $writer aborted.") cause match { - // Do not wrap interruption exceptions that will be handled by streaming specially. - case _ if StreamExecution.isInterruptionException(cause) => throw cause // Only wrap non fatal exceptions. 
case NonFatal(e) => throw new SparkException("Writing job aborted.", e) case _ => throw cause @@ -168,49 +147,6 @@ object DataWritingSparkTask extends Logging { logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.") }) } - - def runContinuous( - writeTask: DataWriterFactory[InternalRow], - context: TaskContext, - iter: Iterator[InternalRow]): WriterCommitMessage = { - val epochCoordinator = EpochCoordinatorRef.get( - context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - SparkEnv.get) - val currentMsg: WriterCommitMessage = null - var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong - - do { - var dataWriter: DataWriter[InternalRow] = null - // write the data and commit this writer. - Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - try { - dataWriter = writeTask.createDataWriter( - context.partitionId(), context.attemptNumber(), currentEpoch) - while (iter.hasNext) { - dataWriter.write(iter.next()) - } - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") - epochCoordinator.send( - CommitPartitionEpoch(context.partitionId(), currentEpoch, msg) - ) - currentEpoch += 1 - } catch { - case _: InterruptedException => - // Continuous shutdown always involves an interrupt. Just finish the task. - } - })(catchBlock = { - // If there is an error, abort this writer. We enter this callback in the middle of - // rethrowing an exception, so runContinuous will stop executing at this point. - logError(s"Writer for partition ${context.partitionId()} is aborting.") - if (dataWriter != null) dataWriter.abort() - logError(s"Writer for partition ${context.partitionId()} aborted.") - }) - } while (!context.isInterrupted()) - - currentMsg - } } class InternalRowDataWriterFactory( From 26c1eadc67ffc48bcaf877154660982455892389 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 26 Mar 2018 10:44:19 -0700 Subject: [PATCH 5/8] rm old path --- .../continuous/ContinuousWriteExec.scala | 73 +++++++++++++---- .../continuous/ContinuousWriteRDD.scala | 80 ------------------- 2 files changed, 57 insertions(+), 96 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala index 177ea3da4db20..39b5cd7a7b03e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala @@ -19,17 +19,21 @@ package org.apache.spark.sql.execution.streaming.continuous import scala.util.control.NonFatal -import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} import 
org.apache.spark.sql.execution.streaming.StreamExecution -import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, SupportsWriteInternalRow, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils -case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends SparkPlan { +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) + extends SparkPlan with Logging { override def children: Seq[SparkPlan] = Seq(query) override def output: Seq[Attribute] = Nil @@ -39,7 +43,7 @@ case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends S case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) } - val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) + val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) logInfo(s"Start processing data source writer: $writer. " + @@ -52,21 +56,15 @@ case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends S try { // Force the RDD to run so continuous processing starts; no data is actually being collected // to the driver, as ContinuousWriteRDD outputs nothing. - rdd.collect() + sparkContext.runJob( + rdd, + (context: TaskContext, iter: Iterator[InternalRow]) => + ContinuousWriteExec.run(writerFactory, context, iter), + rdd.partitions.indices) } catch { case _: InterruptedException => - // Interruption is how continuous queries are ended, so accept and ignore the exception. + // Interruption is how continuous queries are ended, so accept and ignore the exception. case cause: Throwable => - logError(s"Data source writer $writer is aborting.") - try { - writer.abort(0, messages) - } catch { - case t: Throwable => - logError(s"Data source writer $writer failed to abort.") - cause.addSuppressed(t) - throw new SparkException("Writing job failed.", cause) - } - logError(s"Data source writer $writer aborted.") cause match { // Do not wrap interruption exceptions that will be handled by streaming specially. case _ if StreamExecution.isInterruptionException(cause) => throw cause @@ -79,3 +77,46 @@ case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends S sparkContext.emptyRDD } } + +object ContinuousWriteExec extends Logging { + def run( + writeTask: DataWriterFactory[InternalRow], + context: TaskContext, + iter: Iterator[InternalRow]): Unit = { + val epochCoordinator = EpochCoordinatorRef.get( + context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + SparkEnv.get) + val currentMsg: WriterCommitMessage = null + var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + do { + var dataWriter: DataWriter[InternalRow] = null + // write the data and commit this writer. + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { + try { + dataWriter = writeTask.createDataWriter( + context.partitionId(), context.attemptNumber(), currentEpoch) + while (iter.hasNext) { + dataWriter.write(iter.next()) + } + logInfo(s"Writer for partition ${context.partitionId()} is committing.") + val msg = dataWriter.commit() + logInfo(s"Writer for partition ${context.partitionId()} committed.") + epochCoordinator.send( + CommitPartitionEpoch(context.partitionId(), currentEpoch, msg) + ) + currentEpoch += 1 + } catch { + case _: InterruptedException => + // Continuous shutdown always involves an interrupt. Just finish the task. 
+ } + })(catchBlock = { + // If there is an error, abort this writer. We enter this callback in the middle of + // rethrowing an exception, so runContinuous will stop executing at this point. + logError(s"Writer for partition ${context.partitionId()} is aborting.") + if (dataWriter != null) dataWriter.abort() + logError(s"Writer for partition ${context.partitionId()} aborted.") + }) + } while (!context.isInterrupted()) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala deleted file mode 100644 index c2ada791219a4..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous - -import org.apache.spark.{Partition, SparkEnv, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} -import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} -import org.apache.spark.util.Utils - -class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow]) - extends RDD[Unit](prev) { - - override val partitioner = prev.partitioner - - override def getPartitions: Array[Partition] = prev.partitions - - override def compute(split: Partition, context: TaskContext): Iterator[Unit] = { - val epochCoordinator = EpochCoordinatorRef.get( - context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - SparkEnv.get) - var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong - - val dataIterator = prev.compute(split, context) - do { - var dataWriter: DataWriter[InternalRow] = null - // write the data and commit this writer. - Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - try { - dataWriter = writeTask.createDataWriter( - context.partitionId(), context.attemptNumber(), currentEpoch) - while (dataIterator.hasNext) { - dataWriter.write(dataIterator.next()) - } - logInfo(s"Writer for partition ${context.partitionId()} " + - s"in epoch $currentEpoch is committing.") - val msg = dataWriter.commit() - epochCoordinator.send( - CommitPartitionEpoch(context.partitionId(), currentEpoch, msg) - ) - logInfo(s"Writer for partition ${context.partitionId()} " + - s"in epoch $currentEpoch committed.") - currentEpoch += 1 - } catch { - case _: InterruptedException => - // Continuous shutdown always involves an interrupt. Just finish the task. 
- } - })(catchBlock = { - // If there is an error, abort this writer. We enter this callback in the middle of - // rethrowing an exception, so compute() will stop executing at this point. - logError(s"Writer for partition ${context.partitionId()} is aborting.") - if (dataWriter != null) dataWriter.abort() - logError(s"Writer for partition ${context.partitionId()} aborted.") - }) - } while (!context.isInterrupted() && !context.isCompleted()) - - Iterator() - } - - override def clearDependencies() { - super.clearDependencies() - prev = null - } -} From e53707f1c2b836c38d00ad9527f1cf7b498051b7 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 26 Mar 2018 19:15:00 -0700 Subject: [PATCH 6/8] format + docs --- .../streaming/continuous/ContinuousWriteExec.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala index 39b5cd7a7b03e..8c0539f6215d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala @@ -32,6 +32,9 @@ import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.util.Utils +/** + * The physical plan for writing data into a continuous processing [[StreamWriter]]. + */ case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) extends SparkPlan with Logging { override def children: Seq[SparkPlan] = Seq(query) @@ -48,9 +51,10 @@ case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) logInfo(s"Start processing data source writer: $writer. " + s"The input RDD has ${messages.length} partitions.") + // Let the epoch coordinator know how many partitions the write RDD has. 
EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - sparkContext.env) + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) try { From 4a7a4cc4aef2e27025e019d8dde29c30026cb330 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 26 Mar 2018 19:16:28 -0700 Subject: [PATCH 7/8] rename node --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 4 ++-- ...riteExec.scala => WriteToContinuousDataSourceExec.scala} | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/{ContinuousWriteExec.scala => WriteToContinuousDataSourceExec.scala} (96%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 2c5fe4168a554..ac5b0f2d2ac6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteExec +import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter object DataSourceV2Strategy extends Strategy { @@ -32,7 +32,7 @@ object DataSourceV2Strategy extends Strategy { DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil case WriteToDataSourceV2(writer: StreamWriter, query) => - ContinuousWriteExec(writer, planLater(query)) :: Nil + WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala similarity index 96% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala index 8c0539f6215d9..92819b65f7131 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.Utils /** * The physical plan for writing data into a continuous processing [[StreamWriter]]. 
*/ -case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) +case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan) extends SparkPlan with Logging { override def children: Seq[SparkPlan] = Seq(query) override def output: Seq[Attribute] = Nil @@ -63,7 +63,7 @@ case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) sparkContext.runJob( rdd, (context: TaskContext, iter: Iterator[InternalRow]) => - ContinuousWriteExec.run(writerFactory, context, iter), + WriteToContinuousDataSourceExec.run(writerFactory, context, iter), rdd.partitions.indices) } catch { case _: InterruptedException => @@ -82,7 +82,7 @@ case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) } } -object ContinuousWriteExec extends Logging { +object WriteToContinuousDataSourceExec extends Logging { def run( writeTask: DataWriterFactory[InternalRow], context: TaskContext, From bf1bef4a40d18fbc55f2a905dee7c01649af47ca Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 29 Mar 2018 17:59:36 -0700 Subject: [PATCH 8/8] remove inheritance altogether --- .../v2/writer/streaming/StreamWriter.java | 26 +++++++++------- .../datasources/v2/DataSourceV2Strategy.scala | 9 +++--- .../continuous/ContinuousExecution.scala | 2 +- .../WriteToContinuousDataSource.scala | 31 +++++++++++++++++++ 4 files changed, 50 insertions(+), 18 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java index a316b2a4c1d82..c4dc6fc0f4873 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java @@ -18,8 +18,10 @@ package org.apache.spark.sql.sources.v2.writer.streaming; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; /** @@ -29,14 +31,15 @@ * increasing numeric ID. This writer handles commits and aborts for each successive epoch. */ @InterfaceStability.Evolving -public interface StreamWriter extends DataSourceWriter { +public interface StreamWriter { /** * Commits this writing job for the specified epoch with a list of commit messages. The commit * messages are collected from successful data writers and are produced by * {@link DataWriter#commit()}. * * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * failed, and the execution engine will attempt to call + * {@link #abort(long, WriterCommitMessage[])}. * * The execution engine may call commit() multiple times for the same epoch in some circumstances. 
* To support exactly-once data semantics, implementations must ensure that multiple commits for @@ -46,7 +49,8 @@ public interface StreamWriter extends DataSourceWriter { /** * Aborts this writing job because some data writers are failed and keep failing when retried, or - * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails. + * the Spark job fails with some unknown reasons, or + * {@link #commit(long, WriterCommitMessage[])} fails. * * If this method fails (by throwing an exception), the underlying data source may require manual * cleanup. @@ -59,13 +63,11 @@ public interface StreamWriter extends DataSourceWriter { */ void abort(long epochId, WriterCommitMessage[] messages); - default void commit(WriterCommitMessage[] messages) { - throw new UnsupportedOperationException( - "Commit without epoch should not be called with StreamWriter"); - } - - default void abort(WriterCommitMessage[] messages) { - throw new UnsupportedOperationException( - "Abort without epoch should not be called with StreamWriter"); - } + /** + * Creates a writer factory which will be serialized and sent to executors. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + */ + DataWriterFactory createWriterFactory(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index ac5b0f2d2ac6c..c2a31442d2be5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { @@ -31,12 +30,12 @@ object DataSourceV2Strategy extends Strategy { case r: StreamingDataSourceV2Relation => DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil - case WriteToDataSourceV2(writer: StreamWriter, query) => - WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil - case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil + case WriteToContinuousDataSource(writer, query) => + WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 1758b3844bd62..56b294f889aee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -198,7 +198,7 @@ class ContinuousExecution( triggerLogicalPlan.schema, outputMode, new DataSourceOptions(extraOptions.asJava)) - val 
withSink = WriteToDataSourceV2(writer, triggerLogicalPlan) + val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan) val reader = withSink.collect { case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala new file mode 100644 index 0000000000000..943c731a70529 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter + +/** + * The logical plan for writing data in a continuous stream. + */ +case class WriteToContinuousDataSource( + writer: StreamWriter, query: LogicalPlan) extends LogicalPlan { + override def children: Seq[LogicalPlan] = Seq(query) + override def output: Seq[Attribute] = Nil +}