
[SPARK-27297] [SQL] Add higher order functions to scala API #24232

Closed
wants to merge 42 commits into from

Conversation

nvander1
Contributor

@nvander1 commented Mar 28, 2019

What changes were proposed in this pull request?

There is currently no Scala API equivalent for the higher-order functions introduced in Spark 2.4.0:

  • transform
  • aggregate
  • filter
  • exists
  • forall
  • zip_with
  • map_zip_with
  • map_filter
  • transform_values
  • transform_keys

Equivalent column-based functions should be added to the Scala API in org.apache.spark.sql.functions with the following signatures:

 

def transform(column: Column, f: Column => Column): Column = ???

def transform(column: Column, f: (Column, Column) => Column): Column = ???

def exists(column: Column, f: Column => Column): Column = ???

def filter(column: Column, f: Column => Column): Column = ???

def aggregate(
    expr: Column,
    zero: Column,
    merge: (Column, Column) => Column,
    finish: Column => Column): Column = ???

def aggregate(
    expr: Column,
    zero: Column,
    merge: (Column, Column) => Column): Column = ???

def zip_with(
    left: Column,
    right: Column,
    f: (Column, Column) => Column): Column = ???

def transform_keys(expr: Column, f: (Column, Column) => Column): Column = ???

def transform_values(expr: Column, f: (Column, Column) => Column): Column = ???

def map_filter(expr: Column, f: (Column, Column) => Column): Column = ???

def map_zip_with(left: Column, right: Column, f: (Column, Column, Column) => Column): Column = ???
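For illustration, a minimal usage sketch of the proposed column-based overloads; the SparkSession setup and example data here are hypothetical:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    // Hypothetical usage sketch; names and signatures mirror the proposal above.
    object HigherOrderFunctionsUsage {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val df = Seq(Seq(1, 2, 3), Seq(4, 5)).toDF("xs")

        // transform: add 1 to each element of the array column.
        df.select(transform($"xs", x => x + 1)).show()

        // filter: keep only the even elements.
        df.select(filter($"xs", x => x % 2 === 0)).show()

        // aggregate: sum the elements, starting from a zero literal.
        df.select(aggregate($"xs", lit(0), (acc, x) => acc + x)).show()

        spark.stop()
      }
    }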

How was this patch tested?

I've mimicked the existing tests in org.apache.spark.sql.DataFrameFunctionsSuite that use expr to exercise the higher order functions.

As an example of an existing test:

  test("map_zip_with function - map of primitive types") {
    val df = Seq(
      (Map(8 -> 6L, 3 -> 5L, 6 -> 2L), Map[Integer, Integer]((6, 4), (8, 2), (3, 2))),
      (Map(10 -> 6L, 8 -> 3L), Map[Integer, Integer]((8, 4), (4, null))),
      (Map.empty[Int, Long], Map[Integer, Integer]((5, 1))),
      (Map(5 -> 1L), null)
    ).toDF("m1", "m2")

    checkAnswer(df.selectExpr("map_zip_with(m1, m2, (k, v1, v2) -> k == v1 + v2)"),
      Seq(
        Row(Map(8 -> true, 3 -> false, 6 -> true)),
        Row(Map(10 -> null, 8 -> false, 4 -> null)),
        Row(Map(5 -> null)),
        Row(null)))
  }

I've added a test that performs the same logic, but uses the new column-based API I've added:

    checkAnswer(df.select(map_zip_with(df("m1"), df("m2"), (k, v1, v2) => k === v1 + v2)),
      Seq(
        Row(Map(8 -> true, 3 -> false, 6 -> true)),
        Row(Map(10 -> null, 8 -> false, 4 -> null)),
        Row(Map(5 -> null)),
        Row(null)))

*
* @group collection_funcs
*/
def exists(column: Column, f: Column => Column): Column = withExpr {
Member

But how do we support this in Java?

Contributor Author

Could we change the signatures to accept scala.runtime.AbstractFunctions instead to avoid using the Function traits?

Member

Let's add (Scala-specific) at least to each doc. BTW, please take a look at the style guide: https://github.com/databricks/scala-style-guide

Contributor Author

@nvander1 Mar 28, 2019

Actually, a better idea would probably be to use Java functional interfaces.

@FunctionalInterface
interface Function3<T1, T2, T3, R> {
  R apply(T1 t1, T2 t2, T3 t3);
}

Column map_zip_with(Column left, Column right, Function3<Column, Column, Column, Column> f) { ... }

Contributor Author

And of course we would use the existing functional interfaces from java.util.function first, but I don't think there are any that accept three parameters like some of the functions here require.

Contributor Author

It appears these interfaces already exist in the source tree: https://github.com/apache/spark/blob/v2.4.0/core/src/main/java/org/apache/spark/api/java/function/Function3.java

I'll come back later to add Java-specific APIs that utilize these.
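For a sense of how that could look, a rough sketch of a Java-friendly overload that adapts the existing Function3 interface and delegates to the Scala variant; scalaMapZipWith is a hypothetical stand-in for the Scala-facing implementation, not the merged code:

    import org.apache.spark.sql.Column
    import org.apache.spark.api.java.function.{Function3 => JFunction3}

    object JavaInteropSketch {
      // Hypothetical stand-in for the Scala map_zip_with defined in functions.scala.
      def scalaMapZipWith(
          left: Column,
          right: Column,
          f: (Column, Column, Column) => Column): Column = ???

      // Java-friendly overload: adapt the Java functional interface to a Scala function.
      def map_zip_with(
          left: Column,
          right: Column,
          f: JFunction3[Column, Column, Column, Column]): Column =
        scalaMapZipWith(left, right, (k, v1, v2) => f.call(k, v1, v2))
    }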

@srowen
Member

srowen commented Mar 28, 2019

I think this is a whole lot to add and support in the APIs, and it will be Scala-specific, in an API that is meant more to follow SQL operations than Scala.

@nvander1
Contributor Author

@srowen Can't we make the same argument against any of the Scala functions in org.apache.spark.sql.functions?

Also, I can provide equivalent methods in Java that accept lambda expressions via a functional interface.

@nvander1
Contributor Author

I've added Java-specific versions of the API as well to show how Java interop can be handled. I still need to add the corresponding tests.

@HyukjinKwon
Member

Can you hold it for a while before we go further? Growing APIs in functions.scala is a concern. cc @rxin, @gatorsmile, and @ueshin

@nvander1
Contributor Author

Sure, I'd love to get some more feedback on this!

@nvander1
Contributor Author

nvander1 commented Apr 2, 2019

Any more thoughts on this? @HyukjinKwon @rxin @gatorsmile @ueshin @srowen

@gatorsmile
Member

Also cc @hvanhovell

@nvander1
Contributor Author

Would it be more appropriate for me to close the issue and make a third-party library for these if growing the API is a concern? @HyukjinKwon @rxin @gatorsmile @ueshin @srowen @hvanhovell

@HyukjinKwon
Member

I have no strong opinion here. I would leave it to @rxin, @ueshin, @hvanhovell.

@ssimeonov
Contributor

It's extremely weird and inconsistent for there to be SparkSQL functions with no DSL equivalent. It forces companies such as Swoop to create our own (e.g., https://gist.github.com/ssimeonov/8d902d0dfda934a79c3a46ec7dc0523d) while bearing the uncertainty of what OSS Spark will do, which is not a great outcome for the ecosystem. It would have been much better if SparkSQL and DSL support had been launched jointly.

The growing size of functions feels like a red herring, but if it is a concern, what prevents us from putting various categories of functions into other "namespaces", the way Dataset does with statistics and missing-value functionality?

Either way, delaying a decision on this functionality, by which I don't mean this specific PR, does not help.

/cc @rxin

@nvander1
Contributor Author

I noticed that the implementation I initially submitted only worked for bound column references, so I've fixed that with the most recent commit. Referencing columns via col("x") will now work, instead of just dataframe("x").
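A minimal sketch of the difference, assuming the transform overload proposed in this PR; the example data is hypothetical:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, transform}

    object ColumnReferenceSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val df = Seq(Seq(1, 2, 3)).toDF("xs")

        // Unbound reference via col("xs") -- works after the fix.
        df.select(transform(col("xs"), x => x + 1)).show()

        // Bound reference via df("xs") -- worked already.
        df.select(transform(df("xs"), x => x + 1)).show()

        spark.stop()
      }
    }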

@rxin
Contributor

rxin commented Jun 15, 2019

I feel it's ok to have these functions. Fills a gap.

@nvander1
Contributor Author

@rxin @HyukjinKwon What are the next steps here? Can we get a Jenkins build kicked off?

@HyukjinKwon
Member

ok to test

@HyukjinKwon
Member

@ssimeonov,

It's extremely weird and inconsistent for there to be SparkSQL functions with no DSL equivalent.

See https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L42-L56

@SparkQA

SparkQA commented Jul 2, 2019

Test build #107117 has finished for PR 24232 at commit 6bf07d8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nvander1
Contributor Author

nvander1 commented Jul 11, 2019

The build is failing, but not at changes I have made:

[error] /home/jenkins/workspace/SparkPullRequestBuilder/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala:184: type mismatch;
[error]  found   : Function0[Int] (in scala)
[error]  required: Function0[?]   (in org.apache.spark.api.java.function)
[error]     val sampleUDF = udf(sample)

@HyukjinKwon How should we proceed to isolate the failure from my changes?

@srowen
Member

srowen commented Jul 11, 2019

That doesn't seem to be failing in master. I suspect it is somehow related to this change, though it's hard to see how. Does it compile locally?

@HyukjinKwon
Member

ping @nvander1 are you able to compile locally?

@SparkQA

SparkQA commented Sep 19, 2019

Test build #110961 has finished for PR 24232 at commit e43033b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Sep 19, 2019

FYI: We might want to include the method for #25666.

@nvander1
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 20, 2019

Test build #111034 has finished for PR 24232 at commit 722f0e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nvander1
Contributor Author

@ueshin Do you want to wait for #25666 to get sorted before moving forward with this? Or get this moving, then add the binding for the new filter overload in #25666?

@ueshin
Member

ueshin commented Sep 25, 2019

@nvander1 We don't need to wait for #25666. It won't affect the current behavior; it just adds a new usage. We can do it in a separate PR.

@nvander1
Contributor Author

@ueshin I think it's ready to go now then, pending maintainer review :)

Member

@ueshin left a comment

I left some nits. Otherwise LGTM.
Btw, shall we add forall in the description?
Thanks!

* @since 3.0.0
*/
def aggregate(expr: Column, zero: Column, merge: (Column, Column) => Column,
finish: Column => Column): Column = withExpr {
Member

nit: style

  def aggregate(
      expr: Column,
      zero: Column,
      merge: (Column, Column) => Column,
      finish: Column => Column): Column = withExpr {
    ...
  }

* @since 3.0.0
*/
def map_zip_with(left: Column, right: Column,
f: (Column, Column, Column) => Column): Column = withExpr {
Member

ditto.

put(2, 1);
put(4, 2);
}}),
null
Member

nit: style. one more indent?

put(1, 2);
put(2, 4);
}}),
null
Member

ditto.

Contributor Author

    @Test
    public void testTransformValues() {
        checkAnswer(
            mapDf.select(transform_values(col("x"), (k, v) -> k.plus(v))),
            toRows(
                mapAsScalaMap(new HashMap<Integer, Integer>() {{
                    put(1, 2);
                    put(2, 4);
                }}),
                null
            )
        );
    }

Does this work as well? I've moved the new HashMap up a line. @ueshin

Also, what is the general preference in the codebase: each paren and brace on a new line?

Or the more "lispy" style of every close on the same line:

    @Test
    public void testTransformValues() {
        checkAnswer(
            mapDf.select(transform_values(col("x"), (k, v) -> k.plus(v))),
            toRows(
                mapAsScalaMap(new HashMap<Integer, Integer>() {{
                    put(1, 2);
                    put(2, 4);}}),
                null));
    }

I've seen a mixture of the two to various degrees in the code. I edited this file to at least be consistent with itself (the exception here being the mapAsScalaMap / HashMap, since it really is its own entity just being converted to a Scala equivalent).

Member

Maybe the first one is preferred.
The second one needs a line break at the end of the HashMap since it's a block:

                mapAsScalaMap(new HashMap<Integer, Integer>() {{
                    put(1, 2);
                    put(2, 4);
                }}),
                null));

I'm not quite sure about the parentheses after null. Maybe we need a line break as well.

As for my comment, sorry, maybe my pointer was wrong.
I meant new HashMap ... should be indented one more level:

                mapAsScalaMap(
                    new HashMap<Integer, Integer>() {{	
                        put(1, 2);
                        put(2, 4);
                    }}
                ),
                null

@SparkQA

SparkQA commented Oct 2, 2019

Test build #111666 has finished for PR 24232 at commit 1bf2654.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 2, 2019

Test build #111669 has finished for PR 24232 at commit 64c0f87.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Oct 2, 2019

Jenkins, retest this please.

@SparkQA

SparkQA commented Oct 2, 2019

Test build #111680 has finished for PR 24232 at commit 64c0f87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Oct 2, 2019

Thanks! merging to master.

@HyukjinKwon
Member

+1 !

nvander1 added a commit to MrPowers/spark-daria that referenced this pull request Oct 3, 2019
This reverts commit 1e78335.

This was merged upstream in spark: apache/spark#24232
ueshin pushed a commit that referenced this pull request Oct 3, 2019
… functions object

### What changes were proposed in this pull request?
Add an overload for the higher order function `filter` that takes the array index as its second argument to `org.apache.spark.sql.functions`.

### Why are the changes needed?
See: SPARK-28962 and SPARK-27297. Specifically, ueshin pointed out the discrepancy here: #24232 (comment)

### Does this PR introduce any user-facing change?

### How was this patch tested?
Updated these test suites:

`test.org.apache.spark.sql.JavaHigherOrderFunctionsSuite`
and
`org.apache.spark.sql.DataFrameFunctionsSuite`

Closes #26007 from nvander1/add_index_overload_for_filter.

Authored-by: Nik Vanderhoof <nikolasrvanderhoof@gmail.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>