## **Practicando PySpark con datos del subte de BA**

Este notebook está destinado a procesar datos vinculados a los subtes de la Ciudad de Buenos Aires, específicamente sobre la cantidad de pasajeros por molinete de todas las estaciones de la red de Subte.
Los datasets se encuentran disponible [aquí](https://data.buenosaires.gob.ar/dataset/subte-viajes-molinetes)

En el sitio [Buenos Aires Data]() se encuentra una gran variedad de datos abiertos.

### **Entorno de trabajo**
Se trabaja de manera local con una imagen de docker que incluye todo lo necesario para trabajar con PySpark

Dicha docker image está disponible [aquí](https://hub.docker.com/r/jupyter/pyspark-notebook/)

Un vistazo rápido a los datos

In [1]:
!head -2 molinetes-201*.csv

==> molinetes-2014.csv <==
﻿FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;PAX_PAGO;PAX_PASES_PAGOS;PAX_FRANQ;PAX_TOTAL
2014-04-09;09:15;09:29;B;LINEA_B_FLORIDA_E_TURN02;Florida;7;NA;NA;7

==> molinetes-2015.csv <==
periodo,fecha,desde,hasta,linea,molinete,estacion,pax_pagos,pax_pases_pagos,pax_franq,total
201501,2015-01-01,05:00:00,05:15:00,LINEA_H,LINEA_H_CASEROS_NORTE_TURN01,CASEROS,0.0,0.0,0.0,0.0

==> molinetes-2016.csv <==
﻿PERIODO;FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;PAX_PAGOS;PAX_PASES_PAGOS;PAX_FRANQ;TOTAL
201601;02/01/2016;05:00:00;05:15:00;LINEA_A;LINEA_A_CARABOBO_E_TURN03;CARABOBO;1;0;0;1

==> molinetes-2017.csv <==
﻿PERIODO;FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;PAX_PAGOS;PAX_PASES_PAGOS;PAX_FRANQ;TOTAL;ID
201701;01/01/2017;08:00:00;08:15:00;LINEA_H;LINEA_H_CASEROS_SUR_TURN02;CASEROS;1;0;0;1;1

==> molinetes-2018.csv <==
fecha,desde,hasta,linea,molinete,estacion,pax_pagos,pax_pases_pagos,pax_franq,total,periodo
2018-01-01,08:00:00,08:15:00,LineaA,LineaA_CBarros_S_Turn01

No todos los archivos mantienen la misma estructura. Un grupo tiene el caracter ',' como delimitador, mientras que para el otro es el caracter ';'.

El orden de las columnas no es el mismo para todos los archivos. Los valores para el campo *LINEA* no mantiene el mismo formato para todos los arhivos

El formato de fecha también difiere entre los archivos

### **Hands on**

In [1]:
# Para interactuar con una base de datos desde PySpark es necesario disponer del conector JDBC para dicha base de datos
PG_JDBC_PATH = "/home/jovyan/work/postgresql-42.2.12.jar"

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

import pandas as pd

import json
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = f"--driver-class-path {PG_JDBC_PATH} --jars {PG_JDBC_PATH} pyspark-shell"

In [3]:
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("molinetes") \
                    .getOrCreate()

Primero veamos si hay missing/null values

In [4]:
cols_to_select=["Fecha", "Desde", "Hasta", "Linea", "Estacion"]

In [6]:
fnames = [f"molinetes-201{n}.csv" for n in [4,6,7]]
for fname in fnames:
    print(fname)
    molinetes = spark.read.csv(fname, sep=";", header=True, encoding="UTF-8")
    molinetes = molinetes.select(*cols_to_select)
    for column in molinetes.columns:
        nulls_count = molinetes.filter(molinetes[column].isNull()).count()
        print(f"Column: {column}\t Null values: {nulls_count}")
    
    molinetes = None

molinetes-2014.csv
Column: Fecha	 Null values: 0
Column: Desde	 Null values: 0
Column: Hasta	 Null values: 0
Column: Linea	 Null values: 0
Column: Estacion	 Null values: 0
molinetes-2016.csv
Column: Fecha	 Null values: 0
Column: Desde	 Null values: 0
Column: Hasta	 Null values: 0
Column: Linea	 Null values: 0
Column: Estacion	 Null values: 0
molinetes-2017.csv
Column: Fecha	 Null values: 0
Column: Desde	 Null values: 0
Column: Hasta	 Null values: 0
Column: Linea	 Null values: 0
Column: Estacion	 Null values: 0


In [7]:
fnames = [f"molinetes-201{n}.csv" for n in [5,8,9]]
for fname in fnames:
    print(fname)
    molinetes = spark.read.csv(fname, sep=",", header=True, encoding="UTF-8")
    molinetes = molinetes.select(*cols_to_select)
    for column in molinetes.columns:
        nulls_count = molinetes.filter(molinetes[column].isNull()).count()
        print(f"Column: {column}\t Null values: {nulls_count}")
    

    molinetes = None

molinetes-2015.csv
Column: Fecha	 Null values: 0
Column: Desde	 Null values: 0
Column: Hasta	 Null values: 0
Column: Linea	 Null values: 0
Column: Estacion	 Null values: 0
molinetes-2018.csv
Column: Fecha	 Null values: 78207
Column: Desde	 Null values: 78207
Column: Hasta	 Null values: 78207
Column: Linea	 Null values: 78263
Column: Estacion	 Null values: 78207
molinetes-2019.csv
Column: Fecha	 Null values: 0
Column: Desde	 Null values: 0
Column: Hasta	 Null values: 0
Column: Linea	 Null values: 0
Column: Estacion	 Null values: 0


El archivo referente al año 2018 requiere un tratamiento diferente por presentar valores nulos.

Primero, trabajaremos con los otros datasets.

In [40]:
def csv_to_df(filename, separator):
    cols_to_select = ["fecha", "desde", "linea", "estacion"]
    # Omitimos la columna Hasta dado se sabe que los registros se efectuaron cada quince minutos    
    
    molinetes = spark.read.csv(filename, encoding="UTF-8", header=True, sep=separator)
    
    # La columna referente al total de pasajeros por molinete tiene nombres diferentes entre los archivos.
    # De la siguiente forma logramos obtener el nombre de la columna que hace referencia a dichos totales
    total_column = next(filter(lambda s: 'total' in s.lower(), molinetes.columns))
    cols_to_select.append(total_column)
    molinetes = molinetes.select(*cols_to_select)
    molinetes = molinetes.withColumnRenamed(total_column, "total")
    return molinetes

def modify_columns(df):
    last_char = F.udf(lambda s: s[-1])
    df = df.withColumn("linea", last_char("linea"))
    df = df.withColumn("estacion", F.upper(F.col("estacion")))
    
    df = df.withColumnRenamed("desde", "hora")
    df = df.withColumn("hora", \
                      F.hour(df["hora"]))

    df = df.withColumn("total", F.col("total").cast("int"))
    
    # Transformar el formato del campo "Fecha" a dd/MM/yyyy en caso de ser necesario
    df = df.withColumn("fecha", \
                       F.when(F.to_date("fecha").isNotNull(), \
                              F.to_date("fecha")) \
                       .otherwise(F.to_date("fecha", "dd/MM/yyyy")))

    return df

def sort_and_group(df):
    cols = ["fecha", "hora", "linea", "estacion"]
    df = df.orderBy(cols) \
            .groupBy(cols).agg(F.sum("total").alias("total"))
    return df

In [None]:
fnames = (f"molinetes-201{n}.csv" for n in [4,6,7])
for fname in fnames:
    print(fname, end="\t")
    molinetes = csv_to_df(fname, ';')
    molinetes = modify_columns(molinetes)
    molinetes = sort_and_group(molinetes)
    molinetes.write.saveAsTable("molinetes", mode="append")
    print("saved")
    molinetes = None

In [None]:
fnames = (f"molinetes-201{n}.csv" for n in [5, 9])
for fname in fnames:
    print(fname, end="\t")
    molinetes = csv_to_df(fname, ',')
    molinetes = modify_columns(molinetes)
    molinetes = sort_and_group(molinetes)
    molinetes.write.saveAsTable("molinetes", mode="append")
    print("saved")
    molinetes = None

Ahora vamos a tratar los valores nulos del archivo *molinetes-18.csv*.

Cargamos el archivo en un dataframe, contamos los valores nulos y luego realizamos las transformaciones correspondientes.

In [17]:
molinetes18 = csv_to_df("molinetes-2018.csv", ',')
molinetes18.show(3)

+----------+--------+------+-------------+-----+
|     fecha|   desde| linea|     estacion|total|
+----------+--------+------+-------------+-----+
|2018-01-01|08:00:00|LineaA|Castro Barros|  1.0|
|2018-01-01|08:00:00|LineaA|         Lima|  4.0|
|2018-01-01|08:00:00|LineaA|        Pasco|  1.0|
+----------+--------+------+-------------+-----+
only showing top 3 rows



In [18]:
molinetes18 = molinetes18.dropna(subset=["Linea"])

In [19]:
for column in molinetes18.columns:
    nulls_count = molinetes18.filter(molinetes18[column].isNull()).count()
    print(f"Column: {column}\t Nulls values: {nulls_count}")

Column: fecha	 Nulls values: 0
Column: desde	 Nulls values: 0
Column: linea	 Nulls values: 0
Column: estacion	 Nulls values: 0
Column: total	 Nulls values: 0


Los valores nulos fueron eliminados y ya podemos proceder a cargar este dataframe en una tabla.

In [20]:
molinetes18 = modify_columns(molinetes18)
molinetes18 = sort_and_group(molinetes18)

In [21]:
molinetes18.show(5)

+----------+----+-----+-------------+-----+
|     fecha|hora|linea|     estacion|total|
+----------+----+-----+-------------+-----+
|2018-01-01|   8|    A|       ACOYTE|   27|
|2018-01-01|   8|    A|      ALBERTI|    9|
|2018-01-01|   8|    A|     CARABOBO|   38|
|2018-01-01|   8|    A|CASTRO BARROS|   33|
|2018-01-01|   8|    A|     CONGRESO|   36|
+----------+----+-----+-------------+-----+
only showing top 5 rows



In [22]:
molinetes18.write.saveAsTable("molinetes", mode="append")

### **Mas modificiones**

Haremos unas modificaciones sobre los valores del campo 'estacion'.

Contamos con un dataset de todas las estaciones de subte de CABA.

In [6]:
stations_data = pd.read_csv("estaciones-de-subte.csv")
stations_data

Unnamed: 0,long,lat,id,estacion,linea
0,-58.398928,-34.635750,1.0,CASEROS,H
1,-58.400970,-34.629376,2.0,INCLAN - MEZQUITA AL AHMAD,H
2,-58.402323,-34.623092,3.0,HUMBERTO 1°,H
3,-58.404732,-34.615242,4.0,VENEZUELA,H
4,-58.406036,-34.608935,5.0,ONCE - 30 DE DICIEMBRE,H
...,...,...,...,...,...
85,-58.402376,-34.594525,86.0,SANTA FE - CARLOS JAUREGUI,H
86,-58.391019,-34.583036,87.0,FACULTAD DE DERECHO - JULIETA LANTERI,H
87,-58.375850,-34.592114,90.0,RETIRO,E
88,-58.371700,-34.596597,89.0,CATALINAS,E


In [25]:
molinetes = spark.table("molinetes")
stations_name = molinetes.select("estacion")
stations_name = stations_name.distinct().toPandas()
stations_name

Unnamed: 0,estacion
0,CORRIENTES
1,AGÃ¼ERO
2,RIO DE JANEIRO
3,LORIA
4,JOSE HERNANDEZ
...,...
95,VENEZUELA
96,CASTRO BARROS
97,AGÃÂ¼ERO
98,PERU


Nótese que el dataset de estaciones contiene 90 filas, es decir existen 90 estaciones diferentes. Mientras que al consultar las estaciones diferentes del dataset de molinetes, obtenemos 100 filas.

Habrá que hacer modificacione sobre los nombres de las estaciones

Primero, veamos las diferencias

In [8]:
list(filter(lambda x: x not in list(stations_data.estacion), \
           list(stations_name.estacion)))

['AGÃ¼ERO',
 'TRONADOR',
 'FLORES',
 'INCLAN',
 'PLAZA MISERERE',
 'ONCE',
 'HUMBERTO I',
 'CALLAO.B',
 'MALABIA',
 'GENERAL SAN MARTIN',
 'INDEPENDENCIA.H',
 'PUEYRREDON.D',
 'SAENZ PEÃ±A ',
 'ECHEVERRIA',
 'PATRICIOS',
 'FACULTAD DE DERECHO',
 'PUEYRREDON.',
 'GENERAL BELGRANO',
 'RETIRO E',
 'SAENZ PEÃ\x83Â±A ',
 'CALLAO.',
 'TALLER BONIFACIO',
 'PASTEUR',
 'ENTRE RIOS',
 'MEDRANO',
 'TRIBUNALES',
 'AGUERO',
 'ROSAS',
 'SANTA FE',
 'MINISTRO CARRANZA',
 'SAENZ PEÑA ',
 'LOS INCAS',
 'MARIANO MORENO',
 'CARLOS PELLEGRINI',
 'AVENIDA DE MAYO',
 'SCALABRINI ORTIZ',
 'CONGRESO',
 'PZA. DE LOS VIRREYES',
 'AVENIDA LA PLATA',
 'INDEPENDENCIA.',
 'CORDOBA',
 'AGÃ\x83Â¼ERO']

Para ello, preparé un archivo json, *replacements.json*, donde cada par *(clave, valor)* representa *(valor_actual, valor_nuevo)*

In [9]:
with open("replacements.json", "r") as json_file:
    replacements = json.load(json_file)

molinetes = molinetes.replace(replacements, subset=["estacion"])

Luego hay unos inconvenientes con las estaciones *Agüero* y *Saenz peña* con respecto a los caracteres ü y ñ.

Se puede solucionar de la siguiente forma

In [10]:
names_list = list(filter(lambda x: x.startswith("SAENZ"), \
            list(stations_name.estacion)))
print(names_list)
molinetes = molinetes.replace(names_list, value="SAENZ PEÑA", subset="estacion")

['SAENZ PEÑA', 'SAENZ PEÃ±A ', 'SAENZ PEÃ\x83Â±A ', 'SAENZ PEÑA ']


In [11]:
names_list = list(filter(lambda x: x.endswith("ERO"), \
                        list(stations_name.estacion)))
print(names_list)
molinetes = molinetes.replace(names_list, value="AGÜERO", subset="estacion")

['AGÃ¼ERO', 'AGÜERO', 'AGUERO', 'AGÃ\x83Â¼ERO']


Así verificamos si las modificaciones tuvieron los resultados esperados

In [12]:
stations_name = molinetes.select("estacion").distinct().toPandas()
list(filter(lambda x: x not in list(stations_data.estacion), \
           list(stations_name.estacion)))

['TALLER BONIFACIO']

Esta estación, *Taller Bonifacio*, no existe realmente, al consultar vemos que está asociada con la línea de subte *S*, que tampoco existe. Además no hay registros de pasajeros para esta estación así que luego será descartada

In [50]:
_ = molinetes.filter('estacion = "TALLER BONIFACIO"')
_.show()
_.select('linea').distinct().show()
_.select(F.sum("total")).show()

+----------+----+-----+----------------+-----+
|     fecha|hora|linea|        estacion|total|
+----------+----+-----+----------------+-----+
|2016-04-20|  14|    S|TALLER BONIFACIO|    0|
|2016-04-19|  13|    S|TALLER BONIFACIO|    0|
|2016-04-19|  14|    S|TALLER BONIFACIO|    0|
|2016-04-19|  15|    S|TALLER BONIFACIO|    0|
|2016-04-19|  16|    S|TALLER BONIFACIO|    0|
|2016-04-07|  19|    S|TALLER BONIFACIO|    0|
|2016-04-11|  12|    S|TALLER BONIFACIO|    0|
|2016-04-11|  13|    S|TALLER BONIFACIO|    0|
|2016-04-11|  14|    S|TALLER BONIFACIO|    0|
+----------+----+-----+----------------+-----+

+-----+
|linea|
+-----+
|    S|
+-----+

+----------+
|sum(total)|
+----------+
|         0|
+----------+



In [13]:
molinetes.write.saveAsTable("pases", mode="overwrite")

### **Carga a una base de datos**

Ya finalizado el procesamiento y con los datos cargados en una tabla provisoria, procedemos a cargar toda la data en un base de datos en postgresql.

In [14]:
molinetes = spark.table("pases")
molinetes.show(10)

+----------+----+-----+--------------------+-----+
|     fecha|hora|linea|            estacion|total|
+----------+----+-----+--------------------+-----+
|2019-10-14|  21|    A|      RIO DE JANEIRO|  157|
|2019-10-14|  21|    A|          SAENZ PEÑA|   87|
|2019-10-14|  21|    A|         SAN PEDRITO|  189|
|2019-10-14|  21|    B|      ANGEL GALLARDO|  204|
|2019-10-14|  21|    B|CALLAO - MAESTRO ...|  281|
|2019-10-14|  21|    B|       CARLOS GARDEL|  838|
|2019-10-14|  21|    B|       C. PELLEGRINI|  704|
|2019-10-14|  21|    B|             DORREGO|  136|
|2019-10-14|  21|    B|          ECHEVERRÍA|   51|
|2019-10-14|  21|    B|    FEDERICO LACROZE|  338|
+----------+----+-----+--------------------+-----+
only showing top 10 rows



In [5]:
molinetes = molinetes.orderBy('fecha', 'hora')
molinetes.show(5)

+----------+----+-----+----------------+-----+
|     fecha|hora|linea|        estacion|total|
+----------+----+-----+----------------+-----+
|2014-01-02|   0|    C|    CONSTITUCION|    4|
|2014-01-02|   4|    B|FEDERICO LACROZE|    5|
|2014-01-02|   4|    A|            PUAN|    1|
|2014-01-02|   4|    A|         ALBERTI|    1|
|2014-01-02|   4|    A|  RIO DE JANEIRO|    1|
+----------+----+-----+----------------+-----+
only showing top 5 rows



In [10]:
connection_string = "jdbc:postgresql://127.0.0.1/subte"
tablename = "public.molinetes"
connection_details = {
    "user": "<user>",
    "password": "<pswd>",
}

In [11]:
molinetes.write \
.option("driver", "org.postgresql.Driver") \
.option("batchsize", 100000) \
.jdbc(connection_string, tablename, mode="append", properties=connection_details)

Finalmente, los datos han sido cargados a una db. Asi resultan mas accesibles para crear algún dashboard de visualización y realizar un análisis mas en profundidad