[SPARK-10868] monotonicallyIncreasingId() supports offset for indexing #14568

Closed · wants to merge 30 commits

Conversation

@tedyu (Contributor) commented Aug 9, 2016

What changes were proposed in this pull request?

This PR adds an offset parameter to monotonicallyIncreasingId().

How was this patch tested?

Existing tests.

@tedyu changed the title from "SPARK-10868 monotonicallyIncreasingId() supports offset for indexing" to "[SPARK-10868] monotonicallyIncreasingId() supports offset for indexing" on Aug 9, 2016
@SparkQA commented Aug 9, 2016

Test build #63458 has finished for PR 14568 at commit 4a4e247.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class MonotonicallyIncreasingID(offset: Long = 0) extends LeafExpression with Nondeterministic

@hvanhovell (Contributor) commented

Add a test?

@@ -40,13 +40,14 @@ import org.apache.spark.sql.types.{DataType, LongType}
     represent the record number within each partition. The assumption is that the data frame has
     less than 1 billion partitions, and each partition has less than 8 billion records.""",
   extended = "> SELECT _FUNC_();\n 0")
-case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
+case class MonotonicallyIncreasingID(offset: Long = 0) extends LeafExpression
Contributor commented on the diff:

Check if this still works in SQL. We might have to change offset into a literal expression. See HyperLogLogPlusPlus or NTile for examples of this.

Contributor Author replied:

case class HyperLogLogPlusPlus(
    child: Expression,
    relativeSD: Double = 0.05,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)

The change seems to be in line with the HyperLogLogPlusPlus constructor.


@SparkQA commented Aug 9, 2016

Test build #63462 has finished for PR 14568 at commit ea77d78.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  * @group normal_funcs
- * @since 1.6.0
+ * @since 2.0.1
Contributor commented on the diff:

Let's make this 2.1.0. Could you also add a little bit of documentation on the offset? One line suffices.

@hvanhovell (Contributor) commented

A high-level question: is it a problem when the offset is larger than 1 << 33? I can't really think of one.

@rxin (Contributor) commented Aug 9, 2016

So I'm still confused about what offset does, even after reading the code. Can you please write clearer documentation? An example would help.

Also, let's update the Python API and add an integration test for SQL.

@SparkQA commented Aug 9, 2016

Test build #63459 has finished for PR 14568 at commit 46ea8a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class MonotonicallyIncreasingID(offset: Long = 0) extends LeafExpression

@SparkQA commented Aug 9, 2016

Test build #63472 has finished for PR 14568 at commit 8198d9c.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 9, 2016

Test build #63466 has finished for PR 14568 at commit 81e342d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 9, 2016

Test build #63469 has finished for PR 14568 at commit f78d6aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 9, 2016

Test build #63470 has finished for PR 14568 at commit 1107182.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 9, 2016

Test build #63473 has finished for PR 14568 at commit b713f23.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 10, 2016

Test build #63475 has finished for PR 14568 at commit 4d0cd3a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu (Contributor Author) commented Aug 10, 2016

[info] - monotonically_increasing_id_with_offset *** FAILED *** (14 milliseconds)
[info]   org.apache.spark.sql.AnalysisException: Invalid number of arguments for function monotonically_increasing_id; line 1 pos 0
[info]   at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$5.apply(FunctionRegistry.scala:457)
[info]   at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$5.apply(FunctionRegistry.scala:443)

I wonder why 'monotonically_increasing_id(offset: Long): Column' wasn't considered a match.

@hvanhovell (Contributor) commented

monotonically_increasing_id(5) gets parsed into the expression UnresolvedFunction("monotonically_increasing_id", Seq(Literal(5))). Note that the offset is turned into an Expression as well.

The FunctionRegistry can only resolve Expression-based constructors, which MonotonicallyIncreasingID does not provide. Solution: add an Expression-based constructor, as sketched below.
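For illustration, a minimal sketch of such a constructor (it assumes the companion-object parseExpression helper quoted later in this thread; the expression's abstract members are omitted):

import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, Nondeterministic}

case class MonotonicallyIncreasingID(offset: Long = 0) extends LeafExpression
  with Nondeterministic {

  // Expression-based auxiliary constructor: lets the FunctionRegistry
  // resolve the SQL call monotonically_increasing_id(5), whose argument
  // arrives as Literal(5) rather than as a Long.
  def this(offset: Expression) =
    this(MonotonicallyIncreasingID.parseExpression(offset))

  // ... eval/doGenCode and the rest of the expression are unchanged ...
}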

@SparkQA commented Aug 10, 2016

Test build #63537 has finished for PR 14568 at commit 58e044f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 15, 2016

Test build #63766 has finished for PR 14568 at commit 57ab6fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu (Contributor Author) commented Aug 15, 2016

@rxin Can you take a look at the Python API one more time?

object MonotonicallyIncreasingID {
  private def parseExpression(expr: Expression): Long = expr match {
    case IntegerLiteral(i) => i.toLong
    case NonNullLiteral(l: Long, LongType) => l.toString.toLong
Contributor commented on the diff:

just return l?
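For illustration, a sketch of the helper with that simplification applied (the fallback error branch is an assumption, not part of the diff):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, NonNullLiteral}
import org.apache.spark.sql.types.LongType

object MonotonicallyIncreasingID {
  private def parseExpression(expr: Expression): Long = expr match {
    case IntegerLiteral(i) => i.toLong
    // The literal already holds a Long, so return it directly instead of
    // round-tripping through a String.
    case NonNullLiteral(l: Long, LongType) => l
    case other =>
      throw new AnalysisException(
        s"The offset of monotonically_increasing_id must be an integer literal, found $other.")
  }
}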

@hvanhovell (Contributor) commented

@tedyu the Scala code is shaping up nicely.

I do have a question regarding usage. How will this be used? The thing is that monotonically_increasing_id returns an id based on the number of rows in a partition and the partition id. If I have read the JIRA correctly, someone wants to use this for id generation across multiple batches. How would a user be able to provide a sensible offset? Could you create an example?

@SparkQA commented Aug 15, 2016

Test build #63791 has finished for PR 14568 at commit 5bdb3ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu (Contributor Author) commented Aug 16, 2016

@hvanhovell
As Martin said in the JIRA:

  • Add the index column to A' - this time starting at 200, as there are already entries with ids from 0 to 199 (here, monotonicallyIncreasingId(200) is required).
  • Union A and A'.

Does the above example work for you? A sketch of that flow follows.
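For illustration, a sketch of that flow using the offset parameter proposed in this PR (dfA and dfAPrime are hypothetical DataFrames with matching schemas):

import org.apache.spark.sql.functions.monotonically_increasing_id

// dfA already carries ids 0..199 in its "id" column; index the new
// batch starting at 200 (per this PR's proposed overload), then append.
val indexed  = dfAPrime.withColumn("id", monotonically_increasing_id(200))
val combined = dfA.union(indexed)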

@tedyu (Contributor Author) commented Aug 16, 2016

@hvanhovell
What do you think of the above reply?

@hvanhovell (Contributor) commented

Not super. I will try to explain why.

monotonically_increasing_id constructs an id based on an increasing record number (the lower 33 bits) and the partition id (the upper 31 bits). The example given is not very realistic because it only seems to contain one partition (id=0). In that case the maximum id of the previous run would be usable as the seed.
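For illustration, a minimal sketch of the layout just described (not the actual Spark source):

// Partition id in the upper 31 bits, per-partition record number in the
// lower 33 bits; the offset proposed here would simply be added on top.
def makeId(partitionId: Int, recordNumber: Long, offset: Long = 0L): Long =
  (partitionId.toLong << 33) + recordNumber + offset

makeId(0, 4L)  // 4
makeId(1, 0L)  // 8589934592, matching the jump in the output below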

If you have more than one partition, an issue emerges. For instance, range(0, 9, 1, 3).select(monotonically_increasing_id()).show yields the following ids:

+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
|                            2|
|                            3|
|                            4|
|                   8589934592|
|                   8589934593|
|                   8589934594|
|                   8589934595|
|                   8589934596|
+-----------------------------+

Which offset would you use for the next run in this case? Let's explore the options we have:

  • Taking the maximum id as the next offset. This will probably lead to collisions between ids.
  • Calculating the maximum of the lower 33 bits. This relies heavily on the exact implementation of monotonically_increasing_id.

What would you recommend an end user to do?

@tedyu (Contributor Author) commented Aug 16, 2016

With:
spark.range(0, 9, 1, 3).select(monotonically_increasing_id()).show

I got:

+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
|                            2|
|                   8589934592|
|                   8589934593|
|                   8589934594|
|                  17179869184|
|                  17179869185|
|                  17179869186|
+-----------------------------+

The next offset could be 3: the maximum of the lower 33 bits across all partitions is 2, so the next run would start at 3.

@tedyu (Contributor Author) commented Aug 17, 2016

The addition of offset support allows users to concatenate rows from different datasets without their generated ids colliding.

@@ -426,6 +426,29 @@ def monotonically_increasing_id():
     return Column(sc._jvm.functions.monotonically_increasing_id())


+@since(2.1)
+def monotonically_increasing_id_w_offset(offset):
Contributor commented on the diff:

Could you do this by adding a default parameter to the monotonically_increasing_id method and removing this one?

Contributor Author replied:

I was planning to do that, but the @since() annotation becomes confusing.
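For illustration, a Python sketch of the combined signature being suggested (the offset-taking JVM overload is the one this PR proposes, and a versionchanged note is one possible way around the @since concern):

from pyspark import SparkContext, since
from pyspark.sql.column import Column


@since(1.6)
def monotonically_increasing_id(offset=0):
    """Returns monotonically increasing 64-bit integers.

    .. versionchanged:: 2.1 added the optional ``offset`` parameter.
    """
    sc = SparkContext._active_spark_context
    if offset == 0:
        return Column(sc._jvm.functions.monotonically_increasing_id())
    # Assumes the Scala-side overload proposed by this PR.
    return Column(sc._jvm.functions.monotonically_increasing_id(offset))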

@hvanhovell (Contributor) commented

Ok, so the maximum of the lower 33 bits would be the starting offset for the next batch. That is not super easy for an end user to do. Let's say a user inserts data from table a into table b; using this would require code like this:

insert into b
select *,
       monotonically_increasing_id((select max(id & 8589934591) from b)) as id
from   a

Other than that, the change looks pretty good. I left one last comment regarding the Python code.

@SparkQA commented Aug 17, 2016

Test build #63929 has finished for PR 14568 at commit 9b34358.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 17, 2016

Test build #63928 has finished for PR 14568 at commit 0167b02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu (Contributor Author) commented Aug 17, 2016

@hvanhovell
Let me know if there is more I should do for this enhancement.
Thanks

@rxin (Contributor) commented Aug 17, 2016

Is this actually going to solve the use case asked about in SPARK-10868?

The problem with monotonicallyIncreasingId is that it is actually not consecutive -- e.g. even if you have 200 records in this batch of updates, the max id won't be 200. It will be some very large number, since the upper few bits are determined by the partition id (for example, with 4 partitions of 50 rows each, the max id is (3L << 33) + 49 = 25769803825).

@tedyu (Contributor Author) commented Aug 17, 2016

As Herman commented above, obtaining the lower 33 bits of the id column would allow ids generated from two (or more) executions to form a contiguous range.

@rxin (Contributor) commented Aug 17, 2016

Yea but you can't do that more than once.

@tedyu (Contributor Author) commented Aug 17, 2016

Can you elaborate?

1st run: ids 1 to 99 are generated.

2nd run: poll the id column and obtain 99. Specify 100 as the offset for monotonically_increasing_id(). Ids 100 to 199 are generated.

3rd run: poll the id column and obtain 199. Specify 200 as the offset for monotonically_increasing_id(). Ids 200 to 299 are generated.

@rxin (Contributor) commented Aug 17, 2016

That won't work when there is more than one partition.

@tedyu (Contributor Author) commented Aug 17, 2016

I don't think so.
Using (id & 8589934591) would obtain the numbers 99 and 199 in my example.

@rxin (Contributor) commented Aug 17, 2016

scala> spark.range(10).selectExpr("monotonically_increasing_id() & 8589934591L").show()
+--------------------------------------------+
|(monotonically_increasing_id() & 8589934591)|
+--------------------------------------------+
|                                           0|
|                                           0|
|                                           0|
|                                           0|
|                                           1|
|                                           0|
|                                           0|
|                                           0|
|                                           0|
|                                           1|
+--------------------------------------------+

@hvanhovell (Contributor) commented

The thing is that this (id & 8589934591L) is difficult/strange for an end user to work with; they should not really have to think about such a detail.

@rxin (Contributor) commented Aug 17, 2016

@tedyu I closed the original JIRA. Can you close the pull request?

Let me know if it is unclear why this doesn't really solve the problem.

@hvanhovell (Contributor) commented

@tedyu could you close this one?

@tedyu closed this Aug 31, 2016