[SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0 #35680

Closed · wants to merge 15 commits
Changes from 13 commits
@@ -57,7 +57,7 @@ import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType.DAY
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils

/**
@@ -4061,10 +4061,20 @@ object SessionWindowing extends Rule[LogicalPlan] {
case s: SessionWindow => sessionAttr
}

val filterByTimeRange = session.gapDuration match {
case Literal(interval: CalendarInterval, CalendarIntervalType) =>
interval == null || interval.months + interval.days + interval.microseconds <= 0
case _ => true
}

// As with the tumbling window, we add a filter to filter out nulls.
// We also filter out events with a negative, zero, or invalid gap duration.
val filterExpr = IsNotNull(session.timeColumn) &&
(sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
val filterExpr = if (filterByTimeRange) {
IsNotNull(session.timeColumn) &&
(sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
} else {
IsNotNull(session.timeColumn)
}

replacedPlan.withNewChildren(
Filter(filterExpr,
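For reviewers of the rule change above, here is a minimal standalone sketch (not part of this diff) of the decision the new `filterByTimeRange` check makes for a constant gap duration. The helper name `needsTimeRangeFilter` is invented for illustration only; the arithmetic mirrors the pattern match on `Literal(interval: CalendarInterval, CalendarIntervalType)` above. Any non-literal gap duration (e.g. a `when(...)` column) stays on the conservative path and keeps the filter.

```scala
import org.apache.spark.unsafe.types.CalendarInterval

// Mirrors the filterByTimeRange decision above: the SESSION_END > SESSION_START
// filter is only needed when the constant gap is null, zero, or negative.
def needsTimeRangeFilter(interval: CalendarInterval): Boolean =
  interval == null || interval.months + interval.days + interval.microseconds <= 0

val fiveSeconds = new CalendarInterval(0, 0, 5L * 1000 * 1000)  // "5 seconds"
val minusFive = new CalendarInterval(0, 0, -5L * 1000 * 1000)   // "-5 seconds"

assert(!needsTimeRangeFilter(fiveSeconds)) // positive literal gap: time-range filter can be dropped
assert(needsTimeRangeFilter(minusFive))    // non-positive gap: keep the time-range filter
```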
@@ -22,8 +22,8 @@ import java.time.LocalDateTime
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -495,4 +495,56 @@ class DataFrameSessionWindowingSuite extends QueryTest with SharedSparkSession
validateWindowColumnInSchema(schema2, "session")
}
}

test("SPARK-38349: No need to filter events when gapDuration greater than 0") {
// negative gap duration
check("-5 seconds", true, "Need to filter events when gap duration less than 0")

// positive gap duration
check("5 seconds", false, "No need to filter events when gap duration greater than 0")

// invalid gap duration
check("x seconds", true, "Need to filter events when gap duration invalid")

// dynamic gap duration
check(when(col("time").equalTo("1"), "5 seconds")
.when(col("time").equalTo("2"), "10 seconds")
.otherwise("10 seconds"), true, "Need to filter events when gap duration dynamically")

def check(
gapDuration: Any,
expectTimeRange: Boolean,
assertHintMsg: String): Unit = {
val data = Seq(
("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")
val df = if (gapDuration.isInstanceOf[String]) {
data.groupBy(session_window($"time", gapDuration.asInstanceOf[String]))
} else {
data.groupBy(session_window($"time", gapDuration.asInstanceOf[Column]))
}
val aggregate = df.agg(count("*").as("counts"))
.select($"session_window.start".cast("string"), $"session_window.end".cast("string"),
$"counts")

checkFilterCondition(aggregate.queryExecution.logical, expectTimeRange, assertHintMsg)
}

def checkFilterCondition(
logicalPlan: LogicalPlan,
expectTimeRange: Boolean,
assertHintMsg: String): Unit = {
val filter = logicalPlan.find { plan =>
plan.isInstanceOf[Filter] && plan.children.head.isInstanceOf[Project]
}
assert(filter.isDefined)
val exist = filter.get.expressions.flatMap { expr =>
expr.collect { case gt: GreaterThan => gt }
}
if (expectTimeRange) {
assert(exist.nonEmpty, assertHintMsg)
} else {
assert(exist.isEmpty, assertHintMsg)
}
}
}
}
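Below is a hedged usage sketch of how the change is observable from the DataFrame API, assuming a local `SparkSession` (the local-mode builder, app name, and the `events`/`agg` names are illustrative, not part of this diff): with a constant positive gap, the Filter inserted by SessionWindowing should only contain `isnotnull` on the time column, while a dynamic or non-positive gap still carries the `session_end > session_start` predicate, as the new test asserts.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("SPARK-38349-sketch").getOrCreate()
import spark.implicits._

val events = Seq(("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")

// Constant positive gap: SessionWindowing can skip the session_end > session_start filter.
val agg = events
  .groupBy(session_window($"time", "5 seconds"))
  .agg(count("*").as("counts"))

// Inspect the analyzed plan; the Filter that SessionWindowing inserts is
// expected to reduce to isnotnull on the time column here.
agg.explain(extended = true)
```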