[SPARK-22789] Map-only continuous processing execution
## What changes were proposed in this pull request?

Basic continuous processing execution, supporting map/flatMap/filter, with epoch commits and offset advancement handled through RPC.
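
For orientation, here is a minimal sketch (not taken from this patch) of how a user would start a map-only continuous query with the new trigger; the `rate` source and `memory` sink names are assumptions about which V2 implementations are wired up for continuous mode:

```scala
import org.apache.spark.sql.streaming.Trigger

// Sketch only: assumes an active SparkSession `spark` and a continuous-capable source/sink.
val query = spark.readStream
  .format("rate")
  .load()
  .selectExpr("value * 2 AS doubled")       // map-only transformation (Project)
  .writeStream
  .format("memory")
  .queryName("continuous_sketch")
  .trigger(Trigger.Continuous("1 second"))  // checkpoint epochs roughly every second
  .start()
```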

## How was this patch tested?

New unit-ish tests exercising execution end to end.

Author: Jose Torres <jose@databricks.com>

Closes #19984 from jose-torres/continuous-impl.
jose-torres authored and zsxwing committed Dec 23, 2017
1 parent d23dc5b commit 8941a4a
Showing 36 changed files with 1,682 additions and 150 deletions.
5 changes: 5 additions & 0 deletions project/MimaExcludes.scala
@@ -36,6 +36,11 @@ object MimaExcludes {

// Exclude rules for 2.3.x
lazy val v23excludes = v22excludes ++ Seq(
// SPARK-22789: Map-only continuous processing execution
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$8"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$6"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$9"),

// SPARK-22372: Make cluster submission use SparkApplication.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getSecretKeyFromUserCredentials"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.isYarnMode"),
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, CurrentDate, CurrentTimestamp, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
@@ -339,6 +339,29 @@ object UnsupportedOperationChecker {
}
}

def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {
checkForStreaming(plan, outputMode)

plan.foreachUp { implicit subPlan =>
subPlan match {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
_: DeserializeToObject | _: SerializeFromObject) =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
}

subPlan.expressions.foreach { e =>
if (e.collectLeaves().exists {
case (_: CurrentTimestamp | _: CurrentDate) => true
case _ => false
}) {
throwError(s"Continuous processing does not support current time operations.")
}
}
}
}

private def throwErrorIf(
condition: Boolean,
msg: String)(implicit operator: LogicalPlan): Unit = {
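
To illustrate the new `checkForContinuous` rule above: map-like operators (Project, Filter, the typed map/flatMap nodes) pass, while aggregations and current-time expressions are rejected. A hedged sketch, with `df` standing in for a continuous-capable streaming DataFrame; the check itself only fires when the query is started with a continuous trigger:

```scala
import org.apache.spark.sql.functions.current_timestamp

// Hypothetical plans; names here are illustrative, not from the diff.
val mapOnly   = df.filter("value % 2 = 0")       // Filter: accepted
  .selectExpr("value * 2 AS doubled")            // Project: accepted
val aggregate = df.groupBy().count()             // Aggregate: rejected
val withTime  = df.select(current_timestamp())   // CurrentTimestamp: rejected
```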
@@ -1044,6 +1044,22 @@ object SQLConf {
"When this conf is not set, the value from `spark.redaction.string.regex` is used.")
.fallbackConf(org.apache.spark.internal.config.STRING_REDACTION_PATTERN)

val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.executorQueueSize")
.internal()
.doc("The size (measured in number of rows) of the queue used in continuous execution to" +
" buffer the results of a ContinuousDataReader.")
.intConf
.createWithDefault(1024)

val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
.internal()
.doc("The interval at which continuous execution readers will poll to check whether" +
" the epoch has advanced on the driver.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -1357,6 +1373,11 @@ class SQLConf extends Serializable with Logging {

def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)

def continuousStreamingExecutorQueueSize: Int = getConf(CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)

def continuousStreamingExecutorPollIntervalMs: Long =
getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
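
The two configuration keys added above can be overridden at runtime; a brief sketch, assuming an active SparkSession `spark` (whether tuning them helps is workload-dependent):

```scala
// Defaults from the diff: 1024 rows and 100 ms.
spark.conf.set("spark.sql.streaming.continuous.executorQueueSize", 2048L)
spark.conf.set("spark.sql.streaming.continuous.executorPollIntervalMs", 50L)
```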
@@ -65,4 +65,10 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reade
default boolean needsReconfiguration() {
return false;
}

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
}
@@ -61,4 +61,10 @@ public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSourc
* @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
*/
Offset deserializeOffset(String json);

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
}
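
Both reader interfaces now share the same `commit(end)` contract: once Spark commits an offset, only offsets strictly greater than it will be requested again. A self-contained toy (not the real reader API) modeling that contract:

```scala
import scala.collection.mutable

// Toy buffered source: commit(end) frees everything at or below the committed offset.
class ToyBufferedSource {
  private val buffer = mutable.TreeMap[Long, String]()

  def append(offset: Long, record: String): Unit = buffer += (offset -> record)

  def commit(end: Long): Unit = {
    val dropped = buffer.keys.takeWhile(_ <= end).toList
    dropped.foreach(buffer.remove)                 // data <= end will never be requested again
  }

  def pending: Seq[(Long, String)] = buffer.toSeq  // only offsets greater than the last commit
}
```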
54 changes: 54 additions & 0 deletions sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -22,6 +22,7 @@
import scala.concurrent.duration.Duration;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;

/**
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
public static Trigger Once() {
return OneTimeTrigger$.MODULE$;
}

/**
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* @since 2.3.0
*/
public static Trigger Continuous(long intervalMs) {
return ContinuousTrigger.apply(intervalMs);
}

/**
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* {{{
* import java.util.concurrent.TimeUnit
* df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.3.0
*/
public static Trigger Continuous(long interval, TimeUnit timeUnit) {
return ContinuousTrigger.create(interval, timeUnit);
}

/**
* (Scala-friendly)
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* {{{
* import scala.concurrent.duration._
* df.writeStream.trigger(Trigger.Continuous(10.seconds))
* }}}
* @since 2.3.0
*/
public static Trigger Continuous(Duration interval) {
return ContinuousTrigger.apply(interval);
}

/**
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* {{{
* df.writeStream.trigger(Trigger.Continuous("10 seconds"))
* }}}
* @since 2.3.0
*/
public static Trigger Continuous(String interval) {
return ContinuousTrigger.apply(interval);
}
}
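
From Scala, the four overloads above are interchangeable ways to ask for the same 10-second checkpoint interval; they all resolve to the internal `ContinuousTrigger`:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

Trigger.Continuous(10000L)                 // milliseconds
Trigger.Continuous(10, TimeUnit.SECONDS)
Trigger.Continuous(10.seconds)
Trigger.Continuous("10 seconds")
```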
@@ -31,8 +31,10 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.MemoryPlanV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
* Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -374,6 +376,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
StreamingRelationExec(s.sourceName, s.output) :: Nil
case s: StreamingExecutionRelation =>
StreamingRelationExec(s.toString, s.output) :: Nil
case s: StreamingRelationV2 =>
StreamingRelationExec(s.sourceName, s.output) :: Nil
case _ => Nil
}
}
@@ -404,6 +408,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
case MemoryPlanV2(sink, output) =>
val encoder = RowEncoder(StructType.fromAttributes(output))
LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil

case logical.Distinct(child) =>
throw new IllegalStateException(
@@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousDataSourceRDD, ContinuousExecution, EpochCoordinatorRef, SetReaderPartitions}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType

@@ -52,10 +54,20 @@ case class DataSourceV2ScanExec(
}.asJava
}

val inputRDD = new DataSourceRDD(sparkContext, readTasks)
.asInstanceOf[RDD[InternalRow]]
val inputRDD = reader match {
case _: ContinuousReader =>
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.RUN_ID_KEY), sparkContext.env)
.askSync[Unit](SetReaderPartitions(readTasks.size()))

new ContinuousDataSourceRDD(sparkContext, sqlContext, readTasks)

case _ =>
new DataSourceRDD(sparkContext, readTasks)
}

val numOutputRows = longMetric("numOutputRows")
inputRDD.map { r =>
inputRDD.asInstanceOf[RDD[InternalRow]].map { r =>
numOutputRows += 1
r
}
@@ -73,7 +85,7 @@ class RowToUnsafeRowReadTask(rowReadTask: ReadTask[Row], schema: StructType)
}
}

class RowToUnsafeDataReader(rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
class RowToUnsafeDataReader(val rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
extends DataReader[UnsafeRow] {

override def next: Boolean = rowReader.next
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -58,10 +60,22 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
s"The input RDD has ${messages.length} partitions.")

try {
val runTask = writer match {
case w: ContinuousWriter =>
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.RUN_ID_KEY), sparkContext.env)
.askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))

(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.runContinuous(writeTask, context, iter)
case _ =>
(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.run(writeTask, context, iter)
}

sparkContext.runJob(
rdd,
(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.run(writeTask, context, iter),
runTask,
rdd.partitions.indices,
(index, message: WriterCommitMessage) => messages(index) = message
)
@@ -70,6 +84,8 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
writer.commit(messages)
logInfo(s"Data source writer $writer committed.")
} catch {
case _: InterruptedException if writer.isInstanceOf[ContinuousWriter] =>
// Interruption is how continuous queries are ended, so accept and ignore the exception.
case cause: Throwable =>
logError(s"Data source writer $writer is aborting.")
try {
@@ -109,6 +125,44 @@ object DataWritingSparkTask extends Logging {
logError(s"Writer for partition ${context.partitionId()} aborted.")
})
}

def runContinuous(
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
val epochCoordinator = EpochCoordinatorRef.get(
context.getLocalProperty(ContinuousExecution.RUN_ID_KEY),
SparkEnv.get)
val currentMsg: WriterCommitMessage = null
var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

do {
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
try {
iter.foreach(dataWriter.write)
logInfo(s"Writer for partition ${context.partitionId()} is committing.")
val msg = dataWriter.commit()
logInfo(s"Writer for partition ${context.partitionId()} committed.")
epochCoordinator.send(
CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
)
currentEpoch += 1
} catch {
case _: InterruptedException =>
// Continuous shutdown always involves an interrupt. Just finish the task.
}
})(catchBlock = {
// If there is an error, abort this writer
logError(s"Writer for partition ${context.partitionId()} is aborting.")
dataWriter.abort()
logError(s"Writer for partition ${context.partitionId()} aborted.")
})
} while (!context.isInterrupted())

currentMsg
}
}

class InternalRowDataWriterFactory(
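
`runContinuous` above loops once per epoch: write the epoch's data, commit the `DataWriter`, report the commit message to the epoch coordinator, and repeat until the task is interrupted. A toy model of that handshake (not Spark's real `EpochCoordinator`, just a sketch of the shape):

```scala
import scala.collection.mutable

case class PartitionCommit(partitionId: Int, epoch: Long, msg: String)

// Toy coordinator: an epoch is "fully committed" once every partition has reported it.
class ToyEpochCoordinator(numPartitions: Int) {
  private val commits = mutable.Map[Long, Set[Int]]().withDefaultValue(Set.empty)

  def report(c: PartitionCommit): Unit = {
    commits(c.epoch) += c.partitionId
    if (commits(c.epoch).size == numPartitions) {
      // the real coordinator would commit the epoch to the ContinuousWriter at this point
      println(s"epoch ${c.epoch} fully committed")
    }
  }
}
```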
@@ -17,21 +17,13 @@

package org.apache.spark.sql.execution.streaming;

import org.apache.spark.sql.sources.v2.reader.Offset;

/**
* The shared interface between V1 streaming sources and V2 streaming readers.
*
* This is a temporary interface for compatibility during migration. It should not be implemented
* directly, and will be removed in future versions.
*/
public interface BaseStreamingSource {
/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);

/** Stop this source and free any resources it has allocated. */
void stop();
}
@@ -266,6 +266,20 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}

/**
* Removes all log entries later than thresholdBatchId (exclusive).
*/
def purgeAfter(thresholdBatchId: Long): Unit = {
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
.map(f => pathToBatchId(f.getPath))

for (batchId <- batchIds if batchId > thresholdBatchId) {
val path = batchIdToPath(batchId)
fileManager.delete(path)
logTrace(s"Removed metadata log file: $path")
}
}

private def createFileManager(): FileManager = {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
try {
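
`purgeAfter` complements the existing `purge` (which removes entries older than a threshold) by removing entries newer than one. A hedged sketch of its semantics; `HDFSMetadataLog` lives in an internal package, and the session and path below are made up:

```scala
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

// Sketch: batches 0..5 are written, then everything after batch 3 is purged.
val log = new HDFSMetadataLog[String](spark, "/tmp/sketch-metadata-log")
(0L to 5L).foreach(b => log.add(b, s"metadata-$b"))
log.purgeAfter(3L)
assert(log.getLatest().map(_._1) == Some(3L))
```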
