
[SPARK-31999][SQL] Add REFRESH FUNCTION command #28840

Closed
wants to merge 46 commits into from

Conversation

ulysses-you
Contributor

@ulysses-you ulysses-you commented Jun 16, 2020

What changes were proposed in this pull request?

In Hive mode, permanent functions are shared with the Hive metastore, so a function may be modified by other Hive clients. In a long-lived Spark application, it is hard to pick up such changes.

There are two reasons:

  • Spark caches the function in memory using FunctionRegistry.
  • Users may not know the location or class name of the UDF when replacing a function.

Note that we use the v2 command code path to add the new command.

Why are the changes needed?

Provide an easy way to keep the Spark function registry in sync with the Hive metastore.
Then we can call

refresh function functionName

Does this PR introduce any user-facing change?

Yes, a new command.

How was this patch tested?

New UT.

@SparkQA

SparkQA commented Jun 16, 2020

Test build #124115 has finished for PR 28840 at commit 69a47a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class RefreshFunctionStatement(
  • case class RefreshFunctionCommand(

@ulysses-you
Contributor Author

@cloud-fan @maropu @HyukjinKwon thanks for review

@maropu
Member

maropu commented Jun 17, 2020

Does Hive support this feature? Anyway, please update the SQL doc, too.

@ulysses-you
Contributor Author

Hive supports RELOAD FUNCTIONS, which reloads all functions.

REFRESH FUNCTION is just like REFRESH TABLE: it invalidates the cache for a single function.

### Description

`REFRESH FUNCTION` statement invalidates the cached entries, which include class name
and resource location of the given function. The invalidated cache is populated right now.
Contributor Author


A small difference from REFRESH TABLE: since it is lightweight, the function cache is repopulated right away.


@SparkQA

SparkQA commented Jun 17, 2020

Test build #124145 has finished for PR 28840 at commit 3fc807e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

### Description

`REFRESH FUNCTION` statement invalidates the cached entries, which include class name
and resource location of the given function. The invalidated cache is populated right now.
Contributor


is populated right now -> is populated right away?

Contributor


You may want to add a little more detail, something like refresh function only works for permanent function, refresh native function or temporary function will cause Exception.

Contributor Author


OK, update this later.


### Description

`REFRESH FUNCTION` statement invalidates the cached entries, which include class name
Contributor


the cached entries -> the cached function entry

/**
* REFRESH FUNCTION statement, as parsed from SQL
*/
case class RefreshFunctionStatement(
Contributor


Since it's a new command, can we follow CommentOnTable and use the new command framework?

Contributor Author


OK, I will move it later.

/**
* The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
*/
case class RefreshFunction(func: Seq[String]) extends Command
Contributor


Can we create a UnresolvedFunc, similar to UnresolvedTable?

The key point is to do the resolution in the analyzer, not at runtime in RefreshFunctionCommand.run.

Contributor Author


Get it.

@SparkQA

SparkQA commented Jun 17, 2020

Test build #124164 has finished for PR 28840 at commit de54470.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class RefreshFunction(func: LogicalPlan) extends Command

@SparkQA

SparkQA commented Jun 17, 2020

Test build #124161 has finished for PR 28840 at commit f677a4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

// TODO: move function related v2 statements to the new framework.
private def parseSessionCatalogFunctionIdentifier(
Contributor Author


Move this method to LookupCatalog.CatalogAndFunctionIdentifier and drop the sql param.

Member


Does this PR need the change?

@SparkQA

SparkQA commented Jun 17, 2020

Test build #124165 has finished for PR 28840 at commit 9e09875.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class RefreshFunction(func: Seq[String]) extends Command

@SparkQA

SparkQA commented Jun 17, 2020

Test build #124168 has finished for PR 28840 at commit 9e9d5ce.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


`REFRESH FUNCTION` statement invalidates the cached function entry, which include class name
and resource location of the given function. The invalidated cache is populated right away.
Note that, refresh function only works for permanent function. Refresh native function or temporary function will cause exception.
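The semantics described above can be sketched as a small, hypothetical Python model (not Spark's actual implementation): `metastore` stands in for the Hive metastore, `registry` for Spark's in-memory FunctionRegistry, and the built-in set, exception types, and function names are illustrative assumptions.

```python
# Hypothetical model of REFRESH FUNCTION: invalidate the cached entry and
# repopulate it right away from the metastore. Built-in and missing
# functions raise, matching the behavior described in the doc text above.
BUILTIN = {"rand", "md5"}  # assumed stand-in for FunctionRegistry.builtin

def refresh_function(name, metastore, registry):
    """Refresh one permanent function's cached entry."""
    if name in BUILTIN:
        raise ValueError(f"Cannot refresh built-in function {name}")
    if name in metastore:
        # Function exists in the metastore: re-register with fresh metadata.
        registry[name] = metastore[name]
    else:
        # Function was dropped externally: clear the stale cache, then fail.
        registry.pop(name, None)
        raise LookupError(f"NoSuchFunction: {name}")

metastore = {"func1": "com.example.NewImpl"}
registry = {"func1": "com.example.OldImpl", "stale": "com.example.Dropped"}
refresh_function("func1", metastore, registry)
print(registry["func1"])  # the cache now holds the metastore's metadata
```

The key design point, echoed later in the thread, is that a refresh of a function missing from the metastore still clears the cache before throwing, so the registry never keeps a stale entry.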
Contributor


Sorry, the suggestion I gave you yesterday has a few grammar mistakes.

which include class name -> which includes the class name

Note that, refresh function only works for permanent function. -> Note that REFRESH FUNCTION only works for permanent functions.

Refresh native function or temporary function will cause exception. ->
Refreshing native functions or temporary functions will cause an exception.


* **function_identifier**

Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, use the current database.
Contributor


use the current database -> uses the current database

catalog.registerFunction(func, true)
} else if (catalog.isRegisteredFunction(identifier)) {
// clear cached function.
catalog.unregisterFunction(identifier, true)
Contributor


does unregisterFunction need to take a boolean parameter?

Contributor Author


removed.

@SparkQA

SparkQA commented Jul 14, 2020

Test build #125839 has finished for PR 28840 at commit 711656d.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 14, 2020

Test build #125834 has finished for PR 28840 at commit a956144.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125915 has finished for PR 28840 at commit 711656d.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// register overwrite function.
val func = catalog.getFunctionMetadata(identifier)
catalog.registerFunction(func, true)
} else if (catalog.isRegisteredFunction(identifier)) {
Contributor


nit: we can simplify it

... else {
  catalog.unregisterFunction(identifier)
}

unregisterFunction will fail if the function is not registered.

Contributor Author


done.

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125969 has finished for PR 28840 at commit 5d4c152.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 17, 2020

Test build #126007 has finished for PR 28840 at commit 94fa132.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 17, 2020

Test build #126026 has finished for PR 28840 at commit fc4789f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 21, 2020

Test build #126224 has finished for PR 28840 at commit fc4789f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 21, 2020

Test build #126241 has finished for PR 28840 at commit e83194f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} else {
// clear cached function, if not exists throw exception
if (!catalog.unregisterFunction(identifier)) {
throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)
Contributor


Sorry, I may not have made myself clear.

I mean to go back to your original proposal, which always throws an exception if the function doesn't exist in the metastore. That said, we should do

catalog.unregisterFunction(identifier)
throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)

Contributor Author


oh, get it!

@SparkQA

SparkQA commented Jul 21, 2020

Test build #126249 has finished for PR 28840 at commit b18437c.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

All GitHub Actions pass, merging to master, thanks!

@cloud-fan cloud-fan closed this in 184074d Jul 22, 2020
@ulysses-you
Contributor Author

@cloud-fan Thanks for the merge! Thanks all!


override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
Member


We still can create persistent function with the same name as the built-in function. For example,

CREATE FUNCTION rand AS 'org.apache.spark.sql.catalyst.expressions.Abs'
DESC function default.rand

I think we should still allow this case.

Contributor Author


It seems meaningless to refresh a persistent function whose name is the same as a built-in function's.

Yes, we can create a persistent function with the same name as a built-in function, but it is only created in the metastore; the function actually used is the built-in one. The reason is that built-in functions are pre-cached in the registry, and we look up the cached function first.

e.g., after CREATE FUNCTION rand AS 'xxx', DESC FUNCTION rand will always return Class: org.apache.spark.sql.catalyst.expressions.Rand.

BTW, maybe that is why we create functions lazily, acting as just a Hive client; otherwise we couldn't create functions named like rand or md5 in the metastore. @cloud-fan

Contributor


how about

CREATE FUNCTION rand AS 'xxx';
DESC FUNCTION default.rand;

I think this is similar to table and temp views. Spark will try to look up temp view first, so if the name conflicts, temp view is preferred. But users can still use a qualified table name to read the table explicitly.
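The precedence rule being discussed can be sketched as a hypothetical Python model (not Spark's real resolver): an unqualified name is looked up among built-ins first, so a persistent function named `rand` is shadowed unless it is referenced with a qualified name such as `default.rand`. The dictionaries and names here are illustrative assumptions.

```python
# Hypothetical name-resolution sketch: built-ins shadow unqualified names;
# a qualified name bypasses the built-in registry and hits the metastore.
def lookup(name, builtin, metastore, current_db="default"):
    if "." in name:
        # Qualified name: resolve directly against the metastore.
        db, func = name.split(".", 1)
        return metastore[(db, func)]
    if name in builtin:
        # Unqualified name: built-in functions win the conflict.
        return builtin[name]
    return metastore[(current_db, name)]

builtin = {"rand": "org.apache.spark.sql.catalyst.expressions.Rand"}
metastore = {("default", "rand"): "xxx"}
print(lookup("rand", builtin, metastore))          # built-in Rand
print(lookup("default.rand", builtin, metastore))  # persistent 'xxx'
```

This mirrors the table/temp-view analogy above: the conflict is resolved by preference order for unqualified names, while a qualified name still reaches the shadowed object explicitly.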

Contributor Author


You are right.

I missed the qualified-name case; I will fix it in a follow-up.

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
throw new AnalysisException(s"Cannot refresh builtin function $functionName")
Member


Nit: built-in

Contributor Author


get it.

val func = FunctionIdentifier("func1", Some("default"))
sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
assert(!spark.sessionState.catalog.isRegisteredFunction(func))
sql("REFRESH FUNCTION func1")
Member


This is the only positive test case. Could you think more and try to cover more cases?

cloud-fan pushed a commit that referenced this pull request Aug 20, 2020
### What changes were proposed in this pull request?

Address the [#comment](#28840 (comment)).

### Why are the changes needed?

Make code robust.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

ut.

Closes #29453 from ulysses-you/SPARK-31999-FOLLOWUP.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@ulysses-you ulysses-you deleted the SPARK-31999 branch March 11, 2021 10:16