
[SPARK-9189][CORE] Takes locality and the sum of partition length into account when partition is instance of HadoopPartition in operator coalesce #7536

Closed
wants to merge 2 commits

Conversation

watermen
Contributor

Before:
Takes locality and the number of partitions into account in the coalesce operator.

After:
Takes locality and the sum of partition lengths (part1.len + part2.len + ... + partN.len) into account when the partitions are instances of HadoopPartition in the coalesce operator.

The goal is to make the data sizes of the coalesced partitions more balanced.
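
For illustration, a minimal sketch of the idea in Scala (not the exact patch; it assumes code living inside Spark, where the package-private HadoopPartition class and its inputSplit field are visible):

  // Sketch only: measure a candidate group by the total bytes of its Hadoop input
  // splits instead of by how many partitions it contains.
  def groupBytes(group: Seq[Partition]): Long =
    group.map(_.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum

  // The coalesce logic can then prefer the group with fewer bytes when assigning the
  // next partition, rather than the group with fewer partitions.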

/cc @liancheng @scwf

@scwf
Contributor

scwf commented Jul 20, 2015

@watermen I think this should be [SPARK-9189][CORE]

@watermen changed the title from "[SPARK-9189][SQL] Takes locality and the sum of partition length into account when partition is instance of HadoopPartition in operator coalesce" to "[SPARK-9189][CORE] Takes locality and the sum of partition length into account when partition is instance of HadoopPartition in operator coalesce" on Jul 21, 2015
@andrewor14
Contributor

add to whitelist

@SparkQA

SparkQA commented Jul 21, 2015

Test build #37909 has finished for PR 7536 at commit cb72d0f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@watermen
Contributor Author

@andrewor14 retest it?
[error] SERVER ERROR: Service Temporarily Unavailable url=http://maven.twttr.com/org/apache/hadoop/hadoop-yarn-server/2.2.0/hadoop-yarn-server-2.2.0.jar

@liancheng
Contributor

@watermen You can retest it yourself now as Andrew has put you into the whitelist.

@liancheng
Contributor

Jenkins is being shut down for maintenance purposes. You may retest this PR after it comes back.

@watermen
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 21, 2015

Test build #42 has finished for PR 7536 at commit cb72d0f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 21, 2015

Test build #37951 has finished for PR 7536 at commit cb72d0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@watermen
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 22, 2015

Test build #38004 has finished for PR 7536 at commit cb72d0f.

  • This patch fails some tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 22, 2015

Test build #47 has finished for PR 7536 at commit cb72d0f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@scwf
Contributor

scwf commented Jul 22, 2015

retest this please

@SparkQA

SparkQA commented Jul 22, 2015

Test build #57 has finished for PR 7536 at commit cb72d0f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 22, 2015

Test build #38066 has finished for PR 7536 at commit cb72d0f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@watermen
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 23, 2015

Test build #38141 has finished for PR 7536 at commit cb72d0f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 23, 2015

Test build #67 has finished for PR 7536 at commit cb72d0f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@watermen
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 26, 2015

Test build #108 has finished for PR 7536 at commit cb72d0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 26, 2015

Test build #38473 has finished for PR 7536 at commit cb72d0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val minPowerOfTwo = if (p.isInstanceOf[HadoopPartition]) {
  val groupLen1 = groupArr(r1).arr.map(part =>
    part.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum
  val groupLen2 = groupArr(r1).arr.map(part =>
    part.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum
@hellertime
Contributor

Shouldn't this be val groupLen2 = groupArr(r2)? Otherwise groupLen1 will always equal groupLen2.

@watermen
Contributor Author

@hellertime Yes, I'll fix this bug, thanks.
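
For reference, the fix discussed above amounts to summing over groupArr(r2) for the second group, roughly:

  val groupLen1 = groupArr(r1).arr.map(part =>
    part.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum
  val groupLen2 = groupArr(r2).arr.map(part =>
    part.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum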

@watermen
Contributor Author

watermen commented Aug 6, 2015

retest this please

@SparkQA

SparkQA commented Aug 6, 2015

Test build #244 has finished for PR 7536 at commit 5623370.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 6, 2015

Test build #39997 has finished for PR 7536 at commit 5623370.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@watermen
Contributor Author

@andrewor14 @srowen Any more comments on this?

@srowen
Member

srowen commented Aug 11, 2015

That seems reasonable to me, although I am not sure whether inputLength is necessarily set in all cases? That is just because I don't know the code. Also, are there other places that need a similar treatment?

@andrewor14
Contributor

@watermen can you add a unit test for this? The high level motivation sounds reasonable to me, but like @srowen I'm not familiar enough with the Hadoop code to merge this. Perhaps @tgravescs would have a better idea?

@tgravescs
Contributor

On the Hadoop side, getLength() is part of the required interface, so the function will be there. With all the input formats I've seen, it is always set to something reasonable, but anyone can write a custom input format. I could see cases where someone has an input format where they don't know the size, or where it's more expensive to compute the size than just to fetch it. You could simply add a check for 0 and fall back to the # of partitions.

@watermen have you run this on a real cluster with skewed data to see if it makes a difference? What input formats have you used?

If there are thousands (or tens of thousands) of partitions and you are coalescing into a small # of buckets, we are now potentially calculating the length of every group over and over again. Did you test to see how long that takes vs. just checking the size of the array? I'm guessing that isn't too bad, but it doesn't hurt to verify.
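
One way to realize the fallback suggested above (illustrative only, not part of this patch): treat a total reported length of 0 as "size unknown" and compare the two candidate groups by partition count instead.

  // Sketch: pick the smaller of two candidate groups. If either group's total split
  // length is 0 (e.g. a custom InputFormat that cannot report sizes), fall back to
  // comparing the # of partitions.
  def pickSmallerGroup(g1: Seq[Partition], g2: Seq[Partition]): Seq[Partition] = {
    def bytes(g: Seq[Partition]): Long =
      g.map(_.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum
    val (b1, b2) = (bytes(g1), bytes(g2))
    if (b1 == 0 || b2 == 0) { if (g1.size <= g2.size) g1 else g2 }
    else { if (b1 <= b2) g1 else g2 }
  }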

@srowen
Member

srowen commented Sep 3, 2015

(PS: Yeah, I meant that I wasn't sure whether getLength always returned a meaningful value, as I sort of distantly remember a problem with this, but I might be imagining it.)

@watermen closed this on Oct 14, 2015