
[SPARK-13146][SQL] Management API for continuous queries #11030

Closed
wants to merge 24 commits

Conversation


tdas commented Feb 2, 2016

Management API for Continuous Queries

API for getting status of each query

  • Whether active or not
  • Unique name of each query
  • Status of the sources and sinks
  • Exceptions

API for managing each query

  • Immediately stop an active query
  • Waiting for a query to terminate, either normally or with an error

API for managing multiple queries

  • Listing all active queries
  • Getting an active query by name
  • Waiting for any one of the active queries to be terminated

API for listening to query life cycle events

  • ContinuousQueryListener API for query start, progress and termination events.
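
A rough Scala sketch of how these APIs could be used together. This is illustrative only: `startQuery()` stands in for however the query is started via DataStreamWriter, and the manager entry point and listener callback names follow the description above rather than the exact diff.

    // Sketch only: startQuery() is a hypothetical stand-in for starting a
    // ContinuousQuery via DataStreamWriter.
    val query: ContinuousQuery = startQuery()

    // Status of a single query
    query.isActive           // whether the query is active or not
    query.name               // unique name of the query
    query.exception          // the exception that terminated it, if any

    // Managing a single query
    query.awaitTermination() // block until it terminates, normally or with error
    query.stop()             // stop it immediately

    // Managing multiple queries
    val manager = sqlContext.streams
    manager.active.foreach(q => println(q.name)) // list all active queries
    val named = manager.get("my-query")          // get an active query by name
    manager.awaitAnyTermination()                // wait for any one to terminate

    // Listening to query life cycle events (callback and event names assumed)
    manager.addListener(new ContinuousQueryListener {
      override def onQueryStarted(event: QueryStarted): Unit = ()
      override def onQueryProgress(event: QueryProgress): Unit = ()
      override def onQueryTerminated(event: QueryTerminated): Unit = ()
    })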

"""
""".stripMargin

def assert(condition: => Boolean, message: String): Unit = {
Contributor Author:

These are a bunch of changes to make sure that any `assert` or `eventually` that is needed within testStream is wrapped in a try/catch, so that we can catch failures and enrich the message with the testState.

Contributor:

I like this method, but I wouldn't shadow an existing method.

Contributor Author:

Renamed to verify.
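
For reference, a minimal sketch in Scala of what the renamed helper might look like; `failTest` is a hypothetical helper assumed to fail the test with the testState appended, and the actual StreamTest code may differ.

    import scala.util.control.NonFatal

    // Evaluate the condition lazily; any assertion error or non-fatal
    // exception thrown while evaluating it is caught and re-reported with
    // the enriched message.
    def verify(condition: => Boolean, message: String): Unit = {
      try {
        if (!condition) failTest(message) // failTest: hypothetical, adds testState
      } catch {
        case NonFatal(e) => failTest(s"$message\n\t$e")
      }
    }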


SparkQA commented Feb 2, 2016

Test build #50589 has finished for PR 11030 at commit 40c6444.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ContinuousQueryManager


test("awaitAnyTermination") {
Contributor Author:

These tests are likely to be flaky because of timing-related issues. Other than making the timings more coarse, I am not sure how else to test awaitTermination's behavior.
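
As an illustration of the coarse-timing approach, a sketch (in Scala; `startQuery()` is a hypothetical helper, and `awaitAnyTermination(timeoutMs)` is assumed to return whether any query terminated within the timeout):

    test("awaitAnyTermination with coarse timeouts") {
      val query = startQuery() // hypothetical helper that starts a query

      // Nothing has terminated yet, so a short timeout should return false;
      // keeping it far below any real termination time makes this stable.
      assert(!sqlContext.streams.awaitAnyTermination(timeoutMs = 100))

      query.stop()

      // After the stop, even a generous timeout should return true quickly.
      assert(sqlContext.streams.awaitAnyTermination(timeoutMs = 10000))
    }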


tdas commented Feb 2, 2016

@marmbrus @zsxwing Please review!


SparkQA commented Feb 2, 2016

Test build #50597 has finished for PR 11030 at commit b6c2517.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -84,6 +84,17 @@ final class DataStreamWriter private[sql](df: DataFrame) {
}

/**
* Specifies a name to the [[ContinuousQuery]] to be started. This name must be unique among
Contributor:

"Specifies a name for this [[ContinuousQuery]]. The name must be unique for any active query, and will be auto assigned if unspecified."

Contributor Author:

DataStreamWriter is not a ContinuousQuery. Looking at the Scala doc page of DataStreamWriter, I think the phrase "this ContinuousQuery" is confusing.

Contributor:

It doesn't have to be exactly that text, but "Specifies a name to the ContinuousQuery to be started" doesn't sound right. In other parts of the documentation we say "for the underlying datastream".

How about just "Specifies a name for this query."... and then the details about auto-assignment and uniqueness.

Contributor Author:

Still confusing: "this query"... this is not a query. That's why it makes more sense if the name is associated directly with the start() method.

val query = ds.streamTo.path("some-path").format("text").start("query-name")

OR

val query = ds.streamTo("path").format("text").start("query-name")

Then it's easy to write the docs for def start(name: String): "@param name Name of the query".

Member:

> Then it's easy to write the docs for def start(name: String): "@param name Name of the query".

There is already a start(path: String)
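
To spell the clash out: both overloads would take a single String, so they could not coexist on DataStreamWriter. A sketch (`startQuery` and `defaultPath` are illustrative placeholders):

    // Existing method: the argument is the output path.
    def start(path: String): ContinuousQuery = startQuery(path, name = None)

    // Proposed method: the argument would be the query name -- but this has
    // exactly the same signature as start(path: String), so the pair would
    // not even compile.
    // def start(name: String): ContinuousQuery = startQuery(defaultPath, Some(name))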


SparkQA commented Feb 9, 2016

Test build #50954 has finished for PR 11030 at commit 144adbb.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 9, 2016

Test build #50955 has finished for PR 11030 at commit 5c3c690.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* If the query has terminated with an exception, then the exception will be thrown.
*
* If the query has terminated, then all subsequent calls to this method will either return
* `true` immediately (if the query was terminated by `stop()`), or throw the exception
Member:

nit: this method doesn't return a Boolean.
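
For context on the nit, a sketch of the two variants being discussed (bodies omitted): the doc text above fits the timed overload, while the no-arg method returns Unit.

    // Blocks until the query terminates; if the query failed, rethrows its
    // exception instead of returning anything.
    def awaitTermination(): Unit

    // Blocks for up to timeoutMs; returns true if the query terminated
    // within the timeout. This is the variant that returns a Boolean.
    def awaitTermination(timeoutMs: Long): Boolean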


zsxwing commented Feb 9, 2016

Just some nits. Otherwise LGTM


tdas commented Feb 10, 2016

I addressed the multi-failure case and added a unit test for it.


SparkQA commented Feb 10, 2016

Test build #51020 has finished for PR 11030 at commit d0003cf.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 10, 2016

Test build #51021 has finished for PR 11030 at commit b0d5533.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 10, 2016

Test build #51022 has finished for PR 11030 at commit d7b1d97.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


tdas commented Feb 10, 2016

test this again.


tdas commented Feb 10, 2016

test this please.


tdas commented Feb 10, 2016

retest this


SparkQA commented Feb 10, 2016

Test build #2529 has finished for PR 11030 at commit d7b1d97.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 10, 2016

Test build #51030 has finished for PR 11030 at commit d7b1d97.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


zsxwing commented Feb 10, 2016

Found the failure cause:

The Hadoop FileSystem API wraps InterruptedException in an IOException...

java.io.IOException: java.lang.InterruptedException
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:508)
    at org.apache.hadoop.util.Shell.run(Shell.java:418)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:739)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:722)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:633)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:467)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
    at org.apache.spark.sql.execution.streaming.FileStreamSource.writeBatch(FileStreamSource.scala:186)
    at org.apache.spark.sql.execution.streaming.FileStreamSource.fetchMaxOffset(FileStreamSource.scala:115)
    at org.apache.spark.sql.execution.streaming.FileStreamSource.getNextBatch(FileStreamSource.scala:139)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$3.applyOrElse(StreamExecution.scala:182)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$3.applyOrElse(StreamExecution.scala:179)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:262)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:262)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:261)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:304)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
    at scala.collection.AbstractIterator.to(Iterator.scala:1194)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:353)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:251)
    at org.apache.spark.sql.execution.streaming.StreamExecution.attemptBatch(StreamExecution.scala:179)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:123)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:74)

So if we call stop() (which interrupts the stream thread) while a batch is writing a file, we will get an IOException...

One workaround is adding an addCheck() above this line: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala#L339
Then we can make sure that no batch is writing any file when we call stop().

Another option is to allow this exception in this test, since we randomly start and stop the stream multiple times.
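
As a sketch of the second option, the test (or the stream's run loop) could treat an IOException caused by an interrupt as a normal stop. How the interrupt surfaces (as the cause or only in the message) varies by Hadoop version, so both checks below are assumptions, and `runBatches()` is a hypothetical stand-in for StreamExecution's batch loop:

    import java.io.IOException

    // Heuristic: Hadoop's Shell may set the InterruptedException as the
    // cause, or only mention it in the message, depending on the version.
    def isWrappedInterrupt(e: Throwable): Boolean = e match {
      case io: IOException =>
        io.getCause.isInstanceOf[InterruptedException] ||
          (io.getMessage != null && io.getMessage.contains("InterruptedException"))
      case _ => false
    }

    try {
      runBatches() // hypothetical stand-in for the streaming batch loop
    } catch {
      case e: Throwable if isWrappedInterrupt(e) =>
        () // stop() interrupted a batch mid-write; treat as a clean shutdown
    }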


SparkQA commented Feb 10, 2016

Test build #51055 has finished for PR 11030 at commit 9caec83.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 11, 2016

Test build #51062 has finished for PR 11030 at commit 458199b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


zsxwing commented Feb 11, 2016

LGTM

asfgit closed this in 0902e20 on Feb 11, 2016