# Experimento 4

In [2]:
!pip install minisom

Collecting minisom
  Downloading minisom-2.3.5.tar.gz (12 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: minisom
  Building wheel for minisom (setup.py) ... done
  Created wheel for minisom: filename=MiniSom-2.3.5-py3-none-any.whl size=12031 sha256=5cdece4ef1c1a2d4df3090a9c4ceccae2b58c776a8b32b76d9806ba796974422
  Stored in directory: /root/.cache/pip/wheels/0f/8c/a4/5b7aa56fa6ef11d536d45da775bcc5a2a1c163ff0f8f11990b
Successfully built minisom
Installing collected packages: minisom
Successfully installed minisom-2.3.5


In [None]:
# ==========================================================
# IMPORTS
# ==========================================================
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

import numpy as np
import matplotlib.pyplot as plt

from sklearn.dummy import DummyClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import BaggingClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay

from minisom import MiniSom

# ==========================================================
# CONFIGURAÇÕES GERAIS
# ==========================================================
# Compute device: CUDA when torch reports it as available, CPU otherwise.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Mini-batch size for the data loaders and number of CNN training epochs.
batch_size = 128
epochs_cnn = 10

# Number of target classes and the minimum number of training samples a
# SOM cluster needs before it gets its own local classifier.
num_classes = 10
min_cluster_size = 50

# SOM grid sizes to compare (student activity): square maps of side 5, 10, 15.
som_sizes = [(side, side) for side in (5, 10, 15)]

# ==========================================================
# CNN PARA CLASSIFICAÇÃO E EXTRAÇÃO DE FEATURES
# ==========================================================
class CNNFeatureExtractor(nn.Module):
    """Small CNN that works both as a classifier and as a feature extractor.

    Two 3x3 convolutions (padding 1, 1 -> 32 -> 64 channels), each followed
    by ReLU and 2x2 max-pooling, feed a single linear layer. The linear
    layer expects ``64 * 7 * 7`` inputs, which is what a 1-channel 28x28
    image produces after the two poolings.

    Args:
        n_classes: Number of output logits. When ``None`` (the default) the
            module-level ``num_classes`` is used, so ``CNNFeatureExtractor()``
            keeps its original behaviour.
    """

    def __init__(self, n_classes=None):
        super().__init__()

        if n_classes is None:
            n_classes = num_classes

        self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.fc = nn.Linear(64 * 7 * 7, n_classes)

    def forward(self, x, return_features=False):
        """Run the network.

        Args:
            x: Input batch passed to the first convolution.
            return_features: When true, return the flattened output of the
                second pooling stage instead of the class logits.

        Returns:
            The flattened features (one row per sample) if
            ``return_features`` is true, otherwise ``self.fc`` applied to
            those features.
        """
        x = F.max_pool2d(F.relu(self.conv1(x)), 2)
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)

        # flatten() copies when needed, so it also accepts non-contiguous
        # activations (e.g. channels_last), where view() would raise.
        features = x.flatten(start_dim=1)

        if return_features:
            return features

        return self.fc(features)

# ==========================================================
# TREINAMENTO DA CNN
# ==========================================================
def train_cnn(model, loader, epochs=None, lr=1e-3):
    """Train ``model`` in place with Adam and cross-entropy loss.

    After every epoch one line is printed with the mean loss over all
    samples seen in that epoch (``nan`` when the loader yielded nothing),
    rather than the loss of whichever batch happened to come last.

    Args:
        model: Module whose ``model(x)`` call returns class logits.
        loader: Iterable of ``(inputs, targets)`` batches.
        epochs: Number of passes over ``loader``. Defaults to the
            module-level ``epochs_cnn``.
        lr: Learning rate handed to Adam.
    """
    if epochs is None:
        epochs = epochs_cnn

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # Send batches to wherever the model's parameters already live, so the
    # function does not depend on a module-level device setting.
    target_device = next(model.parameters()).device

    model.train()
    for epoch in range(epochs):
        loss_sum = 0.0
        n_seen = 0

        for x, y in loader:
            x, y = x.to(target_device), y.to(target_device)

            logits = model(x)
            loss = criterion(logits, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Weight by batch size so a smaller final batch does not skew
            # the mean; detach() keeps the running sum out of the graph.
            loss_sum = loss_sum + loss.detach() * y.size(0)
            n_seen += y.size(0)

        mean_loss = float(loss_sum) / n_seen if n_seen else float("nan")
        print(f"[CNN] Epoch {epoch+1}/{epochs} | Loss: {mean_loss:.4f}")

# ==========================================================
# EXTRAÇÃO DE FEATURES
# ==========================================================
def extract_features(model, loader):
    """Collect the model's feature vectors and the labels for every batch.

    The model is switched to eval mode (and left there) and called as
    ``model(images, return_features=True)`` with gradients disabled.

    Args:
        model: Module supporting the ``return_features=True`` call above.
        loader: Iterable of ``(images, labels)`` batches; labels must be
            CPU tensors because they are converted with ``.numpy()``.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays: the per-batch features stacked
        row-wise and the per-batch labels concatenated, both in the order
        the loader produced them. An empty loader yields two empty arrays
        instead of an error from stacking an empty list.
    """
    model.eval()

    # Prefer the device the model already lives on; only a parameter-free
    # model falls back to the module-level `device`.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    feature_chunks, label_chunks = [], []

    with torch.no_grad():
        for images, labels in loader:
            feats = model(images.to(target_device), return_features=True)

            feature_chunks.append(feats.cpu().numpy())
            label_chunks.append(labels.numpy())

    if not feature_chunks:
        # The feature width is unknown without at least one batch.
        return np.empty((0, 0), dtype=np.float32), np.empty((0,), dtype=np.int64)

    return np.vstack(feature_chunks), np.hstack(label_chunks)

# ==========================================================
# MATRIZ DE CONFUSÃO
# ==========================================================
def plot_confusion(y_true, y_pred, title):
    """Plot a confusion matrix over classes ``0 .. num_classes - 1``.

    Args:
        y_true: True class labels.
        y_pred: Predicted class labels.
        title: Title placed above the plot.
    """
    class_ids = list(range(num_classes))

    # Pin the label set: without `labels=` the matrix only covers classes
    # that occur in y_true or y_pred, so a missing class would make it
    # smaller than the list of display labels and break the plot.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    disp = ConfusionMatrixDisplay(cm, display_labels=class_ids)
    disp.plot(cmap="Blues", values_format="d")
    plt.title(title)
    plt.grid(False)
    plt.show()

# ==========================================================
# PUREZA DOS CLUSTERS
# ==========================================================
def cluster_purity(clusters, labels):
    """Compute, per cluster, the share of samples in its majority class.

    Args:
        clusters: 2-D integer array with one row per sample; the first two
            entries of a row identify the sample's cluster.
        labels: 1-D array of non-negative integer class labels, aligned
            with the rows of ``clusters``.

    Returns:
        Dict mapping ``(row[0], row[1])`` of every distinct cluster row to
        ``majority_count / cluster_size``. Empty input gives an empty dict.
    """
    clusters = np.asarray(clusters)
    labels = np.asarray(labels)

    if clusters.size == 0:
        return {}

    unique_clusters, inverse = np.unique(clusters, axis=0, return_inverse=True)
    # Some NumPy releases return a non-flat inverse when `axis` is given.
    inverse = inverse.ravel()

    # Cluster x class contingency table filled in a single pass, instead
    # of re-scanning every sample once per cluster.
    n_labels = int(labels.max()) + 1
    table = np.zeros((len(unique_clusters), n_labels), dtype=np.int64)
    np.add.at(table, (inverse, labels), 1)

    # Every distinct cluster row has at least one sample, so no row sum is 0.
    ratios = table.max(axis=1) / table.sum(axis=1)

    return {(c[0], c[1]): ratio for c, ratio in zip(unique_clusters, ratios)}

# ==========================================================
# ASSOCIAÇÃO AMOSTRA → CLUSTER SOM
# ==========================================================
def som_clusters(som, X):
    """Map every sample in ``X`` to the result of ``som.winner(sample)``.

    The winners are gathered in input order and returned as a NumPy array.
    """
    winners = []
    for sample in X:
        winners.append(som.winner(sample))
    return np.array(winners)

# ==========================================================
# PREDIÇÃO COM MODELOS LOCAIS (SOM)
# ==========================================================
def predict_som_models(X, clusters, models, fallback):
    """Predict each sample with its cluster's model, or with ``fallback``.

    Args:
        X: Samples, one per row; converted with ``np.asarray``.
        clusters: One cluster id per sample, aligned with ``X``. Each id is
            turned into a tuple to look it up in ``models``.
        models: Dict mapping cluster tuples to fitted models exposing
            ``predict``.
        fallback: Model used for samples whose cluster has no entry in
            ``models``.

    Returns:
        NumPy array with one prediction per sample, in input order.
    """
    X = np.asarray(X)

    # Bucket sample positions by the model that has to score them, so each
    # model gets a single batched predict() call instead of one per sample.
    buckets = {}
    n_samples = 0
    for pos, (_, c) in enumerate(zip(X, clusters)):
        model = models.get(tuple(c))
        # Compare against None explicitly: a model object may define
        # __len__ / __bool__ and evaluate as falsy while still being usable.
        if model is None:
            model = fallback
        buckets.setdefault(id(model), (model, []))[1].append(pos)
        n_samples = pos + 1

    preds = [None] * n_samples
    for model, positions in buckets.values():
        for pos, label in zip(positions, model.predict(X[positions])):
            preds[pos] = label

    return np.array(preds)

# ==========================================================
# DATASET MNIST
# ==========================================================
# The only preprocessing step is the conversion of each image to a tensor.
transform = transforms.Compose([transforms.ToTensor()])

# MNIST train / test splits, stored under ./data and downloaded if missing.
train_data = datasets.MNIST(root="data", train=True, transform=transform, download=True)
test_data = datasets.MNIST(root="data", train=False, transform=transform, download=True)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)

# ==========================================================
# 1) TREINAMENTO DA CNN
# ==========================================================
# Build the network on the selected device and train it on the train split.
cnn = CNNFeatureExtractor().to(device)
train_cnn(cnn, train_loader)

# ==========================================================
# CNN BASELINE
# ==========================================================
# Predicted class (arg-max over the logits) for every test image.
cnn.eval()
y_pred_cnn = []

with torch.no_grad():
    for batch_images, _ in test_loader:
        batch_logits = cnn(batch_images.to(device))
        y_pred_cnn.extend(batch_logits.argmax(dim=1).cpu().numpy())

# The test loader does not shuffle, so predictions line up with the
# dataset's own target tensor.
y_test_img = test_data.targets.numpy()
plot_confusion(y_test_img, y_pred_cnn, "CNN – Dados Reais")

# ==========================================================
# 2) EXTRAÇÃO DE FEATURES
# ==========================================================
# CNN features and labels for both splits. train_loader shuffles, so the
# rows of X_train follow the loader's iteration order (labels are returned
# in that same order), not the dataset's original order.
X_train, y_train = extract_features(cnn, train_loader)
X_test,  y_test  = extract_features(cnn, test_loader)

# ==========================================================
# 3) GLOBAL ENSEMBLES
# ==========================================================
# Bagging over two MLPs with one hidden layer of 128 units.
# NOTE(review): bagging_mlp and boosting_mlp are fitted here but are not
# used again in the visible part of this file; confirm they are evaluated
# elsewhere.
bagging_mlp = BaggingClassifier(
    estimator=MLPClassifier(hidden_layer_sizes=(128,), max_iter=300),
    n_estimators=2,
    n_jobs=-1
)
bagging_mlp.fit(X_train, y_train)

# AdaBoost over two MLPs.
# NOTE(review): AdaBoost needs a base estimator whose fit() accepts
# sample_weight; confirm the installed scikit-learn's MLPClassifier
# supports it, otherwise this fit raises.
boosting_mlp = AdaBoostClassifier(
    estimator=MLPClassifier(hidden_layer_sizes=(128,), max_iter=200),
    n_estimators=2
)
boosting_mlp.fit(X_train, y_train)

# Global RBF SVM trained on all training features; the SOM section below
# passes it as the fallback model for clusters without a local classifier.
svm_global = SVC(kernel="rbf", gamma="scale")
svm_global.fit(X_train, y_train)

# ==========================================================
# 4) SOM + ESPECIALISTAS LOCAIS
# ==========================================================
# One entry per SOM grid size: {"som": label, "purity": ..., "accuracy": ...}.
results = []

for grid in som_sizes:
    print(f"\n===== SOM {grid[0]}x{grid[1]} =====")

    # SOM with the current grid size; its input length is the width of the
    # extracted feature matrix.
    som = MiniSom(
        grid[0], grid[1],
        X_train.shape[1],
        sigma=1.0,
        learning_rate=0.5
    )

    som.random_weights_init(X_train)
    som.train_random(X_train, 5000)

    # Winning SOM node for every training and test sample.
    train_clusters = som_clusters(som, X_train)
    test_clusters  = som_clusters(som, X_test)

    # ----------------------
    # Cluster purity
    # ----------------------
    purity = cluster_purity(train_clusters, y_train)
    avg_purity = np.mean(list(purity.values()))
    print(f"Pureza média dos clusters: {avg_purity:.3f}")

    # ----------------------
    # One local classifier per cluster
    # ----------------------
    cluster_svms = {}
    unique_clusters = np.unique(train_clusters, axis=0)

    for cluster in unique_clusters:
        idx = np.all(train_clusters == cluster, axis=1)
        # Clusters with too few samples get no local model; their test
        # samples are scored by the global fallback instead.
        if np.sum(idx) < min_cluster_size:
            continue

        y_cluster = y_train[idx]
        if np.unique(y_cluster).size < 2:
            # SVC.fit rejects a training set with a single class, which a
            # pure cluster produces; predict that one class directly.
            local_model = DummyClassifier(strategy="most_frequent")
        else:
            local_model = SVC(kernel="rbf", gamma="scale")

        local_model.fit(X_train[idx], y_cluster)
        cluster_svms[tuple(cluster)] = local_model

    # Route each test sample to its cluster's model, or to the global SVM.
    y_pred = predict_som_models(
        X_test,
        test_clusters,
        cluster_svms,
        svm_global
    )

    acc = accuracy_score(y_test, y_pred)
    print(f"Acurácia SOM + SVM: {acc:.4f}")

    results.append({
        "som": f"{grid[0]}x{grid[1]}",
        "purity": avg_purity,
        "accuracy": acc
    })

    plot_confusion(y_test, y_pred, f"SOM {grid[0]}x{grid[1]} + SVM")

# ==========================================================
# RESUMO FINAL
# ==========================================================
# Print one summary line per evaluated SOM grid size.
print("\n===== RESULTADOS FINAIS =====")
summary_template = "SOM {som} | Pureza média: {purity:.3f} | Acurácia: {accuracy:.4f}"
for entry in results:
    print(summary_template.format(**entry))


100%|██████████| 9.91M/9.91M [00:00<00:00, 51.8MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 2.26MB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 14.0MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 10.2MB/s]
