
[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column #16739

Closed
wants to merge 6 commits

Conversation

felixcheung
Member

What changes were proposed in this pull request?

Add coalesce on DataFrame for reducing the number of partitions without a shuffle, and coalesce on Column.

How was this patch tested?

manual, unit tests

@SparkQA

SparkQA commented Jan 30, 2017

Test build #72147 has started for PR 16739 at commit 50ab563.

@felixcheung
Member Author

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 30, 2017

Test build #72149 has finished for PR 16739 at commit 50ab563.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

#' df <- read.json(path)
#' newDF <- coalesce(df, 1L)
#'}
#' @note coalesce(SparkDataFrame) since 2.1.1
Contributor

2.2.0? Or will this be ported back to 2.1.1 too?

setMethod("coalesce",
signature(x = "SparkDataFrame"),
function(x, numPartitions) {
stopifnot(is.numeric(numPartitions))
Contributor

Shall we require the input param to be an Integer?

Member Author

@felixcheung Jan 30, 2017


It's being coerced into an integer. The reason we don't want this parameter to be typed as integer is to allow calls like

coalesce(df, 3)

in which 3 is numeric by default (vs. 3L, which is an integer). IMO, forcing the user to call with 3L is a bit too much.
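
For illustration, a minimal sketch of how that coercion might look inside the method quoted above (the `as.integer` call and the `callJMethod`/`dataFrame` wiring here are assumptions for the sketch, not necessarily the merged implementation):

```r
setMethod("coalesce",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions) {
            stopifnot(is.numeric(numPartitions))
            # accept a plain numeric such as 3 and coerce it here,
            # so callers are not forced to write 3L
            numPartitions <- as.integer(numPartitions)
            sdf <- callJMethod(x@sdf, "coalesce", numPartitions)
            dataFrame(sdf)
          })
```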

@@ -406,6 +406,13 @@ setGeneric("attach")
#' @export
setGeneric("cache", function(x) { standardGeneric("cache") })

#' @rdname coalesce
#' @param x a Column or a SparkDataFrame.
#' @param ... additional argument(s). If \code{x} is a Column, addition Columns can be optionally
Contributor

addition Columns -> additional Columns?

@SparkQA

SparkQA commented Jan 30, 2017

Test build #72166 has finished for PR 16739 at commit 938c2ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@shivaram left a comment


Minor comment on the doc; otherwise looking good. And I can see that the RDD stuff is getting annoying; will respond on that JIRA.

#'
#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
Contributor

If more partitions are requested, there will be a shuffle, right? Might be useful to add that.

Member Author

Actually, no: coalesce is capped at min(previous partitions, numPartitions) according to CoalescedRDD here, so the partition count will be unchanged in that case.
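
A hypothetical illustration of that cap (assuming `df` currently has 100 partitions):

```r
# requesting more partitions than currently exist is a no-op:
# the result is min(100, 1000) = 100 partitions
getNumPartitions(coalesce(df, 1000))  # still 100
```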

Contributor

Oh well, I guess that's worth mentioning then?

@SparkQA

SparkQA commented Feb 1, 2017

Test build #72232 has finished for PR 16739 at commit 1bd7163.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 1, 2017

Test build #72240 has finished for PR 16739 at commit 3ed835a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

shivaram commented Feb 1, 2017

Thanks @felixcheung - I think these changes look good.

cc @gatorsmile / @holdenk for doc changes in SQL, Python

Contributor

@holdenk left a comment


So the Python docstring update looks reasonable, but maybe while we are updating the coalesce docstrings across the three languages, we should consider whether we want to include the warning from RDD's coalesce?

@@ -2432,7 +2432,8 @@ class Dataset[T] private[sql](
* Returns a new Dataset that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
* the 100 new partitions will claim 10 of the current partitions. If a larger number of
* partitions is requested, it will stay at the current number of partitions.
Contributor

So we seem to have left out the RDD warning about drastic coalesces from the Dataset coalesce. Since we are updating the docstrings now anyway, would it make sense to include that warning here as well? (Looking at the implementation of CoalesceExec, it seems like it would still apply, unless I'm missing something.)

@felixcheung
Member Author

Sure, I think you mean https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L428.
We will need to update this to say "use repartition() if you want shuffling", though, since the shuffle option is only available on RDD.
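
For example, the updated guidance might read roughly like this (a sketch using the SparkR API discussed in this thread; `df` is a placeholder SparkDataFrame):

```r
# coalesce: narrow dependency, no shuffle; can only reduce the partition count
df_fewer <- coalesce(df, 1L)

# repartition: always shuffles; can increase or decrease the partition count
df_more <- repartition(df, 200L)
```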

@felixcheung
Member Author

And actually, I find the current behavior a bit hard to explain. Could someone enlighten me as to whether this is intentional and, if we are to document it, how best to do so?

  df <- as.DataFrame(cars, numPartitions = 5)  # this sets numSlices on the RDD to 5
  expect_equal(getNumPartitions(df), 5)
  expect_equal(getNumPartitions(coalesce(df, 3)), 3)
  expect_equal(getNumPartitions(coalesce(df, 6)), 5)

  df1 <- coalesce(df, 3)
  expect_equal(getNumPartitions(df1), 3)
  expect_equal(getNumPartitions(coalesce(df1, 6)), 5)   # even after a coalesce it can't go beyond 5
  expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
  expect_equal(getNumPartitions(coalesce(df1, 2)), 2)

  df2 <- repartition(df1, 10)
  expect_equal(getNumPartitions(df2), 10)               # right after repartition, the partition count exceeds the original numSlices
  expect_equal(getNumPartitions(coalesce(df2, 13)), 5)  # but coalesce after repartition can't go beyond 5
  expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
  expect_equal(getNumPartitions(coalesce(df2, 3)), 3)

@holdenk
Contributor

holdenk commented Feb 1, 2017

@felixcheung I was referring to the "However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is)." warning,

but the coalesce capping out based on numSlices also sounds important to document (and potentially confusing).

@felixcheung
Member Author

Yep, #16739 (comment) - only RDD has coalesce(..., shuffle); in Dataset, it's coalesce and repartition.

@gatorsmile
Member

coalesce is used to decrease the number of partitions in the RDD, but when you set it to a number that is larger than the current number of RDD partitions, the result is not predictable; it depends on your RDD's physical distribution.

Thus, I am wondering whether we should allow users to set it to a larger number at all? Or are some advanced users using it?

@felixcheung
Member Author

@gatorsmile thanks for commenting. coalesce currently accepts a number even if it is larger than the current number of partitions - I guess we didn't want to throw an exception in that case?

But since you are here, do you know why we see this behavior:

df2 <- repartition(df1, 10)
expect_equal(getNumPartitions(df2), 10)               # right after repartition, the partition count exceeds the original numSlices
expect_equal(getNumPartitions(coalesce(df2, 13)), 5)  # but coalesce after repartition can't go beyond 5

Shouldn't I be able to set the partition count to 5 < n < 10, since I just called repartition(10)?

@gatorsmile
Member

: ) This might be caused by the optimizer rule CollapseRepartition. Can you output the plan with explain(true)?

@felixcheung
Member Author

felixcheung commented Feb 5, 2017

hmm, not as far as I can see:

> df2 <- repartition(df1, 10)
>   getNumPartitions(df2) # right after repartition the number of partition is greater than the original numSlices
[1] 10
> foo <-  coalesce(df2, 13)
> explain(foo, extended = T)
== Parsed Logical Plan ==
Repartition 13, false
+- Repartition 10, true
   +- Repartition 3, false
      +- LogicalRDD [speed#2, dist#3]

== Analyzed Logical Plan ==
speed: double, dist: double
Repartition 13, false
+- Repartition 10, true
   +- Repartition 3, false
      +- LogicalRDD [speed#2, dist#3]

== Optimized Logical Plan ==
Repartition 13, false
+- LogicalRDD [speed#2, dist#3]

== Physical Plan ==
Coalesce 13
+- Scan ExistingRDD[speed#2,dist#3]

Perhaps during optimization the Repartition is optimized out? As you can see there's only Coalesce in the Physical Plan.

@SparkQA

SparkQA commented Feb 12, 2017

Test build #72791 has finished for PR 16739 at commit a0fe134.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 12, 2017

Test build #72790 has finished for PR 16739 at commit 55b99df.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

gatorsmile commented Feb 14, 2017

Let me rewrite the test cases in Scala.

    val df = spark.range(0, 10000, 1, 5)
    assert(df.rdd.getNumPartitions == 5)
    assert(df.coalesce(3).rdd.getNumPartitions == 3)
    assert(df.coalesce(6).rdd.getNumPartitions == 5)

    val df1 = df.coalesce(3)
    assert(df1.rdd.getNumPartitions == 3)
    assert(df1.coalesce(6).rdd.getNumPartitions == 5)
    assert(df1.coalesce(4).rdd.getNumPartitions == 4)
    assert(df1.coalesce(2).rdd.getNumPartitions == 2)

    val df2 = df.repartition(10)
    assert(df2.rdd.getNumPartitions == 10)
    assert(df2.coalesce(13).rdd.getNumPartitions == 5)
    assert(df2.coalesce(7).rdd.getNumPartitions == 5)
    assert(df2.coalesce(3).rdd.getNumPartitions == 3)

The question is why the second one is 5 instead of 10. If we do the explain, we get the following plan:

df2.coalesce(13).explain(true)

== Parsed Logical Plan ==
Repartition 13, false
+- Repartition 10, true
   +- Range (0, 10000, step=1, splits=Some(5))

== Analyzed Logical Plan ==
id: bigint
Repartition 13, false
+- Repartition 10, true
   +- Range (0, 10000, step=1, splits=Some(5))

== Optimized Logical Plan ==
Repartition 13, false
+- Range (0, 10000, step=1, splits=Some(5))

== Physical Plan ==
Coalesce 13
+- *Range (0, 10000, step=1, splits=Some(5))

OK... `Repartition 10, true` is removed by our optimizer rule CollapseRepartition. It is a bug, I think. Your question is valid. Let me fix it.
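
If the rule is fixed to preserve the shuffle-enabled repartition, the expectation would presumably become (a sketch in the style of the SparkR tests above, not the merged fix or its test):

```r
df2 <- repartition(df1, 10)  # shuffle-enabled: yields exactly 10 partitions
# with the shuffle preserved, a later coalesce should be capped at 10,
# not at the original numSlices of 5
expect_equal(getNumPartitions(coalesce(df2, 13)), 10)
expect_equal(getNumPartitions(coalesce(df2, 7)), 7)
```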

@felixcheung
Member Author

Great, looking forward to that.
I'm going to merge this unless anyone has a concern?

@gatorsmile
Member

The issue is fixed in #16933. If this is merged first, I will fix the test case in this PR. Thanks! : )

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72925 has started for PR 16739 at commit bf2373f.

@felixcheung
Member Author

Jenkins, retest this please

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72929 has finished for PR 16739 at commit bf2373f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in 671bc08 Feb 15, 2017
asfgit pushed a commit that referenced this pull request Feb 15, 2017
Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16739 from felixcheung/rcoalesce.

(cherry picked from commit 671bc08)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
@felixcheung
Member Author

merged to master and branch-2.1
@gatorsmile thanks - please feel free to update or remove unneeded test cases.

@dongjoon-hyun
Member

dongjoon-hyun commented Feb 16, 2017

Hi, @felixcheung.
While backporting, line 1232 (6c35399#diff-3d2a6b9d2b7d84ae179d7ea0f9eca696R1232) seems to break the test build of branch-2.1.
The PR about to_timestamp has not been backported to branch-2.1 yet.
Could you backport that one, too?

Failed -------------------------------------------------------------------------
1. Error: column functions (@test_sparkSQL.R#1232) -----------------------------
could not find function "to_timestamp"
1: .handleSimpleError(function (e) 
   {
       e$call <- sys.calls()[(frame + 11):(sys.nframe() - 2)]
       register_expectation(e, frame + 11, sys.nframe() - 2)
       signalCondition(e)
   }, "could not find function \"to_timestamp\"", quote(eval(expr, envir, enclos))) at /home/jenkins/workspace/spark-branch-2.1-test-sbt-hadoop-2.6/R/lib/SparkR/tests/testthat/test_sparkSQL.R:1232
2: eval(expr, envir, enclos)

@felixcheung
Member Author

felixcheung commented Feb 16, 2017

@dongjoon-hyun my apologies, thanks for bringing this to my attention. I had to hand merge and didn't realize the mismatch. Opened a new PR to fix that.

@dongjoon-hyun
Member

Thank YOU, always! :)

asfgit pushed a commit that referenced this pull request Feb 16, 2017
## What changes were proposed in this pull request?

fix test broken by git merge for #16739

## How was this patch tested?

manual

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16950 from felixcheung/fixrtest.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 16, 2017
## What changes were proposed in this pull request?

Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes apache#16739 from felixcheung/rcoalesce.
@jkbradley
Member

I've commented elsewhere, but wanted to say it here too, to make more people aware: let's refrain from backporting new APIs into patch versions unless they are really critical. We do not do this elsewhere in Spark, and we should not in SparkR. New APIs and API changes should only happen in minor versions (and ideally changes will only happen in major ones). It's been discussed elsewhere that SparkR is more experimental than other parts of Spark, but the sooner we start treating it like a stable library, the sooner it will be a stable library. For most people, there isn't a huge difference between getting a new API in a patch version (every 1-2 months) vs. getting it in a minor version (every 4 months). Thanks all!

asfgit pushed a commit that referenced this pull request Mar 8, 2017
…nabled Repartition

### What changes were proposed in this pull request?

Observed by felixcheung in #16739: when users use the shuffle-enabled `repartition` API, they expect the partition count they get to be the exact number they provided, even if they call the shuffle-disabled `coalesce` later.

Currently, `CollapseRepartition` rule does not consider whether shuffle is enabled or not. Thus, we got the following unexpected result.

```Scala
    val df = spark.range(0, 10000, 1, 5)
    val df2 = df.repartition(10)
    assert(df2.coalesce(13).rdd.getNumPartitions == 5)
    assert(df2.coalesce(7).rdd.getNumPartitions == 5)
    assert(df2.coalesce(3).rdd.getNumPartitions == 3)
```

This PR is to fix the issue. We preserve shuffle-enabled Repartition.

### How was this patch tested?
Added a test case

Author: Xiao Li <gatorsmile@gmail.com>

Closes #16933 from gatorsmile/CollapseRepartition.
@shivaram
Contributor

Agree with @jkbradley on this one. We should avoid adding completely new functions in a patch release, given that the time between minor versions and patch releases isn't that long. As we discussed in the other thread, let's start tagging JIRAs with backport and also add a line in the JIRA saying why it's safe/required to backport.
