
[SPARK-30185][SQL] Implement Dataset.tail API #26809

Closed

Conversation

@HyukjinKwon (Member) commented Dec 9, 2019

What changes were proposed in this pull request?

This PR proposes a tail API.

For example:

scala> spark.range(10).head(5)
res1: Array[Long] = Array(0, 1, 2, 3, 4)
scala> spark.range(10).tail(5)
res2: Array[Long] = Array(5, 6, 7, 8, 9)

Implementation details will be similar to head, but reversed:

  1. Run a job against the last partition and collect rows. If this is enough, return as is.
  2. If this is not enough, calculate the number of additional partitions to select based on spark.sql.limit.scaleUpFactor.
  3. Run more jobs against that many more partitions (in reverse order compared to head).
  4. Go to step 2.

Note that we don't guarantee the natural order of a DataFrame in general - there are cases when it's deterministic and cases when it's not. We should probably document this caveat separately.
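
To make the loop above concrete, here is a minimal driver-side sketch. tailSketch, sc, rdd, and the default scaleUpFactor are illustrative stand-ins, not the actual code in this PR (the real plan also tails each partition locally before collecting):

    import scala.reflect.ClassTag
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    def tailSketch[T: ClassTag](
        sc: SparkContext, rdd: RDD[T], n: Int, scaleUpFactor: Int = 4): Array[T] = {
      val totalParts = rdd.partitions.length
      val buf = scala.collection.mutable.ArrayBuffer.empty[T]
      var partsScanned = 0   // trailing partitions scanned so far
      var numPartsToTry = 1  // start with just the last partition
      while (buf.size < n && partsScanned < totalParts) {
        // Scan partitions from the end, mirroring head()'s front-to-back scan.
        val upper = totalParts - partsScanned
        val lower = math.max(upper - numPartsToTry, 0)
        val res = sc.runJob(rdd, (it: Iterator[T]) => it.toArray, lower until upper)
        // These partitions precede everything collected so far in natural order.
        buf.prependAll(res.flatten)
        partsScanned += upper - lower
        numPartsToTry *= scaleUpFactor  // step 2: scale up for the next round
      }
      buf.takeRight(n).toArray
    }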

Why are the changes needed?

Many other systems support taking data from the end, for instance, pandas [1] and Python [2][3]. The Scala collections API also has head and tail.

On the other hand, in Spark, we only provide a way to take data from the start
(e.g., DataFrame.head).

This has been requested multiple times in the Spark user mailing list [4], StackOverflow [5][6], JIRA [7], and third-party projects such as Koalas [8]. In addition, this missing API seems to be explicitly mentioned in comparisons with other systems [9] from time to time.

It seems we're missing a non-trivial use case in Spark, which motivated me to propose this API.

[1] https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.tail.html?highlight=tail#pandas.DataFrame.tail
[2] https://stackoverflow.com/questions/10532473/head-and-tail-in-one-line
[3] https://stackoverflow.com/questions/646644/how-to-get-last-items-of-a-list-in-python
[4] http://apache-spark-user-list.1001560.n3.nabble.com/RDD-tail-td4217.html
[5] https://stackoverflow.com/questions/39544796/how-to-select-last-row-and-also-how-to-access-pyspark-dataframe-by-index
[6] https://stackoverflow.com/questions/45406762/how-to-get-the-last-row-from-dataframe
[7] https://issues.apache.org/jira/browse/SPARK-26433
[8] databricks/koalas#343
[9] https://medium.com/@chris_bour/6-differences-between-pandas-and-spark-dataframes-1380cec394d2

Does this PR introduce any user-facing change?

No (this adds a new API).

How was this patch tested?

Unit tests were added, and the change was manually tested.

override lazy val metrics = readMetrics ++ writeMetrics

protected override def doExecute(): RDD[InternalRow] = {
  val locallyLimited = child.execute().mapPartitionsInternal { iter =>
    // Locally limit each partition to its last `limit` rows: slide a window
    // of size `limit` over the iterator and keep the final window.
    val slidingIter = iter.sliding(limit)
@HyukjinKwon (Member, Author) commented on this diff, Dec 9, 2019

About this sliding Scala API: I manually tested it against some logic I wrote myself (e.g., a finite queue looped over once via while). There was no notable performance difference, so I decided to use sliding since it does what I want.
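
For reference, a minimal sketch of the queue-based alternative described above; the name lastRows is illustrative, and this is not code from the PR:

    import java.util.ArrayDeque
    import scala.collection.JavaConverters._

    // Keep at most `limit` rows in a bounded queue while looping over the
    // iterator once; what survives is the tail of the partition.
    def lastRows[T](iter: Iterator[T], limit: Int): Iterator[T] = {
      if (limit <= 0) return Iterator.empty
      val queue = new ArrayDeque[T](limit)
      while (iter.hasNext) {
        if (queue.size == limit) queue.removeFirst() // evict the oldest row
        queue.addLast(iter.next())
      }
      queue.iterator().asScala
    }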

@HyukjinKwon HyukjinKwon requested review from viirya, cloud-fan and gengliangwang and removed request for viirya and cloud-fan December 9, 2019 10:38
@SparkQA commented Dec 9, 2019

Test build #115032 has finished for PR 26809 at commit 9436dfb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode
  • case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec

@srowen (Member) commented Dec 9, 2019

I can see the use case for skipping a header, but this doesn't help if you still want an RDD/DataFrame as the result, because you collect an Array. It also only really works if there is an ordering defined.

How much is this different from sorting in reverse and head()? In comparison, this looks like it has to traverse the whole data set?
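
(For reference, the alternative being compared would look like the following; an illustrative spark-shell session, where the final reverse restores ascending order:)

scala> spark.range(10).sort(org.apache.spark.sql.functions.col("id").desc).head(5).reverse
res0: Array[Long] = Array(5, 6, 7, 8, 9)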

@HeartSaVioR (Contributor) commented

I felt the same as @srowen: once a shuffle is involved, without ordering there should be no meaningful difference from head(), as we don't guarantee ordering anyway; and with ordering, the semantics would be the same as a reverse-order sort plus head().

It would be great if we could clarify the benefits over the semantically equivalent approach; otherwise this might just be syntactic sugar, though I'd be OK with that given how many requests there are in the description of this PR.

@HyukjinKwon (Member, Author) commented Dec 10, 2019

How much is this different from sorting in reverse and head()? In comparison, this looks like it has to traverse the whole data set?

At least it can drop the records at the executor side, and it won't require a sort; so we can do this as an almost map-only operation.

once a shuffle is involved, without ordering there should be no meaningful difference from head(), as we don't guarantee ordering anyway; and with ordering, the semantics would be the same as a reverse-order sort plus head().

Yes, I think this is a good point. With ordering, it can just be a different way of doing the same thing. Without ordering, it's designed to follow the natural order, which is not guaranteed in many cases in Spark.

One clear use case might be reading from an external datasource. If I am not wrong, when we use a Hadoop RDD (which most external datasources use), it respects the natural order, so the spark.read.format("xml").load().tail(5) case will work.
Another case is a local collection, where, if I am not wrong, the natural order is preserved.
I am sure there are more such cases which I should identify.

FWIW, Spark used to (unofficially) respect the natural order, but IIRC it broke after we started consolidating small partitions into a big partition. This can be configured so that our file-based sources keep the natural order too, if I am not wrong; of course, I don't think that's officially supported.
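
(For illustration, the local-collection case mentioned above would look like this with the API proposed in this PR:)

scala> import spark.implicits._
scala> Seq("a", "b", "c", "d", "e").toDS().tail(2)
res0: Array[String] = Array(d, e)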

@HyukjinKwon (Member, Author) commented
Oh, BTW, we also have last and first in functions, which I think are friends of head and tail.
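
(Note that last and first are aggregate functions; without an explicit ordering their results are non-deterministic, unlike head/tail which follow the natural order. An illustrative use:)

scala> import org.apache.spark.sql.functions.{first, last}
scala> spark.range(10).agg(first("id"), last("id")).collect()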

@cloud-fan (Contributor) commented
I think it makes sense to support a top-level tail that doesn't need a shuffle, like limit (see CollectLimitExec.executeCollect). But I agree with @HeartSaVioR that once this is in the middle of a plan and a shuffle is needed, tail and limit have no difference.

@SparkQA commented Dec 12, 2019

Test build #115230 has finished for PR 26809 at commit 9b58d4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented
Just for clarification: the take(..) code path isn't affected at all.

@SparkQA commented Dec 24, 2019

Test build #115676 has finished for PR 26809 at commit 6d40611.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented
retest this please

@SparkQA commented Dec 24, 2019

Test build #115678 has finished for PR 26809 at commit 8ad431d.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 24, 2019

Test build #115696 has finished for PR 26809 at commit 8ad431d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor) commented
retest this, please

@SparkQA commented Dec 24, 2019

Test build #115712 has finished for PR 26809 at commit 8ad431d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 26, 2019

Test build #115783 has finished for PR 26809 at commit 40d0740.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 27, 2019

Test build #115868 has finished for PR 26809 at commit 4999808.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode
  • case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec

@HyukjinKwon (Member, Author) commented
retest this please

@SparkQA commented Dec 30, 2019

Test build #115930 has finished for PR 26809 at commit 4999808.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode
  • case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec

@SparkQA commented Dec 30, 2019

Test build #115932 has finished for PR 26809 at commit 738d3e1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented
retest this please

@SparkQA commented Dec 30, 2019

Test build #115943 has finished for PR 26809 at commit 738d3e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented
Merged to master.

dongjoon-hyun pushed a commit that referenced this pull request Jan 18, 2020
### What changes were proposed in this pull request?

#26809 added the `Dataset.tail` API. It would be good to have it in the PySpark API as well.

### Why are the changes needed?

To support consistent APIs.

### Does this PR introduce any user-facing change?

No. It adds a new API.

### How was this patch tested?

Manually tested, and a doctest was added.

Closes #27251 from HyukjinKwon/SPARK-30539.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@HyukjinKwon HyukjinKwon deleted the wip-tail branch March 3, 2020 01:17