# 04 - DBSCAN

## Introdução

O **DBSCAN** (Density-Based Spatial Clustering of Applications with Noise) é um algoritmo de clustering baseado em densidade desenvolvido por Martin Ester e colaboradores em 1996. Diferentemente do K-Means e clustering hierárquico, o DBSCAN não requer que especifiquemos o número de clusters antecipadamente e é capaz de identificar clusters de formas arbitrárias e detectar outliers (ruído).

### Características Principais do DBSCAN:

1. **Baseado em densidade**: Agrupa pontos que estão densamente empacotados
2. **Detecção de ruído**: Identifica automaticamente outliers
3. **Forma arbitrária**: Pode encontrar clusters de qualquer formato
4. **Número automático de clusters**: Não precisa especificar k antecipadamente
5. **Robusto a outliers**: Outliers não afetam a formação dos clusters

## Fundamentos Matemáticos

O DBSCAN utiliza dois parâmetros principais:
- $\varepsilon$ (eps): raio da vizinhança
- $\text{minPts}$: número mínimo de pontos para formar um cluster

### Definições Fundamentais:

**1. Vizinhança-$\varepsilon$**: Para um ponto $p$, sua vizinhança-$\varepsilon$ é definida como:
$$N_\varepsilon(p) = \{q \in D | \text{dist}(p,q) \leq \varepsilon\}$$

**2. Ponto Central (Core Point)**: Um ponto $p$ é um ponto central se:
$$|N_\varepsilon(p)| \geq \text{minPts}$$

**3. Diretamente Alcançável por Densidade**: Um ponto $q$ é diretamente alcançável por densidade a partir de $p$ se:
- $q \in N_\varepsilon(p)$ e
- $p$ é um ponto central

**4. Alcançável por Densidade**: Um ponto $q$ é alcançável por densidade a partir de $p$ se existe uma cadeia de pontos $p_1, p_2, ..., p_n$ onde $p_1 = p$ e $p_n = q$, tal que $p_{i+1}$ é diretamente alcançável por densidade a partir de $p_i$.

**5. Conectado por Densidade**: Dois pontos $p$ e $q$ são conectados por densidade se existe um ponto $o$ tal que tanto $p$ quanto $q$ são alcançáveis por densidade a partir de $o$.

### Classificação dos Pontos:

- **Core Point (Ponto Central)**: $|N_\varepsilon(p)| \geq \text{minPts}$
- **Border Point (Ponto de Fronteira)**: $|N_\varepsilon(p)| < \text{minPts}$, mas está na vizinhança de um core point
- **Noise Point (Ponto de Ruído)**: Não é core nem border point

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN as SklearnDBSCAN
from scipy.spatial.distance import pdist, squareform
from scipy.stats import mode
import pandas as pd

plt.style.use('seaborn-v0_8-whitegrid')
np.random.seed(1)

## Implementação do DBSCAN

Vamos implementar o algoritmo DBSCAN passo a passo usando apenas NumPy:

In [None]:
class DBSCAN:
    def __init__(self, eps=0.5, min_pts=5, metric='euclidean'):
        """Inicializa o DBSCAN com os parâmetros eps e min_pts"""
        self.eps = eps
        self.min_pts = min_pts
        self.metric = metric
        self.labels_ = None
        self.core_samples_ = None
        self.n_clusters_ = 0
    
    def _calculate_distance_matrix(self, X):
        """Calcula a matriz de distâncias entre todos os pontos"""
        if self.metric == 'euclidean':
            distances = np.linalg.norm(X[:, np.newaxis] - X, axis=2)
        # elif self.metric == '...':
            # distance = ...
        else:
            raise ValueError("Métrica não suportada")
        return distances
    
    def _get_neighbors(self, point_idx, distance_matrix):
        """Encontra todos os vizinhos dentro da distância eps"""
        return np.where(distance_matrix[point_idx] <= self.eps)[0]
    
    def _expand_cluster(self, point_idx, neighbors, cluster_id, distance_matrix, visited, labels):
        """Expande o cluster a partir do ponto inicial"""
        labels[point_idx] = cluster_id
        queue = neighbors.tolist()

        while queue:
            neighbor_idx = queue.pop()

            if not visited[neighbor_idx]:
                visited[neighbor_idx] = True
                neighbor_neighbors = self._get_neighbors(neighbor_idx, distance_matrix)

                if len(neighbor_neighbors) >= self.min_pts:
                    queue.extend(neighbor_neighbors)

            if labels[neighbor_idx] == -1:
                labels[neighbor_idx] = cluster_id

    def fit(self, X):
        """Executa o algoritmo DBSCAN"""
        n_points = len(X)
        visited = np.zeros(n_points, dtype=bool)
        cluster_id = 0
        self.labels_ = np.full(n_points, -1)  # -1 = ruído
        self.core_samples_ = []

        distance_matrix = self._calculate_distance_matrix(X)

        for point_idx in range(n_points):
            if visited[point_idx]:
                continue

            visited[point_idx] = True
            neighbors = self._get_neighbors(point_idx, distance_matrix)

            if len(neighbors) >= self.min_pts:   # core point
                self.core_samples_.append(point_idx)
                self._expand_cluster(point_idx, neighbors, cluster_id, distance_matrix, visited, self.labels_)
                cluster_id += 1

        self.core_samples_ = np.array(self.core_samples_)
        self.n_clusters_ = cluster_id
        return self

    def fit_predict(self, X):
        """Executa DBSCAN e retorna os labels"""
        self.fit(X)
        return self.labels_

## Demonstração com Dados Sintéticos

Vamos criar dados sintéticos para demonstrar o funcionamento do DBSCAN:

In [None]:
rng = np.random.default_rng(42)

# Cabeça: círculo com leve ruído
n_head = 400
theta = rng.uniform(0, 2*np.pi, n_head)
R = 10 + rng.normal(0, 0.35, n_head)
head = np.c_[R*np.cos(theta), R*np.sin(theta)]
y_head = np.full(n_head, 0)

# Olhos: dois blobs gaussianos
n_eye = 100
eye_left  = rng.normal(loc=[-3.2,  3.0], scale=[0.45, 0.45], size=(n_eye//2, 2))
eye_right = rng.normal(loc=[ 3.2,  3.0], scale=[0.45, 0.45], size=(n_eye - n_eye//2, 2))
eyes = np.vstack([eye_left, eye_right])
y_eyes = np.full(eyes.shape[0], 1)

# Boca: arco inferior com jitter (sorriso)
n_mouth = 100
phi = rng.uniform(np.deg2rad(200), np.deg2rad(340), n_mouth)  # arco de 200° a 340°
Rm = 5 + rng.normal(0, 0.22, n_mouth)
mouth = np.c_[Rm*np.cos(phi), -1 + Rm*np.sin(phi)]
mouth += rng.normal(0, [0.12, 0.15], mouth.shape)  # engrossar um pouco
y_mouth = np.full(n_mouth, 2)

# Ruído: pontos aleatórios
n_noise = 100
noise = rng.uniform(low=[-13, -13], high=[13, 13], size=(n_noise, 2))
y_noise = np.full(n_noise, -1)

# Concatenar
X_synthetic = np.vstack([head, eyes, mouth, noise])
true_labels  = np.concatenate([y_head, y_eyes, y_mouth, y_noise])

In [None]:
# Visualizar os dados sintéticos
plt.figure(figsize=(12, 6))

plt.subplot(1, 2, 1)
colors = ['red', 'blue', 'green', 'gray']
for i in range(4):
    if i == 3:  # ruído
        mask = true_labels == -1
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], c=colors[i], alpha=0.6, s=30, marker='x', label='Ruído')
    else:
        mask = true_labels == i
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], c=colors[i], alpha=0.7, s=50, label=f'Cluster {i+1}')

plt.title('Dados Sintéticos - Classes Verdadeiras')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.scatter(X_synthetic[:, 0], X_synthetic[:, 1], c='black', alpha=0.6, s=30)
plt.title('Dados Sintéticos - Sem Labels')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
dbscan = DBSCAN(eps=1.3, min_pts=7)
labels = dbscan.fit_predict(X_synthetic)

plt.figure(figsize=(6, 6))
unique_labels = np.unique(labels)
colors = plt.cm.Set1(np.linspace(0, 1, len(unique_labels)))

for k, label in enumerate(unique_labels):
    if label == -1:
        # Ruído
        mask = labels == label
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], c='black', marker='x', s=30, alpha=0.6, label='Ruído')
    else:
        # Clusters
        mask = labels == label
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], c=[colors[k]], s=50, alpha=0.7, label=f'Cluster {label}')

# Destacar core points
if len(dbscan.core_samples_) > 0:
    plt.scatter(X_synthetic[dbscan.core_samples_, 0], 
                X_synthetic[dbscan.core_samples_, 1],
                s=100, facecolors='none', edgecolors='black', 
                linewidth=2, alpha=0.8)

plt.tight_layout()
plt.show()

## Aplicando DBSCAN aos Dados Sintéticos

Agora vamos aplicar nosso algoritmo DBSCAN:

In [None]:
# Testar diferentes valores de eps e min_pts
eps_values = [0.5, 1.3, 2.0]
min_pts_values = [3, 5, 8]

fig, axes = plt.subplots(3, 3, figsize=(15, 15))
fig.suptitle('DBSCAN com Diferentes Parâmetros', fontsize=16)

for i, eps in enumerate(eps_values):
    for j, min_pts in enumerate(min_pts_values):
        # Aplicar DBSCAN
        dbscan = DBSCAN(eps=eps, min_pts=min_pts)
        labels = dbscan.fit_predict(X_synthetic)
        
        # Visualizar resultados
        ax = axes[i, j]
        
        unique_labels = np.unique(labels)
        colors = plt.cm.Set1(np.linspace(0, 1, len(unique_labels)))
        
        for k, label in enumerate(unique_labels):
            if label == -1:
                # Ruído
                mask = labels == label
                ax.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], 
                          c='black', marker='x', s=30, alpha=0.6, label='Ruído')
            else:
                # Clusters
                mask = labels == label
                ax.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], 
                          c=[colors[k]], s=50, alpha=0.7, label=f'Cluster {label}')
        
        # Destacar core points
        if len(dbscan.core_samples_) > 0:
            ax.scatter(X_synthetic[dbscan.core_samples_, 0], 
                      X_synthetic[dbscan.core_samples_, 1],
                      s=100, facecolors='none', edgecolors='black', 
                      linewidth=2, alpha=0.8)
        
        ax.set_title(f'eps={eps}, min_pts={min_pts}\n{dbscan.n_clusters_} clusters')
        ax.set_xlabel('Feature 1')
        ax.set_ylabel('Feature 2')
        ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

### Análise com Parâmetros Ótimos

Vamos escolher os parâmetros que melhor capturam a estrutura dos dados:

In [None]:
# Parâmetros que parecem funcionar melhor
best_eps = 1.3
best_min_pts = 5

# Aplicar DBSCAN com os melhores parâmetros
dbscan_best = DBSCAN(eps=best_eps, min_pts=best_min_pts)
labels_best = dbscan_best.fit_predict(X_synthetic)

# Análise detalhada
unique_labels = np.unique(labels_best)
n_clusters = len(unique_labels) - (1 if -1 in unique_labels else 0)
n_noise = np.sum(labels_best == -1)
n_core_samples = len(dbscan_best.core_samples_)

print(f"Resultados do DBSCAN (eps={best_eps}, min_pts={best_min_pts}):")
print(f"- Número de clusters encontrados: {n_clusters}")
print(f"- Número de pontos de ruído: {n_noise}")
print(f"- Número de core samples: {n_core_samples}")
print(f"- Labels únicos: {unique_labels}")

In [None]:
# Classificar pontos por tipo
core_mask = np.zeros(len(X_synthetic), dtype=bool)
if len(dbscan_best.core_samples_) > 0:
    core_mask[dbscan_best.core_samples_] = True

border_mask = (labels_best != -1) & (~core_mask)
noise_mask = labels_best == -1

print(f"Classificação dos pontos:")
print(f"- Core points: {np.sum(core_mask)}")
print(f"- Border points: {np.sum(border_mask)}")
print(f"- Noise points: {np.sum(noise_mask)}")

In [None]:
# Visualização detalhada dos tipos de pontos
plt.figure(figsize=(15, 5))

# Subplot 1: Clusters encontrados
plt.subplot(1, 3, 1)
colors = plt.cm.Set1(np.linspace(0, 1, max(len(unique_labels), 3)))

for i, label in enumerate(unique_labels):
    if label == -1:
        mask = labels_best == label
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], 
                   c='black', marker='x', s=50, alpha=0.8, label='Ruído')
    else:
        mask = labels_best == label
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], 
                   c=[colors[i]], s=60, alpha=0.8, label=f'Cluster {label}')

plt.title(f'DBSCAN - Clusters Encontrados\n{n_clusters} clusters, {n_noise} ruído')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True, alpha=0.3)

# Subplot 2: Tipos de pontos
plt.subplot(1, 3, 2)
plt.scatter(X_synthetic[core_mask, 0], X_synthetic[core_mask, 1], 
           c='red', s=80, alpha=0.8, label='Core Points', marker='o')
plt.scatter(X_synthetic[border_mask, 0], X_synthetic[border_mask, 1], 
           c='blue', s=60, alpha=0.8, label='Border Points', marker='s')
plt.scatter(X_synthetic[noise_mask, 0], X_synthetic[noise_mask, 1], 
           c='black', s=40, alpha=0.8, label='Noise Points', marker='x')

plt.title('Classificação dos Pontos')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True, alpha=0.3)

# Subplot 3: Comparação com ground truth
plt.subplot(1, 3, 3)
for i in range(4):
    if i == 3:  # ruído
        mask = true_labels == -1
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], 
                   c='gray', alpha=0.6, s=30, marker='x', label='Ruído Real')
    else:
        mask = true_labels == i
        plt.scatter(X_synthetic[mask, 0], X_synthetic[mask, 1], 
                   c=colors[i], alpha=0.7, s=50, label=f'Cluster Real {i+1}')

plt.title('Ground Truth')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Estimativa do Parâmetro $\varepsilon$ usando K-Distance

Uma das maiores dificuldades do DBSCAN é escolher o valor apropriado para o parâmetro $\varepsilon$ (eps). O método **K-Distance** é uma heurística eficaz para estimar este parâmetro.

O método K-Distance consiste em:

1. **Calcular a k-ésima distância mais próxima** para cada ponto no dataset
2. **Ordenar essas distâncias** em ordem decrescente
3. **Identificar o "cotovelo"** no gráfico resultante

A intuição é que pontos dentro de clusters densos terão k-ésimas distâncias pequenas, enquanto pontos de ruído ou em bordas de clusters terão distâncias maiores.

### Algoritmo K-Distance:

Para um dataset $D$ e parâmetro $k = \text{minPts} - 1$:

1. Para cada ponto $p_i \in D$:
   - Calcule $d_k(p_i)$ = distância ao k-ésimo vizinho mais próximo
2. Ordene os valores $d_k(p_i)$
3. Plote o gráfico K-Distance
4. Escolha $\varepsilon$ no ponto onde a curva tem maior curvatura (cotovelo)

In [None]:
from sklearn.neighbors import NearestNeighbors

def plot_k_distance(X, min_pts, title="K-Distance Plot"):
    """Plota o gráfico K-Distance usando sklearn.NearestNeighbors."""
    k = int(min_pts - 1)

    nn = NearestNeighbors(n_neighbors=k+1, metric="euclidean")
    nn.fit(X)
    distances, _ = nn.kneighbors(X)

    kth_distances = distances[:, k]
    k_distances_sorted = np.sort(kth_distances)

    # Plot
    plt.figure(figsize=(10, 6))
    plt.plot(range(len(k_distances_sorted)), k_distances_sorted, linewidth=2, label=f'{k}-distance')
    plt.xlabel("Pontos ordenados por distância")
    plt.ylabel(f"{k}-distance")
    plt.title(title)
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
plot_k_distance(X_synthetic, min_pts=5, title="K-Distance Plot para Dados Sintéticos")

## Vantagens e Desvantagens do DBSCAN

### Vantagens:

1. **Não requer especificar o número de clusters antecipadamente**
2. **Pode encontrar clusters de forma arbitrária** (não apenas esféricos)
3. **Identifica automaticamente outliers/ruído**
4. **Robusto a outliers** (não afetam a formação dos clusters)
5. **Determinístico** (sempre produz os mesmos resultados)

### Desvantagens:

1. **Sensível aos parâmetros** eps e min_pts
2. **Dificuldade com clusters de densidades diferentes**
3. **Problemas em alta dimensionalidade** ("curse of dimensionality")
4. **Complexidade computacional** O(n²) no pior caso
5. **Requer escolha cuidadosa da métrica de distância**

## Exercícios

### Exercício 1: Ajuste de Parâmetros no DBSCAN em 3D

Com os dados das **três esferas concêntricas**, realize:

1. Plotar o K-Distance para diferentes valores de `min_pts` e sugerir um intervalo adequado para `eps`.
2. Selecionar os melhores parâmetros de `min_pts` e `eps`.
3. Visualizar em 3D os clusters encontrados (cores diferentes) e comentar a escolha de `eps` e `min_samples`.

In [None]:
from sklearn.preprocessing import StandardScaler

def generate_concentric_spheres(radii=[3, 15], n_samples_per_sphere=1000, noise=0.2, random_state=42):
    """
    Gera pontos em 3 esferas concêntricas no espaço 3D.
    - radii: lista com os raios das esferas
    - n_samples_per_sphere: pontos em cada esfera
    - noise: variação radial para "espessura" da casca
    """
    rng = np.random.default_rng(random_state)
    X, y = [], []
    
    for i, r in enumerate(radii):
        # amostrar ângulos uniformemente
        phi = rng.uniform(0, 2*np.pi, n_samples_per_sphere)       # ângulo azimutal
        costheta = rng.uniform(-1, 1, n_samples_per_sphere)       # cos(theta)
        theta = np.arccos(costheta)                               # ângulo polar
        
        # raio com ruído
        rr = r + noise * rng.standard_normal(n_samples_per_sphere)
        
        # coordenadas cartesianas
        x = rr * np.sin(theta) * np.cos(phi)
        y_ = rr * np.sin(theta) * np.sin(phi)
        z = rr * np.cos(theta)
        
        X.append(np.vstack((x, y_, z)).T)
        y.append(np.full(n_samples_per_sphere, i))
    
    X = np.vstack(X)
    y = np.concatenate(y)
    return X, y

X_spheres, y_spheres = generate_concentric_spheres(radii=[3, 8, 12], n_samples_per_sphere=200, noise=0.4)

scaler = StandardScaler()
X_spheres = scaler.fit_transform(X_spheres)


In [None]:
import plotly.express as px

fig = px.scatter_3d(
    x=X_spheres[:, 0],
    y=X_spheres[:, 1],
    z=X_spheres[:, 2],
    color_continuous_scale=px.colors.qualitative.Vivid,
)
fig.update_traces(marker=dict(size=3))
fig.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import DBSCAN
import plotly.express as px

# ---------- Gerar as 3 esferas concêntricas ----------
def generate_concentric_spheres(radii=[3, 8, 12], n_samples_per_sphere=200, noise=0.4, random_state=42):
    rng = np.random.default_rng(random_state)
    X, y = [], []

    for i, r in enumerate(radii):
        phi = rng.uniform(0, 2*np.pi, n_samples_per_sphere)
        costheta = rng.uniform(-1, 1, n_samples_per_sphere)
        theta = np.arccos(costheta)
        rr = r + noise * rng.standard_normal(n_samples_per_sphere)
        x = rr * np.sin(theta) * np.cos(phi)
        y_ = rr * np.sin(theta) * np.sin(phi)
        z = rr * np.cos(theta)
        X.append(np.vstack((x, y_, z)).T)
    X = np.vstack(X)
    return X

X_spheres = generate_concentric_spheres()
X_spheres = StandardScaler().fit_transform(X_spheres)

def plot_k_distance(X, min_pts, title="K-Distance Plot"):
    k = int(min_pts - 1)
    nn = NearestNeighbors(n_neighbors=k+1)
    nn.fit(X)
    distances, _ = nn.kneighbors(X)
    kth_distances = np.sort(distances[:, k])

    plt.figure(figsize=(8,5))
    plt.plot(kth_distances, linewidth=2)
    plt.title(title)
    plt.xlabel("Pontos ordenados")
    plt.ylabel(f"{k}-distance")
    plt.grid(True, alpha=0.3)
    plt.show()

for min_pts in [5, 8, 9]:
    plot_k_distance(X_spheres, min_pts, title=f"K-Distance (min_pts={min_pts})")

best_eps = 0.7
best_min_pts = 9

db = DBSCAN(eps=best_eps, min_samples=best_min_pts)
labels = db.fit_predict(X_spheres)

fig = px.scatter_3d(
    x=X_spheres[:,0], y=X_spheres[:,1], z=X_spheres[:,2],
    color=labels.astype(str),
    title=f"DBSCAN 3D - eps={best_eps}, min_samples={best_min_pts}",
)
fig.update_traces(marker=dict(size=3))
fig.show()

# O eps = 0.7 e o min_pts = 9.
# Com esses valores foram encontrados 4 cluters

### Exercício 2: DBSCAN com distância radial

Usando os dados das **3 esferas concêntricas** do exercício anterior:

1. Implemente a **distância radial** e use-a no DBSCAN. A **distância radial** entre dois pontos \(x_i\) e \(x_j\) é a diferença absoluta entre suas distâncias à origem: $d_{\text{radial}}(x_i, x_j) = \big|\;\|x_i\|_2 - \|x_j\|_2\;\big|$
2. Plote o **K-Distance radial** para sugerir `eps`.  
3. Teste combinações de `eps` e `min_samples`.  
4. Visualize em 3D os clusters obtidos e compare com o resultado usando distância euclidiana.  
5. Comente brevemente qual configuração foi melhor e por quê a métrica radial ajuda nesse dataset.

In [None]:
# --- Exercício 2: DBSCAN com distância radial ---

from sklearn.metrics import pairwise_distances

# Definindo distância radial
def radial_distance(X):
    norms = np.linalg.norm(X, axis=1).reshape(-1, 1)
    return pairwise_distances(norms, norms, metric="euclidean")

class DBSCANRadial(DBSCAN):
    def _calculate_distance_matrix(self, X):
        return radial_distance(X)

# 1) Plotando K-Distance radial
plot_k_distance(X_spheres, min_pts=5, title="K-Distance (Radial)")

# 2) Testando DBSCAN radial
db_radial = DBSCANRadial(eps=0.5, min_pts=5)
labels_radial = db_radial.fit_predict(X_spheres)

fig = px.scatter_3d(
    x=X_spheres[:, 0], y=X_spheres[:, 1], z=X_spheres[:, 2],
    color=labels_radial.astype(str), opacity=0.7
)
fig.update_traces(marker=dict(size=3))
fig.show()

print(f"Clusters encontrados com radial: {db_radial.n_clusters_}")


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
import pandas as pd
from scipy.spatial.distance import pdist, squareform
from sklearn.cluster import DBSCAN as SkDBSCAN


def radial_distance_matrix(X):
    # distância de cada ponto até a origem
    r = np.linalg.norm(X, axis=1)
    # matriz de distâncias radiais
    return squareform(pdist(r[:, None], metric="euclidean"))

def plot_k_distance_radial(X, min_pts):
    D = radial_distance_matrix(X)
    k = int(min_pts - 1)
    kth_distances = np.sort(np.partition(D, k, axis=1)[:, k])
    plt.figure(figsize=(8,5))
    plt.plot(kth_distances, linewidth=2)
    plt.title(f"K-Distance (Radial) - min_pts={min_pts}")
    plt.xlabel("Pontos ordenados")
    plt.ylabel(f"{k}-distance (radial)")
    plt.grid(True, alpha=0.3)
    plt.show()

plot_k_distance_radial(X_spheres, min_pts=5)

# Testando diferentes combinações
D = radial_distance_matrix(X_spheres)

params = [(0.15, 5), (0.2, 5), (0.25, 6), (0.3, 8), (0.4, 8)]
resultados = []

for eps, min_pts in params:
    db_radial = SkDBSCAN(eps=eps, min_samples=min_pts, metric='precomputed')
    labels_radial = db_radial.fit_predict(D)
    
    n_clusters = len(set(labels_radial)) - (1 if -1 in labels_radial else 0)
    n_noise = np.sum(labels_radial == -1)
    
    resultados.append((eps, min_pts, n_clusters, n_noise))
    print(f"eps={eps}, min_samples={min_pts} → clusters: {n_clusters}, ruído: {n_noise}")

# A melhor
best_eps = 0.25
best_min_samples = 6

db_radial = SkDBSCAN(eps=best_eps, min_samples=best_min_samples, metric='precomputed')
labels_radial = db_radial.fit_predict(D)

fig = px.scatter_3d(
    x=X_spheres[:,0], y=X_spheres[:,1], z=X_spheres[:,2],
    color=labels_radial.astype(str),
    title=f"DBSCAN 3D - Distância Radial (eps={best_eps}, min_samples={best_min_samples})",
    color_discrete_sequence=px.colors.qualitative.Safe
)
fig.update_traces(marker=dict(size=3))
fig.show()

print(f"Número de clusters encontrados (Radial): {len(set(labels_radial)) - (1 if -1 in labels_radial else 0)}")
print(f"Pontos de ruído (Radial): {np.sum(labels_radial==-1)}")

db_euclid = SkDBSCAN(eps=best_eps, min_samples=best_min_samples)
labels_euclid = db_euclid.fit_predict(X_spheres)

fig2 = px.scatter_3d(
    x=X_spheres[:,0], y=X_spheres[:,1], z=X_spheres[:,2],
    color=labels_euclid.astype(str),
    title=f"DBSCAN 3D - Distância Euclidiana (eps={best_eps}, min_samples={best_min_samples})",
    color_discrete_sequence=px.colors.qualitative.Safe
)
fig2.update_traces(marker=dict(size=3))
fig2.show()

print(f"Número de clusters encontrados (Euclidiana): {len(set(labels_euclid)) - (1 if -1 in labels_euclid else 0)}")
print(f"Pontos de ruído (Euclidiana): {np.sum(labels_euclid==-1)}")

# A melhor combinação foi eps = 0.25 e min_samples = 6 
# A distância radial se mostra mais eficiente neste caso porque leva em conta apenas
# o quanto cada ponto está distante do centro, sem considerar o ângulo ou direção.
# Logo, pontos que estão no mesmo raio são vistos como próximos,
# mesmo que estejam em lados opostos da esfera.
# Já a distância euclidiana considera também a posição angular,
# o que faz com que pontos da mesma esfera pareçam mais afastados do que realmente estão.

### Exercício 3: Detecção de Anomalias com DBSCAN e DTW

O **DTW (Dynamic Time Warping)** mede a similaridade entre séries temporais mesmo quando estão defasadas ou com velocidades diferentes, alinhando-as de forma elástica. Isso permite detectar padrões semelhantes sem que a defasagem atrapalhe.

Pode ser calculado por:
```python
from dtaidistance import dtw

n = len(X)              # número de séries
D = np.zeros((n, n))    # matriz de distâncias

for i in range(n):
    for j in range(i+1, n):
        dist = dtw.distance_fast(X[i], X[j])  # distância DTW
        D[i, j] = D[j, i] = dist              # matriz simétrica
````

**Tarefas:**
1. Use o dataset de senóides com variação e **anomalias simuladas**.  
2. Adicione a métrica DTW no DBSCAN.
3. Experimente diferentes valores de `eps` e `min_samples` até que o modelo consiga separar bem séries normais das anômalas.  
4. Plote todas as séries, usando uma cor para as normais e outra para as anomalias detectadas (`label = -1`).  

In [None]:
def generate_time_series_dataset(n_series=50, length=100, noise=0.1, n_outliers=2, random_state=42):
    rng = np.random.default_rng(random_state)
    X, y = [], []
    t = np.linspace(0, 4*np.pi, length)

    # séries normais: senóide com amplitude e frequência ligeiramente diferentes
    for _ in range(n_series):
        amp = rng.uniform(0.8, 1.2)         # amplitude
        freq = rng.uniform(0.9, 1.1)        # frequência
        phase = rng.uniform(0, 0.5*np.pi)   # pequena defasagem
        series = amp * np.sin(freq * t + phase) + noise * rng.normal(size=length)
        X.append(series)
        y.append(0)  # normal

    # outliers: picos ou deslocamentos fortes
    for _ in range(n_outliers):
        amp = rng.uniform(1.5, 2.0)         # amplitude anômala
        freq = rng.uniform(1.2, 1.5)        # frequência anômala
        series = amp * np.sin(freq * t) + noise * rng.normal(size=length)
        if rng.random() < 0.5:
            series[length//2] += 3  # pico
        else:
            series += rng.normal(2.0, 0.5)  # deslocamento
        X.append(series)
        y.append(-1)  # anomalia

    return np.array(X), np.array(y)

X_series, y_series = generate_time_series_dataset()

plt.figure(figsize=(10,4))
for i in range(5):
    plt.plot(X_series[i], alpha=0.7, label="normal" if i==0 else "")
for i in range(-3,0):
    plt.plot(X_series[i], alpha=0.7, color="red", label="anomalia" if i==-1 else "")
plt.title("Séries temporais com anomalias")
plt.legend()
plt.show()


# --- Exercício 3: DBSCAN com DTW ---

from dtaidistance import dtw

# 1) Construir matriz de distâncias DTW
n = len(X_series)
D = np.zeros((n, n))
for i in range(n):
    for j in range(i+1, n):
        dist = dtw.distance_fast(X_series[i], X_series[j])
        D[i, j] = D[j, i] = dist

# 2) Rodar DBSCAN com matriz DTW
eps, min_samples = 10, 3
db_dtw = SklearnDBSCAN(eps=eps, min_samples=min_samples, metric="precomputed")
labels_dtw = db_dtw.fit_predict(D)

# 3) Visualizar resultados
plt.figure(figsize=(12, 5))
for i, series in enumerate(X_series):
    if labels_dtw[i] == -1:
        plt.plot(series, color="red", alpha=0.7, label="Anomalia" if i == list(labels_dtw).index(-1) else "")
    else:
        plt.plot(series, color="blue", alpha=0.5, label="Normal" if i == 0 else "")
plt.title("DBSCAN + DTW - Detecção de Anomalias")
plt.legend()
plt.show()

print(f"Clusters encontrados: {len(set(labels_dtw)) - (1 if -1 in labels_dtw else 0)}")
print(f"Anomalias detectadas: {np.sum(labels_dtw == -1)}")


In [None]:
!pip install dtaidistance

import numpy as np
import matplotlib.pyplot as plt
from dtaidistance import dtw
from sklearn.cluster import DBSCAN

def generate_time_series_dataset(n_series=50, length=100, noise=0.1, n_outliers=2, random_state=42):
    rng = np.random.default_rng(random_state)
    X, y = [], []
    t = np.linspace(0, 4*np.pi, length)

    for _ in range(n_series):
        amp = rng.uniform(0.8, 1.2)
        freq = rng.uniform(0.9, 1.1)
        phase = rng.uniform(0, 0.5*np.pi)
        series = amp * np.sin(freq * t + phase) + noise * rng.normal(size=length)
        X.append(series)
        y.append(0)

    for _ in range(n_outliers):
        amp = rng.uniform(1.5, 2.0)
        freq = rng.uniform(1.2, 1.5)
        series = amp * np.sin(freq * t) + noise * rng.normal(size=length)
        if rng.random() < 0.5:
            series[length//2] += 3
        else:
            series += rng.normal(2.0, 0.5)
        X.append(series)
        y.append(-1)
    return np.array(X), np.array(y)

X_series, y_series = generate_time_series_dataset()

n = len(X_series)
D = np.zeros((n, n))
for i in range(n):
    for j in range(i+1, n):
        dist = dtw.distance_fast(X_series[i], X_series[j])
        D[i, j] = D[j, i] = dist

db_dtw = DBSCAN(eps=15, min_samples=3, metric="precomputed")
labels_dtw = db_dtw.fit_predict(D)

plt.figure(figsize=(10,5))
for i, series in enumerate(X_series):
    if labels_dtw[i] == -1:
        plt.plot(series, color='red', alpha=0.7)
    else:
        plt.plot(series, color='blue', alpha=0.4)
plt.title("Detecção de Anomalias com DBSCAN + DTW")
plt.xlabel("Tempo")
plt.ylabel("Valor")
plt.show()

print(f"Clusters encontrados: {len(set(labels_dtw)) - (1 if -1 in labels_dtw else 0)}")
print(f"Anomalias detectadas: {np.sum(labels_dtw==-1)}")