#   Regressão Linear na qualidade de vinho.  

> **Autor**: *Gabriel M. S. O.*  
> **Matrícula**: 190042656


##  Intruções para rodar o codigo.

Estarei usando o package manager `Poetry`,
uma vez que deixa minha instancia python
padrão.  
É a forma mais proxima de um docker container
sem a necessidade de usar docker...  

[Documentação](https://python-poetry.org/)  

### Instalar poetry.


Você pode instalar o poetry de duas maneiras.  

#### PIP
Mais simples seria usando pip,
contudo a documentação do poetry não
recomenda a instalação desta maneira.
  
```bash
pip install poetry
```

#### Seguindo a [documentação](https://python-poetry.org/docs/#installation).

```bash
curl -sSL https://install.python-poetry.org | python3 - --version 2.1.1
```

### Instalar dependencias.

Apenas rode.

```bash
poetry env activate && \
poetry install --no-root
```

### Rodar o codigo.

Por fim use o virtual enviroment criado,
poetry cuidara das dependencias.  

Olhe em: [PyProject](pyproject.toml)

##  Não desejo rodar o codigo.

Tudo bem, PDF do projeto esta na pasta [out](/out/)

## Escopo do projeto

Meu objetivo com esse projeto sera aprender
a utilizar a biblioteca `PyTorch` com mais
familiaridade, também implementar os conceitos
de regressão linear na mão, logo não desejo 
depender do modulo `d2l`. Uma vez que vejo
como uma muleta no aprendizado.

### Requisitos do projeto

*   Qual o erro absoluto medio do meu preditor ?
    *   Este error é melhor que o preditor trivial ?
    *   Quão melhor que o preditor trivial ?
    *   Compare a acuracia contra o preditor trivial.
*   Qual a correlação entre a saida desejada e obtida ?
*   Variar número de epocas, minibatch, taxa de aprendizado
*   Normalização das entradas
*   Tentar eliminar variaveis pouco significantes (optimizar o modelo).

##  Entendendo o conjunto de dados.

Para montar nosso modelo primeiro precisamos
entender com que tipo de informações estamos trabalhando.

In [1]:
import pandas as pd
from pathlib import Path

DATA_PATH = Path("./data")
DATA_PATH.mkdir(parents=True, exist_ok=True)

FILE_NAME = "winequalityN.csv"
FILE_PATH = DATA_PATH / FILE_NAME

df = pd.read_csv(FILE_PATH)

print(*df.columns, sep="\n")

type
fixed acidity
volatile acidity
citric acid
residual sugar
chlorides
free sulfur dioxide
total sulfur dioxide
density
pH
sulphates
alcohol
quality


Podemos observar diferentes variaveis, 
vamos entender cada uma melhor.

In [2]:
import json
import pandas.api.types as pdtypes


def get_column_summary(series: pd.Series) -> dict:
    dtype = series.dtype
    column_info = {"dtype": str(dtype)}

    if series.empty:
        return column_info

    if pdtypes.is_numeric_dtype(dtype):
        max_val = series.max()
        min_val = series.min()
        avr = series.sum() / series.__len__()

        if pdtypes.is_integer_dtype(dtype):
            column_info["avr"] = round(avr)
        else:
            column_info["avr"] = round(avr, ndigits=5)

        if hasattr(max_val, "item"):
            column_info["max"] = max_val.item()
        else:
            column_info["max"] = max_val

        if hasattr(min_val, "item"):
            column_info["min"] = min_val.item()
        else:
            column_info["min"] = min_val

    elif (
        pdtypes.is_object_dtype(dtype)
        or pdtypes.is_string_dtype(dtype)
        or pdtypes.is_bool_dtype(dtype)
    ):
        unique_values = series.unique()
        column_info["unique"] = unique_values.tolist()

    return column_info


column_dict = {col: get_column_summary(df[col]) for col in df.columns}

data_dict = {"size": df.__len__(), "columns": column_dict}

data_dict_dumped = json.dumps(data_dict, indent=2)

output_dir_path = Path("./out")
output_dir_path.mkdir(parents=True, exist_ok=True)

output_file_path = output_dir_path / "data_info.json"

with output_file_path.open("w") as file:
    file.write(data_dict_dumped)

Escrevi as informações em [`./out/data_info.json`](./out/data_info.json).  

Com isto podemos ver cada variavel, o score final *quality*, da se a entender
que é uma nota que vai de 0 a 10, mas não sei se seria apropriado
fazer essa suposição, uma vez que não tenho certeza, logo usarei os
valores maxímos e minimos do nosso **data set** para serem 
o chão e o teto.

In [3]:
print(json.dumps(data_dict["columns"]["quality"], indent=2))

{
  "dtype": "int64",
  "avr": 6,
  "max": 9,
  "min": 3
}


## Plano de ação

Uma vez que possuimos diferentes tipos de vinho, minha ideia
seria criar um set de pesos para cada tipo. Também
vamos concatenar o *bias* no data set. Porque eu quero...

Logo nossa equação seria algo do genero.

$$ 
y^{i} = X_{j\ k}^{i}\ w^{j\ k} 
$$

*   $N$ Tamanho do Data Set; 
*   $i$ Indice de onde estamos no nosso batch;
*   $j$ Indice do tamanho do vetor de peso $\vec{w}$;
*   $k$ Indice do tipo do vinho.

**Aqui começa minha solução**

## Solução

#### Variaveis globais

Não gosto de variaveis globais, mas como estamos usando
o jupyter ate que desce

### Bibliotecas a serem usadas

In [4]:
import pandas #  para lidar com o Data Set
import torch #  Erm what the sigma
import numpy # erm What the sigmar
from pathlib import Path #  Gosto dessa blibioteca...
from torch.utils.data import TensorDataset, DataLoader #  Realização de batches 
from sklearn.preprocessing import StandardScaler #  Para normalizar nossos dados.
from sklearn.model_selection import train_test_split #  Dividir nossos dados em teste.

### Carregar Data Set

In [5]:
DATA_PATH = Path("./data")
DATA_PATH.mkdir(parents=True, exist_ok=True)

DATA_FILE_NAME = "winequalityN.csv"
DATA_FILE_PATH = DATA_PATH / DATA_FILE_NAME

df = pandas.read_csv(DATA_FILE_PATH)

#### Tratamento dos dados

Usar a função `get_dummies` para
gerar uma nova tabela, contendo
informações boleanas, se o conteudo
se encaixa em um determinado tipo.

In [6]:
df = pandas.get_dummies(df, columns=["type"])

Vamos avalair quantos porcento dos dados estão faltando.

In [None]:
missing_pct = df.isna().mean() * 100
print(missing_pct.sort_values(ascending=False))

NameError: name 'df' is not defined

Dado que faltam muitos poucos dados, vou substituir estes
pela media.

In [8]:
df = df.fillna(df.mean())

In [9]:
quality = df['quality']
quality = quality.to_numpy()

quality_mean = quality.mean().__round__()
quality_mean = numpy.full_like(quality, fill_value=quality_mean)

quality = torch.tensor(quality, dtype=torch.float32)
quality_mean = torch.tensor(quality_mean, dtype=torch.float32)

mse_loss = torch.nn.MSELoss()

loss = mse_loss(quality_mean, quality)
accu = (quality_mean == quality).float().mean().item()
print(loss.item())
print(accu)

0.7954440712928772
0.4365091621875763


Separar as colunas de interesse

*   Colunas númericas:  
    Contem as informações
    que usaremos nos pesos.
*   Colunas de tipo:  
    Representam efetivamente
    se aquela linha de informações
    bate com o tipo.

In [10]:
def filter_columns(
    df: pandas.DataFrame,
) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:
    """"""
    cols_names = df.columns
    type_mask = cols_names.str.startswith("type_")

    type_cols_names = cols_names[type_mask]
    numeric_cols_names = cols_names[~type_mask & (cols_names != "quality")]

    X_type_values = df[type_cols_names].values.astype(
        "float32"
    )  # Tensor (N, type_cols_size), type_cols_size = 2
    # print(X_type_values.shape)
    # Representam efetivamente se aquela linha de informações bate com o tipo.

    X_numeric_values = df[numeric_cols_names].values.astype(
        "float32"
    )  # Tensor (N, numeric_cols_size), numeric_cols_size = 12, inclui os bias
    # Contem as informações que usaremos nos pesos.
    # print(X_numeric_values.shape)

    y = (
        df["quality"].values.astype("float32").reshape(-1, 1)
    )  # Tensor (N,).reshape(-1, 1) -> Tensor (N, 1)
    # print(y.shape)

    return X_numeric_values, X_type_values, y

#### Normalização das Entradas

In [11]:
def normalize_values(X_numeric_values: numpy.ndarray) -> numpy.ndarray:
    """"""
    X_num_val_norm = StandardScaler().fit_transform(X_numeric_values)
    X_num_val_norm_with_bias = numpy.concatenate(
        [X_num_val_norm, numpy.ones((X_num_val_norm.shape[0], 1), dtype="float32")],
        axis=1,
    )
    return X_num_val_norm_with_bias

#### Divisão em Sets de Treinamento e Teste

In [12]:
def get_train_test_sets(
    X_numeric_values: numpy.ndarray,
    X_type_values: numpy.ndarray,
    y: numpy.ndarray,
    test_data_size: float,
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """"""
    train_data = train_test_split(
        X_numeric_values,
        X_type_values,
        y,
        test_size=test_data_size,
        random_state=666,  # Controls the shuffling applied to the data before applying the split
        shuffle=True,
    )

    return (
        torch.from_numpy(arr) for arr in train_data
    )  # Ja converto direto para Tensor

#### Divisão em Batches

In [13]:
def get_batches_loader(
    X_num_train: torch.Tensor, X_type_train: torch.Tensor, y_train: torch.Tensor, batch_size: int
) -> DataLoader:
    """"""
    train_dataset = TensorDataset(X_num_train, X_type_train, y_train)
    return DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

#### Obtendo dimeções em que vamos trabalhar

In [14]:
def get_dimentions(
    X_num_train: torch.Tensor, X_type_train: torch.Tensor
) -> tuple[int, int]:
    """"""
    single_w_size = X_num_train.shape[1]
    w_type_size = X_type_train.shape[1]
    return single_w_size, w_type_size

#### Obetendo as informações necessárias

In [15]:
def get_necessary_data(
    df: pandas.DataFrame,
    batch_size: int,
    test_data_size: float,
) -> tuple[int, int, DataLoader, torch.Tensor, torch.Tensor, torch.Tensor]:
    """"""
    X_numeric_values, X_type_values, y = filter_columns(df=df)

    X_numeric_values = normalize_values(X_numeric_values=X_numeric_values)

    X_num_train, X_num_test, X_type_train, X_type_test, y_train, y_test = (
        get_train_test_sets(
            X_numeric_values=X_numeric_values,
            X_type_values=X_type_values,
            y=y,
            test_data_size=test_data_size,
        )
    )

    train_dataloader = get_batches_loader(
        X_num_train=X_num_train,
        X_type_train=X_type_train,
        y_train=y_train,
        batch_size=batch_size,
    )

    single_w_size, w_type_size = get_dimentions(
        X_num_train=X_num_train, X_type_train=X_type_train
    )

    return single_w_size, w_type_size, train_dataloader, X_num_test, X_type_test, y_test

#### Função de treinamento

Função de erro Mean Square Error

$$
MSE(\vec{w}) = \frac{1}{N}\sum_{i=1}^{N}\left(y^{i} - X_{j}^{i}\ w^{j}\right)^2
$$

In [16]:
import torch.optim.adam


def train_in_epoch(
    optimizer: torch.optim.SGD,
    mse_loss: torch.nn.MSELoss,
    w_tensor: torch.Tensor,
    train_dataloader: DataLoader,
    total_loss: float = 0.0,
) -> float:
    """Treina o modelo dentro em
    uma unica época"""
    for x_num_batch, x_type_batch, y_batch in train_dataloader:
        type_idx: torch.Tensor = x_type_batch.argmax(dim=1)  # Tensor (Batch Size,)

        # Tensor (Weight Type Size, Single Weight Size) -> Tensor (Batch Size, Single Weigh Size)
        w_sample = w_tensor[type_idx]

        predict = (x_num_batch * w_sample).sum(dim=1, keepdim=True)  # Tensor (Batch Size, )
        # predict = predict.unsqueeze(1)  # Tensor (Batch Size, 1)

        loss: torch.Tensor = mse_loss(predict, y_batch)
        optimizer.zero_grad()
        loss.backward()
        # torch.nn.utils.clip_grad_norm_([w_tensor], max_norm=1.0)
        optimizer.step()
        
        total_loss += loss.item() * x_num_batch.size(0)
        
    return total_loss


def train_model(
    learning_rate: float | torch.Tensor,
    n_epochs: int,
    w_type_size: int,
    single_w_size: int,
    train_dataloader: DataLoader,
) -> torch.Tensor:
    """Treina o modelo dentro de varias épocas"""
    # Cria Tensor de peso aleatorio
    w_tensor = torch.randn(
        w_type_size, single_w_size, requires_grad=True
    )  # Tensor (2, 12), inclui o BIAS
    # print(w_tensor.shape)

    # Inicia optimizer e função de erro
    optimizer = torch.optim.SGD([w_tensor], lr=learning_rate)
    mse_loss = torch.nn.MSELoss()

    # Roda por cada época
    for epoch in range(n_epochs):
        total_loss = train_in_epoch(
            optimizer=optimizer,
            mse_loss=mse_loss,
            w_tensor=w_tensor,
            train_dataloader=train_dataloader,
        )
        train_mse = total_loss / len(train_dataloader.dataset)
        if (epoch + 1) % 100 == 0: # Print every 10 epochs
            print(f'Epoch [{epoch+1}/{n_epochs:_}], Train Loss: {train_mse:.4f}')
        
    return w_tensor

#### Função de avaliação

In [21]:
from scipy.stats import pearsonr
def evaluate(
    X_num_test: torch.Tensor,
    X_type_test: torch.Tensor,
    y_test: torch.Tensor,
    w_tensor: torch.Tensor,
) -> tuple[float, float]:
    """"""
    mse_loss = torch.nn.MSELoss()
    with torch.no_grad():
        type_idx = X_type_test.argmax(dim=1)  # Tensor(Test Size, 1)

        w_sample = w_tensor[
            type_idx
        ]  # Tensor (Weight Type Size, Single Weight Size) -> Tensor (Test Size, Single Weight size)
        
        predict = (X_num_test * w_sample).sum(dim=1)
        predict = predict.unsqueeze(1)
        
        loss: torch.Tensor = mse_loss(predict, y_test)
        # Mean Square Error
        mse = loss.item()
        # Accuracy
        accu = (predict.round() == y_test).float().mean().item()
        # accu, _ = pearsonr(predict, y_test)
        
        return mse, accu
        

In [22]:
single_w_size, w_type_size, train_dataloader, X_num_test, X_type_test, y_test = (
    get_necessary_data(df=df, batch_size=64, test_data_size=0.2)
)
w_tensor = train_model(
    learning_rate=1e-1,
    n_epochs=10,
    w_type_size=w_type_size,
    single_w_size=single_w_size,
    train_dataloader=train_dataloader,
)

mse, accu = evaluate(
    X_num_test=X_num_test, X_type_test=X_type_test, y_test=y_test, w_tensor=w_tensor
)

print(mse, accu)

0.5613079071044922 0.510769248008728
