## Lectura y preprocesamiento de los datos con PySpark

In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import dayofmonth, month, col, count
from pyspark.sql.functions import to_timestamp, date_format, when
from pyspark.sql.functions import avg, count
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import FloatType

En primer lugar, se ha considerado el archivo calles-SER-2025.csv, el cual tiene 33731 filas y 12 columnas, cada una de estas filas corresponde a una zona SER determinada, la información sobre éste se puede consultar en [calles_ser](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=4973b0dd4a872510VgnVCM1000000b205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default).

In [2]:
calles_ser = pd.read_csv("gs://uned-bucket-clq/datos/calles_SER_2025.csv", encoding = 'latin-1', sep = ';')
calles_ser.head(5)

Unnamed: 0,gis_x,gis_y,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,color,bateria_linea,numero_plazas
0,439592.91,4473566.23,1,CENTRO,11,1,PALACIO,"AGUAS, CALLE, DE LAS",2,077214010 Verde,Línea,7
1,439569.61,4473598.22,1,CENTRO,11,1,PALACIO,"AGUAS, CALLE, DE LAS",8,077214010 Verde,Línea,4
2,439536.49,4473428.32,1,CENTRO,11,1,PALACIO,"AGUILA, CALLE, DEL",17,077214010 Verde,Línea,4
3,439525.36,4473404.28,1,CENTRO,11,1,PALACIO,"AGUILA, CALLE, DEL",21,077214010 Verde,Línea,4
4,439578.18,4473498.76,1,CENTRO,11,1,PALACIO,"AGUILA, CALLE, DEL",3,077214010 Verde,Línea,1


In [3]:
calles_ser.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33731 entries, 0 to 33730
Data columns (total 12 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   gis_x          33731 non-null  float64
 1   gis_y          33731 non-null  float64
 2   cod_distrito   33731 non-null  int64  
 3   distrito       33731 non-null  object 
 4   cod_barrio     33731 non-null  int64  
 5   num_barrio     33731 non-null  int64  
 6   barrio         33731 non-null  object 
 7   calle          33731 non-null  object 
 8   numero_finca   33731 non-null  object 
 9   color          33731 non-null  object 
 10  bateria_linea  33731 non-null  object 
 11  numero_plazas  33731 non-null  int64  
dtypes: float64(2), int64(4), object(6)
memory usage: 3.1+ MB


A continuación se definen las variables que encontramos en este dataset según su definición en la documentación asociada, la cual se puede encontrar en el enlace anterior:
* gis_x: Coordenadas X, proyección UTM, de sistema geodésico de referencia ETRS89.
* gis_y: Coordenadas Y, proyección UTM, del sistema geodésico de referencia ETRS89.
* cos_distrito: Código del distrito.
* distrito: Nombre del distrito.
* cod_barrio: Código del barrio, compuesto por el código del distrito al que pertenece y su código de barrio dentro del distrito.
* num_barrio: Código del barrio dentro del distrito.
* barrio: Nombre del barrio.
* calle: Nombre de la calle.
* numero_finca: Número de finca.
* color: Color según su codificación RGB en valores decimales (077214010 Verde; 043000255 Azul; 255000000 Rojo;081209246 Alta Rotación; 255140000 Naranja), siendo el tipo de plaza de estacionamiento: Verde (Uso residencial); Azul (Uso rotacional); Naranja (Ámbito Diferenciado Disuasorio); Alta Rotación (Alta Rotación); Rojo (Ámbito Diferenciado Hospitalario). 
* bateria_linea: tipo de aparcamiento.
* numero_plazas: Cantidad numérica de vehículos que pueden ocupar el espacio destinado al aparcamiento. 

In [4]:
# Para cada zona SER añadimos un id único compuesto por el código del barrio, del distrito y el número de finca
calles_ser['id'] = (calles_ser['cod_barrio'].astype(str) + "_" +
                    calles_ser['cod_distrito'].astype(str) + "_" +
                    calles_ser['numero_finca'].astype(str))

In [5]:
# Comprobamos si hay duplicados en los datos
duplicados = calles_ser[calles_ser.duplicated(keep=False)]
duplicados

Unnamed: 0,gis_x,gis_y,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,color,bateria_linea,numero_plazas,id
31371,444154.45,4475367.15,15,CIUDAD LINEAL,151,1,VENTAS,"SAN MARCELO, CALLE, DE",3,077214010 Verde,Línea,3,151_15_3
31372,444154.45,4475367.15,15,CIUDAD LINEAL,151,1,VENTAS,"SAN MARCELO, CALLE, DE",3,077214010 Verde,Línea,3,151_15_3


In [6]:
# Eliminamos los duplicados en el caso de que los hubiese
calles_ser.drop_duplicates(inplace=True)

In [7]:
# Modificamos los valores de la variable color 
calles_ser['color'] = calles_ser['color'].replace({
    '077214010 Verde': 'Verde',
    '043000255 Azul': 'Azul',
    '081209246 Alta Rotación': 'Alta Rotación',
    '255140000 Naranja': 'Naranja',
    '255000000 Rojo': 'Rojo'
})

In [8]:
# El ETRS89 es el Sistema de Referencia Terrestre Europeo 1989.
# El código EPSG correspondiente a este datum para las cartografías no proyectadas es el EPSG:4258
from pyproj import Transformer

transformer = Transformer.from_crs("EPSG:25830", "EPSG:4258", always_xy=True)

# Obtenemos la longitud y la latitud para cada registro
calles_ser['longitud'], calles_ser['latitud'] = transformer.transform(calles_ser['gis_x'].values, calles_ser['gis_y'].values)

In [9]:
calles_ser.to_csv("gs://uned-bucket-clq/calles_ser_df.csv", index=False)

Consideramos ahora el archivo parquimetros.csv, el cual tiene 6125 filas y 14 columnas, cada una de las filas corresponde a un parquímetro determinado del centro de madrid, la información sobre éste se puede consultar en [parquímetros](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=65d85d6f40b86710VgnVCM2000001f4a900aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default).

In [10]:
parquimetros = pd.read_csv('gs://uned-bucket-clq/datos/parquimetros.csv', encoding = 'latin-1', sep = ';')

In [11]:
parquimetros.head(5)

Unnamed: 0,gis_x,gis_y,fecha_de_alta,fecha_de_baja,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,matricula,longitud,latitud
0,439608.190788,4473550.0,2014-05-28,,1,CENTRO,11,1,PALACIO,"AGUAS, CALLE, DE LAS",1,301110056.0,-3.711774,40.410379
1,439757.570954,4473795.0,2014-05-27,,1,CENTRO,11,1,PALACIO,"ALMENDRO, CALLE, DEL",14,301110042.0,-3.710037,40.412592
2,439724.473069,4474317.0,2014-05-26,,1,CENTRO,11,1,PALACIO,"AMNISTIA, CALLE, DE LA",6,301110014.0,-3.710476,40.4173
3,439923.681109,4474529.0,2014-05-26,,1,CENTRO,11,1,PALACIO,"ANGELES, COSTANILLA, DE LOS",11,301110012.0,-3.708149,40.41922
4,439735.231748,4474490.0,2014-05-26,,1,CENTRO,11,1,PALACIO,"ARRIETA, CALLE, DE",8,301110010.0,-3.710366,40.418855


In [12]:
parquimetros.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6126 entries, 0 to 6125
Data columns (total 14 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   gis_x          6126 non-null   float64
 1   gis_y          6126 non-null   float64
 2   fecha_de_alta  6126 non-null   object 
 3   fecha_de_baja  1547 non-null   object 
 4   cod_distrito   6126 non-null   int64  
 5   distrito       6126 non-null   object 
 6   cod_barrio     6126 non-null   int64  
 7   num_barrio     6126 non-null   int64  
 8   barrio         6126 non-null   object 
 9   calle          6126 non-null   object 
 10  numero_finca   6126 non-null   object 
 11  matricula      6098 non-null   float64
 12  longitud       6126 non-null   float64
 13  latitud        6126 non-null   float64
dtypes: float64(5), int64(3), object(6)
memory usage: 670.2+ KB


Muchas de las variables son similares a las de dataframe calles_ser, incluimos una descripción de las no coincidentes:
* fecha_de_alta: Fecha de alta del parquímetro en formato aaaa-mm-dd.
* fecha_de_baja: Fecha de baja del parquímetro en formato aaaa-mm-dd.
* matricula: Identificador parquímetro.

In [13]:
# Comprobamos la presencia de duplicados
duplicados = parquimetros[parquimetros.duplicated(keep=False)]
duplicados

Unnamed: 0,gis_x,gis_y,fecha_de_alta,fecha_de_baja,cod_distrito,distrito,cod_barrio,num_barrio,barrio,calle,numero_finca,matricula,longitud,latitud
1726,441429.0,4476595.0,2022-12-07,2022-12-08,4,SALAMANCA,51,1,EL VISO,"DOCTOR MARAÑON, PLAZA, DEL",S/N,,-3.690596,40.437939
1727,441429.0,4476595.0,2022-12-07,2022-12-08,4,SALAMANCA,51,1,EL VISO,"DOCTOR MARAÑON, PLAZA, DEL",S/N,,-3.690596,40.437939


In [14]:
# Eliminamos los duplicados
parquimetros.drop_duplicates(inplace = True)

In [15]:
# Eliminamos los parquímetros que están dados de baja consideramos únicamente los que están activos.
parquimetros = parquimetros[parquimetros['fecha_de_baja'].isna()]
parquimetros = parquimetros.drop(columns = ['fecha_de_baja'])

In [16]:
# Comprobamos el número de parquímetros restante tras eliminar los que están dados de baja
parquimetros.shape[0]

4579

In [17]:
# Se modifica el tipo de la columa matricula para que sea tipo object
parquimetros['matricula'] = parquimetros['matricula'].astype('Int64')
parquimetros['matricula'] = parquimetros['matricula'].astype(str)

Leemos el archivo Primertrimestre2025.csv que contiene información sobre los tiquets registrados en distintos parquímetros en el primer trimestre de 2025. Éste contiene 12745646 de filas y 12 columnas. Más información se puede consultar en el enlace [tiquets](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=67663c0a55e16710VgnVCM1000001d4a900aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default). Para su lectura y posteriores transformaciones usamos Spark.

In [18]:
# Iniciamos la sesión de spark
spark = SparkSession.builder \
    .appName("TiquetsParking") \
    .getOrCreate()


tiquets_1trimestre = spark.read.csv("gs://uned-bucket-clq/datos/Primertrimestre2025.csv", header=True, inferSchema=True, sep=";")

25/08/08 01:48:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [19]:
tiquets = tiquets_1trimestre

# Mostramos el esquema de los datos y 5 filas
tiquets.printSchema()
tiquets.show(5)

root
 |-- matricula_parquimetro: string (nullable = true)
 |-- fecha_operacion: timestamp (nullable = true)
 |-- fecha_inicio: timestamp (nullable = true)
 |-- fecha_fin: timestamp (nullable = true)
 |-- cod_distrito: integer (nullable = true)
 |-- distrito: string (nullable = true)
 |-- cod_barrio: integer (nullable = true)
 |-- barrio: string (nullable = true)
 |-- tipo_zona: string (nullable = true)
 |-- distintivo: string (nullable = true)
 |-- minutos_tique: integer (nullable = true)
 |-- importe_tique: string (nullable = true)



[Stage 2:>                                                          (0 + 1) / 1]

+---------------------+-------------------+-------------------+-------------------+------------+----------+----------+----------+---------+----------+-------------+-------------+
|matricula_parquimetro|    fecha_operacion|       fecha_inicio|          fecha_fin|cod_distrito|  distrito|cod_barrio|    barrio|tipo_zona|distintivo|minutos_tique|importe_tique|
+---------------------+-------------------+-------------------+-------------------+------------+----------+----------+----------+---------+----------+-------------+-------------+
|             EASYPARK|2025-01-22 20:20:34|2025-01-22 20:20:00|2025-01-22 20:59:00|           7|  CHAMBERI|         5|RIOS ROSAS|     AZUL|       ECO|           39|         0,15|
|            302130034|2025-01-31 13:50:19|2025-01-31 13:50:19|2025-01-31 15:50:19|           2|ARGANZUELA|         1|  IMPERIAL|    VERDE|         C|          120|         3,70|
|            ELPARKING|2025-01-10 11:44:44|2025-01-10 11:46:27|2025-01-10 11:56:27|           3|    RETIR

                                                                                

Las nuevas variables de este dataset son:
* matricula_parquimetro: Identificador del parquímetro.
* fecha_operacion: Fecha y hora de la finalización de la obtención del tique. 
* fecha_inicio: Fecha y hora de comienzo de validez del tique. 
* fecha_fin: Fecha y hora de fin de validez del tique. 
* tipo_zona: Tipo al que pertenece la plaza de aparcamiento para la que tiene validez el tique.
* distintivo: Distintivo ambiental del vehículo.
* minutos_tique: Minutos de duración del periodo de validez. 
* importe_tique: Importe del tique en euros. 




In [20]:
# Mostramos el número de filas
tiquets.count()

                                                                                

12745646

In [21]:
tiquets = tiquets.withColumn(
    "importe_tique",
    regexp_replace("importe_tique", ",", ".").cast(FloatType())
)

# Se extrae el día y la hora de la fecha de operación
tiquets = tiquets.withColumn("dia", date_format(col("fecha_operacion"), "dd-MM-yyyy")) \
                 .withColumn("hora", date_format(col("fecha_operacion"), "HH"))

# Se añade el día de la semana en el que se lleva a cabo la operación
tiquets = tiquets.withColumn("dia_semana", date_format(col("fecha_operacion"), "EEEE"))

# Se codifica el día de la semana
tiquets = tiquets.withColumn(
    "dia_semana",
    when(col("dia_semana") == "Monday", 1)
    .when(col("dia_semana") == "Tuesday", 2)
    .when(col("dia_semana") == "Wednesday", 3)
    .when(col("dia_semana") == "Thursday", 4)
    .when(col("dia_semana") == "Friday", 5)
    .when(col("dia_semana") == "Saturday", 6)
    .when(col("dia_semana") == "Sunday", 7)
    .otherwise("Desconocido")
    )

# Se añade una variable que indica si se trata de un día entre semana (0) o un día de fin de semana (1)
tiquets = tiquets.withColumn("fin_de_semana", when(col("dia_semana").isin([6, 7]), 1).otherwise(0))

# Se modifica la variable hora para que en lugar del valor 0 muestre el valor 24
tiquets = tiquets.withColumn("hora",
    when(col("hora") == 0, 24).otherwise(col("hora"))
)

# Se añade la variable int_tiempo en la cual se agrupan los datos en los grupos mañana, mediodia, tarde o noche en función de la hora
tiquets = tiquets.withColumn("int_tiempo", when((col("hora") >= 6) & (col("hora") < 12), "Mañana")
                  .when((col("hora") >= 12) & (col("hora") < 19), "Mediodia")
                  .when((col("hora") >= 19) & (col("hora") < 24), "Tarde")
                  .otherwise("Noche"))

In [22]:
# Número de parquímetros distintos de los cuales el dataframe tiene información
tiquets.select("matricula_parquimetro").distinct().count()

                                                                                

4553

In [23]:
# Se agrupan la información de tiquets por int_tiempo, fin_de_semana y matricula_parquimetro ycalculan las métricas agregadas:
# Media de minutos por grupo,
# Media del importe del grupo,
# Número total de registros por grupo.
tiquets_agrupado = tiquets.groupBy("matricula_parquimetro", "int_tiempo", "fin_de_semana").agg(
    avg("minutos_tique").alias("media_minutos"),
    avg("importe_tique").alias("media_importe"),
    count("*").alias("num_registros")
)

tiquets_agrupado.show(5)
tiquets_agrupado.printSchema()



+---------------------+----------+-------------+------------------+------------------+-------------+
|matricula_parquimetro|int_tiempo|fin_de_semana|     media_minutos|     media_importe|num_registros|
+---------------------+----------+-------------+------------------+------------------+-------------+
|            703230021|     Tarde|            0| 59.12219451371571|1.0420199512617843|          401|
|            106230172|    Mañana|            0| 131.3402889245586| 2.272231148355558|          623|
|            205611031|  Mediodia|            0| 86.78609062170706|1.2667544775056576|          949|
|            204530195|  Mediodia|            0| 79.14587737843551| 1.303752647252088|          946|
|            703630015|    Mañana|            0|123.07479224376732|1.3451523601059439|          361|
+---------------------+----------+-------------+------------------+------------------+-------------+
only showing top 5 rows

root
 |-- matricula_parquimetro: string (nullable = true)
 |-- int

                                                                                

In [24]:
# Número de filas del conjunto de datos final
tiquets_agrupado.count()

                                                                                

30116

In [25]:
# Transformamos el dataframe a pandas
tiquets_df = tiquets_agrupado.toPandas()

tiquets_df.head(5)

                                                                                

Unnamed: 0,matricula_parquimetro,int_tiempo,fin_de_semana,media_minutos,media_importe,num_registros
0,106520482,Tarde,0,54.75,1.31875,32
1,215531432,Mañana,0,118.545946,1.068378,370
2,703430020,Mediodia,0,97.053004,1.368551,283
3,204530195,Mediodia,0,79.145877,1.303753,946
4,307110054,Mediodia,0,90.290323,0.99639,651


In [26]:
tiquets_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 30116 entries, 0 to 30115
Data columns (total 6 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   matricula_parquimetro  30116 non-null  object 
 1   int_tiempo             30116 non-null  object 
 2   fin_de_semana          30116 non-null  int32  
 3   media_minutos          30116 non-null  float64
 4   media_importe          30116 non-null  float64
 5   num_registros          30116 non-null  int64  
dtypes: float64(2), int32(1), int64(1), object(2)
memory usage: 1.3+ MB


In [27]:
# Se lleva a cabo un inner join para añadir la información de tiquets de cada parquímetro al dataframe de parquímetros que hemos considerado anteriormente
parkings_info = pd.merge(tiquets_df, parquimetros, left_on='matricula_parquimetro', right_on='matricula', how='inner')

In [28]:
# Seleccionamos las columnas con las que vamos a trabajar
parkings_info = parkings_info[['int_tiempo', 'fin_de_semana', 'media_minutos', 'num_registros', 'cod_distrito', 'cod_barrio', 'calle','numero_finca','longitud','latitud']]
parkings_info.head()

Unnamed: 0,int_tiempo,fin_de_semana,media_minutos,num_registros,cod_distrito,cod_barrio,calle,numero_finca,longitud,latitud
0,Tarde,0,54.75,32,6,65,"MIOSOTIS, CALLE, DE LA",19 C,-3.700857,40.464906
1,Mediodia,0,78.466667,300,6,65,"MIOSOTIS, CALLE, DE LA",19 C,-3.700857,40.464906
2,Mañana,1,62.04,25,6,65,"MIOSOTIS, CALLE, DE LA",19 C,-3.700857,40.464906
3,Noche,1,39.0,2,6,65,"MIOSOTIS, CALLE, DE LA",19 C,-3.700857,40.464906
4,Noche,0,89.5,2,6,65,"MIOSOTIS, CALLE, DE LA",19 C,-3.700857,40.464906


In [29]:
# Añadimos información a los parquímetros acerca del número de plazas, color y tipo de aparcamiento
parkings_info = pd.merge(parkings_info,
                         calles_ser[['cod_barrio','calle','numero_finca','numero_plazas','color','bateria_linea','gis_x','gis_y']],
                         on = ['cod_barrio','calle','numero_finca'],
                         how = 'inner')
parkings_info.head()
parkings_info.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 47214 entries, 0 to 47213
Data columns (total 15 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   int_tiempo     47214 non-null  object 
 1   fin_de_semana  47214 non-null  int32  
 2   media_minutos  47214 non-null  float64
 3   num_registros  47214 non-null  int64  
 4   cod_distrito   47214 non-null  int64  
 5   cod_barrio     47214 non-null  int64  
 6   calle          47214 non-null  object 
 7   numero_finca   47214 non-null  object 
 8   longitud       47214 non-null  float64
 9   latitud        47214 non-null  float64
 10  numero_plazas  47214 non-null  int64  
 11  color          47214 non-null  object 
 12  bateria_linea  47214 non-null  object 
 13  gis_x          47214 non-null  float64
 14  gis_y          47214 non-null  float64
dtypes: float64(5), int32(1), int64(4), object(5)
memory usage: 5.6+ MB


In [35]:
parkings_info.to_csv("gs://uned-bucket-clq/parking_info_df.csv", index=False)

Leemos los datos de tráfico histórico medido en distintos puntos de Madrid. Más información acerca de estos datos se puede encontrar en el enlace [trafico](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=33cb30c367e78410VgnVCM1000000b205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default). Se consideran los datos de enero a marzo, los correspondientes al primer trimestre. Estos datos se registran cada 15 minutos. En conjunto forman un dataframe de más de 30 millones de filas.

In [36]:
datos_enero = spark.read.csv("gs://uned-bucket-clq/datos/01-2025.csv", header=True, inferSchema=True, sep=";")
datos_febrero = spark.read.csv("gs://uned-bucket-clq/datos/02-2025.csv", header=True, inferSchema=True, sep=";")
datos_marzo = spark.read.csv("gs://uned-bucket-clq/datos/03-2025.csv", header=True, inferSchema=True, sep=";")

                                                                                

In [37]:
trafico = datos_enero.unionByName(datos_febrero).unionByName(datos_marzo)
trafico.show(5)
trafico.printSchema()

[Stage 37:>                                                         (0 + 1) / 1]

+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+
|  id|              fecha|tipo_elem|intensidad|ocupacion|carga|vmed|error|periodo_integracion|
+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+
|1002|2025-01-01 00:00:00|      C30|       216|        2|    0|66.0|    N|                  5|
|1002|2025-01-01 00:15:00|      C30|        72|        0|    0|14.0|    N|                  5|
|1002|2025-01-01 00:30:00|      C30|       444|        2|    0|69.0|    N|                  5|
|1002|2025-01-01 00:45:00|      C30|      1260|        6|    0|66.0|    N|                  5|
|1002|2025-01-01 01:00:00|      C30|      2028|        7|    0|66.0|    N|                  5|
+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- fecha: timestamp (nullable = true)
 |-- tipo_elem: string (nullable = true)

                                                                                

Las variables de este conjunto de datos son:
* id:  Identificación del punto de medida.
* fecha:  ha Fecha y hora oficiales de Madrid con formato dd/mm/yyyy hh:mm:ss.
* tipo_elem:  Identificación del punto de medida.
* intensidad:  Número de vehículos en el periodo de 15 minutos, expresada en vehículos/hora. El valor efectivo de vehículos que han circulado en ese periodo se obtiene dividiendo entre cuatro. Un valor negativo implica la ausencia de datos. 
* ocupacion:  Porcentaje de tiempo que está un detector de tráfico ocupado por un vehículo. Por ejemplo, una ocupación del 50% en un periodo de 15 minutos significa que ha habido vehículos situados sobre el detector durante 7 minutos y 30 segundos.  Un valor negativo implica la ausencia de datos. 
* carga:  Parámetro de carga del vial en el periodo de 15 minutos. Representa una estimación del grado de congestión, calculado a partir de un algoritmo que usa como variables la intensidad y ocupación, con ciertos factores de corrección. Establece el grado de uso de la vía en un rango de 0 (vacía) a 100 (colapso). Un valor negativo implica la ausencia de datos. 
* vmed:  Velocidad media de los vehículos en el periodo de 15 minutos (Km./h). Solo para puntos de medida interurbanos M30.  Un valor negativo implica la ausencia de datos. 
* error: Indicación de si ha habido al menos una muestra errónea o sustituida en el periodo de 15 minutos. N: no ha habido errores ni sustituciones E: los parámetros de calidad de alguna de las muestras integradas no son óptimos. S: alguna de las muestras recibidas era totalmente errónea y no se ha integrado. 
* periodo_integracion:  Número de muestras recibidas y consideradas para el periodo de integración.  

In [38]:
trafico.count()

                                                                                

38607459

In [39]:
# Se realizan las mismas transformaciones que se han llevado a cabo anteriormente en el dataframe de tiquets
trafico = trafico.withColumn("dia", date_format(col("fecha"), "dd-MM-yyyy")) \
                 .withColumn("hora", date_format(col("fecha"), "HH"))

trafico = trafico.withColumn("dia_semana", date_format(col("fecha"), "EEEE"))

trafico = trafico.withColumn(
    "dia_semana",
    when(col("dia_semana") == "Monday", 1)
    .when(col("dia_semana") == "Tuesday", 2)
    .when(col("dia_semana") == "Wednesday", 3)
    .when(col("dia_semana") == "Thursday", 4)
    .when(col("dia_semana") == "Friday", 5)
    .when(col("dia_semana") == "Saturday", 6)
    .when(col("dia_semana") == "Sunday", 7)
    .otherwise("Desconocido")
    )

trafico = trafico.withColumn("fin_de_semana", when(col("dia_semana").isin([6, 7]), 1).otherwise(0))

trafico = trafico.withColumn("hora",
    when(col("hora") == 0, 24).otherwise(col("hora"))
)

trafico = trafico.withColumn("int_tiempo", when((col("hora") >= 6) & (col("hora") < 12), "Mañana")
                  .when((col("hora") >= 12) & (col("hora") < 19), "Mediodia")
                  .when((col("hora") >= 19) & (col("hora") < 24), "Tarde")
                  .otherwise("Noche"))
trafico.show(5)

+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+----------+----+----------+-------------+----------+
|  id|              fecha|tipo_elem|intensidad|ocupacion|carga|vmed|error|periodo_integracion|       dia|hora|dia_semana|fin_de_semana|int_tiempo|
+----+-------------------+---------+----------+---------+-----+----+-----+-------------------+----------+----+----------+-------------+----------+
|1002|2025-01-01 00:00:00|      C30|       216|        2|    0|66.0|    N|                  5|01-01-2025|  24|         3|            0|     Noche|
|1002|2025-01-01 00:15:00|      C30|        72|        0|    0|14.0|    N|                  5|01-01-2025|  24|         3|            0|     Noche|
|1002|2025-01-01 00:30:00|      C30|       444|        2|    0|69.0|    N|                  5|01-01-2025|  24|         3|            0|     Noche|
|1002|2025-01-01 00:45:00|      C30|      1260|        6|    0|66.0|    N|                  5|01-01-2025|  24|        

                                                                                

In [40]:
# De nuevo, agrupamos para cada id, int_tiempo y fin_de_semana, para obtener las medidas agrupadas:
# Media de la intensidad,
# Media de la ocupación,
# Media de la carga.
trafico_agrupado = trafico.groupBy("int_tiempo","fin_de_semana","id").agg(
    avg("intensidad").alias("media_intensidad"),
    avg("ocupacion").alias("media_ocupacion"),
    avg("carga").alias("media_carga")
)

trafico_agrupado.count()
trafico_agrupado.show(5)



+----------+-------------+----+------------------+-------------------+------------------+
|int_tiempo|fin_de_semana|  id|  media_intensidad|    media_ocupacion|       media_carga|
+----------+-------------+----+------------------+-------------------+------------------+
|     Noche|            1|5257| 30.45771144278607|0.29187396351575456|1.4179104477611941|
|    Mañana|            1|5379| 89.49395161290323| 0.6935483870967742|11.618951612903226|
|    Mañana|            1|5481| 76.14583333333333| 1.6458333333333333| 4.405448717948718|
|     Tarde|            1|5481|107.78461538461538| 2.9461538461538463| 7.373076923076923|
|  Mediodia|            1|5505|260.02653631284915|   4.35754189944134|12.268156424581006|
+----------+-------------+----+------------------+-------------------+------------------+
only showing top 5 rows



                                                                                

In [None]:
# Se transforma el dataframe de tráfico a pandas
trafico_df = trafico_agrupado.toPandas()
trafico_df['id'] = trafico_df['id'].astype(str)

[Stage 51:>                                                        (0 + 4) / 18]

Leemos el archivo pmed_ubicacion_06-2025.csv que contiene la ubicación de los puntos de medición de tráfico. Más información en el enlace [ubicacion_pmed](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=ee941ce6ba6d3410VgnVCM1000000b205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD).

In [None]:
ubicacion_pmed = pd.read_csv("gs://uned-bucket-clq/datos/pmed_ubicacion_06-2025.csv",  encoding = 'latin-1', sep = ';')
ubicacion_pmed.info()
ubicacion_pmed['id'] = ubicacion_pmed['id'].astype(str)

Sus variables son:
* tipo_elem: Tipo de elemento: URB urbano, M30 situado en la M30, OTHER otro tipo.
* distrito: Distrito.
* id: Identificador único numérico del punto de medida.
* cod_cent: Código del punto de medida.
* nombre: Denominación punto de medida. 
* utm_x: Coordenada X en UTM 30N.
* utm_y: Coordenada Y en UTM 30N.
* longitud: Longitud en grados en WGS 84.
* latitud: Latitud en grados en WGS 84.

In [None]:
# Se une a cada registro de tráfico la longitud y latitud correspondiente a la ubicación del punto de medición
trafico_df = pd.merge(trafico_df,
                      ubicacion_pmed[['id','longitud','latitud']],
                      on='id',
                      how='left')
trafico_df.head()

In [None]:
trafico_df.to_csv("gs://uned-bucket-clq/trafico_df.csv", index=False)