
Spark: Support loading function as FunctionCatalog in SparkSessionCatalog #7153

Merged

Conversation

Contributor

@bowenliang123 commented Mar 21, 2023

  • implement loadFunction in SparkSessionCatalog to support loading functions from the session catalog in Spark
  • load functions in the following order (as sketched below):
    1. load Iceberg built-in functions in BaseCatalog (as it does now)
    2. then try to load the function from the session catalog (e.g. a permanent UDF registered in the Hive Metastore), if the session catalog is an instance of FunctionCatalog
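
For illustration, a minimal sketch of that ordering, assuming the standard Spark FunctionCatalog signatures (not the exact PR code):

// assumed imports:
// import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
// import org.apache.spark.sql.connector.catalog.FunctionCatalog;
// import org.apache.spark.sql.connector.catalog.Identifier;
// import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

@Override
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
  try {
    // 1. Iceberg built-in functions, resolved by BaseCatalog
    return super.loadFunction(ident);
  } catch (NoSuchFunctionException e) {
    // 2. fall back to the delegated session catalog, e.g. permanent Hive UDFs
    if (getSessionCatalog() instanceof FunctionCatalog) {
      return ((FunctionCatalog) getSessionCatalog()).loadFunction(ident);
    }
    throw e;
  }
}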

Member

@pan3793 commented Mar 21, 2023

This is a kind of bug/regression: it blocks users of SparkSessionCatalog from using Hive UDFs.

The fix LGTM, cc @RussellSpitzer @aokolnychyi

try {
  return super.loadFunction(ident);
} catch (NoSuchFunctionException e) {
  if (getSessionCatalog() instanceof FunctionCatalog) {
Member

Question: Is it possible for this not to be a FunctionCatalog?

Contributor Author

It's a guard condition before casting the result of getSessionCatalog() to FunctionCatalog. And loadFunction is a method of Spark's FunctionCatalog.

Member

Since SPARK-37731 (apache/spark#35004, fixed in 3.3.0), V2SessionCatalog extends FunctionCatalog:

class V2SessionCatalog(catalog: SessionCatalog)
-  extends TableCatalog with SupportsNamespaces with SQLConfHelper {
+  extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {

https://github.com/apache/spark/pull/35004/files#diff-2d6f351fff8241ff1187b98a62e6c57ef3b55349658a9eb98056a14c51a9dc7cL40-R43

Member

Yeah, my main thought here is whether this guard can ever be false. I don't think it can, but I don't have an issue with keeping the check.

Contributor Author

I kept the check for Spark 3.2, but skipped it for Spark 3.3, as V2SessionCatalog extends FunctionCatalog there.
Thanks for the hints, @pan3793.
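
For Spark 3.3, since the delegate is then statically a FunctionCatalog, the fallback presumably reduces to a direct call (a sketch, not the exact PR code):

try {
  return super.loadFunction(ident);
} catch (NoSuchFunctionException e) {
  // in Spark 3.3, T extends FunctionCatalog, so no instanceof guard is needed
  return getSessionCatalog().loadFunction(ident);
}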

String catalogHmsUriKey = "spark.sql.catalog.spark_catalog.uri";
String hmsUri = hiveConf.get(METASTOREURIS.varname);

spark
Member

Now that this is in two places, maybe it makes sense to put it in a "beforeAll" method. Also, when we do that and extract this from the above test as well, we can do

spark.conf().set("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())

if that fits on one line.

Contributor Author

Oh, I tried to place it on one line, but Spotless forces it onto separate lines.

The common code between testLoadFunction and testValidateHmsUri has been extracted to a @BeforeClass method, as you suggested.
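
A rough sketch of that extraction (method name assumed, not taken from the PR):

// assumed import: org.junit.BeforeClass
@BeforeClass
public static void setUpCatalogConf() {
  // shared by testLoadFunction and testValidateHmsUri; Spotless wraps
  // the chained call onto separate lines
  spark
      .conf()
      .set("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName());
}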

spark.sql(createFuncSql);
Row[] rows = (Row[]) spark.sql("SELECT upper('xyz')").collect();
Assert.assertEquals("XYZ", rows[0].get(0));
}
Member

Not sure if there is value in this, but maybe we should also add a test that checks the Iceberg function takes priority over a Spark function.

Contributor Author

After double-checking, a pre-existing problem turned up: Iceberg's SparkSessionCatalog is not able to load Iceberg's built-in functions correctly. With or without this PR, SELECT system.years(date('1970-01-01')) fails:

org.apache.spark.sql.AnalysisException: Undefined function: 'years'. This function is neither a registered temporary function nor a permanent function registered in the database 'system'.; line 1 pos 7
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.failFunctionLookup(SessionCatalog.scala:1561)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunctionInternal(SessionCatalog.scala:1704)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunction(SessionCatalog.scala:1673)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveFunctions$$resolveV1Function(Analyzer.scala:2168)

And it's not covered or fixed by this PR, since this PR implements methods of Spark's FunctionCatalog, which correctly load Iceberg's built-in functions (confirmed in debugging). The problem lies in Spark's org.apache.spark.sql.catalyst.catalog.SessionCatalog#resolvePersistentFunction.

So I have added a TODO for this in the unit tests, and I will report an issue for it later.

Member

Sounds good, good thing we checked.

Member

@RussellSpitzer left a comment

Everything looks good to me. I just think we should clean up that test class, now that there is a lot of common code between "validateHmsUri" and "testLoadFunction", and group their setup code together in a before method. Once that's cleaned up, I think we are good to go.

Optionally, add another test for Iceberg functions being used instead of session functions in case of overlap.

String functionClass = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper";

// load permanent UDF in Hive via FunctionCatalog
spark.sql(String.format("CREATE FUNCTION upper AS '%s'", functionClass));
Contributor

Spark has a built-in function also named upper. I think a created function that shadows an existing built-in will not invoke SparkSessionCatalog#loadFunction; can you confirm that?

Contributor Author

Changed the function name used here to perm_upper, and it's confirmed that SparkSessionCatalog#loadFunction is called.
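
A sketch of the adjusted test step (the surrounding structure follows the quoted hunks; details are assumed):

String functionClass = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper";
// perm_upper does not shadow a Spark built-in, so resolution reaches
// SparkSessionCatalog#loadFunction instead of stopping at the built-in registry
spark.sql(String.format("CREATE FUNCTION perm_upper AS '%s'", functionClass));
Row[] rows = (Row[]) spark.sql("SELECT perm_upper('xyz')").collect();
Assert.assertEquals("XYZ", rows[0].get(0));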

@bowenliang123 changed the title from "Spark: Support loading function from session catalog in SparkSessionCatalog" to "Spark: Support loading function via FunctionCatalog in SparkSessionCatalog" Mar 22, 2023
public void testLoadFunction() {
spark.sessionState().catalogManager().reset();
spark.conf().set(envHmsUriKey, hmsUri);
spark.conf().set(catalogHmsUriKey, hmsUri);
Member

Everything above here can also go in the "before" method, and if you make it a "beforeEach" you won't have to do a reset in this method either.

Contributor Author

OK, extracted to an org.junit.Before annotated method, which runs before each test. Btw, org.junit.jupiter.api.BeforeEach doesn't help here.
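
Roughly, with the field names taken from the quoted hunks and the method name assumed:

// assumed import: org.junit.Before
@Before
public void resetCatalogState() {
  // runs before each test, so tests no longer need an explicit reset
  spark.sessionState().catalogManager().reset();
  spark.conf().set(envHmsUriKey, hmsUri);
  spark.conf().set(catalogHmsUriKey, hmsUri);
}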

Member

@RussellSpitzer left a comment

Just a few more lines of code can be refactored into the "beforeEach" test method.

Contributor Author

@bowenliang123

> Just a few more lines of code can be refactored into the "beforeEach" test method.

Thx. Addressed.

Contributor

@zhongyujiang left a comment

Looks good to me.

@bowenliang123 changed the title from "Spark: Support loading function via FunctionCatalog in SparkSessionCatalog" to "Spark: Support loading function as FunctionCatalog in SparkSessionCatalog" Mar 24, 2023
@@ -50,8 +53,8 @@
*
* @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
*/
-public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces> extends BaseCatalog
-    implements CatalogExtension {
+public class SparkSessionCatalog<T extends TableCatalog & FunctionCatalog & SupportsNamespaces>
+    extends BaseCatalog implements CatalogExtension {
Member

A guard is required in setDelegateCatalog.

Contributor

And the doc should also be updated.

Contributor Author

Thanks, both suggestions are addressed.
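
For illustration, the requested guard might look roughly like this (a sketch; the exact casts and error message are assumptions):

// assumed import: org.apache.spark.sql.connector.catalog.CatalogPlugin
@Override
@SuppressWarnings("unchecked")
public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
  if (sparkSessionCatalog instanceof TableCatalog
      && sparkSessionCatalog instanceof FunctionCatalog
      && sparkSessionCatalog instanceof SupportsNamespaces) {
    this.sessionCatalog = (T) sparkSessionCatalog;
  } else {
    throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
  }
}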

Member

Why is this changed? BaseCatalog supports FunctionCatalog already.

Contributor Author

This makes the type parameter T extend FunctionCatalog, and it applies to the delegated session catalog (set in Spark's CatalogManager#loadV2SessionCatalog via CatalogPlugin#setDelegateCatalog), not to Iceberg's SparkSessionCatalog itself. This PR is mainly about loading functions from the delegated session catalog; therefore, requiring it to be an instance of FunctionCatalog is necessary.

BaseSessionStateBuilder.scala (https://github.com/apache/spark/blob/v3.3.2/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala#L168) uses the v2SessionCatalog as the delegated catalog for the CatalogManager. And, as discussed above, V2SessionCatalog always extends FunctionCatalog since Spark 3.3.0:

  protected lazy val catalogManager = new CatalogManager(v2SessionCatalog, catalog)

@RussellSpitzer merged commit 07b0a15 into apache:master Mar 26, 2023
Member

@RussellSpitzer

Merged. Thanks @bowenliang123 for your PR, and thanks @pan3793 and @zhongyujiang for the reviews!

@bowenliang123 deleted the session-catalog-function branch March 26, 2023 02:29
Contributor Author

@bowenliang123 commented Mar 26, 2023

Thanks to @RussellSpitzer, and to @pan3793 and @zhongyujiang for the reviews.

And here is some more clarification on the "load Iceberg built-in function in BaseCatalog" behavior listed in the description.

Facts

  1. it is not possible to use Iceberg's built-in functions in the Spark session catalog (like SELECT system.years(date('1970-01-01'))), BEFORE or AFTER this PR
  2. this PR does load functions from Iceberg's SparkFunctions via BaseCatalog as a FunctionCatalog (confirmed in debugging), but that doesn't resolve the problem in 1.
  3. the problem comes from Spark's Analyzer, which forces resolveV1Function against the v1SessionCatalog instance (NOT the v2 session catalog!) when using the spark_catalog session catalog. More importantly, that v1SessionCatalog is initialized inside Spark's CatalogManager (as a private HiveSessionCatalog), and there seems to be no way to inject or register Iceberg's functions from Iceberg's SparkSessionCatalog CatalogPlugin.

https://github.com/apache/spark/blob/v3.3.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#LL2114-L2123

case u @ UnresolvedFunction(nameParts, arguments, _, _, _) => withPosition(u) {
  resolveBuiltinOrTempFunction(nameParts, arguments, Some(u)).getOrElse {
    val CatalogAndIdentifier(catalog, ident) = expandIdentifier(nameParts)
    if (CatalogV2Util.isSessionCatalog(catalog)) {
      resolveV1Function(ident.asFunctionIdentifier, arguments, u)
    } else {
      resolveV2Function(catalog.asFunctionCatalog, ident, arguments, u)
    }
  }
}

Contributor

@jiamin13579

> Merged. Thanks @bowenliang123 for your PR, and thanks @pan3793 and @zhongyujiang for the reviews!

Hello, when will this be released?
