## Lectura y preprocesamiento de los datos con PySpark

In [146]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import dayofmonth, month, col, count
from pyspark.sql.functions import to_timestamp, date_format, when
from pyspark.sql.functions import avg, count
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import FloatType

En primer lugar, se ha considerado el archivo calles-SER-2025.csv, el cual tiene 33731 filas y 12 columnas, cada una de estas filas corresponde a una zona de Servicio de Estacionamiento Regulado determinada, la información sobre éste se puede consultar en [calles_ser](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=4973b0dd4a872510VgnVCM1000000b205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default).

In [147]:
calles_ser = pd.read_csv("gs://tfm-bucket-clq/datos/calles_SER_2025.csv", encoding = 'latin-1', sep = ';')
calles_ser.head(5)

Unnamed: 0,gis_x,gis_y,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,color,bateria_linea,numero_plazas
0,439592.91,4473566.23,1,CENTRO,11,1,PALACIO,"AGUAS, CALLE, DE LAS",2,077214010 Verde,Línea,7
1,439569.61,4473598.22,1,CENTRO,11,1,PALACIO,"AGUAS, CALLE, DE LAS",8,077214010 Verde,Línea,4
2,439536.49,4473428.32,1,CENTRO,11,1,PALACIO,"AGUILA, CALLE, DEL",17,077214010 Verde,Línea,4
3,439525.36,4473404.28,1,CENTRO,11,1,PALACIO,"AGUILA, CALLE, DEL",21,077214010 Verde,Línea,4
4,439578.18,4473498.76,1,CENTRO,11,1,PALACIO,"AGUILA, CALLE, DEL",3,077214010 Verde,Línea,1


In [148]:
calles_ser.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33731 entries, 0 to 33730
Data columns (total 12 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   gis_x          33731 non-null  float64
 1   gis_y          33731 non-null  float64
 2   cod_distrito   33731 non-null  int64  
 3   distrito       33731 non-null  object 
 4   cod_barrio     33731 non-null  int64  
 5   num_barrio     33731 non-null  int64  
 6   barrio         33731 non-null  object 
 7   calle          33731 non-null  object 
 8   numero_finca   33731 non-null  object 
 9   color          33731 non-null  object 
 10  bateria_linea  33731 non-null  object 
 11  numero_plazas  33731 non-null  int64  
dtypes: float64(2), int64(4), object(6)
memory usage: 3.1+ MB


A continuación se definen las variables que encontramos en este dataset según su definición en la documentación asociada, la cual se puede encontrar en el enlace anterior:
* gis_x: Coordenadas X, proyección UTM, de sistema geodésico de referencia ETRS89.
* gis_y: Coordenadas Y, proyección UTM, del sistema geodésico de referencia ETRS89.
* cos_distrito: Código del distrito.
* distrito: Nombre del distrito.
* cod_barrio: Código del barrio, compuesto por el código del distrito al que pertenece y su código de barrio dentro del distrito.
* num_barrio: Código del barrio dentro del distrito.
* barrio: Nombre del barrio.
* calle: Nombre de la calle.
* numero_finca: Número de finca.
* color: Color según su codificación RGB en valores decimales (077214010 Verde; 043000255 Azul; 255000000 Rojo;081209246 Alta Rotación; 255140000 Naranja), siendo el tipo de plaza de estacionamiento: Verde (Uso residencial); Azul (Uso rotacional); Naranja (Ámbito Diferenciado Disuasorio); Alta Rotación (Alta Rotación); Rojo (Ámbito Diferenciado Hospitalario). 
* bateria_linea: tipo de aparcamiento.
* numero_plazas: Cantidad numérica de vehículos que pueden ocupar el espacio destinado al aparcamiento. 

In [149]:
# Se comprueba si hay duplicados en los datos
duplicados = calles_ser[calles_ser.duplicated(keep=False)]
duplicados

Unnamed: 0,gis_x,gis_y,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,color,bateria_linea,numero_plazas
31371,444154.45,4475367.15,15,CIUDAD LINEAL,151,1,VENTAS,"SAN MARCELO, CALLE, DE",3,077214010 Verde,Línea,3
31372,444154.45,4475367.15,15,CIUDAD LINEAL,151,1,VENTAS,"SAN MARCELO, CALLE, DE",3,077214010 Verde,Línea,3


In [150]:
# Se eliminan los duplicados
calles_ser.drop_duplicates(inplace=True)

In [151]:
# Se modifican los valores de la variable color 
calles_ser['color'] = calles_ser['color'].replace({
    '077214010 Verde': 'Verde',
    '043000255 Azul': 'Azul',
    '081209246 Alta Rotación': 'Alta Rotación',
    '255140000 Naranja': 'Naranja',
    '255000000 Rojo': 'Rojo'
})

In [152]:
# El ETRS89 es el Sistema de Referencia Terrestre Europeo 1989.
# El código EPSG correspondiente a este para las coordenadas longitud y latitud es el EPSG:4258
from pyproj import Transformer

transformer = Transformer.from_crs("EPSG:25830", "EPSG:4258", always_xy=True)

# Se obtiene la longitud y la latitud para cada registro
calles_ser['longitud'], calles_ser['latitud'] = transformer.transform(calles_ser['gis_x'].values, calles_ser['gis_y'].values)

In [153]:
# Se guardan los datos modificados
calles_ser.to_csv("gs://tfm-bucket-clq/calles_ser_df.csv", index=False)

Se considera ahora el archivo parquimetros.csv, el cual tiene 6125 filas y 14 columnas, cada una de las filas corresponde a un parquímetro determinado de una zona SER de Madrid, la información sobre éste se puede consultar en [parquímetros](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=65d85d6f40b86710VgnVCM2000001f4a900aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default).

In [154]:
parquimetros = pd.read_csv('gs://tfm-bucket-clq/datos/parquimetros.csv', encoding = 'latin-1', sep = ';')

In [155]:
parquimetros.head(5)

Unnamed: 0,gis_x,gis_y,fecha_de_alta,fecha_de_baja,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,matricula,longitud,latitud
0,439608.190788,4473550.0,2014-05-28,,1,CENTRO,11,1,PALACIO,"AGUAS, CALLE, DE LAS",1,301110056.0,-3.711774,40.410379
1,439757.570954,4473795.0,2014-05-27,,1,CENTRO,11,1,PALACIO,"ALMENDRO, CALLE, DEL",14,301110042.0,-3.710037,40.412592
2,439724.473069,4474317.0,2014-05-26,,1,CENTRO,11,1,PALACIO,"AMNISTIA, CALLE, DE LA",6,301110014.0,-3.710476,40.4173
3,439923.681109,4474529.0,2014-05-26,,1,CENTRO,11,1,PALACIO,"ANGELES, COSTANILLA, DE LOS",11,301110012.0,-3.708149,40.41922
4,439735.231748,4474490.0,2014-05-26,,1,CENTRO,11,1,PALACIO,"ARRIETA, CALLE, DE",8,301110010.0,-3.710366,40.418855


In [156]:
parquimetros.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6126 entries, 0 to 6125
Data columns (total 14 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   gis_x          6126 non-null   float64
 1   gis_y          6126 non-null   float64
 2   fecha_de_alta  6126 non-null   object 
 3   fecha_de_baja  1547 non-null   object 
 4   cod_distrito   6126 non-null   int64  
 5   distrito       6126 non-null   object 
 6   cod_barrio     6126 non-null   int64  
 7   num_barrio     6126 non-null   int64  
 8   barrio         6126 non-null   object 
 9   calle          6126 non-null   object 
 10  numero_finca   6126 non-null   object 
 11  matricula      6098 non-null   float64
 12  longitud       6126 non-null   float64
 13  latitud        6126 non-null   float64
dtypes: float64(5), int64(3), object(6)
memory usage: 670.2+ KB


Muchas de las variables son similares a las de dataframe calles_ser, se incluye una descripción de las no coincidentes:
* fecha_de_alta: Fecha de alta del parquímetro en formato aaaa-mm-dd.
* fecha_de_baja: Fecha de baja del parquímetro en formato aaaa-mm-dd.
* matricula: Identificador parquímetro.

In [157]:
# Se comprueba la presencia de duplicados
duplicados = parquimetros[parquimetros.duplicated(keep=False)]
duplicados

Unnamed: 0,gis_x,gis_y,fecha_de_alta,fecha_de_baja,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,matricula,longitud,latitud
1726,441429.0,4476595.0,2022-12-07,2022-12-08,4,SALAMANCA,51,1,EL VISO,"DOCTOR MARAÑON, PLAZA, DEL",S/N,,-3.690596,40.437939
1727,441429.0,4476595.0,2022-12-07,2022-12-08,4,SALAMANCA,51,1,EL VISO,"DOCTOR MARAÑON, PLAZA, DEL",S/N,,-3.690596,40.437939


In [158]:
# Se eliminan los duplicados
parquimetros.drop_duplicates(inplace = True)

In [159]:
# Se eliminan los parquímetros que están dados de baja consideramos únicamente los que están activos.
parquimetros = parquimetros[parquimetros['fecha_de_baja'].isna()]
parquimetros = parquimetros.drop(columns = ['fecha_de_baja'])

In [160]:
# Se comprueba el número de parquímetros activos tras eliminar los que están dados de baja
parquimetros.shape[0]

4579

In [161]:
# Se modifica el tipo de la columa matricula para que sea tipo object
parquimetros['matricula'] = parquimetros['matricula'].astype('Int64')
parquimetros['matricula'] = parquimetros['matricula'].astype(str)

Se leen los archivos Primertrimestre2025.csv y Segundotrimestre2025.csv que contienen información sobre los tiques registrados en distintos parquímetros desde enero a junio de 2025. Éstos contienen en conjunto 25106958 de filas y 12 columnas. Más información se puede consultar en el enlace [tiques](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=67663c0a55e16710VgnVCM1000001d4a900aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default). Para su lectura y posteriores transformaciones usamos Spark.

In [162]:
# Se inicia la sesión de spark
spark = SparkSession.builder \
    .appName("TiquesParking") \
    .getOrCreate()


tiques_1trimestre = spark.read.csv("gs://tfm-bucket-clq/datos/Primertrimestre2025.csv", header=True, inferSchema=True, sep=";")
tiques_2trimestre = spark.read.csv("gs://tfm-bucket-clq/datos/Segundotrimestre2025.csv", header=True, inferSchema=True, sep=";")

                                                                                

In [163]:
# Se unen los datos de tiques del primer y segundo trimestre
tiques = tiques_1trimestre.union(tiques_2trimestre)

# Se muestra el esquema de los datos y 5 filas
tiques.printSchema()
tiques.show(5)

root
 |-- matricula_parquimetro: string (nullable = true)
 |-- fecha_operacion: timestamp (nullable = true)
 |-- fecha_inicio: timestamp (nullable = true)
 |-- fecha_fin: timestamp (nullable = true)
 |-- cod_distrito: integer (nullable = true)
 |-- distrito: string (nullable = true)
 |-- cod_barrio: integer (nullable = true)
 |-- barrio: string (nullable = true)
 |-- tipo_zona: string (nullable = true)
 |-- distintivo: string (nullable = true)
 |-- minutos_tique: integer (nullable = true)
 |-- importe_tique: string (nullable = true)



[Stage 50:>                                                         (0 + 1) / 1]

+---------------------+-------------------+-------------------+-------------------+------------+----------+----------+----------+---------+----------+-------------+-------------+
|matricula_parquimetro|    fecha_operacion|       fecha_inicio|          fecha_fin|cod_distrito|  distrito|cod_barrio|    barrio|tipo_zona|distintivo|minutos_tique|importe_tique|
+---------------------+-------------------+-------------------+-------------------+------------+----------+----------+----------+---------+----------+-------------+-------------+
|             EASYPARK|2025-01-22 20:20:34|2025-01-22 20:20:00|2025-01-22 20:59:00|           7|  CHAMBERI|         5|RIOS ROSAS|     AZUL|       ECO|           39|         0,15|
|            302130034|2025-01-31 13:50:19|2025-01-31 13:50:19|2025-01-31 15:50:19|           2|ARGANZUELA|         1|  IMPERIAL|    VERDE|         C|          120|         3,70|
|            ELPARKING|2025-01-10 11:44:44|2025-01-10 11:46:27|2025-01-10 11:56:27|           3|    RETIR

                                                                                

Las variables de este dataset son:
* matricula_parquimetro: Identificador del parquímetro.
* fecha_operacion: Fecha y hora de la finalización de la obtención del tique. 
* fecha_inicio: Fecha y hora de comienzo de validez del tique. 
* fecha_fin: Fecha y hora de fin de validez del tique. 
* tipo_zona: Tipo al que pertenece la plaza de aparcamiento para la que tiene validez el tique.
* distintivo: Distintivo ambiental del vehículo.
* minutos_tique: Minutos de duración del periodo de validez. 
* importe_tique: Importe del tique en euros. 




In [164]:
# Se muestra el número de filas total
tiques.count()

                                                                                

25106958

In [165]:
# Se extrae el día y la hora de la fecha de operación
tiques = tiques.withColumn("dia", date_format(col("fecha_operacion"), "dd-MM-yyyy")) \
                 .withColumn("hora", date_format(col("fecha_operacion"), "HH"))

# Se añade el día de la semana en el que se lleva a cabo la operación
tiques = tiques.withColumn("dia_semana", date_format(col("fecha_operacion"), "EEEE"))

# Se codifica el día de la semana
tiques = tiques.withColumn(
    "dia_semana",
    when(col("dia_semana") == "Monday", 1)
    .when(col("dia_semana") == "Tuesday", 2)
    .when(col("dia_semana") == "Wednesday", 3)
    .when(col("dia_semana") == "Thursday", 4)
    .when(col("dia_semana") == "Friday", 5)
    .when(col("dia_semana") == "Saturday", 6)
    .when(col("dia_semana") == "Sunday", 7)
    .otherwise("Desconocido")
    )

# Se añade una variable que indica si se trata de un día entre semana (0) o un día de fin de semana (1)
tiques = tiques.withColumn("fin_de_semana", when(col("dia_semana").isin([6, 7]), 1).otherwise(0))

# Se modifica la variable hora para que en lugar del valor 0 muestre el valor 24
tiques = tiques.withColumn("hora",
    when(col("hora") == 0, 24).otherwise(col("hora"))
)

# Se añade la variable int_tiempo en la cual se agrupan los datos en los grupos mañana, mediodia, tarde o noche en función de la hora
tiques = tiques.withColumn("int_tiempo", when((col("hora") >= 6) & (col("hora") < 12), "Mañana")
                  .when((col("hora") >= 12) & (col("hora") < 18), "Mediodia")
                  .when((col("hora") >= 18) & (col("hora") < 24), "Tarde")
                  .otherwise("Noche"))

In [166]:
# Número de parquímetros distintos de los cuales el dataframe tiene información
tiques.select("matricula_parquimetro").distinct().count()

                                                                                

4559

In [167]:
# Se agrupa la información de tiques por int_tiempo, fin_de_semana y matricula_parquimetro y se calcula la métrica agregada:
# Número total de registros por grupo.
tiques_agrupado_n = tiques.groupBy("matricula_parquimetro", "int_tiempo", "fin_de_semana").agg(count("*").alias("num_registros"))

# Se calcula la media de registros por agrupación
tiques_agrupado = tiques_agrupado_n.groupBy("matricula_parquimetro", "int_tiempo", "fin_de_semana").agg(avg("num_registros").alias("numero_tiques"))

tiques_agrupado.show(5)
tiques_agrupado.printSchema()



+---------------------+----------+-------------+-------------+
|matricula_parquimetro|int_tiempo|fin_de_semana|numero_tiques|
+---------------------+----------+-------------+-------------+
|            703330021|  Mediodia|            0|        781.0|
|            307110054|  Mediodia|            0|       1226.0|
|            703530027|    Mañana|            1|          4.0|
|            704330003|     Tarde|            0|       1150.0|
|            205210618|  Mediodia|            0|       1132.0|
+---------------------+----------+-------------+-------------+
only showing top 5 rows

root
 |-- matricula_parquimetro: string (nullable = true)
 |-- int_tiempo: string (nullable = false)
 |-- fin_de_semana: integer (nullable = false)
 |-- numero_tiques: double (nullable = true)



                                                                                

In [168]:
# Número de filas del conjunto de datos final
tiques_agrupado.count()

                                                                                

32942

In [169]:
# Se transforma el dataframe a pandas
tiques_df = tiques_agrupado.toPandas()

tiques_df.head(5)

                                                                                

Unnamed: 0,matricula_parquimetro,int_tiempo,fin_de_semana,numero_tiques
0,106520482,Tarde,0,132.0
1,215531432,Mañana,0,675.0
2,703430020,Mediodia,0,441.0
3,204530195,Mediodia,0,1751.0
4,307110054,Mediodia,0,1226.0


In [170]:
tiques_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32942 entries, 0 to 32941
Data columns (total 4 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   matricula_parquimetro  32942 non-null  object 
 1   int_tiempo             32942 non-null  object 
 2   fin_de_semana          32942 non-null  int32  
 3   numero_tiques          32942 non-null  float64
dtypes: float64(1), int32(1), object(2)
memory usage: 900.9+ KB


In [171]:
# Se lleva a cabo un inner join para añadir la información de tiques de cada parquímetro al dataframe de parquímetros que hemos considerado anteriormente, se hace inner join para conservar únicamente matrículas de parquímetros coincidentes
parkings_info = pd.merge(tiques_df, parquimetros, left_on='matricula_parquimetro', right_on='matricula', how='inner')

In [172]:
parkings_info.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 32852 entries, 0 to 32851
Data columns (total 17 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   matricula_parquimetro  32852 non-null  object 
 1   int_tiempo             32852 non-null  object 
 2   fin_de_semana          32852 non-null  int32  
 3   numero_tiques          32852 non-null  float64
 4   gis_x                  32852 non-null  float64
 5   gis_y                  32852 non-null  float64
 6   fecha_de_alta          32852 non-null  object 
 7   cod_distrito           32852 non-null  int64  
 8   distrito               32852 non-null  object 
 9   cod_barrio             32852 non-null  int64  
 10  num_barrio             32852 non-null  int64  
 11  barrio                 32852 non-null  object 
 12  calle                  32852 non-null  object 
 13  numero_finca           32852 non-null  object 
 14  matricula              32852 non-null  object 
 15  lo

In [173]:
# Se seleccionan las columnas con las que vamos a trabajar
parkings_info = parkings_info[['int_tiempo', 'fin_de_semana', 'numero_tiques', 'cod_distrito', 'cod_barrio', 'calle','numero_finca','longitud','latitud','barrio','distrito']]
parkings_info.head()
parkings_info.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 32852 entries, 0 to 32851
Data columns (total 11 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   int_tiempo     32852 non-null  object 
 1   fin_de_semana  32852 non-null  int32  
 2   numero_tiques  32852 non-null  float64
 3   cod_distrito   32852 non-null  int64  
 4   cod_barrio     32852 non-null  int64  
 5   calle          32852 non-null  object 
 6   numero_finca   32852 non-null  object 
 7   longitud       32852 non-null  float64
 8   latitud        32852 non-null  float64
 9   barrio         32852 non-null  object 
 10  distrito       32852 non-null  object 
dtypes: float64(3), int32(1), int64(2), object(5)
memory usage: 2.9+ MB


In [174]:
# Se consideran las combinaciones únicas de número finca, calle, barrio, distrito
calles_ser_noduplicados = calles_ser.drop_duplicates(
    subset=['calle', 'cod_barrio', 'cod_distrito','numero_finca']
)

In [175]:
# Se añade información a los parquímetros acerca del número de plazas, color y tipo de aparcamiento, se hace join inner para coger solo registros coincidentes, hay parquímetros que no tienen calle ser asociada
parkings_final = parkings_info.merge(calles_ser_noduplicados[['cod_barrio','calle','numero_finca','numero_plazas','color','bateria_linea','gis_x','gis_y','cod_distrito']],
                         on = ['cod_barrio','calle','cod_distrito','numero_finca'],
                         how = 'left')
parkings_final.head()
parkings_final.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 32852 entries, 0 to 32851
Data columns (total 16 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   int_tiempo     32852 non-null  object 
 1   fin_de_semana  32852 non-null  int32  
 2   numero_tiques  32852 non-null  float64
 3   cod_distrito   32852 non-null  int64  
 4   cod_barrio     32852 non-null  int64  
 5   calle          32852 non-null  object 
 6   numero_finca   32852 non-null  object 
 7   longitud       32852 non-null  float64
 8   latitud        32852 non-null  float64
 9   barrio         32852 non-null  object 
 10  distrito       32852 non-null  object 
 11  numero_plazas  21085 non-null  float64
 12  color          21085 non-null  object 
 13  bateria_linea  21085 non-null  object 
 14  gis_x          21085 non-null  float64
 15  gis_y          21085 non-null  float64
dtypes: float64(6), int32(1), int64(2), object(7)
memory usage: 4.1+ MB


In [176]:
# Se quitan nulos para los valores que no coinciden entre con número de finca del dataframe de calles_ser, esos datos de parquímetros no se consideran
parkings_final = parkings_final.dropna()

parkings_final.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 21085 entries, 8 to 32845
Data columns (total 16 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   int_tiempo     21085 non-null  object 
 1   fin_de_semana  21085 non-null  int32  
 2   numero_tiques  21085 non-null  float64
 3   cod_distrito   21085 non-null  int64  
 4   cod_barrio     21085 non-null  int64  
 5   calle          21085 non-null  object 
 6   numero_finca   21085 non-null  object 
 7   longitud       21085 non-null  float64
 8   latitud        21085 non-null  float64
 9   barrio         21085 non-null  object 
 10  distrito       21085 non-null  object 
 11  numero_plazas  21085 non-null  float64
 12  color          21085 non-null  object 
 13  bateria_linea  21085 non-null  object 
 14  gis_x          21085 non-null  float64
 15  gis_y          21085 non-null  float64
dtypes: float64(6), int32(1), int64(2), object(7)
memory usage: 2.7+ MB


In [177]:
parkings_final.to_csv("gs://tfm-bucket-clq/parking_info_df.csv", index=False)

Se leen los datos de tráfico histórico medido en distintos puntos de Madrid. Más información acerca de estos datos se puede encontrar en el enlace [trafico](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=33cb30c367e78410VgnVCM1000000b205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default). Se consideran los datos de enero a junio, los correspondientes al primer y segundo trimestre. Estos datos se registran cada 15 minutos. En conjunto forman un dataframe de 77 millones de filas.

In [178]:
trafico_enero = spark.read.csv("gs://tfm-bucket-clq/datos/01-2025.csv", header=True, inferSchema=True, sep=";")
trafico_febrero = spark.read.csv("gs://tfm-bucket-clq/datos/02-2025.csv", header=True, inferSchema=True, sep=";")
trafico_marzo = spark.read.csv("gs://tfm-bucket-clq/datos/03-2025.csv", header=True, inferSchema=True, sep=";")
trafico_abril = spark.read.csv("gs://tfm-bucket-clq/datos/04-2025.csv", header=True, inferSchema=True, sep=";")
trafico_mayo = spark.read.csv("gs://tfm-bucket-clq/datos/05-2025.csv", header=True, inferSchema=True, sep=";")
trafico_junio = spark.read.csv("gs://tfm-bucket-clq/datos/06-2025.csv", header=True, inferSchema=True, sep=";")

                                                                                

In [179]:
# Se unen los datos de tráfico de enero a junio y se muestran las 5 primeras filas y el esquema de los datos
trafico = trafico_enero.union(trafico_febrero).union(trafico_marzo).union(trafico_abril).union(trafico_mayo).union(trafico_junio)
trafico.show(5)
trafico.printSchema()

[Stage 84:>                                                         (0 + 1) / 1]

+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+
|  id|              fecha|tipo_elem|intensidad|ocupacion|carga|vmed|error|periodo_integracion|
+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+
|1002|2025-01-01 00:00:00|      C30|       216|        2|    0|66.0|    N|                  5|
|1002|2025-01-01 00:15:00|      C30|        72|        0|    0|14.0|    N|                  5|
|1002|2025-01-01 00:30:00|      C30|       444|        2|    0|69.0|    N|                  5|
|1002|2025-01-01 00:45:00|      C30|      1260|        6|    0|66.0|    N|                  5|
|1002|2025-01-01 01:00:00|      C30|      2028|        7|    0|66.0|    N|                  5|
+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- fecha: timestamp (nullable = true)
 |-- tipo_elem: string (nullable = true)

                                                                                

Las variables de este conjunto de datos son:
* id:  Identificación del punto de medida.
* fecha:  ha Fecha y hora oficiales de Madrid con formato dd/mm/yyyy hh:mm:ss.
* tipo_elem:  Identificación del punto de medida.
* intensidad:  Número de vehículos en el periodo de 15 minutos, expresada en vehículos/hora. El valor efectivo de vehículos que han circulado en ese periodo se obtiene dividiendo entre cuatro. Un valor negativo implica la ausencia de datos. 
* ocupacion:  Porcentaje de tiempo que está un detector de tráfico ocupado por un vehículo. Por ejemplo, una ocupación del 50% en un periodo de 15 minutos significa que ha habido vehículos situados sobre el detector durante 7 minutos y 30 segundos.  Un valor negativo implica la ausencia de datos. 
* carga:  Parámetro de carga del vial en el periodo de 15 minutos. Representa una estimación del grado de congestión, calculado a partir de un algoritmo que usa como variables la intensidad y ocupación, con ciertos factores de corrección. Establece el grado de uso de la vía en un rango de 0 (vacía) a 100 (colapso). Un valor negativo implica la ausencia de datos. 
* vmed:  Velocidad media de los vehículos en el periodo de 15 minutos (Km./h). Solo para puntos de medida interurbanos M30.  Un valor negativo implica la ausencia de datos. 
* error: Indicación de si ha habido al menos una muestra errónea o sustituida en el periodo de 15 minutos. N: no ha habido errores ni sustituciones E: los parámetros de calidad de alguna de las muestras integradas no son óptimos. S: alguna de las muestras recibidas era totalmente errónea y no se ha integrado. 
* periodo_integracion:  Número de muestras recibidas y consideradas para el periodo de integración.  

In [180]:
trafico.count()

                                                                                

77241443

In [181]:
# Se realizan las mismas transformaciones temporales que se han llevado a cabo anteriormente en el dataframe de tiques
trafico = trafico.withColumn("dia", date_format(col("fecha"), "dd-MM-yyyy")) \
                 .withColumn("hora", date_format(col("fecha"), "HH"))

trafico = trafico.withColumn("dia_semana", date_format(col("fecha"), "EEEE"))

trafico = trafico.withColumn(
    "dia_semana",
    when(col("dia_semana") == "Monday", 1)
    .when(col("dia_semana") == "Tuesday", 2)
    .when(col("dia_semana") == "Wednesday", 3)
    .when(col("dia_semana") == "Thursday", 4)
    .when(col("dia_semana") == "Friday", 5)
    .when(col("dia_semana") == "Saturday", 6)
    .when(col("dia_semana") == "Sunday", 7)
    .otherwise("Desconocido")
    )

trafico = trafico.withColumn("fin_de_semana", when(col("dia_semana").isin([6, 7]), 1).otherwise(0))

trafico = trafico.withColumn("hora",
    when(col("hora") == 0, 24).otherwise(col("hora"))
)

trafico = trafico.withColumn("int_tiempo", when((col("hora") >= 6) & (col("hora") < 12), "Mañana")
                  .when((col("hora") >= 12) & (col("hora") < 18), "Mediodia")
                  .when((col("hora") >= 18) & (col("hora") < 24), "Tarde")
                  .otherwise("Noche"))
trafico.show(5)

[Stage 88:>                                                         (0 + 1) / 1]

+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+----------+----+----------+-------------+----------+
|  id|              fecha|tipo_elem|intensidad|ocupacion|carga|vmed|error|periodo_integracion|       dia|hora|dia_semana|fin_de_semana|int_tiempo|
+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+----------+----+----------+-------------+----------+
|1002|2025-01-01 00:00:00|      C30|       216|        2|    0|66.0|    N|                  5|01-01-2025|  24|         3|            0|     Noche|
|1002|2025-01-01 00:15:00|      C30|        72|        0|    0|14.0|    N|                  5|01-01-2025|  24|         3|            0|     Noche|
|1002|2025-01-01 00:30:00|      C30|       444|        2|    0|69.0|    N|                  5|01-01-2025|  24|         3|            0|     Noche|
|1002|2025-01-01 00:45:00|      C30|      1260|        6|    0|66.0|    N|                  5|01-01-2025|  24|        

                                                                                

In [182]:
# De nuevo, agrupamos para cada id, int_tiempo y fin_de_semana, para obtener las medidas agrupadas:
# Media de la intensidad,
# Media de la ocupación,
# Media de la carga.
trafico_agrupado = trafico.groupBy("int_tiempo","fin_de_semana","id").agg(
    avg("intensidad").alias("media_intensidad"),
    avg("ocupacion").alias("media_ocupacion"),
    avg("carga").alias("media_carga")
)

trafico_agrupado.count()
trafico_agrupado.show(5)



+----------+-------------+----+------------------+------------------+------------------+
|int_tiempo|fin_de_semana|  id|  media_intensidad|   media_ocupacion|       media_carga|
+----------+-------------+----+------------------+------------------+------------------+
|     Tarde|            1|6038| 196.5821972734563| 2.178027265437049|13.943865276663994|
|  Mediodia|            1|6084|134.53445512820514|4.8493589743589745| 16.24599358974359|
|     Tarde|            0|6110|17.696301564722617| 7.816500711237554|  9.69203413940256|
|  Mediodia|            1|6205|184.59214743589743|1.6009615384615385|14.771634615384615|
|     Tarde|            1|6304|232.88782051282053|0.8661858974358975|11.033653846153847|
+----------+-------------+----+------------------+------------------+------------------+
only showing top 5 rows



                                                                                

In [183]:
# Se transforma el dataframe de tráfico a pandas
trafico_df = trafico_agrupado.toPandas()
trafico_df['id'] = trafico_df['id'].astype(str)

                                                                                

In [184]:
trafico_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 38194 entries, 0 to 38193
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   int_tiempo        38194 non-null  object 
 1   fin_de_semana     38194 non-null  int32  
 2   id                38194 non-null  object 
 3   media_intensidad  38194 non-null  float64
 4   media_ocupacion   38194 non-null  float64
 5   media_carga       38194 non-null  float64
dtypes: float64(3), int32(1), object(2)
memory usage: 1.6+ MB


Se lee el archivo pmed_ubicacion_06-2025.csv que contiene la ubicación de los puntos de medición de tráfico. Más información en el enlace [ubicacion_pmed](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=ee941ce6ba6d3410VgnVCM1000000b205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD).

In [185]:
ubicacion_pmed = pd.read_csv("gs://tfm-bucket-clq/datos/pmed_ubicacion_06-2025.csv",  encoding = 'latin-1', sep = ';')
ubicacion_pmed.info()
ubicacion_pmed['id'] = ubicacion_pmed['id'].astype(str)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5001 entries, 0 to 5000
Data columns (total 9 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   tipo_elem  5001 non-null   object 
 1   distrito   4996 non-null   float64
 2   id         5001 non-null   int64  
 3   cod_cent   5001 non-null   object 
 4   nombre     4987 non-null   object 
 5   utm_x      5001 non-null   float64
 6   utm_y      5001 non-null   float64
 7   longitud   5001 non-null   float64
 8   latitud    5001 non-null   float64
dtypes: float64(5), int64(1), object(3)
memory usage: 351.8+ KB


In [186]:
ubicacion_pmed.isnull().sum()


tipo_elem     0
distrito      5
id            0
cod_cent      0
nombre       14
utm_x         0
utm_y         0
longitud      0
latitud       0
dtype: int64

Sus variables son:
* tipo_elem: Tipo de elemento: URB urbano, M30 situado en la M30, OTHER otro tipo.
* distrito: Distrito.
* id: Identificador único numérico del punto de medida.
* cod_cent: Código del punto de medida.
* nombre: Denominación punto de medida. 
* utm_x: Coordenada X en UTM 30N.
* utm_y: Coordenada Y en UTM 30N.
* longitud: Longitud en grados en WGS 84.
* latitud: Latitud en grados en WGS 84.

In [187]:
# Se une a cada registro de tráfico la longitud y latitud correspondiente a la ubicación del punto de medición
trafico_final = pd.merge(trafico_df,
                      ubicacion_pmed[['id','longitud','latitud']],
                      on='id',
                      how='left')
trafico_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 38194 entries, 0 to 38193
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   int_tiempo        38194 non-null  object 
 1   fin_de_semana     38194 non-null  int32  
 2   id                38194 non-null  object 
 3   media_intensidad  38194 non-null  float64
 4   media_ocupacion   38194 non-null  float64
 5   media_carga       38194 non-null  float64
dtypes: float64(3), int32(1), object(2)
memory usage: 1.6+ MB


Se observa que longitud y latitud tienen algunos elementos nulos, debidos a que no coincidirá el identificador del punto de medición con el que se encuentra en los datos de tráfico, se eliminan estos registros para que no causen problemas en el futuro.

In [None]:
trafico_final = trafico_final.dropna()

In [None]:
trafico_final.head()

Unnamed: 0,int_tiempo,fin_de_semana,id,media_intensidad,media_ocupacion,media_carga,longitud,latitud
0,Noche,1,5257,29.321811,0.226337,1.390123,-3.763558,40.363658
1,Mañana,1,5379,88.403194,0.616766,11.396208,-3.598876,40.364426
2,Mañana,1,5481,75.999199,1.507212,4.334135,-3.686867,40.457847
3,Tarde,1,5481,108.14984,2.772436,7.257212,-3.686867,40.457847
4,Mediodia,1,5505,238.377634,4.002431,11.273096,-3.687014,40.455163


In [None]:
trafico_final.to_csv("gs://tfm-bucket-clq/trafico_df.csv", index=False)