In [0]:
# Punto 1

# La lectura es la más rápida posible porque definimos schemas explícitos (evitando inferSchema y su escaneo previo), aprovechamos el particionamiento físico por anno_firma=YYYY de SECOP con basePath + recursiveFileLookup (permitiendo partition pruning cuando se filtra por año), y consumimos los formatos comprimidos nativos (.json.gz/.csv.gz) que Spark procesa en paralelo y distribuido directamente desde wasbs://, sin pasos intermedios ni cache innecesario. Esto reduce I/O, CPU de inferencia y tiempos totales de carga.


from pyspark.sql import functions as F, types as T

# Rutas
SECOP_PATH = "wasbs://sid@uniandesyjt.blob.core.windows.net/secop"
BPIN_PATH  = "wasbs://sid@uniandesyjt.blob.core.windows.net/bpin/bpin.csv.gz"

# -------------------------
# SCHEMA COMPLETO: SECOP
# -------------------------
secop_schema = T.StructType([
    T.StructField("anno_bpin", T.StringType(), True),
    T.StructField("anno_firma", T.LongType(), True),
    T.StructField("ciudad", T.StringType(), True),
    T.StructField("código_bpin", T.StringType(), True),
    T.StructField("departamento", T.StringType(), True),
    T.StructField("documento_proveedor", T.StringType(), True),
    T.StructField("duración_del_contrato", T.StringType(), True),
    T.StructField("entidad_centralizada", T.StringType(), True),
    T.StructField("estado_contrato", T.StringType(), True),
    T.StructField("fecha_de_fin_del_contrato", T.StringType(), True),
    T.StructField("fecha_de_firma", T.StringType(), True),
    T.StructField("fecha_de_inicio_del_contrato", T.StringType(), True),
    T.StructField("modalidad_de_contratacion", T.StringType(), True),
    T.StructField("objeto_del_contrato", T.StringType(), True),
    T.StructField("orden", T.StringType(), True),
    T.StructField("origen_de_los_recursos", T.StringType(), True),
    T.StructField("proveedor_adjudicado", T.StringType(), True),
    T.StructField("rama", T.StringType(), True),
    T.StructField("sector", T.StringType(), True),
    T.StructField("tipo_de_contrato", T.StringType(), True),
    T.StructField("tipodocproveedor", T.StringType(), True),
    T.StructField("ultima_actualizacion", T.StringType(), True),
    T.StructField("urlproceso", T.StringType(), True),
    T.StructField("valor_del_contrato", T.LongType(), True),
    T.StructField("valor_pagado", T.StringType(), True),
])

# -------------------------
# SCHEMA COMPLETO: BPIN
# -------------------------
bpin_schema = T.StructType([
    T.StructField("Bpin", T.StringType(), True),
    T.StructField("NombreProyecto", T.StringType(), True),
    T.StructField("ObjetivoGeneral", T.StringType(), True),
    T.StructField("EstadoProyecto", T.StringType(), True),
    T.StructField("Horizonte", T.StringType(), True),
    T.StructField("Sector", T.StringType(), True),
    T.StructField("EntidadResponsable", T.StringType(), True),
    T.StructField("ProgramaPresupuestal", T.StringType(), True),
    T.StructField("TipoProyecto", T.StringType(), True),
    T.StructField("PlanDesarrolloNacional", T.StringType(), True),
    T.StructField("ValorTotalProyecto", T.StringType(), True),
    T.StructField("ValorVigenteProyecto", T.StringType(), True),
    T.StructField("ValorObligacionProyecto", T.StringType(), True),
    T.StructField("ValorPagoProyecto", T.StringType(), True),
    T.StructField("SubEstadoProyecto", T.StringType(), True),
    T.StructField("CodigoEntidadResponsable", T.StringType(), True),
    T.StructField("TotalBeneficiario", T.StringType(), True),
])

# --- Lectura SECOP (JSON.gz particionado por anno_firma=YYYY) ---
secop_df = (spark.read
    .schema(secop_schema)
    .option("basePath", SECOP_PATH)            # respeta particiones
    .option("recursiveFileLookup", "true")     # recorre subcarpetas anno_firma=YYYY
    .json(SECOP_PATH)
)

# --- Lectura BPIN (CSV.gz con header) ---
bpin_df = (spark.read
    .schema(bpin_schema)
    .option("header", True)
    .csv(BPIN_PATH)
)

# Quick checks
display(secop_df.limit(10))
display(bpin_df.limit(10))


In [0]:
# Punto 2

from pyspark.sql import functions as F

# Calcular el valor total de contratos por proveedor en 2024
top_proveedores_2024 = (
    secop_df
    .filter(F.col("anno_firma") == 2024)  # usamos partición por año
    .groupBy("proveedor_adjudicado")
    .agg(F.sum("valor_del_contrato").alias("valor_total_contratos"))
    .orderBy(F.desc("valor_total_contratos"))
    .limit(10)
)

display(top_proveedores_2024)


In [0]:
# Punto 3

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# Normalización numérica robusta (valor_pagado viene como string)
secop_num = (
    secop_df
    .withColumn("valor_contrato_num", F.col("valor_del_contrato").cast("double"))
    .withColumn(
        "valor_pagado_num",
        F.regexp_replace(F.col("valor_pagado"), r"[^0-9\-.,]", "")  # elimina caracteres no numéricos
    )
    .withColumn(
        "valor_pagado_num",
        F.regexp_replace(F.col("valor_pagado_num"), ",", "")        # elimina comas de miles
    )
    .withColumn(
        "valor_pagado_num",
        F.when(F.col("valor_pagado_num") == "", None)
         .otherwise(F.col("valor_pagado_num").cast("double"))
    )
    .withColumn("bpin", F.upper(F.trim(F.col("código_bpin"))))
)

bpin_norm = bpin_df.withColumn("bpin", F.upper(F.trim(F.col("Bpin"))))

top_proyectos_sin_pagar = (
    secop_num
    .groupBy("bpin")
    .agg(
        F.sum(F.col("valor_contrato_num") - F.coalesce(F.col("valor_pagado_num"), F.lit(0.0)))
         .alias("valor_sin_pagar")
    )
    # Aquí se aplica broadcast al lado pequeño (BPIN)
    .join(broadcast(bpin_norm.select("bpin", F.col("NombreProyecto").alias("nombre_proyecto"))),
          on="bpin", how="left")
    .orderBy(F.desc("valor_sin_pagar"))
    .limit(10)
    .select("bpin", "nombre_proyecto", "valor_sin_pagar")
)
display(top_proyectos_sin_pagar)

In [0]:
# Punto 4

from pyspark.sql import Window, functions as F

agregado = (
    secop_df
    .groupBy("anno_firma", "proveedor_adjudicado")
    .agg(F.sum("valor_del_contrato").alias("valor_total"))
)

w = Window.partitionBy("anno_firma").orderBy(F.desc("valor_total"))

top5_por_ano = (
    agregado
    .withColumn("posicion", F.row_number().over(w))
    .filter(F.col("posicion") <= 5)
    .orderBy("anno_firma", "posicion")
)

display(top5_por_ano)


In [0]:
# Punto 5

from pyspark.sql import functions as F

prov_2024 = (
    secop_df
    .filter(F.col("anno_firma") == 2024)
    .select(F.upper(F.trim(F.col("proveedor_adjudicado"))).alias("proveedor"))
    .distinct()
)

prov_2020 = (
    secop_df
    .filter(F.col("anno_firma") == 2020)
    .select(F.upper(F.trim(F.col("proveedor_adjudicado"))).alias("proveedor"))
    .distinct()
)

solo_2024 = prov_2024.join(prov_2020, on="proveedor", how="left_anti")

# Conteo solicitado
resultado_p5 = solo_2024.agg(F.count("*").alias("proveedores_solo_2024"))
display(resultado_p5)


In [0]:
# Punto 6

from pyspark.sql import functions as F

# Filtramos SOLO lo necesario (column pruning) y usamos un filtro robusto por tipo de contrato.
# Nota: valor_del_contrato ya es LongType según el schema cargado.
servicios = (
    secop_df
    .select("tipo_de_contrato", "valor_del_contrato")
    .filter(F.lower(F.col("tipo_de_contrato")).like("%prestación de servicios%"))
)

# Promedio (exacto) y mediana aproximada (percentile_approx) para minimizar tiempo
resultado_p6 = servicios.agg(
    F.avg("valor_del_contrato").alias("promedio_valor_contrato"),
    F.expr("percentile_approx(valor_del_contrato, 0.5)").alias("mediana_aproximada")
)

display(resultado_p6)


In [0]:
# Punto 7

from pyspark.sql import functions as F

# Contamos proveedores distintos por año usando approx_count_distinct (rápido, tolera un pequeño error)
prov_por_ano = (
    secop_df
    .select("anno_firma", "proveedor_adjudicado")
    .groupBy("anno_firma")
    .agg(F.approx_count_distinct("proveedor_adjudicado", rsd=0.01).alias("proveedores_unicos_aprox"))
    .orderBy(F.desc("proveedores_unicos_aprox"))
)

# Top 1 (año con más proveedores distintos)
top_ano = prov_por_ano.limit(1)

display(prov_por_ano)
display(top_ano)


In [0]:
# Punto 8 - 20 palabras más comunes en el 20% superior de 2020

from pyspark.sql import functions as F

# 1) Umbral del 20% superior (percentil 80) para 2020 — rápido con percentile_approx
p80_2020 = (
    secop_df
    .filter(F.col("anno_firma") == 2020)
    .agg(F.expr("percentile_approx(valor_del_contrato, 0.8)").alias("p80"))
    .collect()[0]["p80"]
)

# 2) Filtrar solo el 20% superior por valor en 2020 y quedarnos con el objeto contractual
top20_df = (
    secop_df
    .filter( (F.col("anno_firma") == 2020) & (F.col("valor_del_contrato") >= F.lit(p80_2020)) )
    .select(F.coalesce(F.col("objeto_del_contrato"), F.lit("")).alias("objeto"))
)

# 3) Normalizar texto: a minúsculas, quitar puntuación/números y tokenizar
tokens_df = (
    top20_df
    .select(
        F.split(
            F.regexp_replace(F.lower(F.col("objeto")), r"[^a-záéíóúñü]+", " "),
            r"\s+"
        ).alias("words")
    )
)

# 4) Remover stopwords (lista básica en español) sin usar StopWordsRemover.loadDefaultStopWords
stopwords_es = [
    "a","acá","ahí","al","algo","algún","alguna","algunas","alguno","algunos","allá","allí","ante","antes",
    "aquel","aquella","aquellas","aquello","aquellos","aquí","así","aun","aún","bajo","bien","cada","casi",
    "como","con","contra","cual","cuales","cualquier","cualquiera","cuyos","de","del","desde","donde","dos",
    "el","él","ella","ellas","ello","ellos","en","entre","era","erais","eran","eras","eres","es","esa","esas",
    "ese","eso","esos","esta","estaba","estaban","estado","estados","estamos","estar","estará","estas","este",
    "esto","estos","estoy","fin","fue","fueron","ha","hace","hacen","hacer","hacia","han","hasta","hay","la",
    "las","le","les","lo","los","más","me","mi","mis","mismo","mismos","muy","nada","ni","no","nos","nosotros",
    "nuestra","nuestras","nuestro","nuestros","o","otra","otras","otro","otros","para","pero","poco","por",
    "porque","que","qué","quien","quién","quienes","quienesquiera","se","sea","sean","ser","si","sí","siempre",
    "sin","sobre","son","su","sus","tal","también","tan","tanto","te","tendrá","ti","tiene","tienen","toda",
    "todas","todavía","todo","todos","tras","tu","tus","un","una","unas","uno","unos","usted","ustedes","ya"
]

# Creamos una columna con el array de stopwords y filtramos con higher-order functions (sin ML)
filtered_df = (
    tokens_df
    .withColumn("stop_es", F.array([F.lit(w) for w in stopwords_es]))
    .select(F.expr("filter(words, w -> NOT array_contains(stop_es, w))").alias("filtered"))
)

# 5) Contar frecuencia y mostrar TOP 20
top_words = (
    filtered_df
    .select(F.explode("filtered").alias("palabra"))
    .filter(F.length("palabra") > 1)
    .groupBy("palabra")
    .count()
    .orderBy(F.desc("count"))
    .limit(20)
)

display(top_words)
