# Fundamentos de Apache Spark: Funciones avanzadas

En este notebook aprenderemos algunas funciones avanzadas para optimizar el rendimiento de Spark o a crear funciones definidas por el usuario (UDF).

In [None]:
!pip install findspark
!pip install pyspark



In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

### Crea la sesión de SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()

### Crear el DataFrame

In [None]:
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])

# Create Temp Tables
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")


### BroadCast Join

El tamaño de la tabla de difusión es de 10 MB. Sin embargo, podemos cambiar el umbral hasta 8GB según la documentación oficial de Spark 2.3.

* Podemos verificar el tamaño de la tabla de transmisión de la siguiente manera:

In [None]:
threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold").rstrip("b")
size = int(threshold) / (1024 * 1024)
print("Default size of broadcast table is {0} MB.".format(size))

Default size of broadcast table is 10.0 MB.


* Podemos establecer el tamaño de la tabla de transmisión para que diga 50 MB de la siguiente manera:

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

In [None]:
# Considere que necesitamos unir 2 Dataframes.
# small_df: DataFrame pequeño que puede caber en la memoria y es más pequeño que el umbral especificado.
# big_df: DataFrame grande que debe unirse con DataFrame pequeño.

join_df = big_df.join(broadcast(small_df), big_df["id"] == small_df["id"])

### Almacenamiento en caché
Podemos usar la función de caché / persistencia para mantener el marco de datos en la memoria. Puede mejorar significativamente el rendimiento de su aplicación Spark si almacenamos en caché los datos que necesitamos usar con mucha frecuencia en nuestra aplicación.

In [None]:
df.cache()
df.count()
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


Cuando usamos la función de caché, usará el nivel de almacenamiento como Memory_Only hasta Spark 2.0.2. Desde Spark 2.1.x es Memory_and_DISK.

Sin embargo, si necesitamos especificar los distintos niveles de almacenamiento disponibles, podemos usar el método persist( ). Por ejemplo, si necesitamos mantener los datos solo en la memoria, podemos usar el siguiente fragmento.

In [None]:
from pyspark.storagelevel import StorageLevel

In [None]:
deptdf.persist(StorageLevel.MEMORY_ONLY)
deptdf.count()
print("Memory Used : {0}".format(deptdf.storageLevel.useMemory))
print("Disk Used : {0}".format(deptdf.storageLevel.useDisk))

Memory Used : True
Disk Used : False


### No persistir
También es importante eliminar la memoria caché de los datos cuando ya no sean necesarios.

In [None]:
df.unpersist()

DataFrame[id: bigint, name: string, dept: string, salary: bigint]

In [None]:
sqlContext.clearCache()

#  Expresiones SQL

También podemos usar la expresión SQL para la manipulación de datos. Tenemos la función **expr** y también una variante de un método de selección como **selectExpr** para la e**valuación de expresiones SQL**.

# **expr**

In [None]:
from pyspark.sql.functions import expr

# Intentemos categorizar el salario en Bajo, Medio y Alto según la categorización a continuación.

# 0-2000: salario_bajo
# 2001 - 5000: mid_salary
#> 5001: high_salary

cond = """case when salary > 5000 then 'high_salary'
               else case when salary > 2000 then 'mid_salary'
                    else case when salary > 0 then 'low_salary'
                         else 'invalid_salary'
                              end
                         end
                end as salary_level"""

newdf = df.withColumn("salary_level", expr(cond))
newdf.show()



#repasar

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Usando la variante función selectExpr

In [None]:
newdf = df.selectExpr("*", cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Funciones definidas por el usuario (UDF)
A menudo necesitamos escribir la función en función de nuestro requisito muy específico. Aquí podemos aprovechar las udfs. Podemos escribir nuestras propias funciones en un lenguaje como python y registrar la función como udf, luego podemos usar la función para operaciones de DataFrame.

* Función de Python para encontrar el nivel_salario para un salario dado.

In [None]:
def detSalary_Level(sal):
    level = None

    if(sal > 5000):
        level = 'high_salary'
    elif(sal > 2000):
        level = 'mid_salary'
    elif(sal > 0):
        level = 'low_salary'
    else:
        level = 'invalid_salary'
    return level

* Luego registre la función "detSalary_Level" como UDF.

In [None]:
sal_level = udf(detSalary_Level, StringType())

* Aplicar función para determinar el salario_level para un salario dado.

In [None]:
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+

