
[SPARK-29450][SS][2.4] Measure the number of output rows for streaming aggregation with append mode #27209

Closed

Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch adds the missing metric for the number of output rows in streaming aggregation with append mode. The other output modes already measure this metric correctly.

Why are the changes needed?

Without the patch, the value of this metric is always 0.
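
For context on where the metric surfaces: it is an operator-level SQL metric, shown as "number of output rows" on the state store save operator in the SQL tab of the web UI (see the screenshots below). As a minimal, hedged sketch, the related per-batch progress can also be watched from code, assuming the running `query` handle from the test snippet below:

```
// Hedged sketch (not part of the patch): dump recent per-batch progress.
// The state operator stats (numRowsTotal, numRowsUpdated) appear here; the
// fixed "number of output rows" metric itself is shown in the SQL UI.
query.recentProgress.foreach(p => println(p.prettyJson))
```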

Does this PR introduce any user-facing change?

No.

How was this patch tested?

A unit test was added. The patch was also manually tested with the query below:

> query

```
import spark.implicits._
import org.apache.spark.sql.functions._  // window, max, min, avg (auto-imported in spark-shell)

spark.conf.set("spark.sql.shuffle.partitions", "5")

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .load()
  .withWatermark("timestamp", "5 seconds")
  .selectExpr("timestamp", "mod(value, 100) as mod", "value")
  .groupBy(window($"timestamp", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))

val query = df
  .writeStream
  .format("memory")
  .option("queryName", "test")
  .outputMode("append")
  .start()

query.awaitTermination()
```
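
Since the sink uses the memory format, the rows emitted so far can be read back as a regular table: the memory sink registers a temp view under the name given by queryName. In append mode only finalized windows appear, because a group is emitted only after the watermark passes the end of its window. A small sketch, assuming the query above is running (e.g. run this from another shell, since awaitTermination() blocks):

```
// Hedged sketch: inspect what the memory sink has received so far.
// Only windows the watermark has already closed will show up (append mode).
spark.table("test").orderBy("window").show(truncate = false)
```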

> before the patch

![screenshot-before-SPARK-29450](https://user-images.githubusercontent.com/1317309/69023217-58d7bc80-0a01-11ea-8cac-40f1cced6d16.png)

> after the patch

![screenshot-after-SPARK-29450](https://user-images.githubusercontent.com/1317309/69023221-5c6b4380-0a01-11ea-8a66-7bf1b7d09fc7.png)

…regation with append mode


Closes apache#26104 from HeartSaVioR/SPARK-29450.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@HeartSaVioR
Contributor Author

cc @gatorsmile
also cc @dongjoon-hyun, who is leading the effort to release 2.4.5.

To provide some context: @gatorsmile found that #26104 actually fixed a regression (which I wasn't aware of) that appears to have been introduced in Spark 2.3.0, and asked about backporting the patch to the 2.4 version line.
(#26104 (comment))

I'm not sure whether @gatorsmile wanted this handled in Spark 2.4.5 specifically, or just wanted to ensure the next bugfix release of Spark 2.4 includes it. @gatorsmile, could you please clarify?

@SparkQA

SparkQA commented Jan 15, 2020

Test build #116749 has finished for PR 27209 at commit 25b2769.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jan 15, 2020

Test build #116774 has finished for PR 27209 at commit 25b2769.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan pushed a commit that referenced this pull request Jan 16, 2020
…g aggregation with append mode


Closes #27209 from HeartSaVioR/SPARK-29450-branch-2.4.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to 2.4!

@cloud-fan cloud-fan closed this Jan 16, 2020
@HeartSaVioR
Contributor Author

Thanks for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-29450-branch-2.4 branch January 16, 2020 08:22
@dongjoon-hyun
Member

+1, late LGTM.
