# Redes Neuronales RBF

In [122]:
params = {
    "learning_rate": 0.001,
    "momentum": 0.9,
    "acc": 0.0,
    "epoca": 0,
    "input_file": '/data/concentlite.csv',
    "EXP_NAME": 'EXP009',
    "MIN_ACC": 1.0,
    "MIN_ERROR": 1E6,
    "MAX_EPOCAS": 1000,
    "MAX_COUNTER": 50,
    "BATCH_SIZE": 10
}

In [123]:
import numpy as np
import os
import math
import time
import pandas as pd
import json
# NN
import torch
import torch.nn as nn
import torch.nn.functional as F
# RBF source model -----------------------
from src import torch_rbf as rbf
#from torch.utils.data import random_split
from torch.utils.data import DataLoader, Dataset
from torch import optim
from copy import deepcopy
from sklearn.model_selection import train_test_split
## Ploting
import matplotlib.pyplot as plt
%matplotlib inline 
from IPython.display import set_matplotlib_formats
from matplotlib.colors import to_rgba
## Progress bar
from tqdm.notebook import tqdm
# Path
import sys
sys.path.append('/home/sebacastillo/neuralnets/')
from src.utils import get_project_root
root = get_project_root()
## Check torch version
print(f'Using {torch.__version__}')


Using 2.0.0+cu117


In [124]:
device = torch.device('cuda' if torch.cuda.is_available() else torch.device('cpu'))
torch.manual_seed(42)
# GPU operations have a separate seed we also want to set
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)
# Additionally, some operations on a GPU are implemented stochastic for efficiency
# We want to ensure that all operations are deterministic on GPU (if used) for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [125]:
def load_split_save_data(input_filename, output_name='EXP', split_type='train_test', train_ratio=0.75, validate_ratio=None, test_ratio=None):

    data = pd.read_csv(input_filename)

    # Check if 'exp' folder exists, create it if it doesn't
    if not os.path.exists('exp'):
        os.makedirs('exp')
    
    # Create a subfolder with the output_name
    output_path = os.path.join('exp', output_name)
    if not os.path.exists(output_path):
        os.makedirs(output_path)
        
    if split_type == 'train_validate_test':
        if not validate_ratio or not test_ratio:
            raise ValueError("Please provide validate_ratio and test_ratio for 'train_validate_test' split type.")
        
        train_data, temp_data = train_test_split(data, train_size=train_ratio, random_state=42)
        validate_data, test_data = train_test_split(temp_data, train_size=validate_ratio / (validate_ratio + test_ratio), random_state=42)
        
        # Save the train, validate, and test data as CSV files in the output folder
        train_data.to_csv(os.path.join(output_path, f'{output_name}_train_data.csv'), index=False)
        validate_data.to_csv(os.path.join(output_path, f'{output_name}_validate_data.csv'), index=False)
        test_data.to_csv(os.path.join(output_path, f'{output_name}_test_data.csv'), index=False)


        return train_data, validate_data, test_data    

    elif split_type == 'train_test':
        train_data, test_data = train_test_split(data, train_size=train_ratio, random_state=42)
        
        # Save the train and test data as CSV files in the output folder
        train_data.to_csv(os.path.join(output_path, f'{output_name}_train_data.csv'), index=False)
        test_data.to_csv(os.path.join(output_path, f'{output_name}_test_data.csv'), index=False)


        return train_data, test_data
    
    else:
        raise ValueError("Invalid split_type. Use either 'train_validate_test' or 'train_test'.")



In [126]:
class DATASET(Dataset):  
    '''
    Esta clase maneja la lectura de los datos y provee un mecanismo
    para alimentar los modelos con los patrones.
    '''
    
    #===================================================
    def __init__(self, filename):
        
        #------------------------------------
        # LECTURA DE LOS DATOS
        data = pd.read_csv(filename, header=None).to_numpy() # Levanta los datos en formato numpy
        
        #------------------------------------
        # INSERTAMOS COLUMNA DEL "BIAS"
        #bias = -np.ones((len(data), 1))
        #data = np.concatenate((bias, data), axis=1)  # Insertamos el "bias" en la primera columna
        
        #------------------------------------
        # ALEATORIZO LOS PATRONES (filas)
        idxs = np.arange(len(data))  # Genero un vector de índices
        np.random.shuffle(idxs)
        data = data[idxs,:]
        
        #------------------------------------
        # SEPARO LOS DATOS
        self.x = data[:,:-1].astype(np.float32)
        self.y = data[:,-1].astype(np.float32)  # La clase está en la última columna
    
    #===================================================
    def __len__(self):
        '''
        Devuelve el número de patrones en el dataset.
        '''
        return len(self.x)
    
    
    #===================================================
    def __getitem__(self, idx):
        '''
        Devuelve el/los patrones indicados.
        '''
        return self.x[idx,:], self.y[idx]

In [127]:
def plot_scatter_with_labels(data):
    # Filter data by label
    data_label_1 = data[data[:, -1] == 1][:, 0:2]
    data_label_minus_1 = data[data[:, -1] == -1][:, 0:2]

    # Create scatter plots for each label
    plt.scatter(data_label_1[:, 0], data_label_1[:, 1], label='1', alpha=0.5)
    plt.scatter(data_label_minus_1[:, 0], data_label_minus_1[:, 1], label='-1', alpha=0.5)

    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.legend()
    plt.show()

# Model1: NN + RBFLayer 

- source: https://github.com/rssalessio/PytorchRBFLayer

In [128]:
class MyDataset(Dataset):
    
    def __init__(self, x, y):
        self.x = x
        self.y = y
    
    def __len__(self):
        return self.x.size(0)
    
    def __getitem__(self, idx):
        x = self.x[idx]
        y = self.y[idx]
        return (x, y)


In [129]:
class Network(nn.Module):
    
    def __init__(self, layer_widths, layer_centres, basis_func):
        super(Network, self).__init__()
        self.rbf_layers = nn.ModuleList()
        self.linear_layers = nn.ModuleList()
        for i in range(len(layer_widths) - 1):
            self.rbf_layers.append(rbf.RBF(layer_widths[i], layer_centres[i], basis_func))
            self.linear_layers.append(nn.Linear(layer_centres[i], layer_widths[i+1]))
    
    def forward(self, x):
        out = x
        for i in range(len(self.rbf_layers)):
            out = self.rbf_layers[i](out)
            out = self.linear_layers[i](out)
        return out
    
    def fit(self, x, y, epochs, batch_size, lr, loss_func):
        self.train()
        obs = x.size(0)
        trainset = MyDataset(x, y)
        trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
        optimiser = torch.optim.Adam(self.parameters(), lr=lr)
        epoch = 0         
        accuracy = []  
        while epoch < epochs:
            epoch += 1
            current_loss = 0
            batches = 0
            progress = 0
            correct_predictions = 0
            for x_batch, y_batch in trainloader:
                batches += 1
                optimiser.zero_grad()
                y_hat = self.forward(x_batch)
                loss = loss_func(y_hat, y_batch)
                current_loss += (1/batches) * (loss.item() - current_loss)            
                # Calculate accuracy
                pred = torch.sign(y_hat)  # Get predictions as either -1 or 1
                correct_predictions += (pred == y_batch).sum().item()

                loss.backward()
                optimiser.step()
                progress += y_batch.size(0)
                sys.stdout.write('\rEpoch: %d, Progress: %d/%d, Loss: %f, Accuracy: %f      ' % \
                                (epoch, progress, obs, current_loss, correct_predictions / progress))
                sys.stdout.flush()

In [130]:
# experiment
x1 = np.linspace(-1, 1, 101)
x2 = 0.5*np.cos(np.pi*x1) + 0.5*np.cos(4*np.pi*(x1+1)) # <- decision boundary

samples = 200
x = np.random.uniform(-1, 1, (samples, 2))
for i in range(samples):
    if i < samples//2:
        x[i,1] = np.random.uniform(-1, 0.5*np.cos(np.pi*x[i,0]) + 0.5*np.cos(4*np.pi*(x[i,0]+1)))
    else:
        x[i,1] = np.random.uniform(0.5*np.cos(np.pi*x[i,0]) + 0.5*np.cos(4*np.pi*(x[i,0]+1)), 1)

steps = 100
x_span = np.linspace(-1, 1, steps)
y_span = np.linspace(-1, 1, steps)
xx, yy = np.meshgrid(x_span, y_span)
values = np.append(xx.ravel().reshape(xx.ravel().shape[0], 1),
                   yy.ravel().reshape(yy.ravel().shape[0], 1),
                   axis=1)

tx = torch.from_numpy(x).float()
ty = torch.cat((torch.zeros(samples//2,1), torch.ones(samples//2,1)), dim=0)


In [136]:
#filename_train_data = str(root) + '/data/XOR_trn.csv'
filename_train_data = str(root) + '/data/concentlite.csv'
filename_validation_data = str(root) +'/data/XOR_tst.csv'
train = pd.read_csv(filename_train_data, header=None).to_numpy() # Levanta los datos en formato numpy
test = pd.read_csv(filename_train_data, header=None).to_numpy() # Levanta los datos en formato numpy


In [140]:
X = train[:,:-1]
y = train[:,-1]
X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
y = y.unsqueeze(1)
X.shape, y.shape

(torch.Size([833, 2]), torch.Size([833, 1]))

In [151]:
# Instanciating and training an RBF network with the Gaussian basis function
# This network receives a 2-dimensional input, transforms it into a 40-dimensional
# hidden representation with an RBF layer and then transforms that into a
# 1-dimensional output/prediction with a linear layer

# To add more layers, change the layer_widths and layer_centres lists

layer_widths = [2, 1]
layer_centres = [10]
basis_func = rbf.gaussian
batch_size = 300


rbfnet = Network(layer_widths, layer_centres, basis_func)
rbfnet.fit(X, y, 1000, batch_size, 0.001, nn.MSELoss(reduction='mean'))
rbfnet.eval()


Epoch: 1000, Progress: 833/833, Loss: 0.234270, Accuracy: 0.969988      

Network(
  (rbf_layers): ModuleList(
    (0): RBF()
  )
  (linear_layers): ModuleList(
    (0): Linear(in_features=10, out_features=1, bias=True)
  )
)

# Extra code

# Model2: RBFNet-Simple


In [6]:
import torch
import torch.nn as nn


class RBFNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_centers, sigma):
        super(RBFNet, self).__init__()
        
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_centers = num_centers
        
        self.centers = nn.Parameter(torch.randn(num_centers, input_dim))
        self.beta = nn.Parameter(torch.ones(num_centers, 1) / num_centers)
        self.sigma = sigma
        
        self.fc = nn.Linear(num_centers, output_dim)
    
    def radial_basis(self, x):
        C = self.centers.view(self.num_centers, -1)
        x = x.unsqueeze(1).expand(-1, self.num_centers, -1)
        return torch.exp(-torch.sum((x - C) ** 2, dim=2) / (2 * self.sigma ** 2))
    
    def forward(self, x):
        batch_size = x.size(0)
        x = x.view(batch_size, -1)
        H = self.radial_basis(x)
        out = self.fc(H)
        return out

In [9]:
input_dim = 10
hidden_dim = 100
output_dim = 2
num_centers = 20
sigma = 1.0

model = RBFNet(input_dim, hidden_dim, output_dim, num_centers, sigma)

x = torch.randn(32, input_dim)
output = model(x)
output[1]

tensor([ 0.0059, -0.1851], grad_fn=<SelectBackward0>)