-
Notifications
You must be signed in to change notification settings - Fork 28k
/
functions.scala
283 lines (255 loc) · 10.4 KB
/
functions.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import java.util.Locale
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
 * The DDL command that creates a function.
 * To create a temporary function, the syntax of using this command in SQL is:
 * {{{
 *    CREATE [OR REPLACE] TEMPORARY FUNCTION functionName
 *    AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
 * }}}
 *
 * To create a permanent function, the syntax in SQL is:
 * {{{
 *    CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] [databaseName.]functionName
 *    AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
 * }}}
 *
 * @param ignoreIfExists when true, ignore if the function with the specified name exists
 *                       in the specified database.
 * @param replace when true, alter the function with the specified name.
 */
case class CreateFunctionCommand(
    databaseName: Option[String],
    functionName: String,
    className: String,
    resources: Seq[FunctionResource],
    isTemp: Boolean,
    ignoreIfExists: Boolean,
    replace: Boolean)
  extends RunnableCommand {

  // IF NOT EXISTS and OR REPLACE are mutually exclusive options.
  if (ignoreIfExists && replace) {
    throw new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE" +
      " is not allowed.")
  }

  // A temporary function may not be declared with IF NOT EXISTS.
  if (ignoreIfExists && isTemp) {
    throw new AnalysisException(
      "It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
  }

  // A temporary function name must not carry a database qualifier.
  if (databaseName.isDefined && isTemp) {
    throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
      s"is not allowed: '${databaseName.get}'")
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val catalogFunc =
      CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
    if (isTemp) {
      // Resources must be loaded before the builder goes into the function registry.
      catalog.loadFunctionResources(resources)
      catalog.registerFunction(catalogFunc, overrideIfExists = replace)
    } else if (replace && catalog.functionExists(catalogFunc.identifier)) {
      // CREATE OR REPLACE on an existing permanent function: update the metastore entry.
      catalog.alterFunction(catalogFunc)
    } else {
      // Persist only the metadata in the external catalog; the function is loaded
      // into the FunctionRegistry lazily, the first time a query references it.
      catalog.createFunction(catalogFunc, ignoreIfExists)
    }
    Seq.empty[Row]
  }
}
/**
 * A command for users to get the usage of a registered function.
 * The syntax of using this command in SQL is
 * {{{
 *   DESCRIBE FUNCTION [EXTENDED] upper;
 * }}}
 */
case class DescribeFunctionCommand(
    functionName: FunctionIdentifier,
    isExtended: Boolean) extends RunnableCommand {

  // One string column; each row carries one line of the description.
  override val output: Seq[Attribute] =
    StructType(StructField("function_desc", StringType, nullable = false) :: Nil).toAttributes

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // "<>", "!=", "between" and "case" are parser constructs with no registered
    // function behind them, so their descriptions are hard-coded here for now.
    functionName.funcName.toLowerCase(Locale.ROOT) match {
      case op @ ("<>" | "!=") =>
        Row(s"Function: $functionName") ::
          Row(s"Usage: expr1 $op expr2 - " +
            "Returns true if `expr1` is not equal to `expr2`.") :: Nil
      case "between" =>
        Row("Function: between") ::
          Row("Usage: expr1 [NOT] BETWEEN expr2 AND expr3 - " +
            "evaluate if `expr1` is [not] in between `expr2` and `expr3`.") :: Nil
      case "case" =>
        Row("Function: case") ::
          Row("Usage: CASE expr1 WHEN expr2 THEN expr3 " +
            "[WHEN expr4 THEN expr5]* [ELSE expr6] END - " +
            "When `expr1` = `expr2`, returns `expr3`; " +
            "when `expr1` = `expr4`, return `expr5`; else return `expr6`.") :: Nil
      case _ =>
        try {
          val info = sparkSession.sessionState.catalog.lookupFunctionInfo(functionName)
          // Qualify the name with the database when the catalog reports one.
          val name = Option(info.getDb).map(_ + "." + info.getName).getOrElse(info.getName)
          val baseRows =
            Row(s"Function: $name") ::
              Row(s"Class: ${info.getClassName}") ::
              Row(s"Usage: ${info.getUsage}") :: Nil
          if (isExtended) {
            baseRows :+ Row(s"Extended Usage:${info.getExtended}")
          } else {
            baseRows
          }
        } catch {
          case _: NoSuchFunctionException => Seq(Row(s"Function: $functionName not found."))
        }
    }
  }
}
/**
 * The DDL command that drops a function.
 *
 * @param ifExists unless true, an error is raised if the function does not exist.
 * @param isTemp whether the function to drop is a temporary one.
 */
case class DropFunctionCommand(
    databaseName: Option[String],
    functionName: String,
    ifExists: Boolean,
    isTemp: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val sessionCatalog = sparkSession.sessionState.catalog
    if (!isTemp) {
      // Permanent function: remove its entry from the external catalog.
      sessionCatalog.dropFunction(
        FunctionIdentifier(functionName, databaseName),
        ignoreIfNotExists = ifExists)
    } else {
      // Temporary function names are unqualified; reject a database prefix.
      if (databaseName.isDefined) {
        throw new AnalysisException(s"Specifying a database in DROP TEMPORARY FUNCTION " +
          s"is not allowed: '${databaseName.get}'")
      }
      // Built-in functions cannot be dropped.
      if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
        throw new AnalysisException(s"Cannot drop native function '$functionName'")
      }
      sessionCatalog.dropTempFunction(functionName, ifExists)
    }
    Seq.empty[Row]
  }
}
/**
 * A command for users to list all of the registered functions.
 * The syntax of using this command in SQL is:
 * {{{
 *    SHOW FUNCTIONS [LIKE pattern]
 * }}}
 * For the pattern, '*' matches any sequence of characters (including no characters) and
 * '|' is for alternation.
 * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
 */
case class ShowFunctionsCommand(
    db: Option[String],
    pattern: Option[String],
    showUserFunctions: Boolean,
    showSystemFunctions: Boolean) extends RunnableCommand {

  // One string column holding each matched function name.
  override val output: Seq[Attribute] =
    StructType(StructField("function", StringType, nullable = false) :: Nil).toAttributes

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val dbName = db.getOrElse(catalog.getCurrentDatabase)
    // An omitted pattern behaves like '*': match any sequence of characters
    // (including no characters).
    val functionPattern = pattern.getOrElse("*")
    val matchedNames = catalog
      .listFunctions(dbName, functionPattern)
      .collect {
        case (ident, "USER") if showUserFunctions => ident.unquotedString
        case (ident, "SYSTEM") if showSystemFunctions => ident.unquotedString
      }
    // The virtual operators ("<>", "!=", "between", "case") have no registered
    // functions; they are treated as SYSTEM and only shown when requested.
    val allNames = if (showSystemFunctions) {
      matchedNames ++
        StringUtils.filterPattern(FunctionsCommand.virtualOperators, functionPattern)
    } else {
      matchedNames
    }
    allNames.sorted.map(Row(_))
  }
}
/**
 * A command for users to refresh the persistent function.
 * The syntax of using this command in SQL is:
 * {{{
 *    REFRESH FUNCTION functionName
 * }}}
 */
case class RefreshFunctionCommand(
    databaseName: Option[String],
    functionName: String)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    // Built-in and temporary functions have no persistent metadata to refresh.
    if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
      throw new AnalysisException(s"Cannot refresh builtin function $functionName")
    }
    if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
      throw new AnalysisException(s"Cannot refresh temporary function $functionName")
    }
    // Resolve against the current database when none was specified.
    val qualifiedIdent = FunctionIdentifier(
      functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase)))
    // Only permanent functions get refreshed.
    if (catalog.isPersistentFunction(qualifiedIdent)) {
      // Re-read the metadata from the external catalog and overwrite any
      // cached registration.
      catalog.registerFunction(catalog.getFunctionMetadata(qualifiedIdent), overrideIfExists = true)
    } else if (catalog.isRegisteredFunction(qualifiedIdent)) {
      // The function no longer exists persistently; drop the stale cache entry.
      catalog.unregisterFunction(qualifiedIdent)
    } else {
      throw new NoSuchFunctionException(qualifiedIdent.database.get, functionName)
    }
    Seq.empty[Row]
  }
}
/**
 * Constants shared by the function-related commands in this file.
 */
object FunctionsCommand {
  // Operators handled directly by the parser with no corresponding registered
  // function; special-cased by `DescribeFunctionCommand` and `ShowFunctionsCommand`.
  val virtualOperators: Seq[String] = Seq("!=", "<>", "between", "case")
}