[SPARK-23288][SS] Fix output metrics with parquet sink #20745
Conversation
Test build #87992 has finished for PR 20745 at commit
ping @koeninger
The streaming changes look good to me.
cc @cloud-fan to review others.
failAfter(streamingTimeout) {
  query.processAllAvailable()
}
nit: it's better to add the statement below here to avoid flakiness:
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
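For context, a hedged sketch of what the suggested pattern looks like in a StreamTest-style suite (names such as `streamingTimeout`, `query`, and `spark` come from the surrounding test, not from this sketch):

```scala
// Sketch, not the exact PR code. processAllAvailable() returns once the
// sink has consumed all pending input, but listener events may still be
// in flight on the listener bus; draining it before asserting on
// listener-collected metrics avoids flaky reads.
failAfter(streamingTimeout) {
  query.processAllAvailable()
}
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
```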
Added.
  query.processAllAvailable()
}

assert(numTasks === 2)
I would just check numTasks > 0, since the exact count depends on the configuration and the number of CPU cores.
Fixed.
Test build #88323 has finished for PR 20745 at commit
val inputData = MemoryStream[String]
val df = inputData.toDF()

val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
we should use withTempDir to clean up the temp directory at the end
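For reference, a hedged sketch of the `withTempDir` pattern from Spark's `SQLTestUtils` (simplified; the real helper also deletes the directory on test failure):

```scala
// Sketch: withTempDir hands the test body a fresh directory and removes it
// afterwards, so the suite leaves no stray "stream.output" directories
// behind, unlike a bare Utils.createTempDir call.
withTempDir { dir =>
  val outputDir = dir.getCanonicalPath
  // ... start the file-sink query writing to outputDir ...
}
```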
LGTM, can you also attach a web UI SQL tab screenshot? thanks!
hmm, the screenshot you attached is not parquet, is it?
I've started the history server, then executed
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
  val outputMetrics = taskEnd.taskMetrics.outputMetrics
  recordsWritten += outputMetrics.recordsWritten
  bytesWritten += outputMetrics.bytesWritten
Without registering statsTrackers, output metrics are not filled, and `assert(recordsWritten === 5)` and `assert(bytesWritten > 0)` blow up.
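The fix itself is small. A hedged sketch of the idea, with argument lists abbreviated and internal names assumed from the Spark 2.3 write path: `FileStreamSink` builds a `BasicWriteJobStatsTracker` and passes it to `FileFormatWriter.write`, which is what reports executor-side write stats into each task's `outputMetrics`:

```scala
// Sketch of the fix, simplified; the real code lives in FileStreamSink.
// The tracker needs a serializable Hadoop conf because it is shipped to
// executors, where it accumulates bytes/records written per task.
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val statsTracker = new BasicWriteJobStatsTracker(
  serializableHadoopConf, BasicWriteJobStatsTracker.metrics)

FileFormatWriter.write(
  // ... other arguments unchanged ...
  statsTrackers = Seq(statsTracker))
```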
Test build #88453 has finished for PR 20745 at commit
retest this please
Test build #88457 has finished for PR 20745 at commit
thanks, merging to master/2.3!
## What changes were proposed in this pull request?
Output metrics were not filled when the parquet sink was used. This PR fixes the problem by passing a `BasicWriteJobStatsTracker` in `FileStreamSink`.

## How was this patch tested?
Additional unit test added.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Closes #20745 from gaborgsomogyi/SPARK-23288.
(cherry picked from commit 918c7e9)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
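Putting the review threads above together, here is a hedged sketch of the rough shape of the added unit test (names are assumed for illustration; the actual test lives in the PR's file-sink test suite):

```scala
// Sketch only: count output metrics via a SparkListener, run a parquet
// file-sink query, drain listener events, then assert on the metrics.
var recordsWritten = 0L
var bytesWritten = 0L
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics.outputMetrics
    recordsWritten += m.recordsWritten
    bytesWritten += m.bytesWritten
  }
})

withTempDir { dir =>
  val inputData = MemoryStream[String]
  val query = inputData.toDF().writeStream
    .format("parquet")
    .option("checkpointLocation", new File(dir, "checkpoint").getCanonicalPath)
    .start(new File(dir, "out").getCanonicalPath)
  inputData.addData("1", "2", "3", "4", "5")
  failAfter(streamingTimeout) { query.processAllAvailable() }
  spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
  query.stop()
  // Before the fix these would fail: the metrics stayed at zero.
  assert(recordsWritten === 5)
  assert(bytesWritten > 0)
}
```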