In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType

spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
custom_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("fecha", StringType(), True),
    StructField("tipo_elem", StringType(), True),
    StructField("intensidad", IntegerType(), True),
    StructField("ocupacion", IntegerType(), True),
    StructField("carga", IntegerType(), True),
    StructField("vmed", IntegerType(), True),
    StructField("error", StringType(), True),
    StructField("periodo_integracion", IntegerType(), True)
])

In [3]:
def transformar(ruta):
    #Leemos el fichero con el schemo creado anteriormente
    df_dia = spark.read.csv(ruta, schema=custom_schema, header=True, sep=';')
    #Eliminamos las columnas id y error
    df_dia = df_dia.drop('id').drop('error')
    #Eliminamos los datos vacíos
    df_dia = df_dia.na.drop()
    #Con una expresión regular cogemos el día dentro de la columna fecha, la guardamos en la columna día y borramos la de fecha 
    df_dia = df_dia.select(regexp_extract('fecha', r'^[0-9]{4}-[0-9]{2}-[0-9]{2}', 0).alias('dia'),'*').drop('fecha')
    #En el mes de enero de 2018, los valores de la columna tipo_elem están en un formato diferente al resto, en esta fila unificamos formatos
    df_dia = df_dia.withColumn("tipo_elem", when(df_dia.tipo_elem == "PUNTOS MEDIDA M-30","M30")
                               .when(df_dia.tipo_elem == "PUNTOS MEDIDA URBANOS","URB")
                               .when(df_dia.tipo_elem == "C30","M30")
                               .otherwise(df_dia.tipo_elem))
    #Unimos todos los datos por el día y por el tipo de carretera
    df_dia = df_dia.groupBy('dia','tipo_elem').agg(
        round(avg("intensidad"),2).alias("intensidad")
        ,round(avg("ocupacion"),2).alias("ocupacion")
        ,round(avg("carga"),2).alias("carga")
        ,round(avg("vmed"),2).alias("vmed")
        ,round(avg("periodo_integracion"),2).alias("periodo_integracion"))
    
    return df_dia

In [4]:
df_2018 = transformar('/data/trafico/DataTrafico/2018/01-2018.csv')
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/02-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/03-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/04-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/05-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/06-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/07-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/08-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/09-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/10-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/11-2018.csv'))
df_2018 = df_2018.union(transformar('/data/trafico/DataTrafico/2018/12-2018.csv'))
df_2018.sort(df_2018.dia.asc()).show()
df_2018.toPandas().to_csv('/home/22058951Mario/2018.csv', index=False)

                                                                                

+----------+---------+----------+---------+-----+-----+-------------------+
|       dia|tipo_elem|intensidad|ocupacion|carga| vmed|periodo_integracion|
+----------+---------+----------+---------+-----+-----+-------------------+
|2018-01-01|      M30|    924.09|     3.26|17.07| 67.9|              11.81|
|2018-01-01|      URB|    219.72|     3.87|12.59|  0.0|              13.79|
|2018-01-02|      URB|    315.65|     6.04|18.46|  0.0|              14.27|
|2018-01-02|      M30|   1274.94|     4.72|23.66| 64.6|              11.86|
|2018-01-03|      M30|   1336.29|     5.14|24.78|63.97|              11.83|
|2018-01-03|      URB|    334.75|     6.58|19.64|  0.0|              14.35|
|2018-01-04|      M30|   1370.41|     5.31|25.37|63.99|              11.87|
|2018-01-04|      URB|    346.72|     6.88|20.35|  0.0|              14.38|
|2018-01-05|      M30|   1305.48|     5.12|24.23|64.55|              11.83|
|2018-01-05|      URB|     338.9|     6.77|20.02|  0.0|              14.37|
|2018-01-06|

                                                                                

In [5]:
df_2019 = transformar('/data/trafico/DataTrafico/2019/01-2019.csv')
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/02-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/03-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/04-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/05-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/06-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/07-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/08-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/09-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/10-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/11-2019.csv'))
df_2019 = df_2019.union(transformar('/data/trafico/DataTrafico/2019/12-2019.csv'))
df_2019.sort(df_2019.dia.asc()).show()
df_2019.toPandas().to_csv('/home/22058951Mario/2019.csv', index=False)

                                                                                

+----------+---------+----------+---------+-----+-----+-------------------+
|       dia|tipo_elem|intensidad|ocupacion|carga| vmed|periodo_integracion|
+----------+---------+----------+---------+-----+-----+-------------------+
|2019-01-01|      M30|     880.8|     3.65|13.34|61.33|              10.26|
|2019-01-01|      URB|     228.4|     4.09|12.66|  0.0|              14.01|
|2019-01-02|      M30|   1189.77|     5.28|18.64|57.36|              10.11|
|2019-01-02|      URB|    332.38|     6.22|18.58|  0.0|              14.21|
|2019-01-03|      M30|   1236.98|     5.85|19.46|57.69|              10.19|
|2019-01-03|      URB|    352.67|     6.71|19.77|  0.0|              14.33|
|2019-01-04|      URB|    365.96|     6.95|20.52|  0.0|              14.29|
|2019-01-04|      M30|   1227.78|      6.0|19.99|57.71|              10.28|
|2019-01-05|      M30|     913.3|     3.91|15.06| 60.6|              10.36|
|2019-01-05|      URB|    281.06|     5.23|15.99|  0.0|              14.27|
|2019-01-06|

                                                                                

In [6]:
df_2020 = transformar('/data/trafico/DataTrafico/2020/01-2020.csv')
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/02-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/03-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/04-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/05-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/06-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/07-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/08-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/09-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/10-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/11-2020.csv'))
df_2020 = df_2020.union(transformar('/data/trafico/DataTrafico/2020/12-2020.csv'))
df_2020.sort(df_2020.dia.asc()).show()
df_2020.toPandas().to_csv('/home/22058951Mario/2020.csv', index=False)

                                                                                

+----------+---------+----------+---------+-----+-----+-------------------+
|       dia|tipo_elem|intensidad|ocupacion|carga| vmed|periodo_integracion|
+----------+---------+----------+---------+-----+-----+-------------------+
|2020-01-01|      URB|    223.56|     4.29|12.98|  0.0|              13.96|
|2020-01-01|      M30|    876.96|     3.49|13.76|64.18|              10.22|
|2020-01-02|      M30|   1199.85|     4.98|18.95|60.85|              10.02|
|2020-01-02|      URB|    322.54|     6.58|18.87|  0.0|               14.1|
|2020-01-03|      M30|   1228.77|     5.11|19.35|61.64|              10.22|
|2020-01-03|      URB|    343.11|     6.94|20.02|  0.0|              14.27|
|2020-01-04|      M30|    975.28|     3.89|15.71|63.54|              10.28|
|2020-01-04|      URB|    280.74|     5.54| 16.5|  0.0|              14.17|
|2020-01-05|      M30|    876.07|     3.49|14.03|64.33|              10.26|
|2020-01-05|      URB|     249.6|     4.97|14.84|  0.0|              14.12|
|2020-01-06|

                                                                                

In [7]:
df_2021 = transformar('/data/trafico/DataTrafico/2021/01-2021.csv')
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/02-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/03-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/04-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/05-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/06-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/07-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/08-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/09-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/10-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/11-2021.csv'))
df_2021 = df_2021.union(transformar('/data/trafico/DataTrafico/2021/12-2021.csv'))
df_2021.sort(df_2021.dia.asc()).show()
df_2021.toPandas().to_csv('/home/22058951Mario/2021.csv', index=False)

                                                                                

+----------+---------+----------+---------+-----+-----+-------------------+
|       dia|tipo_elem|intensidad|ocupacion|carga| vmed|periodo_integracion|
+----------+---------+----------+---------+-----+-----+-------------------+
|2021-01-01|      URB|    161.58|     2.76| 8.91|  0.0|               13.2|
|2021-01-01|      M30|    589.01|     2.13| 9.17| 58.3|               10.4|
|2021-01-02|      URB|    226.06|     3.93|12.51|  0.0|              13.62|
|2021-01-02|      M30|    765.65|     2.93|12.27|57.99|              10.39|
|2021-01-03|      URB|    205.03|     3.47|11.29|  0.0|              13.56|
|2021-01-03|      M30|    717.26|     2.72|11.33|58.16|              10.38|
|2021-01-04|      URB|     332.9|     6.05|18.37|  0.0|              14.34|
|2021-01-04|      M30|   1110.13|     4.63|17.46|58.16|              10.33|
|2021-01-05|      M30|   1093.66|     4.62|17.21|57.91|              10.29|
|2021-01-05|      URB|    320.13|      5.9|17.82|  0.0|              13.99|
|2021-01-06|

                                                                                

In [8]:
df_2022 = transformar('/data/trafico/DataTrafico/2022/01-2022.csv')
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/02-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/03-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/04-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/05-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/06-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/07-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/08-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/09-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/10-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/11-2022.csv'))
df_2022 = df_2022.union(transformar('/data/trafico/DataTrafico/2022/12-2022.csv'))
df_2022.sort(df_2022.dia.asc()).show()
df_2022.toPandas().to_csv('/home/22058951Mario/2022.csv', index=False)

                                                                                

+----------+---------+----------+---------+-----+-----+-------------------+
|       dia|tipo_elem|intensidad|ocupacion|carga| vmed|periodo_integracion|
+----------+---------+----------+---------+-----+-----+-------------------+
|2022-01-01|      URB|    180.68|     3.09|10.16|  0.0|              12.85|
|2022-01-01|      M30|     668.5|     2.52|10.48|63.29|              10.23|
|2022-01-02|      M30|    770.24|     2.84|12.12|61.49|              10.25|
|2022-01-02|      URB|    207.83|     3.62|11.67|  0.0|              13.63|
|2022-01-03|      URB|     297.2|     5.44|16.77|  0.0|              13.45|
|2022-01-03|      M30|   1112.08|     4.31| 17.4|61.08|              10.25|
|2022-01-04|      M30|   1134.03|     4.46| 17.7|60.02|              10.26|
|2022-01-04|      URB|    302.99|      5.8|17.29|  0.0|              13.92|
|2022-01-05|      URB|    298.57|     5.79|17.21|  0.0|              14.04|
|2022-01-05|      M30|   1104.56|     4.49|17.24|59.62|              10.22|
|2022-01-06|

                                                                                