## Introducción a Pyspark (I): Sintáxis básica


El siguiente notebook contiene un pequeño tutorial para explicar los conceptos más básicos de sintaxis de Pyspark que nos permitan luego ver un ejemplo de construcción de modelo y que el alumno pueda profundizar por su cuenta.

Como es un notebook preparado para ser ejecutado en "Databricks", contiene código que es específico para esta plataforma. 

### Inciso DataBrick (I): Crear un cluster y asociar el notebook a dicho clúster

Antes de poder ejecutar cualquier código en Databricks es necesario tener un clúster al que poder asociar dicho código. Si ejectuas la celda que viene a continuación y no has asociado este notebook a un cluster te pedirá hacerlo. En clase vamos a ver cómo crear el cluster por "fuera" y luego asociarlo al notebook

In [None]:
import pyspark

Importado pyspark, lo primero como ya comentamos en las sesiones téoricas es crear ese "contexto" o "cursor" con el que poder interactuar con nuestro clúster:

In [None]:
from pyspark.sql import SparkSession

### Inciso DataBricks (II): Lectura de tablas y ficheros, empezando por Pandas

Vamos a leer ahora el primer fichero de datos que nos servirá como guía durante estas sesiones introductorias, pero para ello necesitamos saber dónde está (en la sesión anterior lo creamos y vimos el "path", pero seguramente no te acuerdas). Para saber en Databricks dónde están las tablas, ejecutamos la siguiente celda:

In [None]:
display(dbutils.fs.ls("dbfs:/FileStore/tables/"))

Las tablas están en el sistema de ficheros propio de DataBricks que no es accesible directamente con comandos python (si con pyspark). Como en este tutorial vamos a ir comparando pandas con pyspark, necesitamos mover esas tablas a un directorio "local" al nodo driver del cluster y ahí lo leeremos con pandas y luego ya lo volcaremos en un dataframe de spark para ir haciendo la comparativa.

NOTA: Al ser copias en el sistema de ficheros local de un nodo del clúster, cuando este se desconecte y termine, se perderán y tendrás que volver a copiarlas del dbfs

Las celdas siguiente no la vas a necesitar si programas en pyspark, ojo, es sólo para este tutorial pero ahí te queda por si quieres 

In [None]:
%fs cp dbfs:/FileStore/tables/test1.csv file:/tmp/test1.csv



In [None]:
%fs cp dbfs:/FileStore/tables/test2.csv file:/tmp/test2.csv


In [None]:
%fs cp dbfs:/FileStore/tables/test3.csv file:/tmp/test3.csv

Y ahora ya sí, podemos leer nuestro fichero en pandas

In [None]:
import pandas as pd
df = pd.read_csv("/tmp/test1.csv")

In [None]:
df

### Comenzando con Spark: Lectura de datos y echar un vistazo

Como ya se comentó en las sesiones teóricas, lo PRIMERO siempre es abrir un contexto o una sesión con Spark. Hoy en día siempre una sesión porque eso nos evita tener que abrir un contexto para cada módulo de Spark diferente.


La sesión adenmás nos da acceso a los métodos y funciones de DataFrame y DataSet, que son los elementos sobre los que trabajaremos de una forma "parecida" a como hemos trabajado los Dataframes de Pandas

In [None]:
spark = SparkSession.builder.appName("Practise").getOrCreate() # Fíjate en la sintaxis, no muy intuitiva. Se le da un nombre.

In [None]:
spark

Ahora vamos a leer el fichero a través de la interfaz de sparkm

In [None]:
df_pyspark = spark.read.csv("/FileStore/tables/test1.csv")

Recuerda que la lectura anterior es una transformación, no se ha ejecutado. 

Por eso ahora lo vamos a forar con el método show() (Que es como el head de pandas)

In [None]:
df_pyspark.show(2)

No lee los nombres de las columnas directamente como hace un read_csv porque no le hemos indicado nada al respecto. Ha considerado todas las filas como filas de datos y por eso la primera fila contine el nombre de las columnas. Veamos como forzar la lectura de la primera fila como nombre de columnas y otra forma de hacer la lectura del fichero.

In [None]:
df_pyspark = spark.read.csv("/FileStore/tables/test1.csv", header= True)
df_pyspark_2 = spark.read.option("header","true").csv("/FileStore/tables/test1.csv")

In [None]:
df_pyspark

In [None]:
display(df_pyspark.show())

In [None]:
df_pyspark_2.show()

In [None]:
type(df_pyspark_2)

In [None]:
El comando análogo de info en pandas es printSchema. Ambos nos muestran las columnas, si pueden contener nulos y el tipo de cada columna

In [None]:
df.info()

In [None]:
df_pyspark.printSchema()

El atributo dtypes de un DataFrame de Pandas hace algo parecido también:


In [None]:
display(df.dtypes) # display hace que se vea un poco más estético pero no es necesario

Pero fijate en la diferencia... En spark todas las columnas son tipo string. A diferencia de pandas, spark si no se lo dices no hace inferencia de tipos. Tenemos que forzarlo como con las cabeceras

In [None]:
df_pyspark_3 = spark.read.csv("/FileStore/tables/test1.csv", header = True, inferSchema= True) # Si no le dices nada a Databricks lee directamente de dbfs con los read de spark

In [None]:
df_pyspark_3.show()

In [None]:
df_pyspark_3.printSchema()

Como puedes ver ahora sí ha cogido los tipos de cada columna. Veamos que también se puede aplicar de la forma alternativa (y ya de paso que podemos leer también del sistema de ficheros locales del nodo driver)

In [None]:
df_pyspark_4 = spark.read.option("header","true").option("inferSchema","true").csv("file:/tmp/test1.csv") # Para acceder en Databricks al sistema de ficheros local de un nodo hay que poner file: delante del path

In [None]:
df_pyspark_4.printSchema()

Los dataframes de Spark también tienen el atributo dtypes

In [None]:
df_pyspark_3.dtypes

Para terminar la sesión, completemos el vistazo con el método describe (que existe tanto en Pandas con en Pyspark)

In [None]:
df_pyspark_4.describe()

Hmmm, no ha hecho nada, describe es una transformación, porque genera otro DataFrame a partir del primero (en este caso "df_pyspark_4"), en este caso además se habrá quedado perdida en el limbo porque no hay accion posterior posible que llame al DataFrame resultante (no lo hemos asignado a ninguna variable). En ese sentido es bastante eficiente.

In [None]:
df_pyspark_4.describe().show() # Show fuerza a mostrar por pantalla y por tanto a "obtener" el dataframe, show es una acción (action)

Aunque, se suele usar show para enseñar una muestra de datos de un dataframe, estos también tienen el método head (aunque su valor por defefcto es 1)

In [None]:
df_pyspark_3.head()

In [None]:
df_pyspark_3.head(4)

Y vamos bien servidos por ahora. En la siguiente sección/sesión avanzaremos en la selección, indexación y manipulación de columnas de un dataframe Spark. Recuerda que si dejas pasar más de una hora, el cluster se desconectará y para la siguiente sesión tendrás que crear otro nuevo y volver a ejectuar el código hasta aquí. 

***

### Intro Spark (II): Trabajando con columnas

#### Selección de Columnas

Empezamos viendo como se selecciona una columna de un dataframe. Primero con pandas y luego con spark

In [None]:
display(df["Name"])

In [None]:
df_pyspark_3.select("Name")

Vuelve a ocurrirnos lo que nos pasaba con el display, select es una transformacion (fíjate además que en Pandas se origina una Serie y en spark se origina un DataFrame) y si no va seguida de un acción relacionada no se ejecuta.

En realidad todavía no ha hecho la operación, está esperando a que "sea necesario"

In [None]:
df_pyspark_3.select("Name").show()

Para seleccionar varias

In [None]:
df[["Name","Salary"]]

In [None]:
df_pyspark_3.select(["Name","Salary"]).show()

#### Indexado por posición de columna y fila

Por indice de columna

In [None]:
df.columns[:2]

In [None]:
df_pyspark_3.columns[:2]

In [None]:
df.iloc[:,:2]

In [None]:
df_pyspark_3.select(df_pyspark_3.columns[:2]).show()

La cosa se "complica" cuando queremos trabajar por ejemplo con índices posicionales (porque Pandas tiene siempre un índice posicional implicito), en el caso de Spark si lo necesitamos tenemos que crearlo.

In [None]:
df.iloc[2:4,:2]

In [None]:
df_pyspark_3.select(df_pyspark_3.columns[:2]).show(3)

Como ves no es lo mismo, para poder hacer algo similar tenemos que crear una columna que contenga un índice hecho "a mano". Para ello veamos primero como añadir una columna a un dataframe Spark.

#### Creando una nueva columna

En Pandas:

In [None]:
df["new_column"] = df["Experience"] + 2
df

En spark hay que usar el metodo withColumn

In [None]:
df_pyspark_4 = df_pyspark_4.withColumn("new_column", df_pyspark_4["Experience"] + 2)

Recuerda que todavía no la ha creado

In [None]:
df_pyspark_4.printSchema()

In [None]:
df_pyspark_4.show()

El show tarda un poco más de lo "esperable", precisamente porque es cuando está creando el nuevo dataframe con la columna añadida

Cremos ahora un índice y simulemos un iloc (usando filter, un método que veremos un poco más en la siguiente sección/sesión): 

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
df_pyspark_4 = df_pyspark_4.withColumn("index", monotonically_increasing_id() )

In [None]:
df_pyspark_4.show()

In [None]:
df_pyspark_4.select(["Name","age","index"]).filter(df_pyspark_4["index"].between(2,3)).show()

Pero no teníamos que usar "select", ahí pone df_pyspark_4["index"]... Sí pero eso es porque es un objeto columna que no tiene métodos para visualizarse (solo los dataframe lo tienen), pero que sí puede usarse para generar condiciones como veremos cuando tratemos el metodo filter y las formas de seleccionar columnas en un dataframe de spark.

In [None]:
type(df_pyspark_4["index"])

Volveremos sobre ello, ahora veamos como eliminar y renombrar columnas:

### Eliminando columnas

In [None]:
df.drop("new_column", axis = 1)

In [None]:
df_pyspark_4.drop("Experience")

En ambos casos no es una operación que "mute" el dataframe original (en Pandas es necesario el argumento inplace y en Spark no es posible hacerlo por filosofía de programación interna)

In [None]:
df_pyspark_4.show()

Con el renombrado ocurre igual, es necesario o usar el argumento inplace(Pandas) o asignarle el resultado de la operación a una nueva variable

In [None]:
df.rename(columns={"Name": "Nombre"}, inplace = True)

In [None]:
df

In [None]:
df_pyspark_4.withColumnRenamed("Name","Nombre").show()

In [None]:
df_pyspark_4.printSchema()

In [None]:
df_renombrado = df_pyspark_4.withColumnRenamed("Name","Nombre")

In [None]:
df_renombrado_2 = df_renombrado.withColumnRenamed("Salary","Nomina")

In [None]:
df_renombrado_2.show()

Aquí terminamos la sección/sesión. En la siguiente veremos el tratamiento de valores faltantes y el filtrado y selección por valores. 

***

### Intro Spark (III): Missing Values y Filtrado por valores

#### Missing Values

Leemos primero una tabla con valores faltantes

In [None]:
df_pyspark_5 = spark.read.csv("/FileStore/tables/test2.csv", header = True, inferSchema= True)

In [None]:
df_pyspark_5.show()

Y aquí las opciones son las de siempre:
- Eliminar las filas con valores nulos
- Eliminar las columnas con valores nulos
- Imputar valores "representativos" (moda en categóricas, media en numéricas tipicamente)

1. Eliminando las filas con valores faltantes:

In [None]:
df_sin_missing = df_pyspark_5.na.drop()

In [None]:
df_sin_missing.show()

2. Quedándonos con las columnas que no tienen missings o faltantes (imagina que "age" y "salary" no tienen valores faltantes)

In [None]:
df_sin_col_missing = df_pyspark_5.select(["age","Salary"])

In [None]:
df_sin_col_missing.show()

Y antes de pasar a la imputación, veamos algunos de los parámetros de la función drop (parecidos a Pandas):


* how = "any" elimina la fila si uno de los valores es nulo (comportamiento por defecto)
* how = "all" elimina la fila si todos los valores son nulos  
* Otro parámetro es "tresh". Nos dice el número minimo de valores no nulos que teine que tener una fila antes de eliminarla. Por ejemplo tresh = 2, mantiene las columnas con al menos 2 valores NO NULOS

In [None]:
df_pyspark_5.na.drop(how = "any").show()

In [None]:
df_pyspark_5.na.drop(how = "all").show()

In [None]:
df_pyspark_5.na.drop(how = "any", thresh= 2).show()

Veamos how en pandas a modo de recordatorio:

In [None]:
df_2 = pd.read_csv("/tmp/test2.csv")

In [None]:
df_2

In [None]:
df_2.dropna(how = "any")

In [None]:
df_2.dropna(how = "all")

Eliminando valores por columna, cuando sólo queremos eliminar aquellas filas en las que determinadas columnas tienen nulos

In [None]:
df_pyspark_5.na.drop(subset=("Experience")).show()

In [None]:
df_2.dropna(subset= "Experience")

Y para terminar el tratamiento de nulos, veamos la imputación de missing values:

In [None]:
df_pyspark_5.na.fill(20).show()

In [None]:
df_pyspark_5.na.fill("Pepe").show()

Fíjate que sólo imputa en aquellas columnas cuyo tipo es compatible con el valor de relleno. En cambio en Pandas se fuerza la imputación:

In [None]:
df_2.fillna(20)

In [None]:
df_2.Experience.fillna(20)

Si lo que queremos es imputar valores según la columna:

In [None]:
df_pyspark_5.na.fill({"Name": "Ana", "Experience": 3}).show()

Esta bien, esta bien, hemos imputado valores "constantes" o "fijos", pero si queremos hacer la imputación con medias o modas:

In [None]:
from pyspark.ml.feature import Imputer # Necesitamos tirar de la parte de Machine Learning (el MLLib de PySpark)

In [None]:
imputer_mean = Imputer(inputCols = ["age","Experience","Salary"], outputCols= ["imputed_age", "imputed_Experience","imputed_Salary"]).setStrategy("mean")

In [None]:
imputer_mean.fit(df_pyspark_5).transform(df_pyspark_5).show()

Pasemos ahora a filtrar (algo que vimos en la sesión anterior muy de pasada) y a aplicar condiciones (muy parecido a como las aplicamos con Pandas)

#### Filtrado y condiciones

Como vimos cuando intentamos hacer un iloc con dataframes de spark, el método para filtrar es filter y hay que pasarle una condición. De nuevo, filter es una transformación (su resultado es otro dataframe a partir del original) por lo que necesitamos aplicar una acción para forzar la ejecución. 

Veamos varias formas de proporcionar las condiciones:

* Como un string usando el nombre de las columnas

In [None]:
df_pyspark_5.filter("Salary <= 20000").show()

* Creando una mascara asignada a una variable, análogamente a como hemos hecho muchas veces con Pandas

In [None]:
condicion = df_2["Experience"] < 3
condicion_spark = df_pyspark_5["Experience"] < 3

In [None]:
df_2[condicion]

In [None]:
df_pyspark_5.filter(condicion_spark).show()

De igual forma podemos combinar máscaras entre sí con los operadores & (and o y-lógico) y | (or-lógico)

In [None]:
condicion_spark_2 = (condicion_spark) & (df_pyspark_5["Salary"] > 10000)

In [None]:
df_pyspark_5.filter(condicion_spark_2).show()

In [None]:
condicion_2 = (condicion) & (df_2["Salary"] > 10000)
df_2[condicion_2]

Y también tenemos el operador negacion  ~, igual que en pandas

In [None]:
no_Harsha = ~(df_pyspark_5["Name"] == "Harsha")

In [None]:
df_pyspark_5.filter(no_Harsha).show()

Con esto damos por finalizada la sección. En la siguiente trataremos las agregaciones o agrupaciones (los group by) y la forma de operar sobre ellas

***

### Intro Spark (IV): Agrupaciones y agregaciones (groupby)

Para jugar con las agrupaciones en los dataframes de spark cargamos el tercer juego de datos

In [None]:
df_pyspark_6 = spark.read.csv("/FileStore/tables/test3.csv", header = True, inferSchema= True)

In [None]:
df_pyspark_6.show()

In [None]:
df_pyspark_6.groupby("Name").show()

Al igual que con las agrupaciones en Pandas, es necesario aplicar una función de agregación para poder generar un dataframe final

In [None]:
df_grouped = df_pyspark_6.groupBy("Name")

In [None]:
df_grouped.sum().show()

Y solo arrastra las columnas sobre las que tiene sentido la función de agregación

In [None]:
df_grouped.count().show()

Veamos la equivalencia en Pandas:

In [None]:
df_3 = pd.read_csv("/tmp/test3.csv")

In [None]:
df_3

In [None]:
pd_grouped = df_3.groupby("Name")

In [None]:
pd_grouped.sum(numeric_only= True)

Agrupando ahora por departamentos:

In [None]:
df_department = df_pyspark_6.groupBy("Departments")

In [None]:
df_department.count().show()

In [None]:
df_department.sum().show()

In [None]:
df_department.mean().show()

Y en pandas:

In [None]:
pd_departments = df_3.groupby("Departments")

In [None]:
pd_departments.count()

In [None]:
pd_departments.sum()

In [None]:
pd_departments.mean(numeric_only= True)

#### AGGREGATE

Al igual que en Pandas, se pueden aplicar diferentes funciones de agregación a una agrupación en función de la columna objetivo. Aunque primero veamos que ocurre cuando lo hacemos sin agrupar (que se aplica a todas las filas)

In [None]:
df_pyspark_6.agg({"Salary": "sum"}).show()

Ahora ya sobre la agrupación

In [None]:
df_pyspark_6.groupBy("Departments").agg({"Salary": "sum"}).show()

Recordando un poco de pandas:

In [None]:
pd_departments.agg(["sum","mean","count"])

Un equivalente (mejorado):

In [None]:
df_pyspark_6.groupBy("Departments").agg({"Salary": "sum", "Salary": "mean", "Name": "count"}).show()

Y ya casi estamos preparados para trabajar sobre nuestros dataframes como si fueran pandas, pero para terminar la introducción veremos en la siguiente sección como aplicar funciones a las columnas (y en particular las udf, user defined functions). Pero ya en la siguiente sección/sesión.

***

### Intro Spark (V): Funciones

Para empezar nos importamos unas cuantas funciones predefinidas que se pueden aplicar directamente a las columnas:

In [None]:
from pyspark.sql.functions import lower,col,ascii,mean

In [None]:
df_pyspark_6.show()

Fijate en la sintaxis, que no es precisamente muy directa:

In [None]:
df_pyspark_6.select(lower(col("Name"))).show()

In [None]:
df_pyspark_6.select(lower(df_pyspark_6["Name"])).show()

Es necesario aplicar la función col primero (en el contexto de un método de un dataframe) y luego la función que queramos

Construimos ahora un dataframe con el código ascii del primer caracter del nombre (sí, no tiene ninguna utilidad aparente pero es que así practicamos)

In [None]:
df_test = df_pyspark_6.withColumn("New_Column", ascii(col("Name")))

In [None]:
df_test.show()

Y ahora algo con más sentido obtener el salario medio:

In [None]:
valores = df_pyspark_6.select(mean(col("Salary")))

In [None]:
type(valores)

In [None]:
valores.printSchema()

In [None]:
valores.show()

#### User-Defined Function (udf): Funciones definidas por el usuario

Al igual que usando el método apply en pandas podíamos aplicar una función definida por nosotros mismos a una o varias columnas, una **UDF (User-Defined Function)** en PySpark es una función definida por el usuario que se puede aplicar a columnas de un DataFrame de Spark. Las UDFs permiten realizar operaciones personalizadas en los datos que no están disponibles de forma nativa en las funciones de PySpark.

Tienen alguna particularidad como es el registro de la función, por eso vamos a ver paso a paso como crear y usar una udf:
1. **Definir la Función en Python**: Escribes una función en Python que realice la operación deseada.
2. **Registrar la Función como UDF**: Usas `pyspark.sql.functions.udf` para convertir la función de Python en una UDF.
3. **Aplicar la UDF**: Aplicas la UDF a las columnas de un DataFrame de Spark.

Definimos una función que va a añadir un prefijo "bootcamp" a una columna string y a convertirla en 0 si es un número

In [None]:
def to_test(name):
    if type(name) == str:
        return f"bootcamp_{name}"
    else:
        return "0"

Ahora hay que registrarla

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Registrar la función como UDF
to_test_udf = udf(to_test, StringType())

Fijate que le decimos el tipo que retorna la función (Spark está basado en Scala y este lenguaje es mucho más riguroso con los tipos que python y exige este tipo de prevención respecto al tipo de salida, además de que casi todo es inmutable, con cierta flexibilidad)

Apliquemos la udf:

In [None]:
# Aplicar la UDF
df_with_upper = df_pyspark_6.withColumn("name_upper", to_test_udf(df_pyspark_6["Name"])) 

# Mostrar los resultados
df_with_upper.show()

In [None]:
df_with_upper = df_pyspark_6.withColumn("salary_upper", to_test_udf(df_pyspark_6["Salary"]))
df_with_upper.show() 

In [None]:
df_with_upper.printSchema()

Y con esto terminamos la sesión introductoria a la sintaxis y al manejo más básico de dataframes Spark. En las siguientes sesiones veremos como construir un modelo supersencillo.

In [None]:
spark.stop() # Por ser un poquito ordenado, como nos ocurria con las bases de datos, una vez hemos terminado el trabajo es muy conveniente cerrar las sesiones