![picture](https://prowly-uploads.s3.eu-west-1.amazonaws.com/uploads/4626/assets/71776/large_logo_wsb_poziom.jpg)

# INTRODUCTION TO DEEP LEARNING ALGORITHMS - DEEP LEARNING WITH PYTORCH

## Introduction to neural networks in Python (PyTorch):
<ul>
    <li>The Sequential model</li>
    <li>nn.Module</li>
    <li>How to train a PyTorch model?</li>
    <li>DataLoader</li>
    <li>Case study</li>
    <li>TensorBoard - choosing the best network parameters</li>
    <li>XAI</li>
</ul>

# Libraries

In [None]:
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import plotly.graph_objects as go

In [None]:
from tqdm import tqdm
import copy

In [None]:
# Define simple datasets that we will use to train the neural networks
zbiór_liniowy = np.zeros([100,2])
zbiór_liniowy[:,0] = [x/100 for x in range(100)]
zbiór_liniowy[:,1] = [2*x+3 for x in zbiór_liniowy[:,0]]
plt.plot(zbiór_liniowy[:,0],zbiór_liniowy[:,1])

In [None]:
XOR = np.zeros([400,4])
XOR[:100,1:3] = np.array([np.random.rand(100)/5,
                           np.random.rand(100)/5]).reshape(100,2)
XOR[:100,2] = XOR[:100,2]+1                      
XOR[100:200,1:] = np.array([1+np.random.rand(100)/3,
                            1+np.random.rand(100)/2,
                            1+np.random.rand(100)/10]).reshape(100,3)
XOR[200:300,[0,2,3]] = np.array([1+np.random.rand(100)/10,
                            1+np.random.rand(100)/2,
                            1+np.random.rand(100)/3]).reshape(100,3)
XOR[300:,:2] = np.array([1+np.random.rand(100)/3,
                            1+np.random.rand(100)/5]).reshape(100,2)

# Introduction to neural networks in PyTorch

In [None]:
import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

PyTorch handles the GPU a little differently: tensors and models have to be moved to the chosen device explicitly. For today's class we will focus on the CPU; when we get to CNN models we will look at a model trained on a GPU.

In [None]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

## The Sequential model

https://pytorch.org/docs/stable/nn.html#

https://pytorch.org/docs/stable/nn.functional.html

In [None]:
model = nn.Sequential( # similar to TensorFlow
    nn.Linear(
      1, # input dimensionality
      10),# number of neurons
    nn.ReLU(), # here ReLU is added as a separate module that applies the activation function; it has no weights of its own
    nn.Linear(10, 1)# note that in PyTorch we give both the input and the output dimensionality!
    ).to(device) # the model must be on the same device as the data
print(model)

In [None]:
loss_function = nn.MSELoss() # cost function definition
optimizer = torch.optim.SGD(model.parameters(), lr=0.001) # optimizer definition
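
To make the roles of the cost function and the optimizer concrete, here is a minimal sketch that computes the mean squared error by hand and reproduces a single SGD update. The tensor values and the names `pred`, `target`, and `w` are illustrative assumptions and do not come from the notebook's data.

```python
import torch
from torch import nn

# Illustrative values, not taken from the notebook's data
pred = torch.tensor([[1.0], [2.0], [3.0]])
target = torch.tensor([[1.0], [2.0], [5.0]])

# MSELoss averages the squared differences: (0 + 0 + 4) / 3
mse_manual = ((pred - target) ** 2).mean()
mse_torch = nn.MSELoss()(pred, target)

# One SGD step on a single parameter: w <- w - lr * dL/dw
w = torch.tensor([1.0], requires_grad=True)
sgd = torch.optim.SGD([w], lr=0.1)
loss = (w * 2.0 - 4.0).pow(2).sum()   # L(w) = (2w - 4)^2, so dL/dw = 4(2w - 4) = -8 at w = 1
loss.backward()
sgd.step()                            # w becomes 1 - 0.1 * (-8) = 1.8
```

The same two steps, `backward()` followed by `step()`, are what the training loop repeats for every parameter of the network.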

### Basic training loop

PyTorch trains on tensors, so the vectors have to be converted to the "torch.tensor" type. Also remember to move the tensors to the appropriate device: the model and the data must use the same compute accelerator.

In [None]:
x = torch.tensor(zbiór_liniowy[:,0], dtype=torch.float32).reshape(-1, 1).to(device) # example input vectors
y = torch.tensor(zbiór_liniowy[:,1], dtype=torch.float32).reshape(-1, 1).to(device) # example output vectors

In [None]:
# A simple training loop
losses = []
for epoch in range(5000):
    pred_y = model(x) # prediction
    loss = loss_function(pred_y, y) # compute the value of the cost function
    losses.append(loss.item()) # add the loss to the history

    model.zero_grad() # reset the model's gradients
    loss.backward() # backward pass - propagate the error through the model and compute the gradients

    optimizer.step() # update the weights using the computed gradients (derived from the cost function)
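
As a sanity check of the same loop, the sketch below fits a single `nn.Linear(1, 1)` to the line y = 2x + 3 and reads back the learned slope and intercept. The learning rate of 0.1, the 2000 epochs, and the names `x_demo`, `y_demo`, and `linear` are assumptions chosen for this example, not the notebook's settings.

```python
import torch
from torch import nn

torch.manual_seed(0)

# Same synthetic line as in the notebook: y = 2x + 3 for x in [0, 1)
x_demo = torch.arange(100, dtype=torch.float32).reshape(-1, 1) / 100
y_demo = 2 * x_demo + 3

linear = nn.Linear(1, 1)                                  # one weight and one bias
mse = nn.MSELoss()
sgd = torch.optim.SGD(linear.parameters(), lr=0.1)        # larger lr than above, chosen for this sketch

for epoch in range(2000):
    loss = mse(linear(x_demo), y_demo)
    sgd.zero_grad()
    loss.backward()
    sgd.step()

slope = linear.weight.item()
intercept = linear.bias.item()
print(f"slope = {slope:.3f}, intercept = {intercept:.3f}")
```

The printed values should approach 2 and 3, which shows that the loop really minimizes the cost function on this dataset.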

In [None]:
plt.plot(losses)
plt.ylabel('loss')
plt.xlabel('epoch')
plt.show()

In [None]:
predictions = model(x)
predictions = predictions.detach().cpu().numpy().reshape(100) # predictions as a NumPy array (.cpu() is needed if the tensor is on a GPU)

In [None]:
# Compare the model's predictions with the original data

plt.plot(zbiór_liniowy[:,0],zbiór_liniowy[:,1])
plt.plot(zbiór_liniowy[:,0],predictions)
plt.show()

## Using nn.Module

In PyTorch the easiest option is the nn module, whose syntax is very close to TensorFlow. nn.Sequential builds the computational graph from the successive functions passed to it.

In [None]:
class NeuralNetwork(nn.Module):
  def __init__(self, n_dims, n_class):
      super(NeuralNetwork, self).__init__()
      self.seq = nn.Sequential(
          nn.Linear(n_dims, 512),
          nn.ReLU(),
          nn.Linear(512, 512),
          nn.ReLU(),
          nn.Linear(512, n_class),
          nn.Sigmoid()
      )

  def forward(self, x):
      logits = self.seq(x)
      return logits
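
To see what subclassing `nn.Module` provides, the sketch below defines a much smaller network in the same style and counts its trainable parameters. `TinyNet` and its layer sizes are hypothetical and serve only as an illustration.

```python
import torch
from torch import nn

class TinyNet(nn.Module):          # hypothetical, much smaller than NeuralNetwork
    def __init__(self, n_dims, n_hidden, n_class):
        super().__init__()
        self.seq = nn.Sequential(
            nn.Linear(n_dims, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_class),
            nn.Sigmoid(),
        )

    def forward(self, x):
        return self.seq(x)

tiny = TinyNet(3, 4, 1)
# Linear(3, 4): 3*4 weights + 4 biases = 16; Linear(4, 1): 4 weights + 1 bias = 5
n_params = sum(p.numel() for p in tiny.parameters())
out = tiny(torch.rand(5, 3))       # calling the module runs forward()
print(n_params, out.shape)
```

Registering the layers as attributes is what lets `parameters()` find every weight, and that is the list later handed to the optimizer.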

In [None]:
X = torch.rand(1, 3, device=device)

In [None]:
x = XOR[:,:3]
y = XOR[:,3].reshape([400,1])

In [None]:
y = (y>0)*1

In [None]:
model = NeuralNetwork(3, 1).to(device) # the input X was created on `device`, so the model has to be there too
print(model)

In [None]:
logits = model(X)
print(f"Predicted probabilities: {logits}")
# For more than one class (with a single output this is always 0):
y_pred = logits.argmax(1)
print(f"Predicted class: {y_pred}")

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# split the dataset into a training set and a test set
X_train, X_test, y_train, y_test = train_test_split(x, y, train_size=0.8, shuffle=True)
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).reshape(-1, 1).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.float32).reshape(-1, 1).to(device)

In [None]:
learning_rate = 1e-2

In [None]:
# BCE - binary cross-entropy; there is also nn.BCEWithLogitsLoss, which has a built-in sigmoid layer
# When using BCEWithLogitsLoss, leave the last layer linear (no activation function)
loss_fn = nn.BCELoss()
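
The relationship between the two losses can be checked numerically. The sketch below uses made-up logits `z` and labels `targets` (both are assumptions for illustration) and compares `nn.BCELoss` applied after a sigmoid with `nn.BCEWithLogitsLoss` applied to the raw outputs.

```python
import torch
from torch import nn

torch.manual_seed(0)
z = torch.randn(8, 1)                            # raw model outputs (logits), illustrative
targets = torch.randint(0, 2, (8, 1)).float()    # made-up binary labels

loss_a = nn.BCELoss()(torch.sigmoid(z), targets) # sigmoid applied explicitly
loss_b = nn.BCEWithLogitsLoss()(z, targets)      # sigmoid folded into the loss
print(loss_a.item(), loss_b.item())
```

Both calls should give the same value up to floating-point error; the fused version is generally the more numerically stable of the two for large-magnitude logits.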

In [None]:
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

### Training loop with early stopping and saving the best weights

In [None]:
n_epochs = 100   # number of epochs
batch_size = 50  # batch size
batch_start = torch.arange(0, len(X_train), batch_size)

# variables used to remember the best epoch
best_loss = np.inf   # init to infinity
best_weights = None
early_stop_thresh = 10
history = []

In [None]:
# Saving the model to a file and loading it back
def checkpoint(model, filename):
    torch.save(model.state_dict(), filename)
def resume(model, filename):
    model.load_state_dict(torch.load(filename))
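
A round trip through these two helpers might look like the sketch below. The helpers are repeated so the example is self-contained; the file name `demo_model.pth`, the temporary directory, and the two `nn.Linear` models are assumptions made for this illustration.

```python
import os
import tempfile
import torch
from torch import nn

def checkpoint(model, filename):
    torch.save(model.state_dict(), filename)

def resume(model, filename):
    model.load_state_dict(torch.load(filename))

source = nn.Linear(3, 1)
restored = nn.Linear(3, 1)   # same architecture, different random weights

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "demo_model.pth")   # hypothetical file name
    checkpoint(source, path)
    resume(restored, path)

same_weights = torch.equal(source.weight, restored.weight)
print(same_weights)
```

Only the `state_dict` is stored, so the model class has to be instantiated with the same architecture before the weights can be loaded.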

In [None]:
# how we train a PyTorch model
for epoch in range(n_epochs):
    model.train() # put the model in training mode
    with tqdm(batch_start, unit="batch", mininterval=0) as bar: # progress bar, handy for monitoring training
        bar.set_description(f"Epoch {epoch}")
        for start in bar:
            # take the observations of the current batch - note that here the observations are not shuffled
            X_batch = X_train[start:start+batch_size]
            y_batch = y_train[start:start+batch_size]
            # forward pass
            y_pred = model(X_batch)
            loss = loss_fn(y_pred, y_batch)
            # backward pass
            optimizer.zero_grad()
            loss.backward()
            # update the weights
            optimizer.step()
            bar.set_postfix(loss = float(loss))
    # validation
    model.eval()
    with torch.no_grad(): # no gradients are needed for validation
        y_pred = model(X_test)
        epoch_loss = float(loss_fn(y_pred, y_test))
    history.append(epoch_loss)
    if epoch_loss < best_loss:
        # plays the role of an EarlyStopping callback
        best_loss = epoch_loss
        best_epoch = epoch
        best_weights = copy.deepcopy(model.state_dict())
        # or save to a file
        # checkpoint(model, "best_model.pth")
    elif epoch - best_epoch > early_stop_thresh:
        print("Early stopped training at epoch %d" % epoch)
        break  # terminate the training loop
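
The loop above always visits the batches in the same order. One possible way to shuffle the observations in every epoch is to draw a random permutation of indices first, as in this sketch; `X_demo` and `y_demo` are small stand-ins for `X_train` and `y_train`, and the batch size of 4 is an assumption.

```python
import torch

torch.manual_seed(0)
X_demo = torch.arange(10, dtype=torch.float32).reshape(-1, 1)  # stand-in for X_train
y_demo = X_demo.clone()                                        # stand-in for y_train
demo_batch_size = 4

seen = []
perm = torch.randperm(len(X_demo))               # new random order, drawn once per epoch
for start in range(0, len(X_demo), demo_batch_size):
    idx = perm[start:start + demo_batch_size]    # indices of the current batch
    X_batch, y_batch = X_demo[idx], y_demo[idx]
    seen.extend(X_batch.flatten().tolist())
```

Every observation is still used exactly once per epoch, only in a different order. A `DataLoader` with `shuffle=True` does the same job automatically.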

In [None]:
print("Loss: %.2f" % best_loss)
plt.plot(history)
plt.show()

In [None]:
# Load the best weights back into the model
model.load_state_dict(best_weights)
# or from a file
# resume(model, "best_model.pth")

In [None]:
predictions = model(torch.tensor(x, dtype = torch.float32).to(device))

In [None]:
fig = go.Figure(data=[go.Scatter3d(x=x[:,0], y=x[:,1], z=x[:,2],
                                   mode='markers',marker = dict(
                                   color = predictions.detach().cpu().numpy().reshape(len(x)),
                                   colorbar=dict(
                                       title = 'y_hat'
                                   ),
                                    colorscale="Viridis")
                                   )])
fig.show()

### Dataloader + SummaryWriter

In [None]:
from torch.utils.data import TensorDataset, DataLoader, random_split, default_collate
from torch.utils.tensorboard import SummaryWriter

In [None]:

x_tensor = torch.tensor(x, dtype=torch.float32).to(device)
y_tensor = torch.tensor(y, dtype=torch.float32).reshape(-1, 1).to(device)

In [None]:
trainset, testset = random_split(TensorDataset(x_tensor, y_tensor), [0.8, 0.2])

In [None]:
# Create loaders for the datasets - they take care of batching

train_loader = DataLoader(trainset, shuffle=True, batch_size=32)
test_loader = DataLoader(testset, shuffle=False, batch_size=32)
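
To see what a loader actually yields, the sketch below builds one on random data of the same shape as the XOR inputs and collects the batch shapes. The tensors `features` and `labels` are made up, and explicit split lengths are used instead of fractions purely as an assumption for portability.

```python
import torch
from torch.utils.data import TensorDataset, DataLoader, random_split

features = torch.rand(400, 3)                    # same shape as the XOR inputs
labels = torch.randint(0, 2, (400, 1)).float()   # made-up binary labels

# Explicit lengths instead of fractions; the same as an 80/20 split of 400 rows
train_part, test_part = random_split(TensorDataset(features, labels), [320, 80])
loader = DataLoader(train_part, shuffle=True, batch_size=32)

batch_shapes = [tuple(xb.shape) for xb, yb in loader]
print(len(loader), batch_shapes[0])
```

With 320 training rows and a batch size of 32 the loader yields 10 batches per epoch, reshuffled each time it is iterated.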

In [None]:
# Use TensorBoard to monitor the training process
writer = SummaryWriter(filename_suffix='Initial training loop')

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, epoch, writer = None):
  model.train()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  train_loss, correct = 0, 0
  running_size = 0
  running_batches = 0
  for X_batch, y_batch in (pbar := tqdm(dataloader, desc = 'Train: ')):
    y_pred = model(X_batch)
    loss = loss_fn(y_pred, y_batch)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    train_loss += loss.item()
    correct += ((torch.sigmoid(y_pred) > 0.5).float() == y_batch).sum().item() # the model trained with this loop returns logits, so apply sigmoid before thresholding
    running_size += len(y_batch)
    running_batches +=1
    pbar.set_postfix_str(f"loss: {float(train_loss/running_batches)}, acc: {correct/running_size}, [{running_size}/{size}]")

  train_loss /= num_batches
  correct /= size

  if writer is not None: # add scalars to TensorBoard
    writer.add_scalar("Loss", train_loss, epoch+1)
    writer.add_scalar("Accuracy", correct, epoch+1)
    for name, weight in model.named_parameters():
      writer.add_histogram(name,weight, epoch)
      writer.add_histogram(f'{name}.grad',weight.grad, epoch)
  return train_loss, correct

def test_loop(dataloader, model, loss_fn, epoch, writer = None):
  model.eval()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  test_loss, correct = 0, 0
  running_size = 0
  running_batches = 0

  with torch.no_grad(): # gradients are not needed during validation
    for X_batch, y_batch in (pbar := tqdm(dataloader, desc = 'Validation: ')):
      y_pred = model(X_batch)
      loss = loss_fn(y_pred, y_batch)
      test_loss += loss.item()
      correct += ((torch.sigmoid(y_pred) > 0.5).float() == y_batch).sum().item() # the model returns logits
      running_size += len(y_batch)
      running_batches +=1
      pbar.set_postfix_str(f"loss: {float(test_loss/running_batches)}, acc: {correct/running_size}, [{running_size}/{size}]")

  test_loss /= num_batches
  correct /= size

  if writer is not None:
    writer.add_scalar("Val Loss", test_loss, epoch+1)
    writer.add_scalar("Val Accuracy", correct, epoch+1)
  return test_loss, correct


class EarlyStopping:
  def __init__(self, tolerance=10, min_delta=0):

    self.tolerance = tolerance
    self.min_delta = min_delta
    self.counter = 0
    self.best_weights = None
    self.best_loss = np.inf

  def __call__(self, validation_loss, model):
    self._update_best_model_(validation_loss, model)
    if (self.best_loss - validation_loss) < self.min_delta:
      self.counter +=1
      if self.counter >= self.tolerance:
          return True
    else:
      self.counter = 0
      self.best_loss = validation_loss
    return False

  def _update_best_model_(self, validation_loss, model):
    if validation_loss < self.best_loss:
      # copy the weights of the model passed in, not of a global variable
      self.best_weights = copy.deepcopy(model.state_dict())
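
How `tolerance` and `min_delta` interact is easier to follow on a list of numbers. The sketch below replays the same counting rule on made-up validation losses; the function `stopping_epoch` and the values in `demo_losses` are hypothetical and are not part of the class above.

```python
import math

def stopping_epoch(val_losses, tolerance, min_delta):
    """Return the index of the epoch at which training would stop, or None."""
    best_loss = math.inf
    counter = 0
    for epoch, loss in enumerate(val_losses):
        if best_loss - loss < min_delta:     # improvement too small
            counter += 1
            if counter >= tolerance:
                return epoch
        else:                                # meaningful improvement: reset the counter
            counter = 0
            best_loss = loss
    return None

demo_losses = [1.0, 0.5, 0.49, 0.48, 0.47]   # made-up validation losses
stop_at = stopping_epoch(demo_losses, tolerance=3, min_delta=0.05)
never = stopping_epoch(demo_losses, tolerance=3, min_delta=0.0)
print(stop_at, never)
```

With `min_delta=0.05` the last three epochs improve on the best loss of 0.5 by less than the threshold, so training stops at index 4. With `min_delta=0` every epoch counts as an improvement and the rule never fires.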


In [None]:
early_stopping = EarlyStopping(tolerance=10, min_delta=0.05)

In [None]:
class NeuralNetwork(nn.Module):
  def __init__(self, n_dims, n_class):
      super(NeuralNetwork, self).__init__()
      self.seq = nn.Sequential(
          nn.Linear(n_dims, 512),
          nn.ReLU(),
          nn.Linear(512, 512),
          nn.ReLU(),
          nn.Linear(512, n_class)
      )

  def forward(self, x):
      logits = self.seq(x)
      return logits

In [None]:
model = NeuralNetwork(3, 1).to(device) # the data tensors were moved to `device`, so the model has to be there too
print(model)

In [None]:
learning_rate = 1e-2

In [None]:
loss_fn = nn.BCEWithLogitsLoss() # note that the model does not end with a Sigmoid function
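
Because a model without the final Sigmoid returns logits, class predictions have to be derived accordingly. The sketch below, using made-up logits `z`, shows that thresholding the sigmoid at 0.5 is the same as thresholding the logits at 0, while thresholding the logits at 0.5 gives a different answer.

```python
import torch

z = torch.tensor([[-2.0], [-0.1], [0.3], [4.0]])   # illustrative logits
from_probs = torch.sigmoid(z) > 0.5                # threshold on probabilities
from_logits = z > 0                                # equivalent threshold on logits
naive = z > 0.5                                    # thresholding logits at 0.5 differs
print(from_probs.flatten().tolist(), naive.flatten().tolist())
```

The third value (0.3) is classified as positive by the first two rules and as negative by the last one, which would distort the reported accuracy.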

In [None]:
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [None]:
# add the computational graph to TensorBoard; for a Sequential model it does not look very exciting

x_data, labels = next(iter(train_loader))
writer.add_graph(model, x_data)

In [None]:
train_losses = []
train_acc = []
val_losses  = []
val_acc = []
for epoch in range(n_epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    train_loss, train_correct = train_loop(train_loader, model, loss_fn, optimizer, epoch, writer)
    train_losses.append(train_loss)
    train_acc.append(train_correct)
    test_loss, test_correct = test_loop(test_loader, model, loss_fn, epoch, writer)
    val_losses.append(test_loss)
    val_acc.append(test_correct)
    if early_stopping(validation_loss = test_loss, model = model):
      print(f'\n-------------------------------\nEarly stopped at epoch {epoch+1}')
      model.load_state_dict(early_stopping.best_weights)

      break
writer.close()

In [None]:
writer.log_dir

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir runs/Apr11_19-14-56_8312cedac1c4

In [None]:
plt.plot(train_losses)
plt.plot(val_losses)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(val_acc)
plt.show()

# Task - predicting the popularity of songs

#### Loading the data, EDA

In class we will analyze a dataset of songs (`song_data.csv`). Each observation describes a single song, and the goal of the models is to predict its popularity (`song_popularity`) from the values of the remaining parameters.

In [None]:
import seaborn as sns

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Please:
<li>Create a folder "AI_datasets" in "My Drive"</li>
<li>Add the file 'song_data.csv' from Moodle.</li>

In [None]:
%cd /content/gdrive/My Drive/AI_datasets

In [None]:
!ls

In [None]:
data = pd.read_csv("song_data.csv")

In [None]:
data.shape

In [None]:
data.duplicated(subset='song_name').sum()

What should we do now?

In [None]:
data.drop_duplicates(inplace = True)

In [None]:
data.head()

In [None]:
data.describe()

In [None]:
sns.pairplot(data)

In [None]:
corr = data.corr(method = 'spearman', numeric_only = True) # skip text columns such as song_name

# Generate a mask for the upper triangle
mask = np.triu(np.ones_like(corr, dtype=bool))

# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))

# Generate a custom diverging colormap
cmap = sns.diverging_palette(230, 20, as_cmap=True)

# Draw the heatmap with the mask and correct aspect ratio
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=1, vmin=-1, center=0,
            square=True, linewidths=.5, cbar_kws={"shrink": .5},annot=True)

### Analysis of the dependent variable

In [None]:
y_col = "song_popularity"

In [None]:
import scipy.stats

In [None]:
sns.displot(data, x=y_col, kind="kde", bw_adjust=.2)

In [None]:
upper_bound = (data[y_col].quantile(0.75)+1.5*scipy.stats.iqr(data[y_col]))
lower_bound = (data[y_col].quantile(0.25)-1.5*scipy.stats.iqr(data[y_col]))
print(upper_bound,lower_bound)
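
The bounds above follow the usual 1.5 × IQR rule. A minimal worked example on a made-up sample (the array `values` is an assumption, not data from the notebook):

```python
import numpy as np

values = np.array([1.0, 2.0, 3.0, 4.0, 100.0])   # made-up sample with one extreme value
q1, q3 = np.quantile(values, [0.25, 0.75])       # 2.0 and 4.0
iqr = q3 - q1                                    # 2.0
lower = q1 - 1.5 * iqr                           # -1.0
upper = q3 + 1.5 * iqr                           # 7.0
outliers = values[(values < lower) | (values > upper)]
print(lower, upper, outliers)
```

Only the value 100 falls outside the interval, so it is the only observation flagged as an outlier.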

In [None]:
sns.displot(data.loc[data[y_col]>2], x=y_col, kind="kde", bw_adjust=.2)

In [None]:
data = data.loc[data[y_col]>2].copy()
data.shape

## Analysis of the independent variables

TODO - analyze all the variables in terms of their effect on the dependent variable, their distributions, and their outliers.

## Normalization and one-hot encoding

In [None]:
data.drop(columns = 'song_name', inplace = True)

In [None]:
from sklearn.preprocessing import MinMaxScaler

In [None]:
scale_factors = {'min': data[y_col].min() , 'max' : data[y_col].max()}

In [None]:
scale_factors

In [None]:
scaler = MinMaxScaler() 
scaled_values = scaler.fit_transform(data) 
data = pd.DataFrame(scaled_values, columns = data.columns)
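
Storing the minimum and maximum of the target in `scale_factors` makes it possible to map scaled predictions back to the original popularity scale later on. A sketch under that assumption follows; the helper `to_original_scale` and the numbers are hypothetical.

```python
import numpy as np

scale_factors = {"min": 3.0, "max": 100.0}       # illustrative values, not from song_data.csv

def to_original_scale(scaled, factors):
    """Invert min-max scaling: x = x_scaled * (max - min) + min."""
    return scaled * (factors["max"] - factors["min"]) + factors["min"]

scaled_predictions = np.array([0.0, 0.5, 1.0])
restored = to_original_scale(scaled_predictions, scale_factors)
print(restored)
```

The same formula applies to an error measured in scaled units, except that the `min` offset is dropped: an RMSE of 0.1 on the scaled target corresponds to 0.1 × (max − min) in original units.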

In [None]:
data.head()

In [None]:
data.describe()

In [None]:
y = data[y_col].copy()
x = data.drop(columns = y_col).copy()

## Predicting popularity

## Using TensorBoard to choose the best network parameters in PyTorch

In [None]:
from itertools import product

In [None]:
# Create the space of parameters that will be tested when training the network
parameters = dict(
    lr = [0.01, 0.001],
    use_RMSE = [True,False]
)

param_values = [v for v in parameters.values()]

for lr,use_RMSE in product(*param_values):
    print(lr, use_RMSE)
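
When the grid grows beyond two parameters, it can help to keep the names attached to the values. One possible variant, shown here only as a sketch (the `grid` list is an assumption, not something used later in the notebook):

```python
from itertools import product

parameters = dict(lr=[0.01, 0.001], use_RMSE=[True, False])

# Pair every combination with its parameter names
grid = [dict(zip(parameters.keys(), combo)) for combo in product(*parameters.values())]
for config in grid:
    print(config)
```

Each `config` can then be unpacked by name, which avoids relying on the order of the values returned by `product`.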

In [None]:
import torch.nn.functional as F

In [None]:
# Note that we are not using the Sequential module - check how this affects the view of the computational graph

class RegNet_4(nn.Module):
  def __init__(self, n_dims):
      super(RegNet_4, self).__init__()
      self.fc1 = nn.Linear(n_dims, 32)
      self.bn1 = nn.BatchNorm1d(32)
      self.fc2 = nn.Linear(32, 64)
      self.bn2 = nn.BatchNorm1d(64)
      self.fc3 = nn.Linear(64, 32)
      self.bn3 = nn.BatchNorm1d(32)
      self.fc4 = nn.Linear(32, 8)
      self.out =  nn.Linear(8, 1)

  def forward(self, x):
      x = F.relu(self.bn1(self.fc1(x)))
      x = F.relu(self.bn2(self.fc2(x)))
      x = F.relu(self.bn3(self.fc3(x)))
      x = F.relu(self.fc4(x))
      x = self.out(x)
      return x

Further reading - what is batch normalization?
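
As a starting point, here is a minimal sketch of what `nn.BatchNorm1d` does to a batch in training mode and in evaluation mode. The input `batch`, its mean of about 10 and its standard deviation of about 5 are made-up values.

```python
import torch
from torch import nn

torch.manual_seed(0)
bn = nn.BatchNorm1d(4)
batch = torch.randn(64, 4) * 5 + 10      # features with mean ~10 and std ~5

bn.train()
out_train = bn(batch)                    # normalized with this batch's own statistics
train_mean = out_train.mean(dim=0)
train_var = out_train.var(dim=0, unbiased=False)

bn.eval()
out_eval = bn(batch)                     # normalized with the running averages instead
print(train_mean, train_var)
```

In training mode each feature comes out with mean close to 0 and variance close to 1. In evaluation mode the layer uses the running estimates accumulated so far, which is why calling `model.eval()` before validation and before computing attributions matters for a network that contains batch normalization.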

In [None]:
x_tensor = torch.tensor(x.values, dtype=torch.float32).to(device)
y_tensor = torch.tensor(y.values, dtype=torch.float32).reshape(-1, 1).to(device)

In [None]:
trainset, testset = random_split(TensorDataset(x_tensor, y_tensor), [0.8, 0.2])

In [None]:
train_loader = DataLoader(trainset, shuffle=True, batch_size=32)
test_loader = DataLoader(testset, shuffle=False, batch_size=32)

In [None]:
from torch.utils.tensorboard import SummaryWriter

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, epoch, lr, use_RMSE, writer = None):
  model.train()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  train_loss, correct = 0, 0
  running_size = 0
  running_batches = 0
  for X_batch, y_batch in (pbar := tqdm(dataloader, desc = 'Train: ')):
    y_pred = model(X_batch)
    loss = loss_fn(y_pred, y_batch)
    if use_RMSE:
      loss = torch.sqrt(loss)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    train_loss += loss.item()
    running_size += len(y_batch)
    running_batches +=1
    pbar.set_postfix_str(f"loss: {float(train_loss/running_batches)}, [{running_size}/{size}]")

  train_loss /= num_batches

  if writer is not None:
    if use_RMSE:
      writer.add_scalar("Loss", train_loss, epoch+1)
    else:
      writer.add_scalar("Loss", np.sqrt(train_loss), epoch+1)


def test_loop(dataloader, model, loss_fn, epoch, lr, use_RMSE, writer = None):
  model.eval()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  test_loss, correct = 0, 0
  running_size = 0
  running_batches = 0

  with torch.no_grad(): # gradients are not needed during validation
    for X_batch, y_batch in (pbar := tqdm(dataloader, desc = 'Validation: ')):
      y_pred = model(X_batch)
      loss = loss_fn(y_pred, y_batch)
      if use_RMSE:
        loss = torch.sqrt(loss)
      test_loss += loss.item()
      running_size += len(y_batch)
      running_batches +=1
      pbar.set_postfix_str(f"loss: {float(test_loss/running_batches)}, [{running_size}/{size}]")

  test_loss /= num_batches

  if writer is not None:
    if use_RMSE:
      writer.add_scalar("Val Loss", test_loss, epoch+1)
    else:
      writer.add_scalar("Val Loss", np.sqrt(test_loss), epoch+1)
  return test_loss

class EarlyStopping:
  def __init__(self, tolerance=10, min_delta=0):

    self.tolerance = tolerance
    self.min_delta = min_delta
    self.counter = 0
    self.best_weights = None
    self.best_loss = np.inf

  def __call__(self, validation_loss, model):
    self._update_best_model_(validation_loss, model)
    if (self.best_loss - validation_loss) < self.min_delta:
      self.counter +=1
      if self.counter >= self.tolerance:
          return True
    else:
      self.counter = 0
      self.best_loss = validation_loss
    return False

  def _update_best_model_(self, validation_loss, model):
    if validation_loss < self.best_loss:
      # copy the weights of the model passed in, not of a global variable
      self.best_weights = copy.deepcopy(model.state_dict())


In [None]:
n_epochs = 100
loss_fn = nn.MSELoss()
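
The loops above log either the mean of per-batch RMSE values (`use_RMSE=True`) or the square root of the mean per-batch MSE (`use_RMSE=False`). These two numbers are not the same in general, as the sketch below shows on two made-up batches (`batches` is an assumption for illustration).

```python
import torch
from torch import nn

mse_fn = nn.MSELoss()
# Two made-up batches: the first is predicted perfectly, the second is off by 2
batches = [
    (torch.tensor([[1.0], [2.0]]), torch.tensor([[1.0], [2.0]])),
    (torch.tensor([[3.0], [4.0]]), torch.tensor([[5.0], [6.0]])),
]

batch_mse = torch.stack([mse_fn(pred, target) for pred, target in batches])  # [0, 4]
mean_of_batch_rmse = torch.sqrt(batch_mse).mean()   # (0 + 2) / 2 = 1
rmse_of_mean_mse = torch.sqrt(batch_mse.mean())     # sqrt(2)
print(mean_of_batch_rmse.item(), rmse_of_mean_mse.item())
```

This is worth keeping in mind when comparing the runs in TensorBoard: the curves for the two settings are on similar but not identical scales.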

In [None]:
x.shape[1]

In [None]:
model_states = []
for run_id, (lr,use_RMSE) in enumerate(product(*param_values)):
    print("run id:", run_id + 1)
    model = RegNet_4(x.shape[1]).to(device)

    # create a separate writer for each run before anything is logged
    comment = f' lr = {lr} use_RMSE = {use_RMSE}'
    writer = SummaryWriter(comment=comment)

    # add the computational graph to TensorBoard
    x_data, labels = next(iter(train_loader))
    writer.add_graph(model, x_data)

    early_stopping = EarlyStopping(tolerance=10, min_delta=0.01)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    for epoch in range(n_epochs):
      print(f"Epoch {epoch+1}\n-------------------------------")
      train_loop(train_loader, model, loss_fn, optimizer, epoch, lr, use_RMSE, writer)
      test_loss = test_loop(test_loader, model, loss_fn, epoch, lr, use_RMSE, writer)
      if early_stopping(validation_loss = test_loss, model = model):
        print(f'\n-------------------------------\nEarly stopped at epoch {epoch+1}')
        break

    # store the best weights and the hyperparameters of every run, whether or not early stopping fired
    model_states.append(early_stopping.best_weights)
    final_loss = test_loss if use_RMSE else np.sqrt(test_loss)
    writer.add_hparams({"lr": lr, "use_RMSE": use_RMSE}, {"loss": float(final_loss)})
    writer.close()

In [None]:
writer.log_dir

In [None]:
# Warning: this removes every TensorBoard log stored in runs/
! rm -r runs

In [None]:
%tensorboard --logdir runs

In [None]:
# Stop the TensorBoard process listening on port 6006
!kill $( lsof -i:6006 -t )

In [None]:
model = RegNet_4(x.shape[1]).to(device)
model.load_state_dict(model_states[1])
model.eval() # use the running BatchNorm statistics when computing attributions

# XAI

For PyTorch there is a dedicated XAI library: Captum.

In [None]:
!pip install captum

In [None]:
x_train = trainset.dataset.tensors[0][trainset.indices[:100]]
x_test = testset.dataset.tensors[0][testset.indices[:100]]


In [None]:
feature_names = x.columns

In [None]:
# imports from captum library
from captum.attr import LayerConductance, LayerActivation, LayerIntegratedGradients, IntegratedGradients, DeepLift, GradientShap, NoiseTunnel, FeatureAblation, NeuronConductance

## Model analysis

Captum offers several methods for inspecting a model's behavior, based on gradients, SHAP values, or added noise. Let us compare how they behave:

In [None]:
ig = IntegratedGradients(model)
ig_nt = NoiseTunnel(ig)
dl = DeepLift(model)
gs = GradientShap(model)
fa = FeatureAblation(model)

ig_attr_test = ig.attribute(x_test, n_steps=50)
ig_nt_attr_test = ig_nt.attribute(x_test)
dl_attr_test = dl.attribute(x_test)
gs_attr_test = gs.attribute(x_test, x_train)
fa_attr_test = fa.attribute(x_test)
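
To give an intuition for what integrated gradients computes, here is a hand-written sketch for a single input row. It is a simple Riemann-sum approximation written for this illustration, not Captum's implementation; the function `integrated_gradients`, the `nn.Linear` stand-in model, and the zero baseline are all assumptions.

```python
import torch
from torch import nn

def integrated_gradients(model, inputs, baseline, n_steps=50):
    """Riemann-sum approximation of integrated gradients for one input row."""
    alphas = torch.linspace(0, 1, n_steps).reshape(-1, 1)
    # Points on the straight line from the baseline to the input
    path = (baseline + alphas * (inputs - baseline)).requires_grad_(True)
    model(path).sum().backward()
    avg_grad = path.grad.mean(dim=0)          # average gradient along the path
    return (inputs - baseline) * avg_grad

torch.manual_seed(0)
linear = nn.Linear(3, 1)                      # stand-in for the trained network
x_row = torch.tensor([1.0, 2.0, 3.0])
baseline = torch.zeros(3)

attributions = integrated_gradients(linear, x_row, baseline)
difference = (linear(x_row) - linear(baseline)).detach()
print(attributions, difference)
```

For a linear model the attributions are exactly weight times input, and they sum to the difference between the prediction for the input and the prediction for the baseline. For a nonlinear network the same sum holds only approximately and improves with more steps.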

In [None]:
# prepare attributions for visualization

x_axis_data = np.arange(x_test.shape[1])
x_axis_data_labels = list(map(lambda idx: feature_names[idx], x_axis_data))

# .cpu() keeps the conversion to NumPy working when the tensors are on a GPU
ig_attr_test_sum = ig_attr_test.detach().cpu().numpy().sum(0)
ig_attr_test_norm_sum = ig_attr_test_sum / np.linalg.norm(ig_attr_test_sum, ord=1)

ig_nt_attr_test_sum = ig_nt_attr_test.detach().cpu().numpy().sum(0)
ig_nt_attr_test_norm_sum = ig_nt_attr_test_sum / np.linalg.norm(ig_nt_attr_test_sum, ord=1)

dl_attr_test_sum = dl_attr_test.detach().cpu().numpy().sum(0)
dl_attr_test_norm_sum = dl_attr_test_sum / np.linalg.norm(dl_attr_test_sum, ord=1)

gs_attr_test_sum = gs_attr_test.detach().cpu().numpy().sum(0)
gs_attr_test_norm_sum = gs_attr_test_sum / np.linalg.norm(gs_attr_test_sum, ord=1)

fa_attr_test_sum = fa_attr_test.detach().cpu().numpy().sum(0)
fa_attr_test_norm_sum = fa_attr_test_sum / np.linalg.norm(fa_attr_test_sum, ord=1)

lin_weight = model.fc1.weight[0].detach().cpu().numpy()
y_axis_lin_weight = lin_weight / np.linalg.norm(lin_weight, ord=1)

width = 0.14
legends = ['Int Grads', 'Int Grads w/SmoothGrad','DeepLift', 'GradientSHAP', 'Feature Ablation', 'Weights']

plt.figure(figsize=(20, 10))

ax = plt.subplot()
ax.set_title('Comparing input feature importances across multiple algorithms and learned weights')
ax.set_ylabel('Attributions')

FONT_SIZE = 16
plt.rc('font', size=FONT_SIZE)            # fontsize of the text sizes
plt.rc('axes', titlesize=FONT_SIZE)       # fontsize of the axes title
plt.rc('axes', labelsize=FONT_SIZE)       # fontsize of the x and y labels
plt.rc('legend', fontsize=FONT_SIZE - 4)  # fontsize of the legend

ax.bar(x_axis_data, ig_attr_test_norm_sum, width, align='center', alpha=0.8, color='#eb5e7c')
ax.bar(x_axis_data + width, ig_nt_attr_test_norm_sum, width, align='center', alpha=0.7, color='#A90000')
ax.bar(x_axis_data + 2 * width, dl_attr_test_norm_sum, width, align='center', alpha=0.6, color='#34b8e0')
ax.bar(x_axis_data + 3 * width, gs_attr_test_norm_sum, width, align='center',  alpha=0.8, color='#4260f5')
ax.bar(x_axis_data + 4 * width, fa_attr_test_norm_sum, width, align='center', alpha=1.0, color='#49ba81')
ax.bar(x_axis_data + 5 * width, y_axis_lin_weight, width, align='center', alpha=1.0, color='grey')
ax.autoscale_view()
plt.tight_layout()

ax.set_xticks(x_axis_data + 0.5)
ax.set_xticklabels(x_axis_data_labels)

plt.legend(legends, loc=3)
plt.show()

Note that the methods do not appear to agree with one another.

In [None]:
def visualize_importances(feature_names, importances, title="Average Feature Importances", plot=True, axis_title="Features"):
    print(title)
    for i in range(len(feature_names)):
        print(feature_names[i], ": ", '%.3f'%(importances[i]))
    x_pos = (np.arange(len(feature_names)))
    if plot:
        plt.figure(figsize=(18,6))
        FONT_SIZE = 10
        plt.rc('font', size=FONT_SIZE)            # fontsize of the text sizes
        plt.rc('axes', titlesize=FONT_SIZE)       # fontsize of the axes title
        plt.rc('axes', labelsize=FONT_SIZE)       # fontsize of the x and y labels
        plt.rc('legend', fontsize=FONT_SIZE - 4)  # fontsize of the legend
        plt.bar(x_pos, importances, align='center')
        plt.xticks(x_pos, feature_names, wrap=True)
        plt.xlabel(axis_title)
        plt.title(title)

In [None]:
visualize_importances(feature_names, np.mean(ig_attr_test.detach().cpu().numpy(), axis=0))

## Analysis of a selected layer

In [None]:
cond = LayerConductance(model, model.fc1)

In [None]:
cond_vals = cond.attribute(x_test)
cond_vals = cond_vals.detach().cpu().numpy()

In [None]:
visualize_importances(range(32),np.mean(cond_vals, axis=0),title="Average Neuron Importances", axis_title="Neurons")

In [None]:
cond = LayerConductance(model, model.fc4)

In [None]:
cond_vals = cond.attribute(x_test)
cond_vals = cond_vals.detach().cpu().numpy()

In [None]:
visualize_importances(range(8),np.mean(cond_vals, axis=0),title="Average Neuron Importances", axis_title="Neurons")

In [None]:
plt.hist(cond_vals[:,1], 100);
plt.title("Neuron 1 Distribution")
plt.figure()
plt.hist(cond_vals[:,2], 100);
plt.title("Neuron 2 Distribution");

In [None]:
plt.hist(cond_vals[:,0], 100);
plt.title("Neuron 0 Distribution")
plt.figure()
plt.hist(cond_vals[:,4], 100);
plt.title("Neuron 4 Distribution");

## Analysis of a selected neuron

In [None]:
neuron_cond = NeuronConductance(model, model.fc4)

In [None]:
neuron_cond_vals_4 = neuron_cond.attribute(x_test, neuron_selector=4)

In [None]:
visualize_importances(feature_names, neuron_cond_vals_4.mean(dim=0).detach().cpu().numpy(), title="Average Feature Importances for Neuron 4")


Task - train a model in which more neurons contribute to the prediction.