In [1]:
# Celda 1: Importación de Librerías
import requests
import pandas as pd
import time
import os
import json
import logging
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
import math # Para cálculos geográficos si es necesario
from collections import deque # Para manejar la cola de claves API

In [111]:
# Configuraciones Básicas
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("001_load_api.log"),
        logging.StreamHandler()
    ]
)

logging.info("Inicio del notebook 001_load_api.ipynb.")

# --- Configuraciones de Pandas ---
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.float_format', lambda x: '%.3f' % x)
pd.set_option('display.width', 1000)
logging.info("Configuraciones de Pandas para visualización aplicadas.")

2025-05-17 12:54:45,581 - INFO - Inicio del notebook 001_load_api.ipynb.
2025-05-17 12:54:45,581 - INFO - Configuraciones de Pandas para visualización aplicadas.


In [112]:
# Celda 3: Carga de Variables de Entorno (API y PostgreSQL)
ENV_FILE_PATH = '/home/nicolas/Escritorio/proyecto ETL/develop/env/.env'
RAW_API_CSV_PATH = '/home/nicolas/Escritorio/proyecto ETL/develop/data/raw/api_data.csv' # Nuevo nombre para datos extendidos
STATE_FILE_PATH = '/home/nicolas/Escritorio/proyecto ETL/develop/data/raw/api_data.json' # Para guardar el estado
TABLE_NAME_API_RAW = 'raw_api' # Nombre de la tabla en PostgreSQL para datos crudos de la API

logging.info(f"Ruta del archivo .env: {ENV_FILE_PATH}")
logging.info(f"Ruta donde se guardará el CSV de la API: {RAW_API_CSV_PATH}")
logging.info(f"Ruta del archivo de estado de extracción: {STATE_FILE_PATH}")
logging.info(f"Nombre de la tabla en PostgreSQL para datos crudos de la API: {TABLE_NAME_API_RAW}")

if os.path.exists(ENV_FILE_PATH):
    load_dotenv(ENV_FILE_PATH)
    logging.info(f"Archivo .env encontrado y cargado desde {ENV_FILE_PATH}")
else:
    logging.error(f"Archivo .env NO encontrado en {ENV_FILE_PATH}.")
    raise FileNotFoundError(f"Archivo .env no encontrado en {ENV_FILE_PATH}")

# Credenciales de PostgreSQL
POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST')
POSTGRES_PORT = os.getenv('POSTGRES_PORT')
POSTGRES_DATABASE = os.getenv('POSTGRES_DATABASE')

# Credenciales de la API de Foursquare
FOURSQUARE_API_KEYS = [
    os.getenv('FOURSQUARE_API_KEY_JACOBO'),
    os.getenv('FOURSQUARE_API_KEY_NICOLAS'),
    os.getenv('FOURSQUARE_API_KEY_NICOLAS_2')
]
# Filtrar claves None si alguna no está definida
FOURSQUARE_API_KEYS = [key for key in FOURSQUARE_API_KEYS if key is not None]

if not all([POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DATABASE]):
    logging.error("Una o más variables de entorno de PostgreSQL no están definidas.")
    raise ValueError("Faltan credenciales de PostgreSQL. Verifica el archivo .env.")
else:
    logging.info("Variables de entorno para PostgreSQL cargadas correctamente.")

if not FOURSQUARE_API_KEYS:
    logging.error("No se encontraron claves API de Foursquare válidas en el archivo .env (FOURSQUARE_API_KEY_JACOBO, FOURSQUARE_API_KEY_NICOLAS, FOURSQUARE_API_KEY_NICOLAS_2).")
    raise ValueError("Faltan claves API de Foursquare. Verifica el archivo .env.")
else:
    logging.info(f"{len(FOURSQUARE_API_KEYS)} clave(s) API de Foursquare cargada(s) correctamente.")

current_api_key_index = 0 # Índice para la clave API actual

2025-05-17 12:54:45,595 - INFO - Ruta del archivo .env: /home/nicolas/Escritorio/proyecto ETL/develop/env/.env
2025-05-17 12:54:45,596 - INFO - Ruta donde se guardará el CSV de la API: /home/nicolas/Escritorio/proyecto ETL/develop/data/raw/api_data.csv
2025-05-17 12:54:45,596 - INFO - Ruta del archivo de estado de extracción: /home/nicolas/Escritorio/proyecto ETL/develop/data/raw/api_data.json
2025-05-17 12:54:45,597 - INFO - Nombre de la tabla en PostgreSQL para datos crudos de la API: raw_api
2025-05-17 12:54:45,598 - INFO - Archivo .env encontrado y cargado desde /home/nicolas/Escritorio/proyecto ETL/develop/env/.env
2025-05-17 12:54:45,599 - INFO - Variables de entorno para PostgreSQL cargadas correctamente.
2025-05-17 12:54:45,600 - INFO - 3 clave(s) API de Foursquare cargada(s) correctamente.


In [113]:
# Celda 4.1: Definición de Constantes y Gestión de Claves API
logging.info("Celda 4.1: Definiendo constantes y gestor de claves API.")

BASE_URL_FOURSQUARE = 'https://api.foursquare.com/v3/places'
MAX_RETRIES_PER_KEY = 2 # Cuántas veces reintentar con una misma clave ante un error de cuota antes de rotar

def get_current_api_key():
    global current_api_key_index
    if not FOURSQUARE_API_KEYS:
        raise ValueError("No hay claves API de Foursquare disponibles.")
    return FOURSQUARE_API_KEYS[current_api_key_index]

def rotate_api_key():
    global current_api_key_index
    logging.warning(f"Rotando clave API. Clave actual ({current_api_key_index}) agotada o con error persistente.")
    current_api_key_index = (current_api_key_index + 1) % len(FOURSQUARE_API_KEYS)
    logging.info(f"Nueva clave API seleccionada (índice: {current_api_key_index}).")
    if current_api_key_index == 0: # Si dimos la vuelta completa a las claves
        logging.error("Todas las claves API han sido probadas y agotadas en esta ronda. Esperando antes de reintentar el ciclo.")
        print("Todas las claves API agotadas. Durmiendo por 15 minutos...")
        time.sleep(15 * 60) # Esperar un tiempo largo si todas las claves fallan
    return get_current_api_key()

def get_foursquare_headers():
    return {
        "Accept": "application/json",
        "Authorization": get_current_api_key()
    }

logging.info(f"URL base de la API de Foursquare: {BASE_URL_FOURSQUARE}")

2025-05-17 12:54:45,608 - INFO - Celda 4.1: Definiendo constantes y gestor de claves API.
2025-05-17 12:54:45,609 - INFO - URL base de la API de Foursquare: https://api.foursquare.com/v3/places


In [114]:
# Celda 4.2: Funciones Auxiliares Modificadas para Rotación de Claves y Paginación
logging.info("Celda 4.2: Definiendo funciones auxiliares de API con rotación de claves y paginación.")

def make_foursquare_request(url, params, method="GET"):
    """
    Realiza una solicitud a la API de Foursquare manejando errores y rotación de claves.
    """
    initial_key_index = current_api_key_index
    retries_with_current_key = 0

    while True:
        try:
            headers = get_foursquare_headers()
            logging.debug(f"Solicitando a {url} con params={params} usando clave índice {current_api_key_index}")
            if method == "GET":
                response = requests.get(url, headers=headers, params=params, timeout=20)
            # Añadir más métodos si es necesario
            
            if response.status_code == 429: # Too Many Requests
                logging.warning(f"Error 429 (Too Many Requests) con clave índice {current_api_key_index}. URL: {url}")
                if retries_with_current_key < MAX_RETRIES_PER_KEY:
                    wait_time = (2 ** retries_with_current_key) # Backoff exponencial simple
                    logging.info(f"Esperando {wait_time}s antes de reintentar con la misma clave.")
                    time.sleep(wait_time)
                    retries_with_current_key += 1
                    continue
                else: # Máximos reintentos con esta clave alcanzados
                    rotate_api_key()
                    retries_with_current_key = 0 # Resetear contador para la nueva clave
                    if current_api_key_index == initial_key_index: # Si dimos la vuelta y volvimos a la clave inicial
                        logging.error("Todas las claves han fallado para esta solicitud. Abortando esta solicitud específica.")
                        return None, None # Indicar fallo
                    continue # Reintentar con la nueva clave

            response.raise_for_status() # Otros errores HTTP
            
            next_page_url = None
            if 'Link' in response.headers:
                links = requests.utils.parse_header_links(response.headers['Link'])
                for link_info in links:
                    if link_info.get('rel') == 'next':
                        next_page_url = link_info.get('url')
                        break
            logging.debug(f"Próxima página URL: {next_page_url}")
            return response.json(), next_page_url # Devuelve datos y URL de próxima página

        except requests.exceptions.HTTPError as http_err:
            logging.error(f"Error HTTP ({http_err.response.status_code}) al solicitar {url}: {http_err}. Clave índice {current_api_key_index}. Respuesta: {getattr(http_err.response, 'text', 'N/A')}")
            if http_err.response.status_code in [401, 403]: # Unauthorized or Forbidden (podría ser clave inválida o suspendida)
                rotate_api_key()
                retries_with_current_key = 0
                if current_api_key_index == initial_key_index:
                    return None, None
                continue
            return None, None # Otros errores HTTP no recuperables inmediatamente
        except requests.exceptions.RequestException as req_err:
            logging.error(f"Error de Solicitud para {url}: {req_err}. Clave índice {current_api_key_index}")
            time.sleep(5) # Esperar un poco por errores de red
            # No rotar clave por errores de red inmediatamente, podría ser temporal
            # Podríamos añadir un contador de reintentos para errores de red también
            return None, None # Indicar fallo
        except Exception as e:
            logging.error(f"Error inesperado para {url}: {e}. Clave índice {current_api_key_index}")
            return None, None

def get_place_details_v3_robust(fsq_id): # Asegúrate que esta función esté actualizada como se indicó antes
    """
    Obtiene detalles de un lugar específico usando su FSQ ID.
    Especifica los campos que quieres para optimizar la respuesta.
    """
    fields_to_request = [
        "fsq_id", "name", "geocodes", "categories", "location", 
        "price", "rating", "popularity", "stats" # 'stats' es para 'tips_count'
    ]
    fields_param_value = ",".join(fields_to_request)
    
    params = {'fields': fields_param_value}
    url = f"{BASE_URL_FOURSQUARE}/{fsq_id}"
    
    data, _ = make_foursquare_request(url, params)
    if data:
         logging.debug(f"Detalles obtenidos para FSQ ID {fsq_id}: {list(data.keys())}")
    return data

def search_places_paginated(search_params):
    """
    Busca lugares y maneja la paginación usando el cursor 'Link' header.
    search_params: dict con los parámetros para /search (ej. near, categories, limit)
    """
    all_results = []
    current_url_for_request = f"{BASE_URL_FOURSQUARE}/search" # URL base para la primera solicitud
    current_params_for_request = search_params.copy() # Parámetros para la primera solicitud
    page_count = 0
    max_pages_per_search = 5 # Límite de páginas por búsqueda (50 resultados/página * 5 = 250 lugares)

    while current_url_for_request and page_count < max_pages_per_search:
        page_count += 1
        
        # CORRECCIÓN DEL LOGGING:
        log_params_display = "N/A (usando URL de paginación)"
        if current_params_for_request: # Solo intentar .get si no es None
            log_params_display = current_params_for_request.get('categories', 'N/A (búsqueda general)')
        
        logging.info(f"Buscando página {page_count} de lugares. URL: {current_url_for_request}, Categoría(s): {log_params_display}")
        
        data, next_page_url = make_foursquare_request(current_url_for_request, current_params_for_request)

        if data and data.get('results'):
            results = data.get('results', [])
            all_results.extend(results)
            logging.info(f"Página {page_count}: {len(results)} resultados obtenidos. Total acumulado para esta búsqueda: {len(all_results)}")
            
            # La API de Foursquare puede devolver menos del límite incluso si hay más páginas (raro, pero posible)
            # o si la siguiente página es la misma URL (indicador de error o fin no estándar)
            if not next_page_url or next_page_url == current_url_for_request or len(results) == 0:
                if len(results) < search_params.get('limit',50) and len(results) > 0: # Si no llenó la pagina pero trajo algo
                     logging.info("Página no completa pero con resultados, asumiendo fin para esta búsqueda específica.")
                elif len(results) == 0 and page_count > 1: # Página vacía después de la primera
                    logging.info("Página vacía obtenida, asumiendo fin de resultados.")
                elif not next_page_url:
                    logging.info("No hay URL para la siguiente página, asumiendo fin de resultados.")
                elif next_page_url == current_url_for_request:
                    logging.warning("URL de siguiente página es la misma que la actual. Deteniendo paginación para evitar bucle.")
                break # Salir del bucle while

        else: 
            logging.warning(f"No se obtuvieron resultados o hubo un error en la página {page_count} para la búsqueda. Deteniendo paginación.")
            break 

        current_url_for_request = next_page_url 
        current_params_for_request = None # Las siguientes llamadas usan la URL completa del Link header
        if current_url_for_request:
             time.sleep(1.2) # Aumentar ligeramente la pausa entre páginas

    logging.info(f"Paginación completada para la búsqueda. Total de {len(all_results)} resultados obtenidos en {page_count} páginas.")
    return all_results

print("Celda 4.2: Funciones auxiliares de API modificadas y robustecidas (corrección de logging en paginación).")
logging.info("Funciones auxiliares de API con rotación de claves y paginación definidas (logging corregido).")

logging.info("Funciones auxiliares de API con rotación de claves y paginación definidas.")

2025-05-17 12:54:45,625 - INFO - Celda 4.2: Definiendo funciones auxiliares de API con rotación de claves y paginación.
2025-05-17 12:54:45,626 - INFO - Funciones auxiliares de API con rotación de claves y paginación definidas (logging corregido).
2025-05-17 12:54:45,627 - INFO - Funciones auxiliares de API con rotación de claves y paginación definidas.


Celda 4.2: Funciones auxiliares de API modificadas y robustecidas (corrección de logging en paginación).


In [115]:
# Celda 4.3: Definición de la Función Principal de Recolección (Modificada y Campos Ajustados)
logging.info("Celda 4.3: Definiendo la función principal de recolección de datos extendida con campos seleccionados.")

def load_extraction_state():
    if os.path.exists(STATE_FILE_PATH):
        try:
            with open(STATE_FILE_PATH, 'r') as f:
                state = json.load(f)
            logging.info(f"Estado de extracción cargado desde {STATE_FILE_PATH}: {state}")
            return state
        except Exception as e:
            logging.error(f"Error al cargar el archivo de estado {STATE_FILE_PATH}: {e}. Iniciando desde cero.")
    return {"last_processed_area_index": -1, "last_processed_category_index": -1, "collected_fsq_ids": [], "all_places_data": []}

def save_extraction_state(state):
    try:
        with open(STATE_FILE_PATH, 'w') as f:
            json.dump(state, f, indent=4)
        logging.info(f"Estado de extracción guardado en {STATE_FILE_PATH}")
    except Exception as e:
        logging.error(f"Error al guardar el archivo de estado {STATE_FILE_PATH}: {e}")

def collect_foursquare_extended_data(max_results_per_search=50):
    state = load_extraction_state()
    all_places_data = state.get("all_places_data", [])
    collected_fsq_ids = set(state.get("collected_fsq_ids", []))
    
    areas_of_interest = [
        {'name': 'Manhattan, New York, NY', 'level': 0, 'coords': (40.7831, -73.9712)},
        {'name': 'Brooklyn, New York, NY', 'level': 0, 'coords': (40.6782, -73.9442)},
        {'name': 'Queens, New York, NY', 'level': 0, 'coords': (40.7282, -73.7949)},
        {'name': 'Bronx, New York, NY', 'level': 0, 'coords': (40.8448, -73.8648)},
        {'name': 'Staten Island, New York, NY', 'level': 0, 'coords': (40.5795, -74.1502)},
        {'name': 'Yonkers, NY', 'level': 1, 'coords': (40.9312, -73.8988)},
    ]

    categories_foursquare = {
        'arts_entertainment': '10000', 'food_drink': '13000',
        'nightlife_spot': '10032', 'outdoors_recreation': '16000',
        'shops_services': '17000', 'travel_transport': '19000'
    }
    
    start_area_index = state.get("last_processed_area_index", -1) + 1

    try:
        for area_idx in range(start_area_index, len(areas_of_interest)):
            area = areas_of_interest[area_idx]
            area_name_query = f"{area['name']}"
            logging.info(f"\n--- Procesando Área ({area_idx+1}/{len(areas_of_interest)}): {area_name_query} ---")
            print(f"\nProcesando Área: {area_name_query}...")
            state["last_processed_area_index"] = area_idx
            
            start_category_index = state.get("last_processed_category_index", -1) + 1 if area_idx == state.get("last_processed_area_index") else 0
            category_items = list(categories_foursquare.items())

            for cat_idx in range(start_category_index, len(category_items)):
                cat_name, cat_id = category_items[cat_idx]
                state["last_processed_category_index"] = cat_idx
                save_extraction_state(state)

                search_params = {'near': area_name_query, 'categories': cat_id, 'limit': 50}
                logging.info(f"Buscando en {area_name_query} para categoría '{cat_name}' (ID: {cat_id}).")
                places_in_cat = search_places_paginated(search_params)
                
                logging.info(f"Obtenidos {len(places_in_cat)} lugares para '{cat_name}' en '{area_name_query}' (después de paginación).")
                print(f"  - {len(places_in_cat)} lugares para {cat_name}")

                for place_summary in places_in_cat:
                    fsq_id = place_summary.get('fsq_id')
                    if not fsq_id:
                        logging.warning(f"Lugar sin fsq_id encontrado en resumen: {place_summary.get('name', 'N/A')}. Omitiendo.")
                        continue
                    if fsq_id not in collected_fsq_ids:
                        logging.info(f"Obteniendo detalles para FSQ ID: {fsq_id} (Nombre: {place_summary.get('name', 'N/A')})")
                        # Ajustar los campos solicitados en get_place_details_v3_robust si es necesario,
                        # aunque la selección final se hace al construir el diccionario.
                        details = get_place_details_v3_robust(fsq_id)
                        if details:
                            all_places_data.append({
                                'fsq_id': details.get('fsq_id'),
                                'name': details.get('name'),
                                'latitude': details.get('geocodes', {}).get('main', {}).get('latitude'),
                                'longitude': details.get('geocodes', {}).get('main', {}).get('longitude'),
                                'category_main_name': details.get('categories', [{}])[0].get('name') if details.get('categories') else None,
                                'category_main_id': details.get('categories', [{}])[0].get('id') if details.get('categories') else None,
                                'all_categories': json.dumps(details.get('categories', [])),
                                'address': details.get('location', {}).get('formatted_address'),
                                'locality': details.get('location', {}).get('locality'),
                                'region': details.get('location', {}).get('region'),
                                # Campos eliminados según tu solicitud anterior:
                                # 'postcode': details.get('location', {}).get('postcode'),
                                # 'country': details.get('location', {}).get('country'),
                                'area_searched': area['name'],
                                'price_tier': details.get('price'),
                                'rating': details.get('rating'),
                                # 'hours_display': details.get('hours', {}).get('display'), # Eliminado
                                'popularity_score': details.get('popularity'),
                                # 'website': details.get('website'), # Eliminado
                                # 'phone_contact': details.get('tel'), # Eliminado
                                'tips_count': details.get('stats', {}).get('total_tips')
                            })
                            collected_fsq_ids.add(fsq_id)
                            state["collected_fsq_ids"] = list(collected_fsq_ids)
                            state["all_places_data"] = all_places_data
                            if len(all_places_data) % 20 == 0: 
                                save_extraction_state(state)
                                if all_places_data:
                                     pd.DataFrame(all_places_data).to_csv(RAW_API_CSV_PATH, index=False, encoding='utf-8')
                                     logging.info(f"Guardado progresivo: {len(all_places_data)} lugares en {RAW_API_CSV_PATH}")
                        else:
                            logging.warning(f"No se pudieron obtener detalles para FSQ ID: {fsq_id}")
                        time.sleep(0.7)
                    else:
                        logging.debug(f"FSQ ID {fsq_id} ya procesado, omitiendo detalles.")
                state["last_processed_category_index"] = -1 
            
            state["last_processed_category_index"] = -1 
            save_extraction_state(state)

        logging.info(f"Recolección finalizada. Total de entradas de lugares (con detalles): {len(all_places_data)}")
        state["last_processed_area_index"] = len(areas_of_interest) -1
        state["last_processed_category_index"] = -1
        save_extraction_state(state)
        
        if not all_places_data:
            logging.warning("No se recolectaron datos de lugares. El DataFrame estará vacío.")
            return pd.DataFrame()
            
        return pd.DataFrame(all_places_data)

    except KeyboardInterrupt:
        logging.warning("Extracción interrumpida por el usuario.")
        print("Extracción interrumpida. Guardando estado y datos parciales...")
        save_extraction_state(state)
        if all_places_data:
            pd.DataFrame(all_places_data).to_csv(RAW_API_CSV_PATH, index=False, encoding='utf-8')
            logging.info(f"Datos parciales ({len(all_places_data)} lugares) guardados en {RAW_API_CSV_PATH}")
        raise
    except Exception as e_main:
        logging.critical(f"Error crítico durante la recolección de datos: {e_main}", exc_info=True)
        print(f"Error crítico: {e_main}. Guardando estado y datos parciales si es posible...")
        save_extraction_state(state)
        if all_places_data:
            pd.DataFrame(all_places_data).to_csv(RAW_API_CSV_PATH, index=False, encoding='utf-8')
            logging.info(f"Datos parciales ({len(all_places_data)} lugares) guardados en {RAW_API_CSV_PATH}")
        return pd.DataFrame(all_places_data)


logging.info("Función principal de recolección de datos de API (campos ajustados) definida.")

2025-05-17 12:54:45,648 - INFO - Celda 4.3: Definiendo la función principal de recolección de datos extendida con campos seleccionados.
2025-05-17 12:54:45,653 - INFO - Función principal de recolección de datos de API (campos ajustados) definida.


In [116]:
'''# Celda 4.4: Ejecución de la Recolección de Datos Extendida y Guardado Final
logging.info("Celda 4.4: Iniciando la ejecución de la recolección de datos extendida de Foursquare.")
print("\nIniciando extracción extendida de datos con Foursquare API (puede tomar tiempo)...")

output_directory = os.path.dirname(RAW_API_CSV_PATH)
if not os.path.exists(output_directory):
    os.makedirs(output_directory)
    logging.info(f"Directorio '{output_directory}' creado.")
# Celda 8: Verificar los primeros datos cargados en la tabla de PostgreSQL
logging.info(f"Celda 8: Verificando las primeras 5 filas de la tabla '{TABLE_NAME_API_RAW}' desde PostgreSQL.")

# Recrear el engine si se cerró en la celda anterior o si esta celda se ejecuta independientemente
if 'engine_pg' not in locals() or engine_pg.closed: # .closed puede no ser el atributo correcto, depende de la versión
                                                 # Una forma más segura es recrearlo o asegurar que esté abierto.
    try:
        DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DATABASE}"
        engine_pg = create_engine(DATABASE_URL)
        logging.info("Motor de SQLAlchemy (engine_pg) recreado o verificado para consulta.")
    except Exception as e:
        logging.error(f"Error al recrear el motor de SQLAlchemy: {e}")
        engine_pg = None # Asegurar que sea None si falla

if 'df_api_data' in locals() and not df_api_data.empty and engine_pg is not None:
    try:
        query = f"SELECT * FROM \"{TABLE_NAME_API_RAW}\" LIMIT 5;"
        # Es importante usar text() si la versión de SQLAlchemy lo requiere para consultas directas.
        # O pd.read_sql_query directamente.
        df_from_db_api = pd.read_sql_query(sql=text(query), con=engine_pg) # Usar text() para la query
        
        logging.info("Primeras 5 filas obtenidas de PostgreSQL (formato markdown):")
        print(f"\nPrimeras 5 filas de la tabla '{TABLE_NAME_API_RAW}' consultadas desde PostgreSQL:")
        if not df_from_db_api.empty:
            print(df_from_db_api.to_markdown(index=False))
        else:
            logging.info(f"La consulta a '{TABLE_NAME_API_RAW}' no devolvió filas o la tabla está vacía después de la carga.")
            print(f"La tabla '{TABLE_NAME_API_RAW}' parece estar vacía o la consulta no devolvió resultados.")
            
    except Exception as e:
        logging.error(f"Error al consultar datos desde la tabla '{TABLE_NAME_API_RAW}' en PostgreSQL: {e}")
        print(f"Error en Celda 8 al consultar datos: {e}")
    finally:
        if engine_pg: # Cerrar el engine después de la consulta de verificación
            engine_pg.dispose()
            logging.info("Conexiones del motor de SQLAlchemy (engine_pg) dispuestas después de la verificación.")
            print("Motor de DB (engine_pg) dispuesto después de la verificación.")
elif 'df_api_data' in locals() and df_api_data.empty:
    logging.warning("El DataFrame df_api_data está vacío. No se puede verificar la tabla en DB.")
    print("El DataFrame de la API está vacío, no se verifica la tabla en DB.")
else:
    logging.warning("Motor de base de datos (engine_pg) no inicializado o df_api_data no existe. No se puede verificar la tabla.")
    print("Motor de DB no inicializado o DataFrame no existe, no se verifica la tabla.")
df_foursquare_extended_places = collect_foursquare_extended_data(max_results_per_search=50) # El límite es por página ahora

if not df_foursquare_extended_places.empty:
    logging.info(f"Extracción extendida completada. Total de lugares obtenidos: {len(df_foursquare_extended_places)}")
    print(f"\nExtracción extendida completada! Total de lugares obtenidos: {len(df_foursquare_extended_places)}")
    
    try:
        df_foursquare_extended_places.to_csv(RAW_API_CSV_PATH, index=False, encoding='utf-8')
        logging.info(f"Datos finales de Foursquare guardados exitosamente en: {RAW_API_CSV_PATH}")
        print(f"Datos finales guardados en: {os.path.abspath(RAW_API_CSV_PATH)}")
        
        print("\nVista previa de los datos recolectados (df_foursquare_extended_places):")
        print(df_foursquare_extended_places.head().to_markdown(index=False))
    except Exception as e:
        logging.error(f"Error al guardar el DataFrame final en CSV '{RAW_API_CSV_PATH}': {e}")
        print(f"Error al guardar el DataFrame final en CSV: {e}")
else:
    logging.warning("No se recolectaron datos de Foursquare o el DataFrame final está vacío.")
    print("\nNo se encontraron datos o el DataFrame final está vacío.")'''



In [117]:
# Celda 5: Leer el archivo CSV guardado en un DataFrame
logging.info("Celda 5: Leyendo el archivo CSV con datos de la API en un DataFrame.")
df_api_data = pd.DataFrame() # Predefinir

# RAW_API_CSV_PATH fue definido en la Celda 3
if os.path.exists(RAW_API_CSV_PATH):
    try:
        df_api_data = pd.read_csv(RAW_API_CSV_PATH)
        logging.info(f"Archivo CSV '{RAW_API_CSV_PATH}' cargado exitosamente en df_api_data.")
        logging.info(f"df_api_data tiene {df_api_data.shape[0]} filas y {df_api_data.shape[1]} columnas.")
            
    except Exception as e:
        logging.error(f"Error al leer el archivo CSV '{RAW_API_CSV_PATH}': {e}")
else:
    logging.warning(f"El archivo CSV '{RAW_API_CSV_PATH}' no fue encontrado. df_api_data estará vacío. Verifica la ejecución de la Celda 4.4.")


2025-05-17 12:54:45,678 - INFO - Celda 5: Leyendo el archivo CSV con datos de la API en un DataFrame.
2025-05-17 12:54:45,725 - INFO - Archivo CSV '/home/nicolas/Escritorio/proyecto ETL/develop/data/raw/api_data.csv' cargado exitosamente en df_api_data.
2025-05-17 12:54:45,725 - INFO - df_api_data tiene 3595 filas y 15 columnas.


In [118]:
# Celda 6: Conexión a PostgreSQL y Creación de Tabla Vacía
logging.info("Celda 6: Conectando a PostgreSQL y creando la tabla vacía si no existe (o reemplazándola).")
engine_pg = None # Predefinir para el scope de la celda

if not df_api_data.empty:
    try:
        DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DATABASE}"
        engine_pg = create_engine(DATABASE_URL)
        logging.info("Motor de SQLAlchemy para PostgreSQL creado.")
        
        # Probar la conexión (opcional pero recomendado)
        with engine_pg.connect() as connection:
            logging.info("Conexión a la base de datos PostgreSQL establecida y verificada.")
        print("Conexión a PostgreSQL verificada.")

        # Crear tabla vacía (o reemplazarla si ya existe) con la estructura de df_api_data
        # if_exists='replace' borrará la tabla si existe y la creará de nuevo.
        # Esto es útil para asegurar que la estructura coincida con el DataFrame actual.
        logging.info(f"Intentando crear/reemplazar la tabla '{TABLE_NAME_API_RAW}' con la estructura de df_api_data.")
        df_api_data.head(0).to_sql(TABLE_NAME_API_RAW, engine_pg, if_exists='replace', index=False)
        logging.info(f"Tabla '{TABLE_NAME_API_RAW}' creada/reemplazada exitosamente (solo estructura, sin datos).")
        print(f"Tabla '{TABLE_NAME_API_RAW}' creada/reemplazada en PostgreSQL (estructura definida).")

    except Exception as e:
        logging.error(f"Error durante la conexión a PostgreSQL o la creación de la tabla '{TABLE_NAME_API_RAW}': {e}")
        print(f"Error en Celda 6: {e}")
        # Si engine_pg se creó pero falló la creación de la tabla, aún se dispondrá en finally.
else:
    logging.warning("El DataFrame df_api_data está vacío. No se procederá con la creación de la tabla en PostgreSQL.")
    print("El DataFrame df_api_data está vacío. Se omite la creación de la tabla.")

# No cerramos el engine_pg aquí si vamos a usarlo inmediatamente en la siguiente celda para insertar datos.
# Se cerrará al final del script o si ya no se necesita.

2025-05-17 12:54:45,732 - INFO - Celda 6: Conectando a PostgreSQL y creando la tabla vacía si no existe (o reemplazándola).
2025-05-17 12:54:45,734 - INFO - Motor de SQLAlchemy para PostgreSQL creado.
DETAIL:  La base de datos fue creada usando la versión de ordenamiento 2.31, pero el sistema operativo provee la versión 2.35.
HINT:  Reconstruya todos los objetos en esta base de datos que usen el ordenamiento por omisión y ejecute ALTER DATABASE airbnb REFRESH COLLATION VERSION, o construya PostgreSQL con la versión correcta de la biblioteca.
2025-05-17 12:54:45,747 - INFO - Conexión a la base de datos PostgreSQL establecida y verificada.
2025-05-17 12:54:45,750 - INFO - Intentando crear/reemplazar la tabla 'raw_api' con la estructura de df_api_data.
2025-05-17 12:54:45,776 - INFO - Tabla 'raw_api' creada/reemplazada exitosamente (solo estructura, sin datos).


Conexión a PostgreSQL verificada.
Tabla 'raw_api' creada/reemplazada en PostgreSQL (estructura definida).


In [119]:
# Celda 7: Inserción de Datos del DataFrame en la Tabla de PostgreSQL
logging.info(f"Celda 7: Intentando insertar datos de df_api_data en la tabla '{TABLE_NAME_API_RAW}'.")

if not df_api_data.empty and 'engine_pg' in locals() and engine_pg is not None:
    try:
        # Insertar los datos del DataFrame
        # if_exists='append' porque la tabla ya fue creada (vacía) en la celda anterior.
        # chunksize puede ayudar con la memoria para DataFrames grandes.
        logging.info(f"Insertando {len(df_api_data)} filas en '{TABLE_NAME_API_RAW}'.")
        df_api_data.to_sql(TABLE_NAME_API_RAW, engine_pg, if_exists='append', index=False, chunksize=1000)
        logging.info(f"Datos insertados exitosamente en la tabla '{TABLE_NAME_API_RAW}'.")
        print(f"Datos de df_api_data insertados correctamente en la tabla '{TABLE_NAME_API_RAW}'.")

    except Exception as e:
        logging.error(f"Error al insertar datos en la tabla '{TABLE_NAME_API_RAW}': {e}")
        print(f"Error en Celda 7 al insertar datos: {e}")
    # finally:
        # Aquí podrías cerrar el engine si esta es la última operación de DB en el notebook.
        # if engine_pg:
        #     engine_pg.dispose()
        #     logging.info("Conexiones del motor de SQLAlchemy (engine_pg) dispuestas.")
        #     print("Motor de DB (engine_pg) dispuesto.")
elif df_api_data.empty:
    logging.warning("El DataFrame df_api_data está vacío. No hay datos para insertar.")
else: # engine_pg no definido o es None
    logging.warning("El motor de base de datos (engine_pg) no está inicializado. No se pueden insertar datos.")


2025-05-17 12:54:45,784 - INFO - Celda 7: Intentando insertar datos de df_api_data en la tabla 'raw_api'.
2025-05-17 12:54:45,785 - INFO - Insertando 3595 filas en 'raw_api'.
2025-05-17 12:54:45,920 - INFO - Datos insertados exitosamente en la tabla 'raw_api'.


Datos de df_api_data insertados correctamente en la tabla 'raw_api'.


In [120]:
# Celda 8: Verificar los primeros datos cargados en la tabla de PostgreSQL
logging.info(f"Celda 8: Verificando las primeras 5 filas de la tabla '{TABLE_NAME_API_RAW}' desde PostgreSQL.")

engine_verify = None # Usar un nuevo nombre de engine para esta celda para evitar conflictos de scope

if 'df_api_data' in locals() and not df_api_data.empty:
    try:
        # Siempre crear un nuevo engine para la verificación para asegurar que esté activo
        # y evitar problemas con el estado de 'engine_pg' de celdas anteriores.
        DATABASE_URL_VERIFY = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DATABASE}"
        engine_verify = create_engine(DATABASE_URL_VERIFY)
        logging.info("Motor de SQLAlchemy (engine_verify) creado para la consulta de verificación.")

        with engine_verify.connect() as connection: # Verificar la conexión
            query = f"SELECT * FROM \"{TABLE_NAME_API_RAW}\" LIMIT 5;"
            df_from_db_api = pd.read_sql_query(sql=text(query), con=connection) # Usar la conexión activa
        
        logging.info("Primeras 5 filas obtenidas de PostgreSQL (formato markdown):")
        print(f"\nPrimeras 5 filas de la tabla '{TABLE_NAME_API_RAW}' consultadas desde PostgreSQL:")
        if not df_from_db_api.empty:
            print(df_from_db_api.to_markdown(index=False))
        else:
            logging.info(f"La consulta a '{TABLE_NAME_API_RAW}' no devolvió filas o la tabla está vacía después de la carga.")
            print(f"La tabla '{TABLE_NAME_API_RAW}' parece estar vacía o la consulta no devolvió resultados.")
            
    except Exception as e:
        logging.error(f"Error al consultar datos desde la tabla '{TABLE_NAME_API_RAW}' en PostgreSQL: {e}")
        print(f"Error en Celda 8 al consultar datos: {e}")
    finally:
        if engine_verify: # Cerrar el engine_verify después de la consulta
            engine_verify.dispose()
            logging.info("Conexiones del motor de SQLAlchemy (engine_verify) dispuestas después de la verificación.")
            print("Motor de DB (engine_verify) dispuesto después de la verificación.")
elif 'df_api_data' in locals() and df_api_data.empty:
    logging.warning("El DataFrame df_api_data está vacío. No se puede verificar la tabla en DB.")
    print("El DataFrame de la API está vacío, no se verifica la tabla en DB.")
else:
    logging.warning("df_api_data no está definido en el contexto actual. No se puede verificar la tabla.")
    print("DataFrame df_api_data no definido, no se verifica la tabla.")

2025-05-17 12:54:45,928 - INFO - Celda 8: Verificando las primeras 5 filas de la tabla 'raw_api' desde PostgreSQL.
2025-05-17 12:54:45,930 - INFO - Motor de SQLAlchemy (engine_verify) creado para la consulta de verificación.
DETAIL:  La base de datos fue creada usando la versión de ordenamiento 2.31, pero el sistema operativo provee la versión 2.35.
HINT:  Reconstruya todos los objetos en esta base de datos que usen el ordenamiento por omisión y ejecute ALTER DATABASE airbnb REFRESH COLLATION VERSION, o construya PostgreSQL con la versión correcta de la biblioteca.
2025-05-17 12:54:45,941 - INFO - Primeras 5 filas obtenidas de PostgreSQL (formato markdown):
2025-05-17 12:54:45,945 - INFO - Conexiones del motor de SQLAlchemy (engine_verify) dispuestas después de la verificación.



Primeras 5 filas de la tabla 'raw_api' consultadas desde PostgreSQL:
| fsq_id                   | name                      |   latitude |   longitude | category_main_name   |   category_main_id | all_categories                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                | address                                                          | locality   | region   | area_searched          |   price_tier |   rating |   popularity_score |   tips_count |
|:-------------------------|:-------------------