In [0]:
%pip install geopy folium requests -q

Python interpreter will be restarted.
Python interpreter will be restarted.


## Imports y Clases

In [0]:
# Databricks notebook source
import pandas as pd
import re
import requests
import json
from geopy.geocoders import Nominatim, ArcGIS
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
import logging
from typing import List, Dict, Tuple, Optional
import numpy as np

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuración de geocodificadores
GEOCODERS_CONFIG = {
    'nominatim': {
        'class': Nominatim,
        'params': {'user_agent': 'databricks_casino_geocoder_v3', 'timeout': 15},
        'delay': 1.2  # Respeta los términos de uso de Nominatim
    },
    'arcgis': {
        'class': ArcGIS,
        'params': {'timeout': 15},
        'delay': 0.5
    }
}

# Configuración de paralelización
MAX_WORKERS = 3
CACHE_SIZE = 500

class DireccionProcessor:
    """Procesador robusto de direcciones peruanas."""
    def __init__(self):
        self.patterns = {
            'numeration': r'(?:Nro\.?|No\.?|N°|Nº|#)\s*',
            'duplicated_spaces': r'\s{2,}',
            'street_types': {
                'AV\.?': 'AVENIDA', 'JR\.?': 'JIRON', 'CA\.?': 'CALLE',
                'PSJ\.?': 'PASAJE', 'MZ\.?': 'MANZANA', 'LT\.?': 'LOTE',
                'URB\.?': 'URBANIZACIÓN', 'COOP\.?': 'COOPERATIVA'
            }
        }

    @lru_cache(maxsize=CACHE_SIZE)
    def clean_address(self, address: str) -> str:
        """Limpia y normaliza una dirección."""
        if pd.isna(address): return ""
        addr = str(address).upper().strip()
        for pattern, replacement in self.patterns['street_types'].items():
            addr = re.sub(pattern, replacement, addr, flags=re.IGNORECASE)
        addr = re.sub(self.patterns['numeration'], '', addr, flags=re.IGNORECASE)
        addr = re.sub(self.patterns['duplicated_spaces'], ' ', addr)
        addr = re.sub(r'[^\w\s,.-]', ' ', addr)
        return addr.strip()

    def generate_address_variants(self, row: pd.Series) -> List[str]:
        """Genera múltiples variantes de una dirección para maximizar éxito."""
        direccion = self.clean_address(row.get('Dirección', ''))
        distrito = str(row.get('Distrito', '')).upper().strip()
        provincia = str(row.get('Provincia', 'LIMA')).upper().strip()
        departamento = str(row.get('Departamento', 'LIMA')).upper().strip()
        
        if not direccion: return []
        
        variants = [
            f"{direccion}, {distrito}, {provincia}, {departamento}, PERÚ",
            f"CASINO {direccion}, {distrito}, {provincia}, PERÚ",
            f"{direccion}, {distrito}, {provincia}, PERÚ",
            f"{direccion}, {distrito}, PERÚ",
            f"{direccion}, PERÚ"
        ]
        if provincia.upper() == 'LIMA':
            variants.append(f"{direccion}, {distrito}, LIMA, PERÚ")
        if distrito.upper() in ['LIMA', 'CERCADO DE LIMA']:
            variants.append(f"{direccion}, LIMA CENTRO, PERÚ")
        
        seen = set()
        unique_variants = []
        for variant in variants:
            variant_clean = re.sub(r'\s+', ' ', variant).strip()
            if variant_clean and variant_clean not in seen:
                seen.add(variant_clean)
                unique_variants.append(variant_clean)
        return unique_variants

class RobustGeocoder:
    """Geocodificador robusto con múltiples estrategias y fallbacks."""
    def __init__(self):
        self.processor = DireccionProcessor()
        self.geocoders = self._initialize_geocoders()
        self.stats = {'total_processed': 0, 'successful': 0, 'failed': 0, 'by_geocoder': {}}

    def _initialize_geocoders(self) -> Dict:
        """Inicializa los geocodificadores disponibles."""
        geocoders = {}
        for name, config in GEOCODERS_CONFIG.items():
            try:
                geocoders[name] = {'instance': config['class'](**config['params']), 'delay': config['delay']}
                logger.info(f"✅ Geocodificador {name} inicializado")
            except Exception as e:
                logger.warning(f"⚠️ No se pudo inicializar {name}: {e}")
        return geocoders

    def _is_valid_peru_coordinates(self, lat: float, lon: float) -> bool:
        """Valida que las coordenadas estén dentro del territorio peruano."""
        return (-18.5 <= lat <= 0.5) and (-81.5 <= lon <= -68.5)

    def _geocode_single_variant(self, address: str, geocoder_name: str) -> Optional[Tuple[float, float]]:
        """Geocodifica una variante de dirección con un geocodificador específico."""
        if geocoder_name not in self.geocoders: return None
        geocoder_info = self.geocoders[geocoder_name]
        try:
            sleep(geocoder_info['delay'])
            location = geocoder_info['instance'].geocode(address, exactly_one=True, timeout=15)
            if location and self._is_valid_peru_coordinates(location.latitude, location.longitude):
                return (location.latitude, location.longitude)
        except (GeocoderTimedOut, GeocoderServiceError) as e:
            logger.debug(f"Error de geocodificador {geocoder_name}: {e}")
        except Exception as e:
            logger.debug(f"Error inesperado en {geocoder_name}: {e}")
        return None

    def geocode_address_robust(self, row: pd.Series) -> Dict:
        """Geocodifica una dirección usando todas las estrategias disponibles."""
        variants = self.processor.generate_address_variants(row)
        for variant in variants:
            for geocoder_name in self.geocoders.keys():
                coordinates = self._geocode_single_variant(variant, geocoder_name)
                if coordinates:
                    self.stats['by_geocoder'][geocoder_name] = self.stats['by_geocoder'].get(geocoder_name, 0) + 1
                    return {
                        'Direccion_Original': row.get('Dirección', 'N/A'), 'Direccion_Exitosa': variant,
                        'Latitud': coordinates[0], 'Longitud': coordinates[1], 'Estado': 'Encontrado',
                        'Geocodificador_Usado': geocoder_name, 'Variantes_Probadas': variants.index(variant) + 1
                    }
        return {'Direccion_Original': row.get('Dirección', 'N/A'), 'Latitud': None, 'Longitud': None, 
                'Estado': 'No Encontrado', 'Variantes_Probadas': len(variants)}

    def process_batch(self, df: pd.DataFrame, use_parallel: bool = True) -> pd.DataFrame:
        """Procesa un lote de direcciones en paralelo o secuencial."""
        logger.info(f"🚀 Iniciando geocodificación de {len(df)} direcciones...")
        results = []
        if not use_parallel or len(df) <= 5: # Procesamiento secuencial
            for _, row in df.iterrows():
                results.append(self.geocode_address_robust(row))
        else: # Procesamiento paralelo
            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                future_to_row = {executor.submit(self.geocode_address_robust, row): i for i, row in df.iterrows()}
                temp_results = {}
                for future in as_completed(future_to_row):
                    idx = future_to_row[future]
                    try:
                        temp_results[idx] = future.result()
                    except Exception as e:
                        logger.error(f"Error procesando fila {idx}: {e}")
                results = [temp_results[i] for i in sorted(temp_results)]

        for result in results:
            self.stats['total_processed'] += 1
            if result['Estado'] == 'Encontrado': self.stats['successful'] += 1
            else: self.stats['failed'] += 1
        return pd.DataFrame(results)

    def get_stats_summary(self) -> str:
        """Retorna un resumen de estadísticas."""
        total = self.stats['total_processed']
        success_rate = (self.stats['successful'] / total * 100) if total > 0 else 0
        summary = f"📊 RESUMEN: {self.stats['successful']}/{total} ({success_rate:.1f}%) exitosos.\n"
        for geocoder, count in self.stats['by_geocoder'].items():
            percentage = (count / self.stats['successful'] * 100) if self.stats['successful'] > 0 else 0
            summary += f"  • {geocoder}: {count} ({percentage:.1f}%)\n"
        return summary



## Ejecución Principal

In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC ## Ejecución Principal
# MAGIC 
# MAGIC ### 1. Carga de Datos desde DBFS
# MAGIC 
# MAGIC **Importante:** Se asume que el archivo `datos_casinos_salas.csv` fue cargado a Databricks usando la UI de "Datos". La ruta `/FileStore/tables/` es la ubicación por defecto.

# COMMAND ----------

def main():
    """Función principal que carga, procesa y guarda los datos."""
    
    logger.info("🎯 Iniciando sistema de geocodificación robusto para casinos peruanos")
    
    # --- PASO 1: Cargar datos desde DBFS con Spark ---
    # Esta es la forma correcta de leer un archivo CSV en Databricks.
    # Reemplaza la función 'load_casino_data' que era para un entorno local.
    
    file_path = "/FileStore/tables/datos_casinos_salas.csv"
    
    try:
        spark_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)
        df = spark_df.toPandas() # Convertir a pandas para usarlo en el resto del script
        logger.info(f"✅ Archivo '{file_path}' cargado exitosamente desde DBFS.")
        logger.info(f"📁 Datos cargados: {len(df)} registros")
        print(f"Columnas disponibles: {list(df.columns)}")
        print(f"Vista previa:\n{df.head()}")
    except Exception as e:
        logger.error(f"❌ Error cargando el archivo desde DBFS: {e}")
        logger.error("Asegúrate que el archivo 'datos_casinos_salas.csv' esté cargado en DBFS en la ruta correcta.")
        return

    # --- El resto del proceso no necesita cambios ---
    
    # 2. Inicializar geocodificador
    geocoder = RobustGeocoder()
    
    # 3. Procesar datos
    try:
        use_parallel = len(df) > 10
        results_df = geocoder.process_batch(df, use_parallel=use_parallel)
        
        # 4. Mostrar estadísticas
        print("\n" + "="*80)
        print(geocoder.get_stats_summary())
        print("="*80 + "\n")

        # 5. Guardar resultados
        output_file = 'resultados_geocodificados_databricks.csv'
        ########################### results_df.to_csv(f'/dbfs/FileStore/tables/{output_file}', index=False, encoding='utf-8-sig')
        display(results_df)
        logger.info(f"💾 Resultados guardados en DBFS: /FileStore/tables/{output_file}")
        
        # 6. Mostrar vista previa de resultados en el notebook
        print("\n📋 VISTA PREVIA DE RESULTADOS:")
        successful_results = results_df[results_df['Estado'] == 'Encontrado']
        failed_results = results_df[results_df['Estado'] != 'Encontrado']
        
        print(f"\n✅ DIRECCIONES ENCONTRADAS ({len(successful_results)}):")
        if not successful_results.empty:
            print(successful_results[['Direccion_Original', 'Latitud', 'Longitud', 'Geocodificador_Usado']].head())

        print(f"\n❌ DIRECCIONES NO ENCONTRADAS ({len(failed_results)}):")
        if not failed_results.empty:
            print(failed_results[['Direccion_Original', 'Estado']].head())
        
        return results_df
        
    except Exception as e:
        logger.error(f"❌ Error durante el procesamiento: {e}", exc_info=True)
        raise

# Ejecutar el proceso y guardar los resultados en una variable
results = main()

INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:🎯 Iniciando sistema de geocodificación robusto para casinos peruanos
INFO:__main__:✅ Archivo '/FileStore/tables/datos_casinos_salas.csv' cargado exitosamente desde DBFS.
INFO:__main__:📁 Datos cargados: 692 registros
INFO:__main__:✅ Geocodificador nominatim inicializado
INFO:__main__:✅ Geocodificador arcgis inicializado
INFO:__main__:🚀 Iniciando geocodificación de 692 direcciones...


Columnas disponibles: ['Ruc', 'Empresa', 'Establecimiento', 'Giro', 'Resolución', 'Código Sala', 'Vigencia', 'Dirección', 'Distrito', 'Provincia', 'Departamento']
Vista previa:
           Ruc                               Empresa   Establecimiento  \
0  20265815830  CORPORACION TURISTICA PERUANA S.A.C.     ATLANTIC CITY   
1  20231843460                    COSTA DEL SOL S.A.  MASARIS - TUMBES   
2  20305556786            GAMING AND SERVICES S.A.C.            JOKERS   
3  20305556786            GAMING AND SERVICES S.A.C.         FANTASTIC   
4  20305556786            GAMING AND SERVICES S.A.C.             MIAMI   

          Giro Resolución  Código Sala    Vigencia  \
0    REST. 5TT       5363    110019002  20/01/2028   
1   HOTEL ****        117    110021001  17/01/2027   
2  HOTEL *****       1598    110032006  22/09/2026   
3  HOTEL *****       1614    110032007  25/10/2026   
4    REST. 5TT       2917    110032008  11/07/2027   

                           Dirección    Distrito Prov

In [0]:
# Databricks notebook source
# Asumimos que el DataFrame 'results' ya existe con los datos geocodificados.
if 'results' in locals() and results is not None:
    
    print("Iniciando la exportación a CSV con punto y coma...")
    
    # --- 1. Definir el nombre del archivo de salida ---
    output_filename = "resultados_geocodificados_puntoycoma.csv"
    
    # --- 2. Convertir el DataFrame de pandas a un DataFrame de Spark ---
    # Esto es necesario para usar el escritor nativo y robusto de Spark.
    spark_results_df = spark.createDataFrame(results)

    # --- 3. Escribir el archivo CSV usando las opciones correctas ---
    # Se escribe en una carpeta temporal primero, es el comportamiento normal de Spark.
    temp_output_path = f"dbfs:/FileStore/tables/{output_filename}_temp"
    final_output_path = f"dbfs:/FileStore/tables/{output_filename}"
    
    try:
        (spark_results_df.repartition(1).write
            .format("csv")
            .option("header", "true")           # Incluir la fila de encabezados
            .option("sep", ";")                 # Establecer el separador a punto y coma
            .option("encoding", "UTF-8")        # Usar codificación UTF-8 para tildes y ñ
            .mode("overwrite")                  # Sobrescribir si el archivo ya existe
            .save(temp_output_path)
        )
        
        # --- 4. Mover y renombrar el archivo para un acceso fácil ---
        # Spark guarda el archivo con un nombre genérico (part-00000), lo movemos y renombramos.
        part_file = dbutils.fs.ls(temp_output_path)[0].path
        dbutils.fs.mv(part_file, final_output_path)
        dbutils.fs.rm(temp_output_path, recurse=True) # Eliminar la carpeta temporal
        
        print("✅ ¡Exportación completada con éxito!")
        print(f"Puedes descargar tu archivo desde la siguiente ruta en Databricks:")
        print(f"➡️ Data > DBFS > FileStore > tables > {output_filename}")

    except Exception as e:
        print(f"❌ Ocurrió un error durante la exportación: {e}")

else:
    print("⚠️ La variable 'results' no fue encontrada. Por favor, ejecuta primero el proceso de geocodificación.")

INFO:py4j.clientserver:Received command c on object id p0


Iniciando la exportación a CSV con punto y coma...
✅ ¡Exportación completada con éxito!
Puedes descargar tu archivo desde la siguiente ruta en Databricks:
➡️ Data > DBFS > FileStore > tables > resultados_geocodificados_puntoycoma.csv


In [0]:
# Databricks notebook source
import base64
import os
import uuid

# Asumimos que el DataFrame 'results' ya existe
if 'results' in locals() and results is not None:
  
  print("Generando enlace de descarga para el archivo CSV...")
  
  # --- 1. Crear un nombre de archivo temporal y único ---
  temp_filename = f"{uuid.uuid4()}.csv"
  local_temp_path = os.path.join("/tmp", temp_filename)
  
  # --- 2. Guardar el DataFrame en el archivo temporal con el formato deseado ---
  results.to_csv(
      local_temp_path, 
      sep=';',                  # Separador de punto y coma
      encoding='utf-8-sig',     # Codificación para compatibilidad con Excel y acentos
      index=False
  )
  
  # --- 3. Leer el archivo y codificarlo en base64 para el enlace ---
  with open(local_temp_path, "rb") as f:
    b64 = base64.b64encode(f.read()).decode()
  
  # --- 4. Crear el enlace HTML para la descarga ---
  download_link = f'<a href="data:text/csv;base64,{b64}" download="resultados_geocodificados.csv">🔗 Haz clic aquí para descargar el archivo CSV (separado por punto y coma)</a>'
  
  # --- 5. Mostrar el enlace en el notebook y limpiar el archivo temporal ---
  displayHTML(download_link)
  os.remove(local_temp_path)

else:
  print("⚠️ La variable 'results' no fue encontrada. Ejecuta primero el proceso de geocodificación.")

Generando enlace de descarga para el archivo CSV...


## Análisis y Visualización

In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC ## Análisis y Visualización de Resultados (Corregido)

# COMMAND ----------

def analyze_and_visualize(results_df: pd.DataFrame):
    """Analiza y visualiza los resultados de geocodificación."""
    if results_df is None or results_df.empty:
        print("⚠️ No hay resultados para analizar.")
        return

    print("\n📈 ANÁLISIS DETALLADO DE RESULTADOS")
    print("="*50)

    # Crear mapa si hay resultados exitosos
    successful_coords = results_df[results_df['Estado'] == 'Encontrado'].dropna(subset=['Latitud', 'Longitud'])
    
    if not successful_coords.empty:
        print(f"🗺️ Creando mapa con {len(successful_coords)} ubicaciones...")
        try:
            import folium
            center_lat = successful_coords['Latitud'].mean()
            center_lon = successful_coords['Longitud'].mean()
            
            m = folium.Map(location=[center_lat, center_lon], zoom_start=11, tiles='OpenStreetMap')
            
            for _, row in successful_coords.iterrows():
                popup_text = f"<b>{row['Direccion_Original']}</b><br>Geocodificador: {row['Geocodificador_Usado']}"
                folium.Marker(
                    location=[row['Latitud'], row['Longitud']],
                    popup=folium.Popup(popup_text, max_width=300),
                    tooltip=row['Direccion_Original'],
                    icon=folium.Icon(color='red', icon='star')
                ).add_to(m)
            
            # --- SECCIÓN CORREGIDA ---
            
            # 1. Definir rutas temporal y final
            local_temp_path = '/tmp/mapa_casinos.html'
            dbfs_final_path = 'dbfs:/FileStore/tables/mapa_casinos.html'
            
            # 2. Guardar el mapa en la ruta local temporal
            m.save(local_temp_path)
            
            # 3. Mover el archivo desde la ruta local a DBFS
            dbutils.fs.mv(f"file:{local_temp_path}", dbfs_final_path)
            print(f"🗺️ Mapa guardado exitosamente en DBFS: {dbfs_final_path.replace('dbfs:', '')}")
            
            # 4. Mostrar el mapa en el notebook (esta línea ya estaba bien)
            display(m)

        except Exception as e:
            print(f"❌ Error creando mapa: {e}")
            # Muestra información más detallada del error si ocurre
            import traceback
            traceback.print_exc()

    else:
        print("No se encontraron coordenadas exitosas para generar un mapa.")

# Ejecutar análisis si la variable 'results' existe
if 'results' in locals() and results is not None:
    analyze_and_visualize(results)
else:
    print("⚠️ La variable 'results' no fue encontrada. Ejecuta primero la celda de geocodificación.")


📈 ANÁLISIS DETALLADO DE RESULTADOS
🗺️ Creando mapa con 692 ubicaciones...
🗺️ Mapa guardado exitosamente en DBFS: /FileStore/tables/mapa_casinos.html


**Lo siguiente es opcional, aunque se recomienda no ejecutarlo porque el mapa ya se ve acá arriba**

In [0]:
# Databricks notebook source
import pandas as pd
import base64
import os
import uuid

# COMMAND ----------

def analyze_and_visualize(results_df: pd.DataFrame):
    """Analiza y visualiza los resultados de geocodificación y proporciona un enlace de descarga para el mapa HTML."""
    if results_df is None or results_df.empty:
        print("⚠️ No hay resultados para analizar.")
        return

    print("\n📈 ANÁLISIS DETALLADO DE RESULTADOS")
    print("="*50)

    # Crear mapa si hay resultados exitosos
    successful_coords = results_df[results_df['Estado'] == 'Encontrado'].dropna(subset=['Latitud', 'Longitud'])
    
    if not successful_coords.empty:
        print(f"🗺️ Creando mapa con {len(successful_coords)} ubicaciones...")
        try:
            import folium
            center_lat = successful_coords['Latitud'].mean()
            center_lon = successful_coords['Longitud'].mean()
            
            m = folium.Map(location=[center_lat, center_lon], zoom_start=11, tiles='OpenStreetMap')
            
            for _, row in successful_coords.iterrows():
                popup_text = f"<b>{row['Direccion_Original']}</b><br>Geocodificador: {row['Geocodificador_Usado']}"
                folium.Marker(
                    location=[row['Latitud'], row['Longitud']],
                    popup=folium.Popup(popup_text, max_width=300),
                    tooltip=row['Direccion_Original'],
                    icon=folium.Icon(color='red', icon='star')
                ).add_to(m)
            
            # --- SECCIÓN PARA GUARDAR Y OFRECER DESCARGA DEL HTML ---
            
            # 1. Definir ruta local temporal única para el HTML
            temp_filename_html = f"mapa_{uuid.uuid4()}.html"
            local_temp_path_html = os.path.join("/tmp", temp_filename_html)
            
            # 2. Guardar el mapa en la ruta local temporal
            m.save(local_temp_path_html)
            print(f"🗺️ Mapa guardado temporalmente en: {local_temp_path_html}")

            # 3. Leer el archivo HTML y codificarlo en base64 para el enlace de descarga
            with open(local_temp_path_html, "rb") as f_html:
                b64_html = base64.b64encode(f_html.read()).decode()
            
            # 4. Crear el enlace HTML para la descarga
            download_link_html = f'<a href="data:text/html;base64,{b64_html}" download="mapa_geocodificado.html">🔗 Haz clic aquí para descargar el mapa HTML</a>'
            
            # 5. Mostrar el enlace en el notebook
            displayHTML(download_link_html)
            
            # 6. Limpiar el archivo temporal local (opcional, pero buena práctica)
            os.remove(local_temp_path_html)

            # --- SECCIÓN PARA MOVER A DBFS (Si aún lo necesitas, sino puedes quitarlo) ---
            dbfs_final_path = 'dbfs:/FileStore/tables/mapa_casinos.html'
            # Mover el archivo desde la ruta local a DBFS (deberías tener el archivo original para esto)
            # dbutils.fs.mv(f"file:{local_temp_path_html}", dbfs_final_path) # Esto movería el archivo, no copiaría. Si quieres ambos, guarda una copia antes de moverlo o haz una copia a DBFS
            # Como ya lo hemos leído, si quieres que también esté en DBFS, deberías guardarlo de nuevo o copiarlo.
            # Para fines de este ejemplo, asumiremos que solo quieres el enlace de descarga o que ya lo manejas.
            # Para moverlo, necesitarías el archivo en 'local_temp_path_html' aún.
            # Una forma de mantenerlo y moverlo a DBFS es así:
            # from shutil import copyfile
            # dbfs_temp_path = '/tmp/mapa_para_dbfs.html' # Otro temporal para no afectar el anterior si lo borramos.
            # copyfile(local_temp_path_html, dbfs_temp_path)
            # dbutils.fs.mv(f"file:{dbfs_temp_path}", dbfs_final_path)
            # print(f"🗺️ Mapa también guardado exitosamente en DBFS: {dbfs_final_path.replace('dbfs:', '')}")


            # 7. Mostrar el mapa en el notebook (esta línea ya estaba bien)
            display(m)

        except Exception as e:
            print(f"❌ Error creando mapa o enlace de descarga: {e}")
            import traceback
            traceback.print_exc()

    else:
        print("No se encontraron coordenadas exitosas para generar un mapa.")

# Ejecutar análisis si la variable 'results' existe
if 'results' in locals() and results is not None:
    analyze_and_visualize(results)
else:
    print("⚠️ La variable 'results' no fue encontrada. Ejecuta primero la celda de geocodificación.")



⚠️ La variable 'results' no fue encontrada. Ejecuta primero la celda de geocodificación.
