
Conversation

@cxzl25 (Contributor) commented Dec 11, 2020

What changes were proposed in this pull request?

Change the hadoopJobMetadata cache from softValues to weakValues.

Why are the changes needed?

Reduce driver memory pressure, GC time and frequency, and job execution time.

HadoopRDD uses a soft-reference map to cache JobConfs (rdd_id -> JobConf).
When the driver reads a Hive table with many partitions, HadoopRDD.getPartitions creates many JobConfs and adds them to the cache.
Each executor also creates a JobConf, adds it to the cache, and shares it among the tasks running in that executor.

The JobConfs accumulating in the driver-side cache increase memory pressure. When the driver memory is configured low, full GCs become very frequent, and these JobConfs are hardly ever reused.
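
For context, hadoopJobMetadata is a Guava cache; here is a minimal sketch of the two reference policies this PR chooses between (the weakValues line is what the PR switches to):

import com.google.common.cache.CacheBuilder

// Soft values: entries survive until the JVM is under memory pressure,
// so rarely-reused JobConfs can pile up on the driver heap.
val softCache = CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()

// Weak values: an entry becomes collectable as soon as no strong reference
// to the value remains, so unused JobConfs are reclaimed much sooner.
val weakCache = CacheBuilder.newBuilder().weakValues().build[String, AnyRef]().asMap()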

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UTs
Manual test

@github-actions github-actions bot added the CORE label Dec 11, 2020
@AmplabJenkins

Can one of the admins verify this patch?


protected val jobConfCacheKey: String = "rdd_%d_job_conf".format(id)

protected val inputFormatCacheKey: String = "rdd_%d_input_format".format(id)
@cxzl25 (Contributor Author):

SPARK-9585 removed the InputFormat cache.

@xkrogen (Contributor) commented Dec 11, 2020

Do we really need a separate copy of the JobConf cached for each partition ID? Is there any opportunity for us to reduce the number of JobConfs to begin with? It seems like all of the partitions should be able to safely share the same conf object...?

Regardless, weak references seem more appropriate here than soft.

@cxzl25 (Contributor Author) commented Dec 13, 2020

> Do we really need a separate copy of the JobConf cached for each partition ID?

Yes, it is needed. HadoopRDD#getJobConf has the following comment.

protected def getJobConf(): JobConf = {
  val conf: Configuration = broadcastedConf.value.value
  if (shouldCloneJobConf) {
    // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
    // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
    // somewhat rarely because most jobs treat the configuration as though it's immutable. One
    // solution, implemented here, is to clone the Configuration object. Unfortunately, this
    // clone can be very expensive. To avoid unexpected performance regressions for workloads and
    // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
    // disabled by default.
    // ...
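
For reference, a simplified, hypothetical model of the non-cloning branch (a paraphrase, not the exact Spark source): on a cache miss, a new JobConf is built and published to the shared per-JVM cache.

import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Hypothetical sketch of the cloneConf=false path: reuse a cached JobConf
// for this RDD's key if present, otherwise create one and cache it.
object JobConfCacheSketch {
  private val cache = new ConcurrentHashMap[String, AnyRef]()

  def getOrCreate(jobConfCacheKey: String, conf: Configuration): JobConf = {
    val cached = cache.get(jobConfCacheKey)
    if (cached != null) {
      cached.asInstanceOf[JobConf]       // cache hit: share the existing JobConf
    } else {
      val newJobConf = new JobConf(conf) // cache miss: build a new JobConf
      cache.put(jobConfCacheKey, newJobConf)
      newJobConf
    }
  }
}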

@cxzl25 (Contributor Author) commented Dec 14, 2020

It seems that the GitHub tests pass, and the production environment test shows a performance improvement.
Can you review this PR when you have time? @cloud-fan

@xkrogen (Contributor) commented Dec 14, 2020

I see, thanks for the reference. So IIUC this patch is primarily targeting the spark.hadoop.cloneConf = true use case?

@cxzl25 (Contributor Author) commented Dec 15, 2020

> I see, thanks for the reference. So IIUC this patch is primarily targeting the spark.hadoop.cloneConf = true use case?

No.
When spark.hadoop.cloneConf=false, HadoopRDD#getPartitions creates a JobConf and adds it to the hadoopJobMetadata cache.
When the queried Hive table has many partitions, many JobConf objects are created and added to the cache.
When the driver memory is configured small, the driver fills its entire heap and then runs full GCs.

If your Hadoop client version is 2.7 or above, or carries the patch from HADOOP-11209, you can enable spark.hadoop.cloneConf=true; then the driver will not accumulate too many JobConf objects.
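
For example, a hedged sketch of setting this via SparkConf (equivalent to passing --conf spark.hadoop.cloneConf=true on the command line):

import org.apache.spark.SparkConf

// Enable per-call cloning of the JobConf instead of caching shared copies.
val conf = new SparkConf().set("spark.hadoop.cloneConf", "true")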

@xkrogen (Contributor) commented Dec 15, 2020

Thanks for the further explanation, that is very helpful. Seems like potentially the comment in HadoopRDD#getJobConf
should be updated since the concurrency bugs have been fixed in Hadoop since 2.7.0, a pretty old version.

There is still one point I don't understand. It seems that the key for the JobConf in the cache is based on the ID of the RDD, not a per-partition key:

protected val jobConfCacheKey: String = "rdd_%d_job_conf".format(id)

So I would expect there to be one cached entry in hadoopJobMetadata per RDD. How do we end up with one JobConf per partition? Is it because the check if jobConf in cache -> if not put into cache steps are not synchronized, and many threads simultaneously decide that the conf isn't present and then put many copies of the conf into the cache? Or have I missed something?

Thanks for bearing with me as I try to understand this issue!

@cxzl25 (Contributor Author) commented Dec 16, 2020

To clarify, the partition here refers to a partition of the Hive table, not an RDD partition.
For example, when Spark SQL reads a Hive table with 10,000 partitions,
HadoopTableReader#makeRDDForPartitionedTable creates 10,000 RDDs, which means there are 10,000 JobConfs.
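
A hypothetical illustration of the resulting key space (each HadoopRDD id yields its own jobConfCacheKey):

// 10,000 Hive partitions -> 10,000 HadoopRDDs -> 10,000 cache entries:
val keys = (0 until 10000).map(id => "rdd_%d_job_conf".format(id))
// Vector("rdd_0_job_conf", "rdd_1_job_conf", ..., "rdd_9999_job_conf")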

  // (e.g., HadoopRDD uses this to cache JobConfs).
  private[spark] val hadoopJobMetadata =
-   CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()
+   CacheBuilder.newBuilder().weakValues().build[String, AnyRef]().asMap()
@cloud-fan (Contributor) commented Dec 16, 2020

Is it better to put a size limit on this cache? Then soft references should also be fine.
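
A sketch of that suggestion, assuming a placeholder limit (not a tuned value):

import com.google.common.cache.CacheBuilder

// Bounded soft-value cache: the size cap acts as a backstop so the cache
// cannot grow without limit even before memory pressure triggers eviction.
val boundedCache = CacheBuilder.newBuilder()
  .maximumSize(1000) // placeholder limit, not a tuned value
  .softValues()
  .build[String, AnyRef]()
  .asMap()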

@cxzl25 (Contributor Author):

With a size limit, a soft-reference cache can reduce the number of young GCs compared with a weak-reference cache.
But what should the size limit be?
In fact, the driver rarely has a chance to reuse the cached JobConfs; sharing the JobConf only really makes sense on the executor side.

Member:

Whether or not a limit is in place (which could also be a good backstop to prevent a huge cache), this could be fine. I think the only risk is that weak references are quite readily reclaimed, so this risks losing most of the caching.

@xkrogen (Contributor) commented Dec 16, 2020

> To clarify, the partition here refers to a partition of the Hive table, not an RDD partition.

Now it all makes sense. Thanks for the clarification. Seems I needed to read your original message more carefully.

@github-actions bot commented Apr 1, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Apr 1, 2021
@github-actions github-actions bot closed this Apr 2, 2021