Skip to content

Commit

Permalink
[SPARK-21947][SS] Check and report error when monotonically_increasin…
Browse files Browse the repository at this point in the history
…g_id is used in streaming query

## What changes were proposed in this pull request?

`monotonically_increasing_id` doesn't work in Structured Streaming. We should throw an exception if a streaming query uses it.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19336 from viirya/SPARK-21947.
  • Loading branch information
viirya authored and zsxwing committed Oct 6, 2017
1 parent 08b204f commit debcbec
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -129,6 +129,16 @@ object UnsupportedOperationChecker {
!subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
}

def checkUnsupportedExpressions(implicit operator: LogicalPlan): Unit = {
val unsupportedExprs = operator.expressions.flatMap(_.collect {
case m: MonotonicallyIncreasingID => m
}).distinct
if (unsupportedExprs.nonEmpty) {
throwError("Expression(s): " + unsupportedExprs.map(_.sql).mkString(", ") +
" is not supported with streaming DataFrames/Datasets")
}
}

plan.foreachUp { implicit subPlan =>

// Operations that cannot exists anywhere in a streaming plan
Expand Down Expand Up @@ -323,6 +333,9 @@ object UnsupportedOperationChecker {

case _ =>
}

// Check if there are unsupported expressions in streaming query plan.
checkUnsupportedExpressions(subPlan)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MonotonicallyIncreasingID, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
Expand Down Expand Up @@ -614,6 +614,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)

// Unsupported expressions in streaming plan
assertNotSupportedInStreamingPlan(
"MonotonicallyIncreasingID",
streamRelation.select(MonotonicallyIncreasingID()),
outputMode = Append,
expectedMsgs = Seq("monotonically_increasing_id"))


/*
=======================================================================================
TESTING FUNCTIONS
Expand Down

0 comments on commit debcbec

Please sign in to comment.