# Reconhecimento de Padrões (TIP8311) - Trabalho 3


**Professor:** Guilherme de Alencar Barreto  

<img src="https://loop.frontiersin.org/images/profile/243428/203" alt="Foto do Professor" width="150"/>


**Aluno:** Luis Felipe Carneiro de Souza    **Matrícula:** 535049

In [1]:
import pandas as pd
import numpy as np
import time
from scipy.spatial.distance import minkowski
from sklearn.preprocessing import normalize
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import seaborn as sns
import tqdm

In [2]:
data_path = "wall+following+robot+navigation+data/sensor_readings_24.data"

## Questão 01

Para este trabalho computacional, considere o conjunto de dados disponível no site abaixo.

Usando o conjunto original para 24 sensores de ultrassom. Pede-se:

1.1. Identificar para o problema em questão o número de classes, o númeor de instâncias/exemplos de cada classe e a dimensão do vetor de atributos.

1.2. Verificar se as matrizes de covariância das classes são invertíveis ou não.

1.3. Implementar e avaliar os seguintes classificadors: (1) classificador quadrático gaussiano (CQG) e (2) classificador de distância mímina ao protótipo¹ (DMP). Preencher a tabela abaixo após Nr = 100 Rodadas de treinamento/teste. Comente os resultados obtidos.


**Classificador ** | Média Desvio Padrão |


In [3]:
data = np.genfromtxt(fname=data_path, delimiter=",", dtype=str, encoding="utf-8")

X = data[:, :-1].astype(float)  # todas menos a última coluna convertidas para float
y = data[:, -1]                 # última coluna como string

y = y.reshape(-1, 1)

# X, y
X.shape

(5456, 24)

In [4]:
class CQG:
    def __init__(self, reg=1e-8):
        self.C = None         # classes
        self.mcov = None      # dict: classe -> matriz de covariância (p x p)
        self.means = None     # dict: classe -> vetor média (p x 1)
        self.priors = None    # dict: classe -> prior f(ωi)
        self.reg = reg        # regularização para estabilidade numérica

    def _mcov(self, X_i):
        # X_i: shape (p, N) -> p = nº features, N = nº amostras
        p, N = X_i.shape
        m = X_i.mean(axis=1).reshape(-1,1)
        R = (X_i @ X_i.T) / N
        C = R - (m @ m.T)
        return C

    def fit(self, X, y):
        """
        Treina o modelo.
        X: (n_samples, n_features)
        y: (n_samples, 1) ou (n_samples,)
        """
        y_flat = y.ravel()
        self.C = np.sort(np.unique(y_flat))
        self.mcov = {}
        self.means = {}
        self.priors = {}

        n = X.shape[0]
        for c in self.C:
            idx = np.where(y_flat == c)[0]
            Xc = X[idx]                 # shape (Nc, p)
            Nc = Xc.shape[0]
            if Nc == 0:
                raise ValueError(f"Classe {c} sem amostras.")

            # média como vetor coluna (p,1)
            mean_c = Xc.mean(axis=0).reshape(-1,1)

            # covariância usando _mcov (espera p x N)
            cov_c = self._mcov(Xc.T)

            # regularização na diagonal para evitar singularidade
            cov_c = cov_c + self.reg * np.eye(cov_c.shape[0])

            prior_c = Nc / n

            self.means[c] = mean_c
            self.mcov[c] = cov_c
            self.priors[c] = prior_c

    def _Qi(self, x_col, mean_col, inv_cov):
        """
        Calcula Q_i(x) = (x - m_i)^T C_i^{-1} (x - m_i)
        x_col, mean_col: (p,1)
        inv_cov: (p,p)
        retorna escalar
        """
        diff = x_col - mean_col
        return diff.T @ inv_cov @ diff

    def predict(self, X):
        """
        Prediz rótulos para X (n_samples, n_features).
        Retorna array com rótulos (mesmo tipo de self.C).
        """
        if self.C is None:
            raise ValueError("Modelo não treinado. Chame fit(X, y) primeiro.")

        n, p = X.shape
        preds = np.empty(n, dtype=self.C.dtype)

        # pré-computar inversas e log-determinantes por classe
        invs = {}
        logdets = {}
        for c in self.C:
            cov = self.mcov[c]
            # usar slogdet para estabilidade
            sign, logdet = np.linalg.slogdet(cov)
            if sign <= 0:
                # fallback (deveria ser raro por causa da regularização)
                logdet = np.log(np.linalg.det(cov) + 1e-20)
            invs[c] = np.linalg.inv(cov)
            logdets[c] = logdet

        for i in range(n):
            x_col = X[i].reshape(-1,1)
            best_c = None
            best_g = np.inf  # queremos o menor g_i^*(x)
            for c in self.C:
                inv_cov = invs[c]
                Qi = self._Qi(x_col, self.means[c], inv_cov)
                # g*_i(x) = Qi(x) + ln(|Ci|) - 2 ln(f(ωi))
                prior = self.priors[c]
                # evitar log(0)
                if prior <= 0:
                    raise ValueError(f"Prior da classe {c} é zero.")
                g = Qi + logdets[c] - 2.0 * np.log(prior)
                if g < best_g:
                    best_g = g
                    best_c = c
            preds[i] = best_c

        return preds


In [5]:
class Kmeans:
    """K-means sequencial (online) com alpha = 1 / count_i(t) e kmeans++."""

    def __init__(self, k=3, max_epochs=100, tol=1e-4, random_state=None,
                 init='random', handle_empty='reinit'):
        self.k = int(k)
        self.max_epochs = int(max_epochs)
        self.tol = float(tol)
        self.random_state = random_state
        self.init = init
        self.handle_empty = handle_empty

        self.centroids = None
        self.counts = None
        self.inertia_ = None
        self.n_iter_ = 0
        self.rng = np.random.default_rng(random_state)

    def _check_X(self, X):
        if not isinstance(X, np.ndarray):
            X = np.asarray(X, dtype=float)
        else:
            X = X.astype(float, copy=False)
        if X.ndim != 2:
            raise ValueError("X deve ser 2D com shape (n_samples, n_features).")
        if np.isnan(X).any():
            raise ValueError("X contém NaNs. Trate-os antes de ajustar.")
        return X

    def _init_centroids(self, X):
        n, _ = X.shape
        if self.k > n:
            raise ValueError("k não pode ser maior que o número de amostras.")

        if self.init == 'random':
            idx = self.rng.choice(n, size=self.k, replace=False)
            self.centroids = X[idx].astype(float).copy()
        elif self.init == 'kmeans++':
            # kmeans++ initialization
            centroids = np.empty((self.k, X.shape[1]), dtype=float)
            # 1) choose first centroid uniformly at random
            first_idx = int(self.rng.integers(0, n))
            centroids[0] = X[first_idx]
            # 2) choose remaining centroids
            # distances squared to nearest chosen centroid
            closest_dist_sq = np.sum((X - centroids[0])**2, axis=1)
            for c in range(1, self.k):
                # probability proportional to distance squared
                total = closest_dist_sq.sum()
                if total == 0.0:
                    # all points identical to chosen centroids; pick random remaining
                    next_idx = int(self.rng.integers(0, n))
                else:
                    probs = closest_dist_sq / total
                    next_idx = int(self.rng.choice(n, p=probs))
                centroids[c] = X[next_idx]
                # update closest distances squared
                dist_to_new = np.sum((X - centroids[c])**2, axis=1)
                closest_dist_sq = np.minimum(closest_dist_sq, dist_to_new)
            self.centroids = centroids
        else:
            # fallback para random
            idx = self.rng.choice(n, size=self.k, replace=False)
            self.centroids = X[idx].astype(float).copy()

        self.counts = np.zeros(self.k, dtype=int)

    def _handle_empty_clusters(self, X, labels):
        for i in range(self.k):
            if np.sum(labels == i) == 0:
                if self.handle_empty == 'reinit':
                    self.centroids[i] = X[self.rng.integers(0, X.shape[0])]
                    self.counts[i] = 0
                elif self.handle_empty == 'farthest':
                    dists = np.linalg.norm(X[:, None] - self.centroids[None, :], axis=2)
                    far_idx = np.argmax(np.min(dists, axis=1))
                    self.centroids[i] = X[far_idx]
                    self.counts[i] = 0

    def predict(self, X):
        if self.centroids is None:
            raise RuntimeError("Modelo não treinado. Chame fit(X) primeiro.")
        X = self._check_X(X)
        dists = np.linalg.norm(X[:, None, :] - self.centroids[None, :, :], axis=2)
        return np.argmin(dists, axis=1)

    def fit(self, X):
        X = self._check_X(X)
        n, _ = X.shape
        self._init_centroids(X)
        prev_centroids = self.centroids.copy()

        for epoch in range(1, self.max_epochs + 1):
            perm = self.rng.permutation(n)
            X_shuffled = X[perm]

            for x in X_shuffled:
                dists = np.linalg.norm(self.centroids - x, axis=1)
                i_star = int(np.argmin(dists))
                self.counts[i_star] += 1
                alpha = 1.0 / self.counts[i_star]
                self.centroids[i_star] = (1 - alpha) * self.centroids[i_star] + alpha * x

            labels = self.predict(X)
            self._handle_empty_clusters(X, labels)

            shift = np.linalg.norm(self.centroids - prev_centroids)
            prev_centroids = self.centroids.copy()
            self.n_iter_ = epoch

            dists = np.linalg.norm(X - self.centroids[labels], axis=1)
            self.inertia_ = float(np.sum(dists ** 2))

            if shift < self.tol:
                break

        return self


In [None]:
import numpy as np
from collections import Counter

# ---------- Funções manuais de índices e utilitários ----------
def _pairwise_distances(X, Y=None):
    if Y is None:
        Y = X
    XX = np.sum(X * X, axis=1)[:, None]
    YY = np.sum(Y * Y, axis=1)[None, :]
    D2 = XX + YY - 2 * (X @ Y.T)
    D2 = np.maximum(D2, 0.0)
    return np.sqrt(D2)

def silhouette_score_manual(X, labels):
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    unique = np.unique(labels)
    n_clusters = unique.size
    n_samples = X.shape[0]
    if n_clusters < 2 or n_samples <= 1:
        return -1.0
    D = _pairwise_distances(X)
    sil_vals = np.zeros(n_samples, dtype=float)
    for idx in range(n_samples):
        lab = labels[idx]
        same_idx = np.where(labels == lab)[0]
        if same_idx.size <= 1:
            a = 0.0
        else:
            a = (np.sum(D[idx, same_idx]) - 0.0) / (same_idx.size - 1)
        b_vals = []
        for other_lab in unique:
            if other_lab == lab:
                continue
            other_idx = np.where(labels == other_lab)[0]
            if other_idx.size == 0:
                continue
            b_vals.append(np.mean(D[idx, other_idx]))
        b = np.min(b_vals) if len(b_vals) > 0 else 0.0
        denom = max(a, b)
        sil_vals[idx] = 0.0 if denom == 0.0 else (b - a) / denom
    return float(np.mean(sil_vals))

def calinski_harabasz_score_manual(X, labels):
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    n_samples, _ = X.shape
    unique = np.unique(labels)
    k = unique.size
    if k < 2 or n_samples <= k:
        return -np.inf
    overall_mean = np.mean(X, axis=0)
    B = 0.0
    W = 0.0
    for lab in unique:
        Xi = X[labels == lab]
        ni = Xi.shape[0]
        if ni == 0:
            continue
        mean_i = np.mean(Xi, axis=0)
        B += ni * np.sum((mean_i - overall_mean) ** 2)
        W += np.sum((Xi - mean_i) ** 2)
    if W == 0.0:
        return np.inf
    score = (B * (n_samples - k)) / (W * (k - 1))
    return float(score)

def davies_bouldin_score_manual(X, labels):
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    unique = np.unique(labels)
    k = unique.size
    if k < 2:
        return np.inf
    centroids = []
    s = []
    for lab in unique:
        Xi = X[labels == lab]
        if Xi.shape[0] == 0:
            centroids.append(np.zeros(X.shape[1], dtype=float))
            s.append(0.0)
            continue
        mean_i = np.mean(Xi, axis=0)
        centroids.append(mean_i)
        dists = np.linalg.norm(Xi - mean_i, axis=1)
        s.append(np.mean(dists) if dists.size > 0 else 0.0)
    centroids = np.vstack(centroids)
    s = np.array(s, dtype=float)
    C = _pairwise_distances(centroids)
    R = np.full((k, k), -np.inf, dtype=float)
    for i in range(k):
        for j in range(k):
            if i == j:
                continue
            denom = C[i, j]
            R[i, j] = np.inf if denom == 0.0 else (s[i] + s[j]) / denom
    R_i = np.max(R, axis=1)
    return float(np.mean(R_i))

In [24]:
class DMP:
    """
    Classificador DMP com múltiplos protótipos por classe.
    Usa Kmeans externo (deve existir no escopo) e índices manuais.
    """
    def __init__(self, k_max=5, Nr=5, kmeans_params=None, validation_indices=None):
        self.k_max = int(k_max)
        self.Nr = int(Nr)
        self.kmeans_params = {} if kmeans_params is None else dict(kmeans_params)
        self.validation_indices = ['silhouette', 'calinski', 'davies'] if validation_indices is None else validation_indices

        self.class_prototypes_ = {}
        self.class_Kopt_ = {}
        self.prototype_labels_ = None
        self.prototypes_ = None

    def _validate_X(self, X):
        if not isinstance(X, np.ndarray):
            X = np.asarray(X, dtype=float)
        else:
            X = X.astype(float, copy=False)
        if X.ndim != 2:
            raise ValueError("X deve ser 2D com shape (n_samples, n_features).")
        if np.isnan(X).any():
            raise ValueError("X contém NaNs. Trate-os antes de ajustar.")
        return X

    def _validate_y(self, y, n_samples):
        y = np.asarray(y).ravel()
        if y.shape[0] != n_samples:
            raise ValueError("y deve ter o mesmo número de amostras que X.")
        return y

    def fit(self, X, y):
        X = self._validate_X(X)
        y = self._validate_y(y, X.shape[0])
        classes = np.unique(y)

        for cls in classes:
            mask = (y == cls)
            Xc = X[mask]
            nc = Xc.shape[0]
            if nc == 0:
                continue
            k_max_local = min(self.k_max, nc)
            best_centroids_for_k = {}
            best_ssd_for_k = {}

            for k in range(1, k_max_local + 1):
                best_ssd = np.inf
                best_centroids = None
                for r in range(self.Nr):
                    params = dict(self.kmeans_params)
                    params.update({'k': k})
                    km = Kmeans(**params)   # Kmeans deve estar definido
                    km.fit(Xc)
                    ssd = km.inertia_ if hasattr(km, 'inertia_') else np.sum((Xc - km.centroids[km.predict(Xc)])**2)
                    if ssd < best_ssd:
                        best_ssd = ssd
                        best_centroids = km.centroids.copy()
                best_centroids_for_k[k] = best_centroids
                best_ssd_for_k[k] = best_ssd

            index_scores = {'silhouette': {}, 'calinski': {}, 'davies': {}}
            for k, centroids in best_centroids_for_k.items():
                dists = np.linalg.norm(Xc[:, None, :] - centroids[None, :, :], axis=2)
                labels_k = np.argmin(dists, axis=1)
                index_scores['silhouette'][k] = silhouette_score_manual(Xc, labels_k)
                index_scores['calinski'][k] = calinski_harabasz_score_manual(Xc, labels_k)
                index_scores['davies'][k] = davies_bouldin_score_manual(Xc, labels_k)

            suggested_K = []
            if 'silhouette' in self.validation_indices:
                best_k_sil = max(index_scores['silhouette'].items(), key=lambda kv: kv[1])[0]
                suggested_K.append(best_k_sil)
            if 'calinski' in self.validation_indices:
                best_k_cal = max(index_scores['calinski'].items(), key=lambda kv: kv[1])[0]
                suggested_K.append(best_k_cal)
            if 'davies' in self.validation_indices:
                best_k_dav = min(index_scores['davies'].items(), key=lambda kv: kv[1])[0]
                suggested_K.append(best_k_dav)

            counts = Counter(suggested_K)
            most_common = counts.most_common()
            if len(most_common) == 0:
                Kopt = 1
            else:
                top_count = most_common[0][1]
                candidates = [k for k, cnt in most_common if cnt == top_count]
                Kopt = min(candidates)

            self.class_prototypes_[cls] = best_centroids_for_k[Kopt]
            self.class_Kopt_[cls] = Kopt

        prototypes_list = []
        proto_labels = []
        for cls in classes:
            cent = self.class_prototypes_.get(cls)
            if cent is None:
                continue
            prototypes_list.append(cent)
            proto_labels.extend([cls] * cent.shape[0])

        if len(prototypes_list) == 0:
            raise RuntimeError("Nenhum protótipo foi gerado. Verifique os dados e parâmetros.")

        self.prototypes_ = np.vstack(prototypes_list)
        self.prototype_labels_ = np.array(proto_labels)
        return self

    def predict(self, X):
        if self.prototypes_ is None:
            raise RuntimeError("Modelo não treinado. Chame fit(X, y) primeiro.")
        X = self._validate_X(X)
        dists = np.linalg.norm(X[:, None, :] - self.prototypes_[None, :, :], axis=2)
        idx = np.argmin(dists, axis=1)
        return self.prototype_labels_[idx].reshape(-1, 1)

    def predict_proba_distance(self, X):
        if self.prototypes_ is None:
            raise RuntimeError("Modelo não treinado. Chame fit(X, y) primeiro.")
        X = self._validate_X(X)
        dists = np.linalg.norm(X[:, None, :] - self.prototypes_[None, :, :], axis=2)
        idx = np.argmin(dists, axis=1)
        min_dists = dists[np.arange(dists.shape[0]), idx]
        return min_dists, idx


In [9]:
class PCA:
    """
    PCA manual com duas abordagens:
      - method='eig' : autovalores/autovetores da matriz de covariancia
      - method='svd' : SVD da matriz de dados centrada
    Retorna componentes (autovetores), variancias (autovalores), e média.
    """
    def __init__(self, n_components=None, method='svd', whiten=False):
        self.n_components = n_components
        self.method = method
        self.whiten = bool(whiten)

        self.components_ = None        # shape (q, p)
        self.explained_variance_ = None
        self.explained_variance_ratio_ = None
        self.mean_ = None
        self.n_samples_ = None
        self.n_features_ = None

    def fit(self, X):
        X = np.asarray(X, dtype=float)
        if X.ndim != 2:
            raise ValueError("X deve ser 2D (n_samples, n_features).")
        n, p = X.shape
        self.n_samples_ = n
        self.n_features_ = p
        self.mean_ = X.mean(axis=0)
        Xc = X - self.mean_

        if self.method == 'eig':
            # matriz de covariancia (unbiased)
            C = np.dot(Xc.T, Xc) / (n - 1)
            vals, vecs = np.linalg.eigh(C)   # eigh para simétrica
            # ordenar decrescente
            idx = np.argsort(vals)[::-1]
            vals = vals[idx]
            vecs = vecs[:, idx]
        elif self.method == 'svd':
            # SVD direto em Xc: Xc = U S Vt
            # autovalores de C = (S^2)/(n-1)
            U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
            vals = (S**2) / (n - 1)
            vecs = Vt.T
        else:
            raise ValueError("method deve ser 'eig' ou 'svd'.")

        # número de componentes
        if self.n_components is None:
            q = min(n, p)
        else:
            q = int(self.n_components)
            if q <= 0 or q > min(n, p):
                raise ValueError("n_components inválido.")

        self.components_ = vecs[:, :q].T            # (q, p)
        self.explained_variance_ = vals[:q].copy()
        total_var = vals.sum()
        self.explained_variance_ratio_ = (vals[:q] / total_var) if total_var > 0 else np.zeros_like(vals[:q])

        if self.whiten:
            # divide componentes por sqrt(autovalores)
            eps = 1e-12
            self.components_ = self.components_ / np.sqrt(self.explained_variance_[:, None] + eps)

        return self

    def transform(self, X):
        X = np.asarray(X, dtype=float)
        if self.components_ is None:
            raise RuntimeError("Chame fit(X) antes de transform.")
        Xc = X - self.mean_
        return np.dot(Xc, self.components_.T)   # (n_samples, q)

    def inverse_transform(self, Z):
        Z = np.asarray(Z, dtype=float)
        if self.components_ is None:
            raise RuntimeError("Chame fit(X) antes de inverse_transform.")
        Xc_rec = np.dot(Z, self.components_)
        return Xc_rec + self.mean_

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)

In [None]:
acc_cqg = []
acc_mdp = []

for _ in tqdm.tqdm(range(100)):
    idx = np.random.permutation(X.shape[0])
    X, y = X[idx], y[idx]

    split = int(0.7 * len(X))
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]

    model_cqg = CQG()
    model_mdp = DMP(k_max=6, Nr=11, kmeans_params={'init': 'kmeans++', 'random_state': 42}, validation_indices=['davies'])

    model_cqg.fit(X_train, y_train)
    y_pred_cqg = model_cqg.predict(X_test)

    model_mdp.fit(X_train, y_train)
    y_pred_mdp = model_mdp.predict(X_test)

    acc_cqg.append(np.mean(y_pred_cqg == y_test.ravel()))
    acc_mdp.append(np.mean(y_pred_mdp == y_test.ravel()))


In [25]:
model_mdp = DMP(k_max=6, Nr=11, kmeans_params={'init': 'kmeans++'}, validation_indices=['davies'])

model_mdp.fit(X, y)
y_pred = model_mdp.predict(X)

array([['Slight-Right-Turn'],
       ['Sharp-Right-Turn'],
       ['Sharp-Right-Turn'],
       ...,
       ['Move-Forward'],
       ['Move-Forward'],
       ['Move-Forward']], shape=(5456, 1), dtype='<U17')

## Questão 02

Aplicar PCA ao conjunto original de 24 sensores. Pede-se:

2.1. Determinar o número de componenets (q) adequado para o problema, ou seja, que promova uma redução de dimensão dos vetores de atributo sem piorar o desempenho dos classificadores implementados. Mostre o gráfico da variância explicada VE(q).

2.2. Repetir o experimento so Subitem 1.3. para os dados transformados por PCA, preenchendo uma tabela de resultados similar. Comente os resultados obtidos.

**OBS.: A implementação das tarefas pedidas nas Questões 1 e 2 é feita simultaneamente, na verdade. Enquano um classificador é testado sem PCA, já se pode testa-lo também após aplicação de PCA. Apenas a apresentação dos resultados é que é separada em duas questões para facilitar melhoro entendimento do efeito da aplicação de PCA**

---
¹ Este classificador é construído aplicando-se o algoritmo de K-médias aos dados de cada classe em separado. Cada classe terá seu número de protótipos, sendo que estes herdam o rótulo da classe a qual pertencem. Durante o teste, funciona como classificador distância mínima ao centroide, onde deve-se encontrar o protótipo da classe mais próxima. Para mais detalhes, vide slides do assunto "Introdução à Clusterização - Métodos Particionais"