# Proyecto - Parte 1 
## SID 2025-20

| Estudiante | Codigo | Maestria |
| :------- | :------: | -------: |
| David | 123 | MATI |
| Jhony | 123 | MATI |
| Nicolas Medina | 123 | MATI |

a continuacion se detalla la solución planteada para la primera parte del proyecto

## Importar librerias/Funciones

In [0]:
from pyspark.sql import functions as f
import pyspark.sql as sql
from pyspark.sql.functions import col, regexp_replace ,desc
from pyspark.sql.types import LongType

## Lectura de datos

In [0]:
df_secop = spark.read.format("json") \
    .load("wasbs://sid@uniandesyjt.blob.core.windows.net/secop/")

df_secop.printSchema()

In [0]:
display(df_secop.head(5))

In [0]:
df_bpin = spark.read \
    .option("header", "true") \
    .format("csv")\
    .load("wasbs://sid@uniandesyjt.blob.core.windows.net/bpin/")

df_bpin.printSchema()

In [0]:
display(df_bpin.head(10))

# Punto 1:
Lea los dos conjuntos de datos. Escriba el código para que la lectura sea lo más rápida
posible y justifique su respuesta. (9%)

## Sampling
En las siguientes celdas hacemos la lectura con sampling ratio buscando reducir el tiempo de lectura del schema pero de todas formas aunque es inferior no es el mas optimo

In [0]:
df_secop = spark.read.format("json") \
    .option("samplingRatio", 0.1) \
    .load("wasbs://sid@uniandesyjt.blob.core.windows.net/secop/")

In [0]:
df_bpin = spark.read.format("csv") \
    .option("header", "true")\
    .option("samplingRatio", 0.1) \
    .load("wasbs://sid@uniandesyjt.blob.core.windows.net/bpin/")

## Insertando Schema
Si ya conocemos la estructura de los datos podemos colocar el schema para evitar abrir los archivos para conocer el schema de los datos

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType

schema = StructType([
    StructField("anno_bpin", StringType(), True),
    StructField("anno_firma", IntegerType(), True),
    StructField("ciudad", StringType(), True),
    StructField("código_bpin", StringType(), True),
    StructField("departamento", StringType(), True),
    StructField("documento_proveedor", StringType(), True),
    StructField("duración_del_contrato", StringType(), True),
    StructField("entidad_centralizada", StringType(), True),
    StructField("estado_contrato", StringType(), True),
    StructField("fecha_de_fin_del_contrato", StringType(), True),
    StructField("fecha_de_firma", StringType(), True),
    StructField("fecha_de_inicio_del_contrato", StringType(), True),
    StructField("modalidad_de_contratacion", StringType(), True),
    StructField("objeto_del_contrato", StringType(), True),
    StructField("orden", StringType(), True),
    StructField("origen_de_los_recursos", StringType(), True),
    StructField("proveedor_adjudicado", StringType(), True),
    StructField("rama", StringType(), True),
    StructField("sector", StringType(), True),
    StructField("tipo_de_contrato", StringType(), True),
    StructField("tipodocproveedor", StringType(), True),
    StructField("ultima_actualizacion", StringType(), True),
    StructField("urlproceso", StringType(), True),
    StructField("valor_del_contrato", LongType(), True),
    StructField("valor_pagado", StringType(), True),
])

df_secop = spark.read.format("json") \
    .schema(schema) \
    .load("wasbs://sid@uniandesyjt.blob.core.windows.net/secop/")


In [0]:
schema = StructType([
    StructField("Bpin", StringType(), True),
    StructField("NombreProyecto", StringType(), True),
    StructField("ObjetivoGeneral", StringType(), True),
    StructField("EstadoProyecto", StringType(), True),
    StructField("Horizonte", StringType(), True),
    StructField("Sector", StringType(), True),
    StructField("EntidadResponsable", StringType(), True),
    StructField("ProgramaPresupuestal", StringType(), True),
    StructField("TipoProyecto", StringType(), True),
    StructField("PlanDesarrolloNacional", StringType(), True),
    StructField("ValorTotalProyecto", StringType(), True),
    StructField("ValorVigenteProyecto", StringType(), True),
    StructField("ValorObligacionProyecto", StringType(), True),
    StructField("ValorPagoProyecto", StringType(), True),
    StructField("SubEstadoProyecto", StringType(), True),
    StructField("CodigoEntidadResponsable", StringType(), True),
    StructField("TotalBeneficiario", StringType(), True),
])

df_bpin = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true")\
    .load("wasbs://sid@uniandesyjt.blob.core.windows.net/bpin/")



# Punto 2:
Identifique los 10 proveedores que han tenido el mayor valor de contratos durante el año
2024. (13%)

## Arreglar Columna

Convertir la columna valor pagado a BigInt

In [0]:
df_secop = df_secop.withColumn(
    "valor_pagado_num",
    regexp_replace(col("valor_pagado"), "[^0-9]", "").cast(LongType())
)

## Agregar Datos - SPARK

In [0]:
(
    df_secop
    .groupBy(
        f.col('documento_proveedor').alias('nit'),
        f.col('proveedor_adjudicado').alias('nombre')
    )
    .agg(
        f.count(f.col('documento_proveedor')).alias('contratos'),
        f.sum(f.col('valor_del_contrato')).alias('valor'),
        f.sum('valor_pagado_num').alias('pagado')
    )
    .orderBy(f.col('valor').desc())
    .limit(10)
    .withColumn('% Pago', f.round(f.col('pagado') / f.col('valor'), 2))
    .display()
)

## Agregar Datos - SPARK sql

In [0]:
df_secop.createOrReplaceTempView("secop")

spark.sql("""
SELECT 
    documento_proveedor as nit, 
    proveedor_adjudicado as nombre,
    sum(valor_del_contrato) as valor, 
    sum(valor_pagado_num) as pagado,
    round(TRY_DIVIDE(sum(valor_pagado_num),sum(valor_del_contrato)),2) as `% Pago`
FROM secop
GROUP BY documento_proveedor, proveedor_adjudicado
ORDER BY valor DESC
LIMIT 10
""").display()

#Punto 3:
Identifique los 10 Proyectos de inversión que han tenido el mayor valor de contratos sin
pagar (valor del contrato menos el valor pagado). La respuesta debe incluir el nombre de
los proyectos y el valor sin pagar. (13%)


#Punto 4:
Identifique para cada año el top 5 de los proveedores con mayor valor de contratos. La
respuesta debe incluir el nombre de los proveedores, el valor de sus contratos y su
posición en el top. (13%)

#Punto 5:
Identifique el número de proveedores que firmaron contratos en el 2024 y NO firmaron
contratos en el 2020. (13%)

#Punto 6:
Calcule el valor promedio de los contratos de “Prestación de servicios” y una
aproximación de la mediana (el percentil 50). Construya esta respuesta de manera que se
minimice el tiempo de cálculo, aunque se sacrifique precisión. (13%)


#Punto 7:
Cuál fue el año en el que firmaron contratos un mayor número de proveedores distintos.
Construya esta respuesta de manera que se minimice el tiempo de cálculo, aunque se
sacrifique precisión. (13%)

#Punto 8:
Cuáles son las 20 palabras más comunes en los objetos contractuales del 20% de los
contratos más altos del 2020. Para esta respuesta omita stopwords
(https://en.wikipedia.org/wiki/Stop_word, https://pypi.org/project/stop-words/),
puntuaciones y no diferencie entre mayúsculas y minúsculas. (13%)

In [0]:
pip install nltk

In [0]:
df_words=(
df_secop
.select(F.lower(f.col("objeto_del_contrato")).alias("text"))
.withColumn("word", f.explode(F.split(f.col("text"), r"\s+")))
.groupBy("word")
.count()
.orderBy(f.desc("count"))
)

In [0]:
from nltk.corpus import stopwords
import nltk

nltk.download('stopwords')
# Cargar las stopwords en español a un conjunto para búsquedas más rápidas
stop_words_es = set(stopwords.words('spanish'))

rows = [w for w in stop_words_es]
df_sw = spark.createDataFrame(rows, ["word"])

In [0]:

df_words.createOrReplaceTempView("df_words")
df_sw.createOrReplaceTempView("df_sw")

spark.sql("""
SELECT 
    w.*
FROM df_words as w
left join df_sw as sw
on w.word = sw.word
where sw.word is  NULL
order by w.count DESC
limit 20 
""").display()