In [1]:
import numpy as np
import pandas as pd
import warnings

#suppress warnings
warnings.filterwarnings('ignore')

In [2]:
np.random.seed(42)

In [3]:
import matplotlib.pyplot as plot

In [4]:
from sklearn.metrics import classification_report
from sklearn.model_selection import ShuffleSplit, train_test_split
from sklearn.preprocessing import StandardScaler

# Implemetação

Definição de códigos.

## Rede Neural

MLP com uma camada escondida, utilzando gradiente conjugado para aproximação da hessiana e taxa de aprendizado dinâmica com bisseção.

In [5]:
def train(X_train, y_train, X_valid, y_valid, hidden_layers, ns, dataset='k-fold'):
  scaler = StandardScaler()
  X_train = scaler.fit_transform(X_train)
  X_valid = scaler.transform(X_valid)
    
  N, ne = X_train.shape
  N_valid, _ = X_valid.shape

  # weights
  A = np.random.rand(hidden_layers, ne + 1)/5
  B = np.random.rand(ns, hidden_layers + 1)/5

  X_train = np.hstack([X_train, np.ones((N, 1))])
  X_valid = np.hstack([X_valid, np.ones((N_valid, 1))])

  errors_train = []
  errors_valid = []
  errors_min = []

  # output
  y = calculate_output(X_train, A, B, N)
  error = y - y_train
  square_error_train = 1/N * sum(error * error)
  errors_train.append(square_error_train)
  
  y = calculate_output(X_valid, A, B, N_valid)
  error = y - y_valid
  square_error_valid = 1/N_valid * sum(error * error)
  errors_valid.append(square_error_valid)
  
  min_error = square_error_valid
  best_A = A
  best_B = B

  epoch = 0
  max_epoch = 5000
  
  p = Path(f"MLP/{dataset}/")
  p.mkdir(parents=True, exist_ok=True)
  (p / "error.txt").touch()

  f = open(f"MLP/{dataset}/error.txt","a+")
  f.write(f"epoch: {epoch}, train: {square_error_train}, valid: {square_error_valid}\n")
  while square_error_train > 1e-3 and epoch < max_epoch:
    epoch = epoch + 1

    dJdA_current, dJdB_current = calculate_gradient(X_train, y_train, A, B, N)

    if epoch == 1:
      dJdA = dJdA_current
      dJdB = dJdB_current
      dJdA_old = dJdA_current
      dJdB_old = dJdB_current
    else:
      dJdA, dJdB = direction_conjugate(A, B, dJdA_current, dJdB_current, dJdA_old, dJdB_old, X_train, y_train, N, epoch)
      dJdA_old = dJdA_current
      dJdB_old = dJdB_current

    alpha = bisection(dJdA, dJdB, dJdA_current, dJdB_current, A, B, X_train, y_train, N, epoch)

    A = A - alpha*dJdA;
    B = B - alpha*dJdB;

    y = calculate_output(X_train, A, B, N)
    error = y - y_train
    square_error_train = 1/N * sum(error * error)
    errors_train.append(square_error_train)

    y = calculate_output(X_valid, A, B, N_valid)
    error = y - y_valid
    square_error_valid = 1/N_valid * sum(error * error)
    errors_valid.append(square_error_valid)
    
    if square_error_valid < min_error:
      best_A = A
      best_B = B

      min_error = square_error_valid
      errors_min.append(square_error_valid)

    f.write(f"epoch: {epoch}, train: {square_error_train}, valid: {square_error_valid}\n")
    
  f.close()

  plot.plot(errors_train, 'r', errors_valid, 'g', errors_min, 'b')

  return best_A, best_B, min_error, scaler

In [6]:
def calculate_output(X, A, B, N):
  Zin = np.matmul(X, A.T)
  Z = g(Zin)
  Zb = np.hstack([Z, np.ones((N, 1))])

  Yin = np.matmul(Zb, B.T)
  Y = g(Yin)

  return Y.flatten()

In [7]:
def calculate_gradient(X, Yd, A, B, N):  
  Zin = np.matmul(X, A.T)
  Z = g(Zin)

  Zb = np.hstack([Z, np.ones((N, 1))])

  Yin = np.matmul(Zb, B.T)
  Y = g(Yin)
  Y = Y.flatten()

  error = Y - Yd

  # g() and f() derivatives 
  gl = (1-Y) * Y
  fl = (1-Z) * Z

  dJdB = 1/N * np.matmul((error * gl).T, Zb)
  
  # error
  error_gl = error * gl

  dJdZ = np.matmul(error_gl.reshape(-1, 1), B[:,:-1])
  dJdA = 1/N * np.matmul((dJdZ*fl).T, X)

  return dJdA, dJdB

In [8]:
def direction_conjugate(A, B, gradA, gradB, gradA_old, gradB_old, X, Yd, N, it):
  g0 = np.concatenate((-gradA_old.flatten(), -gradB_old.flatten()))
  gi = np.concatenate((gradA.flatten(), gradB.flatten()))

  if (it % 2) != 0:
    beta = np.divide(np.matmul(gi.flatten().T, (gi.flatten()-g0.flatten())), np.matmul(g0.flatten().T, g0.flatten()))
    if beta < 0:
      beta = np.divide(np.linalg.norm(gi.flatten()), np.linalg.norm(g0.flatten())**2)

    dJdA = gradA - beta * gradA_old
    dJdB = gradB - beta * gradB_old
  else:
    dJdA = gradA
    dJdB = gradB

  return dJdA, dJdB

In [9]:
def g(Zin):
  return 1/(1 + np.exp(-Zin))

In [10]:
def bisection(dJdA, dJdB, dJdA_current, dJdB_current, A, B, X, Yd, N, epoch):
    def alpha_gen():
      alpha_g = np.random.random()
      while h_l(alpha_g, dJdA, dJdB, dJdA_current, dJdB_current, A, B, X, Yd, N, epoch) < 0:
        alpha_g = alpha_g * 2

      return alpha_g

    alpha_l = 0
    alpha_u = alpha_gen()
    alpha = (alpha_l + alpha_u) / 2

    hl = h_l(alpha, dJdA, dJdB, dJdA_current, dJdB_current, A, B, X, Yd, N, epoch)

    it = 0
    it_max = int(np.ceil(np.log(alpha_u - alpha_l) - np.log(1e-5))/np.log(2))
    while it < it_max:
        it += 1
        
        if hl > 0:
            alpha_u = alpha
        elif hl < 0:
            alpha_l = alpha
        elif hl == 0:
          return alpha

        alpha = (alpha_l + alpha_u) / 2
        hl = h_l(alpha, dJdA, dJdB, dJdA_current, dJdB_current, A, B, X, Yd, N, epoch)

    return alpha

In [11]:
def h_l(alpha, dJdA, dJdB, dJdA_current, dJdB_current, A, B, X, Yd, N, epoch):
  An = A - alpha * dJdA;
  Bn = B - alpha * dJdB;

  gradAn, gradBn = calculate_gradient(X, Yd, An, Bn, N)
  dJdAn, dJdBn = direction_conjugate(A, B, gradAn, gradBn, dJdA_current, dJdB_current, X, Yd, N, epoch)
  
  grad = np.concatenate((dJdA.flatten(), dJdB.flatten()))
  grad_alpha = np.concatenate((dJdAn.flatten(), dJdBn.flatten()))

  return np.dot(grad_alpha.T.flatten(), -grad.flatten())

## Treinamento

Definição de código para seleção do número de neurônios atráves de validação com 5 folds.

In [12]:
def best_params(X, y, hidden_layers, ns):
  rs = ShuffleSplit(n_splits=5, test_size=.20, random_state=42)

  errors = []
  for train_index, valid_index in rs.split(X, y):
    df_train_x = np.array([X[idx] for idx in train_index])
    df_train_y = np.array([y[idx] for idx in train_index])

    df_valid_x = np.array([X[idx] for idx in valid_index])
    df_valid_y = np.array([y[idx] for idx in valid_index])

    A, B, error_valid, scaler = train(df_train_x, df_train_y, df_valid_x, df_valid_y, hidden_layers, ns)
    print("VALID:", error_valid)

    errors.append(error_valid)

  return np.mean(errors)

In [13]:
def select_params(X, y, ns):
    params = [5, 15, 25, 50, 100]
    errors = {}
    
    for param in params:
        error = best_params(X, y, param, ns)
        errors[error] = param

        print("PARAM:", param, "ERROR:", error)
  
    min_error = min(errors.keys())
    return errors[min_error]

## Teste

In [14]:
def neural_network_test(X, A, B, scaler):
  N, ne = X.shape
  X = np.hstack([X, np.ones((N, 1))])
  X = scaler.transform(X)

  return calculate_output(X, A, B, N)

In [15]:
def test(X, Yd, A, B, scaler):
  N, _ = X.shape

  Y = neural_network_test(X, A, B, scaler)
  error = Y - Yd

  square_error = 1/N * sum(error * error)
  return square_error

## Artefatos

In [16]:
from pathlib import Path
import os

p = Path('MLP')
hog_path = p / "HOG"
lbp_path = p / "LBP"

hog_path.mkdir(parents=True, exist_ok=True)
lbp_path.mkdir(parents=True, exist_ok=True)

# Execução

Aplicação dos modelos para datasets.

## HOG

In [17]:
df = pd.read_parquet(r'../../data/preprocessed/feature_matrix_hog.parquet')

In [18]:
df = df.dropna()

In [19]:
df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,51,52,53,54,55,56,57,58,59,match
0,2.306814,3.954538,3.624993,3.460221,2.636359,3.130676,2.636359,2.306814,1.647724,2.636359,...,0.295641,0.0,0.295641,0.0,0.0,0.14782,0.0,0.0,0.14782,True
1,2.193303,2.976625,3.759947,3.603283,4.073276,4.073276,1.253316,1.40998,2.349967,0.939987,...,0.500357,0.333572,0.333572,0.0,0.333572,0.0,0.500357,0.0,0.166786,True
2,3.87563,3.720604,3.100504,3.410554,3.100504,4.030655,1.860302,1.860302,1.550252,1.395227,...,0.482976,0.160992,0.321984,0.160992,0.321984,0.160992,0.160992,0.160992,0.160992,True
3,4.624305,3.5453,3.5453,2.466296,3.391157,2.928726,1.849722,2.158009,1.387291,1.695578,...,0.13229,0.13229,0.13229,0.0,0.0,0.0,0.13229,0.0,0.13229,True
4,2.934841,4.25552,4.549004,3.962035,3.521809,1.614163,1.614163,1.467421,0.73371,1.173936,...,0.501895,0.167298,0.167298,0.334597,0.167298,0.167298,0.0,0.0,0.334597,True


In [20]:
X = df.iloc[:, :-1].values
y = df.iloc[:, -1].values

In [21]:
X.shape

(3194, 60)

In [22]:
y.shape

(3194,)

In [23]:
y = [int(tag is True) for tag in y]

In [24]:
y = np.array(y)

Separando dados de treino e teste.

In [25]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=42)

In [26]:
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.20, random_state=42)

Utilizando apenas dados de treino para seleção do número de neurônios para treinamneto final.

In [None]:
neurons = select_params(X_train, y_train, 1)

Rede com 50 neurônios obteve melhor desempenho considerando erro de validação.

In [None]:
neurons

Rede de uma camada com 50 neurônios agora utilizando todos os dados de treino.

In [None]:
f = open(f"MLP/HOG/error.txt","a+")
f.write(f"MLP - DATASET HOG\n\n")
f.write(f"Treino oficial com todo o conjunto de treinamneto e número de neurônios selecionado no k-fold igual a ({neurons}).\n\n")
f.close()

In [None]:
ns = 1

In [None]:
A, B, error, scaler = train(X_train, y_train, X_valid, y_valid, neurons, ns, 'HOG')

In [None]:
error

Verificando erro no conjunto de teste.
Resultado próximo ao obtido com conjunto de treino, demonstrando não ter ocorrido overfit.

In [None]:
error_test = test(X_test, y_test, A, B, scaler)

In [None]:
error_test

In [None]:
f = open(f"MLP/HOG/config.txt","a+")
f.write(f"MLP - DATASET HOG")
f.write(f"\n\n")
f.write(f"Número de neurônios: {neurons}\n")
f.write(f"Erro de validação: {error}\n\n")
f.write(f"Erro de teste: {error_test}\n\n")
f.write(f"\n\n")
f.close()

Gerando matriz de confusão:

In [None]:
y_predict = neural_network_test(X_test, A, B, scaler)
y_predict = [0 if pred < 0.5 else 1 for pred in y_predict]

In [None]:
print(classification_report(y_pred=y_predict, y_true=y_test, labels=[0,1]))

In [None]:
df = pd.DataFrame({"y_test" : y_test, "y_predict" : y_predict})
df.to_csv("MLP/HOG/compare_y.csv", index=False)

Salvando pesos.

In [None]:
with open('./MLP/HOG/A.npy', 'wb') as f:
    np.save(f, A)

In [None]:
with open('./MLP/HOG/B.npy', 'wb') as f:
    np.save(f, B)

Testando recupração de pesos gravados em disco.

In [None]:
with open('./MLP/HOG/A.npy', 'rb') as f:
    A_load = np.load(f)

In [None]:
with open('./MLP/HOG/B.npy', 'rb') as f:
    B_load = np.load(f)

In [None]:
error_test_load = test(X_test, y_test, A_load, B_load)
error_test_load

## LBP

In [None]:
df = pd.read_parquet(r'../../data/preprocessed/feature_matrix_lbp.parquet')

In [None]:
df = df.dropna()

In [None]:
df.head()

In [None]:
X = df.iloc[:, :-1].values
y = df.iloc[:, -1].values

In [None]:
X.shape

In [None]:
y.shape

In [None]:
y = [int(tag is True) for tag in y]

In [None]:
y = np.array(y)

Separando dados de treino e teste.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=42)

In [None]:
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.20, random_state=42)

Utilizando apenas dados de treino para seleção do número de neurônios para treinamneto final.

In [None]:
neurons = select_params(X_train, y_train, 1)

Rede com 50 neurônios, para o conjunto de dados LBP, obteve melhor desempenho considerando erro de validação.

In [None]:
neurons

Rede de uma camada com 50 neurônios agora utilizando todos os dados de treino.

In [None]:
f = open(f"MLP/LBP/error.txt","a+")
f.write(f"MLP - DATASET LBP\n\n")
f.write(f"Treino oficial com todo o conjunto de treinamneto e número de neurônios selecionado no k-fold igual a ({neurons}).\n\n")
f.close()

In [None]:
A, B, error, scaler = train(X_train, y_train, X_valid, y_valid, neurons, 1, 'LBP')

In [None]:
error

Verificando erro no conjunto de teste.
Resultado próximo ao obtido com conjunto de treino, demonstrando não ter ocorrido overfit.

In [None]:
error_test = test(X_test, y_test, A, B, scaler)

In [None]:
error_test

In [None]:
f = open(f"MLP/LBP/config.txt","a+")
f.write(f"MLP - DATASET LBP")
f.write(f"\n\n")
f.write(f"Número de neurônios: {neurons}\n")
f.write(f"Erro de validação: {error}\n\n")
f.write(f"Erro de teste: {error_test}\n\n")
f.write(f"\n\n")
f.close()

Gerando matriz de confusão:

In [None]:
y_predict = neural_network_test(X_test, A, B, scaler)
y_predict = [0 if pred < 0.5 else 1 for pred in y_predict]

In [None]:
print(classification_report(y_pred=y_predict, y_true=y_test, labels=[0,1]))

In [None]:
df = pd.DataFrame({"y_test" : y_test, "y_predict" : y_predict})
df.to_csv("MLP/LBP/compare_y.csv", index=False)

Salvando pesos.

In [None]:
with open('./MLP/LBP/A.npy', 'wb') as f:
    np.save(f, A)

In [None]:
with open('./MLP/LBP/B.npy', 'wb') as f:
    np.save(f, B)

Testando recupração de pesos gravados em disco.

In [None]:
with open('./MLP/LBP/A.npy', 'rb') as f:
    A_load = np.load(f)

In [None]:
with open('./MLP/LBP/B.npy', 'rb') as f:
    B_load = np.load(f)

In [None]:
error_test_load = test(X_test, y_test, A_load, B_load)
error_test_load