[SPARK-22053][SS] Stream-stream inner join in Append Mode
## What changes were proposed in this pull request?

#### Architecture
This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.

1. For each stream, we maintain the past rows as state in State Store.
  - For each joining key, there can be multiple rows that have been received.
  - So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream
  - Look up the other stream's state to see if there are matching rows, and output them if they satisfy the join condition.
  - Add the input row to the corresponding stream's state.
  - If the data has a timestamp/window column with a watermark, use it to calculate the threshold for keys that need to stay buffered for future matches, and drop the rest from the state (see the sketch after this list).
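
To make the per-batch flow above concrete, here is a minimal sketch of how one side of the symmetric hash join processes an input row. It is illustrative only: `JoinState`, `processRow`, and the in-memory maps are hypothetical stand-ins for the fault-tolerant state managers in the actual patch.

```scala
import scala.collection.mutable

object SymmetricHashJoinSketch {
  // Hypothetical stand-in for the fault-tolerant key-to-list-of-values
  // multimap that each stream keeps in the State Store.
  type JoinState[K, V] = mutable.Map[K, mutable.Buffer[V]]

  // Process one row arriving on "this" side of the join: probe the other
  // side's buffered rows for matches, then buffer this row so future rows
  // from the other side can match it.
  def processRow[K, L, R](
      key: K,
      row: L,
      thisSide: JoinState[K, L],
      otherSide: JoinState[K, R],
      condition: (L, R) => Boolean): Seq[(L, R)] = {
    val matches = otherSide.getOrElse(key, mutable.Buffer.empty[R])
      .filter(other => condition(row, other))
      .map(other => (row, other))
    thisSide.getOrElseUpdate(key, mutable.Buffer.empty[L]) += row
    matches
  }
}
```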

Cleaning up old, unnecessary state rows depends entirely on whether a watermark has been defined and on the join conditions. We definitely want to support state cleanup for two types of queries that are likely to be common.

- Queries with time-range conditions - E.g. `SELECT * FROM leftTable JOIN rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)
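
For example, the first pattern can be expressed with the DataFrame API roughly as follows. This is a sketch: the `impressions`/`clicks` streams and their columns are made-up names, and the watermark delays are arbitrary.

```scala
import org.apache.spark.sql.functions.expr

// Hypothetical streaming DataFrames; each side declares a watermark on its
// event-time column so that old state can eventually be dropped.
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Inner equi-join plus a time-range condition relating the two event times.
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """))
```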

#### Implementation
The stream-stream join is primarily implemented in three classes:
- `StreamingSymmetricHashJoinExec` implements the above symmetric join algorithm.
- `SymmetricHashJoinStateManager` manages the streaming state for the join. This is essentially a fault-tolerant key-to-list-of-values multimap built on the StateStore APIs. `StreamingSymmetricHashJoinExec` instantiates two such managers, one for each join side.
- `StreamingSymmetricHashJoinHelper` is a helper class that extracts the state cleanup thresholds from the join conditions and the event-time watermark.

Refer to the class scaladocs for more implementation details.
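
Conceptually, each side's state manager exposes a multimap-style interface along these lines. The trait below is an illustrative sketch; the method names are descriptive placeholders, not the exact API in the patch.

```scala
// Illustrative sketch of the per-side join state manager: a fault-tolerant
// key-to-list-of-values multimap layered on top of the StateStore APIs.
trait JoinStateManagerSketch[K, V] {
  // Buffer one more value (an input row) under its join key.
  def append(key: K, value: V): Unit

  // All values buffered so far for a join key, used to find matches.
  def get(key: K): Iterator[V]

  // Drop buffered rows that the state watermark proves can never match
  // again, e.g. rows whose event time is below the computed threshold.
  def removeByPredicate(shouldRemove: V => Boolean): Unit

  // Persist this batch's changes fault-tolerantly.
  def commit(): Unit
}
```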

Besides the implementation of the stream-stream inner join SparkPlan, some additional changes are:
- Allowed inner join in append mode in UnsupportedOperationChecker
- Prevented stream-stream joins on empty batch DataFrames from being collapsed by the optimizer

## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests in UnsupportedOperationsSuite

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #19271 from tdas/SPARK-22053.
tdas committed Sep 21, 2017
1 parent a8a5cd2 commit f32a842
Showing 18 changed files with 1,940 additions and 45 deletions.
@@ -2390,7 +2390,7 @@ object TimeWindowing extends Rule[LogicalPlan] {

if (window.windowDuration == window.slideDuration) {
val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
-exprId = windowAttr.exprId)
+exprId = windowAttr.exprId, explicitMetadata = Some(metadata))

val replacedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
@@ -222,16 +222,17 @@ object UnsupportedOperationChecker {
joinType match {

case _: InnerLike =>
-if (left.isStreaming && right.isStreaming) {
-throwError("Inner join between two streaming DataFrames/Datasets is not supported")
+if (left.isStreaming && right.isStreaming &&
+outputMode != InternalOutputModes.Append) {
+throwError("Inner join between two streaming DataFrames/Datasets is not supported" +
+s" in ${outputMode} output mode, only in Append output mode")
}

case FullOuter =>
if (left.isStreaming || right.isStreaming) {
throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
}


case LeftOuter | LeftSemi | LeftAnti =>
if (right.isStreaming) {
throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
@@ -164,7 +164,14 @@ case class Alias(child: Expression, name: String)(
}
}

-override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"
+/** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
+private def delaySuffix = if (metadata.contains(EventTimeWatermark.delayKey)) {
+s"-T${metadata.getLong(EventTimeWatermark.delayKey)}ms"
+} else {
+""
+}
+
+override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix$delaySuffix"

override protected final def otherCopyArgs: Seq[AnyRef] = {
exprId :: qualifier :: explicitMetadata :: Nil
@@ -45,14 +45,19 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
case p: Union if p.children.forall(isEmptyLocalRelation) =>
empty(p)

-case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
-case _: InnerLike => empty(p)
-// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
-// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
-case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
-case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
-case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
-case _ => p
-}
+// Joins on empty LocalRelations generated from streaming sources are not eliminated
+// as stateful streaming joins need to perform other state management operations other
+// than just processing the input data.
+case p @ Join(_, _, joinType, _)
+if !p.children.exists(_.isStreaming) && p.children.exists(isEmptyLocalRelation) =>
+joinType match {
+case _: InnerLike => empty(p)
+// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
+// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
+case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
+case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
+case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
+case _ => p
+}

case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {
@@ -74,6 +79,10 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
//
// If the grouping expressions are empty, however, then the aggregate will always produce a
// single output row and thus we cannot propagate the EmptyRelation.
+//
+// Aggregation on empty LocalRelation generated from a streaming source is not eliminated
+// as stateful streaming aggregation need to perform other state management operations other
+// than just processing the input data.
case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
// Generators like Hive-style UDTF may return their records within `close`.
case Generate(_: Explode, _, _, _, _, _) => empty(p)
@@ -383,11 +383,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append
)

-// Inner joins: Stream-stream not supported
+// Inner joins: Multiple stream-stream joins supported only in append mode
testBinaryOperationInStreamingPlan(
-"inner join",
+"single inner join in append mode",
_.join(_, joinType = Inner),
-streamStreamSupported = false)
+outputMode = Append,
+streamStreamSupported = true)
+
+testBinaryOperationInStreamingPlan(
+"multiple inner joins in append mode",
+(x: LogicalPlan, y: LogicalPlan) => {
+x.join(y, joinType = Inner).join(streamRelation, joinType = Inner)
+},
+outputMode = Append,
+streamStreamSupported = true)
+
+testBinaryOperationInStreamingPlan(
+"inner join in update mode",
+_.join(_, joinType = Inner),
+outputMode = Update,
+streamStreamSupported = false,
+expectedMsg = "inner join")

// Full outer joins: only batch-batch is allowed
testBinaryOperationInStreamingPlan(
@@ -18,15 +18,14 @@
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -257,6 +256,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

+object StreamingJoinStrategy extends Strategy {
+override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+plan match {
+case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
+if left.isStreaming && right.isStreaming =>
+
+new StreamingSymmetricHashJoinExec(
+leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
+
+case Join(left, right, _, _) if left.isStreaming && right.isStreaming =>
+throw new AnalysisException(
+"Stream stream joins without equality predicate is not supported", plan = Some(plan))
+
+case _ => Nil
+}
+}
+}
+
/**
* Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
*/
@@ -54,6 +54,7 @@ class IncrementalExecution(
sparkSession.sessionState.planner.strategies

override def extraPlanningStrategies: Seq[Strategy] =
+StreamingJoinStrategy ::
StatefulAggregationStrategy ::
FlatMapGroupsWithStateStrategy ::
StreamingRelationStrategy ::
@@ -116,6 +117,16 @@
stateInfo = Some(nextStatefulOperationStateInfo),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))

+case j: StreamingSymmetricHashJoinExec =>
+j.copy(
+stateInfo = Some(nextStatefulOperationStateInfo),
+eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
+stateWatermarkPredicates =
+StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
+j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition,
+Some(offsetSeqMetadata.batchWatermarkMs))
+)
}
}

@@ -297,6 +297,8 @@ class StreamExecution(
val sparkSessionToRunBatches = sparkSession.cloneSession()
// Adaptive execution can change num shuffle partitions, disallow
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+// Disable cost-based join optimization as we do not want stateful operations to be rearranged
+sparkSessionToRunBatches.conf.set(SQLConf.CBO_ENABLED.key, "false")
offsetSeqMetadata = OffsetSeqMetadata(
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionToRunBatches.conf)

