<a href="https://colab.research.google.com/github/raulcastillabravo/pyspark/blob/main/colab/pyspark_renfe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Librerías

Ejecutar esta celda siempre al comienzo. Si da error, reiniciar el entorno de ejecución.

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

# Clone datasets
!apt-get install git
!git clone --depth=1 --filter=blob:none --sparse https://github.com/raulcastillabravo/datasets.git
%cd datasets
!git sparse-checkout set renfe/passengers/ renfe/stations/
%cd ..

import os
import sys
from datetime import datetime
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.82)] [1 InRelease 0 B/129 kB [0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.82)] [Waiting for headers] [C[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Ign:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:10 https://developer.download.nvidia.com/

# Esquemas

In [2]:
PASSENGERS_SCHEMA =  T.StructType([
    T.StructField('codigo_estacion', T.IntegerType(), True),
    T.StructField('nombre_estacion', T.StringType(), True),
    T.StructField('nucleo_cercanias', T.StringType(), True),
    T.StructField('tramo_horario', T.StringType(), True),
    T.StructField('viajeros_subidos', T.IntegerType(), True),
    T.StructField('viajeros_bajados', T.IntegerType(), True),
])

STATIONS_SCHEMA = T.StructType([
    T.StructField("codigo_estacion", T.IntegerType(), True),
    T.StructField("descripcion", T.StringType(), True),
    T.StructField("latitud", T.DoubleType(), True),
    T.StructField("longitud", T.DoubleType(), True),
    T.StructField("direccion", T.StringType(), True),
    T.StructField("cp", T.StringType(), True),
    T.StructField("poblacion", T.StringType(), True),
    T.StructField("provincia", T.StringType(), True),
    T.StructField("fichas", T.StringType(), True),
    T.StructField("tuneles_lavado", T.StringType(), True),
])

# Lectura

In [3]:
df_passengers = spark.read.csv('/content/datasets/renfe/passengers/', sep=';', header=True, schema=PASSENGERS_SCHEMA)
df_stations = spark.read.csv('/content/datasets/renfe/stations/', sep=';', header=True, schema=STATIONS_SCHEMA)

In [4]:
df_passengers.show(truncate=False)
df_passengers.count()

+---------------+----------------------+----------------+-------------+----------------+----------------+
|codigo_estacion|nombre_estacion       |nucleo_cercanias|tramo_horario|viajeros_subidos|viajeros_bajados|
+---------------+----------------------+----------------+-------------+----------------+----------------+
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |00:00 - 00:30|0               |19              |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |00:30 - 01:00|0               |1               |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |05:00 - 05:30|43              |0               |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |05:30 - 06:00|46              |0               |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |06:00 - 06:30|56              |0               |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |06:30 - 07:00|187             |22              |
|71600          |SANT VICENÇ DE CALDERS|BARCEL

23492

In [5]:
df_stations.show(truncate=False)
df_stations.count()

+---------------+------------------------------+----------+---------+----------------------------------------+-----+---------------------------+---------+--------------------------------------------------------------+--------------+
|codigo_estacion|descripcion                   |latitud   |longitud |direccion                               |cp   |poblacion                  |provincia|fichas                                                        |tuneles_lavado|
+---------------+------------------------------+----------+---------+----------------------------------------+-----+---------------------------+---------+--------------------------------------------------------------+--------------+
|72400          |AEROPORT                      |41.3038736|2.0724734|ZONA AEROPUERTO, S/N                    |8820 |Prat de Llobregat, El      |Barcelona|https://data.renfe.com/files/estaciones/FichaEstacion72400.pdf|NULL          |
|78804          |ARC DE TRIOMF                 |41.3924353|2.1804163

527

# Ejercicio 1

Escribe la tabla **passengers** particionada por la primera hora del tramo horario. Si **tramo_horario** es 09:00 - 09:30, la partición debe ser **tramo_inicio=0900**

**Funciones útiles**

```
F.split(F.col('colname'), ',')[0]  # divide los valores por coma y extrae el primero
F.regexp_replace(F.col('colname'), ',', ';')  # Cambia las comas por puntos y comas
```



In [6]:
df = df_passengers.withColumn('tramo_inicio', F.split(F.col('tramo_horario'), ' - ')[0])\
  .withColumn('tramo_inicio', F.regexp_replace(F.col('tramo_inicio'), ':', ''))

df.show(truncate=False)

+---------------+----------------------+----------------+-------------+----------------+----------------+------------+
|codigo_estacion|nombre_estacion       |nucleo_cercanias|tramo_horario|viajeros_subidos|viajeros_bajados|tramo_inicio|
+---------------+----------------------+----------------+-------------+----------------+----------------+------------+
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |00:00 - 00:30|0               |19              |0000        |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |00:30 - 01:00|0               |1               |0030        |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |05:00 - 05:30|43              |0               |0500        |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |05:30 - 06:00|46              |0               |0530        |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |06:00 - 06:30|56              |0               |0600        |
|71600          |SANT VICENÇ DE CALDERS|BARCELON

In [7]:
df.write.partitionBy('tramo_inicio').parquet('/content/exercise1/')

# Ejercicio 2

Calcula el total de viajeros que han subido y bajado por núcleo de cercanías

**Funciones útiles**


```
F.sum(F.col('colname)).alias('new_colname') # Permite sumar los valores de una columna
```



In [8]:
df_passengers.groupBy('nucleo_cercanias')\
  .agg(
      F.sum(F.col('viajeros_subidos')).alias('total_viajeros_subidos'),
      F.sum(F.col('viajeros_bajados')).alias('total_viajeros_bajados'),
  ).show()

+----------------+----------------------+----------------------+
|nucleo_cercanias|total_viajeros_subidos|total_viajeros_bajados|
+----------------+----------------------+----------------------+
|   SAN SEBASTIÁN|                 17548|                 17548|
|          MADRID|                907489|                907489|
|       BARCELONA|                369477|                369477|
|    CANTABRIA AM|                 10801|                 10801|
|        ASTURIAS|                 17623|                 17623|
|         SEVILLA|                 28424|                 28424|
|          BILBAO|                 33394|                 33394|
|        VALENCIA|                 46873|                 46873|
|     ASTURIAS AM|                  7408|                  7408|
|            NULL|                  NULL|                  NULL|
|          MÁLAGA|                 27404|                 27404|
|   PAIS VASCO AM|                  2906|                  2906|
|           CÁDIZ|       

# Ejercicio 3

Filtra los tramos horarios que tengan como **hora de inicio una hora en punto**. Si el tramo horario es 00:00 - 00:30, sí lo queremos, pero si es 00:30 - 01:00, no.



In [9]:
df = df_passengers\
  .withColumn('tramo_inicio', F.split(F.col('tramo_horario'), ' - ')[0])\
  .withColumn('minutos_tramo_inicio', F.split(F.col('tramo_inicio'), ':')[1])\
  .filter(F.col('minutos_tramo_inicio') == '00')

df.show(truncate=False)

+---------------+----------------------+----------------+-------------+----------------+----------------+------------+--------------------+
|codigo_estacion|nombre_estacion       |nucleo_cercanias|tramo_horario|viajeros_subidos|viajeros_bajados|tramo_inicio|minutos_tramo_inicio|
+---------------+----------------------+----------------+-------------+----------------+----------------+------------+--------------------+
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |00:00 - 00:30|0               |19              |00:00       |00                  |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |05:00 - 05:30|43              |0               |05:00       |00                  |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |06:00 - 06:30|56              |0               |06:00       |00                  |
|71600          |SANT VICENÇ DE CALDERS|BARCELONA       |07:00 - 07:30|45              |50              |07:00       |00                  |
|71600          |SAN

# Ejercicio 4

Calcula cuántos viajeros se suben a los trenes en cada población y ordena el resultado de mayor a menor.

**Pista**: tendrás que cruzar (**join**) la tabla de pasajeros con la de estaciones, agrupar y sumar. Puedes ordenar el resultado final usando **orderBy** de la siguiente forma


```
df.orderBy(F.col('colname'), ascending=False)  # Ordena de forma descendente
```



In [10]:
df = df_passengers.join(df_stations, on = 'codigo_estacion', how='left')

df.show()

+---------------+--------------------+----------------+-------------+----------------+----------------+--------------------+----------+---------+--------------------+-----+------------+---------+--------------------+--------------+
|codigo_estacion|     nombre_estacion|nucleo_cercanias|tramo_horario|viajeros_subidos|viajeros_bajados|         descripcion|   latitud| longitud|           direccion|   cp|   poblacion|provincia|              fichas|tuneles_lavado|
+---------------+--------------------+----------------+-------------+----------------+----------------+--------------------+----------+---------+--------------------+-----+------------+---------+--------------------+--------------+
|          71600|SANT VICENÇ DE CA...|       BARCELONA|00:00 - 00:30|               0|              19|SANT VICENÇ DE CA...|41.1861054|1.5245117|CALLE ESTACIO FER...|43880|Vendrell, El|Tarragona|https://data.renf...|          NULL|
|          71600|SANT VICENÇ DE CA...|       BARCELONA|00:30 - 01:00|   

In [11]:
df.groupBy('poblacion')\
  .agg(F.sum(F.col('viajeros_subidos')).alias('total_viajeros_subidos'))\
  .orderBy(F.col('total_viajeros_subidos'), ascending=False) \
  .show()

+--------------------+----------------------+
|           poblacion|total_viajeros_subidos|
+--------------------+----------------------+
|              Madrid|                602039|
|           Barcelona|                145742|
|              Getafe|                 36170|
|             Leganés|                 28553|
|         Fuenlabrada|                 28045|
|   Alcalá de Henares|                 22980|
|               Parla|                 20748|
|            Móstoles|                 19263|
|            Valencia|                 18309|
|            Alcorcón|                 17615|
|              Bilbao|                 17583|
|                NULL|                 17098|
|   Torrejón de Ardoz|                 15336|
|             Sevilla|                 15262|
|            Sabadell|                 13620|
|             Coslada|                 11765|
|Rozas de Madrid, Las|                 11587|
|              Málaga|                 11549|
|Hospitalet de Llo...|            

# Ejercicio 5 (micro proyecto)

Se ha detectado que en la tabla de estaciones hay fichas que están vacías (**NULL**). Se quiere evaluar el impacto de estas fichas vacías para saber a cuántos pasajeros les está afectando.

Para ello, se pide realizar un proceso automático que genere un informe con las siguientes características:

1. Debe mostrar la información agrupada por núcleo de cercanías.
2. Debe calcular dos KPIs:
  1. El porcentaje de viajeros que han subido a un tren sin ficha.
  2. El número de viajeros que han subido a un tren sin ficha.
3. El resultado debe estar ordenado de mayor a menor porcentaje.
4. El resultado debe escribirse particionado por núcleo de cercanías.
5. Cada vez que se ejecute el proceso, debe crearse una partición nueva con los resultados de la ejecución para poder tener trazabilidad de análisis anteriores.

**Pistas**

```
df.filter(F.col('colname').isNull())  # Filtra los valores NULL
df.filter(F.col('colname').isNotNull())  # Filtra los valores no NULL


# Permite crear una marca de tiempo con el instante de ejecucion
from datetime import datetime
now = datetime.now().strftime("%Y%m%d%H%M%S") # El resultado es un string
```



## Fase 1.

1. Cruzar tabla **df_passengers** con **df_station**.
2. Filtrar los casos donde las fichas son nulas y donde no lo son para crear dos DataFrames llamados **df_null** y **df_not_null**.

In [12]:
df = df_passengers.join(df_stations, on='codigo_estacion', how='left')

df_null = df.filter(F.col('fichas').isNull())
df_not_null = df.filter(F.col('fichas').isNotNull())

df_null.show()

+---------------+---------------+----------------+-------------+----------------+----------------+---------------+---------+---------+--------------------+----+---------+---------+------+--------------+
|codigo_estacion|nombre_estacion|nucleo_cercanias|tramo_horario|viajeros_subidos|viajeros_bajados|    descripcion|  latitud| longitud|           direccion|  cp|poblacion|provincia|fichas|tuneles_lavado|
+---------------+---------------+----------------+-------------+----------------+----------------+---------------+---------+---------+--------------------+----+---------+---------+------+--------------+
|          71801|BARCELONA-SANTS|       BARCELONA|00:00 - 00:30|              39|              10|BARCELONA-SANTS|41.379335|2.1394773|PLAÇA DELS PAISOS...|8014|Barcelona|Barcelona|  NULL|          NULL|
|          71801|BARCELONA-SANTS|       BARCELONA|04:30 - 05:00|               7|               3|BARCELONA-SANTS|41.379335|2.1394773|PLAÇA DELS PAISOS...|8014|Barcelona|Barcelona|  NULL| 

## Fase 2.

Agrupar cada DataFrame por separado para contar el número de viajeros subidos.

In [13]:
df_null = df_null.groupBy('nucleo_cercanias')\
  .agg(F.sum(F.col('viajeros_subidos')).alias('sin_ficha'))\
  .filter(F.col('sin_ficha').isNotNull())

df_not_null = df_not_null.groupBy('nucleo_cercanias')\
  .agg(F.sum(F.col('viajeros_subidos')).alias('con_ficha'))\
  .filter(F.col('con_ficha').isNotNull())

print('df_null.show()')
df_null.show()

print('df_not_null.show()')
df_not_null.show()

df_null.show()
+----------------+---------+
|nucleo_cercanias|sin_ficha|
+----------------+---------+
|   SAN SEBASTIÁN|     3499|
|          MADRID|    41267|
|       BARCELONA|    69929|
|    CANTABRIA AM|     7282|
|        ASTURIAS|     1450|
|         SEVILLA|     5270|
|          BILBAO|     9855|
|        VALENCIA|    20646|
|     ASTURIAS AM|     5964|
|          MÁLAGA|     3140|
|   PAIS VASCO AM|     2738|
|           CÁDIZ|     2690|
|         LEON AM|      635|
|          MURCIA|     4667|
|       MURCIA AM|     1072|
|        ZARAGOZA|      173|
+----------------+---------+

df_not_null.show()
+----------------+---------+
|nucleo_cercanias|con_ficha|
+----------------+---------+
|   SAN SEBASTIÁN|    14049|
|          MADRID|   866222|
|       BARCELONA|   299548|
|    CANTABRIA AM|     3519|
|        ASTURIAS|    16173|
|         SEVILLA|    23154|
|          BILBAO|    23539|
|        VALENCIA|    26227|
|     ASTURIAS AM|     1444|
|          MÁLAGA|    24264|
|   PAIS

## Fase 3

Cruzar los resultados agregados y calcular el porcentaje de viajeros sin ficha

In [14]:
df = df_null.join(df_not_null, on='nucleo_cercanias', how='inner')

df.show()

+----------------+---------+---------+
|nucleo_cercanias|sin_ficha|con_ficha|
+----------------+---------+---------+
|   SAN SEBASTIÁN|     3499|    14049|
|          MADRID|    41267|   866222|
|       BARCELONA|    69929|   299548|
|    CANTABRIA AM|     7282|     3519|
|        ASTURIAS|     1450|    16173|
|         SEVILLA|     5270|    23154|
|          BILBAO|     9855|    23539|
|        VALENCIA|    20646|    26227|
|     ASTURIAS AM|     5964|     1444|
|          MÁLAGA|     3140|    24264|
|   PAIS VASCO AM|     2738|      168|
|           CÁDIZ|     2690|     5066|
|          MURCIA|     4667|     6405|
|        ZARAGOZA|      173|      844|
+----------------+---------+---------+



In [15]:
df = df.withColumn('porc_sin_ficha', F.col('sin_ficha') / (F.col('sin_ficha') + F.col('con_ficha')))\
  .withColumn('porc_sin_ficha', F.round(F.col('porc_sin_ficha'), 2))\
  .orderBy(F.col('porc_sin_ficha'), ascending=False)

df.show()

+----------------+---------+---------+--------------+
|nucleo_cercanias|sin_ficha|con_ficha|porc_sin_ficha|
+----------------+---------+---------+--------------+
|   PAIS VASCO AM|     2738|      168|          0.94|
|     ASTURIAS AM|     5964|     1444|          0.81|
|    CANTABRIA AM|     7282|     3519|          0.67|
|        VALENCIA|    20646|    26227|          0.44|
|          MURCIA|     4667|     6405|          0.42|
|           CÁDIZ|     2690|     5066|          0.35|
|          BILBAO|     9855|    23539|           0.3|
|   SAN SEBASTIÁN|     3499|    14049|           0.2|
|       BARCELONA|    69929|   299548|          0.19|
|         SEVILLA|     5270|    23154|          0.19|
|        ZARAGOZA|      173|      844|          0.17|
|          MÁLAGA|     3140|    24264|          0.11|
|        ASTURIAS|     1450|    16173|          0.08|
|          MADRID|    41267|   866222|          0.05|
+----------------+---------+---------+--------------+



## Fase 4

Escribir el resultado final particionado por fecha de ejecución y núcleo de cercanías

In [16]:
now = datetime.now().strftime("%Y%m%d%H%M%S")
df.write.partitionBy('nucleo_cercanias').parquet(f'/content/report/created_at={now}/')

In [17]:
df = spark.read.load('/content/report/')
df.show()

+---------+---------+--------------+--------------+----------------+
|sin_ficha|con_ficha|porc_sin_ficha|    created_at|nucleo_cercanias|
+---------+---------+--------------+--------------+----------------+
|    69929|   299548|          0.19|20241020082157|       BARCELONA|
|     9855|    23539|           0.3|20241020082157|          BILBAO|
|     4667|     6405|          0.42|20241020082157|          MURCIA|
|     5964|     1444|          0.81|20241020082157|     ASTURIAS AM|
|     2690|     5066|          0.35|20241020082157|           CÁDIZ|
|      173|      844|          0.17|20241020082157|        ZARAGOZA|
|    20646|    26227|          0.44|20241020082157|        VALENCIA|
|     2738|      168|          0.94|20241020082157|   PAIS VASCO AM|
|     7282|     3519|          0.67|20241020082157|    CANTABRIA AM|
|     1450|    16173|          0.08|20241020082157|        ASTURIAS|
|     5270|    23154|          0.19|20241020082157|         SEVILLA|
|     3499|    14049|           0.