In [6]:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.udf

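# 通过 `org.apache.spark.sql.functions.udf` 创建的自定义 UDF 只是一个值，只能在 DataFrame API 中使用，不能在 SQL 语句里按名字调用；要在 SQL 中使用，必须用 `spark.udf.register` 按名字注册（该方法同时会返回一个可在 DataFrame API 中使用的 UDF）。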

In [13]:
/** Returns true when `age` is at least 18, i.e. the person counts as an adult.
  *
  * @param age age in years
  * @return `true` for `age >= 18`, `false` otherwise
  */
def isAdult(age: Int): Boolean = age >= 18

isAdult: (age: Int)Boolean


In [2]:
// Test data: (name, age) pairs.
val userData = Array(
  ("Leo", 16),
  ("Marry", 21),
  ("Jack", 14),
  ("Tom", 18)
)
// Build the test DataFrame from an RDD of the tuples, naming the columns "name" and "age".
val userDF = sc.parallelize(userData).toDF("name", "age")
// Expose the DataFrame to SQL queries as the temporary view "user"
// (older Spark code used registerTempTable for this).
userDF.createOrReplaceTempView("user")

userData = Array((Leo,16), (Marry,21), (Jack,14), (Tom,18))
userDF = [name: string, age: int]




[name: string, age: int]

# Spark SQL UDF
老版本（Spark 1.x）使用 `sqlContext.udf.register` 注册；Spark 2.x 起使用 `spark.udf.register`。

In [7]:
// Register UDFs by name so they can be called from SQL statements.
// From an anonymous function: length of the string. The `name` column is nullable,
// so the input is wrapped in Option; a null input yields SQL NULL instead of an NPE.
spark.udf.register("strLen", (str: String) => Option(str).map(_.length))
// From a named function: `isAdult _` eta-expands the method into a function value.
spark.udf.register("isAdult", isAdult _)
spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show()


+-----+---+--------+-------+
| name|age|name_len|isAdult|
+-----+---+--------+-------+
|  Leo| 16|       3|  false|
|Marry| 21|       5|   true|
| Jack| 14|       4|  false|
|  Tom| 18|       3|   true|
+-----+---+--------+-------+



# Spark DataFrame UDF（`org.apache.spark.sql.functions.udf`）

In [38]:
// `udf` here is org.apache.spark.sql.functions.udf: it returns a UDF value for use in
// the DataFrame API; it does not register anything by name for SQL.
// From an anonymous function. The `name` column is nullable, so a null input
// yields NULL instead of throwing an NPE.
val strLen = udf((str: String) => Option(str).map(_.length))
// From a named function (`isAdult _` eta-expands the method).
val udf_isAdult = udf(isAdult _)
// `age` is already an integer column, so it is passed to the UDF without a cast.
userDF.withColumn("isAdult", udf_isAdult(col("age"))).show()


+-----+---+-------+
| name|age|isAdult|
+-----+---+-------+
|  Leo| 16|  false|
|Marry| 21|   true|
| Jack| 14|  false|
|  Tom| 18|   true|
+-----+---+-------+



strLen = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))
udf_isAdult = UserDefinedFunction(<function1>,BooleanType,Some(List(IntegerType)))


UserDefinedFunction(<function1>,BooleanType,Some(List(IntegerType)))

In [18]:
// Print the DataFrame's schema as a tree (same output as userDF.printSchema()).
userDF.schema.printTreeString()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [40]:
// Keep every original column and append both UDF results under explicit names.
userDF.select(
  col("*"),
  strLen(col("name")).alias("name_len"),
  udf_isAdult(col("age")).alias("isAdult")
).show()

+-----+---+--------+-------+
| name|age|name_len|isAdult|
+-----+---+--------+-------+
|  Leo| 16|       3|  false|
|Marry| 21|       5|   true|
| Jack| 14|       4|  false|
|  Tom| 18|       3|   true|
+-----+---+--------+-------+



In [56]:
// List the DataFrame's column names.
// Note: `columns` must be called without `()` — write `userDF.columns`, not `userDF.columns()`.
userDF.columns

Array(name, age)

In [48]:
// Print each column name of userDF on its own line.
userDF.columns.foreach(println)

name
age


aa: String = ""


In [61]:
// Calls the UDF registered by name as "isAdult" via spark.udf.register.
// Only name-registered UDFs resolve in SQL; a value created with
// org.apache.spark.sql.functions.udf (such as udf_isAdult) would not be found here.
spark.sql("select isAdult(age) from user").show()

+----------------+
|UDF:isAdult(age)|
+----------------+
|           false|
|            true|
|           false|
|            true|
+----------------+



lastException: Throwable = null
