From af1dba7ca501fd9372b158793119163e3fcd1f24 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Wed, 26 May 2021 04:40:53 +0000 Subject: [PATCH] [SPARK-35440][SQL] Add function type to `ExpressionInfo` for UDF ### What changes were proposed in this pull request? Add the function type, such as "scala_udf", "python_udf", "java_udf", "hive", "built-in" to the `ExpressionInfo` for UDF. ### Why are the changes needed? Make the `ExpressionInfo` of UDF more meaningful ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing and newly added UT Closes #32587 from linhongliu-db/udf-language. Authored-by: Linhong Liu Signed-off-by: Wenchen Fan --- .../expressions/ExpressionDescription.java | 11 +- .../catalyst/expressions/ExpressionInfo.java | 24 +++- .../catalyst/analysis/FunctionRegistry.scala | 26 ++++- .../sql/catalyst/catalog/SessionCatalog.scala | 23 +++- .../expressions/complexTypeCreator.scala | 3 +- .../apache/spark/sql/UDFRegistration.scala | 104 +++++++++--------- .../org/apache/spark/sql/JavaUDFSuite.java | 11 ++ .../spark/sql/GeneratorFunctionSuite.scala | 2 +- .../sql/SparkSessionExtensionSuite.scala | 9 +- .../sql/expressions/ExpressionInfoSuite.scala | 45 +++++++- .../org/apache/spark/sql/hive/UDFSuite.scala | 10 ++ 11 files changed, 191 insertions(+), 77 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java index 579f4b387b3f9..e9ccf2b56f82e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java @@ -31,7 +31,7 @@ * `usage()` will be used for the function usage in brief way. 
 * * These below are concatenated and used for the function usage in verbose way, suppose arguments, - * examples, note, group, since and deprecated will be provided. + * examples, note, group, source, since and deprecated will be provided. * * `arguments()` describes arguments for the expression. * @@ -42,14 +42,17 @@ * `group()` describes the category that the expression belongs to. The valid value is * "agg_funcs", "array_funcs", "datetime_funcs", "json_funcs", "map_funcs" and "window_funcs". * + * `source()` describes the source of the function. The valid values are "built-in", "hive", + * "python_udf", "scala_udf", "java_udf". + * * `since()` contains version information for the expression. Version is specified by, * for example, "2.2.0". * * `deprecated()` contains deprecation information for the expression optionally, for example, * "Deprecated since 2.2.0. Use something else instead". * - * The format, in particular for `arguments()`, `examples()`,`note()`, `group()`, `since()` and - * `deprecated()`, should strictly be as follows. + * The format, in particular for `arguments()`, `examples()`,`note()`, `group()`, `source()`, + * `since()` and `deprecated()`, should strictly be as follows. * *
  * @ExpressionDescription(
@@ -72,6 +75,7 @@
  *     ...
  *   """,
  *   group = "agg_funcs",
+ *   source = "built-in",
  *   since = "3.0.0",
  *   deprecated = """
  *     ...
@@ -112,4 +116,5 @@
     String group() default "";
     String since() default "";
     String deprecated() default "";
+    String source() default "built-in";
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
index 70064b6ee9585..9ed764a348503 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
@@ -38,6 +38,7 @@ public class ExpressionInfo {
     private String group;
     private String since;
     private String deprecated;
+    private String source;
 
     private static final Set validGroups =
         new HashSet<>(Arrays.asList("agg_funcs", "array_funcs", "binary_funcs", "bitwise_funcs",
@@ -46,6 +47,9 @@ public class ExpressionInfo {
             "lambda_funcs", "map_funcs", "math_funcs", "misc_funcs", "string_funcs", "struct_funcs",
             "window_funcs", "xml_funcs", "table_funcs"));
 
+    private static final Set validSources =
+            new HashSet<>(Arrays.asList("built-in", "hive", "python_udf", "scala_udf", "java_udf"));
+
     public String getClassName() {
         return className;
     }
@@ -95,6 +99,10 @@ public String getDb() {
         return db;
     }
 
+    public String getSource() {
+        return source;
+    }
+
     public ExpressionInfo(
             String className,
             String db,
@@ -105,7 +113,8 @@ public ExpressionInfo(
             String note,
             String group,
             String since,
-            String deprecated) {
+            String deprecated,
+            String source) {
         assert name != null;
         assert arguments != null;
         assert examples != null;
@@ -114,6 +123,7 @@ public ExpressionInfo(
         assert group != null;
         assert since != null;
         assert deprecated != null;
+        assert source != null;
 
         this.className = className;
         this.db = db;
@@ -125,6 +135,7 @@ public ExpressionInfo(
         this.group = group;
         this.since = since;
         this.deprecated = deprecated;
+        this.source = source;
 
         // Make the extended description.
         this.extended = arguments + examples;
@@ -144,6 +155,11 @@ public ExpressionInfo(
                 this.name + "]. It should be a value in " + validGroups + "; however, " +
                 "got [" + group + "].");
         }
+        if (!source.isEmpty() && !validSources.contains(source)) {
+            throw new IllegalArgumentException("'source' is malformed in the expression [" +
+                    this.name + "]. It should be a value in " + validSources + "; however, " +
+                    "got [" + source + "].");
+        }
         if (!since.isEmpty()) {
             if (Integer.parseInt(since.split("\\.")[0]) < 0) {
                 throw new IllegalArgumentException("'since' is malformed in the expression [" +
@@ -164,11 +180,11 @@ public ExpressionInfo(
     }
 
     public ExpressionInfo(String className, String name) {
-        this(className, null, name, null, "", "", "", "", "", "");
+        this(className, null, name, null, "", "", "", "", "", "", "");
     }
 
     public ExpressionInfo(String className, String db, String name) {
-        this(className, db, name, null, "", "", "", "", "", "");
+        this(className, db, name, null, "", "", "", "", "", "", "");
     }
 
     /**
@@ -179,7 +195,7 @@ public ExpressionInfo(String className, String db, String name) {
     public ExpressionInfo(String className, String db, String name, String usage, String extended) {
         // `arguments` and `examples` are concatenated for the extended description. So, here
         // simply pass the `extended` as `arguments` and an empty string for `examples`.
-        this(className, db, name, usage, extended, "", "", "", "", "");
+        this(className, db, name, usage, extended, "", "", "", "", "", "");
     }
 
     private String replaceFunctionName(String usage) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 4cc87493b5a25..ff5ede86da8ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -47,9 +47,20 @@ trait FunctionRegistryBase[T] {
 
   type FunctionBuilder = Seq[Expression] => T
 
-  final def registerFunction(name: FunctionIdentifier, builder: FunctionBuilder): Unit = {
+  final def registerFunction(
+      name: FunctionIdentifier, builder: FunctionBuilder, source: String): Unit = {
     val info = new ExpressionInfo(
-      builder.getClass.getCanonicalName, name.database.orNull, name.funcName)
+      builder.getClass.getCanonicalName,
+      name.database.orNull,
+      name.funcName,
+      null,
+      "",
+      "",
+      "",
+      "",
+      "",
+      "",
+      source)
     registerFunction(name, info, builder)
   }
 
@@ -59,10 +70,12 @@ trait FunctionRegistryBase[T] {
     builder: FunctionBuilder): Unit
 
   /* Create or replace a temporary function. */
-  final def createOrReplaceTempFunction(name: String, builder: FunctionBuilder): Unit = {
+  final def createOrReplaceTempFunction(
+      name: String, builder: FunctionBuilder, source: String): Unit = {
     registerFunction(
       FunctionIdentifier(name),
-      builder)
+      builder,
+      source)
   }
 
   @throws[AnalysisException]("If function does not exist")
@@ -157,7 +170,8 @@ object FunctionRegistryBase {
           df.note(),
           df.group(),
           df.since(),
-          df.deprecated())
+          df.deprecated(),
+          df.source())
       } else {
         // This exists for the backward compatibility with old `ExpressionDescription`s defining
         // the extended description in `extended()`.
@@ -721,7 +735,7 @@ object FunctionRegistry {
     val usage = "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."
     val expressionInfo =
       new ExpressionInfo(clazz.getCanonicalName, null, name, usage, "", "", "",
-        "conversion_funcs", "2.0.1", "")
+        "conversion_funcs", "2.0.1", "", "built-in")
     (name, (expressionInfo, builder))
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0813d41af1617..ca11891b0af3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1461,7 +1461,18 @@ class SessionCatalog(
     if (functionRegistry.functionExists(func) && !overrideIfExists) {
       throw QueryCompilationErrors.functionAlreadyExistsError(func)
     }
-    val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
+    val info = new ExpressionInfo(
+      funcDefinition.className,
+      func.database.orNull,
+      func.funcName,
+      null,
+      "",
+      "",
+      "",
+      "",
+      "",
+      "",
+      "hive")
     val builder =
       functionBuilder.getOrElse {
         val className = funcDefinition.className
@@ -1552,7 +1563,15 @@ class SessionCatalog(
           new ExpressionInfo(
             metadata.className,
             qualifiedName.database.orNull,
-            qualifiedName.identifier)
+            qualifiedName.identifier,
+            null,
+            "",
+            "",
+            "",
+            "",
+            "",
+            "",
+            "hive")
         } else {
           failFunctionLookup(name)
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index 76974c8229b2e..1c8a41e9c7be6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -394,7 +394,8 @@ object CreateStruct {
       "",
       "struct_funcs",
       "1.4.0",
-      "")
+      "",
+      "built-in")
     ("struct", (info, this.create))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 4c165680d428b..0a31433416290 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -65,7 +65,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
         | udfDeterministic: ${udf.udfDeterministic}
       """.stripMargin)
 
-    functionRegistry.createOrReplaceTempFunction(name, udf.builder)
+    functionRegistry.createOrReplaceTempFunction(name, udf.builder, "python_udf")
   }
 
   /**
@@ -83,7 +83,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
     " via the functions.udaf(agg) method.", "3.0.0")
   def register(name: String, udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = {
     def builder(children: Seq[Expression]) = ScalaUDAF(children, udaf, udafName = Some(name))
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     udaf
   }
 
@@ -112,11 +112,11 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
     udf.withName(name) match {
       case udaf: UserDefinedAggregator[_, _, _] =>
         def builder(children: Seq[Expression]) = udaf.scalaAggregator(children)
-        functionRegistry.createOrReplaceTempFunction(name, builder)
+        functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
         udaf
       case other =>
         def builder(children: Seq[Expression]) = other.apply(children.map(Column.apply) : _*).expr
-        functionRegistry.createOrReplaceTempFunction(name, builder)
+        functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
         other
     }
   }
@@ -147,7 +147,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
         |    throw new AnalysisException("Invalid number of arguments for function " + name +
         |      ". Expected: $x; Found: " + e.length)
         |  }
-        |  functionRegistry.createOrReplaceTempFunction(name, builder)
+        |  functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
         |  finalUdf
         |}""".stripMargin)
     }
@@ -173,7 +173,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
         |    throw new AnalysisException("Invalid number of arguments for function " + name +
         |      ". Expected: $i; Found: " + e.length)
         |  }
-        |  functionRegistry.createOrReplaceTempFunction(name, builder)
+        |  functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
         |}""".stripMargin)
     }
     */
@@ -195,7 +195,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 0; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -216,7 +216,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 1; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -237,7 +237,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 2; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -258,7 +258,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 3; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -279,7 +279,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 4; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -300,7 +300,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 5; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -321,7 +321,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 6; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -342,7 +342,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 7; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -363,7 +363,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 8; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -384,7 +384,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 9; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -405,7 +405,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 10; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -426,7 +426,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 11; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -447,7 +447,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 12; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -468,7 +468,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 13; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -489,7 +489,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 14; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -510,7 +510,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 15; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -531,7 +531,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 16; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -552,7 +552,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 17; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -573,7 +573,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 18; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -594,7 +594,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 19; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -615,7 +615,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 20; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -636,7 +636,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 21; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -657,7 +657,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 22; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "scala_udf")
     finalUdf
   }
 
@@ -765,7 +765,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 0; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -781,7 +781,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 1; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -797,7 +797,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 2; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -813,7 +813,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 3; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -829,7 +829,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 4; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -845,7 +845,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 5; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -861,7 +861,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 6; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -877,7 +877,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 7; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -893,7 +893,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 8; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -909,7 +909,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 9; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -925,7 +925,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 10; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -941,7 +941,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 11; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -957,7 +957,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 12; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -973,7 +973,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 13; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -989,7 +989,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 14; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1005,7 +1005,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 15; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1021,7 +1021,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 16; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1037,7 +1037,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 17; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1053,7 +1053,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 18; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1069,7 +1069,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 19; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1085,7 +1085,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 20; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1101,7 +1101,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 21; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   /**
@@ -1117,7 +1117,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 22; Found: " + e.length)
     }
-    functionRegistry.createOrReplaceTempFunction(name, builder)
+    functionRegistry.createOrReplaceTempFunction(name, builder, "java_udf")
   }
 
   // scalastyle:on line.size.limit
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
index 9af5023acf391..7e938ca88d8b9 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
@@ -21,6 +21,8 @@
 import java.time.LocalDate;
 import java.util.List;
 
+import org.apache.spark.sql.catalyst.FunctionIdentifier;
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.After;
 import org.junit.Assert;
@@ -139,4 +141,14 @@ public void udf7Test() {
       spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), originConf);
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void sourceTest() {
+    // A function registered through the Java UDF API is expected to report "java_udf".
+    spark.udf().register("stringLengthTest", (String str) -> str.length(), DataTypes.IntegerType);
+    FunctionIdentifier udfId = FunctionIdentifier.apply("stringLengthTest");
+    ExpressionInfo udfInfo = spark.sessionState().catalog().lookupFunctionInfo(udfId);
+    Assert.assertEquals("java_udf", udfInfo.getSource());
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 311bc52515827..d5c2d93055ba1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -305,7 +305,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
 
   test("outer generator()") {
     spark.sessionState.functionRegistry
-      .createOrReplaceTempFunction("empty_gen", _ => EmptyGenerator())
+      .createOrReplaceTempFunction("empty_gen", _ => EmptyGenerator(), "scala_udf")
     checkAnswer(
       sql("select * from values 1, 2 lateral view outer empty_gen() a as b"),
       Row(1, null) :: Row(2, null) :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index c8768ec2c5af1..ff0855310004a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -424,7 +424,8 @@ object MyExtensions {
       "3.0.0",
       """
        deprecated
-      """),
+      """,
+      ""),
     (_: Seq[Expression]) => Literal(5, IntegerType))
 }
 
@@ -932,7 +933,8 @@ object MyExtensions2 {
       "3.0.0",
       """
        deprecated
-      """),
+      """,
+      ""),
     (_: Seq[Expression]) => Literal(5, IntegerType))
 }
 
@@ -965,7 +967,8 @@ object MyExtensions2Duplicate {
       "3.0.0",
       """
        deprecated
-      """),
+      """,
+      ""),
     (_: Seq[Expression]) => Literal(5, IntegerType))
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 438fd2351ab9f..7e3c091e99703 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -51,33 +51,52 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
     Seq("agg_funcs", "array_funcs", "datetime_funcs", "json_funcs", "map_funcs", "window_funcs")
         .foreach { groupName =>
       val info = new ExpressionInfo(
-        "testClass", null, "testName", null, "", "", "", groupName, "", "")
+        "testClass", null, "testName", null, "", "", "", groupName, "", "", "")
       assert(info.getGroup === groupName)
     }
 
     val errMsg = intercept[IllegalArgumentException] {
       val invalidGroupName = "invalid_group_funcs"
-      new ExpressionInfo("testClass", null, "testName", null, "", "", "", invalidGroupName, "", "")
+      new ExpressionInfo(
+        "testClass", null, "testName", null, "", "", "", invalidGroupName, "", "", "")
     }.getMessage
     assert(errMsg.contains("'group' is malformed in the expression [testName]."))
   }
 
+  test("source in ExpressionInfo") {
+    val sumInfo = spark.sessionState.catalog.lookupFunctionInfo(FunctionIdentifier("sum"))
+    assert(sumInfo.getSource === "built-in")
+    // Each supported source value must be returned unchanged by getSource.
+    for (src <- Seq("python_udf", "java_udf", "scala_udf", "built-in", "hive")) {
+      val built = new ExpressionInfo(
+        "testClass", null, "testName", null, "", "", "", "", "", "", src)
+      assert(built.getSource === src)
+    }
+    // An unrecognized source value is expected to raise IllegalArgumentException.
+    val failure = intercept[IllegalArgumentException] {
+      new ExpressionInfo(
+        "testClass", null, "testName", null, "", "", "", "", "", "", "invalid_source")
+    }
+    assert(failure.getMessage.contains("'source' is malformed in the expression [testName]."))
+  }
+
   test("error handling in ExpressionInfo") {
     val errMsg1 = intercept[IllegalArgumentException] {
       val invalidNote = "  invalid note"
-      new ExpressionInfo("testClass", null, "testName", null, "", "", invalidNote, "", "", "")
+      new ExpressionInfo("testClass", null, "testName", null, "", "", invalidNote, "", "", "", "")
     }.getMessage
     assert(errMsg1.contains("'note' is malformed in the expression [testName]."))
 
     val errMsg2 = intercept[IllegalArgumentException] {
       val invalidSince = "-3.0.0"
-      new ExpressionInfo("testClass", null, "testName", null, "", "", "", "", invalidSince, "")
+      new ExpressionInfo("testClass", null, "testName", null, "", "", "", "", invalidSince, "", "")
     }.getMessage
     assert(errMsg2.contains("'since' is malformed in the expression [testName]."))
 
     val errMsg3 = intercept[IllegalArgumentException] {
       val invalidDeprecated = "  invalid deprecated"
-      new ExpressionInfo("testClass", null, "testName", null, "", "", "", "", "", invalidDeprecated)
+      new ExpressionInfo(
+        "testClass", null, "testName", null, "", "", "", "", "", invalidDeprecated, "")
     }.getMessage
     assert(errMsg3.contains("'deprecated' is malformed in the expression [testName]."))
   }
@@ -239,4 +258,23 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       }
     }
   }
+
+  test("Check source for different kind of UDFs") {
+    import org.apache.spark.sql.IntegratedUDFTestUtils
+    val catalog = spark.sessionState.catalog
+    assert(catalog.lookupFunctionInfo(FunctionIdentifier("sum")).getSource === "built-in")
+
+    val scalaUDF = IntegratedUDFTestUtils.TestScalaUDF("scalaUDF")
+    IntegratedUDFTestUtils.registerTestUDF(scalaUDF, spark)
+    val scalaInfo = catalog.lookupFunctionInfo(FunctionIdentifier(scalaUDF.name))
+    assert(scalaInfo.getSource === "scala_udf")
+
+    // The Python part needs a usable Python/PySpark environment; cancel (rather than fail)
+    // the rest of the test when the test utilities report that it is unavailable.
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    val pythonUDF = IntegratedUDFTestUtils.TestPythonUDF("pythonUDF")
+    IntegratedUDFTestUtils.registerTestUDF(pythonUDF, spark)
+    val pythonInfo = catalog.lookupFunctionInfo(FunctionIdentifier(pythonUDF.name))
+    assert(pythonInfo.getSource === "python_udf")
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index d567128e1a322..e79e2d396b66f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 
@@ -209,4 +210,15 @@ class UDFSuite
       assert(e.getMessage.contains("Can not get an evaluator of the empty UDAF"))
     }
   }
+
+  test("check source for hive UDF") {
+    withUserDefinedFunction(functionName -> false) {
+      // Register the function through SQL (no TEMPORARY keyword), then look it up by its
+      // qualified name in the "default" database and check the reported source.
+      sql(s"CREATE FUNCTION $functionName AS '$functionClass'")
+      val info = spark.sessionState.catalog.lookupFunctionInfo(
+        FunctionIdentifier(functionName, Some("default")))
+      assert(info.getSource === "hive")
+    }
+  }
 }