**D3APL: Aplicações em Ciência de Dados** <br/>
IFSP Campinas

Alexandre Freire da Silva Osorio <br/><br/>

# Implementando um Classificador Multiclasses a partir do dataset Wine

## Regressor Logístico com Gradient Descent e 3 tipos de otimizadores (estocástico, mini-batch e batch)

## 1. Carregando o dataset

#### Imports

In [17]:
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

#### Carregando o dataset wine

In [18]:
from sklearn.datasets import load_wine

X, y = load_wine(return_X_y=True)

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

In [19]:
print(f'X_train.shape = {X_train.shape}')
print(f'y_train.shape = {y_train.shape}')

print(f'X_test.shape = {X_test.shape}')
print(f'y_test.shape = {y_test.shape}')

X_train.shape = (133, 13)
y_train.shape = (133,)
X_test.shape = (45, 13)
y_test.shape = (45,)


## 2. Implementação do Classificador

Parâmetros do construtor:

- optimizer: str = default=“batch”
-- Algoritmo de otimização (“batch”, “mini”, “estocástico”)
- batch_sz: int = default=32
-- Tamanho do batch
- learning_rate: float, default=0,001
-- Taxa de Aprendizagem.
- n_epochs : int, default=1000
-- Número de épocas para treinamento (parada de convergência).
- alpha : float, default=0,0001
-- Constante que multiplica o termo de regularização.
-- Use 0 para ignorar a regularização (regressão logística padrão).
- random_state: int, default=42
-- Semente usada para gerar números aleatórios.

In [20]:
from typing import Tuple

import numpy as np
from numpy import ndarray

from sklearn.base import BaseEstimator, ClassifierMixin


class LogRegression(ClassifierMixin, BaseEstimator):
    
    def __init__(self, 
                 optimizer: str = 'batch',
                 batch_sz: int = 32,
                 learning_rate: float = 0.001,
                 n_epochs: int = 3000,
                 alpha: float = 0.0001, 
                 random_state: int = 42):
        """
         Parâmetros
         ----------
         optimizer: str = default=“batch”
             Algoritmo de otimização (“batch”, “mini”, “estocástico”)
         batch_sz: int = default=32
             Tamanho do batch
         learning_rate: float, default=0,001
             Taxa de Aprendizagem.
         n_epochs : int, default=1000
             Número de épocas para treinamento (parada de convergência).
         alpha : float, default=0,0001
             Constante que multiplica o termo de regularização.
             Use 0 para ignorar a regularização (regressão logística padrão).
         random_state: int, default=42
             Semente usada para gerar números aleatórios.
        """
        assert (optimizer is not None) and (optimizer != ''), \
        f'O otimizador deve ser batch, mini ou stochastic. Passou: {optimizer}'
        
        assert (batch_sz is not None) and (batch_sz > 0), \
        f'O tamanho do batch deve ser > 0. Passed: {batch_sz}'
        
        assert (learning_rate is not None) and (learning_rate > 0.0), \
        f'Learning rate deve ser > 0. Passed: {learning_rate}'
        
        assert (n_epochs is not None) and (n_epochs > 0), \
        f'Número de épocas deve ser > 0. Passed: {n_epochs}'
        
        assert (alpha is not None) and (alpha >= 0), \
        f'Alpha deve ser >= 0. Passed: {alpha}'
        
        self.optimizer = optimizer
        self.batch_sz = batch_sz
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.alpha = alpha
        self.random_state = random_state
        
        # parâmetros a serem aprendidos
        MAX_CLASSES = 10
        MAX_FEATURES = 13

        self.__w = np.zeros((MAX_CLASSES, MAX_FEATURES))  # array de pesos (um vetor para cada classe)
        self.__b = np.zeros((MAX_CLASSES, 1))  # bias


    # Método especial usado para representar um objeto de classe como uma string, chamado com print() ou str()
    def __str__(self):
        msg = f'Otimizador: {self.optimizer}\n' \
        f'Tamanho do batch: {self.batch_sz}\n' \
        f'Taxa de aprendizado: {self.learning_rate}\n' \
        f'Número de épocas: {self.n_epochs}\n' \
        f'Constante de regularização (alpha): {self.alpha}\n' \
        f'Estado aleatório: {self.random_state}\n\n' \
        f'Treinado?: {self.is_fitted()}\n'
        return msg


    # getter: acessar a função como um atributo - não é possível definir valores através dele
    @property
    def coef_(self) -> ndarray:
        """Retorne a matriz de pesos (parâmetros aprendidos), caso o estimador tenha sido treinado.
           Caso contrário, lance uma exceção.
        """
        assert self.is_fitted(), 'Não foi treinado ainda.'
        return self.__w
    
    
    # getter: acessar a função como um atributo - não é possível definir valores através dele
    @property
    def intercept_(self) -> float:
        """Retorne o bias (intercepto aprendido) se o estimador foi treinado.
           Caso contrário, lance uma exceção.
        """
        assert self.is_fitted(), 'Não foi treinado ainda.'
        return self.__b
    
    
    def is_fitted(self) -> bool:
        a = np.zeros(self.__w.shape)
        if (self.__w == a).all():
            return False
        else:
            return True    


    def __sigmoid(self, z: ndarray) -> ndarray:
        return 1 / (1 + np.e ** (-z))


    def __log_loss(self, y: ndarray, p_hat: ndarray, eps: float = 1e-15):
        '''Retorne a loss para uma determinada estimativa e rótulos verdadeiros.

        
         Parâmetros
         ----------
         y : ndarray, shape (n_samples,)
             Rótulos verdadeiros de amostras de entrada.
         p_hat : ndarray
             Probabilidades estimadas de amostras de entrada.
         eps: float, default=1e-15
             Termo Epsilon usado para evitar uma loss indefinida em 0 e 1.
        
         Retorno
         -------
         log_loss: float
             Perda de log computada.        
        '''
        
        p_hat_eps = np.maximum(eps, np.minimum(1 - eps, p_hat))
        
        # shape: (n_samples,)
        losses = -(y * np.log(p_hat_eps) + (1 - y) * np.log(1 - p_hat_eps))
        log_loss = losses.mean()
        
        return log_loss
    
    
    def __gradient(self, X: ndarray, y: ndarray, p_hat: ndarray,
                   w: ndarray, alpha: float) -> Tuple[ndarray, float]:
        '''Calcule o vetor de gradiente para a log loss em relação aos pesos e bias.
        
        Parâmetros
        ----------
         X: ndarray de forma (n_samples, n_features)
             Dados de treinamento.
         y: ndarray de forma (n_samples,).
             Rótulos de destino (verdadeiros).
         p_hat : ndarray, forma (n_samples,)
             Probabilidades estimadas.
         w : ndarray, forma (n_features,)
             Matriz de peso.
         alfa: flutuar
             Constante de regularização.        
             
        Retornos
        -------
        Tuple[ndarray, float]: 
            Tupla com:
             - uma matriz numpy de forma (n_features,) contendo as derivadas parciais com respeito aos pesos; e
             - um float representando a derivada parcial com respeito ao bias.        
        '''
        
        regularization = alpha * w
    
        n_samples = X.shape[0]
        
        error = p_hat - y  # shape (n_samples,)
        grad_w = (np.dot(error, X) / n_samples) + regularization  # shape (n_features,)
        grad_b = error.mean()  # float
        
        return grad_w, grad_b

    
    # Transforma um vetor de saída multiclasses num vetor uniclasse
    def __make_bin(self, y: ndarray, curr_class: int) -> ndarray:
        return (y[:] == curr_class).astype(int)

        
    def __fit_core(self, X: ndarray, y: ndarray) -> Tuple[ndarray, ndarray]:
        ### INICIALIZACAO DE PARAMETROS
        # 
        w = np.random.randn(self.n_features)  # shape: (n_features,)
        b = 0.0

        # array that stores the loss of each epoch
        losses = []

        # Iterações do aprendizado
        for epoch in np.arange(self.n_epochs):

            ### Itera sobre cada batch_sz conjunto de amostras
            #
            i = 0
            while (i + self.n_samples_per_batch) <= X.shape[0]:
                X_batch = X[i:i + self.n_samples_per_batch, :]
                y_batch = y[i:i + self.n_samples_per_batch]

                z = np.dot(X_batch, w) + b

                p_hat = self.__sigmoid(z)

                loss_epoch = self.__log_loss(y_batch, p_hat)
                losses.append(loss_epoch)

                ### GRADIENT DESCENT 
                # grad_w.shape: (n_features,)
                # grad_b: float
                grad_w, grad_b = self.__gradient(X_batch, y_batch, p_hat, w, self.alpha)
                w = w - self.learning_rate * grad_w  # shape: (n_features)
                b = b - self.learning_rate * grad_b  # float

                i += self.n_samples_per_batch

        return w, b


    def fit(self, X: ndarray, y: ndarray):
        '''Treina o classificador.

        Parâmetros
        ----------
         X: ndarray de shape (n_samples, n_features)
             Conjunto de treinamento.
         y: ndarray de shape (n_samples,).
             Rótulos de destino (verdadeiros).
         batch_sz: int
             Tamanho do batch
        '''
        ### VERIFICAR AS DIMENSÕES DA MATRIZ DE ENTRADA (DO DATASET WINE)
        assert X.ndim == 2, f'X deve ser 2D. Passou: {X.ndim}'
        assert y.ndim == 1, f'y deve ser 1D. Passou: {y.ndim}'
        assert X.shape[0] == y.shape[0], \
            f'X.shape[0] deve ser igual a y.shape[0]. Ao invés: {X.shape[0]} != {y.shape[0]}'
        if self.optimizer == 'mini':
            assert self.batch_sz < X.shape[0], \
                f'O tamanho do batch deve ser menor ou igual a X.shape[0]. Ao invés: {self.batch_sz} != {X.shape[0]}'
        
        ### CONFIGURAR A SEMENTE DO GERADOR ALEATORIO
        np.random.seed(self.random_state)
        
        self.n_samples_per_batch, self.n_features = X.shape

        ### Calcula o número de amostras por batch
        if self.optimizer == 'batch':
            self.n_samples_per_batch = X.shape[0]
        elif self.optimizer == 'mini':
            self.n_samples_per_batch = self.batch_sz
        else: # optimizer = stochastic gradient descent
            self.n_samples_per_batch = 1

        ### Extrai as classes
        self.classes = np.unique(y)

        for c in self.classes:
            
            y_bin = self.__make_bin(y, c)

            ### ATRIBUI OS PARÂMETROS TREINADOS AOS ATRIBUTOS PRIVADOS
            self.__w[c], self.__b[c] = self.__fit_core(X, y_bin)


    def predict_proba(self, X: ndarray, curr_class: int) -> ndarray:
        '''Estima a probabilidade para a classe positiva de amostras de entrada.

        Parameters
        ----------
        X: ndarray de shape (n_samples, n_features)
            Amostras de entrada
            
        Retorna
        -------
        ndarray de shape (n_samples,)
            As probabilidades estimadas para a classe positiva.
        '''
        assert self.is_fitted(), 'A instância ainda não foi treinada.'
        assert X.ndim == 2, f'X deve ser 2D. Passou: {X.ndim}'

        z = np.dot(X, self.__w[curr_class]) + self.__b[curr_class]

        return self.__sigmoid(z)

        
        
    def predict(self, X: ndarray) -> ndarray:
        '''Prevê a classes de cada amostra dos dados de entrada.
        
        Parâmetros
        ----------
        X: ndarray de shape (n_samples, n_features)
            Amostras de entrada
            
        Retorno
        -------
        ndarray de shape (n_samples,)
            Rótulo predito de cada amostra.
        '''
        assert self.is_fitted(), 'A instância ainda não foi treinada.'
        assert X.ndim == 2, f'X deve ser 2D. Passou: {X.ndim}'

        n_samples = X.shape[0]
        max_predict = np.zeros(n_samples)
        y_hat = np.zeros(n_samples)

        for c in self.classes:
            p_hat = self.predict_proba(X, c)
            for i in range(n_samples):
                if p_hat[i] > max_predict[i]:
                    max_predict[i] = p_hat[i]
                    y_hat[i] = c

        return y_hat

## 3. Avaliação de modelos

### 3.1 Modelo com hiperparâmetros default e otimização batch

In [21]:
clf = LogRegression(optimizer='batch')

print('Imprimindo o objeto:')
print(clf)

Imprimindo o objeto:
Otimizador: batch
Tamanho do batch: 32
Taxa de aprendizado: 0.001
Número de épocas: 3000
Constante de regularização (alpha): 0.0001
Estado aleatório: 42

Treinado?: False



In [22]:
clf.fit(X_train, y_train)

  return 1 / (1 + np.e ** (-z))


In [23]:
y_test_pred = clf.predict(X_test)

#### 3.1.1 Cálculo de métricas

In [24]:
from sklearn.metrics import classification_report
print(classification_report(y_test, y_test_pred))

              precision    recall  f1-score   support

           0       0.58      1.00      0.73        15
           1       0.68      0.72      0.70        18
           2       0.00      0.00      0.00        12

    accuracy                           0.62        45
   macro avg       0.42      0.57      0.48        45
weighted avg       0.47      0.62      0.52        45



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


### 3.2 Modelo com hiperparâmetros default e otimização mini-batch, com batch de 32 amostras

In [25]:
clf = LogRegression(optimizer='mini')

print('Imprimindo o objeto:')
print(clf)

Imprimindo o objeto:
Otimizador: mini
Tamanho do batch: 32
Taxa de aprendizado: 0.001
Número de épocas: 3000
Constante de regularização (alpha): 0.0001
Estado aleatório: 42

Treinado?: False



In [26]:
clf.fit(X_train, y_train)

  return 1 / (1 + np.e ** (-z))


In [27]:
y_test_pred = clf.predict(X_test)

#### 3.2.1 Cálculo de métricas

In [28]:
from sklearn.metrics import classification_report
print(classification_report(y_test, y_test_pred))

              precision    recall  f1-score   support

           0       1.00      0.87      0.93        15
           1       1.00      0.06      0.11        18
           2       0.39      1.00      0.56        12

    accuracy                           0.58        45
   macro avg       0.80      0.64      0.53        45
weighted avg       0.84      0.58      0.50        45



### 3.3 Modelo com hiperparâmetros default e otimização SGD

In [29]:
clf = LogRegression(optimizer='stochastic')

print('Imprimindo o objeto:')
print(clf)

Imprimindo o objeto:
Otimizador: stochastic
Tamanho do batch: 32
Taxa de aprendizado: 0.001
Número de épocas: 3000
Constante de regularização (alpha): 0.0001
Estado aleatório: 42

Treinado?: False



In [30]:
clf.fit(X_train, y_train)

  return 1 / (1 + np.e ** (-z))


In [31]:
y_test_pred = clf.predict(X_test)

  return 1 / (1 + np.e ** (-z))


#### 3.3.1 Cálculo de métricas

In [32]:
clf = LogRegression(optimizer='mini')

print('Imprimindo o objeto:')
print(clf)

Imprimindo o objeto:
Otimizador: mini
Tamanho do batch: 32
Taxa de aprendizado: 0.001
Número de épocas: 3000
Constante de regularização (alpha): 0.0001
Estado aleatório: 42

Treinado?: False



#### 3.3.1 Cálculo de métricas

In [33]:
from sklearn.metrics import classification_report
print(classification_report(y_test, y_test_pred))

              precision    recall  f1-score   support

           0       0.54      1.00      0.70        15
           1       0.69      0.61      0.65        18
           2       1.00      0.08      0.15        12

    accuracy                           0.60        45
   macro avg       0.74      0.56      0.50        45
weighted avg       0.72      0.60      0.53        45



### 3.4. Fine tuning com grid search

In [18]:
from sklearn.pipeline import Pipeline

pipeline_multiclass_LR = Pipeline([
        ('log_regression', LogRegression())
])

In [19]:
pipeline_multiclass_LR.get_params()

{'memory': None,
 'steps': [('log_regression', LogRegression())],
 'verbose': False,
 'log_regression': LogRegression(),
 'log_regression__alpha': 0.0001,
 'log_regression__batch_sz': 32,
 'log_regression__learning_rate': 0.001,
 'log_regression__n_epochs': 3000,
 'log_regression__optimizer': 'batch',
 'log_regression__random_state': 42}

#### 3.4.1 Definição do conjunto de hiperparâmetros a serem testados

In [20]:
# search space
param_grid = [
    {
    'log_regression__n_epochs': [3000, 4000],
    'log_regression__alpha': [0, 0.0001],
    'log_regression__optimizer': ['batch', 'mini', 'stochastic'],
    'log_regression__batch_sz': [16, 32]
    }
]

In [21]:
from sklearn.model_selection import GridSearchCV

grid_search_multiclass_LR = GridSearchCV(pipeline_multiclass_LR, param_grid, cv=10, scoring='balanced_accuracy', verbose=0)

In [22]:
grid_search_multiclass_LR.fit(X_train, y_train)

  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return 1 / (1 + np.e ** (-z))
  return

GridSearchCV(cv=10,
             estimator=Pipeline(steps=[('log_regression', LogRegression())]),
             param_grid=[{'log_regression__alpha': [0, 0.0001],
                          'log_regression__batch_sz': [16, 32],
                          'log_regression__n_epochs': [3000, 4000],
                          'log_regression__optimizer': ['batch', 'mini',
                                                        'stochastic']}],
             scoring='balanced_accuracy')

In [23]:
grid_search_multiclass_LR.best_params_

{'log_regression__alpha': 0,
 'log_regression__batch_sz': 16,
 'log_regression__n_epochs': 4000,
 'log_regression__optimizer': 'stochastic'}

In [24]:
grid_search_multiclass_LR.best_score_

0.8516666666666668

#### 3.4.2. Inferência usando o melhor conjunto de hiperparâmetros

In [36]:
clf = LogRegression(n_epochs=4000, optimizer='stochastic', batch_sz=16, alpha=0)

print('Imprimindo o objeto:')
print(clf)

Imprimindo o objeto:
Otimizador: stochastic
Tamanho do batch: 16
Taxa de aprendizado: 0.001
Número de épocas: 4000
Constante de regularização (alpha): 0
Estado aleatório: 42

Treinado?: False



In [37]:
clf.fit(X_train, y_train)

  return 1 / (1 + np.e ** (-z))


In [38]:
y_test_pred = clf.predict(X_test)

  return 1 / (1 + np.e ** (-z))


#### 3.4.3 Cálculo de métricas

In [39]:
from sklearn.metrics import classification_report
print(classification_report(y_test, y_test_pred))

              precision    recall  f1-score   support

           0       0.58      1.00      0.73        15
           1       0.80      0.67      0.73        18
           2       1.00      0.33      0.50        12

    accuracy                           0.69        45
   macro avg       0.79      0.67      0.65        45
weighted avg       0.78      0.69      0.67        45



### 3.5 Logistic Regression do Sklearn

In [14]:
from sklearn.linear_model import LogisticRegression

clf = LogisticRegression(max_iter=3000)
clf.fit(X_train, y_train)

LogisticRegression(max_iter=3000)

In [15]:
y_test_pred = clf.predict(X_test)

#### 3.5.3 Cálculo de métricas

In [16]:
from sklearn.metrics import classification_report
print(classification_report(y_test, y_test_pred))

              precision    recall  f1-score   support

           0       1.00      1.00      1.00        15
           1       1.00      1.00      1.00        18
           2       1.00      1.00      1.00        12

    accuracy                           1.00        45
   macro avg       1.00      1.00      1.00        45
weighted avg       1.00      1.00      1.00        45

