In [None]:
# load the packages needed
# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import math
import pandas as pd
import io
# Prefer the first CUDA GPU when one is visible, otherwise fall back to the CPU.
# NOTE(review): no cell in view moves a model or a batch onto `device` - confirm
# whether GPU execution was intended.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
######
###### Part I: use the original trainset and testset to train and test (without data-preprocessing and pre-training)
######
# define the class for training data
class autoinsurance(Dataset):
  """Training split of the auto-insurance data, held fully in memory.

  The CSV bytes are read from the notebook-level `uploaded` mapping under the
  key 'Autoinsurance_train.csv'. Columns 0-8 become the features and column 9
  becomes the label.
  """

  def __init__(self):
    # Parse without a header so every row arrives as raw cells, then discard
    # the first row (presumably the column names - TODO confirm).
    raw_bytes = uploaded['Autoinsurance_train.csv']
    table = pd.read_csv(io.BytesIO(raw_bytes), header=None)
    cells = table.iloc[1:, :].reset_index(drop=True).to_numpy()
    values = np.vstack(cells).astype(float)

    self.x = torch.from_numpy(values[:, 0:9]).float()   # n_samples, 9
    self.y = torch.from_numpy(values[:, [9]]).float()   # n_samples, 1
    self.n_samples = values.shape[0]

  def __getitem__(self, index):
    """Return the (features, label) pair stored at `index`."""
    return self.x[index], self.y[index]

  def __len__(self):
    """Return the number of data rows that were loaded."""
    return self.n_samples

In [None]:
# Upload the training set 'Autoinsurance_train.csv' through the Colab upload widget.
# The result is kept in `uploaded`; the training dataset class looks the file up
# there by name and wraps the stored value in io.BytesIO.
from google.colab import files
uploaded = files.upload()

In [None]:
# Instantiate the training dataset from the uploaded CSV and batch it.
training_set = autoinsurance()
# Mini-batches of 64, shuffled, loaded by two worker processes.
train_dataloader = DataLoader(
    training_set,
    batch_size=64,
    shuffle=True,
    num_workers=2,
)
# Fetch one batch up front; despite its name, `train_dataiter` holds that
# (features, labels) batch rather than an iterator.
train_dataiter = next(iter(train_dataloader))

In [None]:
# define the class for test data
class autoinsurance1(Dataset):
  """Test split of the auto-insurance data, held fully in memory.

  The CSV bytes are read from the notebook-level `uploaded` mapping under the
  key 'Autoinsurance_test.csv'. Columns 0-8 become the features and column 9
  becomes the label.
  """

  def __init__(self):
    # Read every row as data (header=None) and then skip the first one,
    # presumably the column-name row - TODO confirm.
    payload = io.BytesIO(uploaded['Autoinsurance_test.csv'])
    frame = pd.read_csv(payload, header=None)
    body = frame.iloc[1:, :].reset_index(drop=True)
    matrix = np.vstack(body.to_numpy()).astype(float)

    feature_block = matrix[:, 0:9]
    label_block = matrix[:, [9]]
    self.x = torch.from_numpy(feature_block).float()
    self.y = torch.from_numpy(label_block).float()  # n_samples, 1
    self.n_samples = matrix.shape[0]

  def __getitem__(self, index):
    """Return the (features, label) pair stored at `index`."""
    return self.x[index], self.y[index]

  def __len__(self):
    """Return the number of data rows that were loaded."""
    return self.n_samples

In [None]:
# Upload the test set 'Autoinsurance_test.csv' through the Colab upload widget.
# This rebinds `uploaded`, so the result of the earlier training-set upload is
# no longer reachable through that name after this cell runs.
from google.colab import files
uploaded = files.upload()

In [None]:
# loading the test set and define the test_dataloader
test_set = autoinsurance1()
test_dataloader = DataLoader(dataset=test_set,batch_size=1,num_workers=2)
test_dataiter = next(iter(test_dataloader))
Y_test = test_set.y
Y_test=Y_test.squeeze().numpy()

In [None]:
# define a network
class Net(nn.Module):
    """Fully connected classifier: 9 features -> 64 -> 64 -> 1 output.

    Each hidden layer is followed by ReLU and then batch normalisation;
    dropout (p=0.1) is applied once, just before the output layer. The output
    is returned without a sigmoid.
    """

    def __init__(self):
        super().__init__()

        # Linear stack; the number of input features is 9.
        self.layer_1 = nn.Linear(9, 64)
        self.layer_2 = nn.Linear(64, 64)
        self.layer_out = nn.Linear(64, 1)

        # Activation, regularisation and normalisation modules.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.1)
        self.batchnorm1 = nn.BatchNorm1d(64)
        self.batchnorm2 = nn.BatchNorm1d(64)

    def forward(self, inputs):
        """Map a batch of feature rows to one output value per row."""
        hidden = self.batchnorm1(self.relu(self.layer_1(inputs)))
        hidden = self.batchnorm2(self.relu(self.layer_2(hidden)))
        return self.layer_out(self.dropout(hidden))

In [None]:
# Build a fresh classifier, score its raw outputs with BCEWithLogitsLoss and
# optimise all of its parameters with Adam (learning rate 1e-3).
model = Net()
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# train the network
# Put the model in training mode before the epoch loop in this cell runs.
model.train()
def train_loop(dataloader, model, loss_fn, optimizer):
    """Run one pass over `dataloader`, updating `model` after every batch.

    Progress is printed every 20 batches as the current batch loss and the
    number of samples consumed before that batch.

    Args:
        dataloader: iterable of (X, y) batches that exposes `.dataset`
            (its length is used as the total in the progress line).
        model: module called as `model(X)`.
        loss_fn: callable `loss_fn(pred, y)` returning a scalar loss tensor.
        optimizer: optimizer holding the parameters of `model`.

    Returns:
        None.
    """
    # Enforce training mode here so the function is correct even when an
    # evaluation cell has previously called model.eval().
    model.train()
    size = len(dataloader.dataset)
    seen = 0  # samples consumed before the current batch
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 20 == 0:
            # A running count stays correct when the final batch is shorter
            # than the others, unlike `batch * len(X)`.
            print(f"loss: {loss.item():>7f}  [{seen:>5d}/{size:>5d}]")
        seen += len(X)
# Make 20 passes over the training loader, printing a banner before each one.
epochs = 20
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, criterion, optimizer)


In [None]:
# use the trained network to predict the response for test set
# The model is switched to evaluation mode and autograd is disabled for the
# whole pass over the test loader.
model.eval()
y_pred_list = []
with torch.no_grad():
    for X_batch, y_labels in test_dataloader:
        probabilities = torch.sigmoid(model(X_batch))
        # Round the probability to a 0/1 tag, then squeeze away the singleton
        # dimensions and convert to a plain Python value.
        y_pred_list.append(torch.round(probabilities).squeeze().tolist())

In [None]:
# compute the accuracy rate
# Compare the predicted tags with the true labels element by element and
# report the fraction that agree.
y_pred_list = np.asarray(y_pred_list)
matches = y_pred_list == Y_test
correct = matches.sum()
accuracy_rate = correct / len(test_set)
print(accuracy_rate)

In [None]:
####### Part II is in the file 'Data_preprocessing_Normalization+BoxCox.Rmd'

####### Part III: 1. Using the dataset preprocessed in R, pre-train an autoencoder on the training-set features. 2. Fit the same model again and compare the result with the original model.
# Upload the preprocessed training set 'Autoinsurance_trainnew.csv' through the
# Colab upload widget. This rebinds `uploaded`, replacing the result of any
# earlier upload cell.
from google.colab import files
uploaded = files.upload()

In [None]:
# define the class for training data
class autoinsurance(Dataset):
  """Preprocessed training split, held fully in memory.

  The CSV bytes are read from the notebook-level `uploaded` mapping under the
  key 'Autoinsurance_trainnew.csv'. Columns 0-8 become the features and
  column 9 becomes the label.
  """

  def __init__(self):
    # header=None keeps the first row as data; it is dropped right after
    # (presumably it holds the column names - TODO confirm).
    source = io.BytesIO(uploaded['Autoinsurance_trainnew.csv'])
    everything = pd.read_csv(source, header=None)
    data_rows = everything.iloc[1:, :].reset_index(drop=True).to_numpy()
    numeric = np.vstack(data_rows).astype(float)

    self.n_samples = numeric.shape[0]
    self.x = torch.from_numpy(numeric[:, 0:9]).float()
    self.y = torch.from_numpy(numeric[:, [9]]).float()  # n_samples, 1

  def __getitem__(self, index):
    """Return the (features, label) pair stored at `index`."""
    return self.x[index], self.y[index]

  def __len__(self):
    """Return the number of data rows that were loaded."""
    return self.n_samples

In [None]:
# Build the dataset from the preprocessed training file and keep direct handles
# on its feature and label tensors; later cells read `X_train` and `Y_train`.
trainingnew_set = autoinsurance()
X_train, Y_train = trainingnew_set.x, trainingnew_set.y
# Mini-batches of 64, shuffled, loaded by two worker processes.
train_dataloader = DataLoader(
    trainingnew_set,
    batch_size=64,
    shuffle=True,
    num_workers=2,
)
# Despite its name, `train_dataiter` holds the first (features, labels) batch.
train_dataiter = next(iter(train_dataloader))

In [None]:
# Upload the preprocessed test set 'Autoinsurance_testnew.csv' through the
# Colab upload widget. This rebinds `uploaded`, replacing the result of any
# earlier upload cell.
from google.colab import files
uploaded = files.upload()

In [None]:
# define the class for test data
class autoinsurance1(Dataset):
  """Preprocessed test split, held fully in memory.

  The CSV bytes are read from the notebook-level `uploaded` mapping under the
  key 'Autoinsurance_testnew.csv'. Columns 0-8 become the features and
  column 9 becomes the label.
  """

  def __init__(self):
    # Every row is parsed as data (header=None); the first row is then dropped
    # (presumably it holds the column names - TODO confirm).
    stream = io.BytesIO(uploaded['Autoinsurance_testnew.csv'])
    parsed = pd.read_csv(stream, header=None)
    kept = parsed.iloc[1:, :].reset_index(drop=True)
    array = np.vstack(kept.to_numpy()).astype(float)

    inputs, targets = array[:, 0:9], array[:, [9]]
    self.x = torch.from_numpy(inputs).float()
    self.y = torch.from_numpy(targets).float()  # n_samples, 1
    self.n_samples = array.shape[0]

  def __getitem__(self, index):
    """Return the (features, label) pair stored at `index`."""
    return self.x[index], self.y[index]

  def __len__(self):
    """Return the number of data rows that were loaded."""
    return self.n_samples

In [None]:
# Instantiate the preprocessed test dataset and serve it one sample per batch.
testnew_set = autoinsurance1()
test_dataloader = DataLoader(
    testnew_set,
    batch_size=1,
    num_workers=2,
)
# Despite its name, `test_dataiter` holds the first (features, label) batch.
test_dataiter = next(iter(test_dataloader))
# Label tensor of the test set, still a torch tensor at this point.
Y_test = testnew_set.y

In [None]:
# define the autoencoder
class AE(nn.Module):
    """Autoencoder over 9 features with a 5-unit code: 9 -> 5 -> 5 -> 5 -> 9.

    The forward pass chains the four linear layers directly; no non-linearity
    is applied between them. The 9-dimensional reconstruction is returned,
    not the 5-dimensional code.
    """

    def __init__(self):
        super().__init__()
        # Encoder half.
        self.encoder_hidden_layer = nn.Linear(9, 5)
        self.encoder_output_layer = nn.Linear(5, 5)
        # Decoder half.
        self.decoder_hidden_layer = nn.Linear(5, 5)
        self.decoder_output_layer = nn.Linear(5, 9)

    def forward(self, features):
        """Encode `features` to the 5-unit code and decode back to 9 values."""
        code = self.encoder_output_layer(self.encoder_hidden_layer(features))
        decoded = self.decoder_hidden_layer(code)
        return self.decoder_output_layer(decoded)

In [None]:
# create a model from `AE` autoencoder class
# NOTE(review): the model is not moved to `device` here; it stays wherever
# PyTorch creates it by default - confirm whether GPU execution was intended.
model = AE()

# create an optimizer object
# Adam optimizer with learning rate 1e-3
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# mean-squared error loss
criterion = nn.MSELoss()

In [None]:
# train the autoencoder
# Each epoch walks the training loader once; the labels in every batch are
# ignored because the reconstruction target is the input itself.
model.train()
epoches = 50
for epoch in range(epoches):
    batch_losses = []
    for batch_features, y in train_dataloader:
        # clear gradients left over from the previous step
        optimizer.zero_grad()

        # reconstruct the batch and score it against the original features
        reconstruction = model(batch_features)
        train_loss = criterion(reconstruction, batch_features)

        # backpropagate and update the parameters
        train_loss.backward()
        optimizer.step()

        batch_losses.append(train_loss.item())

    # mean of the per-batch losses for this epoch
    loss = sum(batch_losses) / len(train_dataloader)

    # display the epoch training loss
    print("epoch : {}/{}, loss = {:.6f}".format(epoch + 1, epoches, loss))

In [None]:
# Use the trained autoencoder to map the old features to new features.
# The forward pass runs under torch.no_grad() so no autograd graph is built for
# the whole training set; the result is the 9-column reconstruction returned by
# AE.forward, converted to a NumPy array.
# NOTE(review): only the training features go through the autoencoder; the
# later evaluation cell iterates `test_dataloader`, which is built from the
# un-transformed test features - confirm this train/test asymmetry is intended.
with torch.no_grad():
    X_trainnew = model(X_train).numpy()
# define the new training data class based on the new features
class autoinsurance2(Dataset):
  """Training set whose features are the autoencoder outputs.

  Reads two notebook-level names when constructed: `X_trainnew` (a NumPy
  array of features) and `Y_train` (the label tensor, stored as-is).
  """

  def __init__(self):
    features = torch.from_numpy(X_trainnew)
    self.x = features.float()
    self.y = Y_train
    self.n_samples = X_trainnew.shape[0]

  def __getitem__(self, index):
    """Return the (features, label) pair stored at `index`."""
    return self.x[index], self.y[index]

  def __len__(self):
    """Return the number of feature rows."""
    return self.n_samples

In [None]:
# Wrap the autoencoder-transformed training data in a dataset and batch it:
# mini-batches of 64, shuffled, loaded by two worker processes.
trainingnewnew_set = autoinsurance2()
trainnew_dataloader = DataLoader(
    trainingnewnew_set,
    batch_size=64,
    shuffle=True,
    num_workers=2,
)
# Despite its name, `trainnew_dataiter` holds the first (features, labels) batch.
trainnew_dataiter = next(iter(trainnew_dataloader))

In [None]:
# define the same neural network as part I
class Net(nn.Module):
    """Fully connected classifier: 9 features -> 64 -> 64 -> 1 output.

    Pipeline per sample batch: Linear, ReLU, BatchNorm, Linear, ReLU,
    BatchNorm, Dropout(p=0.1), Linear. No sigmoid is applied to the output.
    """

    def __init__(self):
        super().__init__()

        # Number of input features is 9
        self.layer_1 = nn.Linear(9, 64)
        self.layer_2 = nn.Linear(64, 64)
        self.layer_out = nn.Linear(64, 1)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.1)
        self.batchnorm1 = nn.BatchNorm1d(64)
        self.batchnorm2 = nn.BatchNorm1d(64)

    def forward(self, inputs):
        """Map a batch of feature rows to one output value per row."""
        first = self.batchnorm1(self.relu(self.layer_1(inputs)))
        second = self.batchnorm2(self.relu(self.layer_2(first)))
        regularised = self.dropout(second)
        return self.layer_out(regularised)

In [None]:
# Rebind `model`, `criterion` and `optimizer` (previously the autoencoder's)
# to a fresh classifier, BCEWithLogitsLoss and Adam with learning rate 1e-3.
model = Net()
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# train the network
# Put the model in training mode before the epoch loop in this cell runs.
model.train()
def train_loop(dataloader, model, loss_fn, optimizer):
    """Run one pass over `dataloader`, updating `model` after every batch.

    Progress is printed every 20 batches as the current batch loss and the
    number of samples consumed before that batch.

    Args:
        dataloader: iterable of (X, y) batches that exposes `.dataset`
            (its length is used as the total in the progress line).
        model: module called as `model(X)`.
        loss_fn: callable `loss_fn(pred, y)` returning a scalar loss tensor.
        optimizer: optimizer holding the parameters of `model`.

    Returns:
        None.
    """
    # Enforce training mode here so the function is correct even when an
    # evaluation cell has previously called model.eval().
    model.train()
    size = len(dataloader.dataset)
    seen = 0  # samples consumed before the current batch
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 20 == 0:
            # A running count stays correct when the final batch is shorter
            # than the others, unlike `batch * len(X)`.
            print(f"loss: {loss.item():>7f}  [{seen:>5d}/{size:>5d}]")
        seen += len(X)
# Make 20 passes over the autoencoder-transformed training loader, printing a
# banner before each one.
epochs = 20
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(trainnew_dataloader, model, criterion, optimizer)

In [None]:
# use the trained network to predict the response for test set
# The model is switched to evaluation mode and autograd is disabled for the
# whole pass over the test loader.
model.eval()
y_pred_list = []
with torch.no_grad():
    for X_batch, y_labels in test_dataloader:
        logits = model(X_batch)
        tag = torch.round(torch.sigmoid(logits))
        # squeeze() removes the singleton dimensions before the tag is stored
        # as a plain Python value.
        y_pred_list.append(tag.squeeze().tolist())

In [None]:
# compute the accuracy rate and compare it to that of the original model
# The labels are taken from `testnew_set` rather than by converting `Y_test`
# in place, so this cell can be re-run: calling `.numpy()` on an array that
# was already converted would raise AttributeError.
Y_test = testnew_set.y.squeeze().numpy()
y_pred_list = np.asarray(y_pred_list)
correct = (y_pred_list == Y_test).sum()
accuracy_rate = correct / len(testnew_set)
accuracy_rate