[SPARK-18124] Observed delay based Event Time Watermarks #15702

Closed · wants to merge 12 commits
@@ -252,6 +252,10 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce
public final int months;
public final long microseconds;

public final long milliseconds() {
return this.microseconds / MICROS_PER_MILLI;
}

public CalendarInterval(int months, long microseconds) {
this.months = months;
this.microseconds = microseconds;
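A quick sketch of what the new helper gives callers (the interval string and the assertion below are illustrative only; MICROS_PER_MILLI is CalendarInterval's existing microseconds-per-millisecond constant):

import org.apache.spark.unsafe.types.CalendarInterval

// "interval 10 seconds" parses to 10,000,000 microseconds; the new helper
// exposes it as the 10,000 ms value the watermark logic works in.
val delay = CalendarInterval.fromString("interval 10 seconds")
assert(delay.milliseconds() == 10000L)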
@@ -31,7 +31,8 @@ class AnalysisException protected[sql] (
val message: String,
val line: Option[Int] = None,
val startPosition: Option[Int] = None,
val plan: Option[LogicalPlan] = None,
// Some plans fail to serialize due to bugs in scala collections.
@transient val plan: Option[LogicalPlan] = None,
val cause: Option[Throwable] = None)
extends Exception(message, cause.orNull) with Serializable {

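As a rough illustration of why @transient helps here (the Wrapper class below is hypothetical, not part of the patch): Java serialization skips transient fields, so an exception that carries a non-serializable plan can still be serialized, and the field simply comes back as null on the other side.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for AnalysisException: `plan` holds a value that is not
// serializable, but the object still round-trips because the field is @transient.
class Wrapper(@transient val plan: AnyRef, val message: String) extends Serializable

val out = new ByteArrayOutputStream()
new ObjectOutputStream(out).writeObject(new Wrapper(new Object, "boom"))
val restored = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
  .readObject().asInstanceOf[Wrapper]
assert(restored.plan == null && restored.message == "boom")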
@@ -2179,7 +2179,13 @@ object TimeWindowing extends Rule[LogicalPlan] {
windowExpressions.head.timeColumn.resolved &&
windowExpressions.head.checkInputDataTypes().isSuccess) {
val window = windowExpressions.head
val windowAttr = AttributeReference("window", window.dataType)()

val metadata = window.timeColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}
val windowAttr =
AttributeReference("window", window.dataType, metadata = metadata)()

val maxNumOverlapping = math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val windows = Seq.tabulate(maxNumOverlapping + 1) { i =>
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.OutputMode
@@ -55,9 +56,20 @@ object UnsupportedOperationChecker {
// Disallow some output mode
outputMode match {
case InternalOutputModes.Append if aggregates.nonEmpty =>
throwError(
s"$outputMode output mode not supported when there are streaming aggregations on " +
s"streaming DataFrames/DataSets")(plan)
val aggregate = aggregates.head

// Find any attributes that are associated with an eventTime watermark.
val watermarkAttributes = aggregate.groupingExpressions.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}

// We can append rows to the sink once the group is under the watermark. Without this
// watermark a group is never "finished" so we would never output anything.
if (watermarkAttributes.isEmpty) {
throwError(
s"$outputMode output mode not supported when there are streaming aggregations on " +
s"streaming DataFrames/DataSets")(plan)
}

case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
throwError(
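The same condition pulled out as a standalone predicate, as a sketch under the patch's assumptions (appendModeAllowed is our name, not something in the checker):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, EventTimeWatermark}

// Append mode is acceptable as soon as at least one grouping attribute carries
// the event-time watermark tag in its metadata.
def appendModeAllowed(aggregate: Aggregate): Boolean =
  aggregate.groupingExpressions.exists {
    case a: Attribute => a.metadata.contains(EventTimeWatermark.delayKey)
    case _            => false
  }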
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, Codege
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}

/**
* Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
@@ -98,6 +98,7 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un
override def withNullability(newNullability: Boolean): UnresolvedAttribute = this
override def withQualifier(newQualifier: Option[String]): UnresolvedAttribute = this
override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName)
override def withMetadata(newMetadata: Metadata): Attribute = this

override def toString: String = s"'$name"

@@ -22,6 +22,7 @@ import java.util.{Objects, UUID}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types._

@@ -104,6 +105,7 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn
def withNullability(newNullability: Boolean): Attribute
def withQualifier(newQualifier: Option[String]): Attribute
def withName(newName: String): Attribute
def withMetadata(newMetadata: Metadata): Attribute

override def toAttribute: Attribute = this
def newInstance(): Attribute
@@ -292,11 +294,22 @@ case class AttributeReference(
}
}

override def withMetadata(newMetadata: Metadata): Attribute = {
AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
}

override protected final def otherCopyArgs: Seq[AnyRef] = {
exprId :: qualifier :: isGenerated :: Nil
}

override def toString: String = s"$name#${exprId.id}$typeSuffix"
/** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
private def delaySuffix = if (metadata.contains(EventTimeWatermark.delayKey)) {
s"-T${metadata.getLong(EventTimeWatermark.delayKey)}"

[Review comment, Contributor] Is this in milliseconds or microseconds like timestamp type?

} else {
""
}

override def toString: String = s"$name#${exprId.id}$typeSuffix$delaySuffix"

// Since the expression id is not in the first constructor it is missing from the default
// tree string.
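
Putting the two additions together, a small illustrative snippet (the attribute name and the 10,000 ms delay are made up): tagging an attribute via withMetadata makes the new -T suffix show up in its string form.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.types.{MetadataBuilder, TimestampType}

// Build a timestamp attribute, then attach a 10 second (10,000 ms) watermark delay.
val eventTime = AttributeReference("eventTime", TimestampType)()
val tagged = eventTime.withMetadata(
  new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10000L).build())

// Prints something like eventTime#1-T10000, thanks to the new delaySuffix.
println(tagged)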
@@ -332,6 +345,8 @@ case class PrettyAttribute(
override def withQualifier(newQualifier: Option[String]): Attribute =
throw new UnsupportedOperationException
override def withName(newName: String): Attribute = throw new UnsupportedOperationException
override def withMetadata(newMetadata: Metadata): Attribute =
throw new UnsupportedOperationException
override def qualifier: Option[String] = throw new UnsupportedOperationException
override def exprId: ExprId = throw new UnsupportedOperationException
override def nullable: Boolean = true
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval

object EventTimeWatermark {
/** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
val delayKey = "spark.watermarkDelay"
}

/**
* Used to mark a user specified column as holding the event time for a row.
*/
case class EventTimeWatermark(
eventTime: Attribute,
delay: CalendarInterval,
child: LogicalPlan) extends LogicalPlan {

// Update the metadata on the eventTime column to include the desired delay.
override val output: Seq[Attribute] = child.output.map { a =>
if (a semanticEquals eventTime) {
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
.putLong(EventTimeWatermark.delayKey, delay.milliseconds)

[Review comment, Contributor] I'm a bit confused. Normally Spark SQL uses microsecond precision for TimestampType. When it converts it to LongType, it uses second precision. Here we're using milliseconds. Wouldn't that be super confusing to reason about?

[Reply, Contributor Author] I switched it to using CalendarInterval to make it clearer which units were being used where. I chose milliseconds because it seemed like the right granularity: microseconds are too short for the global coordination required, and seconds lack granularity. It should be easy to change, and I'm open to that if there's consensus that this is too confusing.

[Reply, Contributor Author] Updating the key to include "Ms".

.build()
a.withMetadata(updatedMetadata)
} else {
a
}
}

override val children: Seq[LogicalPlan] = child :: Nil
}
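
For intuition, the watermark rule this node enables can be sketched as a pure function (a toy formulation of the description in this PR, not the engine's code): the watermark trails the maximum observed event time by the configured delay and never moves backwards.

// All values are in milliseconds, matching the delay stored under delayKey above.
def nextWatermark(currentWatermarkMs: Long, maxObservedEventTimeMs: Long, delayMs: Long): Long =
  math.max(currentWatermarkMs, maxObservedEventTimeMs - delayMs)

// e.g. a max observed event time of 12:00:30 with a 10 second delay yields a
// 12:00:20 watermark; a later batch with a smaller max cannot pull it back.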
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (37 additions, 3 deletions)
@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils

private[sql] object Dataset {
@@ -476,7 +477,7 @@ class Dataset[T] private[sql](
* `collect()`, will throw an [[AnalysisException]] when there is a streaming
* source present.
*
* @group basic
* @group streaming
* @since 2.0.0
*/
@Experimental
@@ -496,8 +497,6 @@ class Dataset[T] private[sql](
/**
* Returns a checkpointed version of this Dataset.
*
* @param eager When true, materializes the underlying checkpointed RDD eagerly.
*
* @group basic
* @since 2.1.0
*/
@@ -535,6 +534,41 @@ class Dataset[T] private[sql](
)(sparkSession)).as[T]
}

/**
* :: Experimental ::
* Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
* before which we assume no more late data is going to arrive.
*
* Spark will use this watermark for several purposes:
* - To know when a given time window aggregation can be finalized and thus can be emitted when
* using output modes that do not allow updates.
* - To minimize the amount of state that we need to keep for on-going aggregations.

[Review comment, Contributor] For append, this reads as: emit only once the watermark has passed, then drop state. But for the other output modes it is not clear from this doc what effect the watermark has on emission and on dropping state.

*
* The current event time is computed by looking at the `MAX(eventTime)` seen in an epoch across

[Review comment, Contributor] Should this be "The current watermark is computed..."? Also, what is an "epoch"? It isn't mentioned in the docs or elsewhere in the PR.

[Reply, Contributor Author] Changed to "watermark". By "epoch" I really just mean some period of time during which we decide to coordinate across the partitions. This happens at batch boundaries now, but that is not part of the contract we are promising, so I removed the word to avoid confusion.

* all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
* of coordinating this value across partitions, the actual watermark used is only guaranteed
* to be at least `delayThreshold` behind the actual event time. In some cases we may still
* process records that arrive more than `delayThreshold` late.
*
* @param eventTime the name of the column that contains the event time of the row.
* @param delayThreshold the minimum delay to wait for data to arrive late, relative to the latest
* record that has been processed in the form of an interval
* (e.g. "1 minute" or "5 hours").

[Review comment, Contributor] Should this make it clear what the minimum useful granularity is (ms)?

[Reply, Contributor Author] That seems like an implementation detail rather than a contract of the API. The real contract is stated above: the actual watermark used is only guaranteed to be at least `delayThreshold` behind the actual event time. There aren't really any bounds we can promise without knowing more about the query (not even ms).

*
* @group streaming
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
// We only accept an existing column name, not a derived column here as a watermark that is
// defined on a derived column cannot be referenced elsewhere in the plan.
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
val parsedDelay =
Option(CalendarInterval.fromString("interval " + delayThreshold))
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
}
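
A hedged end-to-end usage sketch (the names events, "timestamp", and "windowed_counts" are illustrative; any streaming DataFrame with a TimestampType column would do): combining withWatermark with a windowed aggregation is what makes the query acceptable in append mode.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.StreamingQuery

// `events` is assumed to be a streaming DataFrame with a TimestampType column "timestamp".
def startWindowedCounts(events: DataFrame): StreamingQuery = {
  events
    .withWatermark("timestamp", "10 minutes")        // event-time watermark from this PR
    .groupBy(window(col("timestamp"), "5 minutes"))  // group "finishes" once the watermark passes
    .count()
    .writeStream
    .outputMode("append")                            // now permitted for this streaming aggregation
    .format("memory")
    .queryName("windowed_counts")
    .start()
}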

/**
* Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated,
* and all cells will be aligned right. For example:
@@ -18,20 +18,23 @@
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, SaveMode, Strategy}
import org.apache.spark.sql.{SaveMode, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery

/**
* Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -224,6 +227,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
object StatefulAggregationStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case EventTimeWatermark(columnName, delay, child) =>
EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil

case PhysicalAggregation(
namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>

@@ -313,8 +313,13 @@ object AggUtils {
}
// Note: stateId and returnAllStates are filled in later with preparation rules
// in IncrementalExecution.
val saved = StateStoreSaveExec(
groupingAttributes, stateId = None, returnAllStates = None, partialMerged2)
val saved =
StateStoreSaveExec(
groupingAttributes,
stateId = None,
outputMode = None,
eventTimeWatermark = None,
partialMerged2)

val finalAndCompleteAggregate: SparkPlan = {
val finalAggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Final))
@@ -104,7 +104,7 @@ case class ExplainCommand(
if (logicalPlan.isStreaming) {
// This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
// output mode does not matter since there is no `Sink`.
new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0)
new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0, 0)
} else {
sparkSession.sessionState.executePlan(logicalPlan)
}