From 53ba226e97cc1b216b3333239042f53a74bb13f6 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 16 Mar 2016 17:14:09 -0700 Subject: [PATCH 1/7] [SPARK-13985][SQL] Deterministic batches with ids --- .../spark/sql/ContinuousQueryManager.scala | 8 +- .../apache/spark/sql/DataFrameWriter.scala | 8 +- .../org/apache/spark/sql/SinkStatus.scala | 2 +- .../execution/streaming/CompositeOffset.scala | 8 + .../streaming/FileStreamSource.scala | 24 ++- .../execution/streaming/HDFSMetadataLog.scala | 7 +- .../spark/sql/execution/streaming/Sink.scala | 24 +-- .../sql/execution/streaming/Source.scala | 5 +- .../execution/streaming/StreamExecution.scala | 164 ++++++++++++------ .../execution/streaming/StreamProgress.scala | 26 ++- .../sql/execution/streaming/memory.scala | 85 ++++----- .../org/apache/spark/sql/StreamTest.scala | 13 +- .../ContinuousQueryManagerSuite.scala | 9 +- .../sql/streaming/ContinuousQuerySuite.scala | 10 +- .../DataFrameReaderWriterSuite.scala | 31 +++- .../sql/streaming/FileStreamSourceSuite.scala | 10 +- 16 files changed, 262 insertions(+), 172 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 0a156ea99a297..2709c60721be1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -164,13 +164,17 @@ class ContinuousQueryManager(sqlContext: SQLContext) { } /** Start a query */ - private[sql] def startQuery(name: String, df: DataFrame, sink: Sink): ContinuousQuery = { + private[sql] def startQuery( + name: String, + metadataRoot: String, + df: DataFrame, + sink: Sink): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { throw new IllegalArgumentException( s"Cannot start query with name $name as a query with that name is already active") } - val query = new StreamExecution(sqlContext, name, df.logicalPlan, sink) + val query = new StreamExecution(sqlContext, name, metadataRoot, df.logicalPlan, sink) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9951f0fabff15..07a364afa77a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -241,9 +241,13 @@ final class DataFrameWriter private[sql](df: DataFrame) { className = source, options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil)) - + // TODO: promote query name out of options + // TODO: figure out how metadata dir fits in the API df.sqlContext.sessionState.continuousQueryManager.startQuery( - extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink()) + extraOptions.getOrElse("queryName", StreamExecution.nextName), + extraOptions.getOrElse("metadata", sys.error("metadata must be specified")), + df, + dataSource.createSink()) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala index ce21451b2c9c7..5a9852809c0eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala @@ -31,4 +31,4 @@ import org.apache.spark.sql.execution.streaming.{Offset, Sink} @Experimental class 
SinkStatus private[sql]( val description: String, - val offset: Option[Offset]) + val offset: Offset) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 59a52a3d59d91..34268e0538bca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -52,6 +52,14 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { case i if i == 0 => 0 case i if i > 0 => 1 } + + def toStreamProgress( + sources: Seq[Source], + dest: StreamProgress = new StreamProgress): StreamProgress = { + assert(sources.size == offsets.size) + sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }.foreach(dest.update) + dest + } } object CompositeOffset { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 787e93f543963..d13b1a6166798 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -109,20 +109,16 @@ class FileStreamSource( /** * Returns the next batch of data that is available after `start`, if any is available. */ - override def getNextBatch(start: Option[Offset]): Option[Batch] = { + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) - val end = fetchMaxOffset() - val endId = end.offset - - if (startId + 1 <= endId) { - val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten - logDebug(s"Return files from batches ${startId + 1}:$endId") - logDebug(s"Streaming ${files.mkString(", ")}") - Some(new Batch(end, dataFrameBuilder(files))) - } - else { - None - } + val endId = end.asInstanceOf[LongOffset].offset + + assert(startId <= endId) + val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten + logDebug(s"Return files from batches ${startId + 1}:$endId") + logDebug(s"Streaming ${files.mkString(", ")}") + dataFrameBuilder(files) + } private def fetchAllFiles(): Seq[String] = { @@ -130,4 +126,6 @@ class FileStreamSource( .filterNot(_.getPath.getName.startsWith("_")) .map(_.getPath.toUri.toString) } + + override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index ac2842b6d5df9..68b5c0368b6ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission +import org.apache.spark.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.SQLContext @@ -42,7 +43,9 @@ import org.apache.spark.sql.SQLContext * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing * files in a directory always shows the latest files. 
*/ -class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] { +class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) + extends MetadataLog[T] + with Logging { private val metadataPath = new Path(path) @@ -113,6 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends try { // Try to commit the batch // It will fail if there is an existing file (someone has committed the batch) + logError(s"Attempting to write log ${batchFile(batchId)}") fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE) return } catch { @@ -161,6 +165,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends val bytes = IOUtils.toByteArray(input) Some(serializer.deserialize[T](ByteBuffer.wrap(bytes))) } else { + logError(s"Unable to find batch $batchMetadataFile") None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala index e3b2d2f67ee0c..d1e3d0876c346 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala @@ -17,31 +17,17 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.sql.DataFrame + /** * An interface for systems that can collect the results of a streaming query. * - * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the - * data and update the [[Offset]]. In the case of a failure, the sink will be recreated - * and must be able to return the [[Offset]] for all of the data that is made durable. - * This contract allows Spark to process data with exactly-once semantics, even in the case - * of failures that require the computation to be restarted. + * TODO */ trait Sink { - /** - * Returns the [[Offset]] for all data that is currently present in the sink, if any. This - * function will be called by Spark when restarting execution in order to determine at which point - * in the input stream computation should be resumed from. - */ - def currentOffset: Option[Offset] /** - * Accepts a new batch of data as well as a [[Offset]] that denotes how far in the input - * data computation has progressed to. When computation restarts after a failure, it is important - * that a [[Sink]] returns the same [[Offset]] as the most recent batch of data that - * has been persisted durably. Note that this does not necessarily have to be the - * [[Offset]] for the most recent batch of data that was given to the sink. For example, - * it is valid to buffer data before persisting, as long as the [[Offset]] is stored - * transactionally as data is eventually persisted. 
+ * TODO */ - def addBatch(batch: Batch): Unit + def addBatch(batchId: Long, data: DataFrame): Unit } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 25922979ac83e..f3b3dff222290 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType /** @@ -29,8 +30,10 @@ trait Source { /** Returns the schema of the data from this source */ def schema: StructType + def getOffset: Option[Offset] + /** * Returns the next batch of data that is available after `start`, if any is available. */ - def getNextBatch(start: Option[Offset]): Option[Batch] + def getBatch(start: Option[Offset], end: Offset): DataFrame } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 0062b7fc75c4a..dc0032fc852b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import org.apache.hadoop.fs.Path + import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -41,6 +43,7 @@ import org.apache.spark.sql.util.ContinuousQueryListener._ class StreamExecution( val sqlContext: SQLContext, override val name: String, + val metadataRoot: String, private[sql] val logicalPlan: LogicalPlan, val sink: Sink) extends ContinuousQuery with Logging { @@ -52,13 +55,22 @@ class StreamExecution( /** Minimum amount of time in between the start of each batch. */ private val minBatchTime = 10 - /** Tracks how much data we have processed from each input source. */ - private[sql] val streamProgress = new StreamProgress + /** + * Tracks how much data we have processed and committed to the sink or state store from each + * input source. + */ + private[sql] val committedOffsets = new StreamProgress + + private[sql] val availableOffsets = new StreamProgress + + private[sql] var currentBatchId: Long = -1 /** All stream sources present the query plan. */ private val sources = logicalPlan.collect { case s: StreamingRelation => s.source } + private val uniqueSources = sources.distinct + /** Defines the internal state of execution */ @volatile private var state: State = INITIALIZED @@ -74,20 +86,26 @@ class StreamExecution( override def run(): Unit = { runBatches() } } + val offsetLog = new HDFSMetadataLog[Offset](sqlContext, metadataDirectory("offsets")) + /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE /** Returns current status of all the sources. */ override def sourceStatuses: Array[SourceStatus] = { - sources.map(s => new SourceStatus(s.toString, streamProgress.get(s))).toArray + sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray } /** Returns current status of the sink. 
*/ - override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, sink.currentOffset) + override def sinkStatus: SinkStatus = + new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources)) /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */ override def exception: Option[ContinuousQueryException] = Option(streamDeathCause) + private def metadataDirectory(name: String): String = + new Path(new Path(metadataRoot), name).toUri.toString + /** * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event * has been posted to all the listeners. @@ -102,7 +120,7 @@ class StreamExecution( * Repeatedly attempts to run batches as data arrives. * * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted - * so that listeners are guaranteed to get former event before the latter. Furthermore, this + * such that listeners are guaranteed to get a start event before a termination. Furthermore, this * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns. */ private def runBatches(): Unit = { @@ -118,9 +136,10 @@ class StreamExecution( // While active, repeatedly attempt to run batches. SQLContext.setActive(sqlContext) populateStartOffsets() - logInfo(s"Stream running at $streamProgress") + logError(s"Stream running from $committedOffsets to $availableOffsets") while (isActive) { - attemptBatch() + if (dataAvailable) attemptBatch() + commitAndConstructNextBatch() Thread.sleep(minBatchTime) // TODO: Could be tighter } } catch { @@ -130,7 +149,7 @@ class StreamExecution( this, s"Query $name terminated with exception: ${e.getMessage}", e, - Some(streamProgress.toCompositeOffset(sources))) + Some(committedOffsets.toCompositeOffset(sources))) logError(s"Query $name terminated with error", e) } finally { state = TERMINATED @@ -145,19 +164,61 @@ class StreamExecution( * (i.e. avoid reprocessing data that we have already processed). */ private def populateStartOffsets(): Unit = { - sink.currentOffset match { - case Some(c: CompositeOffset) => - val storedProgress = c.offsets - val sources = logicalPlan collect { - case StreamingRelation(source, _) => source + offsetLog.getLatest() match { + case Some((batchId, nextOffsets: CompositeOffset)) => + logError(s"Resuming continuous query, starting with batch $batchId") + currentBatchId = batchId + 1 + nextOffsets.toStreamProgress(sources, availableOffsets) + logError(s"Found possibly uncommitted offsets $availableOffsets") + + logError(s"Attempting to restore ${batchId - 1}") + offsetLog.get(batchId - 1).foreach { + case lastOffsets: CompositeOffset => + lastOffsets.toStreamProgress(sources, committedOffsets) + logError(s"Resuming with committed offsets: $committedOffsets") } - assert(sources.size == storedProgress.size) - sources.zip(storedProgress).foreach { case (source, offset) => - offset.foreach(streamProgress.update(source, _)) - } case None => // We are starting this stream for the first time. 
- case _ => throw new IllegalArgumentException("Expected composite offset from sink") + logError(s"Starting new continuous query.") + currentBatchId = 0 + commitAndConstructNextBatch() + + case Some((_, offset)) => + sys.error(s"Invalid offset $offset") + } + } + + def dataAvailable: Boolean = { + availableOffsets.exists { + case (source, available) => + committedOffsets + .get(source) + .map(committed => committed < available) + .getOrElse(true) + } + } + + /** + * + */ + def commitAndConstructNextBatch(): Boolean = committedOffsets.synchronized { + // Update committed offsets. + availableOffsets.foreach(committedOffsets.update) + + // Check to see what new data is available. + uniqueSources.foreach { source => + source.getOffset.foreach(availableOffsets.update(source, _)) + } + + if (dataAvailable) { + logError(s"Commiting offsets for batch $currentBatchId.") + assert( + offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), + "Concurrent update to the log. Multiple streaming jobs detected.") + currentBatchId += 1 + true + } else { + false } } @@ -168,22 +229,24 @@ class StreamExecution( private def attemptBatch(): Unit = { val startTime = System.nanoTime() - // A list of offsets that need to be updated if this batch is successful. - // Populated while walking the tree. - val newOffsets = new ArrayBuffer[(Source, Offset)] + val newData = availableOffsets.flatMap { + case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) => + val current = committedOffsets.get(source) + val batch = source.getBatch(current, available) + logError(s"Retrieving data from $source: $current -> $available") + Some(source -> batch) + case _ => None + }.toMap + // A list of attributes that will need to be updated. var replacements = new ArrayBuffer[(Attribute, Attribute)] // Replace sources in the logical plan with data that has arrived since the last batch. val withNewSources = logicalPlan transform { case StreamingRelation(source, output) => - val prevOffset = streamProgress.get(source) - val newBatch = source.getNextBatch(prevOffset) - - newBatch.map { batch => - newOffsets += ((source, batch.end)) - val newPlan = batch.data.logicalPlan - - assert(output.size == newPlan.output.size) + newData.get(source).map { data => + val newPlan = data.logicalPlan + assert(output.size == newPlan.output.size, + s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}") replacements ++= output.zip(newPlan.output) newPlan }.getOrElse { @@ -197,35 +260,24 @@ class StreamExecution( case a: Attribute if replacementMap.contains(a) => replacementMap(a) } - if (newOffsets.nonEmpty) { - val optimizerStart = System.nanoTime() - - lastExecution = new QueryExecution(sqlContext, newPlan) - val executedPlan = lastExecution.executedPlan - val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000 - logDebug(s"Optimized batch in ${optimizerTime}ms") + val optimizerStart = System.nanoTime() - streamProgress.synchronized { - // Update the offsets and calculate a new composite offset - newOffsets.foreach(streamProgress.update) + lastExecution = new QueryExecution(sqlContext, newPlan) + val executedPlan = lastExecution.executedPlan + val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000 + logDebug(s"Optimized batch in ${optimizerTime}ms") - // Construct the batch and send it to the sink. 
- val batchOffset = streamProgress.toCompositeOffset(sources) - val nextBatch = new Batch(batchOffset, Dataset.newDataFrame(sqlContext, newPlan)) - sink.addBatch(nextBatch) - } - - awaitBatchLock.synchronized { - // Wake up any threads that are waiting for the stream to progress. - awaitBatchLock.notifyAll() - } + val nextBatch = Dataset.newDataFrame(sqlContext, newPlan) + sink.addBatch(currentBatchId - 1, nextBatch) - val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 - logInfo(s"Completed up to $newOffsets in ${batchTime}ms") - postEvent(new QueryProgress(this)) + awaitBatchLock.synchronized { + // Wake up any threads that are waiting for the stream to progress. + awaitBatchLock.notifyAll() } - logDebug(s"Waiting for data, current: $streamProgress") + val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 + logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") + postEvent(new QueryProgress(this)) } private def postEvent(event: ContinuousQueryListener.Event) { @@ -252,8 +304,8 @@ class StreamExecution( * least the given `Offset`. This method is indented for use primarily when writing tests. */ def awaitOffset(source: Source, newOffset: Offset): Unit = { - def notDone = streamProgress.synchronized { - !streamProgress.contains(source) || streamProgress(source) < newOffset + def notDone = committedOffsets.synchronized { + !committedOffsets.contains(source) || committedOffsets(source) < newOffset } while (notDone) { @@ -297,7 +349,7 @@ class StreamExecution( s""" |=== Continuous Query === |Name: $name - |Current Offsets: $streamProgress + |Current Offsets: $committedOffsets | |Current State: $state |Thread State: ${microBatchThread.getState} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index d45b9bd9838c1..fa9522c004642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -17,26 +17,25 @@ package org.apache.spark.sql.execution.streaming +import scala.collection import scala.collection.mutable /** * A helper class that looks like a Map[Source, Offset]. 
*/ -class StreamProgress { +class StreamProgress extends scala.collection.Map[Source, Offset] { private val currentOffsets = new mutable.HashMap[Source, Offset] private[streaming] def update(source: Source, newOffset: Offset): Unit = { currentOffsets.get(source).foreach(old => - assert(newOffset > old, s"Stream going backwards $newOffset -> $old")) + assert(newOffset >= old, s"Stream going backwards $newOffset -> $old")) currentOffsets.put(source, newOffset) } private[streaming] def update(newOffset: (Source, Offset)): Unit = update(newOffset._1, newOffset._2) - private[streaming] def apply(source: Source): Offset = currentOffsets(source) - private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source) - private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source) + def get(source: Source): Option[Offset] = currentOffsets.get(source) private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = { val updated = new StreamProgress @@ -68,4 +67,21 @@ class StreamProgress { } override def hashCode: Int = currentOffsets.hashCode() + + override def iterator: Iterator[(Source, Offset)] = { + currentOffsets.toIterator + } + + override def -(key: Source): StreamProgress = { + val copied = new StreamProgress + currentOffsets.foreach { + case (k, v) if k != key => copied.update(k, v) + case _ => + } + copied + } + + override def +[B1 >: Offset](kv: (Source, B1)): collection.Map[Source, B1] = { + ++(Map(kv)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index a6504cd088b7f..452df2ab2bb9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -51,8 +51,6 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) protected var currentOffset: LongOffset = new LongOffset(-1) - protected def blockManager = SparkEnv.get.blockManager - def schema: StructType = encoder.schema def toDS()(implicit sqlContext: SQLContext): Dataset[A] = { @@ -78,25 +76,31 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - override def getNextBatch(start: Option[Offset]): Option[Batch] = synchronized { - val newBlocks = - batches.drop( - start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1) - - if (newBlocks.nonEmpty) { - logDebug(s"Running [$start, $currentOffset] on blocks ${newBlocks.mkString(", ")}") - val df = newBlocks - .map(_.toDF()) - .reduceOption(_ unionAll _) - .getOrElse(sqlContext.emptyDataFrame) + override def toString: String = s"MemoryStream[${output.mkString(",")}]" - Some(new Batch(currentOffset, df)) - } else { - None - } + override def getOffset: Option[Offset] = if (batches.isEmpty) { + None + } else { + Some(currentOffset) } - override def toString: String = s"MemoryStream[${output.mkString(",")}]" + /** + * Returns the next batch of data that is available after `start`, if any is available. 
+ */ + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + val startOrdinal = + start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1 + val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 + val newBlocks = batches.slice(startOrdinal, endOrdinal) + + logError(s"Running [$startOrdinal, $endOrdinal] on blocks ${newBlocks.mkString(", ")}") + newBlocks + .map(_.toDF()) + .reduceOption(_ unionAll _) + .getOrElse { + sys.error("No data selected!") + } + } } /** @@ -105,45 +109,30 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) */ class MemorySink(schema: StructType) extends Sink with Logging { /** An order list of batches that have been written to this [[Sink]]. */ - private var batches = new ArrayBuffer[Batch]() - - /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. */ - private val externalRowConverter = RowEncoder(schema) - - override def currentOffset: Option[Offset] = synchronized { - batches.lastOption.map(_.end) - } - - override def addBatch(nextBatch: Batch): Unit = synchronized { - nextBatch.data.collect() // 'compute' the batch's data and record the batch - batches.append(nextBatch) - } + private val batches = new ArrayBuffer[Array[Row]]() /** Returns all rows that are stored in this [[Sink]]. */ def allData: Seq[Row] = synchronized { - batches - .map(_.data) - .reduceOption(_ unionAll _) - .map(_.collect().toSeq) - .getOrElse(Seq.empty) - } - - /** - * Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the - * corresponding point in the input. This function can be used when testing to simulate data - * that has been lost due to buffering. - */ - def dropBatches(num: Int): Unit = synchronized { - batches.dropRight(num) + batches.flatten } def toDebugString: String = synchronized { - batches.map { b => - val dataStr = try b.data.collect().mkString(" ") catch { + batches.zipWithIndex.map { case (b, i) => + val dataStr = try b.mkString(" ") catch { case NonFatal(e) => "[Error converting to string]" } - s"${b.end}: $dataStr" + s"$i: $dataStr" }.mkString("\n") } + + override def addBatch(batchId: Long, data: DataFrame): Unit = { + logError(s"Committing $batchId to memory") + if (batchId == batches.size) { + logError(s"Growing for batch $batchId") + batches.append(data.collect()) + } else { + batches(batchId.toInt) = data.collect() + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 81078dc6a0450..a350b30bbc344 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import java.lang.Thread.UncaughtExceptionHandler +import org.apache.spark.util.Utils + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.experimental.macros @@ -126,8 +128,6 @@ trait StreamTest extends QueryTest with Timeouts { override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}" } - case class DropBatches(num: Int) extends StreamAction - /** Stops the stream. It must currently be running. 
*/ case object StopStream extends StreamAction with StreamMustBeRunning @@ -202,7 +202,7 @@ trait StreamTest extends QueryTest with Timeouts { }.mkString("\n") def currentOffsets = - if (currentStream != null) currentStream.streamProgress.toString else "not started" + if (currentStream != null) currentStream.committedOffsets.toString else "not started" def threadState = if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead" @@ -266,6 +266,7 @@ trait StreamTest extends QueryTest with Timeouts { } val testThread = Thread.currentThread() + val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath try { startedTest.foreach { action => @@ -276,7 +277,7 @@ trait StreamTest extends QueryTest with Timeouts { currentStream = sqlContext .streams - .startQuery(StreamExecution.nextName, stream, sink) + .startQuery(StreamExecution.nextName, metadataRoot, stream, sink) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { @@ -308,10 +309,6 @@ trait StreamTest extends QueryTest with Timeouts { currentStream = null } - case DropBatches(num) => - verify(currentStream == null, "dropping batches while running leads to corruption") - sink.dropBatches(num) - case ef: ExpectFailure[_] => verify(currentStream != null, "can not expect failure when stream is not running") try failAfter(streamingTimeout) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 45e824ad6353e..33a424c43fe39 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.streaming +import org.apache.spark.util.Utils + import scala.concurrent.Future import scala.util.Random import scala.util.control.NonFatal @@ -235,9 +237,14 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with @volatile var query: StreamExecution = null try { val df = ds.toDF + val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath query = sqlContext .streams - .startQuery(StreamExecution.nextName, df, new MemorySink(df.schema)) + .startQuery( + StreamExecution.nextName, + metadataRoot, + df, + new MemorySink(df.schema)) .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index 84ed017a9d0d4..deafff8a751c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -54,7 +54,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext { TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery( - q => q.exception.get.startOffset.get === q.streamProgress.toCompositeOffset(Seq(inputData)), + q => q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)), "incorrect start offset on exception") ) } @@ -68,19 +68,19 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext { 
AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")), AssertOnQuery(_.sourceStatuses(0).offset === None), AssertOnQuery(_.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.sinkStatus.offset === None), + AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)), AddData(inputData, 1, 2), CheckAnswer(6, 3), AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))), - AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))), + AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))), AddData(inputData, 1, 2), CheckAnswer(6, 3, 6, 3), AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))), - AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1)))), + AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))), AddData(inputData, 0), ExpectFailure[SparkException], AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))), - AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1)))) + AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index 0878277811e12..ea55ec908ec1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.streaming.test +import org.apache.spark.util.Utils import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, ContinuousQuery, SQLContext, StreamTest} -import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source} +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -41,8 +42,15 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { LastOptions.parameters = parameters LastOptions.schema = schema new Source { - override def getNextBatch(start: Option[Offset]): Option[Batch] = None override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil) + + override def getOffset: Option[Offset] = Some(new LongOffset(0)) + + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + import sqlContext.implicits._ + + Seq[Int]().toDS().toDF() + } } } @@ -53,8 +61,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { LastOptions.parameters = parameters LastOptions.partitionColumns = partitionColumns new Sink { - override def addBatch(batch: Batch): Unit = {} - override def currentOffset: Option[Offset] = None + override def addBatch(batchId: Long, data: DataFrame): Unit = {} } } } @@ -62,6 +69,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { import testImplicits._ + private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath + after { sqlContext.streams.active.foreach(_.stop()) } @@ -72,6 +81,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write 
.format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .startStream() .stop() } @@ -82,6 +92,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .startStream() .stop() } @@ -108,6 +119,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("opt1", "1") .options(Map("opt2" -> "2")) .options(map) + .option("metadata", newMetadataDir) .startStream() .stop() @@ -123,12 +135,14 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .startStream() .stop() assert(LastOptions.partitionColumns == Nil) df.write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .partitionBy("a") .startStream() .stop() @@ -137,6 +151,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B withSQLConf("spark.sql.caseSensitive" -> "false") { df.write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .partitionBy("A") .startStream() .stop() @@ -146,6 +161,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B intercept[AnalysisException] { df.write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .partitionBy("b") .startStream() .stop() @@ -155,6 +171,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B test("stream paths") { val df = sqlContext.read .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .stream("/test") assert(LastOptions.parameters("path") == "/test") @@ -163,6 +180,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .startStream("/test") .stop() @@ -187,6 +205,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("intOpt", 56) .option("boolOpt", false) .option("doubleOpt", 6.7) + .option("metadata", newMetadataDir) .startStream("/test") .stop() @@ -204,6 +223,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream("/test") .write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .queryName(name) .startStream() } @@ -215,6 +235,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream("/test") .write .format("org.apache.spark.sql.streaming.test") + .option("metadata", newMetadataDir) .startStream() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 4c18e38db8280..90ca7998bd138 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -347,11 +347,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { val textSource2 = createFileStreamSource("text", src.getCanonicalPath) assert(textSource2.currentOffset === textSource.currentOffset) - assertBatch(textSource2.getNextBatch(None), textSource.getNextBatch(None)) - for (f <- 0L to textSource.currentOffset.offset) { - val offset = LongOffset(f) - 
assertBatch(textSource2.getNextBatch(Some(offset)), textSource.getNextBatch(Some(offset))) - } +// assertBatch(textSource2.getNextBatch(None), textSource.getNextBatch(None)) +// for (f <- 0L to textSource.currentOffset.offset) { +// val offset = LongOffset(f) +// assertBatch(textSource2.getNextBatch(Some(offset)), textSource.getNextBatch(Some(offset))) +// } Utils.deleteRecursively(src) Utils.deleteRecursively(tmp) From 97503f1b45d45c4e4524cf43853e9faab43d0032 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 17 Mar 2016 16:58:34 -0700 Subject: [PATCH 2/7] cleanup --- .../spark/sql/ContinuousQueryManager.scala | 4 +- .../apache/spark/sql/DataFrameWriter.scala | 13 ++++-- .../execution/streaming/CompositeOffset.scala | 7 +++ .../execution/streaming/HDFSMetadataLog.scala | 6 +-- .../spark/sql/execution/streaming/Sink.scala | 10 ++-- .../sql/execution/streaming/Source.scala | 5 +- .../execution/streaming/StreamExecution.scala | 46 ++++++++++++++----- .../sql/execution/streaming/memory.scala | 8 ++-- .../apache/spark/sql/internal/SQLConf.scala | 7 +++ .../org/apache/spark/sql/StreamTest.scala | 3 +- .../ContinuousQueryManagerSuite.scala | 3 +- .../sql/streaming/ContinuousQuerySuite.scala | 3 +- .../DataFrameReaderWriterSuite.scala | 26 +++++------ .../sql/streaming/FileStreamSourceSuite.scala | 18 -------- 14 files changed, 94 insertions(+), 65 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 2709c60721be1..fa8219bbed0d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -166,7 +166,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { /** Start a query */ private[sql] def startQuery( name: String, - metadataRoot: String, + checkpointLocation: String, df: DataFrame, sink: Sink): ContinuousQuery = { activeQueriesLock.synchronized { @@ -174,7 +174,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { throw new IllegalArgumentException( s"Cannot start query with name $name as a query with that name is already active") } - val query = new StreamExecution(sqlContext, name, metadataRoot, df.logicalPlan, sink) + val query = new StreamExecution(sqlContext, name, checkpointLocation, df.logicalPlan, sink) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 07a364afa77a9..65a558dc9e3ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -21,6 +21,8 @@ import java.util.Properties import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.Path + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -241,11 +243,14 @@ final class DataFrameWriter private[sql](df: DataFrame) { className = source, options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil)) - // TODO: promote query name out of options - // TODO: figure out how metadata dir fits in the API + + val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName) + val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { + new 
Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString + }) df.sqlContext.sessionState.continuousQueryManager.startQuery( - extraOptions.getOrElse("queryName", StreamExecution.nextName), - extraOptions.getOrElse("metadata", sys.error("metadata must be specified")), + queryName, + checkpointLocation, df, dataSource.createSink()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 34268e0538bca..3789c64b55bd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -53,6 +53,13 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { case i if i > 0 => 1 } + /** + * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of + * sources. + * + * This method is typically used to associate a serialized offset with actual sources (which + * cannot be serialized). + */ def toStreamProgress( sources: Seq[Source], dest: StreamProgress = new StreamProgress): StreamProgress = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 68b5c0368b6ae..18500f8efcd7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -27,7 +27,7 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission -import org.apache.spark.Logging +import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.SQLContext @@ -116,7 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) try { // Try to commit the batch // It will fail if there is an existing file (someone has committed the batch) - logError(s"Attempting to write log ${batchFile(batchId)}") + logInfo(s"Attempting to write log ${batchFile(batchId)}") fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE) return } catch { @@ -165,7 +165,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) val bytes = IOUtils.toByteArray(input) Some(serializer.deserialize[T](ByteBuffer.wrap(bytes))) } else { - logError(s"Unable to find batch $batchMetadataFile") + logDebug(s"Unable to find batch $batchMetadataFile") None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala index d1e3d0876c346..25015d58f75ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala @@ -20,14 +20,16 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.DataFrame /** - * An interface for systems that can collect the results of a streaming query. - * - * TODO + * An interface for systems that can collect the results of a streaming query. In order to preserve + * exactly once semantics, a sink must be idempotent in the face of multiple attempts to add the same + * batch.
*/ trait Sink { /** - * TODO + * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if + * this method is called more than once with the same batchId (which will happen in the case of + * failures), then `data` should only be added once. */ def addBatch(batchId: Long, data: DataFrame): Unit } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index f3b3dff222290..6457f928ed887 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -30,10 +30,13 @@ trait Source { /** Returns the schema of the data from this source */ def schema: StructType + /** Returns the maximum available offset for this source. */ def getOffset: Option[Offset] + /** - * Returns the next batch of data that is available after `start`, if any is available. + * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then + * the batch should begin with the first available record. This method must always return the + * same data for a particular `start` and `end` pair. */ def getBatch(start: Option[Offset], end: Offset): DataFrame } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index dc0032fc852b3..39dd687b51e6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger -import org.apache.hadoop.fs.Path - import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} @@ -61,14 +61,20 @@ class StreamExecution( */ private[sql] val committedOffsets = new StreamProgress + /** + * Tracks the offsets that are available to be processed, but have not yet been committed to the + * sink. + */ private[sql] val availableOffsets = new StreamProgress + /** The current batchId or -1 if execution has not yet been initialized. */ private[sql] var currentBatchId: Long = -1 /** All stream sources present the query plan. */ private val sources = logicalPlan.collect { case s: StreamingRelation => s.source } + /** A list of unique sources in the query plan. */ private val uniqueSources = sources.distinct /** Defines the internal state of execution */ @volatile private var state: State = INITIALIZED @@ -86,6 +92,12 @@ class StreamExecution( override def run(): Unit = { runBatches() } } + /** + * A write-ahead-log that records the offsets that are present in each batch. In order to ensure + * that a given batch will always consist of the same data, we write to this log *before* any + * processing is done. Thus, the Nth record in this log indicates data that is currently being + * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
+ */ val offsetLog = new HDFSMetadataLog[Offset](sqlContext, metadataDirectory("offsets")) /** Whether the query is currently active or not */ @@ -136,7 +148,7 @@ class StreamExecution( // While active, repeatedly attempt to run batches. SQLContext.setActive(sqlContext) populateStartOffsets() - logError(s"Stream running from $committedOffsets to $availableOffsets") + logDebug(s"Stream running from $committedOffsets to $availableOffsets") while (isActive) { if (dataAvailable) attemptBatch() commitAndConstructNextBatch() @@ -161,25 +173,28 @@ class StreamExecution( /** * Populate the start offsets to start the execution at the current offsets stored in the sink - * (i.e. avoid reprocessing data that we have already processed). + * (i.e. avoid reprocessing data that we have already processed). This function must be called + * before any processing occurs and will populate the following fields: + * - currentBatchId + * - committedOffsets + * - availableOffsets */ private def populateStartOffsets(): Unit = { offsetLog.getLatest() match { case Some((batchId, nextOffsets: CompositeOffset)) => - logError(s"Resuming continuous query, starting with batch $batchId") + logInfo(s"Resuming continuous query, starting with batch $batchId") currentBatchId = batchId + 1 nextOffsets.toStreamProgress(sources, availableOffsets) - logError(s"Found possibly uncommitted offsets $availableOffsets") + logDebug(s"Found possibly uncommitted offsets $availableOffsets") - logError(s"Attempting to restore ${batchId - 1}") offsetLog.get(batchId - 1).foreach { case lastOffsets: CompositeOffset => lastOffsets.toStreamProgress(sources, committedOffsets) - logError(s"Resuming with committed offsets: $committedOffsets") + logDebug(s"Resuming with committed offsets: $committedOffsets") } case None => // We are starting this stream for the first time. - logError(s"Starting new continuous query.") + logInfo(s"Starting new continuous query.") currentBatchId = 0 commitAndConstructNextBatch() @@ -188,6 +203,9 @@ class StreamExecution( } } + /** + * Returns true if there is any new data available to be processed. + */ def dataAvailable: Boolean = { availableOffsets.exists { case (source, available) => @@ -199,7 +217,12 @@ class StreamExecution( } /** + * Queries all of the sources to see if any new data is available. When there is new data the + * batchId counter is incremented and a new log entry is written with the newest offsets. * + * Note that committing the offsets for a new batch implicitly marks the previous batch as + * finished and thus this method should only be called when all currently available data + * has been written to the sink. */ def commitAndConstructNextBatch(): Boolean = committedOffsets.synchronized { // Update committed offsets. @@ -211,7 +234,7 @@ class StreamExecution( } if (dataAvailable) { - logError(s"Commiting offsets for batch $currentBatchId.") + logInfo(s"Commiting offsets for batch $currentBatchId.") assert( offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), "Concurrent update to the log. Multiple streaming jobs detected.") @@ -229,11 +252,12 @@ class StreamExecution( private def attemptBatch(): Unit = { val startTime = System.nanoTime() + // Request unprocessed data from all sources. 
val newData = availableOffsets.flatMap { case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) => val current = committedOffsets.get(source) val batch = source.getBatch(current, available) - logError(s"Retrieving data from $source: $current -> $available") + logDebug(s"Retrieving data from $source: $current -> $available") Some(source -> batch) case _ => None }.toMap diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 452df2ab2bb9e..8c2bb4abd5f6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -93,7 +93,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 val newBlocks = batches.slice(startOrdinal, endOrdinal) - logError(s"Running [$startOrdinal, $endOrdinal] on blocks ${newBlocks.mkString(", ")}") + logDebug( + s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}") newBlocks .map(_.toDF()) .reduceOption(_ unionAll _) @@ -126,12 +127,11 @@ class MemorySink(schema: StructType) extends Sink with Logging { } override def addBatch(batchId: Long, data: DataFrame): Unit = { - logError(s"Committing $batchId to memory") if (batchId == batches.size) { - logError(s"Growing for batch $batchId") + logDebug(s"Committing batch $batchId") batches.append(data.collect()) } else { - batches(batchId.toInt) = data.collect() + logDebug(s"Skipping already committed batch: $batchId") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9aabe2d0abe1c..d88fe19afd56e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -514,6 +514,11 @@ object SQLConf { doc = "When true, the planner will try to find out duplicated exchanges and re-use them", isPublic = false) + val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation", + defaultValue = None, + doc = "The default location for storing checkpoint data for continuously executing queries.", + isPublic = true) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" val EXTERNAL_SORT = "spark.sql.planner.externalSort" @@ -543,6 +548,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin /** ************************ Spark SQL Params/Hints ******************* */ + def checkpointLocation: String = getConf(CHECKPOINT_LOCATION) + def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) def useCompression: Boolean = getConf(COMPRESS_CACHED) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index a350b30bbc344..2dd6416853a2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql import java.lang.Thread.UncaughtExceptionHandler -import org.apache.spark.util.Utils - import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.experimental.macros @@ -39,6 +37,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder, Ro import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.util.Utils /** * A framework for implementing tests for streaming queries and sources. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 33a424c43fe39..54ce98d195e25 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.streaming -import org.apache.spark.util.Utils - import scala.concurrent.Future import scala.util.Random import scala.util.control.NonFatal @@ -33,6 +31,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index deafff8a751c4..3be0ea481dc53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -54,7 +54,8 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext { TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery( - q => q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)), + q => + q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)), "incorrect start offset on exception") ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index ea55ec908ec1a..c1bab9b577bbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.streaming.test -import org.apache.spark.util.Utils import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ @@ -25,6 +24,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.util.Utils object LastOptions { var parameters: Map[String, String] = null @@ -81,7 +81,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .startStream() .stop() } @@ -92,7 +92,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write 
.format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .startStream() .stop() } @@ -119,7 +119,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("opt1", "1") .options(Map("opt2" -> "2")) .options(map) - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .startStream() .stop() @@ -135,14 +135,14 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .startStream() .stop() assert(LastOptions.partitionColumns == Nil) df.write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .partitionBy("a") .startStream() .stop() @@ -151,7 +151,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B withSQLConf("spark.sql.caseSensitive" -> "false") { df.write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .partitionBy("A") .startStream() .stop() @@ -161,7 +161,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B intercept[AnalysisException] { df.write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .partitionBy("b") .startStream() .stop() @@ -171,7 +171,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B test("stream paths") { val df = sqlContext.read .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .stream("/test") assert(LastOptions.parameters("path") == "/test") @@ -180,7 +180,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .startStream("/test") .stop() @@ -205,7 +205,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("intOpt", 56) .option("boolOpt", false) .option("doubleOpt", 6.7) - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .startStream("/test") .stop() @@ -223,7 +223,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream("/test") .write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .queryName(name) .startStream() } @@ -235,7 +235,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream("/test") .write .format("org.apache.spark.sql.streaming.test") - .option("metadata", newMetadataDir) + .option("checkpointLocation", newMetadataDir) .startStream() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 90ca7998bd138..89de15acf506d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -318,16 +318,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with 
SharedSQLContext { } test("fault tolerance") { - def assertBatch(batch1: Option[Batch], batch2: Option[Batch]): Unit = { - (batch1, batch2) match { - case (Some(b1), Some(b2)) => - assert(b1.end === b2.end) - assert(b1.data.as[String].collect() === b2.data.as[String].collect()) - case (None, None) => - case _ => fail(s"batch ($batch1) is not equal to batch ($batch2)") - } - } - val src = Utils.createTempDir("streaming.src") val tmp = Utils.createTempDir("streaming.tmp") @@ -345,14 +335,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") ) - val textSource2 = createFileStreamSource("text", src.getCanonicalPath) - assert(textSource2.currentOffset === textSource.currentOffset) -// assertBatch(textSource2.getNextBatch(None), textSource.getNextBatch(None)) -// for (f <- 0L to textSource.currentOffset.offset) { -// val offset = LongOffset(f) -// assertBatch(textSource2.getNextBatch(Some(offset)), textSource.getNextBatch(Some(offset))) -// } - Utils.deleteRecursively(src) Utils.deleteRecursively(tmp) } From 23c5a057824db20108a912194469d7930842b8f6 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 18 Mar 2016 13:14:05 -0700 Subject: [PATCH 3/7] update tests --- .../spark/sql/util/ContinuousQueryListenerSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala index 52783281abb00..d04783ecacbb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala @@ -61,7 +61,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with // The source and sink offsets must be None as this must be called before the // batches have started assert(status.sourceStatuses(0).offset === None) - assert(status.sinkStatus.offset === None) + assert(status.sinkStatus.offset === CompositeOffset(None :: Nil)) // No progress events or termination events assert(listener.progressStatuses.isEmpty) @@ -78,7 +78,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with assert(status != null) assert(status.active == true) assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) - assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))) + assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) // No termination events assert(listener.terminationStatus === null) @@ -92,7 +92,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with assert(status.active === false) // must be inactive by the time onQueryTerm is called assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) - assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))) + assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) } listener.checkAsyncErrors() } From 3c893a30dba2dfbaf9446e5f186edc81b4917467 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 21 Mar 2016 15:40:13 -0700 Subject: [PATCH 4/7] comments --- .../execution/streaming/StreamExecution.scala | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala 
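
The listener assertions above encode the new sink-status contract: before any batch runs the sink reports CompositeOffset(None :: Nil), one None slot per source, and after the first batch it reports CompositeOffset.fill(LongOffset(0)). A cut-down model of that composite offset, using toy types rather than the real ones, just to show the two states:

    // Toy model: one optional per-source offset, in a fixed source order. Only the
    // construction helpers relevant to the listener test are sketched here.
    case class ToyLongOffset(offset: Long)

    case class ToyCompositeOffset(offsets: Seq[Option[ToyLongOffset]]) {
      // True once every source has had something committed to the sink.
      def allCommitted: Boolean = offsets.forall(_.isDefined)
    }

    object ToyCompositeOffset {
      // Mirrors the shape of CompositeOffset.fill: the same offset for every source.
      def fill(offsets: ToyLongOffset*): ToyCompositeOffset =
        ToyCompositeOffset(offsets.map(Option(_)))
    }

    object SinkStatusDemo extends App {
      val beforeFirstBatch = ToyCompositeOffset(None :: Nil)   // one source, nothing committed
      val afterFirstBatch  = ToyCompositeOffset.fill(ToyLongOffset(0))
      assert(!beforeFirstBatch.allCommitted)
      assert(afterFirstBatch.allCommitted)
    }
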
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 39dd687b51e6d..6927ea7dc0bfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.util.ContinuousQueryListener._ class StreamExecution( val sqlContext: SQLContext, override val name: String, - val metadataRoot: String, + val checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, val sink: Sink) extends ContinuousQuery with Logging { @@ -65,10 +65,10 @@ class StreamExecution( * Tracks the offsets that are available to be processed, but have not yet be committed to the * sink. */ - private[sql] val availableOffsets = new StreamProgress + private val availableOffsets = new StreamProgress /** The current batchId or -1 if execution has not yet been initialized. */ - private[sql] var currentBatchId: Long = -1 + private var currentBatchId: Long = -1 /** All stream sources present the query plan. */ private val sources = @@ -98,7 +98,8 @@ class StreamExecution( * processing is done. Thus, the Nth record in this log indicated data that is currently being * processed and the N-1th entry indicates which offsets have been durably committed to the sink. */ - val offsetLog = new HDFSMetadataLog[Offset](sqlContext, metadataDirectory("offsets")) + private val offsetLog = + new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets")) /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE @@ -115,8 +116,9 @@ class StreamExecution( /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */ override def exception: Option[ContinuousQueryException] = Option(streamDeathCause) - private def metadataDirectory(name: String): String = - new Path(new Path(metadataRoot), name).toUri.toString + /** Returns the path of a file with `name` in the checkpoint directory. */ + private def checkpointFile(name: String): String = + new Path(new Path(checkpointRoot), name).toUri.toString /** * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event @@ -150,7 +152,7 @@ class StreamExecution( populateStartOffsets() logDebug(s"Stream running from $committedOffsets to $availableOffsets") while (isActive) { - if (dataAvailable) attemptBatch() + if (dataAvailable) runBatch() commitAndConstructNextBatch() Thread.sleep(minBatchTime) // TODO: Could be tighter } @@ -173,7 +175,7 @@ class StreamExecution( /** * Populate the start offsets to start the execution at the current offsets stored in the sink - * (i.e. avoid reprocessing data that we have already processed). This function must be called + * (i.e. avoid reprocessing data that we have already processed). 
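
checkpointFile only resolves a child name under the query's checkpoint root, so the offsets log and any future per-query state share one directory. A small sketch of the same resolution outside StreamExecution, with placeholder roots:

    import org.apache.hadoop.fs.Path

    object CheckpointPaths extends App {
      // Resolve a named file under a checkpoint root, the same way checkpointFile does.
      def checkpointFile(checkpointRoot: String, name: String): String =
        new Path(new Path(checkpointRoot), name).toUri.toString

      // "/tmp/ckpt/query1" and the hdfs URI are illustrative roots, not values from the patch.
      println(checkpointFile("/tmp/ckpt/query1", "offsets"))        // /tmp/ckpt/query1/offsets
      println(checkpointFile("hdfs://nn:8020/ckpt/q1", "offsets"))  // scheme is preserved
    }
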
This function must be called * before any processing occurs and will populate the following fields: * - currentBatchId * - committedOffsets @@ -181,14 +183,14 @@ class StreamExecution( */ private def populateStartOffsets(): Unit = { offsetLog.getLatest() match { - case Some((batchId, nextOffsets: CompositeOffset)) => + case Some((batchId, nextOffsets)) => logInfo(s"Resuming continuous query, starting with batch $batchId") currentBatchId = batchId + 1 nextOffsets.toStreamProgress(sources, availableOffsets) logDebug(s"Found possibly uncommitted offsets $availableOffsets") offsetLog.get(batchId - 1).foreach { - case lastOffsets: CompositeOffset => + case lastOffsets => lastOffsets.toStreamProgress(sources, committedOffsets) logDebug(s"Resuming with committed offsets: $committedOffsets") } @@ -197,16 +199,13 @@ class StreamExecution( logInfo(s"Starting new continuous query.") currentBatchId = 0 commitAndConstructNextBatch() - - case Some((_, offset)) => - sys.error(s"Invalid offset $offset") } } /** * Returns true if there is any new data available to be processed. */ - def dataAvailable: Boolean = { + private def dataAvailable: Boolean = { availableOffsets.exists { case (source, available) => committedOffsets @@ -224,7 +223,7 @@ class StreamExecution( * finished and thus this method should only be called when all currently available data * has been written to the sink. */ - def commitAndConstructNextBatch(): Boolean = committedOffsets.synchronized { + private def commitAndConstructNextBatch(): Boolean = committedOffsets.synchronized { // Update committed offsets. availableOffsets.foreach(committedOffsets.update) @@ -246,10 +245,9 @@ class StreamExecution( } /** - * Checks to see if any new data is present in any of the sources. When new data is available, - * a batch is executed and passed to the sink, updating the currentOffsets. + * Processes any data available between `availableOffsets` and `committedOffsets`. */ - private def attemptBatch(): Unit = { + private def runBatch(): Unit = { val startTime = System.nanoTime() // Request unprocessed data from all sources. From 636e75c3ffdcc07da0c8b9f493fb67bd37b45b68 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 21 Mar 2016 16:12:39 -0700 Subject: [PATCH 5/7] make StreamProgress Immutable --- .../execution/streaming/CompositeOffset.scala | 7 +-- .../execution/streaming/StreamExecution.scala | 21 +++---- .../execution/streaming/StreamProgress.scala | 62 ++++--------------- 3 files changed, 22 insertions(+), 68 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 3789c64b55bd4..e48ac598929ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -60,12 +60,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { * This method is typically used to associate a serialized offset with actual sources (which * cannot be serialized). 
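
The resume rule spelled out by populateStartOffsets: the latest offset-log entry describes the batch that was in flight when the query stopped (so it is replayed), the entry before it describes what has definitely reached the sink, and an empty log means a brand-new query starting at batch 0. A stripped-down sketch of that logic over a plain map standing in for HDFSMetadataLog (all types here are simplified stand-ins):

    object ResumeDemo extends App {
      // Simplified stand-ins: a source-name -> offset map instead of CompositeOffset,
      // and a plain Map keyed by batch id instead of HDFSMetadataLog.
      type Offsets = Map[String, Long]

      case class ResumePoint(currentBatchId: Long, committed: Offsets, available: Offsets)

      // Entry N is written *before* batch N runs, so entry N-1 is the newest state known
      // to have reached the sink; the latest entry may or may not have been committed.
      def resume(offsetLog: Map[Long, Offsets]): ResumePoint =
        offsetLog.keys.reduceOption(_ max _) match {
          case Some(latest) =>
            ResumePoint(
              currentBatchId = latest + 1,
              committed = offsetLog.getOrElse(latest - 1, Map.empty[String, Long]),
              available = offsetLog(latest))         // possibly uncommitted: replay it
          case None =>
            ResumePoint(0, Map.empty[String, Long], Map.empty[String, Long])
        }

      // The query died while the batch recorded as entry 2 was still running.
      val log = Map(0L -> Map("src" -> 10L), 1L -> Map("src" -> 20L), 2L -> Map("src" -> 35L))
      assert(resume(log) == ResumePoint(3, Map("src" -> 20L), Map("src" -> 35L)))
    }
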
*/ - def toStreamProgress( - sources: Seq[Source], - dest: StreamProgress = new StreamProgress): StreamProgress = { + def toStreamProgress(sources: Seq[Source]): StreamProgress = { assert(sources.size == offsets.size) - sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }.foreach(dest.update) - dest + new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6927ea7dc0bfe..5d736b58754d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -59,13 +59,13 @@ class StreamExecution( * Tracks how much data we have processed and committed to the sink or state store from each * input source. */ - private[sql] val committedOffsets = new StreamProgress + private[sql] var committedOffsets = new StreamProgress /** * Tracks the offsets that are available to be processed, but have not yet be committed to the * sink. */ - private val availableOffsets = new StreamProgress + private var availableOffsets = new StreamProgress /** The current batchId or -1 if execution has not yet been initialized. */ private var currentBatchId: Long = -1 @@ -186,12 +186,12 @@ class StreamExecution( case Some((batchId, nextOffsets)) => logInfo(s"Resuming continuous query, starting with batch $batchId") currentBatchId = batchId + 1 - nextOffsets.toStreamProgress(sources, availableOffsets) + availableOffsets = nextOffsets.toStreamProgress(sources) logDebug(s"Found possibly uncommitted offsets $availableOffsets") offsetLog.get(batchId - 1).foreach { case lastOffsets => - lastOffsets.toStreamProgress(sources, committedOffsets) + committedOffsets = lastOffsets.toStreamProgress(sources) logDebug(s"Resuming with committed offsets: $committedOffsets") } @@ -223,14 +223,13 @@ class StreamExecution( * finished and thus this method should only be called when all currently available data * has been written to the sink. */ - private def commitAndConstructNextBatch(): Boolean = committedOffsets.synchronized { + private def commitAndConstructNextBatch(): Boolean = { // Update committed offsets. - availableOffsets.foreach(committedOffsets.update) + committedOffsets ++= availableOffsets // Check to see what new data is available. - uniqueSources.foreach { source => - source.getOffset.foreach(availableOffsets.update(source, _)) - } + val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) + availableOffsets ++= newData if (dataAvailable) { logInfo(s"Commiting offsets for batch $currentBatchId.") @@ -326,9 +325,7 @@ class StreamExecution( * least the given `Offset`. This method is indented for use primarily when writing tests. 
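
Now that StreamProgress is immutable, StreamExecution keeps it in a `var` and leans on Scala's rewrite of `x ++= y` into `x = x ++ y`: committing or polling rebinds the field to a fresh snapshot instead of mutating shared state, so earlier snapshots stay intact. A minimal illustration of that rebinding pattern with a plain immutable Map:

    object ImmutableProgressDemo extends App {
      // Plain immutable Map stands in for StreamProgress (source name -> offset).
      var committedOffsets: Map[String, Long] = Map.empty
      var availableOffsets: Map[String, Long] = Map.empty

      // "x ++= y" on a var of an immutable type desugars to "x = x ++ y":
      // the old snapshot is untouched, the field now points at a new one.
      availableOffsets ++= Seq("src" -> 10L)
      val snapshotBeforeCommit = availableOffsets

      committedOffsets ++= availableOffsets      // commit: rebind, don't mutate
      availableOffsets ++= Seq("src" -> 20L)     // poll sources for newer offsets

      assert(snapshotBeforeCommit == Map("src" -> 10L))  // earlier snapshot unchanged
      assert(committedOffsets == Map("src" -> 10L))
      assert(availableOffsets == Map("src" -> 20L))
    }
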
*/ def awaitOffset(source: Source, newOffset: Offset): Unit = { - def notDone = committedOffsets.synchronized { - !committedOffsets.contains(source) || committedOffsets(source) < newOffset - } + def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset while (notDone) { logInfo(s"Waiting until $newOffset at $source") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index fa9522c004642..405a5f0387a7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -17,71 +17,31 @@ package org.apache.spark.sql.execution.streaming -import scala.collection -import scala.collection.mutable +import scala.collection.{immutable, GenTraversableOnce} /** * A helper class that looks like a Map[Source, Offset]. */ -class StreamProgress extends scala.collection.Map[Source, Offset] { - private val currentOffsets = new mutable.HashMap[Source, Offset] - - private[streaming] def update(source: Source, newOffset: Offset): Unit = { - currentOffsets.get(source).foreach(old => - assert(newOffset >= old, s"Stream going backwards $newOffset -> $old")) - currentOffsets.put(source, newOffset) - } - - private[streaming] def update(newOffset: (Source, Offset)): Unit = - update(newOffset._1, newOffset._2) - - def get(source: Source): Option[Offset] = currentOffsets.get(source) - - private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = { - val updated = new StreamProgress - currentOffsets.foreach(updated.update) - updates.foreach(updated.update) - updated - } - - /** - * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable, - * it should be copied before being passed to user code. 
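
The rewritten StreamProgress is a thin wrapper over an immutable.Map[Source, Offset]: it forwards the Map operations to baseMap and adds a `++` that returns another StreamProgress, so progress-specific helpers such as toCompositeOffset survive updates. A cut-down version of that wrapper shape, with String/Long standing in for Source/Offset and the Map inheritance left out so the sketch does not depend on a particular Scala version:

    // Cut-down shape of an immutable progress map. The real StreamProgress additionally
    // extends immutable.Map by forwarding get/iterator/+/- to baseMap.
    class ToyProgress(val baseMap: Map[String, Long] = Map.empty) {

      def get(source: String): Option[Long] = baseMap.get(source)

      // Updates return a new ToyProgress, so the wrapper's helpers keep working.
      def ++(updates: Iterable[(String, Long)]): ToyProgress =
        new ToyProgress(baseMap ++ updates)

      // Stand-in for toCompositeOffset: offsets in a fixed source order, None if unknown.
      def toComposite(sources: Seq[String]): Seq[Option[Long]] = sources.map(get)

      override def toString: String =
        baseMap.map { case (k, v) => s"$k: $v" }.mkString("{", ",", "}")
    }

    object ToyProgressDemo extends App {
      val empty = new ToyProgress
      val p1 = empty ++ Seq("a" -> 1L)
      val p2 = p1 ++ Seq("b" -> 2L, "a" -> 3L)
      assert(empty.baseMap.isEmpty)   // older snapshots never change
      assert(p2.toComposite(Seq("a", "b", "c")) == Seq(Some(3L), Some(2L), None))
    }
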
- */ - private[streaming] def copy(): StreamProgress = { - val copied = new StreamProgress - currentOffsets.foreach(copied.update) - copied - } +class StreamProgress( + val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) + extends scala.collection.immutable.Map[Source, Offset] { private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = { CompositeOffset(source.map(get)) } override def toString: String = - currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}") + baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}") - override def equals(other: Any): Boolean = other match { - case s: StreamProgress => currentOffsets == s.currentOffsets - case _ => false - } + override def +[B1 >: Offset](kv: (Source, B1)): Map[Source, B1] = baseMap + kv - override def hashCode: Int = currentOffsets.hashCode() + override def get(key: Source): Option[Offset] = baseMap.get(key) - override def iterator: Iterator[(Source, Offset)] = { - currentOffsets.toIterator - } + override def iterator: Iterator[(Source, Offset)] = baseMap.iterator - override def -(key: Source): StreamProgress = { - val copied = new StreamProgress - currentOffsets.foreach { - case (k, v) if k != key => copied.update(k, v) - case _ => - } - copied - } + override def -(key: Source): Map[Source, Offset] = baseMap - key - override def +[B1 >: Offset](kv: (Source, B1)): collection.Map[Source, B1] = { - ++(Map(kv)) + def ++(updates: GenTraversableOnce[(Source, Offset)]): StreamProgress = { + new StreamProgress(baseMap ++ updates) } } From 3fc2fe21c5b61508a6038f60b1c5e6399a1585c7 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 21 Mar 2016 22:47:44 -0700 Subject: [PATCH 6/7] try to fix tests --- .../execution/datasources/DataSource.scala | 3 ++- .../org/apache/spark/sql/StreamTest.scala | 6 +++++ .../DataFrameReaderWriterSuite.scala | 24 +++++++++---------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 6116cce17e756..8b1bb10237598 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -162,7 +162,8 @@ case class DataSource( paths = files, userSpecifiedSchema = Some(dataSchema), className = className, - options = options.filterKeys(_ != "path")).resolveRelation())) + options = + new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation())) } new FileStreamSource( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 2dd6416853a2e..f356cde9cf87a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -65,6 +65,12 @@ import org.apache.spark.util.Utils */ trait StreamTest extends QueryTest with Timeouts { + implicit class RichContinuousQuery(cq: ContinuousQuery) { + def stopQuietly(): Unit = quietly { + cq.stop() + } + } + implicit class RichSource(s: Source) { def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index 
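
stopQuietly is added as an enrichment rather than a change to ContinuousQuery itself: an implicit class wraps any query and silences the expected shutdown noise, which keeps the test bodies short. The same enrichment pattern in isolation, with stand-ins for ContinuousQuery and the suite's quietly helper:

    // The "enrich my library" pattern used for stopQuietly: an implicit class adds a
    // method to an existing type without touching it. Stoppable and quietly are
    // stand-ins for ContinuousQuery and the test helper of the same name.
    trait Stoppable {
      def stop(): Unit
    }

    object TestEnrichments {
      // Stand-in for the log-silencing helper used by the suite.
      def quietly[T](body: => T): T = body

      implicit class RichStoppable(s: Stoppable) {
        def stopQuietly(): Unit = quietly {
          s.stop()
        }
      }
    }

    object StopQuietlyDemo extends App {
      import TestEnrichments._

      val query = new Stoppable {
        def stop(): Unit = println("stopped")
      }
      query.stopQuietly()   // resolves via the implicit RichStoppable wrapper
    }
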
c1bab9b577bbb..e485aa837b7ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath after { - sqlContext.streams.active.foreach(_.stop()) + sqlContext.streams.active.foreach(_.stopQuietly()) } test("resolve default source") { @@ -83,7 +83,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() } test("resolve full class") { @@ -94,7 +94,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() } test("options") { @@ -121,7 +121,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .options(map) .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() assert(LastOptions.parameters("opt1") == "1") assert(LastOptions.parameters("opt2") == "2") @@ -137,7 +137,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() assert(LastOptions.partitionColumns == Nil) df.write @@ -145,7 +145,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("a") .startStream() - .stop() + .stopQuietly() assert(LastOptions.partitionColumns == Seq("a")) withSQLConf("spark.sql.caseSensitive" -> "false") { @@ -154,7 +154,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("A") .startStream() - .stop() + .stopQuietly() assert(LastOptions.partitionColumns == Seq("a")) } @@ -164,7 +164,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("b") .startStream() - .stop() + .stopQuietly() } } @@ -182,7 +182,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stop() + .stopQuietly() assert(LastOptions.parameters("path") == "/test") } @@ -207,7 +207,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("doubleOpt", 6.7) .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stop() + .stopQuietly() assert(LastOptions.parameters("intOpt") == "56") assert(LastOptions.parameters("boolOpt") == "false") @@ -269,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B } // Should be able to start query with that name after stopping the previous query - q1.stop() + q1.stopQuietly() val q5 = startQueryWithName("name") assert(activeStreamNames.contains("name")) - sqlContext.streams.active.foreach(_.stop()) + sqlContext.streams.active.foreach(_.stopQuietly()) } } From 042d969eee0c5c3f6857cd0422053ba63ce8a39f Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 
21 Mar 2016 22:54:28 -0700 Subject: [PATCH 7/7] comments --- .../spark/sql/execution/streaming/HDFSMetadataLog.scala | 2 +- .../spark/sql/execution/streaming/StreamExecution.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 18500f8efcd7b..298b5d292e8e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -116,7 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) try { // Try to commit the batch // It will fail if there is an existing file (someone has committed the batch) - logInfo(s"Attempting to write log ${batchFile(batchId)}") + logDebug(s"Attempting to write log #${batchFile(batchId)}") fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE) return } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5d736b58754d5..c5fefb5346bc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -232,11 +232,11 @@ class StreamExecution( availableOffsets ++= newData if (dataAvailable) { - logInfo(s"Commiting offsets for batch $currentBatchId.") assert( offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), - "Concurrent update to the log. Multiple streaming jobs detected.") + s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") currentBatchId += 1 + logInfo(s"Committed offsets for batch $currentBatchId.") true } else { false
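
Both of the final tweaks lean on one invariant: HDFSMetadataLog.add commits a batch by renaming a temp file to `<dir>/<batchId>`, the rename fails if that file already exists, and StreamExecution treats a failed add as a second streaming job writing to the same checkpoint. A file-system-only sketch of that commit step, with java.nio standing in for the HDFS FileContext (paths and payloads are illustrative):

    import java.nio.charset.StandardCharsets
    import java.nio.file.{FileAlreadyExistsException, Files, Path}

    object OffsetLogDemo extends App {
      // Write-then-rename commit: only one writer can create <logDir>/<batchId>, so a
      // concurrent query on the same checkpoint loses the race and can be reported.
      // The real log relies on HDFS rename failing when the target already exists.
      def add(logDir: Path, batchId: Long, payload: String): Boolean = {
        val batchFile = logDir.resolve(batchId.toString)
        if (Files.exists(batchFile)) return false            // already committed
        val temp = Files.createTempFile(logDir, s".$batchId-", ".tmp")
        Files.write(temp, payload.getBytes(StandardCharsets.UTF_8))
        try {
          Files.move(temp, batchFile)                        // fails if the target exists
          true
        } catch {
          case _: FileAlreadyExistsException =>
            Files.deleteIfExists(temp)
            false                                            // someone committed it first
        }
      }

      val dir = Files.createTempDirectory("offsets")
      assert(add(dir, 0, "src -> 10"))    // the first writer wins
      assert(!add(dir, 0, "src -> 99"))   // a concurrent writer is detected, not overwritten
    }
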