
[SPARK-26164][SQL] Allow concurrent writers for writing dynamic partitions and bucket table #32198

Closed
wants to merge 10 commits

Conversation

c21
Contributor

@c21 c21 commented Apr 16, 2021

What changes were proposed in this pull request?

This is a re-proposal of #23163. Currently Spark always requires a local sort before writing to an output table with dynamic partition/bucket columns. The sort is unnecessary if the cardinality of the partition/bucket values is small, and can be avoided by keeping multiple output writers open concurrently.

This PR introduces a config, spark.sql.maxConcurrentOutputFileWriters (which disables the feature by default), that lets users tune the maximum number of concurrent writers. The config is needed because we cannot keep an arbitrary number of writers in task memory, which can cause OOM (especially for the Parquet/ORC vectorized writers).

The feature first uses concurrent writers to write rows. If the number of writers exceeds the limit specified by the config above, it sorts the rest of the rows and writes them one by one (see DynamicPartitionDataConcurrentWriter.writeWithIterator()).

In addition, the interface WriteTaskStatsTracker and its implementation BasicWriteTaskStatsTracker are changed, because they previously relied on the assumption that only one writer is active when writing dynamic partitions and bucketed tables.
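The two-phase behavior described above can be sketched as follows. This is an illustrative, self-contained model only: `ConcurrentWriterSketch`, `maxWriters`, `writeOne`, and the in-memory buffers are hypothetical stand-ins, not the PR's actual classes.

```scala
import scala.collection.mutable

case class Row(partition: String, value: Int)

class ConcurrentWriterSketch(maxWriters: Int) {
  private val writers = mutable.Map.empty[String, mutable.Buffer[Int]]
  var fellBackToSort = false

  def writeAll(rows: Iterator[Row]): Unit = {
    val it = rows.buffered
    // Phase 1: keep one writer open per partition, up to the limit.
    while (it.hasNext && !fellBackToSort) {
      if (!writers.contains(it.head.partition) && writers.size == maxWriters) {
        fellBackToSort = true // limit reached: switch to sort-based writing
      } else {
        writeOne(it.next())
      }
    }
    // Phase 2: sort the remaining rows by partition so each remaining
    // writer can be filled one partition at a time (and, in the real
    // implementation, closed eagerly when its partition is done).
    it.toSeq.sortBy(_.partition).foreach(writeOne)
  }

  private def writeOne(row: Row): Unit =
    writers.getOrElseUpdate(row.partition, mutable.Buffer.empty[Int]) += row.value

  def partitionsWritten: Set[String] = writers.keySet.toSet
}
```

Once the fallback triggers, the sorted phase handles the rest one partition at a time, so each writer can be closed before the next opens.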

Why are the changes needed?

Avoid the sort before writing output for dynamically partitioned queries and bucketed tables.
This helps improve CPU and IO performance for these queries.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test in DataFrameReaderWriterSuite.scala.

@github-actions github-actions bot added the SQL label Apr 16, 2021
@c21
Contributor Author

c21 commented Apr 16, 2021

cc @cloud-fan and @maropu could you help take a look when you have time? Thanks.

@SparkQA

SparkQA commented Apr 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42026/

@SparkQA

SparkQA commented Apr 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42026/

@HyukjinKwon
Member

@c21 would you mind rebasing w/ the latest master branch? Seems like your branch is based on the old master branch.

@github-actions

Test build #754194290 for PR 32198 at commit 4b2801b.

@c21
Contributor Author

c21 commented Apr 16, 2021

@HyukjinKwon - thanks for the heads up, updated.

@SparkQA

SparkQA commented Apr 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42033/

@SparkQA

SparkQA commented Apr 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42033/

@SparkQA

SparkQA commented Apr 16, 2021

Test build #137451 has finished for PR 32198 at commit 18f2851.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ConcurrentOutputWriterSpec(

@SparkQA

SparkQA commented Apr 16, 2021

Test build #137459 has finished for PR 32198 at commit 4b2801b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ConcurrentOutputWriterSpec(

}
}

sealed abstract class WriterMode
Contributor

@cloud-fan cloud-fan Apr 20, 2021

This abstraction is a bit confusing. Single writer vs. concurrent writers is a mode that is decided statically, while before-sort and after-sort are more like runtime states than modes.

I'd expect different FileFormatDataWriter implementations for single and concurrent writers, and the concurrent writers implementation has a boolean state to indicate before and after sort.
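The restructuring suggested here could look roughly like the following sketch (hypothetical stand-in types; the real `FileFormatDataWriter` hierarchy carries much more state):

```scala
import scala.collection.mutable

trait FileFormatDataWriterSketch {
  def write(partition: String): Unit
}

// Single-writer mode: chosen statically, no extra runtime state.
class SingleWriterSketch extends FileFormatDataWriterSketch {
  val written = mutable.Buffer.empty[String]
  def write(partition: String): Unit = written += partition
}

// Concurrent-writer mode: `sorted` is a runtime state that flips once the
// writer falls back to sort-based writing; it is not a separate mode.
class ConcurrentWritersSketch(limit: Int) extends FileFormatDataWriterSketch {
  private var sorted = false
  private val open = mutable.Set.empty[String]
  def write(partition: String): Unit = {
    if (!sorted && !open.contains(partition) && open.size == limit) {
      sorted = true // would trigger sorting the remaining rows
    }
    open += partition
  }
  def isSorted: Boolean = sorted
}
```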

Contributor Author

@cloud-fan - sounds good, I agree. Will restructure the code.

Btw, what do you think of the changes in WriteTaskStatsTracker and BasicWriteTaskStatsTracker? Do you have any concerns with those interface changes?

* Keep all writers open and write rows one by one.
* - Step 2: If number of concurrent writers exceeds limit, sort rest of rows. Write rows
* one by one, and eagerly close the writer when finishing each partition and/or
* bucket.
Contributor

does it mean we can have limit + 1 writers at most?

Contributor Author

Yes.

var outputWriter: OutputWriter,
var recordsInFile: Long,
var fileCounter: Int,
var filePath: String)
Contributor

does it mean the latest file path?

Contributor Author

Yes, because we may create a new file when the limit on the number of records per file is exceeded.
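In other words, `filePath` tracks the most recently opened file for that writer, because a new file is rolled when the per-file record limit is hit. A minimal, hypothetical sketch of that rollover (names are illustrative, not the PR's internals):

```scala
class WriterStatusSketch(maxRecordsPerFile: Long) {
  var recordsInFile: Long = 0L
  var fileCounter: Int = 0
  var filePath: String = newPath()

  private def newPath(): String = s"part-$fileCounter.parquet"

  def writeRow(): Unit = {
    if (recordsInFile >= maxRecordsPerFile) {
      fileCounter += 1     // roll over to a fresh file
      filePath = newPath() // `filePath` now points at the latest file
      recordsInFile = 0
    }
    recordsInFile += 1
  }
}
```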

@c21
Contributor Author

c21 commented Apr 21, 2021

@cloud-fan - updated the PR to keep the single and concurrent writer implementations separate. The PR is ready for review again, thanks.

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42242/

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42242/

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42244/

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42244/

@SparkQA

SparkQA commented Apr 21, 2021

Test build #137714 has finished for PR 32198 at commit 0442f05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BaseDynamicPartitionDataWriter(
  • class DynamicPartitionDataSingleWriter(
  • class DynamicPartitionDataConcurrentWriter(

@@ -3150,6 +3150,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val MAX_CONCURRENT_OUTPUT_WRITERS = buildConf("spark.sql.maxConcurrentOutputWriters")
Contributor

maxConcurrentOutputFileWriters? To indicate it's for file source only.

Contributor Author

@cloud-fan - updated.

numFiles += 1
}
curFile = None
private def getFileStats(filePath: String): Unit = {
Contributor

seems it's not getFileStats, but updateFileStats

Contributor Author

@cloud-fan - updated.

@@ -47,6 +48,7 @@ abstract class FileFormatDataWriter(
protected val MAX_FILE_COUNTER: Int = 1000 * 1000
protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]()
protected var currentWriter: OutputWriter = _
Contributor

It seems all OutputWriter implementations have a path string. Shall we simply add a def path: String in OutputWriter? Then we don't need the currentPath

Contributor Author

@cloud-fan - makes sense. At first I was hesitant to make a broader change to the OutputWriter interface, but it's updated now.
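The resulting interface shape might look like this sketch (simplified stand-ins, not the actual Spark `OutputWriter` trait):

```scala
import scala.collection.mutable

abstract class OutputWriterSketch {
  def path: String // every implementation already knows its file path
  def write(row: String): Unit
  def close(): Unit
}

class TextOutputWriterSketch(val path: String) extends OutputWriterSketch {
  private val rows = mutable.Buffer.empty[String]
  def write(row: String): Unit = rows += row
  def close(): Unit = () // a real writer would flush and close the stream
  def rowCount: Int = rows.size
}
```

With `path` on the writer itself, callers no longer need to carry a separate `currentPath` variable alongside `currentWriter`.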

var bucketId: Option[Int])

/** Wrapper class for status of a unique concurrent output writer. */
private case class WriterStatus(
Contributor

Its fields are all var, so we can make it a plain class instead of a case class.

Contributor Author

@cloud-fan - updated.
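The resulting shape is roughly the following (the `OutputWriter` field is replaced by a `String` stand-in to keep the sketch self-contained):

```scala
// A plain class: the equals/hashCode/copy that a case class generates
// would be misleading for fully mutable state.
class WriterStatus(
    var outputWriter: String,
    var recordsInFile: Long,
    var fileCounter: Int,
    var filePath: String)
```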

dataWriter.write(iterator.next())
dataWriter match {
case w: DynamicPartitionDataConcurrentWriter =>
w.writeWithIterator(iterator)
Contributor

We can make it an API in the base class, which by default just do

while (iterator.hasNext) {
  write(iterator.next())
}

Contributor Author

@c21 c21 Apr 21, 2021

@cloud-fan - I was wondering what the benefit of that is, but on second thought I get it: you want to avoid the pattern matching here. Updated.
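The suggested base-class API might be sketched like this (hypothetical stand-in classes; in the PR the method lives on `FileFormatDataWriter` and the override on `DynamicPartitionDataConcurrentWriter`):

```scala
abstract class DataWriterSketch {
  def write(record: Int): Unit

  // Default: drain the iterator through the single-record path, so callers
  // never need to pattern-match on the concrete writer type.
  def writeWithIterator(iterator: Iterator[Int]): Unit =
    while (iterator.hasNext) write(iterator.next())
}

class SingleWriterImpl extends DataWriterSketch {
  var count = 0
  def write(record: Int): Unit = count += 1
}

class ConcurrentWriterImpl extends DataWriterSketch {
  var concurrentPhase = 0
  var sortedPhase = 0
  def write(record: Int): Unit = concurrentPhase += 1

  // Override: after the (pretend) writer limit of 2 is hit, the remaining
  // records go through a separate sorted phase.
  override def writeWithIterator(iterator: Iterator[Int]): Unit = {
    var n = 0
    while (iterator.hasNext && n < 2) { write(iterator.next()); n += 1 }
    sortedPhase = iterator.length
  }
}
```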

test("SPARK-26164: Allow concurrent writers for multiple partitions and buckets") {
withTable("t1", "t2") {
val df = spark.range(200).map(_ => {
val n = scala.util.Random.nextInt
Contributor

can we use a fixed seed in the test? Otherwise there is a small possibility that there are fewer than 3 distinct values and the fallback test doesn't trigger.

Contributor Author

@cloud-fan - good call. Updated.
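A minimal sketch of the fix (hypothetical helper; the actual test builds a DataFrame with `spark.range`):

```scala
import scala.util.Random

// With a fixed seed the generated partition values are deterministic, so a
// run with enough distinct values always exercises the fallback path.
def partitionValues(seed: Long, n: Int): Seq[Int] = {
  val rng = new Random(seed)
  Seq.fill(n)(rng.nextInt(10))
}
```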

Contributor

@cloud-fan cloud-fan left a comment

LGTM except some minor comments

Contributor Author

@c21 c21 left a comment

Addressed all comments except the ones I replied to with questions. cc @cloud-fan, thanks.

statsTrackers.foreach(_.newPartition(currentWriterId.partitionValues.get))
}
}
retrieveWriterInMap()
Contributor Author

@cloud-fan - updated.

s" which is beyond max value ${concurrentOutputWriterSpec.maxWriters + 1}")
}
concurrentWriters.put(
WriterIndex(currentWriterId.partitionValues, currentWriterId.bucketId),
Contributor Author

@cloud-fan - updated.


@SparkQA

SparkQA commented Apr 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42457/

@SparkQA

SparkQA commented Apr 26, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42457/

*/
private def clearCurrentWriterStatus(): Unit = {
if (currentWriterId.partitionValues.isDefined || currentWriterId.bucketId.isDefined) {
updateCurrentWriterStatusInMap()
Contributor

shall we call it right after when sorted becomes true?

Contributor Author

I wish I could, to tie the logic together more closely, but unfortunately no. We need to write a record (writeRecord) between (1) setting sorted to true (setupCurrentWriterUsingMap) and (2) cleaning up the current writer status (clearCurrentWriterStatus).

writeRecord changes the writer status by increasing recordsInFile by 1.

@SparkQA

SparkQA commented Apr 26, 2021

Test build #137935 has finished for PR 32198 at commit 2895837.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42490/

@SparkQA

SparkQA commented Apr 26, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42490/

@SparkQA

SparkQA commented Apr 27, 2021

Test build #137970 has finished for PR 32198 at commit efe026c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 7f51106 Apr 27, 2021
@c21
Contributor Author

c21 commented Apr 27, 2021

Thank you @cloud-fan for all the dedicated help and careful review!
Thank you @imback82 and @ulysses-you for commenting and review too!

@c21 c21 deleted the writer branch April 27, 2021 06:39
Contributor

@imback82 imback82 left a comment

Late +1, thanks @c21!

@hvanhovell
Contributor

@c21 this doesn't do any sort of memory tracking right? How do you avoid OOMs?

@hvanhovell
Contributor

One more thing, how much does this improve the write? Local sorts before the write are typically not too bad if you look at the cycles spent during the write. A much bigger target here would be to properly interleave I/O and CPU operations. You sort of achieve that by having multiple writers, but IMO it feels like quite a big hammer.

@c21
Contributor Author

c21 commented Apr 28, 2021

this doesn't do any sort of memory tracking right?

Yes. It seems to me there's no way to track the memory usage accurately, because the writers use on-heap memory. We would also need memory usage information to be retrievable from each individual writer implementation (Parquet, ORC, Avro, etc.), which is not the case right now.

One rough idea, though, is to look at the executor JVM heap memory usage (which I think is already captured).

@c21
Contributor Author

c21 commented Apr 28, 2021

How do you avoid OOMs?

Note that the feature is designed to be disabled by default and enabled case by case for now. The fallback logic here is intended to avoid OOM from opening too many writers.

@c21
Contributor Author

c21 commented Apr 28, 2021

One more thing, how much does this improve the write? Local sorts before the write are typically not too bad if you look at the cycles spent during the write. A much bigger target here would be to properly interleave I/O and CPU operations. You sort of achieve that by having multiple writers, but IMO it feels like quite a big hammer.

I will add a benchmark for this as a followup.

IMHO how much this improves things really depends on the query shape (the cardinality of dynamic partitions and buckets). In an environment where most queries write a small number of partitions and users configure relatively few buckets, this feature helps more; in an environment where queries write many partitions and users configure many buckets, it helps less. We do see benefit from it for internal queries, and people have raised the request on the Spark dev list as well.

cloud-fan added a commit that referenced this pull request May 7, 2021
…file the row is written to

### What changes were proposed in this pull request?

This is a follow-up of #32198

Before #32198, in `WriteTaskStatsTracker.newRow`, we know that the row is written to the current file. After #32198 , we no longer know this connection.

This PR adds the file path parameter in `WriteTaskStatsTracker.newRow` to bring back the connection.
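The changed callback can be sketched as follows (a simplified stand-in for Spark's `WriteTaskStatsTracker`; the real trait has more callbacks and parameters):

```scala
import scala.collection.mutable

trait WriteTaskStatsTrackerSketch {
  def newFile(filePath: String): Unit
  // The file path parameter restores the row-to-file connection even when
  // multiple writers are open concurrently.
  def newRow(filePath: String, row: String): Unit
}

class BasicTrackerSketch extends WriteTaskStatsTrackerSketch {
  val rowsPerFile = mutable.Map.empty[String, Long].withDefaultValue(0L)
  def newFile(filePath: String): Unit = rowsPerFile(filePath) = 0L
  def newRow(filePath: String, row: String): Unit = rowsPerFile(filePath) += 1
}
```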

### Why are the changes needed?

To not break some custom `WriteTaskStatsTracker` implementations.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #32459 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@SparksFyz

SparksFyz commented Mar 9, 2022

One more thing, how much does this improve the write? Local sorts before the write are typically not too bad if you look at the cycles spent during the write. A much bigger target here would be to properly interleave I/O and CPU operations. You sort of achieve that by having multiple writers, but IMO it feels like quite a big hammer.

I will add a benchmark for this as a followup.

IMHO how much this improves things really depends on the query shape (the cardinality of dynamic partitions and buckets). In an environment where most queries write a small number of partitions and users configure relatively few buckets, this feature helps more; in an environment where queries write many partitions and users configure many buckets, it helps less. We do see benefit from it for internal queries, and people have raised the request on the Spark dev list as well.

@c21 Hi, is there a link to the benchmark? Thanks.
I am interested in how much this improves things when setting the number to 2 for a static partition write (single partition).
