# Introducción
En este Notebook aprenderemos las operaciones básicas más utilizadas en PySpark, incluyendo un ejercicio práctico en el que realizaremos una pequeña ETL para extraer unos datos desde una API y los volcaremos en el catálogo de datos de Spark.

# Importación de módulos requeridos
En primer lugar necesitamos importar las funciones y objetos requeridos para la implementación.

- `SparkSession`: objeto necesario para la interacción con la herramienta de Spark a través de Python.
- `pyspark.sql.functions`: funciones de SQL que ofrece pyspark, necesarias para las transformaciones de los datos en la ETL.
- `fetch_api, save_json`: estas funciones están definidas dentro de nuestra propia librería llamada `blackops`. Contienen el código necesario para extraer y almacenar los datos de la API.
- `date, timedelta`: funciones para crear objetos de tipo fecha y timestamp dentro de Python.
- `random`: módulo utilizado para la generación de datos aleatorios.
- `DeltaTable`: objeto para interaccionar con tablas de tipo Delta. Se trata de un formato ampliamente utilizado en Spark, que ofrece muchas funcionalidades añadidas a nuestro catálogo de datos, como por ejemplo la posibilidad de revertir cambios.

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from blackops.crawlers.wallapop.functions import fetch_api, save_json
from blackops.utils.catalog import get_detailed_tables_info
from datetime import date, timedelta
import random
from delta import DeltaTable

Establecemos una semilla para la generación de números aleatorios. De esta manera, los resultados serán reproducibles

In [2]:
random.seed(45)

# Inicialización de la sesión de Spark

Establecemos ahora la comunicación con el motor de Spark desde Python, a través del objeto `SparkSession` de la librería `pyspark`.

En este caso de prueba no estamos utilizando un clúster, sino que haremos uso de una arquitectura local. El propio Jupyter Notebook ejercerá como Driver, como Master y como Ejecutor de las tareas.

Adicionalmente, estamos instalando dependencias externas como la librería Delta, que incorpora utilidades muy importantes para el manejo de las tablas en nuestro catálogo de datos (histórico de versiones de tablas, omisión de ficheros innecesarios en la lectura, etc.)

In [3]:
spark = (
    SparkSession.Builder()
    .master("local[*]")
    .config(
        map={
            "spark.driver.memory": "8g",
            "spark.jars.packages": "io.delta:delta-spark_2.12:3.2.0",
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            "spark.databricks.delta.retentionDurationCheck.enabled": "false",
            "spark.sql.catalogImplementation": "hive",
            "spark.sql.repl.eagerEval.enabled": "true",
            "spark.sql.repl.eagerEval.truncate": "100",
        }
    )
    .getOrCreate()
)

24/09/28 12:32:29 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.40 instead (on interface enp3s0)
24/09/28 12:32:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/dadiego/.ivy2/cache
The jars for the packages stored in: /home/dadiego/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d838f30e-1916-4405-9584-1a550198d467;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/dadiego/projects/ESIC/esic-bigdata-iv-blackops/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 108ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-d838f30e-1916-4405-9584-1a550198d467
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/3ms)
24/09/28 12:32:30

Una vez se ha inicializado la sesión, podemos acceder a la web `localhost:4040` para consultar la interfaz de administración que ofrece Spark. Allí, se podrá monitorizar las tareas que se mandan desde el Driver.

**Nota**: Si al inicializar la sesión de Spark obtenemos algún error en el que se nos indica que la variable `JAVA_HOME` no existe, lo más probable es que no tengamos instalado Java en nuestro sistema, y necesitamos instalarlo ya que Spark depende de Java para su funcionamiento. Para ello, en Linux podemos utilizar el gestor de paquetes: `sudo apt update && sudo apt install openjdk-17-jdk -y`.

# Creación de un DataFrame de Spark

En Spark podemos crear directamente un Dataframe a partir de una lista de datos, o bien de un Dataframe de pandas. Para ello se puede utilizar el método `spark.createDataFrame`.
Debemos especificar tanto los datos como el esquema que tiene el Dataframe (sus columnas y sus tipos).

En este caso hacemos uso del paquete `random` para generar datos aleatorios (pero reproducibles, al haber establecido una semilla).

In [4]:
# Podemos especificar el esquema del DataFrame usando una cadena de texto
schema = "id INT, nombre STRING, edad INT, salario FLOAT, es_empleado BOOLEAN, fecha_contratacion DATE, departamento STRING"

# Crear una lista de datos ficticios
nombres = [
    "Juan",
    "María",
    "Pedro",
    "Ana",
    "Luis",
    "Carla",
    "Miguel",
    "Sara",
    "David",
    "Laura",
]
departamentos = ["Ventas", "Marketing", "Finanzas", "IT", "RRHH"]

data = [
    (
        i,  # id
        random.choice(nombres),  # nombre
        random.randint(22, 60),  # edad
        round(random.uniform(20000, 80000), 2),  # salario
        random.choice([True, False]),  # es_empleado
        date(2024, 10, 1)
        - timedelta(days=random.randint(0, 3650)),  # fecha_contratacion
        random.choice(departamentos),  # departamento
    )
    for i in range(1, 31)  # Genera 30 registros aleatorios
]

# Crear el DataFrame usando el esquema en string
df = spark.createDataFrame(data, schema)

# Creamos una vista temporal del DataFrame en el catálogo, para poder hacer consultas en SQL.
df.createOrReplaceTempView("empleados")

# Mostramos el DataFrame resultante por pantalla
display(df)

                                                                                

id,nombre,edad,salario,es_empleado,fecha_contratacion,departamento
1,Luis,48,49281.69,True,2021-05-09,Finanzas
2,Juan,26,49055.36,True,2021-07-27,Ventas
3,Luis,24,73947.68,True,2023-03-28,Finanzas
4,Pedro,35,79554.92,False,2023-11-29,RRHH
5,Miguel,31,62027.07,True,2022-10-27,Finanzas
6,Luis,44,66505.58,False,2023-09-13,IT
7,María,23,65070.76,False,2015-12-18,IT
8,David,56,72059.12,False,2018-02-06,Ventas
9,Sara,45,74705.86,False,2023-01-02,Ventas
10,Ana,53,62691.89,False,2021-06-16,Marketing


# Operaciones de transformación

La sintaxis de Spark es muy similar a la del lenguaje SQL, de hecho, admite la introducción de comandos SQL para realizar las transformaciones de los datos. Vamos a ver algunas de las operaciones más habituales.

### Select

La operación más sencilla consiste en seleccionar simplemente un subconjunto de los datos, sin ninguna otra operación de transformación o filtro añadido. Por ejemplo, seleccionemos únicamente los campos `id` y `nombre`.

In [5]:
df.select("id", "nombre")

id,nombre
1,Luis
2,Juan
3,Luis
4,Pedro
5,Miguel
6,Luis
7,María
8,David
9,Sara
10,Ana


Al igual que en SQL estándar, podemos no solo seleccionar unas columnas sino aplicarles alguna función de transformación dentro del propio comando SELECT, y renombrarlas utilizando un alias.

Las funciones SQL en Spark están contenidas en el módulo `pyspark.sql.functions`, que hemos importado al principio y lo hemos almacenado en un objeto con alias `f` (por sencillez de uso).

Vamos a seleccionar en este caso los mismos campos que en el ejemplo anterior, sin embargo, al campo `nombre` le vamos a aplicar una transformación para visualizar el nombre en mayúsculas, y al resultado lo renombraremos `nombre_en_mayusculas`.

In [6]:
df.select("id", f.upper("nombre").alias("nombre_en_mayusculas"))

id,nombre_en_mayusculas
1,LUIS
2,JUAN
3,LUIS
4,PEDRO
5,MIGUEL
6,LUIS
7,MARÍA
8,DAVID
9,SARA
10,ANA


### WithColumn

Podemos añadir campos nuevos derivados a partir de otros campos utilizando el método `withColumn`. Este comando conservará todas las columnas de la tabla, y añadirá una adicional, con las transformaciones que le indiquemos.

Este método opera fila a fila, es decir, aplicará las transformaciones correspondientes registro a registro.

Por ejemplo, en nuestra tabla disponemos del campo `edad`, pero supongamos que nos interesa, para nuestra analítica, disponer de un campo con el año de nacimiento. En tal caso, podríamos concatenar dos funciones SQL: con la primera, `current_date`, extraemos la fecha actual, y sobre dicha fecha aplicamos la función `year` para extraer el año. Finalmente, a este año actual le restamos la edad que tiene el usuario para así calcular su año de nacimiento. Cada usuario dispondrá así de un año de nacimiento (transformación fila a fila).

In [7]:
df.withColumn("año_nacimiento", f.year(f.current_date()) - f.col("edad"))

id,nombre,edad,salario,es_empleado,fecha_contratacion,departamento,año_nacimiento
1,Luis,48,49281.69,True,2021-05-09,Finanzas,1976
2,Juan,26,49055.36,True,2021-07-27,Ventas,1998
3,Luis,24,73947.68,True,2023-03-28,Finanzas,2000
4,Pedro,35,79554.92,False,2023-11-29,RRHH,1989
5,Miguel,31,62027.07,True,2022-10-27,Finanzas,1993
6,Luis,44,66505.58,False,2023-09-13,IT,1980
7,María,23,65070.76,False,2015-12-18,IT,2001
8,David,56,72059.12,False,2018-02-06,Ventas,1968
9,Sara,45,74705.86,False,2023-01-02,Ventas,1979
10,Ana,53,62691.89,False,2021-06-16,Marketing,1971


### Filter

Podemos filtrar los datos de acuerdo a alguna condición especificada. Esta sentencia se corresponde con el comando `WHERE` en SQL. Por ejemplo, queremos obtener únicamente los datos de los empleados. 

Recordemos que en Python el operador de igualdad es `==`.

Para poder realizar operaciones con columnas, necesitamos especificar que se trata de una columna del DataFrame haciendo uso de la función `col`, puesto que si no lo que estaríamos es comparando un string con un booleano (`"es_empleado" == True`), que será siempre igual a `False`.

In [8]:
df.filter(f.col("es_empleado") == True)

# Si hacemos df.filter("es_empleado" == True) obtendremos un error porque los tipos no son los esperados.

id,nombre,edad,salario,es_empleado,fecha_contratacion,departamento
1,Luis,48,49281.69,True,2021-05-09,Finanzas
2,Juan,26,49055.36,True,2021-07-27,Ventas
3,Luis,24,73947.68,True,2023-03-28,Finanzas
5,Miguel,31,62027.07,True,2022-10-27,Finanzas
11,Miguel,46,70126.54,True,2018-03-14,IT
12,David,27,53608.56,True,2014-11-09,RRHH
17,Ana,53,21659.3,True,2018-01-19,Finanzas
18,Sara,28,30491.18,True,2015-12-13,Ventas
23,Carla,50,73172.89,True,2015-08-21,IT
25,Sara,59,59973.55,True,2018-04-23,Marketing


### Agrupaciones

Utilizando el comando group by, podemos agrupar nuestro dataset según los valores de una o varias columnas y posteriormente realizar una operación de agregación sobre cada conjunto, para así obtener estadísticas descriptivas de nuestros datos.

Por ejemplo, podemos obtener el número de empleados en marketing, con lo cual debemos agrupar por departamento y realizar una operación de agregación de suma. Estas operaciones se denominan "de agregación" o "de reducción" porque actúan sobre un conjunto de filas (todas aquellas que comparten el mismo valor del grupo) y devuelven un único valor

In [9]:
df.groupBy("departamento").agg(f.sum("salario").alias("salario_total"))

departamento,salario_total
Finanzas,524206.53515625
Ventas,421618.7890625
RRHH,292734.91015625
IT,355377.107421875
Marketing,122665.44140625


### Combinaciones
Naturalmente, la riqueza de PySpark es que podemos combinar filtros con agrupaciones, adición de columnas, cambios de tipos, etc para que nuestro dato final quede pulido.

Al contrario que en Pandas, todas las operaciones de transformación en Spark son *lazy*, es decir, no se evalúan hasta que se pide una acción (resultado). Esto permite que el catalizador de Spark optimice toda la cadena de consultas de la manera más apropiada antes de ser ejecutadas.

Veamos un ejemplo de consulta algo más avanzada: supongamos que queremos conocer cuál es el departamento del que más gente se ha ido a partir de 2017 para unos ciertos intervalos de meses: enero a mayo, junio a septiembre y octubre a diciembre. En este caso podemos comenzar aplicando unos filtros para quedarnos únicamente con registros de los que actualmente ya no son empleados y su fecha de contratación es igual o posterior a 2017. Después de aplicar dicho filtro, podemos añadir dos columnas transitorias para extraer el mes de la fecha de contratación y establecer los intervalos pedidos, utilizando la función `when`, que es esquivalente al `CASE` de SQL. Finalmente, agrupamos por estas categorías de mes y agregamos cogiendo la moda (el valor más repetido de un conjunto de datos).

In [10]:
df.filter(
    (f.col("es_empleado") == False) & (f.year("fecha_contratacion") >= 2017)
).withColumn("mes_contratacion", f.month("fecha_contratacion")).withColumn(
    "categoria_mes",
    f.when(f.col("mes_contratacion").between(1, 5), f.lit("enero-mayo"))
    .when(f.col("mes_contratacion").between(6, 9), f.lit("junio-septiembre"))
    .otherwise(f.lit("octubre-diciembre")),
).groupBy(
    "categoria_mes"
).agg(
    f.mode("departamento").alias("departamento_mas_repetido")
)

categoria_mes,departamento_mas_repetido
octubre-diciembre,RRHH
junio-septiembre,Marketing
enero-mayo,Ventas


La consulta equivalente en Spark SQL en este caso sería la siguiente

In [11]:
spark.sql(
    """
    SELECT
        CASE
            WHEN MONTH(fecha_contratacion) BETWEEN 1 AND 5 THEN 'enero-mayo'
            WHEN MONTH(fecha_contratacion) BETWEEN 6 AND 9 THEN 'junio-septiembre'
            ELSE 'octubre-diciembre'
        END AS categoria_mes,
        MODE(departamento) AS departamento_mas_repetido

    FROM empleados
    WHERE
        es_empleado = false AND
        YEAR(fecha_contratacion) >= 2017
    GROUP BY categoria_mes
    """
)

categoria_mes,departamento_mas_repetido
octubre-diciembre,RRHH
junio-septiembre,Marketing
enero-mayo,Ventas


Como se puede comprobar, se obtienen exactamente los mismos resultados

# Caso práctico: Extracción de datos de Wallapop
Vamos a construir un pequeño ejemplo de una ETL (Extraction Transform Load). Extraeremos datos en crudo desde la API REST de Wallapop, los guardamos en una carpeta de almacenamiento, los leemos con spark, realizamos algunas transformaciones y almacenamos la tabla resultante en nuestro catálogo de datos

In [12]:
try:
    json_data = fetch_api(product="portátil")
    save_json(obj=json_data, path="data/wallapop.json", indent=4)
except Exception as e:
    print(f"Warning: No ha sido posible descargar los datos de la API: {e}")

Podemos previsualizar cuál es la estructura de nuestro fichero JSON utilizando el comando externo `cat` de nuestra terminal (válido únicamente en sistemas Unix, con `jq` instalado).

Si no está instalado `jq`, puede instalarse mediante `sudo apt update && sudo apt install jq -y`

In [13]:
%%sh
cat data/wallapop.json | jq -C | head -20

[1;39m{
  [0m[34;1m"data"[0m[1;39m: [0m[1;39m{
    [0m[34;1m"section"[0m[1;39m: [0m[1;39m{
      [0m[34;1m"payload"[0m[1;39m: [0m[1;39m{
        [0m[34;1m"order"[0m[1;39m: [0m[0;32m"most_relevance"[0m[1;39m,
        [0m[34;1m"title"[0m[1;39m: [0m[0;32m"Find what you want"[0m[1;39m,
        [0m[34;1m"items"[0m[1;39m: [0m[1;39m[
          [1;39m{
            [0m[34;1m"id"[0m[1;39m: [0m[0;32m"pj9mq457kk6e"[0m[1;39m,
            [0m[34;1m"user_id"[0m[1;39m: [0m[0;32m"vjrd1q4rx46k"[0m[1;39m,
            [0m[34;1m"title"[0m[1;39m: [0m[0;32m"Soporte para Portatil / Mesa de Mezclas"[0m[1;39m,
            [0m[34;1m"description"[0m[1;39m: [0m[0;32m"Vendo soporte para Portatil pero tambien vale si tienes una mesa de mezclas pequeña de dj"[0m[1;39m,
            [0m[34;1m"category_id"[0m[1;39m: [0m[0;39m24200[0m[1;39m,
            [0m[34;1m"price"[0m[1;39m: [0m[1;39m{
              [0m[34;1m"amount"[0m[1;39m

Una vez determinada la estructura que tiene nuestro fichero JSON de información, notamos que los datos que queremos obtener se encuentran dentro de la ruta `data -> section -> payload -> items`. Dicha ruta se corresponde con un array (lista) de items, que son los productos de Wallapop; cada uno de ellos tiene unos campos, algunos simples como `id`, `user_id`, y otros compuestos como `price -> amount` o `price -> currency`.

En primer lugar, observemos que si leemos el fichero JSON directamente no obtenemos una estructura muy amigable

In [14]:
wallapop = spark.read.json("data/wallapop.json", multiLine=True)
wallapop.show(truncate=50)

+--------------------------------------------------+--------------------------------------------------+
|                                              data|                                              meta|
+--------------------------------------------------+--------------------------------------------------+
|{{{[{{none}, 24200, 1727516670420, Vendo soport...|{eyJhbGciOiJIUzI1NiJ9.eyJwYXJhbXMiOnsic2VhcmNoU...|
+--------------------------------------------------+--------------------------------------------------+



Esto es porque nos ha cogido las dos primeras claves más externas de nuestro fichero JSON, que son los campos `"data"` y `"meta"`.

Observemos qué estructura hemos cargado haciendo un `printSchema` de nuestro DataFrame. De esta manera obtendremos información de los campos y sus tipos

In [15]:
wallapop.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- section: struct (nullable = true)
 |    |    |-- payload: struct (nullable = true)
 |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- bump: struct (nullable = true)
 |    |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |    |-- category_id: long (nullable = true)
 |    |    |    |    |    |-- created_at: long (nullable = true)
 |    |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |    |-- discount: struct (nullable = true)
 |    |    |    |    |    |    |-- percentage: long (nullable = true)
 |    |    |    |    |    |    |-- previous_price: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: double (nullable = true)
 |    |    |    |    |    |    |    |-- currency: string (nullable = true)
 |    |    |    |    |    |-- favorited: struct (nullable = true)
 

Ahora, para navegar a través de nuestro fichero JSON, podemos utilizar la sintáxis por puntos; es decir, para obtener el campo deseado `"items"`, que contiene la información de todos los productos, debemos acceder mediante `data.section.payload.items`.

In [16]:
wallapop.select("data.section.payload.items").show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                               items|
+----------------------------------------------------------------------------------------------------+
|[{{none}, 24200, 1727516670420, Vendo soporte para Portatil pero tambien vale si tienes una mesa ...|
+----------------------------------------------------------------------------------------------------+



Sin embargo, seguimos sin apreciar una estructura legible. Esto es porque se nos está mostrando un único registro (fila) que contiene toda la información de los productos. Lo que nos interesa es que cada elemento de esta lista se muestre en un registro a parte. Para ello se utiliza la función SQL `explode`, que coge un array de elementos y devuelve un registro por cada uno de esos elementos. Veámoslo

In [17]:
wallapop.select(f.explode("data.section.payload.items")).show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                                 col|
+----------------------------------------------------------------------------------------------------+
|{{none}, 24200, 1727516670420, Vendo soporte para Portatil pero tambien vale si tienes una mesa d...|
|{{none}, 24200, 1727514300255, Teclado Bluetooth portátil plegable muy práctico y con muy poco us...|
|{{none}, 24200, 1727518263833, Portatil HP 250 G8\nOrdenador de 15,6” FullHD\nIntel Core I5-1135G...|
|{{none}, 24200, 1727516192754, Se vende todo por 20 euros, son piezas de portátil diferente marcá...|
|{{none}, 24200, 1727444183886, Ventilador portable para portátiles para la refrigeración de estos...|
|{{none}, 24200, 1727519183576, DVD portatil - BELSON - BS-130806\n\nReproductor Dvd Portatil con ...|
|{{none}, 24200, 1727515228880, Puerto de carga o conector del cargador p

Bien, ya hemos avanzado, disponemos ahora de un registro por cada producto de la lista `items`, como queríamos. Sin embargo, se sigue mostrando toda la información en una misma columna. Eso lo solucionamos seleccionando los campos anidados deseados. Por ejemplo, supongamos que queremos coger el `id` del producto, el `user_id` del usuario y la fecha de creación del anuncio `created_at`. Una buena manera de operar sería crear una nueva columna llamada, por ejemplo, `"data"`, que contenga los registros explotados del campo de `"items"`, y luego utilizar este nuevo campo para obtener la info de los otros campos descendientes

In [18]:
wallapop.withColumn("data", f.explode("data.section.payload.items")).select(
    "data.id", "data.user_id", "data.created_at"
)

id,user_id,created_at
pj9mq457kk6e,vjrd1q4rx46k,1727516670420
vjrqve93pwzk,p61o88mkx5j5,1727514300255
p614184klg65,g0j2gq3y7rzy,1727518263833
0j24k83g4ezy,8x6qd1k8kejy,1727516192754
e658282neg6o,ejkx1revk16x,1727444183886
8j34o8411l69,mxzo770ed2j9,1727519183576
qjw4vrpedqzo,g0j21e1xpvzy,1727515228880
wzv4vgnry1zl,g0j21e1xpvzy,1727515227519
xzo2ogrl8w69,g0j21e1xpvzy,1727515226265
e6582qgvyg6o,g0j21e1xpvzy,1727515224639


Fenomenal. Ahora, siguiendo esta misma operación, obtendremos un dataset completo tabular de los campos del JSON más relevantes. Observemos que para todos los campos seleccionados, se hace una conversión de tipos (método `cast`) y se asigna un alias (método `alias`). Esto es para que la tabla resultante sea consistente, y tenga siempre el mismo esquema de salida.

Notemos también que a campos que representan fechas pero se muestran como números enteros (milisegundos desde 1970, esto se conoce como UNIX time), como `created_at` o `modified_at`, les aplicamos una conversión mediante la función `from_unixtime` para representarlos como una fecha legible

In [19]:
wallapop = (
    spark.read.json("data/wallapop.json", multiLine=True, primitivesAsString=True)
    .withColumn("data", f.explode("data.section.payload.items"))
    .select(
        f.col("data.id").cast("string").alias("id"),
        f.col("data.title").cast("string").alias("title"),
        f.col("data.user_id").cast("string").alias("user_id"),
        f.col("data.category_id").cast("int").alias("category_id"),
        f.from_unixtime((f.col("data.created_at") / 1000))
        .cast("timestamp")
        .alias("created_at"),
        f.from_unixtime(f.col("data.modified_at") / 1000)
        .cast("timestamp")
        .alias("modified_at"),
        f.col("data.description").cast("string").alias("description"),
        f.col("data.discount.percentage").cast("int").alias("percentage"),
        f.col("data.discount.previous_price.amount")
        .cast("double")
        .alias("previous_price_amount"),
        f.col("data.discount.previous_price.currency")
        .cast("string")
        .alias("previous_price_currency"),
        f.col("data.favorited.flag").cast("boolean").alias("favorited"),
        f.col("data.is_favoriteable.flag").cast("boolean").alias("is_favoriteable"),
        f.col("data.is_refurbished.flag").cast("boolean").alias("is_refurbished"),
        f.col("data.location.latitude").cast("double").alias("latitude"),
        f.col("data.location.longitude").cast("double").alias("longitude"),
        f.col("data.location.postal_code").cast("string").alias("postal_code"),
        f.col("data.location.city").cast("string").alias("city"),
        f.col("data.location.region").cast("string").alias("region"),
        f.col("data.location.region2").cast("string").alias("region2"),
        f.col("data.location.country_code").cast("string").alias("country_code"),
        f.col("data.price.amount").cast("double").alias("amount"),
        f.col("data.price.currency").cast("string").alias("currency"),
        f.col("data.reserved.flag").alias("reserved"),
        f.col("data.shipping.item_is_shippable")
        .cast("boolean")
        .alias("item_is_shippable"),
        f.col("data.shipping.user_allows_shipping")
        .cast("boolean")
        .alias("user_allows_shipping"),
        f.current_timestamp().alias("__timestamp"),
    )
)
wallapop.printSchema()
display(wallapop)

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- modified_at: timestamp (nullable = true)
 |-- description: string (nullable = true)
 |-- percentage: integer (nullable = true)
 |-- previous_price_amount: double (nullable = true)
 |-- previous_price_currency: string (nullable = true)
 |-- favorited: boolean (nullable = true)
 |-- is_favoriteable: boolean (nullable = true)
 |-- is_refurbished: boolean (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region2: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- reserved: string (nullable = true)
 |-- item_is_shippab

24/09/28 12:32:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


id,title,user_id,category_id,created_at,modified_at,description,percentage,previous_price_amount,previous_price_currency,favorited,is_favoriteable,is_refurbished,latitude,longitude,postal_code,city,region,region2,country_code,amount,currency,reserved,item_is_shippable,user_allows_shipping,__timestamp
pj9mq457kk6e,Soporte para Portatil / Mesa de Mezclas,vjrd1q4rx46k,24200,2024-09-28 11:44:30,2024-09-28 11:44:41,Vendo soporte para Portatil pero tambien vale si tienes una mesa de mezclas pequeña de dj,,,,False,True,False,40.3007871,-3.4378134,28500,Arganda del Rey,Comunidad de Madrid,Madrid,ES,10.0,EUR,False,True,False,2024-09-28 12:32:43.642223
vjrqve93pwzk,Teclado Bluetooth portátil plegable,p61o88mkx5j5,24200,2024-09-28 11:05:00,2024-09-28 11:05:10,Teclado Bluetooth portátil plegable muy práctico y con muy poco uso.,,,,False,True,False,40.388187529781774,-3.713019692257435,28026,Madrid,Comunidad de Madrid,Madrid,ES,9.0,EUR,False,True,True,2024-09-28 12:32:43.642223
p614184klg65,Portatil HP,g0j2gq3y7rzy,24200,2024-09-28 12:11:03,2024-09-28 12:11:14,"Portatil HP 250 G8\nOrdenador de 15,6” FullHD\nIntel Core I5-1135G7\nDisco duro SSD NVME 256 gb\n...",,,,False,True,False,40.48378014095072,-3.86560871014634,28230,Las Rozas de Madrid,Comunidad de Madrid,Madrid,ES,250.0,EUR,False,True,True,2024-09-28 12:32:43.642223
0j24k83g4ezy,Piezas de portátil,8x6qd1k8kejy,24200,2024-09-28 11:36:32,2024-09-28 11:36:48,"Se vende todo por 20 euros, son piezas de portátil diferente marcás",,,,False,True,False,40.4315752,-3.6276283,28037,Madrid,Comunidad de Madrid,Madrid,ES,20.0,EUR,False,True,False,2024-09-28 12:32:43.642223
e658282neg6o,ventilador portátil,ejkx1revk16x,24200,2024-09-27 15:36:23,2024-09-27 15:36:34,Ventilador portable para portátiles para la refrigeración de estos mismos. tiene unos años pero v...,,,,False,True,False,40.23246875179884,-3.979483127852885,28607,El Álamo,Comunidad de Madrid,Madrid,ES,5.0,EUR,False,True,False,2024-09-28 12:32:43.642223
8j34o8411l69,Dvd Portatil - Belson,mxzo770ed2j9,24200,2024-09-28 12:26:23,2024-09-28 12:32:15,DVD portatil - BELSON - BS-130806\n\nReproductor Dvd Portatil con pantalla TFT 7”\n\nFormatos DVD...,,,,False,True,False,40.34486734204959,-3.738026356054821,28916,Leganés,Comunidad de Madrid,Madrid,ES,35.0,EUR,False,True,True,2024-09-28 12:32:43.642223
qjw4vrpedqzo,Recambio Ordenador Portátil,g0j21e1xpvzy,24200,2024-09-28 11:20:28,2024-09-28 11:20:39,Puerto de carga o conector del cargador para ACER SWIFT SF315-41 R69U. Hago envíos y acepto bizum.\n,,,,False,True,False,40.4269821,-3.6963103,28010,Madrid,Comunidad de Madrid,Madrid,ES,4.99,EUR,False,True,True,2024-09-28 12:32:43.642223
wzv4vgnry1zl,Recambio Ordenador Portátil,g0j21e1xpvzy,24200,2024-09-28 11:20:27,2024-09-28 11:20:37,Disipador de cobre para ACER SWIFT SF315-41 R69U. Hago envíos y acepto bizum.\n,,,,False,True,False,40.4269821,-3.6963103,28010,Madrid,Comunidad de Madrid,Madrid,ES,3.99,EUR,False,True,True,2024-09-28 12:32:43.642223
xzo2ogrl8w69,Recambio Ordenador Portátil,g0j21e1xpvzy,24200,2024-09-28 11:20:26,2024-09-28 11:20:36,"Placa con puertos USB 2.0, tarjetas SD y LED's de encendido y carga para ACER SWIFT SF315-41 R69U...",,,,False,True,False,40.4269821,-3.6963103,28010,Madrid,Comunidad de Madrid,Madrid,ES,4.99,EUR,False,True,True,2024-09-28 12:32:43.642223
e6582qgvyg6o,Recambio Ordenador Portátil,g0j21e1xpvzy,24200,2024-09-28 11:20:24,2024-09-28 11:20:34,Set de altavoces para ACER SWIFT SF315-41 R69U. Hago envíos y acepto bizum.\n,,,,False,True,False,40.4269821,-3.6963103,28010,Madrid,Comunidad de Madrid,Madrid,ES,7.99,EUR,False,True,True,2024-09-28 12:32:43.642223


Genial, ya hemos leído y transformado nuestro set de datos inicialmente desestructurado en una tabla bien estructurada, con unos campos y tipos fijados.

Ahora, para completar la ETL, almacenaremos esta tabla en el catálogo de datos de Spark. En este caso, al estar trabajando de manera local, dicho catálogo de datos estará localizado en esta misma ruta (`metastore_db/`, `spark-warehouse/`, `derby.log`), sin embargo, cuando trabajemos en un entorno corporativo, habitualmente el catálogo de datos se alojará en una arquitectura cloud, como AWS, Azure, GCP, Databricks, etc.

Aunque no es necesario, es una buena práctica crear en primera instancia la tabla Delta sobre la que escribiremos nuestro dataset, con un schema concreto, metadatos, etc.

In [20]:
dt = (
    DeltaTable.createIfNotExists(spark)
    .addColumns(wallapop.schema)
    .tableName("wallapop")
    .comment("Tabla de productos de Wallapop")
    .execute()
)

24/09/28 12:32:44 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/09/28 12:32:44 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/09/28 12:32:45 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/09/28 12:32:45 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore dadiego@127.0.1.1
24/09/28 12:32:45 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
24/09/28 12:32:47 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`default`.`wallapop` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
24/09/28 12:32:47 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is s

Con el método `createIfNotExists` estamos indicando que cree únicamente la tabla en el catálogo de datos si esta no existía previamente.

Una vez creada la tabla, insertaremos los datos de nuestro DataFrame mediante una operación `merge`. Para ello, identificamos en primer lugar cuáles son los campos que consituyen una clave primaria en la tabla, es decir, un identificador único de cada registro. En este caso, podríamos utilizar por ejemplo la combinación `"id"`, `"user_id"`; cuando estos campos coincidan entre la tabla fuente y la tabla destino, actualizaremos los registros en el destino, y cuando no coincidan, insertaremos los registros de la fuente en el destino. Esto lo podemos hacer utilizando los métodos del objeto `DeltaTable` dentro de la librería externa `delta-spark` que tenemos instalada en nuestro entorno virtual de Python.

In [21]:
(
    dt.alias("target")
    .merge(
        wallapop.alias("source"),
        "source.id = target.id AND source.user_id = target.user_id",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

                                                                                

Ya hemos creado la tabla delta y hemos insertado los registros tabulados de la API de Wallapop en la misma. Ahora podemos consultar el catálogo mediante SQL.

Primero, utilizaremos una función auxiliar de nuestro propio paquete de Python creado (`blackops`), llamada `get_detailed_tables_info`, para obtener información detallada de todas las tablas de nuestro catálogo de datos

In [22]:
get_detailed_tables_info(spark)

24/09/28 12:32:51 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


namespace,tableName,Unnamed: 2,Catalog,Comment,Created By,Created Time,Database,InputFormat,Last Access,Location,OutputFormat,Owner,Partition Provider,Provider,Serde Library,Table,Type
,empleados,,,,Spark,Sat Sep 28 12:32:32 CEST 2024,,,UNKNOWN,,,,,,,empleados,VIEW
default,wallapop,,spark_catalog,Tabla de productos de Wallapop,Spark 3.5.3,Sat Sep 28 12:32:47 CEST 2024,default,org.apache.hadoop.mapred.SequenceFileInputFormat,UNKNOWN,file:/home/dadiego/projects/ESIC/esic-bigdata-iv-blackops/notebooks/tema-2-etl/spark-warehouse/wa...,org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,dadiego,Catalog,delta,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,wallapop,MANAGED


Podemos obtener información concreta de nuestra tabla recién creada, `wallapop`

In [23]:
dt.detail()

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures
delta,7c5731bc-40d6-43a1-be7c-c2f9a2d42497,spark_catalog.default.wallapop,Tabla de productos de Wallapop,file:/home/dadiego/projects/ESIC/esic-bigdata-iv-blackops/notebooks/tema-2-etl/spark-warehouse/wa...,2024-09-28 12:32:46.382,2024-09-28 12:32:51.663,[],[],1,17943,{},1,2,"[appendOnly, invariants]"


También podemos obtener una traza histórica de las veces que esta tabla se ha modificado, lo cual es enormemente útil de cara a disponer de un gobierno del dato escalable y robusto

In [24]:
dt.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2024-09-28 12:32:51.663,,,MERGE,"{predicate -> [""((id#643 = id#982) AND (user_id#645 = user_id#984))""], matchedPredicates -> [{""ac...",,,,0.0,Serializable,False,"{numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdd...",,Apache-Spark/3.5.3 Delta-Lake/3.2.0
0,2024-09-28 12:32:46.548,,,CREATE TABLE,"{partitionBy -> [], clusterBy -> [], description -> Tabla de productos de Wallapop, isManaged -> ...",,,,,Serializable,True,{},,Apache-Spark/3.5.3 Delta-Lake/3.2.0


Vemos como en el histórico aparecen las dos operaciones que hemos ejecutado sobre esta tabla Delta: la operación de creación de la tabla, y la operación de merge para insertar los nuevos datos.

Podemos también ejecutar cualquier operación SQL con esta tabla del catálogo. Por ejemplo, veamos una tabla resumen de cuántos productos existen por comunidad y código postal, ordenada de mayor a menor cantidad de productos

In [25]:
spark.sql(
    "select region, postal_code, count(*) as n_products from wallapop group by region, postal_code order by n_products desc limit 15"
)

region,postal_code,n_products
Comunidad de Madrid,28010,5
Comunidad de Madrid,28033,3
Comunidad de Madrid,28100,3
Comunidad de Madrid,28043,2
Comunidad de Madrid,28037,2
Comunidad de Madrid,28801,2
Comunidad de Madrid,28035,2
Comunidad de Madrid,28607,1
Castilla-La Mancha,19002,1
Comunidad de Madrid,28916,1
