# Fundamentos de Apache Spark: Funciones avanzadas

En este notebook aprenderemos algunas funciones avanzadas para optimizar el rendimiento de Spark, para imputar valores faltantes o a crear funciones definidas por el usuario (UDF).

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

### Crea la sesión de SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

### Crear el DataFrame

In [4]:
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"]) 

# Create Temp Tables
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

# Save as HIVE tables.
df.write.saveAsTable("hive_empdf", mode = "overwrite")
deptdf.write.saveAsTable("hive_deptdf", mode = "overwrite")

### BroadCast Join

El tamaño de la tabla de difusión es de 10 MB. Sin embargo, podemos cambiar el umbral hasta 8GB según la documentación oficial de Spark 2.3.

* Podemos verificar el tamaño de la tabla de transmisión de la siguiente manera:

In [172]:
size = int(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) / (1024 * 1024)
print("Default size of broadcast table is {0} MB.".format(size))

Default size of broadcast table is 50.0 MB.


* Podemos establecer el tamaño de la tabla de transmisión para que diga 50 MB de la siguiente manera:

In [173]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

In [174]:
# Considere que necesitamos unir 2 Dataframes.
# small_df: DataFrame pequeño que puede caber en la memoria y es más pequeño que el umbral especificado.
# big_df: DataFrame grande que debe unirse con DataFrame pequeño.

join_df = big_df.join(broadcast(small_df), big_df["id"] == small_df["id"])

### Almacenamiento en caché
Podemos usar la función de caché / persistencia para mantener el marco de datos en la memoria. Puede mejorar significativamente el rendimiento de su aplicación Spark si almacenamos en caché los datos que necesitamos usar con mucha frecuencia en nuestra aplicación.

In [175]:
df.cache()
df.count()
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


Cuando usamos la función de caché, usará el nivel de almacenamiento como Memory_Only hasta Spark 2.0.2. Desde Spark 2.1.x es Memory_and_DISK.

Sin embargo, si necesitamos especificar los distintos niveles de almacenamiento disponibles, podemos usar el método persist( ). Por ejemplo, si necesitamos mantener los datos solo en la memoria, podemos usar el siguiente fragmento.

In [176]:
from pyspark.storagelevel import StorageLevel

In [177]:
deptdf.persist(StorageLevel.MEMORY_ONLY)
deptdf.count()
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


### No persistir
También es importante eliminar la memoria caché de los datos cuando ya no sean necesarios.

In [178]:
df.unpersist()

DataFrame[id: bigint, name: string, dept: string, salary: bigint]

In [None]:
sqlContext.clearCache()

#  Expresiones SQL

También podemos usar la expresión SQL para la manipulación de datos. Tenemos la función **expr** y también una variante de un método de selección como **selectExpr** para la evaluación de expresiones SQL.

In [179]:
from pyspark.sql.functions import expr

# Intentemos categorizar el salario en Bajo, Medio y Alto según la categorización a continuación.

# 0-2000: salario_bajo
# 2001 - 5000: mid_salary
#> 5001: high_salary

cond = """case when salary > 5000 then 'high_salary'
               else case when salary > 2000 then 'mid_salary'
                    else case when salary > 0 then 'low_salary'
                         else 'invalid_salary'
                              end
                         end
                end as salary_level"""

newdf = df.withColumn("salary_level", expr(cond))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Usando la función selectExpr

In [180]:
newdf = df.selectExpr("*", cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Funciones definidas por el usuario (UDF)
A menudo necesitamos escribir la función en función de nuestro requisito muy específico. Aquí podemos aprovechar las udfs. Podemos escribir nuestras propias funciones en un lenguaje como python y registrar la función como udf, luego podemos usar la función para operaciones de DataFrame.

* Función de Python para encontrar el nivel_salario para un salario dado.

In [181]:
def detSalary_Level(sal):
    level = None

    if(sal > 5000):
        level = 'high_salary'
    elif(sal > 2000):
        level = 'mid_salary'
    elif(sal > 0):
        level = 'low_salary'
    else:
        level = 'invalid_salary'
    return level

* Luego registre la función "detSalary_Level" como UDF.

In [182]:
sal_level = udf(detSalary_Level, StringType())

* Aplicar función para determinar el salario_level para un salario dado.

In [183]:
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Trabajando con valores NULL

Los valores NULL siempre son difíciles de manejar independientemente del Framework o lenguaje que usemos. Aquí en Spark tenemos pocas funciones específicas para lidiar con valores NULL.

- **es nulo()**

Esta función nos ayudará a encontrar los valores nulos para cualquier columna dada. Por ejemplo si necesitamos encontrar las columnas donde las columnas id contienen los valores nulos.

In [184]:
newdf = df.filter(df["dept"].isNull())
newdf.show()

+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|null|null|null|  7500|
|   9| III|null|  4500|
+----+----+----+------+



* **No es nulo()**

Esta función funciona de manera opuesta a la función isNull () y devolverá todos los valores no nulos para una función en particular.

In [185]:
newdf = df.filter(df["dept"].isNotNull())
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



* **fillna ()**

Esta función nos ayudará a reemplazar los valores nulos.

In [186]:
# Replace -1 where the salary is null.
newdf = df.fillna("INVALID", ["dept"])
newdf.show()

+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  1500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  7200|
|   7| GGG|  dept3|  7100|
|null|null|INVALID|  7500|
|   9| III|INVALID|  4500|
|  10|null|  dept5|  2500|
+----+----+-------+------+



* **dropna ()**

Esta función nos ayudará a eliminar las filas con valores nulos.

In [187]:
# Remove all rows which contains any null values.
newdf = df.dropna()
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+



In [None]:
# Elimina todas las filas que contienen todos los valores nulos.
newdf = df.dropna(how = "all")
newdf.show()

# Nota: valor predeterminado de "cómo" param es "any".

In [189]:
# Remove all rows where columns : dept is null.
newdf = df.dropna(subset = "dept")
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



## Partitioning


El particionamiento es un aspecto muy importante para controlar el paralelismo de la aplicación Spark.

* Consultar número de particiones.

In [190]:
df.rdd.getNumPartitions()

4

* Incrementar el número de particiones. Por ejemplo Aumentar las particiones a 6

In [191]:
newdf = df.repartition(6)
newdf.rdd.getNumPartitions()

6

**Nota: se trata de operaciones costosas, ya que requiere la mezcla de datos entre los trabajadores.**

* Disminuir el número de particiones. Por ejemplo disminuir las particiones a 2.

In [192]:
newdf = df.coalesce(2)
newdf.rdd.getNumPartitions()

2

* De forma predeterminada, el número de particiones para Spark SQL es 200.
* Pero también podemos establecer el número de particiones en el nivel de aplicación Spark. Por ejemplo establecido en 500

In [193]:
# Set number of partitions as Spark Application.
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Check the number of patitions.
num_part = spark.conf.get("spark.sql.shuffle.partitions")
print("No of Partitions : {0}".format(num_part))

No of Partitions : 500


# Catálogo de APIs

Spark Catalog es una API orientada al usuario, a la que puede acceder mediante SparkSession.catalog.

* **listDatabases ()**

Devolverá todas las bases de datos junto con su ubicación en el sistema de archivos.

In [194]:
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/home/jovyan/work/spark-warehouse')]

* **listTables ()**

Devolverá todas las tablas para una base de datos determinada junto con información como el tipo de tabla (externa / administrada) y si una tabla en particular es temporal o permanente.
Esto incluye todas las vistas temporales.

In [195]:
spark.catalog.listTables("default")

[Table(name='hive_deptdf', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='hive_empdf', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='deptdf', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='empdf', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

* **listColumns ()**

Devolverá todas las columnas de una tabla en particular en DataBase. Además, devolverá el tipo de datos, si la columna se usa en particiones o agrupaciones.

In [196]:
spark.catalog.listColumns("hive_empdf", "default")

[Column(name='id', description=None, dataType='bigint', nullable=True, isPartition=False, isBucket=False),
 Column(name='name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='dept', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='salary', description=None, dataType='bigint', nullable=True, isPartition=False, isBucket=False)]

* **listFunctions()**

Devolverá todas las funciones disponibles en Spark Session junto con la información si es temporal o no.

In [197]:
spark.catalog.listFunctions()

[Function(name='!', description=None, className='org.apache.spark.sql.catalyst.expressions.Not', isTemporary=True),
 Function(name='%', description=None, className='org.apache.spark.sql.catalyst.expressions.Remainder', isTemporary=True),
 Function(name='&', description=None, className='org.apache.spark.sql.catalyst.expressions.BitwiseAnd', isTemporary=True),
 Function(name='*', description=None, className='org.apache.spark.sql.catalyst.expressions.Multiply', isTemporary=True),
 Function(name='+', description=None, className='org.apache.spark.sql.catalyst.expressions.Add', isTemporary=True),
 Function(name='-', description=None, className='org.apache.spark.sql.catalyst.expressions.Subtract', isTemporary=True),
 Function(name='/', description=None, className='org.apache.spark.sql.catalyst.expressions.Divide', isTemporary=True),
 Function(name='<', description=None, className='org.apache.spark.sql.catalyst.expressions.LessThan', isTemporary=True),
 Function(name='<=', description=None, cl

* **currentDatabase ()**

Obtenga la base de datos actual.

In [198]:
spark.catalog.currentDatabase()

'default'

* **setCurrentDatabase ()**

Establecer la base de datos actual

In [199]:

spark.catalog.setCurrentDatabase(<DB_Name>)

* **cacheTable ()**

almacenar en caché una tabla en particular.


In [200]:
spark.catalog.cacheTable("default.hive_empdf")

* **isCached()**

Compruebe si la tabla está almacenada en caché o no.

In [201]:
spark.catalog.isCached("default.hive_empdf")

True

* **uncacheTable()**

Des-cachear de una tabla en particular.

In [202]:
spark.catalog.uncacheTable("default.hive_empdf")

In [203]:
# Verify uncached table. Now you will see that it will return "False" which means table is not cached.
spark.catalog.isCached("default.hive_empdf")

False

* **clearCache()**

Des-cachear toda la tabla en la sesión de Spark.

In [204]:
spark.catalog.clearCache()