[SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation

PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries.

Right now, the limit operator in a streaming query may get optimized away when its input relation is empty. This is problematic for stateful streaming: the empty micro-batch then writes no state store files, and the next batch fails with a file-not-found error when it tries to read them.
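
As a hedged illustration of the kind of query affected (not code from this PR; the rate source, memory sink, and checkpoint path below are hypothetical placeholders):

```scala
// Sketch only: assumes an existing SparkSession `spark` (e.g. from spark-shell).
// A streaming limit is stateful, so every micro-batch -- including an empty one --
// is expected to write state store files; if the Limit node is optimized away for
// an empty batch, the next batch fails with a file-not-found error on that state.
val limited = spark.readStream
  .format("rate")          // any source that can deliver an empty micro-batch
  .load()
  .limit(10)               // streaming limit keeps its running count in the state store

val query = limited.writeStream
  .format("memory")
  .queryName("limited_rows")
  .option("checkpointLocation", "/tmp/limit-checkpoint")  // hypothetical path
  .start()
```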

We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries.
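
A minimal sketch of the intended behaviour after the fix, written with the Catalyst test DSL (assumes spark-catalyst on the classpath; this is an illustration, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object LimitOverEmptyRelationSketch extends App {
  // An empty relation under a limit, once as a streaming plan and once as a batch plan.
  val streamingPlan = LocalRelation(Nil, Nil, isStreaming = true).limit(1).analyze
  val batchPlan = LocalRelation(Nil, Nil).limit(1).analyze

  // With the isStreaming guard, the streaming plan keeps its Limit nodes...
  println(PropagateEmptyRelation(streamingPlan))
  // ...while the batch plan is still collapsed to an empty LocalRelation.
  println(PropagateEmptyRelation(batchPlan))
}
```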

This PR is intended as a small and safe fix for PropagateEmptyRelation. A more fundamental fix that prevents this class of problem in other optimizer rules as well would be preferable, but that is a much larger task.

No user-facing change.

Tested with unit tests.

Closes apache#29623 from liwensun/spark-32776.

Authored-by: liwensun <liwen.sun@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit f0851e9)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
liwensun authored and dongjoon-hyun committed Sep 8, 2020
1 parent 7e5b9b6 commit fa53e21
Showing 3 changed files with 46 additions and 2 deletions.
@@ -85,8 +85,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
       case _: Filter => empty(p)
       case _: Sample => empty(p)
       case _: Sort => empty(p)
-      case _: GlobalLimit => empty(p)
-      case _: LocalLimit => empty(p)
+      case _: GlobalLimit if !p.isStreaming => empty(p)
+      case _: LocalLimit if !p.isStreaming => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
       // An aggregate with non-empty group expression will return one output row per group when the
@@ -221,4 +221,10 @@ class PropagateEmptyRelationSuite extends PlanTest {
     val optimized = Optimize.execute(query.analyze)
     assert(optimized.resolved)
   }
+
+  test("should not optimize away limit if streaming") {
+    val query = LocalRelation(Nil, Nil, isStreaming = true).limit(1).analyze
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, query)
+  }
 }
@@ -34,7 +34,9 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
@@ -1141,6 +1143,42 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  testQuietly("limit on empty batch should not cause state store error") {
+    // The source only produces two batches, the first batch is empty and the second batch has data.
+    val source = new Source {
+      var batchId = 0
+      override def stop(): Unit = {}
+      override def getOffset: Option[Offset] = {
+        Some(LongOffset(batchId + 1))
+      }
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        if (batchId == 0) {
+          batchId += 1
+          Dataset.ofRows(spark, LocalRelation(schema.toAttributes, Nil, isStreaming = true))
+        } else {
+          Dataset.ofRows(spark,
+            LocalRelation(schema.toAttributes, InternalRow(10) :: Nil, isStreaming = true))
+        }
+      }
+      override def schema: StructType = MockSourceProvider.fakeSchema
+    }
+
+    MockSourceProvider.withMockSources(source) {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.MockSourceProvider")
+        .load()
+        .limit(1)
+
+      testStream(df)(
+        StartStream(),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(10))
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
