
Commit: fix
liwensun committed Sep 2, 2020
1 parent 812d091 commit 6ae55d4
Showing 3 changed files with 46 additions and 2 deletions.
PropagateEmptyRelation.scala
@@ -103,8 +103,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
       case _: Filter => empty(p)
       case _: Sample => empty(p)
       case _: Sort => empty(p)
-      case _: GlobalLimit => empty(p)
-      case _: LocalLimit => empty(p)
+      case _: GlobalLimit if !p.isStreaming => empty(p)
+      case _: LocalLimit if !p.isStreaming => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
       // An aggregate with non-empty group expression will return one output row per group when the
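
The two `isStreaming` guards above are the whole fix: PropagateEmptyRelation folds a unary operator whose children are all empty LocalRelations into an empty relation, which is safe for batch plans but not for streaming limits, since a streaming GlobalLimit/LocalLimit executes statefully across micro-batches. A minimal sketch of the guard's shape, using toy plan types rather than Spark's catalyst classes:

    // Toy plan types for illustration only -- not Spark's catalyst API.
    sealed trait Plan { def isStreaming: Boolean }
    case class LocalRelation(isStreaming: Boolean) extends Plan
    case class GlobalLimit(child: Plan, isStreaming: Boolean) extends Plan
    case class LocalLimit(child: Plan, isStreaming: Boolean) extends Plan

    // Assuming the child is already known to be empty (the enclosing match in
    // the real rule checks this), fold batch limits into an empty relation;
    // streaming limits must survive even an empty micro-batch.
    def propagateEmpty(p: Plan): Plan = p match {
      case _: GlobalLimit if !p.isStreaming => LocalRelation(isStreaming = false)
      case _: LocalLimit if !p.isStreaming => LocalRelation(isStreaming = false)
      case other => other
    }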
PropagateEmptyRelationSuite.scala
@@ -257,4 +257,10 @@ class PropagateEmptyRelationSuite extends PlanTest {
     val optimized = Optimize.execute(query.analyze)
     assert(optimized.resolved)
   }
+
+  test("should not optimize away limit if streaming") {
+    val query = LocalRelation(Nil, Nil, isStreaming = true).limit(1).analyze
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, query)
+  }
 }
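
For contrast, the rule still folds a limit over an empty batch relation; a sketch of such an assertion (hypothetical test name, assuming this suite's existing Optimize executor and catalyst DSL imports) would be:

    test("batch limit on empty relation is still optimized away") {
      val query = LocalRelation(Nil, Nil).limit(1).analyze
      val optimized = Optimize.execute(query)
      comparePlans(optimized, LocalRelation(Nil, Nil))
    }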
StreamingQuerySuite.scala
@@ -34,7 +34,9 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
@@ -1141,6 +1143,42 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  testQuietly("limit on empty batch should not cause state store error") {
+    // The source only produces two batches, the first batch is empty and the second batch has data.
+    val source = new Source {
+      var batchId = 0
+      override def stop(): Unit = {}
+      override def getOffset: Option[Offset] = {
+        Some(LongOffset(batchId + 1))
+      }
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        if (batchId == 0) {
+          batchId += 1
+          Dataset.ofRows(spark, LocalRelation(schema.toAttributes, Nil, isStreaming = true))
+        } else {
+          Dataset.ofRows(spark,
+            LocalRelation(schema.toAttributes, InternalRow(10) :: Nil, isStreaming = true))
+        }
+      }
+      override def schema: StructType = MockSourceProvider.fakeSchema
+    }
+
+    MockSourceProvider.withMockSources(source) {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.MockSourceProvider")
+        .load()
+        .limit(1)
+
+      testStream(df)(
+        StartStream(),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(10))
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
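
The new StreamingQuerySuite test above targets the failure mode directly: before this fix, the empty first micro-batch let the optimizer fold the limit(1) away, so the stateful streaming limit operator never ran (and never committed state) for that batch, and the next batch could then fail with a state store error when loading the missing state version. A toy model of why the limit must execute on every micro-batch, empty or not (illustrative only, not Spark's StreamingGlobalLimitExec):

    // Toy model: a streaming global limit keeps a running row count that must
    // be loaded and committed on every micro-batch, even an empty one.
    final class ToyStreamingLimit(limit: Int) {
      private var committedCount = 0L // stands in for the state-store value

      def runBatch(rows: Seq[Int]): Seq[Int] = {
        val remaining = math.max(limit - committedCount, 0L).toInt
        val emitted = rows.take(remaining)
        committedCount += emitted.size // committed even when `rows` is empty
        emitted
      }
    }

    // val op = new ToyStreamingLimit(1)
    // op.runBatch(Nil)     // batch 0: empty, but state is still committed
    // op.runBatch(Seq(10)) // batch 1: emits Seq(10) against the saved count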
