
[SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys #35047

Closed
sumeetgajjar wants to merge 30 commits into apache:master from sumeetgajjar:SPARK-37175-HashRelation-Optimization

Conversation

@sumeetgajjar
Contributor

What changes were proposed in this pull request?

This PR aims at improving performance for Hash joins with many duplicate keys.

A HashedRelation uses a map underneath to store rows against a corresponding key. A LongToUnsafeRowMap is used by LongHashedRelation and a BytesToBytesMap is used by UnsafeHashedRelation.
We propose to reorder the underlying map, placing all the rows for a given key adjacent in memory to improve spatial locality when the stream side of the join iterates over them.

This is achieved in the following steps:

  • creating another copy of the underlying map
  • for all keys in the existing map
    • get the corresponding rows
    • insert all the rows for the given key at once in the new map
  • use the new map for look-ups

This optimization can be enabled by specifying spark.sql.hashedRelationReorderFactor=<value>.
The optimization kicks in once the underlying map satisfies the condition: number of rows >= number of unique keys * the configured value.
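
For illustration, here is a rough sketch of the rebuild described above against a simplified map interface; the names (RowMap, appendAll, etc.) are hypothetical stand-ins for Spark's LongToUnsafeRowMap / BytesToBytesMap, whose real APIs differ.

```scala
// Illustrative sketch only: RowMap is a hypothetical stand-in for
// LongToUnsafeRowMap / BytesToBytesMap.
trait RowMap[K, Row] {
  def keys: Iterator[K]
  def get(key: K): Iterator[Row]               // rows may be scattered in memory
  def appendAll(key: K, rows: Seq[Row]): Unit  // writes all rows for a key contiguously
  def numRows: Long
  def numUniqueKeys: Long
}

def reorderIfBeneficial[K, Row](
    original: RowMap[K, Row],
    fresh: RowMap[K, Row],
    reorderFactor: Double): RowMap[K, Row] = {
  // Only rebuild when keys are duplicated enough on average to pay for the extra copy.
  if (original.numRows >= original.numUniqueKeys * reorderFactor) {
    original.keys.foreach { key =>
      // Insert all rows for the key at once so they end up adjacent in memory.
      fresh.appendAll(key, original.get(key).toSeq)
    }
    fresh  // use the compact map for look-ups
  } else {
    original
  }
}
```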

Why are the changes needed?

There is no order maintained when rows are added to the underlying map, so for a given key the corresponding rows are typically non-adjacent in memory, resulting in poor spatial locality. Placing the rows for a given key adjacent in memory yields a performance boost, thereby reducing execution time.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Modified existing unit tests to run against the suggested improvement.
  • Added a couple of cases to test the scenarios when the improvement throws an exception due to insufficient memory.
  • Added a micro-benchmark that clearly indicates performance improvements when there are duplicate keys.
  • Ran the four example queries mentioned in the JIRA in spark-sql as a final check for performance improvement.

Credits

This work is based on the initial idea proposed by @bersprockets.

Comment on lines +1158 to +1162

[question] IIUC, at this point in time we will have both the old map as well as the new compact map in memory, so we would be holding 2x the memory (at this time) that we were holding before (as we just had 1 map). This itself can cause an OOM. Do we need some heuristic here? Or perhaps delete / free (not sure if we have this functionality at present) entries once they have been inserted into the compactMap?

Contributor Author

In case an OOM-related exception is thrown while appending to the map, we invoke maybeCompactMap.foreach(_.free()) in the catch clause which releases the memory of the compactMap.

https://github.com/apache/spark/pull/35047/files#diff-127291a0287f790755be5473765ea03eb65f8b58b9ec0760955f124e21e3452fR1171
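
A toy, self-contained illustration of that clean-up path; FakeCompactMap and the surrounding wiring are hypothetical, whereas the PR works against LongToUnsafeRowMap / BytesToBytesMap with a TaskMemoryManager.

```scala
// Hypothetical stand-in for the compact map; only free() matters for this sketch.
class FakeCompactMap {
  def append(key: Long, row: Array[Byte]): Unit = ()
  def free(): Unit = println("released compact map memory")
}

def buildCompactOrFallback(rows: Iterator[(Long, Array[Byte])]): Option[FakeCompactMap] = {
  var maybeCompactMap: Option[FakeCompactMap] = None
  try {
    maybeCompactMap = Some(new FakeCompactMap)
    rows.foreach { case (k, r) => maybeCompactMap.get.append(k, r) }
    maybeCompactMap
  } catch {
    case _: OutOfMemoryError =>
      // Mirrors the maybeCompactMap.foreach(_.free()) mentioned above.
      maybeCompactMap.foreach(_.free())
      None  // caller keeps using the original map
  }
}
```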

@singhpk234 singhpk234 Dec 29, 2021

What I meant here is: if we know beforehand that we don't have this much memory available, we should try to short-circuit this rewrite by not attempting it. We have getTotalMemoryConsumption from the earlier map, and we can calculate the memory available to us at the start; if we know we can't support 2x, maybe we save time by avoiding the extra work when the exception would otherwise happen.
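
As a sketch, the kind of pre-check being suggested might look like the following, assuming the available execution memory could be queried (the reply below notes that no such API is exposed where the map is built):

```scala
// Hypothetical short-circuit: skip the rewrite when we cannot afford roughly
// another copy of the existing map. currentMapBytes would come from something
// like getTotalMemoryConsumption on the existing map; availableExecutionMemory
// is an assumption and is not actually exposed at the build site today.
def shouldAttemptReorder(currentMapBytes: Long, availableExecutionMemory: Long): Boolean =
  availableExecutionMemory >= currentMapBytes
```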

Contributor Author

That sounds like a good idea; however, TaskMemoryManager does not expose an API to fetch the available execution memory. There does exist MemoryManager.getExecutionMemoryUsageForTask, but unfortunately MemoryManager is not accessible at the place where the map is built.

private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {

Furthermore, re-building the map takes only a small fraction of the overall query execution time.
I observed the following times while running a test with this feature:

Stream side: 300M rows
Build side: 90M rows
Rebuilding the map: 4 seconds (diff from the logs)
Total query execution time with optimization enabled: 3.9 minutes
Total query execution time with optimization disabled: 9.2 minutes

Contributor

It's a valid concern that this puts more memory pressure on the driver. Is it possible to improve the relation-building logic and make it co-locate the values of the same key? Then we don't need to rewrite the relation.

Contributor

It's a valid concern that this puts more memory pressure on the driver. Is it possible to improve the relation-building logic and make it co-locate the values of the same key? Then we don't need to rewrite the relation.

+1 on this. Besides the memory concern, I am also not sure about the performance penalty when doing a shuffled hash join with a large hash table in memory. We need to probe each key in the hash table to rebuild the table, and we kind of waste the time spent on the first build of the table.

Contributor Author

Is it possible to improve the relation-building logic and make it co-locate the values of the same key?

Since there is no ordering guarantee with Iterator[InternalRow], I don't believe there is a way to co-locate the values of the same key when building the relation, but let me think about it again.

Contributor Author

I am also not sure about the performance penalty when doing a shuffled hash join with a large hash table in memory. We need to probe each key in the hash table to rebuild the table, and we kind of waste the time spent on the first build of the table.

I totally agree regarding wasting time on the first build of the table. However, the time taken to re-build the table is minuscule compared to the overall execution time of the query.
This can be clearly seen in the example above.
Also, in the above case I was running the Spark application in local mode. If we run the application on YARN or K8s, the overall execution time will increase due to the added network cost, and the table rebuild time would become an even smaller fraction of the overall time.

@AmplabJenkins

Can one of the admins verify this patch?

@sumeetgajjar sumeetgajjar force-pushed the SPARK-37175-HashRelation-Optimization branch from a0a6e0e to 772070c on December 31, 2021 06:11
@sumeetgajjar
Contributor Author

sumeetgajjar commented Jan 5, 2022

Hi @HyukjinKwon @cloud-fan @dongjoon-hyun can you please review this PR?

(Test / javaOptions) += "-ea",
(Test / javaOptions) ++= {
  val metaspaceSize = sys.env.get("METASPACE_SIZE").getOrElse("1300m")
  val extraTestJavaArgs = Array("-XX:+IgnoreUnrecognizedVMOptions",
Contributor

what's the actual change?

Contributor Author

The actual change is to expose an environment variable to set the heap size. If set, this heap size is used as -Xmx when sbt invokes runMain.
The default 4g heap was not sufficient when I was running the newly added benchmark for 10 billion rows (the driver OOM'd), hence the change.

While making the change I noticed that the extraTestJavaArgs Array was first converted to a string and then, on the very next line, split on spaces and converted back to a Seq.
Thus I refactored the code a bit.
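
A minimal sketch of the kind of sbt change being described, assuming a hypothetical HEAP_SIZE environment variable (the actual variable name and default used in the PR may differ):

```scala
// build.sbt fragment (illustrative; the HEAP_SIZE name and 4g fallback are assumptions)
(Test / javaOptions) ++= {
  val heapSize = sys.env.getOrElse("HEAP_SIZE", "4g")  // fall back to the old default
  Seq(s"-Xmx$heapSize")
}
```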

var maybeCompactMap: Option[LongToUnsafeRowMap] = None
try {
  maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
    Math.toIntExact(map.numUniqueKeys)))
Contributor

shall we let this LongToUnsafeRowMap allocate all the pages ahead, so that we can fail earlier if memory is not enough?

Contributor

nvm, it already does.

.version("3.3.0")
.doubleConf
.checkValue(_ > 1, "The value must be greater than 1.")
.createOptional
Contributor

Curious what would be a good default value for this? Are we expecting users to tune this config for each query? If the user needs to tune this config per query, then I feel this feature is less useful, because the user can always choose to rewrite the query to sort on the join keys before the join.

Contributor Author

As per the SQL benchmarks (not the micro-benchmark) that I ran, 4 is a good default value for this config.
A value of 4 cuts the query processing time in half.

This was not set at the query level but at the Spark application level.

Contributor

Yeah, I mean do we expect users to tune the config for each Spark app/query? I feel the improvement also depends on the access pattern from the probe/stream side. If the probe side does many lookups of keys with a lot of values, then we can see the improvement. But if the probe side does not look up these keys much, then we probably cannot see the benefit. I kind of feel this config is not so easy to use in practice.

Contributor Author
@sumeetgajjar sumeetgajjar Jan 6, 2022

do we expect users to tune the config for each Spark app/query?

No, a global default value of 4 should suffice here.

Contributor

Maybe a default hash reorder factor of 4, but a separate config for on/off (off would mean the reorder factor gets passed to the various hash relation factories as None, regardless of the setting of spark.sql.hashedRelationReorderFactor).

Contributor Author

Maybe a default hash reorder factor of 4, but a separate config for on/off (off would mean the reorder factor gets passed to the various hash relation factories as None, regardless of the setting of spark.sql.hashedRelationReorderFactor).

I like the idea, I'll add it.
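
For illustration, the pairing being suggested might look roughly like this in SQLConf style; the enable-flag name and the defaults are hypothetical, not what the PR actually adds:

```scala
// Illustrative only: config names and defaults are assumptions.
val HASHED_RELATION_REORDER_ENABLED =
  buildConf("spark.sql.hashedRelationReorder.enabled")
    .doc("When true, hashed relations may be rebuilt to co-locate rows of the same key.")
    .version("3.3.0")
    .booleanConf
    .createWithDefault(false)

val HASHED_RELATION_REORDER_FACTOR =
  buildConf("spark.sql.hashedRelationReorderFactor")
    .doc("Rebuild a hashed relation once numRows >= numUniqueKeys * this factor.")
    .version("3.3.0")
    .doubleConf
    .checkValue(_ > 1, "The value must be greater than 1.")
    .createWithDefault(4.0)
```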

Contributor

I feel the improvement also depends on the access pattern from the probe/stream side. If the probe side does many lookups of keys with a lot of values, then we can see the improvement. But if the probe side does not look up these keys much, then we probably cannot see the benefit. I kind of feel this config is not so easy to use in practice.

No, a global default value of 4 should suffice here.

@sumeetgajjar - could you help elaborate more why a global default value is sufficient per my question above?

Contributor Author

Apologies for the delayed response, I was stuck with some work stuff followed by a sick week due to covid.

@sumeetgajjar - could you help elaborate more why a global default value is sufficient per my question above?

@c21 My rationale behind suggesting a global value of 4 was based on an experiment I ran. I ran a synthetic workload with the hash optimization enabled regardless of the duplication factor of the keys, and gradually increased the duplication factor from 1 to 20. I noticed the optimization became beneficial once the duplication factor crossed 4. Based on that local experiment, I suggested a value of 4.

If the probe side does many lookups of keys with a lot of values, then we can see the improvement. But if the probe side does not look up these keys much, then we probably cannot see the benefit.

I agree; the synthetic workload I ran queried the probe side such that the majority of the keys had multiple values.

Anyway, due to concerns over the added memory pressure introduced by this optimization and the feedback that the config is difficult to tune, I've decided to close the PR. If I find a better solution, I'll reopen it.

  new UnsafeHashedRelation(key.size, numFields, compactMap)
} catch {
  case e: SparkOutOfMemoryError =>
    logWarning("Reordering BytesToBytesMap failed, " +
Contributor

HashedRelation building can happen either on the driver side (for broadcast join) or on the executor side (for shuffled hash join), so suggesting that users increase driver memory to mitigate this is not accurate. We probably want to suggest that users disable reordering if it causes an OOM here.

Contributor Author

Good catch, will modify the warning message.
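
For example, a reworded warning along these lines (the wording is hypothetical; the actual message depends on the final change):

```scala
// Hypothetical rewording: avoid assuming the build happens on the driver.
logWarning("Reordering the hashed relation failed due to insufficient memory; " +
  "falling back to the original map. Consider unsetting " +
  "spark.sql.hashedRelationReorderFactor to disable reordering.")
```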

@cloud-fan
Contributor

Since this introduces overhead (rebuild hash relation, more memory), I think we need to carefully make sure the benefit is larger than the overhead. Asking users to tune the config is really not a good way to roll out this optimization.

Some random ideas:

  1. look at the ndv of join keys. If there are very few duplicated keys, don't do this optimization
  2. doing it at the executor side, in a dynamic way. If we detect that a key is consistently looked up and has many values, compact the values of this key and put them into a new fast map. We can even set an upper bound on the number of keys to optimize, to avoid taking too much memory. We don't need to rewrite the entire hash relation.
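
A toy, self-contained illustration of idea 2 above (the types, thresholds, and lookup function are hypothetical; Spark's actual HashedRelation API is different):

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Toy illustration: compact only the hottest keys, lazily, with a bound on how
// many keys get the treatment so the extra memory stays capped.
class HotKeyCompactor[K, V: ClassTag](
    lookup: K => Seq[V],   // probe into the existing (uncompacted) map
    hotThreshold: Int,     // how many probes before a key counts as hot
    maxHotKeys: Int) {     // cap on extra memory spent on compacted copies

  private val probeCounts = mutable.Map.empty[K, Int].withDefaultValue(0)
  private val compacted = mutable.Map.empty[K, Array[V]]

  def get(key: K): Seq[V] = compacted.get(key) match {
    case Some(values) =>
      values.toSeq          // rows already co-located in a single array
    case None =>
      probeCounts(key) += 1
      val values = lookup(key)
      // Compact once a key proves hot and there is still budget for more keys.
      if (probeCounts(key) >= hotThreshold &&
          compacted.size < maxHotKeys && values.length > 1) {
        compacted(key) = values.toArray
      }
      values
  }
}
```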

@sumeetgajjar
Contributor Author

sumeetgajjar commented Feb 1, 2022

Apologies for the delayed response, I was stuck with some work stuff followed by a sick week due to covid.

Some random ideas:

Thanks for the suggestions, appreciate it.

Since this introduces overhead (rebuild hash relation, more memory), I think we need to carefully make sure the benefit is larger than the overhead. Asking users to tune the config is really not a good way to roll out this optimization.

Agreed; in that case, I'll close this PR for the time being. If I find a better solution, I'll reopen it.

Thank you @cloud-fan @c21 @bersprockets @singhpk234 for your comments on the PR.
