In [23]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import boto3
import os

In [24]:
# Configuración del cliente S3
s3 = boto3.client('s3')

In [25]:
# Nombre del bucket y ruta en S3
bucket_name = 'datainternetaccess'
s3_folder = ''
local_folder = "data/"

In [26]:
# Función para descargar los archivos de S3 manteniendo la estructura de carpetas
def descargar_desde_s3(bucket, s3_folder, local_folder):
    # Asegurarse de que la carpeta local existe
    if not os.path.exists(local_folder):
        os.makedirs(local_folder)

    # Listar los objetos en el bucket y ruta especificada (en este caso, la raíz)
    response = s3.list_objects_v2(Bucket=bucket, Prefix=s3_folder)

    # Iterar sobre los archivos encontrados
    for obj in response.get('Contents', []):
        s3_file_path = obj['Key']

        # Crear la ruta local manteniendo la estructura de carpetas de S3
        local_file_path = os.path.join(local_folder, s3_file_path)

        # Crear las carpetas necesarias en la ruta local
        local_dir = os.path.dirname(local_file_path)
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)

        # Solo descargar si el archivo no existe localmente
        if not os.path.exists(local_file_path):
            print(f"Descargando {s3_file_path} a {local_file_path}...")
            s3.download_file(bucket, s3_file_path, local_file_path)
        else:
            print(f"El archivo {local_file_path} ya existe. No se descarga.")


In [27]:
# Llamar a la función para descargar los archivos desde la raíz del bucket
#descargar_desde_s3(bucket_name, s3_folder, local_folder)

In [28]:
spark = SparkSession.builder.appName("example").getOrCreate()

24/09/21 13:39:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [29]:
# Crear la sesión de Spark
spark = SparkSession.builder.appName("ClasificacionUrbanoRural").getOrCreate()

24/09/21 13:39:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [30]:
# Cargar el dataset principal con POBLACIÓN DANE
df_poblacion = spark.read.csv('data/Internet_Fijo_Penetraci_n_Municipio.csv', header=True, inferSchema=True,encoding='UTF-8')


In [31]:
# 2. Cargar los datasets de los departamentos con las áreas de terreno
df_departamentos = spark.read.csv('data/deptos/*.csv', header=True, inferSchema=True, encoding='UTF-8')

                                                                                

In [32]:
df_departamentos.printSchema()

root
 |-- DEPARTAMENTO: integer (nullable = true)
 |-- MUNICIPIO: integer (nullable = true)
 |-- NUMERO_PREDIAL: decimal(30,0) (nullable = true)
 |-- DIRECCION: string (nullable = true)
 |-- DESTINO_ECONOMICO: string (nullable = true)
 |-- AREA_TERRENO: string (nullable = true)
 |-- AREA_CONSTRUIDA: integer (nullable = true)
 |-- NUMERO_PREDIAL_ANTERIOR: double (nullable = true)



In [33]:
# 3. Convertir la columna AREA_TERRENO a tipo double (ya que es string)
df_departamentos = df_departamentos.withColumn('AREA_TERRENO', F.col('AREA_TERRENO').cast('double'))

In [34]:
# 4. Convertir el área de metros cuadrados a kilómetros cuadrados
df_departamentos = df_departamentos.withColumn('AREA_KM2', F.col('AREA_TERRENO') / 1000000)

In [35]:
# 5. Sumar el área total por municipio
df_areas_municipio = df_departamentos.groupBy('DEPARTAMENTO', 'MUNICIPIO').agg(F.sum('AREA_KM2').alias('AREA_TOTAL_KM2'))

In [36]:
# 6. Verificar duplicados en los datasets

# Verificar duplicados en el dataset de población
print("Verificando duplicados en el dataset de población:")
df_poblacion.groupBy('COD_DEPARTAMENTO', 'COD_MUNICIPIO').count().filter('count > 1').show()

Verificando duplicados en el dataset de población:
+----------------+-------------+-----+
|COD_DEPARTAMENTO|COD_MUNICIPIO|count|
+----------------+-------------+-----+
|              54|        54820|   12|
|              15|        15837|   12|
|              86|        86885|   12|
|              19|        19824|   12|
|               5|         5642|   12|
|              13|        13670|   12|
|              15|        15507|   12|
|              68|        68190|   12|
|              86|        86568|   12|
|              27|        27073|   12|
|              13|        13433|   12|
|              27|        27245|   12|
|              15|        15720|   12|
|               5|         5674|   12|
|              23|        23555|   12|
|              15|        15798|   12|
|              25|        25873|   12|
|              19|        19355|   12|
|              25|        25740|   12|
|               8|         8520|   12|
+----------------+-------------+-----+
only showing 

In [37]:
# Verificar duplicados en el dataset de áreas
print("Verificando duplicados en el dataset de áreas:")
df_areas_municipio.groupBy('DEPARTAMENTO', 'MUNICIPIO').count().filter('count > 1').show()

Verificando duplicados en el dataset de áreas:




+------------+---------+-----+
|DEPARTAMENTO|MUNICIPIO|count|
+------------+---------+-----+
+------------+---------+-----+



                                                                                

In [38]:
# 7. Renombrar las columnas para que coincidan con el dataset de población
df_areas_municipio = df_areas_municipio.withColumnRenamed('DEPARTAMENTO', 'COD_DEPARTAMENTO').withColumnRenamed('MUNICIPIO', 'COD_MUNICIPIO')

In [39]:
# 8. Asegurarse de que los códigos de departamento y municipio sean del mismo tipo (string)
df_poblacion = df_poblacion.withColumn('COD_DEPARTAMENTO', F.col('COD_DEPARTAMENTO').cast('string'))
df_poblacion = df_poblacion.withColumn('COD_MUNICIPIO', F.col('COD_MUNICIPIO').cast('string'))

In [40]:
df_areas_municipio = df_areas_municipio.withColumn('COD_DEPARTAMENTO', F.col('COD_DEPARTAMENTO').cast('string'))
df_areas_municipio = df_areas_municipio.withColumn('COD_MUNICIPIO', F.col('COD_MUNICIPIO').cast('string'))

In [41]:
# 9. Verificar qué municipios de población no tienen área correspondiente
municipios_faltantes = df_poblacion.join(df_areas_municipio, on=['COD_DEPARTAMENTO', 'COD_MUNICIPIO'], how='left_anti')

In [44]:
print("Municipios faltantes en el dataset de áreas:")
municipios_faltantes.select('COD_DEPARTAMENTO', 'COD_MUNICIPIO', 'MUNICIPIO').show(10, truncate=False)

Municipios faltantes en el dataset de áreas:




+----------------+-------------+---------+
|COD_DEPARTAMENTO|COD_MUNICIPIO|MUNICIPIO|
+----------------+-------------+---------+
|54              |54250        |EL TARRA |
|25              |25594        |QUETAME  |
|5               |5142         |CARACOLI |
|25              |25594        |QUETAME  |
|25              |25594        |QUETAME  |
|54              |54250        |EL TARRA |
|54              |54250        |EL TARRA |
|5               |5142         |CARACOLI |
|5               |5142         |CARACOLI |
|25              |25594        |QUETAME  |
+----------------+-------------+---------+
only showing top 10 rows



                                                                                

In [43]:
# Calcular la media de las áreas por departamento
df_departamentos_mean = df_areas_municipio.groupBy('COD_DEPARTAMENTO').agg(F.mean('AREA_TOTAL_KM2').alias('mean_area'))

# Unir el promedio de áreas al dataset principal
df_completo = df_poblacion.join(df_departamentos_mean, on='COD_DEPARTAMENTO', how='left')

# Rellenar los valores nulos en AREA_TOTAL_KM2 con el promedio del departamento
df_completo = df_completo.withColumn(
    'AREA_TOTAL_KM2',
    F.when(F.col('AREA_TOTAL_KM2').isNull(), F.col('mean_area')).otherwise(F.col('AREA_TOTAL_KM2'))
)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `AREA_TOTAL_KM2` cannot be resolved. Did you mean one of the following? [`AÑO`, `DEPARTAMENTO`, `TRIMESTRE`, `COD_MUNICIPIO`, `INDICE`].;
'Project [COD_DEPARTAMENTO#406, AÑO#256, TRIMESTRE#257, DEPARTAMENTO#259, COD_MUNICIPIO#416, MUNICIPIO#261, No. ACCESOS FIJOS A INTERNET#262, POBLACIÓN DANE#263, INDICE#264, mean_area#468, CASE WHEN isnull('AREA_TOTAL_KM2) THEN mean_area#468 ELSE 'AREA_TOTAL_KM2 END AS AREA_TOTAL_KM2#481]
+- Project [COD_DEPARTAMENTO#406, AÑO#256, TRIMESTRE#257, DEPARTAMENTO#259, COD_MUNICIPIO#416, MUNICIPIO#261, No. ACCESOS FIJOS A INTERNET#262, POBLACIÓN DANE#263, INDICE#264, mean_area#468]
   +- Join LeftOuter, (COD_DEPARTAMENTO#406 = COD_DEPARTAMENTO#426)
      :- Project [AÑO#256, TRIMESTRE#257, COD_DEPARTAMENTO#406, DEPARTAMENTO#259, cast(COD_MUNICIPIO#260 as string) AS COD_MUNICIPIO#416, MUNICIPIO#261, No. ACCESOS FIJOS A INTERNET#262, POBLACIÓN DANE#263, INDICE#264]
      :  +- Project [AÑO#256, TRIMESTRE#257, cast(COD_DEPARTAMENTO#258 as string) AS COD_DEPARTAMENTO#406, DEPARTAMENTO#259, COD_MUNICIPIO#260, MUNICIPIO#261, No. ACCESOS FIJOS A INTERNET#262, POBLACIÓN DANE#263, INDICE#264]
      :     +- Relation [AÑO#256,TRIMESTRE#257,COD_DEPARTAMENTO#258,DEPARTAMENTO#259,COD_MUNICIPIO#260,MUNICIPIO#261,No. ACCESOS FIJOS A INTERNET#262,POBLACIÓN DANE#263,INDICE#264] csv
      +- Aggregate [COD_DEPARTAMENTO#426], [COD_DEPARTAMENTO#426, avg(AREA_TOTAL_KM2#337) AS mean_area#468]
         +- Project [COD_DEPARTAMENTO#426, cast(COD_MUNICIPIO#402 as string) AS COD_MUNICIPIO#430, AREA_TOTAL_KM2#337]
            +- Project [cast(COD_DEPARTAMENTO#398 as string) AS COD_DEPARTAMENTO#426, COD_MUNICIPIO#402, AREA_TOTAL_KM2#337]
               +- Project [COD_DEPARTAMENTO#398, MUNICIPIO#292 AS COD_MUNICIPIO#402, AREA_TOTAL_KM2#337]
                  +- Project [DEPARTAMENTO#291 AS COD_DEPARTAMENTO#398, MUNICIPIO#292, AREA_TOTAL_KM2#337]
                     +- Aggregate [DEPARTAMENTO#291, MUNICIPIO#292], [DEPARTAMENTO#291, MUNICIPIO#292, sum(AREA_KM2#317) AS AREA_TOTAL_KM2#337]
                        +- Project [DEPARTAMENTO#291, MUNICIPIO#292, NUMERO_PREDIAL#293, DIRECCION#294, DESTINO_ECONOMICO#295, AREA_TERRENO#307, AREA_CONSTRUIDA#297, NUMERO_PREDIAL_ANTERIOR#298, (AREA_TERRENO#307 / cast(1000000 as double)) AS AREA_KM2#317]
                           +- Project [DEPARTAMENTO#291, MUNICIPIO#292, NUMERO_PREDIAL#293, DIRECCION#294, DESTINO_ECONOMICO#295, cast(AREA_TERRENO#296 as double) AS AREA_TERRENO#307, AREA_CONSTRUIDA#297, NUMERO_PREDIAL_ANTERIOR#298]
                              +- Relation [DEPARTAMENTO#291,MUNICIPIO#292,NUMERO_PREDIAL#293,DIRECCION#294,DESTINO_ECONOMICO#295,AREA_TERRENO#296,AREA_CONSTRUIDA#297,NUMERO_PREDIAL_ANTERIOR#298] csv


In [46]:
df_areas_municipio.printSchema()

root
 |-- COD_DEPARTAMENTO: string (nullable = true)
 |-- COD_MUNICIPIO: string (nullable = true)
 |-- AREA_TOTAL_KM2: double (nullable = true)



In [39]:
# 11. Calcular la densidad poblacional
df_completo = df_completo.withColumn('densidad_poblacional', F.col('POBLACIÓN DANE') / F.col('AREA_TOTAL_KM2'))

In [40]:
# 12. Clasificar como Urbano o Rural según la densidad poblacional
df_completo = df_completo.withColumn(
    'Clasificacion',
    F.when(F.col('densidad_poblacional') >= 100, 'Urbano').otherwise('Rural')
)

In [41]:
# 13. Verificar el número total de municipios en el dataset completo
num_municipios = df_completo.select('COD_MUNICIPIO').distinct().count()
print(f"Número total de municipios en el dataset completo: {num_municipios}")



Número total de municipios en el dataset completo: 832


                                                                                

In [42]:
# 14. Guardar el nuevo dataset clasificado en la carpeta transformaciones
df_completo.write.csv('data/deptos/transformaciones/transformaciones.csv', header=True)

                                                                                

In [44]:
municipios_faltantes_data = municipios_faltantes.select('COD_DEPARTAMENTO', 'COD_MUNICIPIO', 'MUNICIPIO') \
    .write.csv('data/deptos/transformaciones/municipios_faltantes.csv', header=True)

                                                                                

In [1]:
import os

In [5]:
# Ruta donde están los archivos CSV de los departamentos
ruta_deptos = 'data/deptos/'

In [7]:
# Listar todos los archivos en la carpeta, excluyendo los que están en la subcarpeta transformaciones
archivos_deptos = [archivo for archivo in os.listdir(ruta_deptos) if os.path.isfile(os.path.join(ruta_deptos, archivo))]
archivos_deptos

['17_CALDAS_R1.csv',
 '44_GUAJIRA_R1.csv',
 '88_SAN_ANDRES_R1.csv',
 '23_CORDOBA_R1.csv',
 '73_TOLIMA_R1.csv',
 '13_BOLIVAR_R1.csv',
 '50_META_R1.csv',
 '47_MAGDALENA_R1.csv',
 '18_CAQUETA_R1.csv',
 '95_GUAVIARE_R1.csv',
 '97_VAUPES_R1.csv',
 '54_N_SANTANDER_R1.csv',
 '68_SANTANDER_R1.csv',
 '41_HUILA_R1.csv',
 '19_CAUCA_R1.csv',
 '81_ARAUCA_R1.csv',
 '15_BOYACA_R1.csv',
 '70_SUCRE_R1.csv',
 '99_VICHADA_R1.csv',
 '63_QUINDIO_R1.csv',
 '76_VALLE_R1.csv',
 '66_RISARALDA_R1.csv',
 '08_ATLANTICO_R1.csv',
 '27_CHOCO_R1.csv',
 '52_NARINO_R1.csv',
 '91_AMAZONAS_R1.csv',
 '85_CASANARE_R1.csv',
 '20_CESAR_R1.csv',
 '94_GUAINIA_R1.csv',
 '25_CUNDINAMARCA_R1.csv',
 '86_PUTUMAYO_R1.csv']

In [9]:
# Revisar si hay valores faltantes en cada archivo de departamentos
for archivo in archivos_deptos:
    # Cargar el archivo CSV
    df_depto = spark.read.csv(os.path.join(ruta_deptos, archivo), header=True, inferSchema=True, encoding='UTF-8')
    
    # Convertir AREA_TERRENO a double (si no lo has hecho antes)
    df_depto = df_depto.withColumn('AREA_TERRENO', F.col('AREA_TERRENO').cast('double'))

    # Verificar si hay valores nulos o si AREA_TERRENO es igual a 0
    valores_faltantes = df_depto.filter(F.col('AREA_TERRENO').isNull() | (F.col('AREA_TERRENO') == 0))

    # Contar cuántos valores faltantes hay
    num_faltantes = valores_faltantes.count()

    if num_faltantes > 0:
        print(f"El archivo {archivo} tiene {num_faltantes} valores nulos o cero en AREA_TERRENO.")
    else:
        print(f"El archivo {archivo} no tiene valores nulos en AREA_TERRENO.")

El archivo 17_CALDAS_R1.csv tiene 17045 valores nulos o cero en AREA_TERRENO.
El archivo 44_GUAJIRA_R1.csv tiene 25661 valores nulos o cero en AREA_TERRENO.
El archivo 88_SAN_ANDRES_R1.csv tiene 2004 valores nulos o cero en AREA_TERRENO.
El archivo 23_CORDOBA_R1.csv tiene 48586 valores nulos o cero en AREA_TERRENO.
El archivo 73_TOLIMA_R1.csv tiene 42153 valores nulos o cero en AREA_TERRENO.
El archivo 13_BOLIVAR_R1.csv tiene 31775 valores nulos o cero en AREA_TERRENO.
El archivo 50_META_R1.csv tiene 35206 valores nulos o cero en AREA_TERRENO.
El archivo 47_MAGDALENA_R1.csv tiene 46997 valores nulos o cero en AREA_TERRENO.
El archivo 18_CAQUETA_R1.csv tiene 10861 valores nulos o cero en AREA_TERRENO.
El archivo 95_GUAVIARE_R1.csv tiene 2868 valores nulos o cero en AREA_TERRENO.
El archivo 97_VAUPES_R1.csv tiene 1181 valores nulos o cero en AREA_TERRENO.
El archivo 54_N_SANTANDER_R1.csv tiene 25412 valores nulos o cero en AREA_TERRENO.
El archivo 68_SANTANDER_R1.csv tiene 11898 valores 

In [10]:
# Revisar si hay valores faltantes en cada archivo de departamentos
for archivo in archivos_deptos:
    # Cargar el archivo CSV
    df_depto = spark.read.csv(os.path.join(ruta_deptos, archivo), header=True, inferSchema=True, encoding='UTF-8')
    
    # Convertir AREA_TERRENO a double (si no lo has hecho antes)
    df_depto = df_depto.withColumn('AREA_TERRENO', F.col('AREA_TERRENO').cast('double'))

    # Verificar si hay valores nulos o si AREA_TERRENO es igual a 0
    valores_faltantes = df_depto.filter(F.col('AREA_TERRENO').isNull() | (F.col('AREA_TERRENO') == 0))

    # Contar cuántos valores faltantes hay
    num_faltantes = valores_faltantes.count()

    if num_faltantes > 0:
        valores_faltantes.write.csv(f'data/deptos/transformaciones/{archivo}_valores_faltantes.csv', header=True)

                                                                                