%md
## PONTIFICIA UNIVERSIDAD JAVERIANA ##

Materia: Procesamieno de datos a gran escala

Limpieza de datos Nivel educativo por municipio

In [14]:
import findspark
findspark.init()

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib as plt
%matplotlib inline
from matplotlib import rcParams
import squarify
from sklearn.metrics import roc_curve, auc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

#Biblioteca PySpark
import warnings
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import input_file_name, mean, col, split, regexp_extract, when, lit, isnan, count, udf
from pyspark import SparkFiles
from pyspark.sql.types import *
from pyspark.sql.functions import max, min
#Bioblioteca ML para PySpark
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, sum
#Importar el SO
from time import time
from pyspark.conf import SparkConf
import os

In [2]:
SPARK_MASTER_URL = os.getenv("SPARK_MASTER_URL", "spark://10.43.103.125:7077")
configura = SparkConf()
configura.setMaster(SPARK_MASTER_URL)
configura.set('spark.local.dir', '/almacen')
configura.setAppName("PrimerSparkGonzalez")
spark = SparkSession.builder.config(conf=configura).getOrCreate()
SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# spark = spark.sparkContext

### Revisar si El servicio esta vivo
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 23:24:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/10 23:24:04 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [3]:
url = "https://raw.githubusercontent.com/Kahsma/Proyecto_PDGE/refs/heads/main/Datos/accesoainternet.csv"

spark.sparkContext.addFile(url)


In [4]:
AcInternet = spark.read.csv("file://" + SparkFiles.get("accesoainternet.csv"), header = True , inferSchema = True )

                                                                                

In [5]:
AcInternet.show(3)

[Stage 2:>                                                          (0 + 1) / 1]

+----+---------+----------------+------------+-------------+---------+-----------------------+--------------+------+
| AÑO|TRIMESTRE|COD_DEPARTAMENTO|DEPARTAMENTO|COD_MUNICIPIO|MUNICIPIO|NoACCESOSFIJOSAINTERNET|POBLACIÓN DANE|INDICE|
+----+---------+----------------+------------+-------------+---------+-----------------------+--------------+------+
|2018|        3|              99|     VICHADA|        99773| CUMARIBO|                     75|         72691|  0,10|
|2016|        2|              99|     VICHADA|        99773| CUMARIBO|                     27|         68305|  0,04|
|2018|        2|              99|     VICHADA|        99773| CUMARIBO|                     75|         72691|  0,10|
+----+---------+----------------+------------+-------------+---------+-----------------------+--------------+------+
only showing top 3 rows



                                                                                

In [6]:

#Eliminar espacios innecesarios en los nombres de las columnas
for nombre_columna in AcInternet.columns:
    AcInternet = AcInternet.withColumnRenamed(nombre_columna, nombre_columna.strip())

#Seleccionar las columnas especificadas
columnas_a_mantener = ["NoACCESOSFIJOSAINTERNET", "INDICE", "AÑO", "COD_MUNICIPIO"]
AcInternet = AcInternet.select(*columnas_a_mantener)

#Eliminar filas con valores nulos en las columnas seleccionadas
AcInternet = AcInternet.dropna(subset=columnas_a_mantener)

#Definir una ventana para particionar por COD_MUNICIPIO, ordenando por AÑO (más reciente primero) e INDICE (más alto primero)
window_spec = Window.partitionBy("COD_MUNICIPIO").orderBy(col("AÑO").desc(), col("INDICE").desc())

#Usar row_number para asignar un número de fila dentro de cada grupo de COD_MUNICIPIO
AcInternet = AcInternet.withColumn("num_fila", row_number().over(window_spec))

#Filtrar para mantener solo las filas donde num_fila es 1 (año más reciente e índice más alto para cada COD_MUNICIPIO)
AcInternet = AcInternet.filter(col("num_fila") == 1).drop("num_fila")

# Mostrar el resultado para confirmar
AcInternet.show(3)


[Stage 3:>                                                          (0 + 1) / 1]

+-----------------------+------+----+-------------+
|NoACCESOSFIJOSAINTERNET|INDICE| AÑO|COD_MUNICIPIO|
+-----------------------+------+----+-------------+
|                 772844| 29,78|2023|         5001|
|                   1103|  5,18|2023|         5002|
|                    151|  5,32|2023|         5004|
+-----------------------+------+----+-------------+
only showing top 3 rows



                                                                                

In [7]:
# Contar el número total de filas en el DataFrame
total_tuplas = AcInternet.count()
# Mostrar el resultado
print(f"Número total de tuplas: {total_tuplas}")

Total number of tuples: 1120


In [8]:
# Eliminar la columna 'AÑO'
AcInternet = AcInternet.drop("AÑO")
# Mostrar el resultado para confirmar que la columna fue eliminada
AcInternet.show(3)

[Stage 12:>                                                         (0 + 1) / 1]

+-----------------------+------+-------------+
|NoACCESOSFIJOSAINTERNET|INDICE|COD_MUNICIPIO|
+-----------------------+------+-------------+
|                 772844| 29,78|         5001|
|                   1103|  5,18|         5002|
|                    151|  5,32|         5004|
+-----------------------+------+-------------+
only showing top 3 rows



                                                                                

In [9]:
# Calcular los valores más altos y más bajos de INDICE
resultado = AcInternet.agg(
    max("INDICE").alias("Mayor_INDICE"),
    min("INDICE").alias("Menor_INDICE")
)

# Mostrar el resultado
resultado.show()


[Stage 15:>                                                         (0 + 1) / 1]

+--------------+-------------+
|Highest_INDICE|Lowest_INDICE|
+--------------+-------------+
|          9,98|         0,02|
+--------------+-------------+



                                                                                

In [12]:
# Definir el directorio de salida para el archivo CSV
directorio_salida = "Proyecto/"

# Escribir el DataFrame en un solo archivo CSV dentro del directorio
AcInternet.coalesce(1).write.csv(directorio_salida, header=True, mode="overwrite")


                                                                                