[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs #20445
Closed

Commits (20 total; the diff below shows changes from 15 commits):
7c09b37  save for so far (brkyvz)
78c50f8  Save so far (brkyvz)
2777b5b  save so far (brkyvz)
50a541b  Compiles and I think also runs correctly (brkyvz)
fd61724  save (brkyvz)
7a0b564  fix merge conflicts (brkyvz)
a81c2ec  fix hive (brkyvz)
1a4f410  Undo changes to HiveSessionStateBuilder.scala (tdas)
083e93c  Merge remote-tracking branch 'apache-github/master' into HEAD (tdas)
a817c8d  Fixed the setOffsetRange bug (tdas)
35b8854  Fixed DataSourceV2ScanExec canonicalization bug (tdas)
e66d809  Fixed metrics reported by MicroBatchExecution (tdas)
5adf1fe  Merge remote-tracking branch 'apache-github/master' into SPARK-23092 (tdas)
478ad17  Reverted changes to DataSourceV2* (tdas)
6389d80  Merge remote-tracking branch 'apache-github/master' into SPARK-23092 (tdas)
3f50f33  Updated package paths to fix compilation (tdas)
c713048  Store added data as rows not datasets (tdas)
1204755  Fixed ForeachSinkSuite (tdas)
f0ce5df  Fixed bug (tdas)
c3508e9  Addressed comments (tdas)
@@ -17,11 +17,12 @@
 package org.apache.spark.sql.execution.streaming

+import java.{util => ju}
 import java.util.Optional
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy

 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
@@ -31,7 +32,8 @@ import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -51,9 +53,10 @@ object MemoryStream {
  * available.
  */
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
-    extends Source with Logging {
+    extends MicroBatchReader with Logging {
   protected val encoder = encoderFor[A]
-  protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession)
+  private val attributes = encoder.schema.toAttributes
+  protected val logicalPlan = StreamingExecutionRelation(this, attributes)(sqlContext.sparkSession)
   protected val output = logicalPlan.output

   /**
@@ -66,15 +69,19 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)

+  @GuardedBy("this")
+  private var startOffset = new LongOffset(-1)
+
+  @GuardedBy("this")
+  private var endOffset = new LongOffset(-1)
+
   /**
    * Last offset that was discarded, or -1 if no commits have occurred. Note that the value
    * -1 is used in calculations below and isn't just an arbitrary constant.
    */
   @GuardedBy("this")
   protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)

   def schema: StructType = encoder.schema

   def toDS(): Dataset[A] = {
     Dataset(sqlContext.sparkSession, logicalPlan)
   }
@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)

   def addData(data: TraversableOnce[A]): Offset = {
     val encoded = data.toVector.map(d => encoder.toRow(d).copy())
-    val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
+    val plan = new LocalRelation(attributes, encoded, isStreaming = false)
     val ds = Dataset[A](sqlContext.sparkSession, plan)
     logDebug(s"Adding ds: $ds")
     this.synchronized {
Review comment: Do we still need to store the batches as datasets, now that we're just collect()ing them back out in createDataReaderFactories()?

Reply: Good point.
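A later commit in this PR ("Store added data as rows not datasets", c713048) follows up on that suggestion. A minimal sketch of the idea, assuming the internal buffer is retyped to hold the encoded rows directly; the buffer type and the exact body below are illustrative, not the PR's final code:

  // Hypothetical: buffer encoded rows instead of a Dataset, since
  // createDataReaderFactories() only ever collect()s the rows back out.
  // Assumes: protected val batches = new ListBuffer[Array[UnsafeRow]]
  def addData(data: TraversableOnce[A]): Offset = {
    // Encode each element once, on the driver; copy() because the encoder
    // reuses its output row between calls.
    val rows = data.toVector.map(d => encoder.toRow(d).copy().asInstanceOf[UnsafeRow]).toArray
    this.synchronized {
      currentOffset = currentOffset + 1
      batches += rows
      currentOffset
    }
  }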
@@ -101,19 +108,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)

   override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"

-  override def getOffset: Option[Offset] = synchronized {
-    if (currentOffset.offset == -1) {
-      None
-    } else {
-      Some(currentOffset)
+  override def setOffsetRange(start: Optional[OffsetV2], end: Optional[OffsetV2]): Unit = {
+    synchronized {
+      startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset]
+      endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset]
     }
   }

-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+  override def readSchema(): StructType = encoder.schema
+
+  override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong)
+
+  override def getStartOffset: OffsetV2 = synchronized {
+    if (startOffset.offset == -1) null else startOffset
+  }
+
+  override def getEndOffset: OffsetV2 = synchronized {
+    if (endOffset.offset == -1) null else endOffset
+  }
+
+  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = synchronized {
     // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
-    val startOrdinal =
-      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val startOrdinal = startOffset.offset.toInt + 1
+    val endOrdinal = endOffset.offset.toInt + 1

     // Internal buffer only holds the batches after lastCommittedOffset.
     val newBlocks = synchronized {
@@ -123,19 +140,11 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       batches.slice(sliceStart, sliceEnd)
     }

-    if (newBlocks.isEmpty) {
-      return sqlContext.internalCreateDataFrame(
-        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
-    }
-
     logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal))

-    newBlocks
-      .map(_.toDF())
-      .reduceOption(_ union _)
-      .getOrElse {
-        sys.error("No data selected!")
-      }
+    newBlocks.map { ds =>
+      new MemoryStreamDataReaderFactory(ds.toDF().collect()).asInstanceOf[DataReaderFactory[Row]]
+    }.asJava
   }

   private def generateDebugString(
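To make the ordinal arithmetic above concrete (values invented for illustration): startOrdinal = startOffset + 1 and endOrdinal = endOffset + 1, so the fetched range (startOffset, endOffset] excludes the start and includes the end. If lastOffsetCommitted is 1, startOffset is 1, and endOffset is 3, then startOrdinal = 2 and endOrdinal = 4; since the internal buffer only holds batches after the last commit, the slice taken from batches picks out exactly the two uncommitted batches for offsets 2 and 3.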
@@ -153,7 +162,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }

-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: OffsetV2): Unit = synchronized {
     def check(newOffset: LongOffset): Unit = {
       val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

@@ -176,11 +185,32 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)

   def reset(): Unit = synchronized {
     batches.clear()
+    startOffset = LongOffset(-1)
+    endOffset = LongOffset(-1)
     currentOffset = new LongOffset(-1)
     lastOffsetCommitted = new LongOffset(-1)
   }
 }

+class MemoryStreamDataReaderFactory(records: Array[Row]) extends DataReaderFactory[Row] {
+  override def createDataReader(): DataReader[Row] = {
+    new DataReader[Row] {
+      private var currentIndex = -1
+
+      override def next(): Boolean = {
+        // Return true as long as the new index is in the array.
+        currentIndex += 1
+        currentIndex < records.length
+      }
+
+      override def get(): Row = records(currentIndex)
+
+      override def close(): Unit = {}
+    }
+  }
+}
+
 /**
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
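To see how these pieces fit together, here is a sketch (not from the PR) of the per-batch call sequence the micro-batch engine drives against this reader. It assumes an ambient SparkSession named spark, with spark.implicits._ imported for the Int encoder:

import java.util.Optional
import scala.collection.JavaConverters._

val stream = MemoryStream[Int](0, spark.sqlContext)
stream.addData(Seq(1, 2, 3))

// Plan the range: the start defaults to LongOffset(-1) when nothing has been
// committed yet, and the end defaults to the stream's currentOffset.
stream.setOffsetRange(Optional.empty(), Optional.empty())
val end = stream.getEndOffset // LongOffset(0) after a single addData call

// One factory per uncommitted block; each reader iterates its collected rows.
stream.createDataReaderFactories().asScala.foreach { factory =>
  val reader = factory.createDataReader()
  while (reader.next()) println(reader.get()) // prints [1], [2], [3]
  reader.close()
}

// Mark the batch as processed so its rows can be dropped from the buffer.
stream.commit(end)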
Review comment (likely on the MicroBatchExecution metrics change, commit e66d809): I agree that the old metric names don't make much sense anymore, but I worry about changing external-facing behavior as part of an API migration.