# Column Functions Vs User-defined functions - UDFs en Apache Spark

Uno de los componentes que más me sorprendió en Apache Spark es que permitiera extender el vocabulario de SQL fuera de los límites de DSL con la ayuda de Column Functions o User-defined functions - UDFs, incluso incrustando funciones de negocio escritas en diferente lenguaje <a href="https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html">Scala</a>, <a href="https://blog.cloudera.com/working-with-udfs-in-apache-spark/">Java</a>, o <a href="https://docs.databricks.com/spark/latest/spark-sql/udf-python.html">Python</a>.

## Contenido:
* [Prerequisitos](#head1)
* [Column Functions en Apache Spark](#head2)
* [User-defined functions - UDF en Apache Spark](#head3)
* [Consideraciones de rendimiento y orden de evaluación UDFs](#head4)
* [Tratamiento Nulls Column Functions y UDFs](#head5)
* [Column Functions Vs UDFs](#head6)
* [Column Functions for all!](#head7)
* [Conclusiones](#head8)

## Prerequisitos<a class="anchor" id="head1"></a>

Únicamente necesitaremos las siguientes importaciones en nuestro notebook:

In [1]:
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.types.{StructField, StringType, IntegerType, StructType}

Intitializing Scala interpreter ...

Spark Web UI available at http://DESKTOP-4MHGUNH:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1587599503768)
SparkSession available as 'spark'


import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.types.{StructField, StringType, IntegerType, StructType}


## Column Functions en Apache Spark<a class="anchor" id="head2"></a>

Los Column Functions son funciones que reciben como parámetro una(s) columna(s) o ninguna, y son capaces de retornar una(s) columna(s), se encuentran en el namespace <i style="color:blue;">org.apache.spark.sql.functions</i> (<a href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html">Java</a> <a href="https://spark.apache.org/docs/2.4.5/api/scala/index.html#org.apache.spark.sql.functions">Scala</a>). Al ser funciones nativas, pasan por el optimizador de consultas Catalyst, pero si necesitamos efectuar test a nuestra función demanda un poco de esfuerzo sin la ayuda de librerías como <a href="https://github.com/MrPowers/spark-fast-tests">spark-fast-tests</a> o <a href="https://github.com/mockito/mockito-scala">mockito-scala</a>. Escribamos la función <i style="color:blue;">square</i> una Column Functions que calcula el cuadrado de una columna:

In [2]:
def squareFunction(col:Column) = col * col
spark.range(10).select(col("id"), squareFunction(col("id"))).show

+---+---------+
| id|(id * id)|
+---+---------+
|  0|        0|
|  1|        1|
|  2|        4|
|  3|        9|
|  4|       16|
|  5|       25|
|  6|       36|
|  7|       49|
|  8|       64|
|  9|       81|
+---+---------+



squareFunction: (col: org.apache.spark.sql.Column)org.apache.spark.sql.Column


Podemos construir Column Functions sin parámetros de entrada <i style="color:blue;">createNewColum</i> y con condicionales, enteros y literales <i style="color:blue;">comparativeWithValue</i>

In [3]:
//Without input params
def createNewColum():Column = lit("new column!") 

// With integer and literal columns
def comparativeWithValue(col:Column, value:Int):Column = {
    when(col.leq(lit(value)), lit(s"less or equal to ${value}"))
        .otherwise(lit(s"greater than ${value}")).as("comparative")
}

spark.range(10).select(col("id"),createNewColum(), comparativeWithValue(col("id"),5)).show

+---+-----------+------------------+
| id|new column!|       comparative|
+---+-----------+------------------+
|  0|new column!|less or equal to 5|
|  1|new column!|less or equal to 5|
|  2|new column!|less or equal to 5|
|  3|new column!|less or equal to 5|
|  4|new column!|less or equal to 5|
|  5|new column!|less or equal to 5|
|  6|new column!|    greater than 5|
|  7|new column!|    greater than 5|
|  8|new column!|    greater than 5|
|  9|new column!|    greater than 5|
+---+-----------+------------------+



createNewColum: ()org.apache.spark.sql.Column
comparativeWithValue: (col: org.apache.spark.sql.Column, value: Int)org.apache.spark.sql.Column


## User-defined functions - UDF en Apache Spark<a class="anchor" id="head3"></a>

User-defined functions o UDF es otra forma de crear funciones que extienden la funcionalidad de SQL, permitiendo construir complejas lógicas de negocio y utilizarlas como si fueran funciones nativas de SQL y no relacionadas a tipos de datos asociados a Datasets. Los UDFs requieren ser registradas en Spark y estarán listas para su uso como funciones nativas de SQL. Spark serializará las funciones y las enviará a todos los procesos ejecutores en los worker para su ejecución. Reescribamos nuestra función <i style="color:blue;">square</i> como UDF:

In [4]:
val square = (s: Long) => s * s
val squareUDF = udf(square(_:Long):Long)
spark.range(10).select(col("id"), squareUDF(col("id"))).show

+---+-------+
| id|UDF(id)|
+---+-------+
|  0|      0|
|  1|      1|
|  2|      4|
|  3|      9|
|  4|     16|
|  5|     25|
|  6|     36|
|  7|     49|
|  8|     64|
|  9|     81|
+---+-------+



square: Long => Long = <function1>
squareUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))


Su versión Inline más compacta utilizada con Spark SQL

In [5]:
spark.udf.register("squareUDFInline", (s: Long) => s * s)
spark.range(10).createTempView("square")
spark.sql("SELECT id, squareUDFInline(id) from square").show

+---+-----------------------+
| id|UDF:squareUDFInline(id)|
+---+-----------------------+
|  0|                      0|
|  1|                      1|
|  2|                      4|
|  3|                      9|
|  4|                     16|
|  5|                     25|
|  6|                     36|
|  7|                     49|
|  8|                     64|
|  9|                     81|
+---+-----------------------+



Realizar test a una función UDF es bastante sencillo, por ejemplo, un test a nuestra función <i style="color:blue;">square</i>:

In [6]:
List(1, 2, 3).map(x => assert(square(x)== x*x))

res4: List[Unit] = List((), (), ())


## Consideraciones de rendimiento y orden de evaluación UDFs<a class="anchor" id="head4"></a>

Existe una diferencia clave asociada con el lenguaje con el cual se escribió la UDF: si es Java o Scala,  correrán sobre las JVM en las maquinas esclavas, sin embargo si la función fue escrita en <a href="https://medium.com/analytics-vidhya/pyspark-udf-deep-dive-8ae984bfac00">Python</a>, Spark iniciara el proceso de Python en el worker, serializara la data a un formato aceptado por Python, ejecutará la función registro a registro en el proceso de Python y finalmente retornará los resultados a la JVM en la maquina Worker para continuar su procesamiento.

Estas diferencias en la forma de ejecución traen implicaciones a nivel de rendimiento(<a href="https://medium.com/@QuantumBlack/spark-udf-deep-insights-in-performance-f0a95a4d8c62">Spark UDF — Deep insights in performance</a>) ofreciendo evidencia de mejores resultados las UDFs escritas nativamente en Scala.

<img class="nh sg s t u he ai hn" srcset="https://miro.medium.com/max/552/1*FFi8Yk6mwSc6AvI-avWcYw.png 276w, https://miro.medium.com/max/1104/1*FFi8Yk6mwSc6AvI-avWcYw.png 552w, https://miro.medium.com/max/1280/1*FFi8Yk6mwSc6AvI-avWcYw.png 640w, https://miro.medium.com/max/1400/1*FFi8Yk6mwSc6AvI-avWcYw.png 700w" sizes="700px" role="presentation" src="https://miro.medium.com/max/1800/1*FFi8Yk6mwSc6AvI-avWcYw.png" width="1200" height="250">

¿Por qué debería evitar esta maravillosa funcionalidad aun siendo escrita en Scala?  Los UDFs no son optimizados por el optimizador de consultas Catalys (<a href="https://blog.cloudera.com/working-with-udfs-in-apache-spark/">Working with UDFs in Apache Spark</a>) y las funciones nativas de SQL a menudo tendrán un mejor rendimiento y deberían ser el primer enfoque considerando siempre que se pueda evitar la introducción de un UDF. Una de las desventajas de UDF es que su invocación cuando se hace a través de spark.sql no puede ser revisada en tiempo de compilación, si la UDF no existe o no se inscribió lanzará una excepción de tipo <i style="color:red;">org.apache.spark.sql.AnalysisException</i>:

In [7]:
try{
    spark.sql("SELECT squareNotExists(id) from square").show
} catch{
    case x:org.apache.spark.sql.AnalysisException => println("org.apache.spark.sql.AnalysisException has been launched!!!")
}

org.apache.spark.sql.AnalysisException has been launched!!!


Si utilizáramos programación defensiva implica que deberíamos consultar al catálogo de Spark en busca de la existencia de nuestra función UDF y desarrollar alternativas para el manejo de esta situación:

In [8]:
spark.catalog.listFunctions.filter('name like "%square%").show(false)

+---------------+--------+-----------+---------+-----------+
|name           |database|description|className|isTemporary|
+---------------+--------+-----------+---------+-----------+
|squareUDFInline|null    |null       |null     |true       |
+---------------+--------+-----------+---------+-----------+



Otro punto clave a tener en cuenta respecto a las UDFs, cuando son usadas en operaciones de filtro a nivel fila o grupo(WHERE o HAVING) es que no tienen garantía de ejecución tales como las operaciones de corto circuito, como lo menciona <a href="https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html">Databricks</a> en <i>Evaluation order and null checking</i>.
<img src="src/EvaluationOrderUDF.png">

## Tratamiento Nulls Column Functions y UDFs<a class="anchor" id="head5"></a>

El tratamiento de los valores <i style="color:green;">null</i> es diferente en Column Functions y UDFs, miremos con un ejemplo las diferencias en comportamiento:

##### Nulls con Column Functions sobre columnas StringType y el Physical Plan

In [9]:
def upperFunction(col:Column):Column = when(col.isNull, lit("ERROR")).otherwise(upper(col)).as("textWithNulls")
val df = sc.parallelize(Array("aaa",null,"ccc")).toDF("id").select(upperFunction(col("id")))
df.show
df.explain

+-------------+
|textWithNulls|
+-------------+
|          AAA|
|        ERROR|
|          CCC|
+-------------+

== Physical Plan ==
*(1) Project [CASE WHEN isnull(value#82) THEN ERROR ELSE upper(value#82) END AS textWithNulls#86]
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#82]
   +- Scan[obj#81]


upperFunction: (col: org.apache.spark.sql.Column)org.apache.spark.sql.Column
df: org.apache.spark.sql.DataFrame = [textWithNulls: string]


##### Nulls con UDFs sobre columnas StringType y el Physical Plan

In [10]:
def upper(s: String): String = Option(s).getOrElse("error").toUpperCase
val upperUDF = udf(upper(_:String):String)
val df = sc.parallelize(Array("aaa",null,"ccc")).toDF("id").select(col("id"), upperUDF(col("id")).as("textWithNulls"))
df.show
df.explain

+----+-------------+
|  id|textWithNulls|
+----+-------------+
| aaa|          AAA|
|null|        ERROR|
| ccc|          CCC|
+----+-------------+

== Physical Plan ==
*(1) Project [value#95 AS id#97, UDF(value#95) AS textWithNulls#99]
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#95]
   +- Scan[obj#94]


upper: (s: String)String
upperUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
df: org.apache.spark.sql.DataFrame = [id: string, textWithNulls: string]


A pesar de que los resultados son los mismos, se evidencia en el <i style="color:gray;">Physical Plan</i> como la Column Functions fue comprendida <i style="color:darkblue;">CASE WHEN isnull(value#82) THEN err... </i>, mientras que el UDF solo se muestra como una black box: <i style="color:darkblue;">UDF(value#482) AS textWithNulls#486</i>, ahora revisemos el comportamiento con un tipo de columna diferente a StringType.

##### Preparemos la data

In [11]:
val dataRow = Seq(Row(1), Row(null), Row(3))
val dataStruct = List(StructField("id", IntegerType, nullable = true))
val dfdataNull = spark.createDataFrame(spark.sparkContext.parallelize(dataRow), StructType(dataStruct))
dfdataNull.printSchema

root
 |-- id: integer (nullable = true)



dataRow: Seq[org.apache.spark.sql.Row] = List([1], [null], [3])
dataStruct: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true))
dfdataNull: org.apache.spark.sql.DataFrame = [id: int]


##### Nulls con Column Functions sobre columnas IntegerType y el Physical Plan

In [12]:
def squareFunction(col:Column):Column = when(col.isNull, lit("-1")).otherwise(col * col).as("square") 
val df = dfdataNull.select(col("id"), squareFunction(col("id")))
df.show
df.explain

+----+------+
|  id|square|
+----+------+
|   1|     1|
|null|    -1|
|   3|     9|
+----+------+

== Physical Plan ==
*(1) Project [id#111, CASE WHEN isnull(id#111) THEN -1 ELSE cast((id#111 * id#111) as string) END AS square#113]
+- Scan ExistingRDD[id#111]


squareFunction: (col: org.apache.spark.sql.Column)org.apache.spark.sql.Column
df: org.apache.spark.sql.DataFrame = [id: int, square: string]


##### Nulls con UDFs sobre columnas IntegerType y el Physical Plan

In [13]:
def square(s: Long) = if(s.eq(null)) -1 else s * s
val squareUDF = udf(square(_:Long):Long)
val df = dfdataNull.select(col("id"), squareUDF(col("id")).as("square"))
df.show
df.explain

+----+------+
|  id|square|
+----+------+
|   1|     1|
|null|  null|
|   3|     9|
+----+------+

== Physical Plan ==
*(1) Project [id#111, if (isnull(cast(id#111 as bigint))) null else UDF(cast(id#111 as bigint)) AS square#124L]
+- Scan ExistingRDD[id#111]


square: (s: Long)Long
squareUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
df: org.apache.spark.sql.DataFrame = [id: int, square: bigint]


Esta vez los resultados son diferentes, y  eso se debe a que Spark cuando invoca una UDF asume una programación defensiva con el tratamiento de los nulos, retornando inmediatamente el valor , sin permitir el manejo de estos valores por parte de las UDFs, a diferencia del comportamiento asumido si corresponde a  la invocación de un Column Functions, si aún esa dudando de esta afirmación, revisemos el <i style="color:gray;">Physical Plan</i> cuando invoca una UDF, pero esta vez en la función <i style="color:blue;">square</i> no escribiremos el manejo de los valores nulos.

In [14]:
def square(s: Long) = s * s
val squareUDF = udf(square(_:Long):Long)
dfdataNull.select(squareUDF(col("id")).as("square")).explain

== Physical Plan ==
*(1) Project [if (isnull(cast(id#111 as bigint))) null else UDF(cast(id#111 as bigint)) AS square#135L]
+- Scan ExistingRDD[id#111]


square: (s: Long)Long
squareUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))


Aquí es claro que esta línea <i style="color:darkblue;">if (isnull(cast(id#95 as bigint))) null else UDF(cast...</i> no fue adicionada por nuestro código y se debe a un tratamiento interno de Spark para las columnas IntegerType en este caso. Si quisiéramos manejar los valores <i style="color:green;">null</i> con nuestro código y los tipo de columna son numéricos, debemos utilizar Column Functions para este tratamiento y evitar que Spark asuma cierta posición como la vista anteriormente.

## Column Functions Vs UDFs<a class="anchor" id="head6"></a>

Esta tabla comparativa nos resumira los conceptos vistos:

<style type="text/css">
table, th, td {
  border: 1px solid black;
  border-collapse: collapse;
}
th, td {
  padding: 5px;
  text-align: center;
}
th {
  text-align: center;
}
</style>
<table>
<thead><tr><th></th><th>Column Functions</th><th>UDFs</th></tr></thead><tbody>
 <tr><td>Sql Functions Native</td><td>Yes</td><td>No</td></tr>
 <tr><td>Required Spark register</td><td>No</td><td>Yes</td></tr>
 <tr><td>Compiler check</td><td>Yes</td><td>No(SQL)</td></tr>
 <tr><td>Return can be different to Colum</td><td>No</td><td>Yes</td></tr>
 <tr><td>Performance in general</td><td>Fast</td><td>Slow</td></tr>
 <tr><td>Test</td><td>Medium</td><td>Easy</td></tr>
</tbody></table>

Las UDFs en Apache Spark no deberian ser nuestra primera elección, bienvenidas Column Functions!

## Column Functions for all!<a class="anchor" id="head7"></a>

En su mayoría de veces las Column Functions permitarán reescribir el código de una UDF existente a una versión nativa, los siguientes ejemplos nos mostrarán su potencial:

##### Accediendo a data externa a Column Functions

In [15]:
val fruits = List("apple","blueberry","watermelon")

val isFruit = (col:Column) => when(col.isin(fruits:_*), lit("yes")).otherwise(lit("no")).as("isFruit")

sc.parallelize(
    Array("apple","car", "plane", "watermelon")
).toDF("fruit").select(col("fruit"), isFruit(col("fruit"))).show

+----------+-------+
|     fruit|isFruit|
+----------+-------+
|     apple|    yes|
|       car|     no|
|     plane|     no|
|watermelon|    yes|
+----------+-------+



fruits: List[String] = List(apple, blueberry, watermelon)
isFruit: org.apache.spark.sql.Column => org.apache.spark.sql.Column = <function1>


##### Utilizando Pattern matching

In [16]:
def sum(colA:Column, colB:Column) = colA + colB 
val diff = (colA:Column, colB:Column) => colA.minus(colB)

def matchOperation(operationType:String):(Column, Column)=>Column = operationType match {
  case "+" => sum
  case "-" => diff
  case _ => (colA:Column, colB:Column) => colA * colB 
}

val genericFunction = matchOperation("other")
spark.range(10).toDF("id").select(col("id"), genericFunction(col("id"), col("id"))).show

+---+---------+
| id|(id * id)|
+---+---------+
|  0|        0|
|  1|        1|
|  2|        4|
|  3|        9|
|  4|       16|
|  5|       25|
|  6|       36|
|  7|       49|
|  8|       64|
|  9|       81|
+---+---------+



sum: (colA: org.apache.spark.sql.Column, colB: org.apache.spark.sql.Column)org.apache.spark.sql.Column
diff: (org.apache.spark.sql.Column, org.apache.spark.sql.Column) => org.apache.spark.sql.Column = <function2>
matchOperation: (operationType: String)(org.apache.spark.sql.Column, org.apache.spark.sql.Column) => org.apache.spark.sql.Column
genericFunction: (org.apache.spark.sql.Column, org.apache.spark.sql.Column) => org.apache.spark.sql.Column = <function2>


##### Utilizando funciones parcialmente aplicadas

In [17]:
def operate[A](operation:(A, A)=>A, a:A, b:A):A = operation(a,b)
val deferredOperation = operate(_:(Column, Column)=>Column, col("id"),col("id"))
//Many lines after
spark.range(10).toDF("id").select(col("id"), deferredOperation(matchOperation("+"))).show

+---+---------+
| id|(id + id)|
+---+---------+
|  0|        0|
|  1|        2|
|  2|        4|
|  3|        6|
|  4|        8|
|  5|       10|
|  6|       12|
|  7|       14|
|  8|       16|
|  9|       18|
+---+---------+



operate: [A](operation: (A, A) => A, a: A, b: A)A
deferredOperation: ((org.apache.spark.sql.Column, org.apache.spark.sql.Column) => org.apache.spark.sql.Column) => org.apache.spark.sql.Column = <function1>


## Conclusiones<a class="anchor" id="head8"></a>

Las UDFs al igual que las Column Functions pueden extender el léxico de SQL, pero siempre las Column Functions deben ser la primera opción para resolver el problema por sus ventajas a nivel de rendimiento y optimizaciones internas.