# TFM - Ingesta de Datos desde Yelp a MongoDB Atlas

Este notebook implementa el pipeline de ingesta de datos para el proyecto:

**Título del TFM**: Análisis de Datos y Procesamiento de Lenguaje Natural para la Extracción de Opiniones y Modelado de Tópicos en Restaurantes: Un Enfoque de Big Data y Ciencia de Datos Aplicado al Estudio Integral del Sector Gastronómico

## Objetivo del Notebook
Implementar un pipeline de datos que:
1. Lea los archivos JSON del dataset de Yelp
2. Cargue todos los datos en MongoDB Atlas para su posterior análisis
3. Analice las categorías de negocios disponibles
4. Filtre los negocios relevantes (restaurantes) 
5. Extraiga restaurantes con sus reviews para análisis de NLP

## Estructura de Datos
El dataset de Yelp incluye varios archivos JSON:
- `yelp_academic_dataset_business.json`: Información de negocios (ubicación, categorías, etc.)
- `yelp_academic_dataset_review.json`: Reseñas de usuarios con texto y calificaciones
- `yelp_academic_dataset_user.json`: Información de usuarios
- `yelp_academic_dataset_checkin.json`: Check-ins en negocios
- `yelp_academic_dataset_tip.json`: Tips cortos de usuarios

**Para este proyecto nos enfocaremos únicamente en `business` y `review`** ya que contienen la información necesaria para nuestro análisis de sentimientos y modelado de tópicos en restaurantes.

## 1. Instalación y Configuración

Necesitamos las siguientes librerías:
- `pymongo`: Para conectar con MongoDB Atlas
- `pandas`: Para el manejo eficiente de datos

### Nota sobre las herramientas utilizadas

En este notebook usamos:
- **uv**: Un instalador de paquetes Python ultrarrápido y confiable que reemplaza a pip

### Instalamos las dependencias necesarias con uv (instalador rápido de Python)

`uv add pymongo tqdm python-dotenv pandas`

## 2. Conexión a MongoDB Atlas

⚠️ **IMPORTANTE: Seguridad de las Credenciales**
- Nunca subas tu contraseña a un repositorio
- Usa variables de entorno o archivos .env para las credenciales
- Asegúrate de que tu IP esté en la whitelist de MongoDB Atlas

### Configuración del archivo .env

Para mayor seguridad, es recomendable guardar las credenciales en un archivo `.env` en el directorio raíz del proyecto:

```
# Archivo .env (coloca este archivo en la raíz del proyecto)
MONGODB_PASSWORD=tu_contraseña_real
```

Asegúrate de que este archivo esté incluido en `.gitignore` para evitar subirlo accidentalmente al repositorio.

In [2]:
import os
import json
import time
import pathlib
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.server_api import ServerApi
from tqdm import tqdm
import pandas as pd

## 3. Configuración de la Conexión

### Configuración de la conexión MongoDB

Configuramos la conexión con timeouts extendidos para manejar operaciones de datos grandes:
- **Socket Timeout**: 5 minutos para operaciones largas
- **Connection Timeout**: 5 minutos para conexiones lentas
- **Connection Pool**: 50 conexiones máximo para eficiencia
- **Retry Writes/Reads**: Reintenta automáticamente en caso de fallos temporales

### Verificación de la conexión
Hacemos un ping a MongoDB Atlas para asegurar que:
- Las credenciales son correctas
- La conexión funciona
- El cluster está disponible

In [3]:
# Cargar variables de entorno desde archivo .env

# Opción 1: Ruta relativa simple (recomendada para notebooks)
load_dotenv('../.env')

# Opción 2: Ruta absoluta como fallback
project_root = pathlib.Path().resolve().parent
dotenv_path = project_root / '.env'
if dotenv_path.exists():
    load_dotenv(dotenv_path)
    print(f"Archivo .env encontrado en: {dotenv_path}")
else:
    print(f"Archivo .env no encontrado en: {dotenv_path}")

# Configuración de la conexión a MongoDB Atlas
PASSWORD = os.environ.get("MONGODB_PASSWORD")  # Obtener la contraseña del archivo .env

# Verificar que la contraseña existe
if not PASSWORD:
    print("Error: No se encontró la variable MONGODB_PASSWORD en el archivo .env")
    print("Por favor, crea un archivo .env con la variable MONGODB_PASSWORD=tu_contraseña_real")
    print(f"Variables de entorno disponibles que empiezan con 'MONGO': {[k for k in os.environ.keys() if 'MONGO' in k]}")
else:
    print("Variable MONGODB_PASSWORD encontrada correctamente")
    
    # URI mejorada con timeouts más largos (basada en el template de configuración)
    uri = f"mongodb+srv://juank920621:{PASSWORD}@cluster0.tsbdbxg.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

    # Crear cliente de MongoDB con configuración mejorada para evitar timeouts
    client = MongoClient(
        uri, 
        server_api=ServerApi('1'),
        socketTimeoutMS=300000,          # 5 minutos socket timeout
        connectTimeoutMS=300000,         # 5 minutos connection timeout  
        serverSelectionTimeoutMS=300000, # 5 minutos server selection timeout
        maxPoolSize=50,                  # Max connection pool size
        retryWrites=True,                # Enable retryable writes
        retryReads=True                  # Enable retryable reads
    )

    try:
        # Verificar la conexión con timeout
        print("Intentando conectar a MongoDB Atlas...")
        client.admin.command('ping')
        print("Conexión exitosa a MongoDB Atlas")
        
        # Crear/seleccionar la base de datos y colecciones
        db = client['tfm_yelp_db']
        businesses_collection = db['businesses']
        reviews_collection = db['reviews']
        
        print("Base de datos y colecciones configuradas:")
        print(f"   - Database: {db.name}")
        print(f"   - Collections: {', '.join([businesses_collection.name, reviews_collection.name])}")
        
    except Exception as e:
        print("Error al conectar a MongoDB Atlas:")
        print(f"Error details: {e}")
        print("\n Pasos de solución:")
        print("1. Verifica que tu IP esté en la whitelist de MongoDB Atlas")
        print("2. Asegúrate de que el cluster no esté pausado")
        print("3. Verifica tu conexión a internet")
        print("4. Revisa que el firewall no bloquee los puertos 27017-27019")
        print("5. Confirma que la contraseña en el archivo .env sea correcta")

Archivo .env encontrado en: /home/juangarzon/Master-Big-Data/M10. Trabajo Final de Máster/tfm-proyecto/.env
Variable MONGODB_PASSWORD encontrada correctamente
Intentando conectar a MongoDB Atlas...
Conexión exitosa a MongoDB Atlas
Base de datos y colecciones configuradas:
   - Database: tfm_yelp_db
   - Collections: businesses, reviews


### Limpieza de colecciones existentes

Antes de cargar nuevos datos, eliminamos las colecciones existentes para evitar duplicados:
- Se eliminan las colecciones `businesses` y `reviews`
- Esto asegura que empezamos con datos limpios en cada ejecución

In [4]:
# Drop collections for clean start
print("Dropping existing collections...")

# Drop both collections
db['businesses'].drop()
db['reviews'].drop()

print("Collections dropped successfully.")

Dropping existing collections...
Collections dropped successfully.


## 4. Importación de datos JSON a MongoDB Atlas

En esta sección cargamos los archivos principales del dataset de Yelp a MongoDB Atlas:

**Archivos que procesamos:**
1. **`yelp_academic_dataset_business.json`**: Información de negocios
2. **`yelp_academic_dataset_review.json`**: Reseñas de usuarios

**Características del proceso:**
- **Formato**: JSON Lines (un documento por línea)
- **Método**: Inserción en lotes de 5,000 documentos
- **Progreso**: Barras de progreso con tqdm para visualizar el avance
- **Manejo de errores**: Continúa procesando aunque encuentre documentos malformados

**Colecciones creadas:**
- `businesses`: Información sobre los negocios
- `reviews`: Reseñas realizadas por los usuarios

> **Nota:** Si los archivos son muy grandes, asegúrate de tener una conexión estable a internet. El proceso puede tomar varios minutos dependiendo del tamaño de los datos.

In [5]:
# Script de ingesta de datos para el dataset de Yelp

# Configuración
DATA_FILES = {
    "businesses": "../data/raw/yelp_academic_dataset_business.json",
    "reviews": "../data/raw/yelp_academic_dataset_review.json"
}
BATCH_SIZE = 5000

def load_data_to_mongodb(db, files_config, batch_size=5000):
    """Cargar archivos JSON line a colecciones de MongoDB con barras de progreso detalladas."""
    
    for collection_name, file_path in files_config.items():
        if not os.path.exists(file_path):
            print(f"Error: File not found - {file_path}")
            continue
            
        print(f"\nLoading {collection_name}...")
        collection = db[collection_name]
        
        # Contar líneas totales para la barra de progreso
        print(f"Counting lines in {file_path}...")
        with open(file_path, 'r', encoding='utf-8') as f:
            total_lines = sum(1 for _ in f)
        
        print(f"Total lines to process: {total_lines:,}")
        
        # Insertar datos en lotes con progreso detallado
        with open(file_path, 'r', encoding='utf-8') as f:
            batch = []
            total_inserted = 0
            total_processed = 0
            
            # Usar context manager para tqdm para asegurar limpieza adecuada
            with tqdm(
                total=total_lines,
                desc=f"Processing {collection_name}",
                unit="docs",
                unit_scale=True,
                bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}] Inserted: {postfix}",
                leave=True,
                disable=False
            ) as pbar:
                
                for line in f:
                    total_processed += 1
                    try:
                        document = json.loads(line.strip())
                        batch.append(document)
                        
                        if len(batch) >= batch_size:
                            # Insertar lote con indicador de progreso mini
                            collection.insert_many(batch)
                            total_inserted += len(batch)
                            batch = []
                            
                            # Actualizar barra de progreso con información de inserción
                            pbar.set_postfix_str(f"{total_inserted:,}")
                        
                        # Actualizar barra de progreso
                        pbar.update(1)
                            
                    except json.JSONDecodeError as e:
                        # Saltar líneas JSON malformadas
                        pbar.update(1)
                        continue
                
                # Insertar documentos restantes
                if batch:
                    collection.insert_many(batch)
                    total_inserted += len(batch)
                    pbar.set_postfix_str(f"{total_inserted:,}")
        
        print(f"Completed {collection_name}: {total_inserted:,} documents inserted from {total_processed:,} lines processed")

# Ejecutar carga de datos con seguimiento de progreso mejorado
print("Starting data ingestion process...")
load_data_to_mongodb(db, DATA_FILES, BATCH_SIZE)

# Verificar conteos finales
print("\nFinal collection summary:")
for collection_name in DATA_FILES.keys():
    count = db[collection_name].count_documents({})
    print(f"   {collection_name}: {count:,} documents")

print("\nData ingestion completed successfully!")

Starting data ingestion process...

Loading businesses...
Counting lines in ../data/raw/yelp_academic_dataset_business.json...
Total lines to process: 150,346


Processing businesses: 100%|██████████| 150k/150k [00:46<00:00, 3.24kdocs/s] Inserted: , 150,346


Completed businesses: 150,346 documents inserted from 150,346 lines processed

Loading reviews...
Counting lines in ../data/raw/yelp_academic_dataset_review.json...
Total lines to process: 6,990,280


Processing reviews: 100%|██████████| 6.99M/6.99M [35:34<00:00, 3.27kdocs/s] Inserted: , 6,990,280


Completed reviews: 6,990,280 documents inserted from 6,990,280 lines processed

Final collection summary:
   businesses: 150,346 documents
   reviews: 6,990,280 documents

Data ingestion completed successfully!


## 5. Verificación de la Ingesta de Datos

Una vez completada la carga, verificamos que todo se haya procesado correctamente:

### Conteo de documentos por colección
Verificamos cuántos documentos se insertaron en cada colección:
- `businesses`: Debería mostrar ~150K documentos
- `reviews`: Debería mostrar ~7M documentos

### Inspección de la estructura de datos
Examinamos un documento de muestra de cada colección para:
- Verificar que el formato JSON se preservó correctamente
- Identificar los campos disponibles para análisis
- Validar que no hay problemas de encoding con caracteres especiales

Esto nos da una idea de cómo están estructurados nuestros datos antes de continuar.

In [6]:
# Verificación inicial de las colecciones
for coleccion in DATA_FILES.keys():
    count = db[coleccion].count_documents({})
    print(f"{coleccion}: {count:,} documentos")

businesses: 150,346 documentos
reviews: 6,990,280 documentos


In [7]:
# Mostrar un documento de ejemplo de cada colección
for coleccion in DATA_FILES.keys():
    doc = db[coleccion].find_one()
    print(f"\nPrimer documento de '{coleccion}':\n{json.dumps(doc, indent=2, ensure_ascii=False, default=str)}")


Primer documento de 'businesses':
{
  "_id": "686eadc415b72f78f153cf15",
  "business_id": "Pns2l4eNsfO8kk83dixA6A",
  "name": "Abby Rappoport, LAC, CMQ",
  "address": "1616 Chapala St, Ste 2",
  "city": "Santa Barbara",
  "state": "CA",
  "postal_code": "93101",
  "latitude": 34.4266787,
  "longitude": -119.7111968,
  "stars": 5.0,
  "review_count": 7,
  "is_open": 0,
  "attributes": {
    "ByAppointmentOnly": "True"
  },
  "categories": "Doctors, Traditional Chinese Medicine, Naturopathic/Holistic, Acupuncture, Health & Medical, Nutritionists",
  "hours": null
}

Primer documento de 'reviews':
{
  "_id": "686eadf615b72f78f1561a5f",
  "review_id": "KU_O5udG6zpxOg-VcAEodg",
  "user_id": "mh_-eMZ6K5RLWhZyISBhwA",
  "business_id": "XQfwVwDr-v0ZS3_CbbE5Xw",
  "stars": 3.0,
  "useful": 0,
  "funny": 0,
  "cool": 0,
  "text": "If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have b

## 6. Análisis de Categorías de Negocios

Antes de filtrar los restaurantes, analizamos qué tipos de negocios tenemos en el dataset.

### ¿Por qué analizar las categorías?
- El campo `categories` contiene múltiples categorías separadas por comas
- Necesitamos entender la distribución para filtrar correctamente
- Queremos asegurar que capturamos todos los restaurantes sin incluir negocios irrelevantes

### Pipeline de análisis
Usamos un pipeline de agregación de MongoDB para:
1. Dividir las categorías (separadas por comas) en elementos individuales
2. Contar cuántos negocios hay por categoría
3. Mostrar las 50 categorías más comunes

Esto nos ayuda a entender qué tipos de negocios dominan el dataset.

### Filtrado de restaurantes
Para nuestro análisis, buscamos específicamente negocios que tengan "Restaurants" en sus categorías:
- Usamos una expresión regular `\bRestaurants\b` para buscar la palabra exacta
- Esto evita falsos positivos como "Restaurant Supplies" o "Restaurant Equipment"
- Nos aseguramos de capturar solo restaurantes reales

In [8]:
pipeline_categories = [
    {"$match": {"categories": {"$exists": True}}},
    {"$project": {"categories": {"$split": ["$categories", ", "]}}},
    {"$unwind": "$categories"},
    {"$group": {"_id": "$categories", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]

categories_cursor = db["businesses"].aggregate(pipeline_categories)

for cat in categories_cursor:
    print(cat)

{'_id': 'Restaurants', 'count': 52268}
{'_id': 'Food', 'count': 27781}
{'_id': 'Shopping', 'count': 24395}
{'_id': 'Home Services', 'count': 14356}
{'_id': 'Beauty & Spas', 'count': 14292}
{'_id': 'Nightlife', 'count': 12281}
{'_id': 'Health & Medical', 'count': 11890}
{'_id': 'Local Services', 'count': 11198}
{'_id': 'Bars', 'count': 11065}
{'_id': 'Automotive', 'count': 10773}
{'_id': 'Event Planning & Services', 'count': 9895}
{'_id': 'Sandwiches', 'count': 8366}
{'_id': 'American (Traditional)', 'count': 8139}
{'_id': 'Active Life', 'count': 7687}
{'_id': 'Pizza', 'count': 7093}
{'_id': 'Coffee & Tea', 'count': 6703}
{'_id': 'Fast Food', 'count': 6472}
{'_id': 'Breakfast & Brunch', 'count': 6239}
{'_id': 'American (New)', 'count': 6097}
{'_id': 'Hotels & Travel', 'count': 5857}
{'_id': 'Home & Garden', 'count': 5799}
{'_id': 'Fashion', 'count': 5739}
{'_id': 'Burgers', 'count': 5636}
{'_id': 'Arts & Entertainment', 'count': 5434}
{'_id': 'Auto Repair', 'count': 5433}
{'_id': 'Hair 

In [9]:
# Filtrar solo por la categoría "Restaurants"
restaurant_category = "Restaurants"

# Pipeline para analizar todas las categorías disponibles
print("\nAnalizando categorías disponibles...")
pipeline_categories = [
    {"$match": {"categories": {"$exists": True, "$ne": None}}},
    {"$project": {"categories": {"$split": ["$categories", ", "]}}},
    {"$unwind": "$categories"},
    {"$group": {"_id": "$categories", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
    {"$limit": 50}  # Limitar a las top 50 categorías
]

print("Top 50 categorías más comunes:")
categories_cursor = db["businesses"].aggregate(pipeline_categories)
for i, cat in enumerate(categories_cursor, 1):
    print(f"{i:2d}. {cat['_id']}: {cat['count']:,} negocios")

# Contar específicamente cuántos negocios son restaurantes
restaurant_count = db["businesses"].count_documents({
    "categories": {"$regex": f"\\b{restaurant_category}\\b"}
})

print(f"Total de restaurantes encontrados: {restaurant_count:,}")


Analizando categorías disponibles...
Top 50 categorías más comunes:


 1. Restaurants: 52,268 negocios
 2. Food: 27,781 negocios
 3. Shopping: 24,395 negocios
 4. Home Services: 14,356 negocios
 5. Beauty & Spas: 14,292 negocios
 6. Nightlife: 12,281 negocios
 7. Health & Medical: 11,890 negocios
 8. Local Services: 11,198 negocios
 9. Bars: 11,065 negocios
10. Automotive: 10,773 negocios
11. Event Planning & Services: 9,895 negocios
12. Sandwiches: 8,366 negocios
13. American (Traditional): 8,139 negocios
14. Active Life: 7,687 negocios
15. Pizza: 7,093 negocios
16. Coffee & Tea: 6,703 negocios
17. Fast Food: 6,472 negocios
18. Breakfast & Brunch: 6,239 negocios
19. American (New): 6,097 negocios
20. Hotels & Travel: 5,857 negocios
21. Home & Garden: 5,799 negocios
22. Fashion: 5,739 negocios
23. Burgers: 5,636 negocios
24. Arts & Entertainment: 5,434 negocios
25. Auto Repair: 5,433 negocios
26. Hair Salons: 5,046 negocios
27. Nail Salons: 4,621 negocios
28. Mexican: 4,600 negocios
29. Italian: 4,573 negocios
30. Specialty Food: 4,233 negocios
31. Docto

## 7. Optimización con Índices

Para mejorar el rendimiento de las consultas que haremos después, creamos índices en los campos que más usaremos:

### Índices creados:
- **`categories` en businesses**: Para filtrar por tipo de negocio rápidamente
- **`business_id` en businesses**: Para búsquedas por ID de negocio
- **`business_id` en reviews**: Para conectar reviews con restaurantes eficientemente

### ¿Por qué son importantes los índices?
Sin índices, MongoDB tiene que revisar todos los documentos uno por uno para encontrar lo que buscas. Con índices, las búsquedas son mucho más rápidas, especialmente cuando tenemos millones de documentos.

Crear estos índices ahora nos ahorrará mucho tiempo en las operaciones siguientes.

In [10]:
# Crear índices para optimizar las consultas
print("\nCreando índices para optimizar consultas...")
try:
    db["businesses"].create_index("categories")
    db["businesses"].create_index("business_id")
    db["reviews"].create_index("business_id")
    print("Indices creados exitosamente")
except Exception as e:
    print(f"Error creando índices: {e}")


Creando índices para optimizar consultas...
Indices creados exitosamente


## 8. Extracción de Restaurantes con Reviews

Esta es la parte principal del proceso donde obtenemos todos los restaurantes junto con sus reseñas.

### ¿Por qué no usar un JOIN directo en MongoDB?
Los JOINs tradicionales en MongoDB pueden ser muy lentos cuando tenemos:
- ~52,000 restaurantes
- ~5,000,000 reviews
- Conexión a MongoDB Atlas por internet
- Operaciones que pueden tomar 20-40 minutos y fallar por timeouts

### Nuestra solución: Queries separadas + JOIN local
En lugar de un JOIN complejo en MongoDB, hacemos el proceso en pasos simples:

1. **Paso 1**: Obtenemos todos los restaurantes (rápido)
2. **Paso 2**: Obtenemos las reviews de esos restaurantes en lotes (controlado)
3. **Paso 3**: Unimos todo usando pandas en memoria local (súper rápido)

Esta estrategia reduce el tiempo de ~30 minutos a ~5 minutos y es mucho más confiable.

### Paso 1: Obtener datos de restaurantes

Extraemos todos los negocios que tienen "Restaurants" en sus categorías.

**Campos que obtenemos:**
- **Información básica**: business_id, name, categories
- **Ubicación**: address, city, state, postal_code, latitude, longitude  
- **Métricas**: stars, review_count, is_open
- **Extras**: attributes, hours

**Optimización importante:** Solo transferimos los campos que necesitamos, excluyendo el `_id` de MongoDB y otros campos innecesarios. Esto hace la transferencia más rápida.

El resultado es un DataFrame de pandas con información de todos los restaurantes.

In [11]:
start_time = time.time()

# Obtener datos de restaurantes
print("Paso 1: Obteniendo datos de restaurantes...")


restaurants_cursor = db["businesses"].find(
    {"categories": {"$regex": f"\\b{restaurant_category}\\b"}},
    {
        "_id": 0,  # Excluir _id de MongoDB
        "business_id": 1, 
        "name": 1, 
        "categories": 1, 
        "address": 1,
        "city": 1, 
        "state": 1, 
        "postal_code": 1,
        "latitude": 1,      
        "longitude": 1,    
        "stars": 1,         
        "review_count": 1,  
        "is_open": 1,       
        "attributes": 1,    
        "hours": 1          
    }
)

# Convertir cursor a lista con seguimiento de progreso
restaurants_list = []
with tqdm(desc="Cargando restaurantes", unit="restaurants") as pbar:
    for restaurant in restaurants_cursor:
        restaurants_list.append(restaurant)
        pbar.update(1)
restaurants_df = pd.DataFrame(restaurants_list)

step1_time = time.time() - start_time
print(f"Restaurantes obtenidos: {len(restaurants_df):,} en {step1_time:.1f}s")

Paso 1: Obteniendo datos de restaurantes...


Cargando restaurantes: 52268restaurants [00:06, 7512.93restaurants/s]

Restaurantes obtenidos: 52,268 en 7.0s





### Paso 2: Obtener reviews de esos restaurantes

Ahora obtenemos todas las reviews de los restaurantes que encontramos en el paso anterior.

**¿Por qué procesamos en lotes?**
- Tenemos ~52,000 restaurantes que pueden tener millones de reviews
- Cada lote procesa 5,000 restaurantes a la vez
- Esto evita problemas de memoria y timeouts de red
- Podemos ver el progreso y recuperarnos si algo falla

**Campos de review que obtenemos:**
- **Identificadores**: business_id, review_id, user_id
- **Contenido principal**: text, stars, date
- **Métricas de utilidad**: useful, funny, cool

**Progreso:** Vemos el avance lote por lote para saber cómo va el proceso.

Al final tenemos todas las reviews de todos nuestros restaurantes.

In [12]:
# Obtener reviews de esos restaurantes
print("Paso 2: Obteniendo reviews de esos restaurantes...")

restaurant_ids = restaurants_df['business_id'].tolist()

# Procesar en lotes para evitar límites de MongoDB
batch_size = 5000  # Tamaño de lote
all_reviews = []

total_batches = (len(restaurant_ids) + batch_size - 1) // batch_size

# Usar context manager para procesamiento por lotes
with tqdm(range(0, len(restaurant_ids), batch_size), desc="Procesando lotes", total=total_batches, unit="lotes") as batch_progress:
    for i in batch_progress:
        batch_ids = restaurant_ids[i:i + batch_size]
        
        reviews_cursor = db["reviews"].find(
            {"business_id": {"$in": batch_ids}},
            {
                "_id": 0,  # Excluir _id de MongoDB
                "business_id": 1, 
                "review_id": 1, 
                "user_id": 1,
                "stars": 1, 
                "date": 1, 
                "text": 1, 
                "useful": 1,
                "funny": 1, 
                "cool": 1
            }
        )
        
        batch_reviews = list(reviews_cursor)
        all_reviews.extend(batch_reviews)

reviews_df = pd.DataFrame(all_reviews)

step2_time = time.time() - start_time - step1_time
print(f"Reviews obtenidos: {len(reviews_df):,} en {step2_time:.1f}s")

Paso 2: Obteniendo reviews de esos restaurantes...


Procesando lotes: 100%|██████████| 11/11 [46:11<00:00, 251.94s/lotes]


Reviews obtenidos: 4,724,471 en 2776.1s


### Paso 3: Guardar archivos separados de business y reviews

Antes de hacer el JOIN, guardamos los DataFrames de restaurantes y reviews en archivos separados para tener flexibilidad en el análisis.

**Archivos generados:**
- **`businesses.json`**: Contiene todos los restaurantes filtrados con sus datos completos
- **`reviews.json`**: Contiene todas las reviews de esos restaurantes

**¿Por qué guardar archivos separados?**
- **Flexibilidad**: Permite trabajar con restaurantes y reviews por separado
- **Análisis específicos**: Facilita análisis que requieren solo una parte de los datos
- **Debugging**: Ayuda a identificar problemas en cada conjunto de datos
- **Reutilización**: Los archivos pueden usarse en otros notebooks sin reprocesar

**Formato de los archivos:**
- JSON con indentación para fácil lectura
- Encoding UTF-8 para caracteres especiales
- Estructura consistente con el archivo final

In [13]:
# Guardar archivos separados de business y reviews
print("Paso 3: Guardando archivos separados de business y reviews...")

# Crear directorio si no existe
os.makedirs("../data", exist_ok=True)

# Guardar DataFrame de restaurantes
businesses_file = "../data/processed/businesses.json"
restaurants_list = restaurants_df.to_dict('records')
with open(businesses_file, "w", encoding="utf-8") as f:
    json.dump(restaurants_list, f, ensure_ascii=False, indent=2, default=str)
print(f"Archivo de restaurantes guardado: {businesses_file}")

# Guardar DataFrame de reviews
reviews_file = "../data/processed/reviews.json"
reviews_list = reviews_df.to_dict('records')
with open(reviews_file, "w", encoding="utf-8") as f:
    json.dump(reviews_list, f, ensure_ascii=False, indent=2, default=str)
print(f"Archivo de reviews guardado: {reviews_file}")

Paso 3: Guardando archivos separados de business y reviews...
Archivo de restaurantes guardado: ../data/processed/businesses.json
Archivo de reviews guardado: ../data/processed/reviews.json


### Paso 4: Unir restaurantes con reviews usando pandas

Finalmente, conectamos cada restaurante con todas sus reviews. Esto se hace en memoria local usando pandas, que es súper eficiente.

**Proceso:**
1. **Agrupamos reviews**: Tomamos todas las reviews y las agrupamos por business_id
2. **Creamos listas**: Convertimos las reviews de cada restaurante en una lista de diccionarios
3. **Contamos reviews**: Calculamos automáticamente cuántas reviews tiene cada restaurante  
4. **Hacemos JOIN**: Unimos la información del restaurante con sus reviews
5. **Ordenamos**: Ponemos primero los restaurantes más populares (más reviews)

**¿Por qué es rápido?**
- Todo se hace en memoria RAM local
- No hay latencia de red
- Pandas está optimizado para estas operaciones
- No dependemos de la conexión a MongoDB

El resultado final: cada restaurante tiene toda su información más un array con todas sus reviews.

In [14]:
# JOIN local con pandas
print("Paso 4: Realizando JOIN local...")

join_start = time.time()

# Agrupar reviews por restaurante usando pandas
reviews_grouped = reviews_df.groupby('business_id').agg({
    'review_id': lambda x: x.tolist(),
    'user_id': lambda x: x.tolist(),
    'stars': lambda x: x.tolist(),
    'date': lambda x: x.tolist(),
    'text': lambda x: x.tolist(),
    'useful': lambda x: x.tolist(),
    'funny': lambda x: x.tolist(),
    'cool': lambda x: x.tolist()
}).reset_index()

# Convertir a formato de lista de diccionarios
def create_reviews_list(row):
    return [
        {
            'review_id': row['review_id'][i],
            'user_id': row['user_id'][i], 
            'stars': row['stars'][i],
            'date': row['date'][i],
            'text': row['text'][i],
            'useful': row['useful'][i],
            'funny': row['funny'][i],
            'cool': row['cool'][i]
        }
        for i in range(len(row['review_id']))
    ]

reviews_grouped['reviews'] = reviews_grouped.apply(create_reviews_list, axis=1)
reviews_grouped['total_reviews'] = reviews_grouped['reviews'].apply(len)

# Mantener solo las columnas necesarias
reviews_final = reviews_grouped[['business_id', 'reviews', 'total_reviews']]

# JOIN con restaurantes
result_df = restaurants_df.merge(reviews_final, on='business_id', how='inner')

# Ordenar por número de reviews
result_df = result_df.sort_values('total_reviews', ascending=False)

join_time = time.time() - join_start
print(f"JOIN completado en {join_time:.1f}s")

Paso 4: Realizando JOIN local...
JOIN completado en 38.3s


## Exportación de Resultados

### Paso 5: Exportar resultado final

Guardamos todos los datos procesados en un archivo JSON listo para análisis.

**Características del archivo generado:**
- **Formato**: JSON con indentación para fácil lectura
- **Estructura**: Cada restaurante incluye todas sus reviews como un array
- **Ordenamiento**: Por popularidad (restaurantes con más reviews primero)
- **Encoding**: UTF-8 para manejar caracteres especiales correctamente

**Métricas que mostramos al final:**
- Cuántos restaurantes procesamos en total
- Número total de reviews incluidas
- Promedio de reviews por restaurante
- Tiempo total que tomó todo el proceso

### ¿Qué tenemos ahora?
Un dataset limpio y estructurado con:
- ~52,000 restaurantes de Estados Unidos
- ~5,000,000 reviews asociadas
- Datos geográficos completos
- Texto completo de reviews para análisis de NLP

In [15]:
# Exportar resultado
print("Paso 5: Exportando resultado...")

# Crear directorio si no existe
os.makedirs("../data", exist_ok=True)

# Convertir DataFrame a lista de diccionarios
results_list = result_df.to_dict('records')

# Exportar resultado como JSON
output_file = "../data/processed/restaurants_with_reviews.json"

with open(output_file, "w", encoding="utf-8") as f:
    json.dump(results_list, f, ensure_ascii=False, indent=2, default=str)

# Resumen de ejecución
total_time = time.time() - start_time
total_reviews = result_df['total_reviews'].sum()

print(f"\nExtracción completada")
print(f"Restaurantes procesados: {len(result_df):,}")
print(f"Total de reviews incluidas: {total_reviews:,}")
print(f"Tiempo de ejecución: {total_time:.1f}s ({total_time/60:.1f} minutos)")
print(f"Archivo generado: {output_file}")
print(f"\nDataset listo para análisis")

Paso 5: Exportando resultado...

Extracción completada
Restaurantes procesados: 52,268
Total de reviews incluidas: 4,724,471
Tiempo de ejecución: 2907.5s (48.5 minutos)
Archivo generado: ../data/processed/restaurants_with_reviews.json

Dataset listo para análisis
