Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25885][Core][Minor] HighlyCompressedMapStatus deserialization/construction optimization #22894

Closed
wants to merge 1 commit into from

Conversation

Koraseg
Copy link

@Koraseg Koraseg commented Oct 30, 2018

What changes were proposed in this pull request?

Removal of intermediate structures in HighlyCompressedMapStatus will speed up its creation and deserialization time.

https://issues.apache.org/jira/browse/SPARK-25885

How was this patch tested?

Additional tests are not necessary for the patch.

@Koraseg Koraseg changed the title [SPARK-25885] HighlyCompressedMapStatus deserialization/construction optimization [SPARK-25885][Core][Minor] HighlyCompressedMapStatus deserialization/construction optimization Oct 30, 2018
@mgaido91
Copy link
Contributor

@Koraseg please check the contribution guide and update this PR accordingly:

  • first, please fill the PR description properly;
  • second, provide a demonstration of the improved performance with a benchmark;
  • third, IMO this is actually worsening performances, since you are using an immutable data structure, so the map is copied every time is modified....

@Koraseg
Copy link
Author

Koraseg commented Oct 30, 2018

Regarding the third one, the proposed implementation is, conceptually, the same as current one. Under the hood, hugeBlockSizesArray.toMap just updates an internal immutable map reference by tuples from the array and eventually returns it back.

@mgaido91
Copy link
Contributor

Practically, though, it generates a whole copy of the map at every update, so for 10 items, the implementation in the PR generates 9 copies of 1, 2, 3, ... elements, while the current one generates only 1 copy, at the end of size 10. So the proposed change is worse than the current solution. If you create a benchmark, you can see this.

@Koraseg
Copy link
Author

Koraseg commented Oct 31, 2018

Practically, though, it generates a whole copy of the map at every update, so for 10 items, the implementation in the PR generates 9 copies of 1, 2, 3, ... elements, while the current one generates only 1 copy, at the end of size 10. So the proposed change is worse than the current solution. If you create a benchmark, you can see this.

That is not a way how immutable persistent data structures handle updates and the scala map in particular. Moreover, as I mentioned above, it is exactly the same logic, which lies under the hood of ArrayBuffer -> Map conversion in the current implementation. I have only removed an intermediate layer.

I created a benchmark with cut versions of HighlyCompressedStatus (with empty blocks bitmap and huge blocks map only) and measured deserialization performance. The proposed version has shown about 10% performance boost for different blocks configurations.

You can check out the results on the repo below and repeat the test.
https://github.com/Koraseg/mapstatus-benchmark

@mgaido91
Copy link
Contributor

ehat if you use a mutable map instead of an immutable one? which is the perf comparison?

@Koraseg
Copy link
Author

Koraseg commented Oct 31, 2018

Thanks for the remark above. I have checked scala.mutable.Map performance, it is essentially better. For some cases, speed up is up to 2 times I will update the benchmark and the PR soon.

@mgaido91
Copy link
Contributor

that sounds more reasonable and a better implementation, thanks.

@srowen
Copy link
Member

srowen commented Oct 31, 2018

I also would not expect updating an immutable data structure to be faster. Building a map once from tuples at the end seems better than rebuilding a map each time. Under the hood the immutable map is going to be a HashTrieMap (a map of smaller optimized immutable maps) and its updated0 method does some clever stuff to avoid recreating the whole map.

But, yeah, why immutable here to begin with? it ought to be better still to update a mutable Map. And then I am still not sure why it would be faster to keep the map invariants over this loop rather than build the map with its size known ahead of time at the end.

Benchmarks are good evidence but we just need to make sure that the difference is material as used in Spark. It may well be.

@Koraseg
Copy link
Author

Koraseg commented Oct 31, 2018

I have also updated benchmarks here: https://github.com/Koraseg/mapstatus-benchmark.

By the way, the best performance has shown gnu.trove.map.hash.TIntByteHashMap. But the boost in comparison with java.util.HashMap seems not too big for a new library dependency.

@@ -149,7 +150,7 @@ private[spark] class HighlyCompressedMapStatus private (
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
private var hugeBlockSizes: Map[Int, Byte])
private[this] var hugeBlockSizes: mutable.Map[Int, Byte])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't be changed, we should still have an immutable map here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about to use more generic scala.collection.Map[Int, Byte] type here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mutated though, and needs to be mutable. If it were exposed outside the class, or there was significant danger of accidentally mutating it elsewhere, I think it might be necessary to wrap the result in an immutable wrapper, but here this seems OK to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should change it. Despite we are using a mutable map in order to build it, the result should be an immutable map, as it enforces correctness, avoiding potential bad updates. So I don't think this should be changed. You can just call toMap on the mutable one, but in this case, I think the performance would become again like the original one, as there is no override of toMap in scala. A better option would then probablt be using a immutable.Map.Builder. Could you check it please?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that possibility. If the cost of wrapping/building an immutable map isn't higher, it's better.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, it means that we use immutable Map instead of mutable one, with worse performance characteristics, since MapBuilder just updates inner reference to immutable Map after each insert. To enforce correctness, what about to use Scala.collection.Map[Int, Byte]? It doesn't allow dangerous mutations operations and mutable.Map is its subtype.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you're right. I just checked the implementation of the MapBuilder... Seems like there is no efficient way to build an immutable map... I am fine with the approach you're proposing using scala.collection.Map, but still seems a bit hacky to me.

@@ -189,13 +190,12 @@ private[spark] class HighlyCompressedMapStatus private (
emptyBlocks.readExternal(in)
avgSize = in.readLong()
val count = in.readInt()
val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
hugeBlockSizes = new util.HashMap[Int, Byte](count).asScala
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use a mutable.Map instead of the java implementation wrapped by scala, this is not a clean solution IMHO

@@ -189,13 +190,12 @@ private[spark] class HighlyCompressedMapStatus private (
emptyBlocks.readExternal(in)
avgSize = in.readLong()
val count = in.readInt()
val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
hugeBlockSizes = new util.HashMap[Int, Byte](count).asScala
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just scala's mutable Map? I'd expect it's no slower than Java's, given it might specialize for primitives (not sure about this) and sometimes has smarter implementations internally.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala.mutable.HashMap implementation does not have a way to set initial capacity out of the box. The performance gets worse, probably because of resizing hash table.

scala.mutable.OpenHashMap implementation does, but it is still slower than java.util.HashMap

However, if a kind of tradeoff between code cleanness and performance os needed, I would use one of the variants above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does calling HashMap.sizeHint(...) here actually help?
I'd stick to the scala collection for now

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, its default implementation in Builder trait is empty, and it is not overridden for the mutable map.

(0 until count).foreach { _ =>
val block = in.readInt()
val size = in.readByte()
hugeBlockSizesArray += Tuple2(block, size)
hugeBlockSizes.asInstanceOf[mutable.Map[Int, Byte]].update(block, size)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cast it? it is used as a mutable map and its type is a mutable map, so the type on line 151 is wrong. Also, just hugeBlockSizes(block) = size, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen I used more generic map type in HighlyCompressedMapStatus, so cast was to make update operations on it possible. However, it does not seem necessary because HighlyCompressedMapStatus fields are not exposed outside. So we can simply change type of the map right there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is a mutable map and used as a mutable map. Its type must reflect that.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'm good with this approach, after the quick discussion. Thanks!

@mgaido91
Copy link
Contributor

mgaido91 commented Nov 6, 2018

shall we trigger the tests for this @srowen ?

@SparkQA
Copy link

SparkQA commented Nov 6, 2018

Test build #4414 has finished for PR 22894 at commit 57bdd75.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Koraseg
Copy link
Author

Koraseg commented Nov 6, 2018

retest this please

@SparkQA
Copy link

SparkQA commented Nov 7, 2018

Test build #4416 has finished for PR 22894 at commit 57bdd75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Copy link
Member

srowen commented Nov 7, 2018

Merged to master

@asfgit asfgit closed this in 0a32238 Nov 7, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…construction optimization

## What changes were proposed in this pull request?

Removal of intermediate structures in HighlyCompressedMapStatus will speed up its creation and deserialization time.

https://issues.apache.org/jira/browse/SPARK-25885

## How was this patch tested?

Additional tests are not necessary for the patch.

Closes apache#22894 from Koraseg/mapStatusesOptimization.

Authored-by: koraseg <artem.kupchinsky@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants