Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7385][Core] Add RDD.foreachPartitionWithIndex #5927

Closed
wants to merge 6 commits into from

Conversation

tdas
Copy link
Contributor

@tdas tdas commented May 5, 2015

Spark Streaming apps often update external stores transactionally, which requires it to have an id that uniquely identifies the partition of data to be inserted. This can be the (batch time, partition index).
Current work around is to use mapPartitionsWithIndex().count() which is quite hacky. This PR is to add foreachPartitionWithIndex().

@rxin
Copy link
Contributor

rxin commented May 5, 2015

Why not just use TaskContext?

@SparkQA
Copy link

SparkQA commented May 5, 2015

Test build #31920 has finished for PR 5927 at commit 8520748.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented May 5, 2015

Easier for normal users. The alternative is.

rdd.foreachPartition { iter =>  
    val partitionId = TaskContext.get.partitionId
    .... 
}

This is ugly and non-intuitive.

@SparkQA
Copy link

SparkQA commented May 6, 2015

Test build #31921 has finished for PR 5927 at commit b98a91f.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas tdas changed the title [SPARK-7385][Core] Added RDD.foreachPartitionWithIndex [SPARK-7385][Core] Add RDD.foreachPartitionWithIndex May 6, 2015
@tdas
Copy link
Contributor Author

tdas commented May 6, 2015

@koeninger Isn't this going to make it easier to do transactional output operations?

@SparkQA
Copy link

SparkQA commented May 6, 2015

Test build #31923 has finished for PR 5927 at commit 69bcc61.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Copy link
Contributor

@tdas yeah, Kafka transactional output was why I originally wanted to add
it.

Although that usage of taskcontext shown above is better than my
alternative of mapPartitionsWithIndex plus an empty for each. If I had
thought of task context first I probably wouldn't have bothered.
On May 5, 2015 7:13 PM, "Tathagata Das" notifications@github.com wrote:

@koeninger https://github.com/koeninger Isn't this going to make it
easier to do transactional output operations?


Reply to this email directly or view it on GitHub
#5927 (comment).

@SparkQA
Copy link

SparkQA commented May 6, 2015

Test build #31927 has finished for PR 5927 at commit 37f1c37.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 6, 2015

Test build #31964 has finished for PR 5927 at commit 33fecb2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented May 7, 2015

@rxin Any objections?
@JoshRosen We discussed about this offline. Please take a look.

@JoshRosen
Copy link
Contributor

This seems fine to me. If we were to design this all over again, I'd consider just dropping the withIndex methods in favor of either a withContext method or just requiring users to use TaskContext.get. Since we already have mapPartitionsWithIndex, though, I suppose it's fine to add this for completeness. Note that I wouldn't recommend that we add *WithIndex variants for most methods, but I think that mapPartitions and foreachPartition are somewhat low-level special cases compared to most RDD operations.

@SparkQA
Copy link

SparkQA commented May 7, 2015

Test build #32065 has finished for PR 5927 at commit 83d0e00.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented May 7, 2015

@JoshRosen I agree with that we should not use this as a precedence for
adding XYZWithIndex. However this is also true that TaskContext.get is not
very easy to find out about. Maybe we should cover that in the Spark
website documentation (may be it already is, and I am just not aware).

If there are not other objections, I will merge it.

On Wed, May 6, 2015 at 8:55 PM, UCB AMPLab notifications@github.com wrote:

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32065/
Test PASSed.


Reply to this email directly or view it on GitHub
#5927 (comment).

@pwendell
Copy link
Contributor

pwendell commented May 7, 2015

Hm - I think it might be better to document TaskContext and ask users to use that - this is why we exposed that. From what I can tell, but @rxin and @JoshRosen also prefer that approach.

@tdas
Copy link
Contributor Author

tdas commented May 8, 2015

But as @koeninger pointed out that it was not obvious to find out the TaskContext.get.partitionId(). And may be for consistency with mapPartitionsWithIndex, its okay to have with RDD.foreachPartitionWithIndex() which is strictly easier to find than TaskContext.get.partitionId(). So I think this is a nice-to-have feature.

@rxin
Copy link
Contributor

rxin commented May 8, 2015

Why not just add to the javadoc of mapPartitions to suggest how to get the partition id and task context.

@tdas
Copy link
Contributor Author

tdas commented May 8, 2015

Still ugly IMO.

On Fri, May 8, 2015 at 12:44 AM, Reynold Xin notifications@github.com
wrote:

Why not just add to the javadoc of mapPartitions to suggest how to get the
partition id and task context.


Reply to this email directly or view it on GitHub
#5927 (comment).

@koeninger
Copy link
Contributor

I think if you're going to decide you really don't like withContext/withIndex etc they should be marked as deprecated, in addition to having a scaladoc reference to TaskContext.get

Either that or foreachPartitionWithIndex seems ok to me.

@pwendell
Copy link
Contributor

pwendell commented May 8, 2015

Marking them deprecated sounds like a good idea. The static getter method was specifically designed to replace them.

@tdas
Copy link
Contributor Author

tdas commented May 11, 2015

All right. Since adding ForeachPartitionWithIndex is not the best idea, I will close this PR. Additionally I will document it in the programming guide to use TaskContext.getPartitionId()

@tdas tdas closed this May 11, 2015
@deusaquilus
Copy link

What do you do if you are in local mode and TaskContext .getPartitionId always returns zero?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants