<table border="1" width="99%">
  <tr>
    <td bgcolor="#48a259">
      <h1 style="color: #FFFFFF; text-align: center;">Datamart</h1>
    </td>
  </tr>
</table>

In [1]:
from pyspark.sql import functions as F

In [2]:
from pyspark.sql.functions import col, lit, when, row_number, lpad, concat, upper, udf, trim, regexp_replace, translate, ltrim
from pyspark.sql.window import Window

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SIAF_Ingresos") \
    .master("local[2]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()


In [4]:
ruta = "Ingresos/*.csv"

# Ruta a todos los CSV
df_mef = spark.read.csv(ruta, header=True, inferSchema=True)

<table width="99%">
  <tr>
    <td bgcolor="#FFBA39">
      <h2 style="color: #000000; text-align: left;">Modelamiento Dimensional</h2>
    </td>
  </tr>
</table>

### Dimension Ubigeo

In [None]:
from pyspark.sql.functions import col, lpad, row_number, when, isnull
from pyspark.sql.window import Window
import pandas as pd

# 1. Leer RENAMU
renamu_df = spark.read.csv(
    "parcial/Renamu/Base_RENAMU_2022_f.csv",
    header=True,
    inferSchema=True,
    sep=";"
)

# 2. Seleccionar columnas principales
dim_ubigeo = renamu_df.select(
    lpad(col("Ubigeo").cast("string"), 6, "0").alias("ubigeo"),
    col("Departamento").alias("departamento"),
    col("ccdd").alias("codigo_departamento"),    
    col("Provincia").alias("provincia"),
    col("ccpp").alias("codigo_provincia"),
    col("Distrito").alias("distrito"),
    col("ccdi").alias("codigo_distrito"),
    col("TIPOMUNI").cast("int").alias("tipomuni")
)

# 3. Leer regi√≥n natural desde Excel
excel_df = pd.read_excel("Dimensiones/UBIGEO 2022_1891 distritos.xlsx", dtype=str)
excel_df = excel_df.rename(columns={
    "IDDIST": "ubigeo",
    "REGION NATURAL": "region_natural"
})
excel_df["ubigeo"] = excel_df["ubigeo"].str.zfill(6)
region_df = spark.createDataFrame(excel_df[["ubigeo", "region_natural"]])

# 4. Leer clasificaci√≥n municipalidad A-G
clasif_df = spark.read.csv(
    "parcial/tipo_muni/clasificacion_municipalidades_abcg.csv",
    header=True,
    inferSchema=True,
    sep=","
).withColumn("ubigeo", lpad(col("ubigeo").cast("string"), 6, "0")) \
 .withColumnRenamed("tipo_municipalidad", "clasificacion_municipalidad")  # <- rename aqu√≠

# 5. Unir todo
dim_ubigeo = dim_ubigeo \
    .join(region_df, on="ubigeo", how="left") \
    .join(clasif_df, on="ubigeo", how="left")

# 6. Descripci√≥n de TIPOMUNI
dim_ubigeo = dim_ubigeo.withColumn(
    "tipo_muni_desc",
    when(col("tipomuni") == 1, "Municipalidad Provincial")
    .when(col("tipomuni") == 2, "Municipalidad Distrital")
    .when(col("tipomuni") == 3, "Municipalidad de Centro Poblado")
    .otherwise("No especificado")
)

# 7. ID autoincremental
window = Window.orderBy("ubigeo")
dim_ubigeo = dim_ubigeo.withColumn("id_ubigeo", row_number().over(window))

# 8. Verificaci√≥n de valores nulos / faltantes
print("Registros con valores faltantes:\n")

dim_ubigeo.filter(
    isnull("region_natural") | 
    isnull("tipomuni") | 
    isnull("clasificacion_municipalidad")
).select(
    "ubigeo", "departamento", "provincia", "distrito",
    "region_natural", "tipomuni", "clasificacion_municipalidad"
).show(truncate=False)

# (Opcional) Tambi√©n podr√≠as contar cu√°ntos son:
faltantes = dim_ubigeo.filter(
    isnull("region_natural") | 
    isnull("tipomuni") | 
    isnull("clasificacion_municipalidad")
).count()

print(f"\nTotal de registros con informaci√≥n faltante: {faltantes}")


In [None]:
# Convertir el DataFrame de Spark a Pandas
df_pd = dim_ubigeo.toPandas()

# Forzar la columna 'ubigeo' a string de 6 d√≠gitos
df_pd["ubigeo"] = df_pd["ubigeo"].astype(str).str.zfill(6)

# Guardar como CSV con el formato correcto
df_pd.to_csv("Dimensiones/dim_ubigeo.csv", index=False)


In [None]:
# Corregir el nombre del departamento
df_mef = df_mef.withColumn(
    "DEPARTAMENTO_EJECUTORA_NOMBRE",
    when(col("DEPARTAMENTO_EJECUTORA_NOMBRE") == "PROVINCIA CONSTITUCIONAL DEL CALLAO", "CALLAO")
    .otherwise(col("DEPARTAMENTO_EJECUTORA_NOMBRE"))
)

# Corregir el nombre de la provincia
df_mef = df_mef.withColumn(
    "PROVINCIA_EJECUTORA_NOMBRE",
    when(col("PROVINCIA_EJECUTORA_NOMBRE") == "PROV. CALLAO", "CALLAO")
    .otherwise(col("PROVINCIA_EJECUTORA_NOMBRE"))
)

### Dimension Sector

Entonces, cada entidad ejecutora pertenece a un sector,
y cada transacci√≥n de ingreso o gasto est√° asociada a ese sector.

Esto permite hacer an√°lisis del tipo:

¬øQu√© sectores del Estado generan m√°s ingresos?

¬øC√≥mo evolucionan los ingresos del sector Educaci√≥n o Salud en el tiempo?

¬øQu√© regiones aportan m√°s dentro de un sector espec√≠fico?

En el contexto del SIAF (Sistema Integrado de Administraci√≥n Financiera),
la Entidad Ejecutora es el organismo p√∫blico que administra y ejecuta el presupuesto:

Puede ser un ministerio, gobierno regional o municipalidad.

Cada entidad tiene un c√≥digo √∫nico SEC_EJEC (clave que tambi√©n aparece en SISMEPRE y RENAMU).

Por tanto, dim_entidad:

Permite agrupar los ingresos y gastos por tipo de instituci√≥n.

Servir√° de punto de uni√≥n entre los datasets de SIAF, RENAMU y SISMEPRE.

Ser√° una dimensi√≥n organizacional dentro del modelo copo de nieve.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import Row

# Normalizar sector vac√≠o a null (para que coincida con sector=None de dim_sector)
dim_sector = df_mef.select(
    col("SECTOR").alias("sector"),
    col("SECTOR_NOMBRE").alias("sector_nombre")
).where(
    (col("sector").isNotNull()) & (trim(col("sector")) != "") &
    (col("sector_nombre").isNotNull()) & (trim(col("sector_nombre")) != "")
).distinct()

# 2. Crear ID incremental para sectores v√°lidos (empezando desde 1)
window = Window.orderBy("sector")
dim_sector = dim_sector.withColumn("id_sector", row_number().over(window))

# 1. Esquema expl√≠cito
schema = StructType([
    StructField("sector", StringType(), True),
    StructField("sector_nombre", StringType(), True),
    StructField("id_sector", IntegerType(), False)
])

# 2. Crear fila dummy con ese esquema
# Usamos "999" como c√≥digo especial para sector desconocido
fila_dummy = spark.createDataFrame([
    ("999", "SIN SECTOR", 0)
], schema=schema)
# 3. Unir la fila dummy con la dimensi√≥n ya generada
dim_sector = fila_dummy.unionByName(dim_sector)

# 4. (Opcional) Ordenar por ID
dim_sector = dim_sector.orderBy("id_sector")

# 5. Guardar CSV
df_pd_sector = dim_sector.toPandas()
df_pd_sector.to_csv("Dimensiones/dim_sector.csv", index=False)

In [None]:
dim_sector.show()

### Dimension Fuente Financiamiento

Clasifica los recursos financieros seg√∫n origen

In [None]:
# Seleccionar columnas relevantes
dim_fuente_financiamiento = df_mef.select(
    col("FUENTE_FINANCIAMIENTO").alias("fuente_financiamiento"),
    col("FUENTE_FINANCIAMIENTO_NOMBRE").alias("fuente_financiamiento_nombre"),
    col("RUBRO").alias("rubro"),
    col("RUBRO_NOMBRE").alias("rubro_nombre"),
    col("TIPO_RECURSO").alias("tipo_recurso"),
    col("TIPO_RECURSO_NOMBRE").alias("tipo_recurso_nombre")
).distinct()

# A√±adir ID autoincremental
window = Window.orderBy("fuente_financiamiento")
dim_fuente_financiamiento = dim_fuente_financiamiento.withColumn("id_fuente", row_number().over(window))

df_pd_ff = dim_fuente_financiamiento.toPandas()
df_pd_ff.to_csv("Dimensiones/dim_fuente_financiamiento.csv", index=False)

### Dimension Clasificacion Ingreso

Permite an√°lisis por tipo de ingreso (venta, impuesto, donaci√≥n, etc.)

In [None]:
# Seleccionamos columnas de clasificaci√≥n
dim_clasificacion_ingreso = df_mef.select(
    col("GENERICA").alias("generica"),
    col("GENERICA_NOMBRE").alias("generica_nombre"),
    col("SUBGENERICA").alias("subgenerica"),
    col("SUBGENERICA_NOMBRE").alias("subgenerica_nombre"),
    col("SUBGENERICA_DET").alias("subgenerica_det"),
    col("SUBGENERICA_DET_NOMBRE").alias("subgenerica_det_nombre"),
    col("ESPECIFICA").alias("especifica"),
    col("ESPECIFICA_NOMBRE").alias("especifica_nombre"),
    col("ESPECIFICA_DET").alias("especifica_det"),
    col("ESPECIFICA_DET_NOMBRE").alias("especifica_det_nombre")
).distinct()

# A√±adimos ID autoincremental
window = Window.orderBy("generica", "subgenerica", "especifica")
dim_clasificacion_ingreso = dim_clasificacion_ingreso.withColumn(
    "id_clasificacion", row_number().over(window)
)

In [None]:
df_pd_clas_in = dim_clasificacion_ingreso.toPandas()
df_pd_clas_in.to_csv("Dimensiones/dim_clasificacion_ingreso.csv", index=False)

### Dimension Tiempo

In [None]:
# 1. Extraer a√±os y meses √∫nicos del dataframe original
dim_tiempo = df_mef.select("ANO_DOC", "MES_DOC").distinct()

# 2. Renombrar columnas para mayor claridad
dim_tiempo = dim_tiempo.withColumnRenamed("ANO_DOC", "anio") \
                       .withColumnRenamed("MES_DOC", "mes")

# 3. Crear columna mes_str con dos d√≠gitos (01, 02, ..., 12)
dim_tiempo = dim_tiempo.withColumn("mes_str", lpad(col("mes").cast("string"), 2, "0"))

# 4. Crear clave primaria a√±o-mes (formato YYYYMM como string)
dim_tiempo = dim_tiempo.withColumn("pk_anio_mes", concat(col("anio").cast("string"), col("mes_str")))

# 5. Crear columna incremental id_tiempo
window_spec = Window.orderBy("anio", "mes")
dim_tiempo = dim_tiempo.withColumn("id_tiempo", row_number().over(window_spec))

# 6. Crear columna trimestre
dim_tiempo = dim_tiempo.withColumn(
    "trimestre",
    when((col("mes") >= 1) & (col("mes") <= 3), lit(1))
    .when((col("mes") >= 4) & (col("mes") <= 6), lit(2))
    .when((col("mes") >= 7) & (col("mes") <= 9), lit(3))
    .otherwise(lit(4))
)

# 7. Crear columna semestre
dim_tiempo = dim_tiempo.withColumn(
    "semestre",
    when((col("mes") >= 1) & (col("mes") <= 6), lit(1)).otherwise(lit(2))
)

# 8. Agregar nombre del mes
dim_tiempo = dim_tiempo.withColumn(
    "nombre_mes",
    when(col("mes") == 1, lit("Enero"))
    .when(col("mes") == 2, lit("Febrero"))
    .when(col("mes") == 3, lit("Marzo"))
    .when(col("mes") == 4, lit("Abril"))
    .when(col("mes") == 5, lit("Mayo"))
    .when(col("mes") == 6, lit("Junio"))
    .when(col("mes") == 7, lit("Julio"))
    .when(col("mes") == 8, lit("Agosto"))
    .when(col("mes") == 9, lit("Septiembre"))
    .when(col("mes") == 10, lit("Octubre"))
    .when(col("mes") == 11, lit("Noviembre"))
    .otherwise(lit("Diciembre"))
)

# 9. Reordenar columnas: poner la PK primero
dim_tiempo = dim_tiempo.select(
    "pk_anio_mes", "anio", "mes", "nombre_mes", "trimestre", "semestre", "id_tiempo"
)

# 10. Mostrar resultado
dim_tiempo.show()

In [None]:
# Convertir a pandas
dim_tiempo_pd = dim_tiempo.toPandas()

# Guardar como CSV en la carpeta Dimensiones
dim_tiempo_pd.to_csv("Dimensiones/dim_tiempo.csv", index=False)


### Dimension Nivel de Gobierno

In [None]:
dim_nivel_gobierno = df_mef.select(
    col("NIVEL_GOBIERNO").alias("nivel_codigo"),
    col("NIVEL_GOBIERNO_NOMBRE").alias("nivel_nombre")
).distinct()

dim_nivel_gobierno = dim_nivel_gobierno.withColumn(
    "id_nivel", row_number().over(Window.orderBy("nivel_codigo"))
)

df_nivel_gobierno = dim_nivel_gobierno.toPandas()
df_nivel_gobierno.to_csv("Dimensiones/dim_nivel_gobierno.csv", index=False)

### Dimensi√≥n Entidad ejecutora

In [None]:
dim_entidad_ejecutora = df_mef.select(
    col("SEC_EJEC").alias("sec_ejec"),
    col("EJECUTORA_NOMBRE").alias("entidad_nombre"),
    col("PLIEGO").alias("pliego"),
    col("PLIEGO_NOMBRE").alias("pliego_nombre"),
    col("SECTOR").alias("sector"),
    col("SECTOR_NOMBRE").alias("sector_nombre"),
    col("NIVEL_GOBIERNO").alias("nivel_codigo"),  # FK temporal
    col("DEPARTAMENTO_EJECUTORA_NOMBRE").alias("departamento_entidad")
).distinct()

# üîó Unir con dim_nivel_gobierno para traer el id_nivel
dim_entidad_ejecutora = dim_entidad_ejecutora.join(
    dim_nivel_gobierno.select("nivel_codigo", "id_nivel"),
    on="nivel_codigo",
    how="left"
).drop("nivel_codigo")

# Agregar ID de entidad
window = Window.orderBy("sec_ejec")
dim_entidad_ejecutora = dim_entidad_ejecutora.withColumn("id_entidad", row_number().over(window))

df_pd_entidad = dim_entidad_ejecutora.toPandas()
df_pd_entidad.to_csv("Dimensiones/dim_entidad_ejecutora.csv", index=False)

### Hecho MEF

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, upper, when, regexp_replace

# -----------------------------
# 2Ô∏è‚É£ Limpieza de montos
# -----------------------------
for m in ["MONTO_PIA", "MONTO_PIM", "MONTO_RECAUDADO"]:
    df_mef = df_mef.withColumn(m, regexp_replace(col(m), ",", "").cast("double"))



In [6]:
# -----------------------------
# 3Ô∏è‚É£ Corregir inconsistencias de texto (para ubigeo)
# -----------------------------
df_mef = (
    df_mef
    .withColumn("DEPARTAMENTO_EJECUTORA_NOMBRE", upper(trim(col("DEPARTAMENTO_EJECUTORA_NOMBRE"))))
    .withColumn("PROVINCIA_EJECUTORA_NOMBRE", upper(trim(col("PROVINCIA_EJECUTORA_NOMBRE"))))
    .withColumn("DISTRITO_EJECUTORA_NOMBRE", upper(trim(col("DISTRITO_EJECUTORA_NOMBRE"))))
)

df_mef = (
    df_mef
    .withColumn("SECTOR", regexp_replace(trim(col("SECTOR")), "^0+", "").cast("int"))
    .withColumn("SEC_EJEC", trim(col("SEC_EJEC")).cast("int"))
    .withColumn("ANO_DOC", trim(col("ANO_DOC")).cast("int"))
    .withColumn("MES_DOC", trim(col("MES_DOC")).cast("int"))
)

# --------------------------------------------------------
# üîß Correcciones manuales conocidas (provincias y distritos)
# --------------------------------------------------------
df_mef = df_mef.withColumn(
    "PROVINCIA_EJECUTORA_NOMBRE",
    when(col("PROVINCIA_EJECUTORA_NOMBRE") == "MARANON", "MARA√ëON")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "NAZCA", "NASCA")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "CANETE", "CA√ëETE")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "FERRENAFE", "FERRE√ëAFE")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "RAMON CASTILLA", "MARISCAL RAMON CASTILLA")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "VILCASHUAMAN", "VILCAS HUAMAN")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "UCTUBAMBA", "UTCUBAMBA")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "ANTONIO RAIMONDI", "ANTONIO RAYMONDI")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "COTABAMBA", "COTABAMBAS")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "DANIEL A. CARRION", "DANIEL ALCIDES CARRION")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "CANARIS", "CA√ëARIS")
    .when(col("PROVINCIA_EJECUTORA_NOMBRE") == "QUISPICANCHIS", "QUISPICANCHI")
    .otherwise(col("PROVINCIA_EJECUTORA_NOMBRE"))
).withColumn(
    "DISTRITO_EJECUTORA_NOMBRE",
    when(col("DISTRITO_EJECUTORA_NOMBRE") == "QUIMBIRI", "KIMBIRI")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "ENCANADA", "ENCA√ëADA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "NAZCA", "NASCA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "PUNOS", "PU√ëOS")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "LEONOR ORDONEZ", "LEONOR ORDO√ëEZ")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "NAHUIMPUQUIO", "√ëAHUIMPUQUIO")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "CANARIS", "CA√ëARIS")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "UNON", "U√ëON")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "CHANCAYBANOS", "CHANCAYBA√ëOS")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "ZUNIGA", "ZU√ëIGA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "CHAVINA", "CHAVI√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "VINAC", "VI√ëAC")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "SAN VICENTE DE CANETE", "SAN VICENTE DE CA√ëETE")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "CORONEL CASTANEDA", "CORONEL CASTA√ëEDA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "PARINAS", "PARI√ëAS")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "ICHUNA", "ICHU√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "QUINOTA", "QUI√ëOTA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "HUAYA", "HUALLA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "MANAZO", "MA√ëAZO")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "CALLAIRA", "CALLERIA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "APARICIO POMARES - CHUPAN", "APARICIO POMARES")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "NUNOA", "NU√ëOA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "CUNUMBUQUI", "CU√ëUMBUQUI")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "STO. DOMINGO DE ACOBAMBA", "SANTO DOMINGO DE ACOBAMBA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "PAMPAS", "PAMPAS GRANDE")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "LA YARADA-LOS PALOS", "LA YARADA LOS PALOS")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "SANA", "SA√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "HUACANA", "HUACA√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "LARAOS", "SAN PEDRO DE LARAOS")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "AYAUCA", "ALLAUCA")    
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "BRENA", "BRE√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "QUEQUENA", "QUEQUE√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "JOSE MAR√É¬çA ARGUEDAS", "JOS√â MAR√çA ARGUEDAS")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "VEINTISEIS DE OCTUBRE", "VEINTIS√âIS DE OCTUBRE")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "OCANA", "OCA√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "SAN JUAN DE CHACNA", "SAN JUAN DE CHAC√ëA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "MANAYTAY", "MANANTAY")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "KOSNIPATA", "KOS√ëIPATA")
    .when(col("DISTRITO_EJECUTORA_NOMBRE") == "INAPARI", "I√ëAPARI")
    .otherwise(col("DISTRITO_EJECUTORA_NOMBRE"))
)



<table width="99%">
  <tr>
    <td bgcolor="#FFBA39">
      <h2 style="color: #000000; text-align: left;">Cargar Dimensiones</h2>
    </td>
  </tr>
</table>

In [7]:
# -----------------------------
# 4Ô∏è‚É£ Cargar dimensiones
# -----------------------------
def cargar_dim(ruta, cast_int_cols=None):
    df = spark.read.csv(ruta, header=True)
    for c in df.columns:
        df = df.withColumn(c, trim(upper(col(c))))
    if cast_int_cols:
        for c in cast_int_cols:
            df = df.withColumn(c, col(c).cast("int"))
    return df
dim_tiempo = cargar_dim("Dimensiones/dim_tiempo.csv", ["anio", "mes", "id_tiempo"])
dim_entidad = cargar_dim("Dimensiones/dim_entidad_ejecutora.csv", ["sec_ejec", "id_entidad"])
dim_sector = cargar_dim("Dimensiones/dim_sector.csv", ["sector", "id_sector"])
dim_fuente = cargar_dim("Dimensiones/dim_fuente_financiamiento.csv", ["id_fuente"])
dim_clasificacion = cargar_dim("Dimensiones/dim_clasificacion_ingreso.csv", ["id_clasificacion"])
dim_ubigeo = cargar_dim("Dimensiones/dim_ubigeo.csv", ["id_ubigeo"])

In [8]:
print(df_mef.columns)

['ANO_DOC', 'MES_DOC', 'NIVEL_GOBIERNO', 'NIVEL_GOBIERNO_NOMBRE', 'SECTOR', 'SECTOR_NOMBRE', 'PLIEGO', 'PLIEGO_NOMBRE', 'SEC_EJEC', 'EJECUTORA', 'EJECUTORA_NOMBRE', 'DEPARTAMENTO_EJECUTORA', 'DEPARTAMENTO_EJECUTORA_NOMBRE', 'PROVINCIA_EJECUTORA', 'PROVINCIA_EJECUTORA_NOMBRE', 'DISTRITO_EJECUTORA', 'DISTRITO_EJECUTORA_NOMBRE', 'FUENTE_FINANCIAMIENTO', 'FUENTE_FINANCIAMIENTO_NOMBRE', 'RUBRO', 'RUBRO_NOMBRE', 'TIPO_RECURSO', 'TIPO_RECURSO_NOMBRE', 'GENERICA', 'GENERICA_NOMBRE', 'SUBGENERICA', 'SUBGENERICA_NOMBRE', 'SUBGENERICA_DET', 'SUBGENERICA_DET_NOMBRE', 'ESPECIFICA', 'ESPECIFICA_NOMBRE', 'ESPECIFICA_DET', 'ESPECIFICA_DET_NOMBRE', 'MONTO_PIA', 'MONTO_PIM', 'MONTO_RECAUDADO']


In [9]:
dim_sector.show()

+------+--------------------+---------+
|sector|       sector_nombre|id_sector|
+------+--------------------+---------+
|   999|          SIN SECTOR|        0|
|     1|PRESIDENCIA CONSE...|        1|
|     3|             CULTURA|        2|
|     4|      PODER JUDICIAL|        3|
|     5|           AMBIENTAL|        4|
|     6|            JUSTICIA|        5|
|     7|            INTERIOR|        6|
|     8|RELACIONES EXTERI...|        7|
|     9| ECONOMIA Y FINANZAS|        8|
|    10|           EDUCACION|        9|
|    11|               SALUD|       10|
|    12|TRABAJO Y PROMOCI...|       11|
|    13|  AGRARIO Y DE RIEGO|       12|
|    13|         AGRICULTURA|       13|
|    16|     ENERGIA Y MINAS|       14|
|    19| CONTRALORIA GENERAL|       15|
|    20|DEFENSORIA DEL PU...|       16|
|    21|JUNTA NACIONAL DE...|       17|
|    21|CONSEJO NACIONAL ...|       18|
|    22|  MINISTERIO PUBLICO|       19|
+------+--------------------+---------+
only showing top 20 rows



In [10]:
print(df_mef.columns.count("SECTOR_NOMBRE"))


1


In [11]:
# ============================================================
# üîó 6Ô∏è‚É£ Joins con claves limpias y compatibles
# ============================================================

# 1. Join con dim_tiempo
df_mef = df_mef.join(
    dim_tiempo,
    (df_mef["ANO_DOC"] == dim_tiempo["anio"]) & (df_mef["MES_DOC"] == dim_tiempo["mes"]),
    "left"
).drop(dim_tiempo["anio"], dim_tiempo["mes"])

# 2. Join con dim_entidad - IMPORTANTE: eliminar TODAS las columnas duplicadas
df_mef = df_mef.join(
    dim_entidad,
    df_mef["SEC_EJEC"] == dim_entidad["sec_ejec"],
    "left"
).drop(
    dim_entidad["sec_ejec"], 
    dim_entidad["sector"],  # ‚Üê Nombre correcto sin renombrar
    dim_entidad["sector_nombre"], 
    dim_entidad["pliego"], 
    dim_entidad["pliego_nombre"]
)

# 3. Join con dim_sector - Ahora s√≠ podemos usar SECTOR sin ambig√ºedad
df_mef = df_mef.join(
    dim_sector,
    df_mef["SECTOR"].cast("int") == dim_sector["sector"].cast("int"),
    "left"
).drop(dim_sector["sector"], dim_sector["sector_nombre"])

# 4. Join con dim_fuente
df_mef = df_mef.join(
    dim_fuente,
    (df_mef["FUENTE_FINANCIAMIENTO"] == dim_fuente["fuente_financiamiento"]) &
    (df_mef["RUBRO"] == dim_fuente["rubro"]) &
    (df_mef["TIPO_RECURSO"] == dim_fuente["tipo_recurso"]),
    "left"
).drop(dim_fuente["fuente_financiamiento"], dim_fuente["rubro"], dim_fuente["tipo_recurso"])

# 5. Join con dim_clasificacion
df_mef = df_mef.join(
    dim_clasificacion,
    (df_mef["GENERICA"] == dim_clasificacion["generica"]) &
    (df_mef["SUBGENERICA"] == dim_clasificacion["subgenerica"]) &
    (df_mef["ESPECIFICA"] == dim_clasificacion["especifica"]),
    "left"
).drop(dim_clasificacion["generica"], dim_clasificacion["subgenerica"], dim_clasificacion["especifica"])

# 6. Join con dim_ubigeo (match geogr√°fico)
df_mef = df_mef.join(
    dim_ubigeo,
    (upper(trim(df_mef["DEPARTAMENTO_EJECUTORA_NOMBRE"])) == upper(trim(dim_ubigeo["departamento"]))) &
    (upper(trim(df_mef["PROVINCIA_EJECUTORA_NOMBRE"])) == upper(trim(dim_ubigeo["provincia"]))) &
    (upper(trim(df_mef["DISTRITO_EJECUTORA_NOMBRE"])) == upper(trim(dim_ubigeo["distrito"]))),
    "left"
).drop(dim_ubigeo["departamento"], dim_ubigeo["provincia"], dim_ubigeo["distrito"])

In [12]:
dim_sector.printSchema()
dim_entidad.printSchema()
dim_sector.show(5)

root
 |-- sector: integer (nullable = true)
 |-- sector_nombre: string (nullable = true)
 |-- id_sector: integer (nullable = true)

root
 |-- sec_ejec: integer (nullable = true)
 |-- entidad_nombre: string (nullable = true)
 |-- pliego: string (nullable = true)
 |-- pliego_nombre: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- sector_nombre: string (nullable = true)
 |-- departamento_entidad: string (nullable = true)
 |-- id_nivel: string (nullable = true)
 |-- id_entidad: integer (nullable = true)

+------+--------------------+---------+
|sector|       sector_nombre|id_sector|
+------+--------------------+---------+
|   999|          SIN SECTOR|        0|
|     1|PRESIDENCIA CONSE...|        1|
|     3|             CULTURA|        2|
|     4|      PODER JUDICIAL|        3|
|     5|           AMBIENTAL|        4|
+------+--------------------+---------+
only showing top 5 rows



In [13]:
# -----------------------------
# 6Ô∏è‚É£ Seleccionar columnas finales
# -----------------------------
hecho_ingreso_mef = df_mef.select(
    col("id_tiempo").cast("int"),
    col("id_entidad").cast("int"),
    col("id_sector").cast("int"),
    col("id_fuente").cast("int"),
    col("id_clasificacion").cast("int"),
    col("id_ubigeo").cast("int"),
    col("MONTO_PIA").alias("monto_pia").cast("double"),
    col("MONTO_PIM").alias("monto_pim").cast("double"),
    col("MONTO_RECAUDADO").alias("monto_recaudado").cast("double")
)



In [14]:
hecho_ingreso_mef.printSchema()

root
 |-- id_tiempo: integer (nullable = true)
 |-- id_entidad: integer (nullable = true)
 |-- id_sector: integer (nullable = true)
 |-- id_fuente: integer (nullable = true)
 |-- id_clasificacion: integer (nullable = true)
 |-- id_ubigeo: integer (nullable = true)
 |-- monto_pia: double (nullable = true)
 |-- monto_pim: double (nullable = true)
 |-- monto_recaudado: double (nullable = true)



In [15]:
hecho_ingreso_mef = hecho_ingreso_mef.withColumn(
    "id_sector",
    F.when(F.col("id_sector").isNull(), 0).otherwise(F.col("id_sector"))
)

In [16]:
hecho_ingreso_mef = hecho_ingreso_mef.repartition(8)
hecho_ingreso_mef.count()  # fuerza evaluaci√≥n, libera el plan de joins


370100541

In [17]:
hecho_ingreso_mef.coalesce(4) \
    .write.mode("overwrite") \
    .parquet("Hechos/hecho_ingreso_mef_powerbi/")

print("‚úÖ Parquet generado correctamente: Hechos/hecho_ingreso_mef_powerbi/")

‚úÖ Parquet generado correctamente: Hechos/hecho_ingreso_mef_powerbi/


<table width="99%">
  <tr>
    <td bgcolor="#FFBA39">
      <h2 style="color: #000000; text-align: left;">Nueva tabla de Hechos</h2>
    </td>
  </tr>
</table>

### Dimension Periodo

In [None]:
df_predial_aplicacion = spark.read.csv(
    "parcial/Predial/csvs/rentas_ano_aplicacion.csv",
    header=True,
    inferSchema=True,
    sep=","  # <- IMPORTANTE
)

In [None]:
df_dim_periodo = df_predial_aplicacion.select("ANO_APLICACION", "PERIODO").distinct() \
    .withColumnRenamed("ANO_APLICACION", "anio") \
    .withColumnRenamed("PERIODO", "periodo")

# Crear una clave compuesta y un id incremental
df_dim_periodo = df_dim_periodo.withColumn(
    "pk_anio_periodo", F.concat_ws("_", F.col("anio").cast("string"), F.col("periodo").cast("string"))
)

window_spec = Window.orderBy("anio", "periodo")
df_dim_periodo = df_dim_periodo.withColumn("id_periodo", F.row_number().over(window_spec))

# Puedes agregar una descripci√≥n si lo deseas
df_dim_periodo = df_dim_periodo.withColumn(
    "desc_periodo", F.concat_ws(" ", F.lit("Periodo"), F.col("periodo"), F.lit("del a√±o"), F.col("anio"))
)

df_dim_periodo.show()


In [None]:
# Convertir a pandas
df_dim_periodo_pd = df_dim_periodo.toPandas()

# Guardar como CSV en la carpeta Dimensiones
df_dim_periodo_pd.to_csv("Dimensiones/dim_periodo.csv", index=False)


### Dimensi√≥n ‚ÄúEntidad Estado‚Äù

In [None]:
rentas_entidad_estado_df = spark.read.csv(
    "parcial/Predial/csvs/rentas_entidad_estado.csv",
    header=True,
    inferSchema=True,
    sep=","  # <- IMPORTANTE
)

In [None]:
dim_entidad_estado  = rentas_entidad_estado_df.select(
    col("SEC_EJEC").alias("sec_ejec"),
    col("ANO_APLICACION").alias("ano_aplicacion"),
    col("PERIODO").alias("periodo"),    
    col("ESTADO").alias("estado_meta"),
    col("CLASIFICACION").alias("clasificacion_municipalidad"),
    col("ORIGEN_INFORMACION").alias("origen_informacion"),
    col("IND_RESOL_ALCAL_ADJUNTO").alias("resolucion_alcaldia")    
).distinct()

# A√±adir ID autoincremental
window_spec = Window.orderBy("sec_ejec", "ano_aplicacion", "periodo")
dim_entidad_estado = dim_entidad_estado.withColumn("id_entidad_estado", F.row_number().over(window_spec))

dim_entidad_estado.show()


In [None]:
df_dim_entidad_estado = dim_entidad_estado.toPandas()
df_dim_entidad_estado.to_csv("Dimensiones/dim_entidad_estado.csv", index=False)

### Hecho Predial

In [5]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import trim, upper

# -----------------------------
# 1. Cargar dataset base
# -----------------------------
df_predial = spark.read.csv(
    "parcial/Predial/csvs/rentas_esat_estadistica_atm.csv",
    header=True,
    inferSchema=True,
    sep=","
)

df_predial = df_predial.withColumn(
    "UBIGEO",
    F.lpad(F.col("UBIGEO").cast("string"), 6, "0")
)

# üîπ Limpieza: eliminar comas y convertir a double
for colname in df_predial.columns:
    if "MON_" in colname or "NUM_" in colname:
        df_predial = df_predial.withColumn(
            colname,
            F.regexp_replace(F.col(colname), ",", "").cast("double")
        )

# üîπ Normalizar claves (trim y upper para evitar fallos en joins)
df_predial = df_predial.select(
    *[F.trim(F.upper(F.col(c))).alias(c) if c in ["UBIGEO"] else F.col(c) for c in df_predial.columns]
)


# -----------------------------
# 2. Cargar dimensiones
# -----------------------------
dim_ubigeo2 = spark.read.csv("Dimensiones/dim_ubigeo.csv", header=True)
dim_entidad2 = spark.read.csv("Dimensiones/dim_entidad_ejecutora.csv", header=True)
dim_periodo2 = spark.read.csv("Dimensiones/dim_periodo.csv", header=True)
dim_estado2 = spark.read.csv("Dimensiones/dim_entidad_estado.csv", header=True)


# --- üîπ Normalizaci√≥n de columnas clave ---
dim_ubigeo2 = dim_ubigeo2.withColumn(
    "ubigeo", F.lpad(F.trim(F.upper(F.col("ubigeo"))), 6, "0")
)
dim_entidad2 = dim_entidad2.withColumn("sec_ejec", F.col("sec_ejec").cast("int"))
dim_periodo2 = dim_periodo2.withColumn("anio", F.col("anio").cast("int"))
dim_estado2 = dim_estado2.withColumn("sec_ejec", F.col("sec_ejec").cast("int"))

In [6]:
dim_ubigeo2.printSchema()

root
 |-- ubigeo: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- codigo_departamento: string (nullable = true)
 |-- provincia: string (nullable = true)
 |-- codigo_provincia: string (nullable = true)
 |-- distrito: string (nullable = true)
 |-- codigo_distrito: string (nullable = true)
 |-- tipomuni: string (nullable = true)
 |-- region_natural: string (nullable = true)
 |-- clasificacion_municipalidad: string (nullable = true)
 |-- tipo_muni_desc: string (nullable = true)
 |-- id_ubigeo: string (nullable = true)



In [21]:
df_predial.printSchema()

root
 |-- SEC_EJEC: integer (nullable = true)
 |-- UBIGEO: string (nullable = true)
 |-- DEPARTAMENTO: integer (nullable = true)
 |-- DEPARTAMENTO_NOMBRE: string (nullable = true)
 |-- PROVINCIA: integer (nullable = true)
 |-- PROVINCIA_NOMBRE: string (nullable = true)
 |-- DISTRITO: integer (nullable = true)
 |-- DISTRITO_NOMBRE: string (nullable = true)
 |-- MUNICIPALIDAD_NOMBRE: string (nullable = true)
 |-- ANO_APLICACION: integer (nullable = true)
 |-- PERIODO: integer (nullable = true)
 |-- ANO_ESTADISTICA: integer (nullable = true)
 |-- MON_EMISIONPREDIAL_AFECTO: double (nullable = true)
 |-- MON_EMISIONPREDIAL_EXON: double (nullable = true)
 |-- NUM_EMISIONPREDIAL_AFECTO: double (nullable = true)
 |-- NUM_EMISIONPREDIAL_EXON: double (nullable = true)
 |-- NUM_EMISIONPREDIAL_CASA: double (nullable = true)
 |-- NUM_EMISIONPREDIAL_OTROS: double (nullable = true)
 |-- MON_BASEIMPONIBLE_AFECTO: double (nullable = true)
 |-- MON_BASEIMPONIBLE_EXON: double (nullable = true)
 |-- MON_A

In [9]:
# -----------------------------
# 3. Uniones (claves for√°neas)
# -----------------------------

# Aliases
df_predial_alias = df_predial.alias("predial")
dim_entidad_alias = dim_entidad2.alias("entidad")
dim_periodo_alias = dim_periodo2.alias("periodo")
dim_estado_alias = dim_estado2.alias("estado")
dim_ubigeo_alias = dim_ubigeo2.alias("ubigeo")
# joins
df_predial = (
    df_predial_alias
    .join(
        dim_entidad_alias,
        F.col("predial.SEC_EJEC") == F.col("entidad.sec_ejec"),
        "left"
    )
    .join(
        dim_periodo_alias,
        (F.col("predial.ANO_APLICACION") == F.col("periodo.anio")) &
        (F.col("predial.PERIODO") == F.col("periodo.periodo")),
        "left"
    )
    .join(
        dim_estado_alias,
        (F.col("predial.SEC_EJEC") == F.col("estado.sec_ejec")) &
        (F.col("predial.ANO_APLICACION") == F.col("estado.ano_aplicacion")) &
        (F.col("predial.PERIODO") == F.col("estado.periodo")),
        "left"
    )
    .join(
        dim_ubigeo_alias,
        F.col("predial.UBIGEO") == F.col("ubigeo.ubigeo"),
        "left"
    )
)



In [10]:

# -----------------------------
# 4. Seleccionar columnas del hecho
# -----------------------------
hecho_recaudacion_predial = df_predial.select(
    F.col("ubigeo.id_ubigeo").cast("int").alias("id_ubigeo"),
    F.col("entidad.id_entidad").cast("int").alias("id_entidad"),
    F.col("periodo.id_periodo").cast("int").alias("id_periodo"),
    F.col("estado.id_entidad_estado").cast("int").alias("id_entidad_estado"),
    F.col("MON_EMISIONPREDIAL_AFECTO").alias("monto_emision_afecto"),
    F.col("MON_EMISIONPREDIAL_EXON").alias("monto_emision_exonerado"),
    F.col("MON_RECAUDACTUAL_ORDIN").alias("monto_recaudacion_actual_ordinaria"),
    F.col("MON_RECAUDACTUAL_COAC").alias("monto_recaudacion_actual_coactiva"),
    F.col("MON_RECAUDANTER_ORDI").alias("monto_recaudacion_anterior_ordinaria"),
    F.col("MON_RECAUDANTER_COAC").alias("monto_recaudacion_anterior_coactiva"),
    F.col("MON_SALDOPREDIAL_ORD").alias("monto_saldo_ordinario"),
    F.col("MON_SALDOPREDIAL_COAC").alias("monto_saldo_coactivo"),
    F.col("NUM_CONTRIPREDIO").alias("num_contribuyentes_predial"),
    F.col("NUM_PREDIOTOTAL").alias("num_predios_totales")
)




<table width="99%">
  <tr>
    <td bgcolor="#FFBA39">
      <h2 style="color: #000000; text-align: left;">Resultados</h2>
    </td>
  </tr>
</table>

In [11]:
hecho_recaudacion_predial.printSchema()

root
 |-- id_ubigeo: integer (nullable = true)
 |-- id_entidad: integer (nullable = true)
 |-- id_periodo: integer (nullable = true)
 |-- id_entidad_estado: integer (nullable = true)
 |-- monto_emision_afecto: double (nullable = true)
 |-- monto_emision_exonerado: double (nullable = true)
 |-- monto_recaudacion_actual_ordinaria: double (nullable = true)
 |-- monto_recaudacion_actual_coactiva: double (nullable = true)
 |-- monto_recaudacion_anterior_ordinaria: double (nullable = true)
 |-- monto_recaudacion_anterior_coactiva: double (nullable = true)
 |-- monto_saldo_ordinario: double (nullable = true)
 |-- monto_saldo_coactivo: double (nullable = true)
 |-- num_contribuyentes_predial: double (nullable = true)
 |-- num_predios_totales: double (nullable = true)



In [12]:
no_match = hecho_recaudacion_predial.filter(F.col("id_ubigeo").isNull())
print("Registros sin ubigeo:", no_match.count())

Registros sin ubigeo: 156


In [15]:
df_predial.filter(F.col("predial.UBIGEO").startswith("0")).count()

38315

In [17]:
dim_ubigeo2.select("ubigeo").orderBy("ubigeo").show(10)

+------+
|ubigeo|
+------+
|010101|
|010102|
|010103|
|010104|
|010105|
|010106|
|010107|
|010108|
|010109|
|010110|
+------+
only showing top 10 rows



In [19]:
df_predial.select("predial.UBIGEO").orderBy("predial.UBIGEO").show(10)

+------+
|UBIGEO|
+------+
|010101|
|010101|
|010101|
|010101|
|010101|
|010101|
|010101|
|010101|
|010101|
|010101|
+------+
only showing top 10 rows



In [20]:
# Filtrar registros del hecho que no encontraron su ubigeo
no_match = df_predial.filter(F.col("ubigeo.id_ubigeo").isNull())
print("Registros sin ubigeo:", no_match.count())

Registros sin ubigeo: 156


In [16]:
# -----------------------------
# 5. Guardar parquet
# -----------------------------
(
    hecho_recaudacion_predial
    .coalesce(8)
    .write
    .mode("overwrite")
    .option("compression", "snappy")
    .parquet("Hechos/hecho_recaudacion_predial_powerbi/")
)

print("‚úÖ Parquet generado correctamente: Hechos/hecho_recaudacion_predial_powerbi/")

‚úÖ Parquet generado correctamente: Hechos/hecho_recaudacion_predial_powerbi/
