# Preparación datos - Tablero Construcción

## Instalación de librerías

In [0]:
# !pip install pandas
# !pip install geopandas
# !pip install matplotlib
# !pip install requests
# !pip install swifter
# !pip install pyxlsb
# !pip install openpyxl

# !pip install --upgrade pip
# !pip install pyxlsb
# !pip install pandas
# !pip install geopandas

# !pip install azure-storage-blob

# !pip install pyproj

# !pip install fuzzywuzzy
# !pip install python-Levenshtein


In [0]:
import pandas as pd
import pyproj
import geopandas as gpd
import matplotlib

import geopandas as gpd
from shapely import wkt


from shapely.geometry import Point
from shapely.wkt import loads

import unicodedata
import requests
import json
import os
import re
import numpy as np

from fuzzywuzzy import fuzz

import jenkspy

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, cast, when, udf, struct, from_json, count, countDistinct, lit
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType
from pyspark import StorageLevel

from sedona.spark import *
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.register.geo_registrator import SedonaRegistrator
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter
from sedona.core.enums import IndexType
from sedona.core.enums import GridType
from sedona.core.spatialOperator import JoinQuery

from sedona.core.SpatialRDD import PointRDD



## Parametrización

In [0]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 50)
num_partitions = 4
url = "https://catalogopmb.catastrobogota.gov.co/PMBWeb/web/api"
startpath = '/dbfs/FileStore/tables/Catastro/'

In [0]:
# configuración de spark y Sedona para datos geograficos
spark = SparkSession.\
    builder.\
    master("local[*]").\
    appName("Sedona App").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName) .\
    config("spark.kryoserializer.buffer.max", "200gb").\
    getOrCreate()

SedonaContext.create(spark)


sc = spark.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")

In [0]:
usos_homologados={
    'Asociar al uso principal':['AREA DE MEZANINE EN PH','DEPOSITO (LOCKERS) PH','DEPOSITO ALMACENAMIENTO PH','PARQUEO CUBIERTO NPH','PARQUEO CUBIERTO PH','PARQUEO LIBRE PH','PISCINAS EN NPH','PISCINAS EN PH','SECADEROS'],
    'Comercio y servicios':['BODEGA COMERCIAL NPH','BODEGA COMERCIAL PH','BODEGA ECONOMICA','BODEGA ECONOMICA(SERVITECA,ESTA.SERVIC.)','BODEGAS DE ALMACENAMIENTO NPH','BODEGAS DE ALMACENAMIENTO PH','CENTRO COMERCIAL GRANDE NPH','CENTRO COMERCIAL GRANDE PH','CENTRO COMERCIAL MEDIANO NPH','CENTRO COMERCIAL MEDIANO PH','CENTRO COMERCIAL PEQUENO NPH','CENTRO COMERCIAL PEQUENO PH','CLUBES PEQUENOS','COMERCIO PUNTUAL NPH O HASTA 3 UNID PH','COMERCIO PUNTUAL PH','CORREDOR COMERCIAL NPH O HASTA 3 UNID PH','CORREDOR COMERCIAL PH','DEPOSITOS DE ALMACENAMIENTO NPH','EDIFICIOS DE PARQUEO NPH','EDIFICIOS DE PARQUEO PH','HOTELES NPH','HOTELES PH','MOTELES, AMOBLADOS, RESIDENCIAS NPH','MOTELES, AMOBLADOS, RESIDENCIAS PH','OFICINA BODEGA Y/O INDUSTRIA PH','OFICINAS EN BODEGAS Y/O INDUSTRIAS','OFICINAS OPERATIVAS','OFICINAS OPERATIVAS(ESTACIONES SERVICIO)','OFICINAS Y CONSULTORIOS NPH','OFICINAS Y CONSULTORIOS PH','PARQUES DE DIVERSION','RESTAURANTES NPH','RESTAURANTES PH','TEATROS Y CINEMAS NPH','TEATROS Y CINEMAS PH', 'PARQUES DE DIVERSION EN P.H.'],
    'Dotacional':['AULAS DE CLASE','CEMENTERIOS','CENTROS MEDICOS EN PH','CLINICAS, HOSPITALES, CENTROS MEDICOS','CLUBES MAYOR EXTENSION','COLEGIOS EN PH','COLEGIOS Y UNIVERSIDADES 1 A 3 PISOS','COLEGIOS Y UNIVERSIDADES 4 O MAS PISOS','COLISEOS','CULTO RELIGIOS EN NPH','CULTO RELIGIOSO EN PH','IGLESIA PH','IGLESIAS','INSTALACIONES MILITARES','INSTITUCIONAL PH','INSTITUCIONAL PUNTUAL','MUSEOS','OFICINAS Y CONSULTORIOS (OFICIAL) NPH','OFICINAS Y CONSULTORIOS (OFICIAL) PH','PLAZAS DE MERCADO'],
    'Industrial':['INDUSTRIA ARTESANAL','INDUSTRIA GRANDE','INDUSTRIA GRANDE PH','INDUSTRIA MEDIANA','INDUSTRIA MEDIANA PH'],
    'Otro':['COCHERAS, MARRANERAS, PORQUERIZAS','ENRAMADAS, COBERTIZOS, CANEYES','ESTABLOS, PESEBRERAS','GALPONES, GALLINEROS','KIOSKOS','LOTE EN PROPIEDAD HORIZONTAL','PISTA AEROPUERTO','SILOS'],
    'Residencial':['HABITACIONAL EN PROPIEDAD HORIZONTAL','HABITACIONAL MAYOR O IGUAL A 4 PISOS NPH O 3 PISOS PH','HABITACIONAL MENOR O IGUAL A 3 PISOS NPH','HABITACIONAL MENOR O IGUAL A 3 PISOS PH','MAYOR O IGUAL A 4 PISOS NPH O 3 PISOS PH']
 }

In [0]:
usos_licencias_homologados = {
    'Residencial':['Vivienda'],
    'Comercio y servicios':['Comercio','Oficinas','Servicios'],
    'Industrial':['Industria'],
    'Otros':['Dotacionales','Institucional','Estacionamientos / Parqueaderos','Otros','Sin informacion','Sin Información','Recreativos']
}

In [0]:
modalidades_homologadas = {
    'Obra nueva':['1.Obra Nueva','1.Obra nueva'],
    'Ampliación':['2.Ampliación'],
    'Modificación':['Modificación'],
    'Demolición':['Demolición Total', 'Demolición Parcial', 'Demolición'],
    'Reforzamiento Estructural':['Reforzamiento Estructural'],
    'Restauración':['Restauración'],
    'Cerramiento':['Cerramiento'],
    'Adecuación':['Adecuación'],
    'Subdivisión':['Subdivisión urbana', 'Subdivisión rural'],
    'Propiedad Horizontal':['Propiedad Horizontal'],
    'Sin Modalidad':['Sin Modalidad', 'Ninguna', 'Sin Información', 'No Aplica Modalidad'],
    'Culminación de Obras':['Culminación de Obras'],
    'Reconocimiento':['Reconocimiento'],
    'Desarrollo':['Desarrollo','Desarrollo por etapas'],
    'Otras':['Restitución','Reloteo', 'Área Bruta Urbanismo', 'Área Útil Urbanismo', 'Saneamiento', 'Reconstrucción'],
}

## Funciones

In [0]:
def remove_words_from_string(input_str, words_to_remove):
    for word in words_to_remove:
        input_str = input_str.replace(word, "")
    return input_str

In [0]:
def clean_ph_address(s):
    s = re.sub(r'\b(AP|GJ|IN|LT) \d+\b', '', s).strip()
    if 'SUR' in s:
        s = re.sub(r'\bSUR\b', '', s).strip() + ' S'
    return s

In [0]:
schema = StructType([StructField("estado", StringType()), StructField("yinput", DoubleType()), StructField("lotcodigo", StringType()),
                    StructField("latitude", StringType()), StructField("diraprox", StringType()), StructField("mancodigo", StringType()),
                    StructField("cpocodigo", StringType()), StructField("xinput", DoubleType()), StructField("codloc", StringType()),
                    StructField("dirtrad", StringType()), StructField("nomupz", StringType()), StructField("localidad", StringType()),
                    StructField("dirinput", StringType()), StructField("codupz", StringType()), StructField("nomseccat", StringType()),
                    StructField("tipo_direccion", StringType()), StructField("codseccat", StringType()), StructField("longitude", StringType()),
                ])
def georeferenciar(input_str:str):
    """
    Realiza la georreferenciación de una cadena de entrada utilizando una API de geocodificación.

    Args:
        input_str (str): La cadena a georreferenciar.

    Returns:
        dict: Un diccionario que contiene los datos de georreferenciación si la operación fue exitosa.
              En caso contrario, devuelve un diccionario que indica un estado de falla o un estado de error.

    """
    schema = StructType([StructField("estado", StringType()), StructField("yinput", DoubleType()), StructField("lotcodigo", StringType()),
                    StructField("latitude", StringType()), StructField("diraprox", StringType()), StructField("mancodigo", StringType()),
                    StructField("cpocodigo", StringType()), StructField("xinput", DoubleType()), StructField("codloc", StringType()),
                    StructField("dirtrad", StringType()), StructField("nomupz", StringType()), StructField("localidad", StringType()),
                    StructField("dirinput", StringType()), StructField("codupz", StringType()), StructField("nomseccat", StringType()),
                    StructField("tipo_direccion", StringType()), StructField("codseccat", StringType()), StructField("longitude", StringType()),
                ])

    input_str = '' if input_str is None else input_str

    direc = unicodedata.normalize("NFKD", input_str).encode("ascii","ignore").decode("ascii").lower()

    val = [True for t in direc.replace('.',' ').replace("  "," ").split() if t in ['via','km','chia','cota','tocancipa','tenjo','madrid','tabio,','cajica,','mosquera','zipaquira']]
    
    if sum(val)>=1:
        result = {'estado': 'outside_bogota','yinput':0,'lotcodigo':'','latitude':'','diraprox':'','mancodigo':'','cpocodigo':'','xinput':0,'codloc':'','dirtrad':'','nomupz':'','localidad':'','dirinput':'','codupz':'','nomseccat':'','tipo_direccion':'','codseccat':'','longitude':''}
        return result

    else:
        try:
            words_to_remove = ["costado occ.", "esquina"]
            direc = remove_words_from_string(direc, words_to_remove)
            direc = clean_ph_address(direc)

            pattern = r'av\.?(?: de la)? esp\.?|\bavenida la esperanza\b|\besperanza\b'
            replacement_str = "AC 24"
            output_str = re.sub(pattern, replacement_str, direc)

            pattern = r'av\.?(?: de las)? ame\.?|\bavenida de las americas\b|\bamericas\b'
            replacement_str = "AC 9"
            output_str = re.sub(pattern, replacement_str, output_str)

            pattern = r'(autop\.?(\s)?norte|autonorte|au\.?(\s)?norte|aut\.?(\s)?norte)'
            replacement_str = "AK 45"
            output_str = re.sub(pattern, replacement_str, output_str)

            pattern = r'av\.?(?: de las)?(?: c\.?)?\s*(?:ciudad\s*)?cali|avenida\s*cali'
            replacement_str = "AK 86"
            output_str = re.sub(pattern, replacement_str, output_str)

            pattern = r'av. nqs|\bnqs\b'
            replacement_str = "AK 30"
            output_str = re.sub(pattern, replacement_str, output_str)

            pattern = r'av. suba|\bsuba\b|tr. suba'
            replacement_str = "AK 69"
            output_str = re.sub(pattern, replacement_str, output_str)

            output_str = output_str.replace('cl. av.', 'AC')
            output_str = output_str.replace('av. cl.', 'AC')
            output_str = output_str.replace('av. cr.', 'AK')

            con = False

            output_str = output_str.replace('con','#')
            output_str = output_str.replace('paralela','#')

            parts = output_str.split("#")

            if len(parts) > 1:
                if '-' not in parts[1]:
                    parts[1] = re.sub(r'[a-zA-Z.]', '', parts[1])
                    output_str = "#".join(parts)

            

            pattern = r'(\d+)\D+(\w+)'
            output_str = re.sub(pattern, lambda x: x.group(1) + ' # ' + x.group(2), output_str)

            if '-' not in output_str:
                output_str = output_str + ' - 01'


            parts = output_str.split("#")

            if parts[0].strip() == 'AK 30' and (int(parts[1].split('-')[0].strip())>100):
                parts[0] = 'AK 9'
                output_str = "#".join(parts)

            output_str = output_str.replace("AK 69",'Av Suba')

            print(output_str)

            params = {
                'cmd': 'geocodificar',
                'apikey': '1c025f92-4520-49c1-90a7-a24b3d9d374c',
                'query': output_str
            }

            response = requests.get(url, params=params)
            res = response.json()
            success = res['response']['success']
            if success:
                result = res['response']['data']
                return result
            else:
                result = {'estado': 'failed','yinput':0,'lotcodigo':'','latitude':'','diraprox':'','mancodigo':'','cpocodigo':'','xinput':0,'codloc':'','dirtrad':'','nomupz':'','localidad':'','dirinput':'','codupz':'','nomseccat':'','tipo_direccion':'','codseccat':'','longitude':''}
                return result
        except:
            result = {'estado': 'error_ws','yinput':0,'lotcodigo':'','latitude':'','diraprox':'','mancodigo':'','cpocodigo':'','xinput':0,'codloc':'','dirtrad':'','nomupz':'','localidad':'','dirinput':'','codupz':'','nomseccat':'','tipo_direccion':'','codseccat':'','longitude':''}
            
            return result
        
georeferenciar_udf = udf(georeferenciar, returnType=schema)

In [0]:
create_point_udf = udf(lambda x,y: Point(x,y).wkt, returnType=StringType())

In [0]:
def homologar_uso(uso_catastro:str,usos_agregados=usos_homologados):
    '''
    Toma el uso de la base de predios de catastro y lo agrega a 
    categorias mas sencillas de entender para el análisis de datos del OOVS

    Args:
        input_str (str): Cadena de texto que trae el uso para gregarlo.

    Returns:
        str: retorna el uso agragado y homologado por el equipo del observatorio
    '''
    for k,v in usos_agregados.items():
        if uso_catastro in v:
            return k
        elif uso_catastro not in v:
            pass
homoUso_udf = udf(homologar_uso, StringType())

In [0]:
def homologar_uso_licencia(uso:str,usos_agregados=usos_licencias_homologados):
    '''
    Toma el uso de la base de predios de catastro y lo agrega a 
    categorias mas sencillas de entender para el análisis de datos del OOVS

    Args:
        input_str (str): Cadena de texto que trae el uso para gregarlo.

    Returns:
        str: retorna el uso agragado y homologado por el equipo del observatorio
    '''
    for k,v in usos_agregados.items():
        if uso in v:
            return k
        elif uso not in v:
            pass
homoUsoLicencia_udf = udf(homologar_uso_licencia, StringType())

In [0]:
def homologar_modalidad(modalidad_licencia:str,modalidades_agregados=modalidades_homologadas):
    '''
    Toma el uso de la base de predios de catastro y lo agrega a 
    categorias mas sencillas de entender para el análisis de datos del OOVS

    Args:
        input_str (str): Cadena de texto que trae el uso para gregarlo.

    Returns:
        str: retorna el uso agragado y homologado por el equipo del observatorio
    '''
    for k,v in modalidades_agregados.items():
        if modalidad_licencia in v:
            return k
        elif modalidad_licencia not in v:
            pass
homoModalidad_udf = udf(homologar_modalidad, StringType())

In [0]:
def transform_coordinates(x, y):
    if x is not None and y is not None and x != 'NULL' and y!='NULL' and x != 0.0 and y!=0.0 and isinstance(x, (float)) and isinstance(y, (float)):
        source_crs = pyproj.CRS("ESRI:102233")
        target_crs = pyproj.CRS("EPSG:4326")
        transformer = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True)
        new_x, new_y = transformer.transform(x, y)
        return new_x, new_y
    else:
        return None, None

coordinate_udf = udf(transform_coordinates, StructType([StructField("wgs84_lng", DoubleType(), False),
                                                        StructField("wgs84_lat", DoubleType(), False)]))

In [0]:
def get_year_from_path(path, paths_dict):
    # Iterate over the dictionary to find the year for the given path
    for year, paths in paths_dict.items():
        if path in paths:
            return year
    return None

## Cargue de datos

### Catastro

El siguiente recorre una estructura de directorios y buscar archivos específicos relacionados con "ph", "predios" y "usos" para los años determinados en el rango

In [0]:
# Se inicializan listas vacías para almacenar rutas de archivos.
ph = list()  # Lista para almacenar rutas de archivos relacionados con 'ph'.
pre = list()  # Lista para almacenar rutas de archivos relacionados con 'predios'.
uso = list()  # Lista para almacenar rutas de archivos relacionados con 'usos'.
dict_anio = dict()  # Diccionario para almacenar rutas de archivos por año.

# Se recorre la estructura de directorios a partir de 'startpath'.
for root, dirs, files in os.walk(startpath):
    # Se crea una lista de periodos (años) desde 2006 hasta 2022.
    periodos = [str(a) for a in range(2006,2024)]
    
    # Si el nombre del directorio actual (root) está en la lista de periodos...
    if os.path.basename(root) in periodos:
        # Se obtiene el nombre del directorio actual (que representa un año).
        carpeta = os.path.basename(root)
        
        # Se construyen las rutas completas para los archivos esperados en ese directorio/año.
        ph_predios = '{}/ph_{}.csv'.format(root, carpeta)
        predios = '{}/predios_{}.csv'.format(root, carpeta)
        usos = '{}/usos_{}.csv'.format(root, carpeta)
        
        # Se inicializa una lista interna para almacenar las rutas de archivos encontrados en el directorio actual.
        lista_interna = list()
        
        # Se recorren los archivos en el directorio actual.
        for f in files:
            # Si el archivo es de 'ph'...
            if 'ph_' in f:
                lista_interna.append(ph_predios)
                ph.append(ph_predios)
            # Si el archivo es de 'predios'...
            elif 'predios_'in f:
                lista_interna.append(predios)
                pre.append(predios)
            # Si el archivo es de 'usos'...
            elif 'usos_'in f:
                lista_interna.append(usos)
                uso.append(usos)
            # Si no coincide con ninguno de los anteriores, no se hace nada.
            else:
                pass
        
        # Se añade la lista interna al diccionario, usando el año como clave.
        dict_anio[carpeta] = lista_interna
        
        # Se elimina la lista interna para liberar memoria.
        del lista_interna
    # Si el directorio actual no es un año de interés, no se hace nada.
    else:
        pass


In [0]:
# Se lee un archivo shapefile de lotes y se convierte en un GeometryRDD.
lotes_shp = ShapefileReader.readToGeometryRDD(sc, "dbfs:/FileStore/tables/shp/Lotes_centroide/")

# Se lee un archivo shapefile de isocronas y se convierte en un GeometryRDD.
isocrona_shp = ShapefileReader.readToGeometryRDD(sc, "dbfs:/FileStore/tables/shp/isocronas/")

# Se realiza una transformación del sistema de referencia de coordenadas (CRS) para el GeometryRDD de lotes.
lotes_shp.CRSTransform('epsg:4326','epsg:4326')

# Se realiza una transformación del CRS para el GeometryRDD de isocronas, similar al anterior.
isocrona_shp.CRSTransform('epsg:4326','epsg:4326')

# Se convierte el GeometryRDD de lotes en un DataFrame de Spark.
lotes_df = Adapter.toDf(lotes_shp, spark)

# Se convierte el GeometryRDD de isocronas en un DataFrame de Spark.
isocrona_df = Adapter.toDf(isocrona_shp, spark)


#### Cruce de lotes con isocrónas proyecto Metro

In [0]:
# Se analiza el objeto 'lotes_shp' para obtener estadísticas básicas y optimizar futuras operaciones.
lotes_shp.analyze()

# Se realiza una partición espacial del objeto 'lotes_shp' utilizando un árbol KDBTREE con 4 particiones.
lotes_shp.spatialPartitioning(GridType.KDBTREE, 4)

# Se realiza una partición espacial del objeto 'isocrona_shp' utilizando el particionador del objeto 'lotes_shp'.
isocrona_shp.spatialPartitioning(lotes_shp.getPartitioner())

# Se establecen variables para determinar si se construirá sobre un RDD particionado espacialmente y si se usará un índice.
build_on_spatial_partitioned_rdd = True  # Se establece a TRUE solo si se ejecuta una consulta de unión.
using_index = True

# Se construye un índice QUADTREE para el objeto 'isocrona_shp'.
isocrona_shp.buildIndex(IndexType.QUADTREE, build_on_spatial_partitioned_rdd)

# Se realiza una consulta de unión espacial entre 'lotes_shp' e 'isocrona_shp'.
lotes_isocrona_join = JoinQuery.SpatialJoinQueryFlat(lotes_shp, isocrona_shp, using_index, True)

# Se convierte el resultado de la unión espacial en un GeoDataFrame de Geopandas.
lotes_gdf = gpd.GeoDataFrame(
    # Se mapea el resultado para extraer geometrías y datos asociados.
    lotes_isocrona_join.map(lambda x: [x[1].geom, *x[0].userData.split("\t"), *x[1].userData.split("\t")]).collect(),
    # Se definen las columnas del GeoDataFrame.
    columns=["geom", 'group_inde','Tiempo','layer','Linea','LOTSECT_ID','LOTMANZ_ID','LOTLOTE_ID','LOTZHF_ID','LOTZHG_ID','LOTUNIDAPH','LOTDISTRIT','LOTLSIMBOL','ESTADO_REG','FECHA_REGI','FECHA_DESD','FECHA_HAST','LOTLNUMERO','FRENTE','FONDO'],
    # Se establece la columna de geometría.
    geometry="geom",
    # Se define el sistema de referencia de coordenadas.
    crs= "epsg:4326"
)

# Extract coordinates from geom field in a geopandas df
lotes_gdf['def_lng'] = lotes_gdf['geom'].apply(lambda p: p.x)
lotes_gdf['def_lat'] = lotes_gdf['geom'].apply(lambda p: p.y)



In [0]:
lotes_gdf[['geom','Tiempo']].sample(5000).explore('Tiempo', tiles='CartoDB positron')

#### Cruce de lotes con usos e inmuebles

In [0]:
# Se recorre cada ruta de archivo en la lista 'uso'.
for u in uso:
    # Se reemplaza la parte "/dbfs" en la ruta del archivo con "dbfs:" para ajustar la ruta al formato requerido por Spark en ciertos entornos.
    path = u.replace("/dbfs","dbfs:")
    
    # Se lee el archivo CSV en un DataFrame de Spark.
    df = spark.read.format("csv").option("header", "true").load(path)
    year = get_year_from_path(u, dict_anio)
    if year:
        df = df.withColumn("año_vigencia", lit(year))
    else:
        df = df.withColumn("año_vigencia", lit(0))
    
    
    df = df.withColumn("año_vigencia", col("año_vigencia").cast("int"))

    # Se aplica una función definida por el usuario (UDF) llamada 'homoUso_udf' al DataFrame.
    # Esta función parece homologar o transformar la columna "DESCRIPCION_USO".
    # El resultado se almacena en una nueva columna llamada "UsoAgregado".
    df = df.withColumn("UsoAgregado", homoUso_udf(df["DESCRIPCION_USO"]))
    
    # Se modifica el nombre del archivo original, reemplazando '.csv' con '_agregado.csv'.
    # Esto sugiere que el archivo resultante contendrá datos "homologados" o "agregados".
    new_name = path.replace('.csv','_agregado.csv')
    
    # Se escribe el DataFrame modificado de vuelta a un archivo CSV con el nuevo nombre.
    # Si un archivo con ese nombre ya existe, se sobrescribirá.
    df.write.csv(new_name, header=True, mode="overwrite")


#### Generación de dataframe consolidado usos

In [0]:
# Inicializamos un DataFrame vacío llamado usos_agregados_df.
usos_agregados_df = None

# Iteramos sobre cada elemento en la lista 'uso', que parece contener rutas a archivos CSV.
for u in uso:
    # Reemplazamos "/dbfs" con "dbfs:" y cambiamos la extensión '.csv' por '_agregado.csv' en la ruta del archivo.
    path = u.replace("/dbfs","dbfs:")
    path = path.replace('.csv','_agregado.csv')
    
    # Leemos el archivo CSV desde la ruta modificada 'path' usando Spark y lo almacenamos en el DataFrame 'df'.
    df = spark.read.format("csv").option("header", "true").load(path)
    
    # Obtenemos el año de la ruta del archivo usando la función 'get_year_from_path' y el diccionario 'dict_anio'.
    year = get_year_from_path(u, dict_anio)
    
    # Si se encuentra un año en la ruta del archivo, agregamos una nueva columna 'año_vigencia' con ese año al DataFrame 'df'.
    # De lo contrario, agregamos la columna 'año_vigencia' con el valor 0.
    if year:
        df = df.withColumn("año_vigencia", lit(year))
    else:
        df = df.withColumn("año_vigencia", lit(0))

    df = df.withColumn("año_vigencia", col("año_vigencia").cast("int"))
    
    # Si el DataFrame 'usos_agregados_df' está vacío (es decir, es la primera iteración), lo inicializamos con 'df'.
    # De lo contrario, unimos 'df' con 'usos_agregados_df' usando la operación 'union'.
    if usos_agregados_df is None:
        usos_agregados_df = df
    else:
        usos_agregados_df = usos_agregados_df.union(df)


#### Conteo de usos por lote

In [0]:
# Agrupamos el DataFrame 'usos_agregados_df' por las columnas 'LOTLOTE_ID' y 'año_vigencia'.
# Luego, pivotamos el DataFrame usando la columna 'UsoAgregado' para transformar sus valores únicos en columnas individuales.
# Posteriormente, contamos las ocurrencias de cada combinación de 'LOTLOTE_ID', 'año_vigencia' y 'UsoAgregado'.
# El resultado es un DataFrame que tiene una columna para cada valor único de 'UsoAgregado' y celdas que indican el conteo de cada combinación.
conteo_usos = usos_agregados_df.groupBy(["LOTLOTE_ID","año_vigencia"]).pivot("UsoAgregado").count().fillna(0)


#### Generación de dataframe consolidado predios

In [0]:
# Inicializamos un DataFrame vacío llamado predios_agregados_df.
predios_agregados_df = None

# Iteramos sobre cada elemento en la lista 'pre', que parece contener rutas a archivos CSV.
for p in pre:
    # Reemplazamos "/dbfs" con "dbfs:" en la ruta del archivo.
    path = p.replace("/dbfs","dbfs:")
    print(path)
    
    # Leemos el archivo CSV desde la ruta 'path' usando Spark y lo almacenamos en el DataFrame 'df'.
    df = spark.read.format("csv").option("header", "true").load(path)
    
    # Obtenemos el año de la ruta del archivo usando la función 'get_year_from_path' y el diccionario 'dict_anio'.
    # Nota: Parece haber un error en esta línea. Debería ser 'get_year_from_path(p, dict_anio)' en lugar de 'get_year_from_path(u, dict_anio)'.
    year = get_year_from_path(p, dict_anio)
    
    # Si se encuentra un año en la ruta del archivo, agregamos una nueva columna 'año_vigencia' con ese año al DataFrame 'df'.
    # De lo contrario, agregamos la columna 'año_vigencia' con el valor 0.
    if year:
        df = df.withColumn("año_vigencia", lit(year))
    else:
        df = df.withColumn("año_vigencia", lit(0))

    df = df.withColumn("año_vigencia", col("año_vigencia").cast("int"))
    
    # Si el DataFrame 'predios_agregados_df' está vacío (es decir, es la primera iteración), lo inicializamos con 'df'.
    # De lo contrario, unimos 'df' con 'predios_agregados_df' usando la operación 'union'.
    if predios_agregados_df is None:
        predios_agregados_df = df
    else:
        predios_agregados_df = predios_agregados_df.union(df)


#### Suma de área terreno, área construida, valor avalúo y conteo de CHIPs por lote

In [0]:
# Agrupamos el DataFrame 'predios_agregados_df' por las columnas 'LOTLOTE_ID' y 'año_vigencia'.
# Luego, aplicamos varias funciones de agregación:
# - Sumamos los valores de la columna 'AREA_TERRENO'.
# - Sumamos los valores de la columna 'AREA_CONSTRUIDA'.
# - Sumamos los valores de la columna 'VALOR_AVALUO'.
# - Contamos el número de valores en la columna 'CHIP'.
# El resultado es un DataFrame que tiene una columna para cada función de agregación aplicada.
conteo_predios = predios_agregados_df.groupBy(["LOTLOTE_ID","año_vigencia"]).agg({"AREA_TERRENO":"sum", "AREA_CONSTRUIDA":"sum", "VALOR_AVALUO":"sum", "CHIP":"count"}).fillna(0)

# Renombramos las columnas del DataFrame 'conteo_predios' para que tengan nombres más descriptivos y fáciles de entender.
# - "count(CHIP)" se renombra a "NumPredios".
# - "sum(VALOR_AVALUO)" se renombra a "ValorAvaluo".
# - "sum(AREA_CONSTRUIDA)" se renombra a "AC".
# - "sum(AREA_TERRENO)" se renombra a "AT".
conteo_predios = conteo_predios.withColumnRenamed("count(CHIP)","NumPredios") \
    .withColumnRenamed("sum(VALOR_AVALUO)","ValorAvaluo") \
    .withColumnRenamed("sum(AREA_CONSTRUIDA)","AC") \
    .withColumnRenamed("sum(AREA_TERRENO)","AT")


#### Generación de dataframe consolidado propiedad horizontal

In [0]:
# Inicializamos un DataFrame vacío llamado ph_agregados_df.
ph_agregados_df = None

# Iteramos sobre cada elemento en la lista 'ph', que parece contener rutas a archivos CSV.
for p in ph:
    # Reemplazamos "/dbfs" con "dbfs:" en la ruta del archivo.
    path = p.replace("/dbfs","dbfs:")
    
    # Leemos el archivo CSV desde la ruta 'path' usando Spark y lo almacenamos en el DataFrame 'df'.
    df = spark.read.format("csv").option("header", "true").load(path)
    
    # Obtenemos el año de la ruta del archivo usando la función 'get_year_from_path' y el diccionario 'dict_anio'.
    # Nota: Parece haber un error en esta línea. Debería ser 'get_year_from_path(p, dict_anio)' en lugar de 'get_year_from_path(u, dict_anio)'.
    year = get_year_from_path(p, dict_anio)
    print(p)
    print(year)
    
    # Si se encuentra un año en la ruta del archivo, agregamos una nueva columna 'año_vigencia' con ese año al DataFrame 'df'.
    # De lo contrario, agregamos la columna 'año_vigencia' con el valor 0.
    if year:
        df = df.withColumn("año_vigencia", lit(year))
    else:
        df = df.withColumn("año_vigencia", lit(0))

    df = df.withColumn("año_vigencia", col("año_vigencia").cast("int"))
    
    # Si el DataFrame 'ph_agregados_df' está vacío (es decir, es la primera iteración), lo inicializamos con 'df'.
    # De lo contrario, unimos 'df' con 'ph_agregados_df' usando la operación 'union'.
    if ph_agregados_df is None:
        ph_agregados_df = df
    else:
        ph_agregados_df = ph_agregados_df.union(df)


In [0]:
# Seleccionamos solo las columnas "LOTLOTE_ID", "año_vigencia" y "NUMERO_UNIDADES_PH" del DataFrame 'ph_agregados_df'.
# Esto reduce el DataFrame a solo estas tres columnas, descartando cualquier otra columna que pueda existir en 'ph_agregados_df'.
ph_agregados_df = ph_agregados_df.select("LOTLOTE_ID","año_vigencia","NUMERO_UNIDADES_PH")

# Renombramos la columna "NUMERO_UNIDADES_PH" a "Unidades_En_PH" en el DataFrame 'ph_agregados_df'.
# Esto se hace para tener un nombre de columna más descriptivo y fácil de entender.
ph_agregados_df = ph_agregados_df.withColumnRenamed("NUMERO_UNIDADES_PH","Unidades_En_PH")


In [0]:
campos_lotes = ['LOTLOTE_ID','Tiempo','layer','Linea','def_lat', 'def_lng']
lotes_df = pd.DataFrame(lotes_gdf)[campos_lotes].drop_duplicates()
conteo_predios_df = conteo_predios.toPandas().drop_duplicates()
conteo_usos_df = conteo_usos.toPandas().drop_duplicates()
ph_agregados_df = ph_agregados_df.toPandas().drop_duplicates()

#### Unión de predios con conteos de usos por lote y con conteo de unidades ph

In [0]:
union_lotes_conteo_predios = pd.merge(left=lotes_df, right=conteo_predios_df, on=['LOTLOTE_ID'], how='left')

union_predios_usos_df = pd.merge(left=union_lotes_conteo_predios, right=conteo_usos_df, on=['LOTLOTE_ID','año_vigencia'], how='left')

union_catastro_df = pd.merge(left=union_predios_usos_df, right=ph_agregados_df, on=['LOTLOTE_ID','año_vigencia'], how='left').fillna(0)

In [0]:
union_catastro_df.columns = ['LOTLOTE_ID','Tiempo','Estacion','Linea','DefLat','DefLng','AnioVigencia','NumPredios','ValorAvaluo','AC','AT','AsociarAlUsoPrincipal','ComercioYServicios','Dotacional','Industrial','Otro','Residencial','UnidadesEnPH']


In [0]:
# Convertir el DataFrame de Pandas a un DataFrame de PySpark
# union_catastro_df.iteritems = union_catastro_df.items
union_catastro_spark_df = spark.createDataFrame(union_catastro_df)
union_catastro_spark_df.write.csv('/FileStore/tables/Catastro/union_catastro_2006_2023.csv', header=True, mode="overwrite")

In [0]:
union_catastro_spark_df.createOrReplaceTempView("union_catastro_spark_df")

In [0]:
%sql

CREATE OR REPLACE TABLE lotes_catastro_final AS (
  Select 
  u.*, e.ESoEstrato Estrato, case when AC <> 0 then ValorAvaluo/AC else 0 end as VC, AC/AT as IC
  from union_catastro_spark_df u
  left join estrato_socioeconomico e on u.LOTLOTE_ID = e.ESoCLote);


In [0]:
df = spark.sql("SELECT * FROM lotes_catastro where AnioVigencia = 2022")

In [0]:
df.display()

In [0]:
df = spark.sql("SELECT * FROM lotes_catastro_final where AnioVigencia = 2023")

In [0]:
df.display()

In [0]:
# First, sort the data by revenue and assign percent rank
df_with_rank = df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy("VC")))

df_with_rank = df_with_rank.where(df.VC <= 20000000)

# Function to determine the division based on the percent rank
def get_division(rank):
    if 0 < rank <= 1:
        for i in range(1, 31):  # for 30 divisions
            if rank <= i/30:
                # Use ordinal function to get "1st", "2nd", "3rd", etc.
                ordinal = lambda n: "%d%s" % (n, "tsnrhtdd"[((n//10%10!=1)*(n%10<4)*n%10)::4])
                return f"{ordinal(i)} Division"
    else:
        return "Invalid Rank"


# UDF to compute the division
get_division_udf = F.udf(get_division, "string")

# Applying the division function to the percent_rank column and adding the division information to the DataFrame
df_with_division = df_with_rank.withColumn("Division", get_division_udf("percent_rank"))

# Grouping the data by division and computing the range (minimum and maximum revenue) for each division
division_ranges = df_with_division.groupBy("Division").agg(
    F.min("VC").alias("RangeStart"),
    F.max("VC").alias("RangeEnd")
)

# Displaying the result
division_ranges.show()


In [0]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Initialize Spark session (assuming you haven't already)
spark = SparkSession.builder.appName("KMeansBreaks").getOrCreate()

sampled_df = df.where(df.VC <= 50000000)

# Convert to vector for k-means
vec_assembler = VectorAssembler(inputCols=["VC"], outputCol="features")
vec_df = vec_assembler.transform(sampled_df)

# Apply k-means clustering
k = 7  # Number of clusters, adjust as needed
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(vec_df)
centers = model.clusterCenters()

# Extract cluster centers and sort them
sorted_centers = sorted([center[0] for center in centers])

# Determine break points
breaks = [(sorted_centers[i] + sorted_centers[i+1]) / 2 for i in range(len(sorted_centers) - 1)]

print(breaks)


In [0]:
from pyspark.sql.functions import col
import numpy as np
import jenkspy

# Step 1: Filter and clean the data
filtered_df = df.filter(
    (col("VC").isNotNull()) &
    (col("VC") != np.inf) &
    (col("VC") != -np.inf) &
    (col("VC") <= 30000000)
)

# Step 2: Sampling 5% of the data
sampled_data = filtered_df.sample(False, 0.05).select("VC").rdd.flatMap(lambda x: x).collect()

# Compute the Jenks breaks using sampled data
breaks = jenkspy.jenks_breaks(sampled_data, n_classes=6)

# Print the Jenks breaks table
for i, b in enumerate(breaks):
    print(f"Break {i + 1}: {b}")


In [0]:
from h3 import h3
# Define a UDF to convert lat/lng to H3 index
def lat_lng_to_h3(lat, lng, resolution=9):
    return h3.geo_to_h3(lat, lng, resolution)

h3_udf = udf(lat_lng_to_h3)

# Apply the UDF to the DataFrame
df_with_h3 = df.withColumn("h3_index", h3_udf(df["Deflat"], df["DefLng"]))


In [0]:
result = df_with_h3.groupBy(["h3_index",'Linea','Estacion','AnioVigencia']).agg(F.avg("VC").alias("avg_vc"))

In [0]:
def h3_to_polygon(h3_index):
    geo_boundary = h3.h3_to_geo_boundary(h3_index)
    # Convert to a list of [lng, lat] coordinates
    return [[coord[1], coord[0]] for coord in geo_boundary]

h3_to_polygon_udf = udf(h3_to_polygon)

df_with_polygons = result.withColumn("polygon", h3_to_polygon_udf(result["h3_index"]))

In [0]:
df_with_polygons.createOrReplaceTempView("df_with_polygons")

In [0]:
%sql
drop table agregado_espacial_lotes_catastro

In [0]:
%sql

CREATE OR REPLACE TABLE agregado_espacial_lotes_catastro AS (
  Select 
  h3_index, Linea,Estacion,AnioVigencia, avg_vc, polygon as contour
  from df_with_polygons );


In [0]:
%sql
select * from agregado_espacial_lotes_catastro;

In [0]:
geojson_features = []

for row in df_with_polygons.collect():
    feature = {
        "type": "Feature",
        "geometry": {
            "type": "Polygon",
            "coordinates": [row["polygon"]]
        },
        "properties": {
            "h3_index": row["h3_index"],
            "avg_vc": row["avg_vc"]
        }
    }
    geojson_features.append(feature)

geojson_data = {
    "type": "FeatureCollection",
    "features": geojson_features
}

In [0]:
geojson_data

#### Predios 2023

In [0]:
df_catastro_2023 = spark.read.csv('/FileStore/tables/Catastro/2023/predios_2023.csv', header=True)

In [0]:
df_catastro_2023

In [0]:
df_catastro_2023 = df_catastro_2023.withColumn('geocode_result', struct(georeferenciar_udf(col('DIRECCION_REAL')).alias('geocoding')))

In [0]:
df_catastro_2023 = df_catastro_2023.withColumn('estado', col('geocode_result.geocoding.estado'))
df_catastro_2023 = df_catastro_2023.withColumn('tipo_direccion', col('geocode_result.geocoding.tipo_direccion'))
df_catastro_2023 = df_catastro_2023.withColumn('diraprox', col('geocode_result.geocoding.diraprox'))
df_catastro_2023 = df_catastro_2023.withColumn('lat', col('geocode_result.geocoding.yinput'))
df_catastro_2023 = df_catastro_2023.withColumn('lng', col('geocode_result.geocoding.xinput'))
df_catastro_2023 = df_catastro_2023.withColumn('lotcodigo', col('geocode_result.geocoding.lotcodigo'))

In [0]:
df_catastro_2023 = df_catastro_2023.drop('geocode_result')

In [0]:
df_catastro_2023.write.csv('/FileStore/tables/Catastro/2023/predios_2023_georef.csv', header=True, mode="overwrite")

In [0]:
# Convertimos el DataFrame de Spark (df_licencias) a un DataFrame de Pandas (df_licencias_pd).
df_catastro_2023 = df_catastro_2023.toPandas()

# Filtramos el DataFrame de Pandas para mantener solo las filas donde 'def_lat' no es nulo.
df_catastro_2023 = df_catastro_2023[df_catastro_2023['lat'].notnull()]

# Filtramos el DataFrame de Pandas para mantener solo las filas donde 'def_lng' no es nulo.
df_catastro_2023 = df_catastro_2023[df_licencias_pd['lng'].notnull()]

# Creamos una nueva columna 'geometry' en el DataFrame de Pandas.
# Esta columna contiene objetos Point (puntos geográficos) creados a partir de las columnas 'def_lng' y 'def_lat'.
df_catastro_2023['geometry'] = df_catastro_2023.apply(lambda row: Point(row['lng'], row['lat']), axis=1)

# Convertimos el DataFrame de Pandas (df_licencias_pd) a un GeoDataFrame (licencias_gdf) usando geopandas.
# Especificamos la columna 'geometry' como la columna que contiene las geometrías y definimos el sistema de referencia de coordenadas (CRS) como "EPSG:4326".
gdf_catastro_2023 = gpd.GeoDataFrame(df_catastro_2023, geometry='geometry', crs="EPSG:4326")



In [0]:
# Se lee un archivo shapefile de isocronas y se convierte en un GeometryRDD.
isocrona_shp = ShapefileReader.readToGeometryRDD(sc, "dbfs:/FileStore/tables/shp/isocronas/")

# Se realiza una transformación del CRS para el GeometryRDD de isocronas, similar al anterior.
isocrona_shp.CRSTransform('epsg:4326','epsg:4326')

# Se convierte el GeometryRDD de isocronas en un DataFrame de Spark.
isocrona_df = Adapter.toDf(isocrona_shp, spark)

# Convertimos el DataFrame de Spark (isocrona_df) a un DataFrame de Pandas (isocronas_pd).
isocrona_df = isocrona_df.withColumn("geometry", col("geometry").cast("string"))
isocronas_pd = isocrona_df.toPandas()
isocronas_pd['geometry'] = isocronas_pd['geometry'].apply(wkt.loads)

# Convertimos el DataFrame de Pandas (isocronas_pd) a un GeoDataFrame (isocronas_gdf) usando geopandas.
# Especificamos la columna 'geometry' como la columna que contiene las geometrías y definimos el sistema de referencia de coordenadas (CRS) como "EPSG:4326".
isocronas_gdf = gpd.GeoDataFrame(isocronas_pd, geometry='geometry', crs="EPSG:4326")



In [0]:
# Realizamos una operación de join espacial entre los GeoDataFrames licencias_gdf e isocronas_gdf.
# El join se realiza con base en la relación espacial "within", es decir, se unen las filas de licencias_gdf que están dentro de las geometrías de isocronas_gdf.
# El resultado es un nuevo GeoDataFrame (licencias_gdf_iso) que contiene solo las licencias que están dentro de las isócronas.
gdf_catastro_2023_iso = gpd.sjoin(gdf_catastro_2023, isocronas_gdf, how="inner", op="within")

In [0]:
gdf_catastro_2023_iso['wkt'] = pd.Series(
        map(lambda geom: str(geom.to_wkt()), gdf_catastro_2023_iso['geometry']),
        index=gdf_catastro_2023_iso.index, dtype='string')

In [0]:
catastro_2023_iso_spark_df = spark.createDataFrame(gdf_catastro_2023_iso)


In [0]:
catastro_2023_iso_spark_df.write.csv('/FileStore/tables/Catastro/2023/predios_2023_georef_iso.csv', header=True, mode="overwrite")

### Licencias (SDP)

In [0]:
# Definimos la ubicación del archivo que queremos leer.
file_location = "/FileStore/tables/SDP/sdp_licencias_2008_2023.csv"

# Especificamos el tipo de archivo. En este caso, es un archivo CSV.
file_type = "csv"

# Definimos algunas opciones para la lectura del archivo:
# - 'infer_schema': Si es "true", Spark intentará inferir automáticamente el esquema (tipos de datos) de las columnas.
# - 'first_row_is_header': Si es "true", Spark tratará la primera fila del archivo CSV como encabezados (nombres de columnas).
# - 'delimiter': Especifica el delimitador utilizado en el archivo CSV. En este caso, es una coma.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Usamos el método 'read' de Spark para leer el archivo CSV con las opciones especificadas.
# El resultado es un DataFrame llamado 'df_licencias' que contiene los datos del archivo CSV.
df_licencias = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)


#### Georeferenciación de licencias SDP a partir de dirección catastral

In [0]:
# df_licencias = df_licencias.withColumn('geocode_result', struct(georeferenciar_udf(col('Dirección')).alias('geocoding')))

In [0]:
# df_licencias = df_licencias.withColumn('estado', col('geocode_result.geocoding.estado'))
# df_licencias = df_licencias.withColumn('tipo_direccion', col('geocode_result.geocoding.tipo_direccion'))
# df_licencias = df_licencias.withColumn('diraprox', col('geocode_result.geocoding.diraprox'))
# df_licencias = df_licencias.withColumn('lat', col('geocode_result.geocoding.yinput'))
# df_licencias = df_licencias.withColumn('lng', col('geocode_result.geocoding.xinput'))
# df_licencias = df_licencias.withColumn('lotcodigo', col('geocode_result.geocoding.lotcodigo'))
# df_licencias = df_licencias.withColumn('eq_lot_georef', col('lotcodigo').cast(LongType()) == col('Código lote').cast(LongType()))

In [0]:
# df_licencias = df_licencias.drop('geocode_result')

In [0]:
# df_licencias.write.csv('/FileStore/tables/SDP/sdp_licencias_2008_2023_georef.csv', header=True, mode="overwrite")

In [0]:
df_licencias = spark.read.csv('/FileStore/tables/SDP/sdp_licencias_2008_2023_georef.csv', header=True)

In [0]:
# Agregamos una nueva columna "new_coords" al DataFrame df_licencias.
# Esta columna se genera utilizando la función definida por el usuario 'coordinate_udf',
# que toma como entrada las columnas "Longitud" y "Latitud".
df_licencias = df_licencias.withColumn("new_coords", coordinate_udf("Longitud", "Latitud"))

# Seleccionamos todas las columnas del DataFrame original, y adicionalmente extraemos
# los campos 'wgs84_lng' y 'wgs84_lat' de la columna "new_coords".
# Posteriormente, eliminamos la columna "new_coords" para mantener solo los campos extraídos.
df_licencias = df_licencias.select("*", "new_coords.wgs84_lng", "new_coords.wgs84_lat").drop("new_coords")


In [0]:
# Actualizamos el DataFrame df_licencias agregando una nueva columna "def_lng".
# Si "wgs84_lng" no es nulo y es diferente de 0, se usa el valor de "wgs84_lng".
# En caso contrario, se utiliza el valor de la columna "lng".
df_licencias = df_licencias.withColumn(
    "def_lng",
    when(
        (col("wgs84_lng").isNotNull()) & 
        (col("wgs84_lng") != 0), 
        col("wgs84_lng")
    ).otherwise(col("lng"))
)

# Similar al proceso anterior, actualizamos el DataFrame df_licencias agregando una nueva columna "def_lat".
# Si "wgs84_lat" no es nulo y es diferente de 0, se usa el valor de "wgs84_lat".
# En caso contrario, se utiliza el valor de la columna "lat".
df_licencias = df_licencias.withColumn(
    "def_lat",
    when(
        (col("wgs84_lat").isNotNull()) & 
        (col("wgs84_lat") != 0), 
        col("wgs84_lat")
    ).otherwise(col("lat"))
)

# Definimos una lista de columnas que queremos eliminar del DataFrame.
columns_to_drop = ["Latitud", "Longitud", "wgs84_lng", "wgs84_lat", "lat", "lng", "eq_lot_georef"]

# Eliminamos las columnas especificadas en la lista 'columns_to_drop' del DataFrame df_licencias.
df_licencias = df_licencias.drop(*columns_to_drop)


In [0]:
# Actualizamos el DataFrame df_licencias agregando una nueva columna "ModalidadAgregada".
# Esta columna se genera utilizando la función definida por el usuario 'homoModalidad_udf',
# que toma como entrada la columna "Modalidad" del DataFrame.
df_licencias = df_licencias.withColumn("ModalidadAgregada", homoModalidad_udf(df_licencias["Modalidad"]))

# Eliminamos la columna "Modalidad" del DataFrame df_licencias ya que ha sido reemplazada por "ModalidadAgregada".
df_licencias = df_licencias.drop("Modalidad")


In [0]:
# Actualizamos el DataFrame df_licencias agregando una nueva columna "UsoAgregado".
# Esta columna se genera utilizando la función definida por el usuario 'homoUsoLicencia_udf',
# que toma como entrada la columna "Uso" del DataFrame.
df_licencias = df_licencias.withColumn("UsoAgregado", homoUsoLicencia_udf(df_licencias["Uso"]))

# Eliminamos la columna "Modalidad" del DataFrame df_licencias ya que ha sido reemplazada por "ModalidadAgregada".
df_licencias = df_licencias.drop("Uso")


In [0]:
# Definimos una lista de tipos de trámites que queremos filtrar en el DataFrame.
tipo_tramite = ['Licencia de Construcción','Reconocimiento de la existencia de una construcción','Licencia de Urbanización','Licencia de Urbanismo y construcción','Propiedad Horizontal','Licencia de Subdivisión','Revalidación licencia de construcción','Autorización para el movimiento de tierras','Sin informacion','Modificación','Ajuste de cotas de áreas','Copia Certificada de planos','Sin Tipo de Trámite','Inicial','Modificación de planos urbanísticos']
tipo_tramite = ['Licencia de Construcción']

# Filtramos el DataFrame df_licencias para mantener solo las filas donde 'Tipo Trámite' esté en la lista tipo_tramite.
df_licencias = df_licencias.filter(col('Tipo Trámite').isin(tipo_tramite))

objeto_tramite = ['Inicial','Modificación']

df_licencias = df_licencias.filter(col('Objeto trámite').isin(objeto_tramite))

# Definimos una lista de modalidades que queremos filtrar en el DataFrame.
modalidades = ['Obra nueva','Ampliación','Modificación','Demolición','Reforzamiento Estructural','Restauración','Cerramiento','Adecuación','Subdivisión','Propiedad Horizontal','Sin Modalidad','Culminación de Obras','Reconocimiento','Desararollo','Otras']
modalidades = ['Obra nueva']

# Filtramos el DataFrame df_licencias para mantener solo las filas donde 'ModalidadAgregada' esté en la lista modalidades.
df_licencias = df_licencias.filter(col('ModalidadAgregada').isin(modalidades))

# Definimos una lista de decisiones que queremos filtrar en el DataFrame.
#decidiones = ['Aprobado', 'Desistido','Aclarado','Prorrogado','Negado','Recurso Confirmado', 'Recurso Rechazado', 'Recurso Aclarado', 'Renuncia de Licencia', 'Revocado', 'Archivado']
#decidiones = ['Aprobado', 'Desistido','Renuncia de Licencia']

# Filtramos el DataFrame df_licencias para mantener solo las filas donde 'Tipo de decisión' esté en la lista decidiones.
#df_licencias = df_licencias.filter(col('Tipo de decisión').isin(decidiones))

# Filtramos el DataFrame df_licencias para mantener solo las filas donde 'Conteo licencias' sea mayor a 0.
df_licencias = df_licencias.filter(col('Conteo licencias') > 0)


In [0]:
# Agrupamos el DataFrame 'predios_agregados_df' por las columnas 'LOTLOTE_ID' y 'año_vigencia'.
# Luego, aplicamos varias funciones de agregación:
# - Sumamos los valores de la columna 'AREA_TERRENO'.
# - Sumamos los valores de la columna 'AREA_CONSTRUIDA'.
# - Sumamos los valores de la columna 'VALOR_AVALUO'.
# - Contamos el número de valores en la columna 'CHIP'.
# El resultado es un DataFrame que tiene una columna para cada función de agregación aplicada.
conteo_licencias = df_licencias.groupBy(["Código lote","Año de ejecutoría"]).agg({"Área":"sum", "Unidades":"sum", "Matrícula Inmobiliaria":"count", "CHIP":"count"}).fillna(0)
## Matrícula Inmobiliaria, CHIP, Unidades	Área
# Renombramos las columnas del DataFrame 'conteo_predios' para que tengan nombres más descriptivos y fáciles de entender.
# - "count(CHIP)" se renombra a "NumPredios".
# - "sum(VALOR_AVALUO)" se renombra a "ValorAvaluo".
# - "sum(AREA_CONSTRUIDA)" se renombra a "AC".
# - "sum(AREA_TERRENO)" se renombra a "AT".
conteo_licencias = conteo_licencias.withColumnRenamed("count(CHIP)","NumCHIP") \
    .withColumnRenamed("sum(Área)","Area") \
    .withColumnRenamed("sum(Unidades)","Unidades") \
    .withColumnRenamed("count(Matrícula Inmobiliaria)","Matriculas")


In [0]:
display(df_licencias)

In [0]:
# Agrupamos el DataFrame df_licencias por las columnas "Código lote" y "Año de ejecutoría".
# Luego, pivotamos el DataFrame usando la columna "ModalidadAgregada" para transformar sus valores únicos en columnas individuales.
# Posteriormente, contamos las ocurrencias de cada combinación de "Código lote", "Año de ejecutoría" y "ModalidadAgregada".
# Finalmente, reemplazamos cualquier valor nulo con 0 usando fillna(0).
conteo_modalidades = (df_licencias.groupBy(["Código lote","Año de ejecutoría"]).pivot("ModalidadAgregada").count()).fillna(0)


In [0]:
# Agrupamos el DataFrame df_licencias por las columnas "Código lote" y "Año de ejecutoría".
# Luego, pivotamos el DataFrame usando la columna "ModalidadAgregada" para transformar sus valores únicos en columnas individuales.
# Posteriormente, contamos las ocurrencias de cada combinación de "Código lote", "Año de ejecutoría" y "ModalidadAgregada".
# Finalmente, reemplazamos cualquier valor nulo con 0 usando fillna(0).
conteo_usos_licencias = (df_licencias.groupBy(["Código lote","Año de ejecutoría"]).pivot("UsoAgregado").count()).fillna(0)


In [0]:
display(conteo_usos_licencias)

In [0]:
# Convertimos el DataFrame de Spark (df_licencias) a un DataFrame de Pandas (df_licencias_pd).
df_licencias_pd = df_licencias.toPandas()

# Filtramos el DataFrame de Pandas para mantener solo las filas donde 'def_lat' no es nulo.
df_licencias_pd = df_licencias_pd[df_licencias_pd['def_lat'].notnull()]

# Filtramos el DataFrame de Pandas para mantener solo las filas donde 'def_lng' no es nulo.
df_licencias_pd = df_licencias_pd[df_licencias_pd['def_lng'].notnull()]

# Creamos una nueva columna 'geometry' en el DataFrame de Pandas.
# Esta columna contiene objetos Point (puntos geográficos) creados a partir de las columnas 'def_lng' y 'def_lat'.
df_licencias_pd['geometry'] = df_licencias_pd.apply(lambda row: Point(row['def_lng'], row['def_lat']), axis=1)

# Convertimos el DataFrame de Pandas (df_licencias_pd) a un GeoDataFrame (licencias_gdf) usando geopandas.
# Especificamos la columna 'geometry' como la columna que contiene las geometrías y definimos el sistema de referencia de coordenadas (CRS) como "EPSG:4326".
licencias_gdf = gpd.GeoDataFrame(df_licencias_pd, geometry='geometry', crs="EPSG:4326")


In [0]:
# Convertimos el DataFrame de Spark (isocrona_df) a un DataFrame de Pandas (isocronas_pd).
isocrona_df = isocrona_df.withColumn("geometry", col("geometry").cast("string"))
isocronas_pd = isocrona_df.toPandas()
isocronas_pd['geometry'] = isocronas_pd['geometry'].apply(wkt.loads)

In [0]:


# Convertimos el DataFrame de Pandas (isocronas_pd) a un GeoDataFrame (isocronas_gdf) usando geopandas.
# Especificamos la columna 'geometry' como la columna que contiene las geometrías y definimos el sistema de referencia de coordenadas (CRS) como "EPSG:4326".
isocronas_gdf = gpd.GeoDataFrame(isocronas_pd, geometry='geometry', crs="EPSG:4326")

# Realizamos una operación de join espacial entre los GeoDataFrames licencias_gdf e isocronas_gdf.
# El join se realiza con base en la relación espacial "within", es decir, se unen las filas de licencias_gdf que están dentro de las geometrías de isocronas_gdf.
# El resultado es un nuevo GeoDataFrame (licencias_gdf_iso) que contiene solo las licencias que están dentro de las isócronas.
licencias_gdf_iso = gpd.sjoin(licencias_gdf, isocronas_gdf, how="inner", op="within")

In [0]:
licencias_gdf_iso[['geometry','Tiempo']].sample(5000).explore('Tiempo', tiles='CartoDB positron')

In [0]:
campos_licencias = ['Código lote','Año de ejecutoría','Localidad','UPZ','Tiempo','layer','Linea','Tipo de decisión','def_lng','def_lat']
licencias_gdf = pd.DataFrame(licencias_gdf_iso)[campos_licencias].drop_duplicates()
conteo_licencias_df = conteo_licencias.toPandas().drop_duplicates()
conteo_modalidades_df = conteo_modalidades.toPandas().drop_duplicates()
conteo_usos_licencias_df = conteo_usos_licencias.toPandas().drop_duplicates()

In [0]:
# Realizamos una operación de unión (merge) entre el GeoDataFrame licencias_gdf_iso y el DataFrame conteo_modalidades.
# La unión se basa en las columnas 'Código lote' y 'Año de ejecutoría'.
# Utilizamos un join de tipo "left", lo que significa que se conservarán todas las filas del GeoDataFrame licencias_gdf_iso y se agregarán las columnas correspondientes del DataFrame conteo_modalidades.
# Si no hay coincidencia para una fila en particular de licencias_gdf_iso en conteo_modalidades, los valores de las columnas agregadas serán NaN.
union_licencias_df =  pd.merge(left=licencias_gdf, right=conteo_licencias_df, on=['Código lote','Año de ejecutoría'], how="left")
#union_licencias_df =  pd.merge(left=union_licencias_df, right=conteo_modalidades_df, on=['Código lote','Año de ejecutoría'], how="left")
union_licencias_df =  pd.merge(left=union_licencias_df, right=conteo_usos_licencias_df, on=['Código lote','Año de ejecutoría'], how="left")


In [0]:
union_licencias_df

In [0]:
#union_licencias_df.columns = ['CodigoLote','AnioDeEjecutoria','Localidad','UPZ','Tiempo','Estacion','Linea','TipoDeDecision','DefLng','DefLat','NumCHIP','Area','Unidades','Matriculas','Adecuacion','Ampliacion','Cerramiento','CulminacionDeObras','Demolicion','Modificacion','ObraNueva','Otras','PropiedadHorizontal','Reconocimiento','ReforzamientoEstructural','Restauración','SinModalidad','Subdivisión']
union_licencias_df.columns = ['CodigoLote','AnioDeEjecutoria','Localidad','UPZ','Tiempo','Estacion','Linea','TipoDeDecision','DefLng','DefLat','NumCHIP','Area','Unidades','Matriculas','ComercioYServicios','Industrial','Otros','Residencial']

In [0]:
# Convertir el DataFrame de Pandas a un DataFrame de PySpark
union_licencias_spark_df = spark.createDataFrame(union_licencias_df)

# Escribir el DataFrame de PySpark a un archivo CSV
union_licencias_spark_df.write.csv('/FileStore/tables/SDP/union_licencias_2008_2023.csv', header=True, mode="overwrite")

In [0]:
union_licencias_spark_df

In [0]:
union_licencias_spark_df.createOrReplaceTempView("union_sdp_spark_df")

In [0]:
%sql
CREATE OR REPLACE TABLE lotes_licencias_sdp AS (
  Select 
  *, cast(AnioDeEjecutoria as float) as AnioVigencia
  from union_sdp_spark_df where CodigoLote is not null);


In [0]:
%sql
CREATE OR REPLACE TABLE lotes_catastro AS (
  Select 
  *
  from lotes_catastro where Estrato <> '0');

In [0]:
%sql
SELECT AnioDeEjecutoria, sum(ComercioYServicios),sum(Industrial), sum(Otros), sum(Residencial) from (select *
 from  lotes_licencias_sdp) group by AnioDeEjecutoria ;

In [0]:
%sql

SELECT * FROM lotes_catastro;

## Sandbox

In [0]:
#dbfs:/FileStore/tables/Catastro/2010/predios_2010.csv
df = spark.read.format('csv').options(header='true').load('/FileStore/tables/Catastro/2023/predios_2023.csv')

In [0]:
display(df)

In [0]:
display(df)