# Linear Models and Basis Kernels

In [2]:
import torch
import torch.nn as nn
from torchvision import io
import pandas
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors

#Predictors
class linear(nn.Module):
    def __init__(self, feature_dims):
        super().__init__()
        self.w = torch.nn.Parameter(torch.rand((feature_dims,)))
        self.b = torch.nn.Parameter(torch.rand((1,)))
    def forward(self, x):
        return torch.sum(self.w * x, dim=1) + self.b
        

class logistic(nn.Module):
    def __init__(self, feature_dims):
        super().__init__()
        self.w = torch.nn.Parameter(torch.rand((feature_dims,)))
        self.b = torch.nn.Parameter(torch.rand((1,)))
    def forward(self, x):
        y = torch.sum(self.w * x, dim=1) + self.b
        return 2. / (1 + torch.exp(-y)) - 1

class multi_linear(nn.Module):
    def __init__(self, feature_dims):
        super().__init__()
        self.conv = torch.nn.Conv1d(1, 1, 3, padding=1)
        self.conv2 = torch.nn.Conv1d(1, 1, 3, padding=1)
        self.conv3 = torch.nn.Conv1d(1, 1, 3, padding=1)
        self.conv4 = torch.nn.Conv1d(1, 1, 3, padding=1)
        self.fc1 = torch.nn.Linear(feature_dims, 5)
    def forward(self, x):
        x = self.conv(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.fc1(x)
        return x
#Loss functions
class loss_wrapper(torch.nn.Module):
    def __init__(self, loss_func):
        super().__init__()
        self.loss_func = loss_func
    
    def forward(self, x, y):
        return self.loss_func(x,y)

#Classifier
def hinge_loss(x, y):
    loss = 1 - x * y
    return torch.mean(torch.where(loss < 0, torch.zeros_like(loss), loss))

def mispredict_loss(x,y):
    loss = - x * y
    return torch.mean(torch.where(loss < 0, torch.zeros_like(loss), loss))

def incorrect(x,y):
    return torch.sum(x*y < 0) / len(x)

#Regression
def MSE_loss(x, y):
    loss = torch.pow(y - x, 2)
    return torch.mean(loss)

def MAE_loss(x, y):
    return torch.mean(torch.abs(y - x))


def l2_regularizer(model, loss, strength):
    def reg_loss(x, y):
        return loss(x,y) + strength * torch.sum(torch.pow(torch.cat([param for param in model.parameters()]),2))
    return reg_loss

def l1_regularizer(model, loss, strength):
    def reg_loss(x, y):
        return loss(x,y) + strength * torch.sum(torch.abs(torch.cat([param for param in model.parameters()])))
    return reg_loss

def rbf(x, z, gamma):
    return torch.exp(- gamma * torch.sum(torch.pow(x-z,2), dim=-1))

def kernel_mat(kernel, x, **kwargs):
    return kernel(x[:,None,:] , x[None,:,:], **kwargs)

In [3]:
def learn(net, optimizer, loss_func, X, y, batch_size=50, device='cpu'):

    torch.backends.cudnn.fastest = True
    calc = loss_func
    net.train()
    totalloss=0
    indicies = torch.randperm(len(X))
    batches = int(np.floor(len(X)/batch_size))
    for batch in range(batches):
        optimizer.zero_grad()
        tx = X[indicies[batch*batch_size:(batch+1)*batch_size]].to(device)
        ty = y[indicies[batch*batch_size:(batch+1)*batch_size]].to(device)
        pred = net(tx)
        loss = calc(pred, ty)
        totalloss += loss.detach()
        loss.backward()
        optimizer.step()
    loss = totalloss/batches
    return loss.item()

def test(net, loss_func, X, y, batch_size=50, device='cpu'):

    calc = loss_func
    net.eval()
    totalloss=0
    batches = int(np.floor(len(X)/batch_size))
    for batch in range(batches):
        tx = X[batch*batch_size:(batch+1)*batch_size].to(device)
        ty = y[batch*batch_size:(batch+1)*batch_size].to(device)
        pred = net(tx)
        totalloss += calc(pred, ty).detach()
    loss = totalloss/batches
    return loss.item()

from IPython import display
def plot_train(trainloss, testloss):
    plt.clf()
    plt.plot(np.linspace(2,len(trainloss),len(trainloss)-1), trainloss[1:], label="train")
    plt.plot(np.linspace(2,len(testloss),len(testloss)-1), testloss[1:], label="test")
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('loss') 
    #display.display(plt.gcf())
    plt.show()
    #display.clear_output(wait=True)


def train(model, optimizer, loss, X, y, validation_size, batch_size=100, epochs=1, plot=False, verbose=False, metric=mispredict_loss):

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    ind = np.random.permutation(len(y))
    validationX = X[ind[:validation_size]]
    validationy = y[ind[:validation_size]]
    trainX = X[ind[validation_size:]]
    trainy = y[ind[validation_size:]]
    trainloss = []
    validation_loss = []
    plt.ion()
    epoch = 0
    for i in range(epochs):
        trainloss.append(learn(model, optimizer, loss, trainX, trainy, batch_size=batch_size, device=device))
        validation_loss.append(test(model, loss, validationX, validationy, batch_size=batch_size, device=device))
        if(verbose):
            print("Epoch:", i+1)
            print("Train loss:", trainloss[i])
            print("validation loss:", validation_loss[i])
        epoch += 1
    if(plot):
        plot_train(trainloss,validation_loss)
    Err = test(model, loss_wrapper(metric), validationX, validationy, batch_size=batch_size, device=device)
    print("Average Error (MAE): " + str(Err))
    

In [4]:
torch.manual_seed(0)

def separate_by_class(features, labels):
    classes = torch.unique(labels)
    separated = dict()
    for c in classes:
        separated[c] = features[labels == c]
    return separated

def gaussian(x, mean, std):
    prob = (1 / (np.sqrt(2 * np.pi) * std)) * torch.exp(-((x-mean)**2 / (2 * std**2 )))
    prob[:,std==0] = 1
    return prob

def summary(features, labels):
    sum = dict()
    separated = separate_by_class(features, labels)
    for label in separated:
        properties = dict()
        properties['mean'] = torch.mean(separated[label], dim=0)
        properties['std'] = torch.std(separated[label], dim=0)
        properties['count'] = len(separated[label])
        if (len(separated[label]) < 2):
            properties['std'] = -1
        sum[label] = properties
    return sum

def class_probability(sum, x):
    total_count = 0
    prob = torch.empty((len(x),len(sum)))
    labels = []
    for i, label in enumerate(sum):
        mean = sum[label]['mean']
        std = sum[label]['std']
        count = sum[label]['count']
        prob[:,i] = count * torch.prod(gaussian(x, mean, std),dim=1)
        total_count += count
        labels.append(label)
    return prob / total_count, labels

def discrete_class_probability(features, labels, x, smoothing=0):
    separated = separate_by_class(features,labels)
    prob = torch.empty((len(x),len(separated)))
    lab = []
    for i, label in enumerate(separated):
        exist = separated[label] > 0
        p = (x>0) * (torch.sum(exist,dim=0)+smoothing)/(len(exist)+smoothing*features.shape[-1])
        p[x == 0] = 1
        prob[:,i] = torch.prod(p, dim = 1)
        lab.append(label)
    return prob, lab
    

def predict(features, labels, x, discrete=False, smoothing=0):
    if (discrete):
        prob, label = discrete_class_probability(features, labels, x, smoothing) 
    else:
        prob, label = class_probability(summary(features,labels), x) 
    return torch.argmax(prob,dim=1), label

#Train validation split with random selection
#Input format:
#Two tensors with dimension [samples,feature_group1,feature_group2,...]
def cross_val_split(features, labels, validation_size):
    rand_ind = np.random.permutation(len(features))
    training_features = features[rand_ind[validation_size:]]
    training_labels = labels[rand_ind[validation_size:]]
    validation_features = features[rand_ind[:validation_size]]
    validation_labels = labels[rand_ind[:validation_size]]    
    return (training_features,training_labels,validation_features,validation_labels)

#Normalize by scaling each column linearly to range 0-1
def normalize(x):
    norm = (x - torch.min(x,dim=0)[0])/(torch.max(x,dim=0)[0] - torch.min(x,dim=0)[0])
    norm[torch.isnan(norm)] = 0
    return norm