In [0]:
#Paquetes
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from functools import reduce

In [0]:
from functools import reduce

def load_table(file_location, 
               table_name, 
               rename_columns=None, 
               file_type="csv", 
               delimiter=";"):
  
  # The applied options are for CSV files. For other file types, these will be ignored.
  df = spark.read.format(file_type) \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", delimiter) \
    .load(file_location)
  df.columns
  if rename_columns is not None:
    df = reduce(lambda dfl, x: dfl.withColumnRenamed(x,rename_columns[x]),rename_columns, df)
    df.columns

  # Create a view or table
  df.createOrReplaceTempView(table_name)

  # Since this table is registered as a temp view, it will only be available to this notebook. If you'd like other users to be able to query  this table, you can also create a table from the DataFrame.
  # Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
  # To do so, choose your table name and uncomment the bottom line.

  df.write.format("parquet").saveAsTable(table_name)


In [0]:
#RAW DATA
df_movilidad = spark.read.format("parquet") \
.load("/mnt/IncendiosForestalesCAT/raw/mitma/movilidad/maestra1/municipios")
display(df_movilidad)

fecha,origen,destino,periodo,distancia,viajes,viajes_km,cod_mun_origen,cod_mun_destino,filename,year,month
20200724,01001_AM,01001_AM,0,002-005,29.629,111.63,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,0,005-010,30.171,222.104,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,0,010-050,20.205,214.927,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,1,002-005,42.537,167.209,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,1,005-010,35.103,253.545,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,1,010-050,20.593,256.595,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,2,002-005,20.866,61.994,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,2,005-010,12.674,105.811,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,3,002-005,21.649,55.126,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7
20200724,01001_AM,01001_AM,3,005-010,6.694,46.077,1001,1001,dbfs:/mnt/movilidadforest2020/movilidad/maestra1/municipios/20200724_maestra_1_mitma_municipio.txt.gz,2020,7


# Relación municipios MITMA - Tablas INE

In [0]:
# Relacion Municipio - Municipio mitma
file_location = "/mnt/IncendiosForestalesCAT/raw/mitma/rl_municipio_mitma/"
permanent_table_name = "rl_municipio_mitma"
dbutils.fs.rm(f"/user/hive/warehouse/{permanent_table_name}/", True)

load_table(file_location, permanent_table_name,delimiter="|")

# INE
### Municipios INE

In [0]:
# TABLA DEL INE CODIGO DE MUNCIPIOS - CODIGO MITMA
dbutils.fs.rm("/user/hive/warehouse/nacional_moviles/", True)
file_location = "/mnt/IncendiosForestalesCAT/raw/mitma/nacional_moviles/"
file_type = "csv"
table_name = "nacional_moviles"

load_table(file_location, table_name)

In [0]:
municipios_catalunya_INE = spark.sql("""
SELECT * FROM nacional_moviles WHERE NPRO in ('Barcelona', 'Tarragona', 'Girona', 'Lleida');
""")

In [0]:
display(municipios_catalunya_INE)

CUMUN,CPRO,NPRO,NMUN,AREA_GEO,ID_AREA_GEO,POB_AREA_GEO,COD_LITERAL_SCD,ID_GRUPO,POB_GRUPO,ID_COMPLETO_GRUPO,LITERAL_GRUPO,_c12,_c13,_c14,_c15
8038,8,Barcelona,Callús,MUNICIPIO,08038,2140,,001B,14529,08038 Callús,Sant Joan de Vilatorrada y otros municipios,,,,
8084,8,Barcelona,Fonollosa,MUNICIPIO,08084,1453,,001B,14529,08084 Fonollosa,Sant Joan de Vilatorrada y otros municipios,,,,
8218,8,Barcelona,Sant Joan de Vilatorrada,MUNICIPIO,08218,10936,,001B,14529,08218 Sant Joan de Vilatorrada,Sant Joan de Vilatorrada y otros municipios,,,,
8021,8,Barcelona,Bellprat,MUNICIPIO,08021,64,,002B,6522,08021 Bellprat,Òdena y otros municipios,,,,
8063,8,Barcelona,Castellolí,MUNICIPIO,08063,619,,002B,6522,08063 Castellolí,Òdena y otros municipios,,,,
8103,8,Barcelona,Jorba,MUNICIPIO,08103,835,,002B,6522,08103 Jorba,Òdena y otros municipios,,,,
8143,8,Barcelona,Òdena,MUNICIPIO,08143,3643,,002B,6522,08143 Òdena,Òdena y otros municipios,,,,
8226,8,Barcelona,Sant Martí de Tous,MUNICIPIO,08226,1235,,002B,6522,08226 Sant Martí de Tous,Òdena y otros municipios,,,,
8257,8,Barcelona,Santa Maria de Miralles,MUNICIPIO,08257,126,,002B,6522,08257 Santa Maria de Miralles,Òdena y otros municipios,,,,
8165,8,Barcelona,"Pobla de Claramunt, La",MUNICIPIO,08165,2193,,003B,5907,"08165 Pobla de Claramunt, La","Torre de Claramunt, La y Pobla de Claramunt, La",,,,


In [0]:
municipiosDF = spark.sql("""
select nm.CUMUN as cod_mun_ine, nm.ID_AREA_GEO as cod_area_ine, mm.municipio as cod_mun_mitma_movilidad ,nm.NMUN as nombre_municipio, nm.CPRO as cod_provincia, nm.NPRO as nombre_provincia, mm.municipio_mitma
FROM nacional_moviles as nm
JOIN rl_municipio_mitma as mm ON  nm.CUMUN = mm.municipio
WHERE nm.CUMUN == mm.municipio AND NPRO in ("Barcelona", "Lleida", "Tarragona", "Girona")""")

display(municipiosDF)

cod_mun_ine,cod_area_ine,cod_mun_mitma_movilidad,nombre_municipio,cod_provincia,nombre_provincia,municipio_mitma
8038,08038,8038,Callús,8,Barcelona,08218_AM
8084,08084,8084,Fonollosa,8,Barcelona,08218_AM
8218,08218,8218,Sant Joan de Vilatorrada,8,Barcelona,08218_AM
8021,08021,8021,Bellprat,8,Barcelona,08143_AM
8063,08063,8063,Castellolí,8,Barcelona,08143_AM
8103,08103,8103,Jorba,8,Barcelona,08143_AM
8143,08143,8143,Òdena,8,Barcelona,08143_AM
8226,08226,8226,Sant Martí de Tous,8,Barcelona,08143_AM
8257,08257,8257,Santa Maria de Miralles,8,Barcelona,08143_AM
8165,08165,8165,"Pobla de Claramunt, La",8,Barcelona,08286_AM


In [0]:
#Join de las tablas  df_movilidad  y municipiosDF
df_movilidad =df_movilidad.join(municipiosDF,df_movilidad.destino ==  municipiosDF.municipio_mitma,"inner")
df_movilidad.count()

In [0]:
from pyspark.sql import Window
windowSpecAgg  = Window.partitionBy("fecha", "destino")

df_movilidad_agg = df_movilidad.withColumn("total_viajes", F.sum(F.col("viajes")).over(windowSpecAgg)) \
.withColumn("total_viajes_km", F.sum(F.col("viajes_km")).over(windowSpecAgg))
df_movilidad_agg_unique = df_movilidad_agg.dropDuplicates(["fecha", "destino"])
df_movilidad_agg_unique.persist()

#Change format
df_movilidad_agg_unique= df_movilidad_agg_unique\
.withColumn("fecha", F.to_date(F.col("fecha").cast("string"),'yyyy-MM-dd'))\
.withColumn('fecha', F.col("fecha").cast("string"))\
.withColumn("cod_mun_destino", F.col("cod_mun_destino").cast("integer"))


#Write to S3
columns = ['fecha', 'year', 'month' ,'destino', 'cod_mun_destino', 'cod_mun_ine', 'nombre_municipio', 'cod_provincia', 'nombre_provincia', 'total_viajes', 'total_viajes_km']
df_movilidad_agg_unique.select(columns).write.mode("overwrite").partitionBy("year","month").parquet(f"/mnt/IncendiosForestalesCAT/prep/mitma/movilidad/")

In [0]:
display(df_movilidad_agg_unique)

fecha,destino,cod_mun_destino,cod_mun_ine,nombre_municipio,cod_provincia,nombre_provincia,total_viajes,total_viajes_km,year,month
2020-12-01,08031_AM,8031,8002,Aguilar de Segarra,8,Barcelona,361708.9949999988,6821802.045000114,2020,12
2020-12-01,08155_AM,8155,8155,Palafolls,8,Barcelona,105027.38999999952,907172.568,2020,12
2020-12-01,08235,8235,8235,Sant Pol de Mar,8,Barcelona,9594.482999999986,94310.10000000003,2020,12
2020-12-01,08238,8238,8238,Sant Quirze del Vallès,8,Barcelona,62554.402999999984,391881.7050000002,2020,12
2020-12-01,17007_AM,17007,17007,Amer,17,Girona,167169.63199999902,2311569.959999984,2020,12
2020-12-01,25008_AM,25008,25008,Albesa,25,Lleida,74376.75000000017,1025929.1640000008,2020,12
2020-12-02,08031_AM,8031,8002,Aguilar de Segarra,8,Barcelona,349045.09500001016,7002079.485000094,2020,12
2020-12-02,08155_AM,8155,8155,Palafolls,8,Barcelona,102280.32599999986,886457.6339999989,2020,12
2020-12-02,08235,8235,8235,Sant Pol de Mar,8,Barcelona,9774.138999999994,90927.37700000002,2020,12
2020-12-02,08238,8238,8238,Sant Quirze del Vallès,8,Barcelona,63039.94699999997,386234.3530000004,2020,12
