
[SPARK-43995][SPARK-43996][CONNECT] Add support for UDFRegistration to the Connect Scala Client #41953

Closed
vicennial wants to merge 9 commits into master from vicennial/SPARK-43995

Conversation

@vicennial (Contributor) commented on Jul 12, 2023:

What changes were proposed in this pull request?

This PR adds support for registering a Scala UDF from the Scala/JVM client.

The following APIs are implemented in UDFRegistration:

  • def register(name: String, udf: UserDefinedFunction): UserDefinedFunction
  • def register[RT: TypeTag, A1: TypeTag ...](name: String, func: (A1, ...) => RT): UserDefinedFunction for 0 to 22 arguments.

The following API is implemented in functions:

  • def call_udf(udfName: String, cols: Column*): Column

Note: This PR is stacked on #41959.
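
As a rough sketch of how these APIs fit together (not code from this PR; the names `plusOne` and `timesTwo` are illustrative, and an active Connect `SparkSession` named `spark` is assumed):

```scala
import org.apache.spark.sql.functions.{call_udf, col, udf}

// Typed registration: register[RT, A1](name, func) uses the TypeTag
// context bounds to capture the input and return types.
spark.udf.register("plusOne", (x: Long) => x + 1)

// Registration of an existing UserDefinedFunction: register(name, udf).
val timesTwo = udf((x: Long) => x * 2)
spark.udf.register("timesTwo", timesTwo)

// call_udf(udfName, cols*) invokes a registered UDF by name from the DataFrame API.
spark.range(3)
  .select(call_udf("plusOne", col("id")), call_udf("timesTwo", col("id")))
  .show()
```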

Why are the changes needed?

To reach parity with classic Spark.

Does this PR introduce any user-facing change?

Yes. spark.udf.register() is added as shown below:

```scala
class A(x: Int) { def get = x * 100 }
val myUdf = udf((x: Int) => new A(x).get)
spark.udf.register("dummyUdf", myUdf)
spark.sql("select dummyUdf(id) from range(5)").as[Long].collect()
```

The output:

```scala
Array[Long] = Array(0L, 100L, 200L, 300L, 400L)
```
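
As a complementary sketch (again illustrative, not taken from the PR), the registered name should also be reachable through the new call_udf function in the DataFrame API:

```scala
import org.apache.spark.sql.functions.{call_udf, col}

// Invoke the UDF registered above ("dummyUdf") by name via the DataFrame API.
spark.range(5).select(call_udf("dummyUdf", col("id"))).show()
```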

How was this patch tested?

New tests in ReplE2ESuite.

@vicennial marked this pull request as ready for review on July 12, 2023 16:39
@vicennial changed the title from "[WIP][SPARK-43995][CONNECT] Add support for UDFRegistration for the Connect Scala Client" to "[SPARK-43995][CONNECT] Add support for UDFRegistration for the Connect Scala Client" on Jul 12, 2023
@vicennial changed the title from "[SPARK-43995][CONNECT] Add support for UDFRegistration for the Connect Scala Client" to "[SPARK-43995][CONNECT] Add support for UDFRegistration to the Connect Scala Client" on Jul 12, 2023
@zhenlineo (Contributor) left a comment:

Can you also modify the client's compatibility tests to include the two new classes you added?

@vicennial marked this pull request as draft on July 12, 2023 22:34
@vicennial changed the title from "[SPARK-43995][CONNECT] Add support for UDFRegistration to the Connect Scala Client" to "[SPARK-43995][SPARK-43996][CONNECT] Add support for UDFRegistration to the Connect Scala Client" on Jul 13, 2023
@vicennial marked this pull request as ready for review on July 13, 2023 07:44
@vicennial (Contributor, Author) commented:

cc @hvanhovell @HyukjinKwon

@HyukjinKwon (Member) left a comment:

Took a cursory look, and it seems fine. cc @xinrong-meng too, who has worked in this area.

@HyukjinKwon (Member) commented:

Merged to master.

asl3 pushed a commit to asl3/spark that referenced this pull request Jul 17, 2023
…o the Connect Scala Client

Closes apache#41953 from vicennial/SPARK-43995.

Authored-by: vicennial <venkata.gudesa@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024