In [0]:
# List the files available under the COVID dataset landing folder on the mount.
dbutils.fs.ls("/mnt/storage/load/dataset-covid")

[FileInfo(path='dbfs:/mnt/storage/load/dataset-covid/TB_UBIGEOS.csv', name='TB_UBIGEOS.csv', size=239590, modificationTime=1705078365000),
 FileInfo(path='dbfs:/mnt/storage/load/dataset-covid/fallecidos_covid.csv', name='fallecidos_covid.csv', size=23226350, modificationTime=1705011323000),
 FileInfo(path='dbfs:/mnt/storage/load/dataset-covid/positivos_covid.csv', name='positivos_covid.csv', size=170890665, modificationTime=1705009612000),
 FileInfo(path='dbfs:/mnt/storage/load/dataset-covid/vacunas_covid.csv', name='vacunas_covid.csv', size=10797553441, modificationTime=1705010717000)]

CREACIÓN DEL SCHEMA

In [0]:
from pyspark.sql.functions import col, expr, regexp_replace, monotonically_increasing_id, lit, to_date
from pyspark.sql.types import StructType, StructField, DateType, IntegerType, StringType, TimestampType, DoubleType

In [0]:
# Explicit schema applied when reading TB_UBIGEOS.csv.
# Columns are declared as (name, Spark type) pairs in file order; every
# column is nullable.
# NOTE(review): the captured sample output shows values such as "154.0" and
# "2338.0" under superficie/altitud, which are typed IntegerType here --
# confirm these columns parse as integers rather than coming back null.
schema_ubigeo = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("id_ubigeo", IntegerType()),
            ("ubigeo_reniec", StringType()),
            ("ubigeo_inei", StringType()),
            ("departamento_inei", StringType()),
            ("departamento", StringType()),
            ("provincia_inei", StringType()),
            ("provincia", StringType()),
            ("distrito", StringType()),
            ("region", StringType()),
            ("macroregion_inei", StringType()),
            ("macroregion_minsa", StringType()),
            ("iso_3166_2", StringType()),
            ("fips", IntegerType()),
            ("superficie", IntegerType()),
            ("altitud", IntegerType()),
            ("latitud", DoubleType()),
            ("longitud", DoubleType()),
            ("Frontera", StringType()),
        )
    ]
)

In [0]:
# Location of the ubigeo CSV inside the mounted landing folder.
path = "/mnt/storage/load/dataset-covid/TB_UBIGEOS.csv"

# Load the comma-separated file, treating the first line as a header and
# enforcing the explicit schema instead of inferring column types.
df_ubigeo = (
    spark.read
    .schema(schema_ubigeo)
    .option("header", True)
    .option("sep", ",")
    .csv(path)
)

# Render the loaded DataFrame in the notebook.
display(df_ubigeo)

id_ubigeo,ubigeo_reniec,ubigeo_inei,departamento_inei,departamento,provincia_inei,provincia,distrito,region,macroregion_inei,macroregion_minsa,iso_3166_2,fips,superficie,altitud,latitud,longitud,Frontera
1,10101.0,10101,1,AMAZONAS,101,CHACHAPOYAS,CHACHAPOYAS,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,154.0,2338.0,-6.2294,-77.8728,NO
2,10102.0,10102,1,AMAZONAS,101,CHACHAPOYAS,ASUNCION,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,26.0,2823.0,-6.0325,-77.7108,NO
3,10103.0,10103,1,AMAZONAS,101,CHACHAPOYAS,BALSAS,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,357.0,859.0,-6.8358,-78.0197,NO
4,10104.0,10104,1,AMAZONAS,101,CHACHAPOYAS,CHETO,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,57.0,2143.0,-6.2556,-77.7008,NO
5,10105.0,10105,1,AMAZONAS,101,CHACHAPOYAS,CHILIQUIN,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,143.0,2677.0,-6.0783,-77.7375,NO
6,10106.0,10106,1,AMAZONAS,101,CHACHAPOYAS,CHUQUIBAMBA,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,279.0,2803.0,-6.935,-77.8542,NO
7,10107.0,10107,1,AMAZONAS,101,CHACHAPOYAS,GRANADA,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,181.0,3041.0,-6.1064,-77.6286,NO
8,10108.0,10108,1,AMAZONAS,101,CHACHAPOYAS,HUANCAS,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,49.0,2591.0,-6.1736,-77.8644,NO
9,10109.0,10109,1,AMAZONAS,101,CHACHAPOYAS,LA JALCA,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,380.0,2869.0,-6.4847,-77.815,NO
10,10110.0,10110,1,AMAZONAS,101,CHACHAPOYAS,LEIMEBAMBA,AMAZONAS,ORIENTE,MACROREGION ORIENTE,PE-AMA,1,373.0,2226.0,-6.7075,-77.8039,NO


In [0]:
# Persist the DataFrame as read from the CSV to the bronze layer as a Delta
# table, replacing whatever data is already stored at that path.
# The schema-evolution option key must be spelled exactly "mergeSchema";
# a key Delta does not recognize does not enable schema merging.
(
    df_ubigeo.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save("/mnt/storage/layers/bronze/ubigeoBronze")
)

CAPA GOLD

In [0]:
# Build the gold-layer view of the ubigeo data: keep only the listed source
# columns and rename each one to its upper-case gold name.
# Each pair below is (source column, gold column), in output order.
df_ubigeoGold = df_ubigeo.select(
    *[
        col(source_name).alias(gold_name)
        for source_name, gold_name in (
            ("ubigeo_inei", "COD_UBIGEO"),
            ("departamento_inei", "COD_DEPARTAMENTO"),
            ("departamento", "NOM_DEPARTAMENTO"),
            ("provincia_inei", "COD_PROVINCIA"),
            ("provincia", "NOM_PROVINCIA"),
            ("distrito", "NOM_DISTRITO"),
            ("region", "REGION"),
        )
    ]
)

# Render the projected DataFrame in the notebook.
display(df_ubigeoGold)

COD_UBIGEO,COD_DEPARTAMENTO,NOM_DEPARTAMENTO,COD_PROVINCIA,NOM_PROVINCIA,NOM_DISTRITO,REGION
10101,1,AMAZONAS,101,CHACHAPOYAS,CHACHAPOYAS,AMAZONAS
10102,1,AMAZONAS,101,CHACHAPOYAS,ASUNCION,AMAZONAS
10103,1,AMAZONAS,101,CHACHAPOYAS,BALSAS,AMAZONAS
10104,1,AMAZONAS,101,CHACHAPOYAS,CHETO,AMAZONAS
10105,1,AMAZONAS,101,CHACHAPOYAS,CHILIQUIN,AMAZONAS
10106,1,AMAZONAS,101,CHACHAPOYAS,CHUQUIBAMBA,AMAZONAS
10107,1,AMAZONAS,101,CHACHAPOYAS,GRANADA,AMAZONAS
10108,1,AMAZONAS,101,CHACHAPOYAS,HUANCAS,AMAZONAS
10109,1,AMAZONAS,101,CHACHAPOYAS,LA JALCA,AMAZONAS
10110,1,AMAZONAS,101,CHACHAPOYAS,LEIMEBAMBA,AMAZONAS


In [0]:
# Persist the renamed/projected ubigeo DataFrame to the gold layer as a Delta
# table, replacing whatever data is already stored at that path.
# The schema-evolution option key must be spelled exactly "mergeSchema";
# a key Delta does not recognize does not enable schema merging.
(
    df_ubigeoGold.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save("/mnt/storage/layers/gold/ubigeoGold")
)