## Consigna

### LDA

1. "Tensorizar" el modelo LDA y comparar sus tiempos de predicción con el modelo antes implementado. *Notar que, en modo tensorizado, se puede directamente precomputar $\mu^T \cdot \Sigma^{-1} \in \mathbb{R}^{k \times 1 \times p}$ y guardar eso en vez de $\Sigma^{-1}$.*
2. LDA no sufre del problema antes descrito de QDA debido a que no computa productos internos, por lo que no tiene un verdadero costo extra en memoria predecir "en batch". Implementar el modelo `FasterLDA` y comparar sus tiempos de predicción con las versiones anteriores de LDA.

# Modelo

### Imports

In [39]:
import numpy as np
from numpy.linalg import det, inv

In [40]:
### ClassEncoder

In [41]:
class ClassEncoder:
    """Two-way mapping between class labels and integer class indices.

    After ``fit``, the index of a label is its position in ``names``, the
    sorted array of unique labels returned by ``np.unique``.
    """

    def fit(self, y):
        """Learn the label vocabulary from ``y`` and remember its dtype."""
        labels = np.unique(y)
        lookup = {}
        for code, label in enumerate(labels):
            lookup[label] = code

        self.names = labels
        self.name_to_class = lookup
        # Kept so predictions can be allocated with the same dtype as the labels.
        self.fmt = y.dtype

    def _map_reshape(self, f, arr):
        """Apply ``f`` to every element of ``arr`` and restore the original shape."""
        mapped = list(map(f, arr.flatten()))
        return np.array(mapped).reshape(arr.shape)

    def transform(self, y):
        """Replace each label in ``y`` by its integer index.

        Raises ``KeyError`` for a label that was not seen by ``fit``.
        """
        def encode(label):
            return self.name_to_class[label]

        return self._map_reshape(encode, y)

    def fit_transform(self, y):
        """Fit on ``y`` and return its encoded version."""
        self.fit(y)
        return self.transform(y)

    def detransform(self, y_hat):
        """Replace each integer index in ``y_hat`` by its original label."""
        def decode(code):
            return self.names[code]

        return self._map_reshape(decode, y_hat)

### BaseBayesianClassifier

In [42]:
class BaseBayesianClassifier:
    """Skeleton of a maximum-a-posteriori classifier.

    Subclasses supply ``_fit_params`` (parameter estimation) and
    ``_predict_log_conditional`` (log-likelihood of one observation under one
    class). Data matrices are laid out with one observation per column.
    """

    def __init__(self):
        self.encoder = ClassEncoder()

    def _estimate_a_priori(self, y):
        """Return the log relative frequency of each encoded class in ``y``."""
        counts = np.bincount(y.astype(int).flatten())
        return np.log(counts / y.size)

    def _fit_params(self, X, y):
        """Estimate the model-specific parameters; must be overridden."""
        raise NotImplementedError()

    def _predict_log_conditional(self, x, class_idx):
        """Return log P(x | G=class_idx) for the concrete model; must be overridden."""
        raise NotImplementedError()

    def fit(self, X, y, a_priori=None):
        """Encode the labels, set the log priors and fit the model parameters.

        ``a_priori`` holds optional prior probabilities, one per class; when
        omitted the priors are estimated from the class frequencies in ``y``.
        """
        y = self.encoder.fit_transform(y)

        if a_priori is None:
            self.log_a_priori = self._estimate_a_priori(y)
        else:
            self.log_a_priori = np.log(a_priori)

        # One prior per class known to the encoder.
        assert len(self.log_a_priori) == len(self.encoder.names), "A priori probabilities do not match number of classes"

        # Priors are in place, so the subclass can estimate everything else.
        self._fit_params(X, y)

    def predict(self, X):
        """Classify every column of ``X`` one at a time.

        Returns the decoded labels as a row vector of shape (1, m), where m is
        the number of columns of ``X``.
        """
        m_obs = X.shape[1]
        labels = np.empty(m_obs, dtype=self.encoder.fmt)

        for col, observation in enumerate(X.T):
            # Integer index of the winning class for this column ...
            winner = self._predict_one(observation.reshape(-1, 1))
            # ... translated back to the original label.
            labels[col] = self.encoder.names[winner]

        # Row vector, matching the layout of y.
        return labels.reshape(1, -1)

    def _predict_one(self, x):
        """Return the index of the class with the largest log posterior for ``x``.

        The scores are log posteriors up to an additive constant.
        """
        scores = []
        for class_idx, log_prior in enumerate(self.log_a_priori):
            scores.append(log_prior + self._predict_log_conditional(x, class_idx))

        return np.argmax(scores)

### LDA

In [43]:
class LDA(BaseBayesianClassifier):
    """
    Gaussian classifier in which every class shares one covariance matrix.
    """

    def _fit_params(self, X, y):
        """
        Estimate the per-class means and the inverse of the pooled covariance.

        Each class contributes its biased covariance weighted by its number of
        observations; the sum is divided by the total number of observations.
        """
        n_classes = len(self.log_a_priori)
        labels = y.flatten()

        self.means = []
        pooled = np.zeros((X.shape[0], X.shape[0]))

        for class_idx in range(n_classes):
            # Columns of X that belong to this class.
            members = X[:, labels == class_idx]

            # Column vector with the class mean of every feature.
            self.means.append(members.mean(axis=1, keepdims=True))

            # Undo the 1/n of the biased estimate so classes are weighted by size.
            pooled += np.cov(members, bias=True) * members.shape[1]

        pooled /= X.shape[1]
        self.inv_cov = inv(pooled)

    def _predict_log_conditional(self, x, class_idx):
        """
        Return -0.5 times the squared Mahalanobis distance from x to the class mean.

        This is the only part of the Gaussian log-density that involves the
        class mean; the remaining terms are not computed here.
        """
        centered = x - self.means[class_idx]
        quad_form = centered.T @ self.inv_cov @ centered
        return -0.5 * quad_form


In [44]:
class TensorizedLDA(LDA):
    """LDA that scores all classes for one observation in a single tensor product.

    Because the covariance is shared, the term x^T Sigma^-1 x of the quadratic
    form is the same for every class and cannot change the argmax. Dropping it
    leaves the linear discriminant

        mu_k^T Sigma^-1 x - 0.5 * mu_k^T Sigma^-1 mu_k

    whose two class-dependent factors are precomputed once at fit time.
    """

    def _fit_params(self, X, y):
        """Fit the LDA parameters and precompute the tensors used for scoring."""
        super()._fit_params(X, y)

        # Retained so existing code reading this attribute keeps working.
        self.tensor_inv_cov = self.inv_cov
        # Class means stacked along a new leading axis: (k, p, 1).
        self.tensor_means = np.stack(self.means)

        # mu_k^T Sigma^-1 for every class: (k, 1, p).
        self.muT_inv_cov = self.tensor_means.transpose(0, 2, 1) @ self.tensor_inv_cov

        # Class-dependent constant -0.5 * mu_k^T Sigma^-1 mu_k: (k,).
        self.half_mu_quad = -0.5 * (self.muT_inv_cov @ self.tensor_means).flatten()

    def _predict_log_conditionals(self, x):
        """Return one score per class for the column vector ``x``.

        The scores equal the log conditionals of ``LDA`` plus 0.5 * x^T Sigma^-1 x,
        a quantity that does not depend on the class, so the ranking of the
        classes is unchanged. This relies on the inverse covariance being
        symmetric, which holds up to floating-point error.
        """
        # (k, 1, p) @ (p, 1) broadcasts to (k, 1, 1); flatten to (k,).
        return (self.muT_inv_cov @ x).flatten() + self.half_mu_quad

    def _predict_one(self, x):
        """Return the index of the class with the largest posterior score for ``x``."""
        return np.argmax(self.log_a_priori + self._predict_log_conditionals(x))

In [45]:
class FasterLDA(LDA):
    """LDA that scores all observations in one batch per class."""

    def predict(self, X):
        """Classify every column of ``X`` without a per-observation loop.

        Returns the decoded labels as a row vector of shape (1, m), the same
        layout produced by ``BaseBayesianClassifier.predict``.
        """
        log_posteriori = np.zeros((len(self.log_a_priori), X.shape[1]))
        for idx, log_a_priori_i in enumerate(self.log_a_priori):
            # Centered observations as rows: (m, p).
            unbiased_X_T = (X - self.means[idx]).T
            # Row-wise quadratic form: multiplying elementwise and summing over
            # features yields only the m diagonal terms, never an m x m matrix.
            log_conditional = -0.5 * np.sum(unbiased_X_T @ self.inv_cov * unbiased_X_T, axis=1)
            log_posteriori[idx, :] = log_a_priori_i + log_conditional

        winners = np.argmax(log_posteriori, axis=0)
        # Decode by indexing the label array and return a row vector so the
        # result has the same shape as the inherited predict.
        return self.encoder.names[winners].reshape(1, -1)

## Código para pruebas

### Hiperparámetros

In [46]:
# Hyperparameters
# Seed passed as random_state to the train/test split further down.
rng_seed = 6543

### DataSets

In [47]:
from sklearn.datasets import load_iris, fetch_openml

def get_iris_dataset():
    """Load iris and return the feature matrix and the species names.

    The species are returned as a column vector with one name per row of the
    feature matrix.
    """
    iris = load_iris()
    # Indexing the name array with an (n, 1) integer array yields an (n, 1)
    # array of names.
    species = iris.target_names[iris.target.reshape(-1, 1)]
    return iris.data, species

def get_penguins():
    """Fetch the OpenML penguins data and return numeric features and labels.

    The "island" and "sex" columns are removed, and so is every row with a
    missing value. Labels come back as a column vector.
    """
    features, target = fetch_openml(name="penguins", return_X_y=True, as_frame=True, parser='auto')

    # Keep only the numeric measurements.
    features = features.drop(columns=["island", "sex"])

    # Rows in which every remaining column is present.
    complete = features.notna().all(axis=1)
    features = features[complete]
    target = target[complete]

    return features.values, target.to_numpy().reshape(-1, 1)

# Use iris as the working dataset for the rest of the notebook.
X_full, y_full = get_iris_dataset()

# Check the shapes of the feature matrix and the label column before splitting.
print(f"X: {X_full.shape}, Y:{y_full.shape}")

X: (150, 4), Y:(150, 1)


In [48]:
# peek data matrix
X_full[:5]

array([[5.1, 3.5, 1.4, 0.2],
       [4.9, 3. , 1.4, 0.2],
       [4.7, 3.2, 1.3, 0.2],
       [4.6, 3.1, 1.5, 0.2],
       [5. , 3.6, 1.4, 0.2]])

In [49]:
# peek target vector
y_full[:5]

array([['setosa'],
       ['setosa'],
       ['setosa'],
       ['setosa'],
       ['setosa']], dtype='<U10')

### Split del DataSet en Train/Test

In [50]:
# preparing data, train - test validation
# 60-40 split (test_size=0.4)
from sklearn.model_selection import train_test_split

def split_transpose(X, y, test_sz, random_state):
    """Split into train/test sets and transpose so observations are columns.

    Args:
        X: feature matrix with one observation per row.
        y: labels with one entry per row of ``X``.
        test_sz: value forwarded to ``train_test_split`` as ``test_size``.
        random_state: seed forwarded to ``train_test_split``.

    Returns:
        ``(X_train.T, y_train.T, X_test.T, y_test.T)``.
    """
    # Honor the caller's test_sz; the size used to be hard-coded to 0.4.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_sz, random_state=random_state
    )

    # transpose so observations are column vectors
    return X_train.T, y_train.T, X_test.T, y_test.T

def accuracy(y_true, y_pred):
    """Return the fraction of positions where ``y_true`` equals ``y_pred``.

    The comparison is elementwise, so the usual array broadcasting applies
    when the two shapes differ.
    """
    hits = y_true == y_pred
    return hits.mean()

# Hold out 40% of iris for testing; after the transpose every column is one observation.
train_x, train_y, test_x, test_y = split_transpose(X_full, y_full, 0.4, rng_seed)

# Confirm the (features, observations) and (1, observations) layouts.
print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)

(4, 90) (1, 90) (4, 60) (1, 60)


### Entrenamiento y performance

In [51]:
# Baseline model: plain LDA with priors estimated from the training labels.
lda = LDA()

lda.fit(train_x, train_y)

In [52]:
# Accuracy on the data used for fitting and on the held-out data; the
# message reports the corresponding error rates (1 - accuracy).
train_acc = accuracy(train_y, lda.predict(train_x))
test_acc = accuracy(test_y, lda.predict(test_x))
print(f"Train (apparent) error is {1-train_acc:.4f} while test error is {1-test_acc:.4f}")

Train (apparent) error is 0.0222 while test error is 0.0167


In [53]:
# Tensorized variant fitted on the same training split as the baseline.
t_lda = TensorizedLDA()

t_lda.fit(train_x, train_y)

In [54]:
# Same evaluation as for the baseline, to check that both models report the same errors.
ttrain_acc = accuracy(train_y, t_lda.predict(train_x))
ttest_acc = accuracy(test_y, t_lda.predict(test_x))
print(f"Train (apparent) error is {1-ttrain_acc:.4f} while test error is {1-ttest_acc:.4f}")

Train (apparent) error is 0.0222 while test error is 0.0167


In [55]:
# Batch-prediction variant: fit on the same training split and report
# train/test error rates (1 - accuracy).
f_lda = FasterLDA()
f_lda.fit(train_x, train_y)
ftrain_acc = accuracy(train_y, f_lda.predict(train_x))
ftest_acc = accuracy(test_y, f_lda.predict(test_x))
print(f"FasterLDA - Train error: {1-ftrain_acc:.4f}, Test error: {1-ftest_acc:.4f}")

FasterLDA - Train error: 0.0222, Test error: 0.0167


In [56]:
# Re-evaluates the already fitted FasterLDA and prints the errors in the
# same message format used for the LDA and TensorizedLDA cells.
ftrain_acc = accuracy(train_y, f_lda.predict(train_x))
ftest_acc = accuracy(test_y, f_lda.predict(test_x))
print(f"Train (apparent) error is {1-ftrain_acc:.4f} while test error is {1-ftest_acc:.4f}")

Train (apparent) error is 0.0222 while test error is 0.0167


In [57]:
%%timeit

# Baseline timing: the inherited predict loops over columns and scores each class separately.
lda.predict(test_x)

2.56 ms ± 287 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [58]:
%%timeit

# Same per-column loop, but _predict_one scores all classes in one tensor expression.
t_lda.predict(test_x)

1.12 ms ± 191 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [59]:
%%timeit

# Batch timing: FasterLDA.predict scores every column of test_x at once for each class.
f_lda.predict(test_x)

160 µs ± 56.7 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [60]:
%%timeit

# End-to-end timing: construct, fit on the training split and predict the test split.
model = LDA()
model.fit(train_x, train_y)
model.predict(test_x)

4.32 ms ± 108 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
