![miad4.png](Archivos/miad4.png)

# Extracción, transformción y carga de datos: `pyspark`

<!--# ETL y principios de _Big Data_ ????-->

Extraer, transformar y cargar (ETL por sus siglas en inglés), describe un proceso en tres etapas:

1. obtener datos de una o más fuentes; <br>
2. transformar los datos, sea haciéndoles limpieza, combinándolos o añadiendo registros; <br>
3. grabar los datos, sea cargándolos a su misma fuente, persistiéndolos en archivos locales o alimentando consumidores que dependen de nuestros resultados (aplicaciones _downstream_).

Hasta ahora hemos tratado fuentes de datos almacenadas localmente en archivos de texto o formatos de herramientas estadísticas como Stata. La realidad es que los datos existen en un sinfín de contextos, casi siempre, en formatos altamente especializados que exigen procesos especializados de extracción, como SQL (_Structured Query Language_) para bases de datos relacionales. También nos hemos enfocado solo en consumir los datos sin preocuparnos mucho acerca de cómo podemos hacer nuestros resultados disponibles en ocasiones futuras o para otros usuarios.

Las bases de datos, como las hemos trabajado, existen únicamente en la memoria volátil de nuestro computador y no persisten de una sesión de Python a la siguiente. Podemos hacer persistir los cambios si los grabamos en archivos tipo `.txt` o `.csv`, pero esto no es manejable a escala o, por ejemplo, cuando queremos desagregar los datos y distribuirlos en distintas tablas para que sean compatibles con operaciones del álgebra relacional (en esencia, lo que hacen las funciones `join` o `merge` de `pandas`).

En este tutorial exploraremos el uso de la librería `pyspark` para el manejo de bases de datos relacionales, en el contexto de procesos de ETL para analítica de datos.

## Requisitos

Para desarrollar este tutorial necesitarás:

* Importar y exportar archivos de texto en formato `.txt` o `.csv` por medio de un *file handle*. <br>
* Utilizar operaciones sencillas y vectorizadas en `numpy` y `pandas`. <br>
* Crear, consultar y utilizar métodos para explorar y manipular objetos tipo `DataFrame` en `pandas`. <br>

## Objetivos

Al final de este tutorial podrás:

**1.** Distinguir situaciones en las que resulta más beneficioso utilizar herramientas de ETL como `pyspark`.<br>
**2.** Reconocer estructuras de datos que distribuyen la carga de sus operaciones en procesos paralelos.<br>
**3.** Extraer y transformar tablas de bases de datos relacionales con operaciones de algebra relacional. <br>
**4.** Crear y cargar tablas a bases de datos relacionales.

## 1. Entorno de desarrollo en Apache Spark

La fundación de _software_ Apache es una comunidad dedicada al desarrollo de herramientas _open source_, entre estas, Spark: un motor de procesamiento de datos altamente eficiente. Spark está diseñado para desplegarse en entornos de cómputo distribuido (_cluster_) y paraleliza sus operaciones de manera implícita, lo que lo hace ideal para el procesamiento de volúmenes altos de datos (_Big Data_). Cualquier aplicación puede llegar a imlementar Spark en su flujo de datos por medio de sus APIs (_Application Program Interface_) para distintos lenguajes de programación, como `pyspark` para Python o `SparkR` para R.

Así como Spark es una aplicación externa a Python a la que accedemos por medio de su API, debemos importar la API, configurar la sesión y luego inicializar la aplicación. Nos enfocaremos en el módulo `sql` para procesos de ETL con bases de datos relacionales.

In [1]:
# Importamos el metodo SparkSession para configurar e inicializar una instancia de Spark SQL
from pyspark.sql import SparkSession

# Inicializamos la aplicacion
spark = SparkSession.builder \
      .master("local[*]") \
      .appName("Instancia_Tutorial_PySpark") \
      .getOrCreate()

# Imprimimos el objeto que contiene la sesión
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/02 04:13:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


El método `master` nos permite definir el contexto de paralelización de los procesos. Veamos algunos parámetros que puede recibir:

* `"local[n]"` le indica a la aplicación que debe ejecutarse en el mismo computador, con `n` cantidad de procesos para paralelizar las tareas; <br><br>

* `"Yarn"` (u otro nombre de un administrador de _clusters_), le indican a la aplicación que debe ejecutarse en un _cluster_, el cual tendríamos que configurar.

Al hacer clic en el enlace del _output_ de la celda (Spark UI), podrás acceder a una interfaz web, igual a la de la imágen, con detalles de la aplicación. Esto funciona solo si expones a la red el puerto de la aplicación o si ejecutas el programa en tu propio computador, ya que necesitas acceso al servicio web desplegado por Spark.

![Spark_GUI.png](Archivos/Spark_GUI.png)

## 2. Estructuras de datos en `pyspark`

Para facilitar la ejecución de tareas en paralelo, Spark incluye dos estructuras de datos que permiten distribuir sus operaciones sobre distintos objetos en memoria.

### 2.1 RDDs (_Resilient Distributed Dataset_)

Un RDD es la representación de una colección de objetos que pueden distribuirse en distintos procesos o incluso almacenarse en distintos nodos de nuestro _cluster_. Resultan bastante útiles y eficientes para procesar grandes cantidades de datos, pero resultan poco convenientes si nuestra intención es hacer cambios sobre los datos ya que son inmutables (una vez creados, no pueden editarse).

Esta representación es poco restrictiva en cuanto a qué consideramos un objeto de la colección. Para nuestras intenciones, podríamos pensar en un RDD como un `DataFrame` de `pandas` cuyas filas hemos almacenado en distintas variables. Alternativamente, podríamos particionar sobre las columnas, en cuyo caso tendríamos una colección de objetos tipo `Series` o, si una partición toma más de una columna, una colección de objetos tipo `DataFrame`.

#### Declaración

Podemos declarar un RDD a partir de diferentes estructuras de datos. A continuación, vemos un ejemplo de cómo declarar a partir de una lista de listas:

In [2]:
data_list = [["Programa_1", 2000, 4500, "LAN"],
             ["Programa_2", 3401, 7000, "LAS"],
             ["Programa_3", 50,   7000, "LAS"],
             ["Programa_4", 7850, 3300, "USW"],
             ["Programa_5", 8000, 3505, "LAN"]]

rdd = spark.sparkContext.parallelize(data_list)
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

Nota que al intentar imprimir el RDD, nos encontramos con que la representación en el _output_ de la celda no muestra el contenido. Esto se debe a que las operaciones sobre los RDD son _lazy_ (perezosas) y no se ejecutarán hasta que sea absolutamente necesario. Este esquema de ejecución diferida permite optimizar de manera anticipada las consultas y operaciones para minimizar el tiempo de ejecución y la ocupación de la memoria.

#### Operaciones y consulta

Por ahora, basta con saber que hay acciones y transformaciones. Bajo ninguna circunstancia estas operaciones modificarán el RDD original; en su lugar, Spark crea una copia del RDD con los cambios necesarios una vez se hayan realizado las operaciones cargadas de manera perezosa.

Veámos a continuación un ejemplo de cómo operar sobre los elementos de la colección.

In [3]:
# Declaramos una funcion que tome los elementos con indice 1 y 2 de un iterable y los multiplique.
def mult(x):
    return x[1] * x[2]

# La transformacion map toma cada objeto contenido en el RDD y ejecuta la funcion que recibe por parametro.
nuevo_rdd = rdd.map(mult)
nuevo_rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

La variable `nuevo_rdd` contiene un RDD distinto a `rdd` para el cual aún no se ha efectuado la transformación. Hacemos uso del método `collect` para ejecutar las operaciones pendientes y recolectar los resultados en Python.

In [5]:
rdd.collect()

[['Programa_1', 2000, 4500, 'LAN'],
 ['Programa_2', 3401, 7000, 'LAS'],
 ['Programa_3', 50, 7000, 'LAS'],
 ['Programa_4', 7850, 3300, 'USW'],
 ['Programa_5', 8000, 3505, 'LAN']]

In [6]:
nuevo_rdd.collect()

[9000000, 23807000, 350000, 25905000, 28040000]

Tener que recolectar los objetos de RDD resulta útil en escenarios donde debemos procesar nuestros datos en distintas etapas y no nos interesa el resultado de operaciones intermedias. La ejecución perezosa de las transformaciones nos permite concatenarlas sin ocupar memoria de manera redundante entre operaciones y optimiza de manera automática e inteligente las instrucciones para no repetir tareas redundantes.

### 2.2 Objeto `DataFrame`

Similar a los `DataFrame` de `pandas`, los `DataFrame` en `pyspark` son una colección mutable de datos organizados por columnas. A diferencia de `pandas`, que opera de manera secuencial, la implementación de `pyspark` almacena las filas de manera distribuida y opera sobre ellas de manera paralelizada. 

#### Declaración

Podemos declarar un `DataFrame` a partir de diferentes estructuras de datos. A continuación, vemos un ejemplo de cómo declarar a partir de una lista de listas:

In [7]:
columns = ["Nombre", "Descargas", "Lineas_de_codigo", "Region"]
df = spark.createDataFrame(data=data_list, schema=columns)
df

DataFrame[Nombre: string, Descargas: bigint, Lineas_de_codigo: bigint, Region: string]

El parámetro `schema` (esquema) hace referencia a los campos de una estructura de datos. En el caso de nuestra lista de listas, el esquema es el nombre de las columnas. Si tuvieramos un diccionario de listas, el esquema serían sus llaves.

`pyspark` incluye métodos dedicados a cargar datos desde una gran variedad de formatos. Consideremos, por lo pronto, un format con el que estemos familiarizados.

In [8]:
df_covid_19 = spark.read.option("header","true").csv("Archivos/BID-Cornell.csv")#, index_col = 0)
df_covid_19.printSchema()

root
 |-- id: string (nullable = true)
 |-- medios_noti_redessociales: string (nullable = true)
 |-- medios_noti_chat: string (nullable = true)
 |-- medios_noti_periodicos: string (nullable = true)
 |-- medios_noti_tv: string (nullable = true)
 |-- medios_noti_radio: string (nullable = true)
 |-- medios_covid_redessociales: string (nullable = true)
 |-- medios_covid_chat: string (nullable = true)
 |-- medios_covid_periodicos: string (nullable = true)
 |-- medios_covid_tv: string (nullable = true)
 |-- medios_covid_radio: string (nullable = true)



#### Operaciones y consulta

Tenemos distintas alternativas para consultar el contenido de nuestros `DataFrame`. Podemos utilizar el método `show` para imprimir de manera estilizada la tabla o los métodos `head` y `tail` para retornar las primeras o últimas filas de la tabla.

In [25]:
df.show()

+----------+---------+----------------+------+
|    Nombre|Descargas|Lineas_de_codigo|Region|
+----------+---------+----------------+------+
|Programa_1|     2000|            4500|   LAN|
|Programa_2|     3401|            7000|   LAS|
|Programa_3|       50|            7000|   LAS|
|Programa_4|     7850|            3300|   USW|
|Programa_5|     8000|            3505|   LAN|
+----------+---------+----------------+------+



In [32]:
df.head(3)

[Row(Nombre='Programa_1', Descargas=2000, Lineas_de_codigo=4500, Region='LAN'),
 Row(Nombre='Programa_2', Descargas=3401, Lineas_de_codigo=7000, Region='LAS'),
 Row(Nombre='Programa_3', Descargas=50, Lineas_de_codigo=7000, Region='LAS')]

In [31]:
df.tail(2)

[Row(Nombre='Programa_4', Descargas=7850, Lineas_de_codigo=3300, Region='USW'),
 Row(Nombre='Programa_5', Descargas=8000, Lineas_de_codigo=3505, Region='LAN')]

Cabe notar que por medio de los métodos `head` y `tail`, obtenemos objetos de tipo `Row` que son los que ditribuye Spark para paralelizar las operaciones. El método `limit` nos permite persitir el tipo `DataFrame` como resultado de la consulta.

In [38]:
df.limit(3).show()

+----------+---------+----------------+------+
|    Nombre|Descargas|Lineas_de_codigo|Region|
+----------+---------+----------------+------+
|Programa_1|     2000|            4500|   LAN|
|Programa_2|     3401|            7000|   LAS|
|Programa_3|       50|            7000|   LAS|
+----------+---------+----------------+------+



En ocasiones puede ser deseable convertir entre los `DataFrame` de `pyspark` a su contraparte de `pandas`. Transformar un `DataFrame` de `pyspark` a `pandas` es solo recomendable si se cuenta con suficiente almacenamiento o memoria. Los altos volúmenes de datos que manejamos en `pyspark` con gran eficiencia en memoria y procesamiento pueden no ser compatibles con otras librerías.

In [39]:
df.

TypeError: withColumn() missing 1 required positional argument: 'col'

In [36]:
pandas_df = df.toPandas()
pandas_df

Unnamed: 0,Nombre,Descargas,Lineas_de_codigo,Region
0,Programa_1,2000,4500,LAN
1,Programa_2,3401,7000,LAS
2,Programa_3,50,7000,LAS
3,Programa_4,7850,3300,USW
4,Programa_5,8000,3505,LAN


## Referencias

Referencia 1

## Créditos

**Autores:** Alejandro Mantilla Redondo

**Fecha última actualización:** 10/07/2022