Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-18516][SQL] Split state and progress in streaming #15954

Closed
wants to merge 24 commits into from

Conversation

marmbrus
Copy link
Contributor

@marmbrus marmbrus commented Nov 21, 2016

This PR separates the status of a StreamingQuery into two separate APIs:

  • status - describes the status of a StreamingQuery at this moment, including what phase of processing is currently happening and if data is available.
  • recentProgress - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:

{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}

Additionally, in order to make it possible to correlate progress updates across restarts, we change the id field from an integer that is unique with in the JVM to a UUID that is globally unique.

@marmbrus
Copy link
Contributor Author

/cc @tdas

@SparkQA
Copy link

SparkQA commented Nov 21, 2016

Test build #68916 has finished for PR 15954 at commit 213081a.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 21, 2016

Test build #68923 has finished for PR 15954 at commit f4357d1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 21, 2016

Test build #68944 has finished for PR 15954 at commit 6eb1396.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 21, 2016

Test build #68945 has finished for PR 15954 at commit f1bd871.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@tdas tdas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Round 1 of comments. I am still looking.

*/
@deprecated("use status.sourceStatuses", "2.0.2")
def sourceStatuses: Array[SourceStatus]
def recentProgress: Array[StreamingQueryProgress]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldnt this be recentProgresses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, yeah maybe. Its not clear to me that progress is inherently singular and progresses is kind of a mouthful. It is maybe nice for Arrays to always be plural though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well. in the name StreamingQueryProgress we have effectively defined that "progress" means data from one trigger (i.e. singular). So I think its better to be progresses.


/**
* :: Experimental ::
* Reports metrics on data being read from a given streaming source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should say that this information related a trigger where progress was made in processing data from sources.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can copy the docs from the main class: Each event relates to processing done for a single trigger of the streaming query.

* @param description Description of the source.
* @param startOffset The starting offset for data being read.
* @param endOffset The ending offset for data being read.
* @param numRecords The number of records read from this source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this is the numrecords read from this source since the beginning or in the last trigger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we update the docs as you suggest above this will be clear.

* Holds statistics about state that is being stored for a given streaming query.
*/
@Experimental
class StateOperator private[sql](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StateOperator -> StateOperatorProgress

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

*/
@Experimental
class StateOperator private[sql](
val numEntries: Long,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numEntries -> numTotal would make it more consistent with numUpdated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, needs docs to make it clear that numUpdated is with reference to the last progress

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to docs. I think numTotal is less clear. Total of what? It is a count of the number of entries that the state store is holding.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question then applies to numUpdated as well.
How about numRowsTotal, numRowsUpdated?
Then in future we could add sizeBytesTotal, sizeBytesUpdated, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those sound good.

@SparkQA
Copy link

SparkQA commented Nov 22, 2016

Test build #68972 has finished for PR 15954 at commit 59a9139.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69276 has finished for PR 15954 at commit 247ada6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AdvanceManualClock(timeToAdd: Long) extends StreamAction

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69275 has finished for PR 15954 at commit c64632c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69278 has finished for PR 15954 at commit 8ac76c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69295 has started for PR 15954 at commit b41a662.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69293 has finished for PR 15954 at commit d6200d1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69288 has finished for PR 15954 at commit f6d60df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69290 has finished for PR 15954 at commit 32ff04e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69308 has started for PR 15954 at commit d92f4bf.

Copy link
Contributor Author

@marmbrus marmbrus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good! Thanks for your help getting the tests back in shape. I think the only important question to answer before we can merge is the contents of QueryTerminatedEvent.

Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
:return: a map
"""
return json.loads(self._jsq.lastProgress().toString())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use json as above instead of relying on the fact that the toString is json.

* all queries that have been started in the current process.
* @since 2.0.0
* Returns the unique id of this query. An id is tied to the checkpoint location and will
* be the same across restarts of a given streaming query.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should fix the TODO earlier, or remove this promise for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

@@ -51,7 +53,7 @@ trait StreamingQuery {
def sparkSession: SparkSession

/**
* Whether the query is currently active or not
* Returns `true` if this query is actively running.
* @since 2.0.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: other places have a blank line before @since

*/
@Experimental
class QueryTerminatedEvent private[sql](
val queryStatus: StreamingQueryStatus,
val exception: Option[String]) extends Event
val lastProgress: StreamingQueryProgress,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this if no progress is ever made? null? I would consider leaving this just the id, because otherwise if the query dies before progress is made, now you can't get the id at all.


object StreamingQueryManager {
private val _nextId = new AtomicLong(0)
def nextId: Long = _nextId.getAndIncrement()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

@@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
}
}

object StreamingQueryManager {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you tell me why was this moved from the StreamExecution?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it private, made I feel this could have stayed in object StreamExecution


/**
* :: Experimental ::
* Statistics about updates made to a stateful operators in a [[StreamingQuery]] in a trigger.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: during a trigger?

* Statistics about updates made to a stateful operators in a [[StreamingQuery]] in a trigger.
*/
@Experimental
class StateOperatorProgress private[sql](
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move SourceProgress here or put StateOperatorProgress in its own file. We might also consider putting them all in org.apache.spark.sql.streaming.progress, but there might not be time.

val id: UUID,
val name: String,
val timestamp: Long,
val batchId: Long, // TODO: epoch?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably will not do this TODO.


class QueryStatusCollector extends StreamingQueryListener {
/** Collects events from the StreamingQueryListener for testing */
class EventCollector extends StreamingQueryListener {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be an inner class of StreamTest. This file is pretty long/complicated as is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, it was being used in multiple files at some point so I put it here. but not any more. I will put it only in StreamingQueryListener.

@marmbrus marmbrus changed the title [WIP][SPARK-18516][SQL] Split state and progress in streaming [SPARK-18516][SQL] Split state and progress in streaming Nov 29, 2016
@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69352 has finished for PR 15954 at commit aa8af9c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69353 has finished for PR 15954 at commit c11d2e5.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
@deprecated("use status.sourceStatuses", "2.0.2")
def sourceStatuses: Array[SourceStatus]
def recentProgresses: Array[StreamingQueryProgress]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these for the last n triggers? Or is it last n instantaneous progress updates, e.g. finished reading from a source etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last n triggers.

*
* @since 2.1.0
*/
def get(id: String): StreamingQuery = get(UUID.fromString(id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this I guess we can't provide API's for get(name: String)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think thats okay. A globally unique ID is a better identifier.

@SparkQA
Copy link

SparkQA commented Nov 29, 2016

Test build #69346 has finished for PR 15954 at commit d9d8f82.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -33,25 +35,27 @@ trait StreamingQuery {
* Returns the name of the query. This name is unique across all active queries. This can be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this doc still true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. it is.

@SparkQA
Copy link

SparkQA commented Nov 30, 2016

Test build #69355 has finished for PR 15954 at commit 69d9b4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Copy link
Contributor Author

LGTM, merging to master and 2.1

@asfgit asfgit closed this in c3d08e2 Nov 30, 2016
asfgit pushed a commit that referenced this pull request Nov 30, 2016
This PR separates the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
 - `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #15954 from marmbrus/queryProgress.

(cherry picked from commit c3d08e2)
Signed-off-by: Michael Armbrust <michael@databricks.com>
ghost pushed a commit to dbtsai/spark that referenced this pull request Dec 1, 2016
…erver

## What changes were proposed in this pull request?

As `queryStatus` in StreamingQueryListener events was removed in apache#15954, parsing 2.0.2 structured streaming logs will throw the following errror:

```
[info]   com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "queryStatus" (class org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent), not marked as ignorable (2 known properties: "id", "exception"])
[info]  at [Source: {"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491532753,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getOffset.source":"1","triggerId":"1"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{}},"exception":null}; line: 1, column: 521] (through reference chain: org.apache.spark.sql.streaming.QueryTerminatedEvent["queryStatus"])
[info]   at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:51)
[info]   at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:839)
[info]   at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1045)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1352)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1306)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:453)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
...
```

This PR just ignores such errors and adds a test to make sure we can read 2.0.2 logs.

## How was this patch tested?

`query-event-logs-version-2.0.2.txt` has all types of events generated by Structured Streaming in Spark 2.0.2. `testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2")` verified we can load them without any error.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#16085 from zsxwing/SPARK-18655.
asfgit pushed a commit that referenced this pull request Dec 1, 2016
…erver

## What changes were proposed in this pull request?

As `queryStatus` in StreamingQueryListener events was removed in #15954, parsing 2.0.2 structured streaming logs will throw the following errror:

```
[info]   com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "queryStatus" (class org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent), not marked as ignorable (2 known properties: "id", "exception"])
[info]  at [Source: {"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491532753,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getOffset.source":"1","triggerId":"1"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{}},"exception":null}; line: 1, column: 521] (through reference chain: org.apache.spark.sql.streaming.QueryTerminatedEvent["queryStatus"])
[info]   at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:51)
[info]   at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:839)
[info]   at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1045)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1352)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1306)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:453)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
...
```

This PR just ignores such errors and adds a test to make sure we can read 2.0.2 logs.

## How was this patch tested?

`query-event-logs-version-2.0.2.txt` has all types of events generated by Structured Streaming in Spark 2.0.2. `testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2")` verified we can load them without any error.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16085 from zsxwing/SPARK-18655.

(cherry picked from commit c4979f6)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
This PR separates the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
 - `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes apache#15954 from marmbrus/queryProgress.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
…erver

## What changes were proposed in this pull request?

As `queryStatus` in StreamingQueryListener events was removed in apache#15954, parsing 2.0.2 structured streaming logs will throw the following errror:

```
[info]   com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "queryStatus" (class org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent), not marked as ignorable (2 known properties: "id", "exception"])
[info]  at [Source: {"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491532753,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getOffset.source":"1","triggerId":"1"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{}},"exception":null}; line: 1, column: 521] (through reference chain: org.apache.spark.sql.streaming.QueryTerminatedEvent["queryStatus"])
[info]   at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:51)
[info]   at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:839)
[info]   at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1045)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1352)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1306)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:453)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
...
```

This PR just ignores such errors and adds a test to make sure we can read 2.0.2 logs.

## How was this patch tested?

`query-event-logs-version-2.0.2.txt` has all types of events generated by Structured Streaming in Spark 2.0.2. `testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2")` verified we can load them without any error.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#16085 from zsxwing/SPARK-18655.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
This PR separates the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
 - `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes apache#15954 from marmbrus/queryProgress.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
This PR separates the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
 - `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes apache#15954 from marmbrus/queryProgress.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…erver

## What changes were proposed in this pull request?

As `queryStatus` in StreamingQueryListener events was removed in apache#15954, parsing 2.0.2 structured streaming logs will throw the following errror:

```
[info]   com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "queryStatus" (class org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent), not marked as ignorable (2 known properties: "id", "exception"])
[info]  at [Source: {"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491532753,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getOffset.source":"1","triggerId":"1"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{}},"exception":null}; line: 1, column: 521] (through reference chain: org.apache.spark.sql.streaming.QueryTerminatedEvent["queryStatus"])
[info]   at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:51)
[info]   at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:839)
[info]   at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1045)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1352)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1306)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:453)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
...
```

This PR just ignores such errors and adds a test to make sure we can read 2.0.2 logs.

## How was this patch tested?

`query-event-logs-version-2.0.2.txt` has all types of events generated by Structured Streaming in Spark 2.0.2. `testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2")` verified we can load them without any error.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#16085 from zsxwing/SPARK-18655.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants