
# Challenge de Reproducibilidad: MR-DBSCAN
## Algoritmo Paralelo de Clustering por Densidad usando MapReduce

---

## Informaci√≥n del Proyecto

**Paper:** MR-DBSCAN: An Efficient Parallel Density-based Clustering Algorithm using MapReduce  
**Autores:** Yaobin He, Haoyu Tan, Wuman Luo, et al. (Shenzhen Institutes of Advanced Technology)  
**Dataset:** 1.9 billion GPS records (Shanghai taxi data) o sint√©tico  
**Plataforma:** Hadoop + PySpark  

**Objetivo:** Reproducir el algoritmo MR-DBSCAN en 4 etapas usando MapReduce para clustering de datos espaciales a gran escala.

---

## Par√°metros del Algoritmo

- **Eps (epsilon):** Radio m√°ximo de vecindad
  - Limpieza de datos: 0.002 (elimina ruido/outliers)
  - Identificaci√≥n de regiones densas: 0.0002 (clusters m√°s espec√≠ficos)

- **MinPts:** N√∫mero m√≠nimo de puntos para formar un cluster
  - Limpieza: 1000 puntos
  - Regiones densas: 100 puntos

- **Particiones:** Grid-based partitioning
  - Strips por dimensi√≥n: 80-160 seg√∫n Eps

---

## Divisi√≥n de Tareas por Persona

| Rol | Responsabilidades |
|-----|-------------------|
| **Persona 1** | Etapa 1 (Preprocesamiento y Particionado) + Setup inicial |
| **Persona 2** | Etapa 2 (DBSCAN Local) + Etapa 3 (Detecci√≥n de Cruces) |
| **Persona 3** | Etapa 4 (Fusi√≥n Global) + Validaci√≥n y Reportes |

---

## Fases del Algoritmo MR-DBSCAN

El algoritmo se ejecuta en 4 etapas usando MapReduce:

1. **Stage 1 - Preprocesamiento:** An√°lisis del dataset y particionamiento en grid
2. **Stage 2 - DBSCAN Local:** Clustering independiente en cada partici√≥n
3. **Stage 3 - Detecci√≥n de Cruces:** Identificar clusters que cruzan fronteras (MC Sets)
4. **Stage 4 - Fusi√≥n Global:** Unificar clusters y relabeling global



---

# PARTE 1: SETUP INICIAL Y CARGA DE DATOS
## Responsable: Jose

### Paso 1.1: Configuraci√≥n del Entorno Spark

**¬øQu√© hace este paso?**

Este paso inicializa una sesi√≥n de Spark (SparkSession), que es el punto de entrada principal para trabajar con DataFrames y RDDs en PySpark. La sesi√≥n establece la conexi√≥n con el cluster Hadoop y configura el nombre de la aplicaci√≥n para identificarla en el Hadoop NameNode.

**Detalles t√©cnicos:**
- `SparkSession.builder` permite crear una nueva sesi√≥n de forma fluida
- `.appName()` asigna el nombre "MR-DBSCAN" que ver√°s en la interfaz Hadoop
- `.getOrCreate()` obtiene una sesi√≥n existente o crea una nueva

**¬øPor qu√© es importante?**
- Necesario para distribuir datos y c√≥digo a los nodos del cluster
- Permite usar funciones paralelas de Spark en todo el c√≥digo



In [None]:

# Inicializar SparkSession
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

def get_spark_session(app_name='MR-DBSCAN'):
    """
    Crea una nueva sesi√≥n de Spark o retorna la existente.

    Par√°metros:
        app_name (str): Nombre de la aplicaci√≥n para identificar en Hadoop

    Retorna:
        SparkSession: Sesi√≥n de Spark lista para usar
    """
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.shuffle.partitions", "200") \
        .getOrCreate()

    # Configurar nivel de logging para reducir mensajes
    spark.sparkContext.setLogLevel("WARN")

    return spark

# Crear sesi√≥n
spark = get_spark_session()
print(f"‚úì Sesi√≥n de Spark creada exitosamente")
print(f"  - Spark Version: {spark.version}")
print(f"  - Master: {spark.sparkContext.master()}")



### üìå Paso 1.2: Cargar y Preprocesar los Datos

**¬øQu√© hace este paso?**

Este paso:
1. **Carga o genera datos:** Si tienes un archivo CSV con GPS, lo carga. Si no, genera datos sint√©ticos que simulan coordenadas GPS (latitud y longitud)
2. **Normalizaci√≥n:** Transforma las coordenadas a un rango est√°ndar (0-1) para facilitar el particionado
3. **Estructuras de datos:** Convierte los datos a un DataFrame de Pandas (para pre-procesamiento local) o Spark (para procesamiento distribuido)

**¬øPor qu√© es importante?**
- El MR-DBSCAN requiere datos espaciales (lat, lon) en formato limpio
- La normalizaci√≥n es crucial para que el grid partitioning funcione correctamente
- Los datos limpios y bien estructurados son la base del algoritmo

**Datos de ejemplo:**
- Dataset original: 1.9 mil millones de GPS del taxi de Shanghai
- Para este ejemplo: 100,000 puntos sint√©ticos distribuidos aleatoriamente



In [None]:

import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

# Opci√≥n 1: Si tienes un CSV con datos GPS, descomenta y modifica:
# data = pd.read_csv('ruta/a/tu/archivo.csv')
# data = data[['lon', 'lat']].copy()

# Opci√≥n 2: Generar datos sint√©ticos (para demostraci√≥n)
n_points = 100000  # N√∫mero de puntos a generar
np.random.seed(42)  # Para reproducibilidad

# Simular coordenadas GPS de Shanghai (rango real)
data = pd.DataFrame({
    'lon': np.random.uniform(121.0, 122.0, size=n_points),  # Longitud
    'lat': np.random.uniform(30.9, 31.5, size=n_points)    # Latitud
})

# Normalizar coordenadas al rango [0, 1]
data['lon_norm'] = (data['lon'] - data['lon'].min()) / (data['lon'].max() - data['lon'].min())
data['lat_norm'] = (data['lat'] - data['lat'].min()) / (data['lat'].max() - data['lat'].min())

print(f"‚úì Datos cargados exitosamente")
print(f"  - N√∫mero de puntos: {len(data):,}")
print(f"  - Rango Longitud: [{data['lon'].min():.6f}, {data['lon'].max():.6f}]")
print(f"  - Rango Latitud: [{data['lat'].min():.6f}, {data['lat'].max():.6f}]")
print(f"\nPrimeros 5 registros:")
print(data.head())



---

# PARTE 2: PARTICIONADO EN GRID
## Responsable: Jose

### Paso 2.1: Divisi√≥n del Dominio Espacial con Grid

**¬øQu√© hace este paso?**

Este es el coraz√≥n del Stage 1 del MR-DBSCAN. El particionamiento en grid:

1. **Divide el espacio:** Crea una malla 2D que cubre todo el √°rea geogr√°fica
2. **Asigna particiones:** Cada punto GPS recibe un ID de partici√≥n seg√∫n su ubicaci√≥n en la malla
3. **Equilibra carga:** Intenta que cada partici√≥n tenga aproximadamente la misma cantidad de datos

**Conceptos clave:**
- **n_strips:** N√∫mero de divisiones por dimensi√≥n (80-160 seg√∫n el paper)
  - M√°s strips = particiones m√°s peque√±as (mejor balance pero m√°s r√©plicas en bordes)
  - Menos strips = particiones m√°s grandes (menos comunicaci√≥n pero desbalance)
- **lon_bin / lat_bin:** √çndice de la celda en cada dimensi√≥n
- **partition_id:** Identificador √∫nico de la partici√≥n (combinaci√≥n de lon_bin y lat_bin)

**¬øPor qu√© es importante?**
- MapReduce distribuye datos a diferentes nodos seg√∫n la partici√≥n
- El balance de carga afecta directamente el rendimiento paralelo
- Un particionamiento pobre causa "data skew" (algunos nodos reciben m√°s datos)

**Ejemplo visual:**
```
Si n_strips=3, espacio 0-1 se divide en:
[0-0.33, 0.33-0.67, 0.67-1.0]

Un punto en (0.5, 0.8) recibe:
lon_bin = 1 (est√° en [0.33-0.67])
lat_bin = 2 (est√° en [0.67-1.0])
partition_id = "1_2"
```



In [None]:

# Definir par√°metros de particionamiento
n_strips = 120  # √ìptimo seg√∫n el paper para Eps grande (0.002)

# Crear bins para dividir el espacio normalizado [0, 1]
lon_bins = np.linspace(0, 1, n_strips + 1)
lat_bins = np.linspace(0, 1, n_strips + 1)

# Asignar cada punto a su bin (celda del grid)
data['lon_bin'] = np.digitize(data['lon_norm'], lon_bins) - 1
data['lat_bin'] = np.digitize(data['lat_norm'], lat_bins) - 1

# Crear identificador √∫nico de partici√≥n
data['partition_id'] = data['lon_bin'].astype(str) + '_' + data['lat_bin'].astype(str)

# Informaci√≥n del particionado
n_partitions = len(data['partition_id'].unique())
print(f"‚úì Particionamiento completado")
print(f"  - N√∫mero de strips por dimensi√≥n: {n_strips}")
print(f"  - Total de particiones: {n_partitions}")
print(f"  - Puntos por partici√≥n (promedio): {len(data) / n_partitions:.1f}")
print(f"  - Particiones con datos: {(data['partition_id'].value_counts() > 0).sum()}")

# Mostrar distribuci√≥n de datos entre particiones
print(f"\nDistribuci√≥n de datos:")
partition_counts = data['partition_id'].value_counts()
print(f"  - Partici√≥n con m√°s datos: {partition_counts.max()} puntos")
print(f"  - Partici√≥n con menos datos: {partition_counts.min()} puntos")
print(f"  - Desviaci√≥n est√°ndar: {partition_counts.std():.1f}")



### Paso 2.2: Preparaci√≥n de Datos para MapReduce

**¬øQu√© hace este paso?**

Prepara los datos para ser procesados en paralelo en Hadoop:

1. **Agrupa por partici√≥n:** Organiza todos los puntos que pertenecen a la misma partici√≥n
2. **Convierte a Spark DataFrame:** Transforma los datos de Pandas a Spark para procesamiento distribuido
3. **Repartici√≥n:** Distribuye las particiones entre los nodos del cluster Hadoop

**¬øPor qu√© es importante?**
- MapReduce requiere datos en formato distribuido
- Spark DataFrame permite operaciones paralelas autom√°ticas
- La repartici√≥n asegura que cada nodo reciba trabajo similar



In [None]:

# Opci√≥n 1: Mantener en Pandas para procesamiento local
partition_groups = dict(list(data.groupby('partition_id')))
print(f"‚úì Datos agrupados por partici√≥n")
print(f"  - N√∫mero de grupos: {len(partition_groups)}")
print(f"  - Ejemplo de keys: {list(partition_groups.keys())[:5]}")

# Opci√≥n 2: Convertir a Spark DataFrame para procesamiento distribuido
spark_df = spark.createDataFrame(data)
spark_df = spark_df.repartition('partition_id')  # Distribuir por partition_id

print(f"\n‚úì Spark DataFrame creado")
print(f"  - N√∫mero de particiones Spark: {spark_df.rdd.getNumPartitions()}")
print(f"  - Total de registros: {spark_df.count():,}")
print(f"\nEstructura de datos:")
spark_df.printSchema()
