In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import wandb

from config import config
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
from torchviz import make_dot

# Loading, scaling the data
The data has been explored and concatenated in `data_preparation.ipynb`.

In [2]:
# Read the training table from disk into a DataFrame
train_csv_path = '../data/train.csv'
df = pd.read_csv(train_csv_path)

In [3]:
# Separate the target variable from the features
y = df['price_range'].values
X = df.drop('price_range', axis=1)

# Split the data into train, validation, and test sets (0.7, 0.15, 0.15).
# The split happens BEFORE scaling so that the scaler never sees validation or
# test rows; the same seed and ratios as before keep the row assignment stable.
X_train, X_holdout, y_train, y_holdout = train_test_split(
    X, y, test_size=0.3, random_state=config['seed']
)
X_val, X_test, y_val, y_test = train_test_split(
    X_holdout, y_holdout, test_size=0.5, random_state=config['seed']
)

# Fit the scaler on the training split only, then apply the learned mean/std
# to the other two splits (avoids leaking held-out statistics into training).
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

# Dataset

In [4]:
from sklearn.preprocessing import OneHotEncoder

class PhoneDataset(Dataset):
    """Torch dataset pairing a feature matrix with one-hot encoded labels.

    Args:
        X: 2-D array-like of numeric features, one row per sample.
        y: 1-D array-like of class labels, one entry per row of ``X``.

    Attributes:
        X: float32 tensor built from ``X``.
        y: float32 tensor holding the one-hot encoded labels.
        enc: the ``OneHotEncoder`` fitted on ``y``.

    Note:
        The encoder is fitted on the labels passed to this instance, so the
        number of one-hot columns equals the number of distinct labels in ``y``.
    """

    def __init__(self, X, y):
        # Convert inputs to torch 32 bit float tensor
        self.X = torch.tensor(X, dtype=torch.float32)

        # One hot encode the labels. The encoder is built with its defaults and
        # the (sparse) result is densified by hand: the `sparse=` keyword was
        # removed from scikit-learn and `sparse_output=` does not exist in old
        # releases, so this form works on both.
        self.enc = OneHotEncoder()
        encoded = self.enc.fit_transform(np.asarray(y).reshape(-1, 1))
        if hasattr(encoded, "toarray"):
            encoded = encoded.toarray()
        self.y = torch.tensor(encoded, dtype=torch.float32)

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, one_hot_label)`` pair at position ``idx``.

        Raises:
            IndexError: if ``idx`` is negative or not smaller than ``len(self)``.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f"Index {idx} is out of range")

        return self.X[idx], self.y[idx]

In [5]:
# One training epoch; the epoch's loss and accuracy are reported to wandb
def train(args, model, train_loader, optimizer, criterion):
    """Train ``model`` for one pass over ``train_loader`` and log the metrics.

    Args:
        args: unused; kept so existing call sites keep working.
        model: the network to optimise.
        train_loader: iterable of ``(inputs, targets)`` batches that exposes a
            ``dataset`` attribute; targets carry one score per class (the
            predicted class is taken with ``argmax`` along dim 1).
        optimizer: optimizer bound to ``model``'s parameters.
        criterion: callable ``criterion(outputs, targets)`` returning a scalar loss.

    Logs ``train_loss`` (average loss per sample) and ``train_accuracy``
    (percentage) through ``wandb.log``. Returns ``None``.
    """
    # Switch model to training mode. This is necessary for layers like dropout, batchnorm etc which behave differently in training and evaluation mode
    model.train()
    train_loss = 0.0
    correct_train = 0

    # A mean-reduced criterion returns the batch average, so it has to be
    # weighted by the batch size before dividing by the dataset size below.
    # Only a criterion that declares reduction='sum' is already a batch total.
    loss_is_summed = getattr(criterion, 'reduction', 'mean') == 'sum'

    # We loop over the data iterator, and feed the inputs to the network and adjust the weights.
    for inputs, targets in train_loader:
        # Zero the gradients
        optimizer.zero_grad()

        # Feed the inputs to the network and compute the loss
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backpropagate the gradient and update the weights
        loss.backward()
        optimizer.step()

        # Accumulate the per-sample loss total and the number of correct predictions
        batch_loss = loss.item()
        train_loss += batch_loss if loss_is_summed else batch_loss * inputs.size(0)
        correct_train += (outputs.argmax(dim=1) == targets.argmax(dim=1)).sum().item()

    n_samples = len(train_loader.dataset)
    train_loss /= n_samples
    accuracy_train = 100. * correct_train / n_samples
    wandb.log({'train_loss': train_loss, 'train_accuracy': accuracy_train})

In [6]:
def test(args, model, criterion, test_loader):
    # Switch model to evaluation mode. This is necessary for layers like dropout, batchnorm etc which behave differently in training and evaluation mode
    model.eval()
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for data, target in test_loader:
            # Load the input features and labels from the test dataset

            output = model(data)
            output_category = np.argmax(output, axis=1)
            target_category = np.argmax(target, axis=1)

            # Compute the loss sum up batch loss
            test_loss += criterion(output, target).item()
            _, predicted = torch.max(output.data, 1)
            correct += (output_category == target_category).sum().item()

        test_loss /= len(test_loader.dataset)
        accuracy = 100. * correct / len(test_loader.dataset)
        wandb.log({'test_loss': test_loss, 'test_accuracy': accuracy})

Feed-forward neural network


In [None]:
class FFNN(nn.Module):
    """Fully connected feed-forward classifier with ReLU activations and dropout.

    Args:
        input_dim: number of input features.
        hidden_dims: sequence of hidden-layer widths. The first entry is the
            output width of the input layer; each following entry adds one
            hidden ``Linear`` layer. An empty sequence or a leading ``0``
            selects the variant without hidden layers, in which the input layer
            maps ``input_dim -> output_dim`` and is followed by an
            ``output_dim -> output_dim`` output layer.
        output_dim: number of classes.
        dropout_prob: dropout probability applied after every ReLU.
    """

    def __init__(self, input_dim=20, hidden_dims=config['hidden_layers'], output_dim=4, dropout_prob=config['dropout_fix']):
        super().__init__()
        # Value comparison (`!=`) instead of the identity test `is not 0`,
        # which only works by accident of small-int caching.
        if len(hidden_dims) > 0 and hidden_dims[0] != 0:
            self.input_layer = nn.Linear(input_dim, hidden_dims[0])
            self.hidden_layers = nn.ModuleList(
                [nn.Linear(hidden_dims[i], hidden_dims[i + 1]) for i in range(len(hidden_dims) - 1)]
            )
            self.output_layer = nn.Linear(hidden_dims[-1], output_dim)
        else:
            self.input_layer = nn.Linear(input_dim, output_dim)
            # Empty ModuleList keeps the attribute type the same in both branches
            self.hidden_layers = nn.ModuleList()
            self.output_layer = nn.Linear(output_dim, output_dim)
        self.dropout = nn.Dropout(p=dropout_prob)

    def _linear_layers(self):
        """Return every ``Linear`` layer of the network in forward order."""
        return [self.input_layer, *self.hidden_layers, self.output_layer]

    def init_weights(self):
        """Xavier-uniform initialise the weights of all linear layers (input layer included)."""
        for layer in self._linear_layers():
            nn.init.xavier_uniform_(layer.weight)

    def init_bias(self):
        """Zero the biases of all linear layers (input layer included)."""
        for layer in self._linear_layers():
            nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Return per-class probabilities (softmax over dim 1) for a batch ``x``."""
        x = nn.functional.relu(self.input_layer(x))
        x = self.dropout(x)
        # hidden_layers is empty in the no-hidden-layer variant, so this loop is skipped
        for layer in self.hidden_layers:
            x = nn.functional.relu(layer(x))
            x = self.dropout(x)
        x = self.output_layer(x)
        # NOTE(review): the output is already softmax-normalised; if the loss in
        # config['loss_pytorch'] applies its own (log-)softmax, e.g.
        # nn.CrossEntropyLoss, it would be applied twice - confirm.
        x = nn.functional.softmax(x, dim=1)
        return x

# Config

In [None]:
# Sweep initialization



def main():
    """Run one wandb-tracked training run for the currently selected architecture.

    The run name and hidden-layer sizes are read from the module-level
    variables ``model_name`` and ``model_dims``. Batch size, dropout, learning
    rate and epoch count come from ``wandb.config``; seed, loss and the
    remaining Adam settings come from ``config``.

    After every epoch the model is evaluated on the validation split through
    ``test()``, which logs under the ``test_loss`` / ``test_accuracy`` keys.
    """
    wandb.init(project="phone-price-prediction", name=model_name, config=config)

    # Create the datasets
    train_dataset = PhoneDataset(X_train, y_train)
    val_dataset = PhoneDataset(X_val, y_val)
    test_dataset = PhoneDataset(X_test, y_test)

    # Create the data loaders
    batch_size = wandb.config.batch_size
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    # NOTE: the held-out test loader is built but not evaluated in this function
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Set the seed (training runs on the CPU; nothing is moved to another device)
    torch.manual_seed(config['seed'])

    # New initialization of the model with the dimensions selected for this run
    model = FFNN(hidden_dims=model_dims, dropout_prob=wandb.config.dropout)

    # Initialize the weights and bias
    model.init_weights()
    model.init_bias()

    # Render the model architecture to a PNG and log it as an image
    dot = make_dot(model(torch.randn(1, 20)), params=dict(model.named_parameters()))
    dot.render('images/model', format='png')
    image = wandb.Image('images/model.png')
    wandb.log({'model_image': image})

    # Define the loss function and optimizer
    loss = config['loss_pytorch']
    optimizer = optim.Adam(
        model.parameters(),
        lr=wandb.config.lr,
        betas=(config['beta1'], config['beta2']),
        eps=config['epsilon'],
    )

    # Train the model, evaluating on the validation split after every epoch
    for epoch in range(1, wandb.config.epochs + 1):
        train(config, model, train_loader, optimizer, loss)
        test(config, model, loss, val_loader)
    wandb.finish()

# Run one single-trial sweep per predefined architecture
for layer in config['hidden_layers']:
    # main() takes no parameters, so the run name and the layer sizes are
    # handed to it through these module-level variables.
    # Not the best solution but works for now :)
    model_dims = layer
    model_name = name = 'PyTorch_' + str(layer)

    # Create a sweep for this architecture and let the agent execute main() once
    sweep_id = wandb.sweep(sweep=config, project="phone-price-prediction")
    wandb.agent(sweep_id, function=main, count=1)

## NN Architecture

In [None]:
# Build a loader over the training split just to pull one example batch
train_dataset = PhoneDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Get one batch of training data. Dedicated names are used so the notebook-level
# feature matrix `X` and label vector `y` are not overwritten by this cell.
X_batch, y_batch = next(iter(train_loader))
print(X_batch.shape, y_batch.shape)

# A single forward pass is enough to record the autograd graph
model = FFNN()
y_pred = model(X_batch)

# Last expression of the cell, so the notebook renders the graph
make_dot(y_pred, params=dict(model.named_parameters()))