In [1]:
import pandas as pd
import numpy as np

In [2]:
file_url = 'https://raw.githubusercontent.com/aso-uts/applied_ds/master/unit3/dataset/Car%20Evaluation.csv'

In [3]:
#Solution:
df = pd.read_csv(file_url)

In [4]:
# Solution
df.head()

Unnamed: 0,buying_price,maintenance_cost,doors,persons_capacity,luggage_boot,safety,evaluation
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc


In [5]:
df_cleaned = df.copy()

In [6]:
# Solution
cats_dict = {
    'buying_price': [['low', 'med', 'high', 'vhigh']],
    'maintenance_cost': [['low', 'med', 'high', 'vhigh']],
    'doors': [['2', '3', '4', '5more']],
    'persons_capacity': [['2', '4', 'more']],
    'luggage_boot': [['small', 'med', 'big']],
    'safety': [['low', 'med', 'high']],
    'evaluation': [['unacc', 'acc', 'good', 'vgood']],
}

In [7]:
from sklearn.preprocessing import StandardScaler, OrdinalEncoder

In [8]:
for col, cats in cats_dict.items():
    col_encoder = OrdinalEncoder(categories=cats)
    df_cleaned[col] = col_encoder.fit_transform(df_cleaned[[col]])

In [9]:
num_cols = ['buying_price', 'maintenance_cost', 'doors', 'persons_capacity', 'luggage_boot', 'safety']

In [10]:
sc = StandardScaler()

In [11]:
df_cleaned[num_cols] = sc.fit_transform(df_cleaned[num_cols])

In [12]:
df_cleaned['evaluation'] = df_cleaned['evaluation'].astype(int)

In [13]:
def split_sets_random(df, target_col, test_ratio=0.2, to_numpy=False):
    """Split sets randomly

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe
    target_col : str
        Name of the target column
    test_ratio : float
        Ratio used for the validation and testing sets (default: 0.2)

    Returns
    -------
    Numpy Array
        Features for the training set
    Numpy Array
        Target for the training set
    Numpy Array
        Features for the validation set
    Numpy Array
        Target for the validation set
    Numpy Array
        Features for the testing set
    Numpy Array
        Target for the testing set
    """
    
    from sklearn.model_selection import train_test_split
    
    features, target = pop_target(df=df, target_col=target_col, to_numpy=to_numpy)
    
    X_data, X_test, y_data, y_test = train_test_split(features, target, test_size=test_ratio, random_state=8)
    
    val_ratio = test_ratio / (1 - test_ratio)
    X_train, X_val, y_train, y_val = train_test_split(X_data, y_data, test_size=val_ratio, random_state=8)

    return X_train, y_train, X_val, y_val, X_test, y_test



In [14]:
def save_sets(X_train=None, y_train=None, X_val=None, y_val=None, X_test=None, y_test=None, path='../data/processed/'):
    import numpy as np

    if X_train is not None:
      np.save(f'{path}X_train', X_train)
    if X_val is not None:
      np.save(f'{path}X_val',   X_val)
    if X_test is not None:
      np.save(f'{path}X_test',  X_test)
    if y_train is not None:
      np.save(f'{path}y_train', y_train)
    if y_val is not None:
      np.save(f'{path}y_val',   y_val)
    if y_test is not None:
      np.save(f'{path}y_test',  y_test)

In [15]:
def pop_target(df, target_col, to_numpy=False):
    """Extract target variable from dataframe and convert to nympy arrays if required

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe
    target_col : str
        Name of the target variable
    to_numpy : bool
        Flag stating to convert to numpy array or not

    Returns
    -------
    pd.DataFrame/Numpy array
        Subsetted Pandas dataframe containing all features
    pd.DataFrame/Numpy array
        Subsetted Pandas dataframe containing the target
    """

    df_copy = df.copy()
    target = df_copy.pop(target_col)
    
    if to_numpy:
        df_copy = df_copy.to_numpy()
        target = target.to_numpy()
    
    return df_copy, target

In [16]:
X_train, y_train, X_val, y_val, X_test, y_test = split_sets_random(df_cleaned, target_col='evaluation', test_ratio=0.2, to_numpy=True)

In [17]:
from torch.utils.data import Dataset, DataLoader
class PytorchDataset(Dataset):
    """
    Pytorch dataset
    ...

    Attributes
    ----------
    X_tensor : Pytorch tensor
        Features tensor
    y_tensor : Pytorch tensor
        Target tensor

    Methods
    -------
    __getitem__(index)
        Return features and target for a given index
    __len__
        Return the number of observations
    to_tensor(data)
        Convert Pandas Series to Pytorch tensor
    """
        
    def __init__(self, X, y):
        self.X_tensor = self.to_tensor(X)
        self.y_tensor = self.to_tensor(y)
    
    def __getitem__(self, index):
        return self.X_tensor[index], self.y_tensor[index]
        
    def __len__ (self):
        return len(self.X_tensor)
    
    def to_tensor(self, data):
        return torch.Tensor(np.array(data))

In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [19]:
train_dataset = PytorchDataset(X=X_train, y=y_train)
val_dataset = PytorchDataset(X=X_val, y=y_val)
test_dataset = PytorchDataset(X=X_test, y=y_test)

In [20]:
import pandas as pd
import numpy as np

class NullModel:
    """
    Class used as baseline model for both regression and classification
    ...

    Attributes
    ----------
    target_type : str
        Type of ML problem (default regression)
    y : Numpy Array-like
        Target variable
    pred_value : Float
        Value to be used for prediction
    preds : Numpy Array
        Predicted array

    Methods
    -------
    fit(y)
        Store the input target variable and calculate the predicted value to be used based on the problem type
    get_length
        Calculate the number of observations from the target variable
    predict(y)
        Generate the predictions
    fit_predict(y)
        Perform a fit followed by predict
    """
        
    
    def __init__(self, target_type: str = "regression"):
        self.target_type = target_type
        self.y = None
        self.pred_value = None
        self.preds = None
        
    def fit(self, y):
        self.y = y
        if self.target_type == "regression":
            self.pred_value = y.mean()
        else:
            from scipy.stats import mode
            self.pred_value = mode(y)[0][0]
            
    def get_length(self):
        return len(self.y)
    
    def predict(self, y):
        self.preds = np.full((self.get_length(), 1), self.pred_value)
        return self.preds
    
    def fit_predict(self, y):
        self.fit(y)
        return self.predict(self.y)

In [21]:
baseline_model = NullModel(target_type='classification')
y_base = baseline_model.fit_predict(y_train)

In [22]:
def print_class_perf(y_preds, y_actuals, set_name=None, average='binary'):
    """Print the Accuracy and F1 score for the provided data

    Parameters
    ----------
    y_preds : Numpy Array
        Predicted target
    y_actuals : Numpy Array
        Actual target
    set_name : str
        Name of the set to be printed
    average : str
        Parameter  for F1-score averaging
    Returns
    -------
    """
    from sklearn.metrics import accuracy_score
    from sklearn.metrics import f1_score

    print(f"Accuracy {set_name}: {accuracy_score(y_actuals, y_preds)}")
    print(f"F1 {set_name}: {f1_score(y_actuals, y_preds, average=average)}")

In [23]:
print_class_perf(y_base, y_train, set_name='Training', average='weighted')

Accuracy Training: 0.6988416988416989
F1 Training: 0.5749561249561249


In [24]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [25]:
class PytorchMultiClass(nn.Module):
    def __init__(self, num_features):
        super(PytorchMultiClass, self).__init__()
        
        self.layer_1 = nn.Linear(num_features, 32)
        self.layer_out = nn.Linear(32, 4)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = F.dropout(F.relu(self.layer_1(x)), training=self.training)
        x = self.layer_out(x)
        return self.softmax(x)

In [26]:
model = PytorchMultiClass(X_train.shape[1])

In [27]:
def get_device():
    if torch.cuda.is_available():
        device = torch.device('cuda:0')
    else:
        device = torch.device('cpu') # don't have GPU 
    return device

In [28]:
device = get_device()
model.to(device)

PytorchMultiClass(
  (layer_1): Linear(in_features=6, out_features=32, bias=True)
  (layer_out): Linear(in_features=32, out_features=4, bias=True)
  (softmax): Softmax(dim=1)
)

In [29]:
criterion = nn.CrossEntropyLoss()

In [30]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

In [31]:
def train_classification(train_data, model, criterion, optimizer, batch_size, device, scheduler=None, generate_batch=None):
    """Train a Pytorch multi-class classification model

    Parameters
    ----------
    train_data : torch.utils.data.Dataset
        Pytorch dataset
    model: torch.nn.Module
        Pytorch Model
    criterion: function
        Loss function
    optimizer: torch.optim
        Optimizer
    bacth_size : int
        Number of observations per batch
    device : str
        Name of the device used for the model
    scheduler : torch.optim.lr_scheduler
        Pytorch Scheduler used for updating learning rate
    collate_fn : function
        Function defining required pre-processing steps

    Returns
    -------
    Float
        Loss score
    Float:
        Accuracy Score
    """
    
    # Set model to training mode
    model.train()
    train_loss = 0
    train_acc = 0
    
    # Create data loader
    data = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=generate_batch)
    
    # Iterate through data by batch of observations
    for feature, target_class in data:

        # Reset gradients
        optimizer.zero_grad()
        
        # Load data to specified device
        feature, target_class = feature.to(device), target_class.to(device)
        
        # Make predictions
        output = model(feature)
        
        # Calculate loss for given batch
        loss = criterion(output, target_class.long())

        # Calculate global loss
        train_loss += loss.item()
        
        # Calculate gradients
        loss.backward()

        # Update Weights
        optimizer.step()
        
        # Calculate global accuracy
        train_acc += (output.argmax(1) == target_class).sum().item()

    # Adjust the learning rate
    if scheduler:
        scheduler.step()

    return train_loss / len(train_data), train_acc / len(train_data)

In [32]:
def test_classification(test_data, model, criterion, batch_size, device, generate_batch=None):
    """Calculate performance of a Pytorch multi-class classification model

    Parameters
    ----------
    test_data : torch.utils.data.Dataset
        Pytorch dataset
    model: torch.nn.Module
        Pytorch Model
    criterion: function
        Loss function
    bacth_size : int
        Number of observations per batch
    device : str
        Name of the device used for the model
    collate_fn : function
        Function defining required pre-processing steps

    Returns
    -------
    Float
        Loss score
    Float:
        Accuracy Score
    """    
    
    # Set model to evaluation mode
    model.eval()
    test_loss = 0
    test_acc = 0
    
    # Create data loader
    data = DataLoader(test_data, batch_size=batch_size, collate_fn=generate_batch)
    
    # Iterate through data by batch of observations
    for feature, target_class in data:
        
        # Load data to specified device
        feature, target_class = feature.to(device), target_class.to(device)
        
        # Set no update to gradients
        with torch.no_grad():
            
            # Make predictions
            output = model(feature)
            
            # Calculate loss for given batch
            loss = criterion(output, target_class.long())

            # Calculate global loss
            test_loss += loss.item()
            
            # Calculate global accuracy
            test_acc += (output.argmax(1) == target_class).sum().item()

    return test_loss / len(test_data), test_acc / len(test_data)

In [33]:
N_EPOCHS = 50
BATCH_SIZE = 32

In [34]:
for epoch in range(N_EPOCHS):
    train_loss, train_acc = train_classification(train_dataset, model=model, criterion=criterion, optimizer=optimizer, batch_size=BATCH_SIZE, device=device)
    valid_loss, valid_acc = test_classification(val_dataset, model=model, criterion=criterion, batch_size=BATCH_SIZE, device=device)

    print(f'Epoch: {epoch}')
    print(f'\t(train)\t|\tLoss: {train_loss:.4f}\t|\tAcc: {train_acc * 100:.1f}%')
    print(f'\t(valid)\t|\tLoss: {valid_loss:.4f}\t|\tAcc: {valid_acc * 100:.1f}%')

Epoch: 0
	(train)	|	Loss: 0.0315	|	Acc: 75.2%
	(valid)	|	Loss: 0.0295	|	Acc: 80.9%
Epoch: 1
	(train)	|	Loss: 0.0302	|	Acc: 79.3%
	(valid)	|	Loss: 0.0294	|	Acc: 81.2%
Epoch: 2
	(train)	|	Loss: 0.0294	|	Acc: 82.0%
	(valid)	|	Loss: 0.0289	|	Acc: 83.8%
Epoch: 3
	(train)	|	Loss: 0.0293	|	Acc: 82.3%
	(valid)	|	Loss: 0.0289	|	Acc: 84.1%
Epoch: 4
	(train)	|	Loss: 0.0290	|	Acc: 83.4%
	(valid)	|	Loss: 0.0273	|	Acc: 88.7%
Epoch: 5
	(train)	|	Loss: 0.0286	|	Acc: 84.2%
	(valid)	|	Loss: 0.0285	|	Acc: 84.4%
Epoch: 6
	(train)	|	Loss: 0.0297	|	Acc: 81.2%
	(valid)	|	Loss: 0.0289	|	Acc: 83.5%
Epoch: 7
	(train)	|	Loss: 0.0294	|	Acc: 82.3%
	(valid)	|	Loss: 0.0284	|	Acc: 85.3%
Epoch: 8
	(train)	|	Loss: 0.0294	|	Acc: 81.7%
	(valid)	|	Loss: 0.0288	|	Acc: 83.5%
Epoch: 9
	(train)	|	Loss: 0.0282	|	Acc: 85.5%
	(valid)	|	Loss: 0.0273	|	Acc: 88.4%
Epoch: 10
	(train)	|	Loss: 0.0281	|	Acc: 86.1%
	(valid)	|	Loss: 0.0285	|	Acc: 84.7%
Epoch: 11
	(train)	|	Loss: 0.0285	|	Acc: 84.7%
	(valid)	|	Loss: 0.0275	|	Acc: 88.2%
Ep

In [35]:
test_loss, test_acc = test_classification(test_dataset, model=model, criterion=criterion, batch_size=BATCH_SIZE, device=device)
print(f'\tLoss: {test_loss:.4f}\t|\tAccuracy: {test_acc:.1f}')

	Loss: 0.0286	|	Accuracy: 0.8
