
[SPARK-14176][SQL]Add DataFrameWriter.trigger to set the stream batch period #11976

Closed
wants to merge 8 commits from zsxwing:trigger into master

Conversation

6 participants
@zsxwing
Member

commented Mar 26, 2016

What changes were proposed in this pull request?

Add a processing time trigger to control the batch processing speed

How was this patch tested?

Unit tests

@SparkQA


commented Mar 26, 2016

Test build #54253 has finished for PR 11976 at commit bf5d675.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@zsxwing

Member Author

commented Mar 28, 2016

retest this please

@SparkQA


commented Mar 28, 2016

Test build #54329 has finished for PR 11976 at commit bf5d675.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
* @since 2.0.0
*/
def trigger(period: Duration): DataFrameWriter = {
this.extraOptions += ("period" -> period.toMillis.toString)

@tdas

tdas Mar 28, 2016

Contributor

Since this name is exposed for the user to accidentally modify (through option() ) we probably should make this more specific. Maybe "triggerInterval"

Also, i think "interval" is better than "period"

@SparkQA


commented Mar 29, 2016

Test build #54376 has finished for PR 11976 at commit 6f5c6ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@@ -276,7 +276,7 @@ trait StreamTest extends QueryTest with Timeouts {
currentStream =
sqlContext
.streams
-          .startQuery(StreamExecution.nextName, metadataRoot, stream, sink)
+          .startQuery(StreamExecution.nextName, metadataRoot, stream, sink, 10L)

@marmbrus

marmbrus Mar 29, 2016

Contributor

Why not 0?

* @since 2.0.0
*/
def trigger(interval: Duration): DataFrameWriter = {
this.extraOptions += ("triggerInterval" -> interval.toMillis.toString)

@marmbrus

marmbrus Mar 29, 2016

Contributor

I think we should probably create another var, rather than converting this back and forth from a string. Also, how would we extend this to support other types of triggers? The design doc suggests using classes, I think.

@zsxwing

zsxwing Mar 29, 2016

Author Member

how would we extend this to support other types of triggers? The design doc suggests using using classes I think.

We can add a new method triggerMode to support other types of triggers, like how we support different SaveModes. This would be better than adding new classes.

*
* @since 2.0.0
*/
def trigger(interval: Duration): DataFrameWriter = {

@marmbrus

marmbrus Mar 29, 2016

Contributor

We probably want a version that takes an interval string (see #12008). We should also do Python in this PR or a follow-up.

@zsxwing

zsxwing Mar 29, 2016

Author Member

There is no Python startStream. I think we don't need to add a Python API that won't be used now.

@davies

davies Mar 30, 2016

Contributor

Should we also accept a startTime together with the duration (same as Window)? See https://issues.apache.org/jira/browse/SPARK-14230

@zsxwing

zsxwing Mar 30, 2016

Author Member

Should we also accept a startTime together with the duration (same as Window)? See https://issues.apache.org/jira/browse/SPARK-14230

Sounds like a good idea.

* Set the trigger interval for the stream query.
*
* @since 2.0.0
*/

@marmbrus

marmbrus Mar 29, 2016

Contributor

@experimental

@zsxwing

zsxwing Mar 29, 2016

Author Member

The whole class is experimental already.

@marmbrus

marmbrus Mar 30, 2016

Contributor

We should remove @experimental for all the dataframe stuff and mark just the streaming methods.

/cc @rxin

@rxin

rxin Mar 30, 2016

Contributor

+1

we don't need to do that in this pr though

@@ -212,9 +210,18 @@ class StreamExecution(
populateStartOffsets()
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
while (isActive) {
val batchStartTimeMs = System.currentTimeMillis()

@marmbrus

marmbrus Mar 29, 2016

Contributor

Maybe we can pull this logic out into its own class so that we can override time and unit test it properly? Some questions:

  • I think this might be implementing the "alternative" and not the proposal from the design doc. If I understand correctly, the behavior should be "If the cluster is overloaded, we will skip some firings and wait until the next multiple of period.", whereas I think this is executing ASAP when overloaded.
  • What about failures? What if it's an hour trigger and the cluster fails and comes back up after waiting 10 minutes? (We might defer this case.)

/cc @rxin

@zsxwing

zsxwing Mar 29, 2016

Author Member

My original design is as follows:

trait Trigger {

  def triggerExecution(): TriggerExecution
}

trait TriggerExecution {

  // batchRunner runs one batch and returns true if the execution should terminate
  def execute(batchRunner: () => Boolean): Unit
}

private[sql] class ProcessingTime(intervalMs: Long) extends Trigger {
  override def triggerExecution(): TriggerExecution = new ProcessingTriggerExecution(intervalMs)
}

private[sql] class ProcessingTriggerExecution(intervalMs: Long) extends TriggerExecution {
  override def execute(batchRunner: () => Boolean): Unit = {
    while (true) {
      // Code that waits for the next multiple of intervalMs.
      if (batchRunner()) {
        return
      }
    }
  }
}

However, it requires exposing Trigger and TriggerExecution to the user and seems complicated.

@zsxwing

zsxwing Mar 29, 2016

Author Member

However, it requires exposing Trigger and TriggerExecution to the user and seems complicated.

Never mind, we can hide them.

logWarning("Current batch is falling behind. The trigger interval is " +
s"${triggerIntervalMs} milliseconds, but spent ${batchElapsedTime} milliseconds")
} else {
Thread.sleep(triggerIntervalMs - batchElapsedTime)

@marmbrus

marmbrus Mar 29, 2016

Contributor

Does this handle spurious wake ups?
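A plain Thread.sleep can return before the full interval has elapsed, so the usual fix is to loop and re-check the clock before giving up the wait. A minimal sketch of that idea (illustrative only, not the actual Spark code; nowMs is a hypothetical injected clock function):

```scala
// Sketch: re-check the deadline after every sleep so an early (spurious)
// wake-up cannot end the wait prematurely. `nowMs` is an injected clock,
// which also makes the loop unit-testable.
def waitUntil(deadlineMs: Long, nowMs: () => Long): Unit = {
  var now = nowMs()
  while (now < deadlineMs) {
    Thread.sleep(deadlineMs - now) // may return early
    now = nowMs()                  // loop exits only once the deadline has passed
  }
}
```

Because the loop condition is re-evaluated after each sleep, an early wake-up simply triggers another (shorter) sleep.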

@zsxwing

Member Author

commented Mar 30, 2016

I updated the PR to add Trigger and ProcessingTime, and it supports adding other triggers in the future.

@SparkQA


commented Mar 30, 2016

Test build #54498 has finished for PR 11976 at commit 92d204c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait Trigger
    • case class ProcessingTime(intervalMs: Long) extends Trigger with Logging
*
* @since 2.0.0
*/
def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = {

@marmbrus

marmbrus Mar 30, 2016

Contributor

Not all trigger modes are going to be time based though. In the doc we also propose data sized based triggers.

@zsxwing

zsxwing Mar 30, 2016

Author Member

Not all trigger modes are going to be time based though. In the doc we also propose data sized based triggers.

How about def trigger(trigger: Trigger), and exposing Trigger and all its subclasses?
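The shape being proposed here can be sketched as a small sealed hierarchy, matching the ProcessingTime(intervalMs: Long) case class reported by the test-build summary above (a simplified sketch, not the exact merged code):

```scala
import java.util.concurrent.TimeUnit

// Sketch of the trigger(trigger: Trigger) shape under discussion: a sealed
// trait keeps pattern matches exhaustive, while new trigger kinds (e.g.
// data-size based) can be added later as new subclasses.
sealed trait Trigger

case class ProcessingTime(intervalMs: Long) extends Trigger {
  require(intervalMs >= 0, "the interval of trigger should not be negative")
}

object ProcessingTime {
  // Convenience constructor mirroring the Java-friendly create(...) variant.
  def create(interval: Long, unit: TimeUnit): ProcessingTime =
    ProcessingTime(unit.toMillis(interval))
}
```

A sealed trait also lets StreamExecution match on trigger types without a catch-all error case, which is the point marmbrus raises further down.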

@SparkQA


commented Mar 31, 2016

Test build #54568 has finished for PR 11976 at commit f3526d0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA


commented Mar 31, 2016

Test build #54566 has finished for PR 11976 at commit a7355ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
metadataRoot,
stream,
sink,
ProcessingTime(0L))

@marmbrus

marmbrus Apr 1, 2016

Contributor

Minor: maybe just make this the default arg since this function is internal.

throw new IllegalArgumentException(
"interval cannot be null or blank.")
}
val cal = if (interval.startsWith("interval")) {

@marmbrus

marmbrus Apr 1, 2016

Contributor

Is this logic duplicated elsewhere? Should CalendarInterval.fromString just do this internally?

@zsxwing

zsxwing Apr 1, 2016

Author Member

Is this logic duplicated elsewhere? Should CalendarInterval.fromString just do this internally?

SQL also uses CalendarInterval.fromString to parse the interval, and that syntax requires INTERVAL value unit. Hence, I cannot move the logic into CalendarInterval.fromString.

@marmbrus

marmbrus Apr 4, 2016

Contributor

The presence of INTERVAL isn't also enforced by the grammar?
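The normalization under discussion — accepting both "10 seconds" and "interval 10 seconds" — can be illustrated without Spark's internal CalendarInterval. The tiny parser below is a hypothetical stand-in supporting only two units, not the actual implementation:

```scala
// Illustrative stand-in for CalendarInterval.fromString: strip an optional
// "interval" prefix, then parse "<n> seconds" / "<n> milliseconds".
def parseIntervalMs(interval: String): Long = {
  require(interval != null && interval.trim.nonEmpty, "interval cannot be null or blank.")
  val lowered = interval.trim.toLowerCase
  // Normalize: the SQL syntax requires the INTERVAL keyword; users may omit it.
  val s = if (lowered.startsWith("interval")) lowered.stripPrefix("interval").trim else lowered
  val Pattern = """(\d+)\s+(seconds?|milliseconds?)""".r
  s match {
    case Pattern(n, unit) if unit.startsWith("milli") => n.toLong
    case Pattern(n, _)                                => n.toLong * 1000
    case _ => throw new IllegalArgumentException(s"Unsupported interval: $interval")
  }
}
```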

import org.apache.spark.unsafe.types.CalendarInterval

/**
* An interface that indicates how to run a batch.

@marmbrus

marmbrus Apr 1, 2016

Contributor

Used to indicate how often results should be produced by a [[ContinuousQuery]]. or something like that. I don't know if we want to say batch in the public API.

@tdas

Contributor

commented Apr 1, 2016

There does not seem to be any end-to-end test that makes sure that the trigger is working and keeps the right timing. Also, things like the behavior when the previous batch takes longer are not tested.

* {{{
* df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* df.write.trigger(ProcessingTime.create("10 seconds"))
* }}}

@marmbrus

marmbrus Apr 1, 2016

Contributor

Nice documentation! Maybe put the typesafe one second and include the imports that are required.

require(intervalMs >= 0, "the interval of trigger should not be negative")
}

object ProcessingTime {

@marmbrus

marmbrus Apr 1, 2016

Contributor

Used to create [[ProcessingTime]] triggers for [[ContinuousQueries]]. Or something.

/**
* An interface that indicates how to run a batch.
*/
sealed trait Trigger {}

@marmbrus

marmbrus Apr 1, 2016

Contributor

everything here should be @experimental

* }}}
*
* Java Example:
*

@tdas

tdas Apr 1, 2016

Contributor

nit: extra line

@@ -78,6 +76,11 @@ class StreamExecution(
/** A list of unique sources in the query plan. */
private val uniqueSources = sources.distinct

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t)
case t => throw new IllegalArgumentException(s"${t.getClass} is not supported")

@marmbrus

marmbrus Apr 1, 2016

Contributor

The trait is sealed. Do we need this?

*
* Scala Example:
* {{{
* df.write.trigger(ProcessingTime(10.seconds))

@tdas

tdas Apr 1, 2016

Contributor

I find this very verbose. @marmbrus, what do you think of having a shortcut trigger("10 seconds")?

@marmbrus

marmbrus Apr 1, 2016

Contributor

I think that's confusing as soon as you support event-time triggers.

@marmbrus

Contributor

commented Apr 1, 2016

This looks great! Minor comments only.

@@ -78,6 +78,17 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}

/**
* Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
* the query as fast as possible.
*

@tdas

tdas Apr 1, 2016

Contributor

This scala doc should have an example right here.

write.trigger(ProcessingTime("10 seconds"))
write.trigger("10 seconds")     // less verbose
new ProcessingTime(cal.microseconds / 1000)
}

def apply(interval: Duration): ProcessingTime = {

@tdas

tdas Apr 1, 2016

Contributor

Needs docs with examples

new ProcessingTime(interval.toMillis)
}

def create(interval: String): ProcessingTime = {

@tdas

tdas Apr 1, 2016

Contributor

Needs scala docs with examples

apply(interval)
}

def create(interval: Long, unit: TimeUnit): ProcessingTime = {

@tdas

tdas Apr 1, 2016

Contributor

Needs scala docs with examples

}

private def waitUntil(time: Long): Unit = {
var now = System.currentTimeMillis()

@tdas

tdas Apr 1, 2016

Contributor

This behavior needs to be tested, and it's best to use clocks (a system clock and a manual clock) to test this.

@zsxwing

zsxwing Apr 1, 2016

Author Member

I added Clock to ProcessingTimeExecutor and added more tests.
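The clock-injection idea can be sketched as follows. This is a simplified, hypothetical version: the test-build summary below shows the merged signature takes a ProcessingTime rather than a raw interval, and the real executor also runs batches in a loop.

```scala
// Sketch: injecting a Clock lets tests drive time manually instead of
// sleeping. nextBatchTime aligns triggers to multiples of the interval,
// so an overloaded batch skips firings rather than running ASAP.
trait Clock { def getTimeMillis(): Long }

class SystemClock extends Clock {
  def getTimeMillis(): Long = System.currentTimeMillis()
}

class ManualClock(private var now: Long = 0L) extends Clock {
  def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}

case class ProcessingTimeExecutor(intervalMs: Long, clock: Clock = new SystemClock) {
  // The next trigger time strictly after `now`, on the interval grid.
  def nextBatchTime(now: Long): Long =
    if (intervalMs == 0) now else (now / intervalMs + 1) * intervalMs
}
```

With a ManualClock a test can assert on the computed schedule deterministically, without real sleeps.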

@zsxwing

Member Author

commented Apr 1, 2016

Addressed all comments

@SparkQA


commented Apr 1, 2016

Test build #54722 has finished for PR 11976 at commit 7c4bc42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
@SparkQA


commented Apr 2, 2016

Test build #54733 has finished for PR 11976 at commit 6c1b382.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@marmbrus

Contributor

commented Apr 4, 2016

LGTM, merging to master.

@asfgit asfgit closed this in 855ed44 Apr 4, 2016

@zsxwing zsxwing deleted the zsxwing:trigger branch Apr 4, 2016
