In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

import matplotlib.pyplot as plt
import numpy as np

In [2]:
# check for cuda
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device: ", device)

Device:  cuda


In [None]:
# Function to load and preprocess the data
def load_data(file_path):
    # Load data from CSV
    data = pd.read_csv(file_path)
    data = data[data.iloc[:, -1] >= 0]
    for col in [4, 7, 8]:  # Columns with potential mixed types
        data.iloc[:, col] = pd.to_numeric(data.iloc[:, col], errors='coerce')

    # Drop rows with NaN values in the critical columns
    data = data.dropna(subset=data.columns[[4, 7, 8]])

    # Convert all remaining columns to numeric
    data = data.apply(pd.to_numeric, errors='coerce')

    # Drop any rows with NaN after conversion
    data = data.dropna()

    # Split into input features and output (last column is the output)
    X = data.iloc[:, :-1].values  # All columns except the last one
    y = data.iloc[:, -1].values   # Last column is the target

    # Normalize X[:, 0] and X[:, 2] by their respective max values
    X[:, 0] = X[:, 0] / 0.38615
    X[:, 1] = X[:, 1] / 1.6056
    X[:, 2] = X[:, 2] / 1.518
    X[:, 3] = X[:, 3] / 3.14
    X[:, 4] = X[:, 4] / 2.251
    X[:, 5] = X[:, 5] / 3.14
    X[:, 6] = X[:, 6] / 2.16
    X[:, 7] = X[:, 7] / 3.14

    print("y: ", np.max(y))
    y = y / np.max(y) 

    print(X.shape)

    # Split into train, validation, and test sets (85%, 5%, 10%)
    X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.20, random_state=42)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.4, random_state=42)

    # Convert to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

    # Create DataLoader for batching
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

    train_loader = DataLoader(train_dataset, batch_size=191, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=191, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=191, shuffle=False)

    return train_loader, val_loader, test_loader


In [4]:
class EarlyStopping:
    def __init__(self, patience=10, min_delta=1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False
        
    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                return True
        else:
            self.best_loss = val_loss
            self.counter = 0
        return False

In [5]:
# Training loop
def train(model, train_loader, val_loader, criterion, optimizer, epochs=100):
    model.train()  # Set the model to training mode
    early_stopping = EarlyStopping(patience=8, min_delta=1e-4)
    # scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=8, verbose=True, min_lr=1e-6)
    for epoch in range(epochs):
        epoch_loss = 0.0
        for inputs, target in train_loader:
            optimizer.zero_grad()  # Zero the gradients
            inputs, target = inputs.to(device), target.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, target)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            # scheduler.step()

            epoch_loss += loss.item()

        # Validation step
        val_loss = 0.0
        model.eval()  # Set the model to evaluation mode
        with torch.no_grad():
            for inputs, target in val_loader:
                inputs, target = inputs.to(device), target.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, target)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(val_loader)
        # scheduler.step(avg_val_loss)
        # Print training and validation loss
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss / len(train_loader)}, Validation Loss: {val_loss / len(val_loader)}")
        if early_stopping(val_loss/len(val_loader)):
            print(f"Early stopping triggered at epoch {epoch+1}")
            break

In [None]:
# Assuming the CSV file is located at 'data.csv'
train_loader, val_loader, test_loader = load_data('dec11_cleaned.csv')

In [None]:
# dimensions of datasets
print(len(train_loader.dataset), len(val_loader.dataset), len(test_loader.dataset))

# initial values of dataset
print(train_loader.dataset[0])


In [8]:
# Test the model and calculate error percentages
def test(model, test_loader, criterion):
    model.to(device)
    model.eval()
    test_loss = 0.0
    all_errors = []
    actuals = []
    predictions = []

    with torch.no_grad():
        for inputs, target in test_loader:
            inputs, target = inputs.to(device), target.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, target)
            test_loss += loss.item()

            # Calculate the error percentage
            error_percent = (torch.abs(outputs - target) / torch.abs(target)) * 100
            all_errors.extend(error_percent.squeeze().tolist())
            actuals.extend(target.squeeze().tolist())
            predictions.extend(outputs.squeeze().tolist())

    # Average test loss
    avg_test_loss = test_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss}")

    errors_array = np.array(all_errors)

    mid_error_count = np.sum(errors_array > 10)
    mid_error_percentage = (mid_error_count / len(errors_array)) * 100
    print(f"Percentage of test samples with error > 10%: {mid_error_percentage:.2f}%")
    high_error_count = np.sum(errors_array > 20)
    high_error_percentage = (high_error_count / len(errors_array)) * 100
    print(f"Percentage of test samples with error > 20%: {high_error_percentage:.2f}%")

    # Remove outliers from error percentages using IQR
    Q1 = np.percentile(errors_array, 25)
    Q3 = np.percentile(errors_array, 75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    filtered_errors = errors_array[(errors_array >= lower_bound) & (errors_array <= upper_bound)]

    # Plotting the filtered error percentages
    plt.figure(figsize=(10, 5))
    plt.plot(filtered_errors, label="Error Percentage (Filtered)", marker='o', linestyle='', color='b')
    plt.xlabel("Filtered Test Sample Index")
    plt.ylabel("Error Percentage (%)")
    plt.title("Filtered Error Percentage in Predicted vs Actual Values")
    plt.legend()
    plt.grid(True)
    plt.show()

    # Optional: Scatter plot of predictions vs actuals
    plt.figure(figsize=(10, 5))
    plt.scatter(actuals, predictions, label="Predicted vs Actual", color='r', alpha=0.5)
    plt.plot([min(actuals), max(actuals)], [min(actuals), max(actuals)], 'g--', label="Ideal Fit (y=x)")
    plt.xlabel("Actual Values")
    plt.ylabel("Predicted Values")
    plt.title("Predicted vs Actual Values")
    plt.legend()
    plt.grid(True)
    plt.show()

    random_indices = np.random.choice(len(errors_array), 5)

    for i in random_indices:
        print(f"Actual: {actuals[i]}, Predicted: {predictions[i]}, Error: {all_errors[i]:.2f}%")


In [None]:
def dummy_test(model, test_loader, criterion):
    model.to(device)
    model.eval()
    test_loss = 0.0
    all_errors = []
    abs_errors = []
    actuals = []
    predictions = []

    with torch.no_grad():
        for inputs, target in test_loader:
            inputs, target = inputs.to(device), target.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, target)
            test_loss += loss.item()

            # Calculate the error percentage
            error_percent = (torch.abs(outputs - target) / torch.abs(target)) * 100
            all_errors.extend(error_percent.squeeze().tolist())
            abs_errors.extend(torch.abs(outputs - target).squeeze().tolist())
            actuals.extend(target.squeeze().tolist())
            predictions.extend(outputs.squeeze().tolist())

    # Average test loss
    avg_test_loss = test_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss}")

    errors_array = np.array(all_errors)
    abs_errors_array = np.array(abs_errors)

    mid_error_count = np.sum(errors_array > 10)
    mid_error_percentage = (mid_error_count / len(errors_array)) * 100
    print(f"Percentage of test samples with error > 10%: {mid_error_percentage:.2f}%")
    high_error_count = np.sum(errors_array > 20)
    high_error_percentage = (high_error_count / len(errors_array)) * 100
    print(f"Percentage of test samples with error > 20%: {high_error_percentage:.2f}%")

    # Filter errors less than 20%
    low_error_mask = abs_errors_array <= 0.1
    low_error_actuals = np.array(actuals)[low_error_mask]
    low_error_predictions = np.array(predictions)[low_error_mask]

    # Optional: Scatter plot of predictions vs actuals with errors < 20%
    plt.figure(figsize=(10, 5))
    plt.scatter(low_error_actuals, low_error_predictions, label="Predicted vs Actual (Error < 20%)", color='r', alpha=0.5)
    plt.plot([min(low_error_actuals), max(low_error_actuals)], 
             [min(low_error_actuals), max(low_error_actuals)], 
             'g--', label="Ideal Fit (y=x)")
    plt.xlabel("Actual Values")
    plt.ylabel("Predicted Values")
    plt.title("Predicted vs Actual Values (Errors < 20%)")
    plt.legend()
    plt.grid(True)
    plt.show()

    # Print details of 5 random samples with low errors
    low_error_indices = np.where(low_error_mask)[0]
    random_indices = np.random.choice(low_error_indices, min(5, len(low_error_indices)))

    for i in random_indices:
        print(f"Actual: {actuals[i]}, Predicted: {predictions[i]}, Error: {all_errors[i]:.2f}%")

In [9]:
# Define the neural network model
class DistancePredictor(nn.Module):
    # def __init__(self, input_size=14, hidden_layers=[32, 64, 32, 16], output_size=1): # 19.42%, lr=0.002, epochs=150
    # def __init__(self, input_size=14, hidden_layers=[32, 64, 128], output_size=1): # 18.31%, lr=0.001, epochs=150
    # def __init__(self, input_size=14, hidden_layers=[32, 64, 128, 64, 32], output_size=1): # 16.6%, lr=0.005, epochs=300
    # def __init__(self, input_size=14, hidden_layers=[32, 64], output_size=1): # 21.13%, lr=0.01, epochs=100
    # def __init__(self, input_size=14, hidden_layers=[16, 16], output_size=1): # 24.72%, lr=0.01, epochs=100
    # def __init__(self, input_size=14, hidden_layers=[16, 32, 16], output_size=1): # 21.51%, lr=0.005, epochs=300
    # def __init__(self, input_size=14, hidden_layers=[32, 128, 512, 1024, 256, 16], output_size=1): # 15.8%, lr=0.003, epochs=200
    # def __init__(self, input_size=14, hidden_layers=[32, 128, 512, 256, 16], output_size=1): # 14.78%%, lr=0.003, epochs=200
    # def __init__(self, input_size=14, hidden_layers=[32, 128, 512, 256, 16, 4], output_size=1): # 16.54%, lr=0.002, epochs=150
    # def __init__(self, input_size=14, hidden_layers=[256, 256, 64, 32], output_size=1): # 21.13%, lr=0.01, epochs=100
    # def __init__(self, input_size=14, hidden_layers=[128,256,128], output_size=1):
    def __init__(self, input_size=14, hidden_layers=[1400, 1400], output_size=1): 
        super(DistancePredictor, self).__init__()
        
        # Define the network layers
        layers = []
        in_features = input_size
        for hidden_units in hidden_layers:
            layers.append(nn.Linear(in_features, hidden_units))
            layers.append(nn.ReLU())  # Activation function
            # add a dropout layer
            layers.append(nn.Dropout(0.01))
            in_features = hidden_units
        layers.append(nn.Linear(in_features, output_size))  # Output layer

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

In [11]:
class WeightedMSELoss(nn.Module):
    def __init__(self, weight_factor=10):
        super(WeightedMSELoss, self).__init__()
        self.weight_factor = weight_factor

    def forward(self, inputs, targets):
        weight = torch.where(targets < 0.5, self.weight_factor, 1.0)
        weight = torch.where(targets < 0.25, self.weight_factor*3, self.weight_factor)
        # weight = torch.where(targets > 0.5, 1.0, weight)
        return torch.mean(weight * (inputs - targets) ** 2)

In [12]:
# class WeightedMSELoss(torch.nn.Module):
#     def __init__(self, positive_weight=1.0, negative_weight=9.5):
#         super(WeightedMSELoss, self).__init__()
#         self.positive_weight = positive_weight
#         self.negative_weight = negative_weight

#     def forward(self, y_pred, y_true):
#         weights = torch.where(y_true >= 0, self.positive_weight, self.negative_weight)
#         loss = weights * (y_pred - y_true) ** 2
#         return loss.mean()


In [13]:
# Instantiate the model
model = DistancePredictor().to(device)

# Define loss function and optimizer
# criterion = nn.MSELoss().to(device)
# criterion = nn.HuberLoss().to(device)
# criterion = WeightedMSELoss(weight_factor=70).to(device)  # Adjust the weight factor as needed
criterion = WeightedMSELoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=1.7495e-04)  # Adjust the learning rate as needed

In [None]:
model.load_state_dict(torch.load("model_cnrrt_dec11_new_weight.pth"))

In [15]:
# Train the model
train(model, train_loader, val_loader, criterion, optimizer, epochs=300)

Epoch 1/300, Train Loss: 0.09832068136551532, Validation Loss: 0.07408497602016392
Epoch 2/300, Train Loss: 0.06424453103938287, Validation Loss: 0.05746081264904235
Epoch 3/300, Train Loss: 0.05185185428620931, Validation Loss: 0.046427000076558804
Epoch 4/300, Train Loss: 0.044387859131901373, Validation Loss: 0.04237768365009509
Epoch 5/300, Train Loss: 0.03956249532462921, Validation Loss: 0.039257396388217826
Epoch 6/300, Train Loss: 0.03601262263272278, Validation Loss: 0.03692296997842415
Epoch 7/300, Train Loss: 0.033228756908221906, Validation Loss: 0.03558115279833256
Epoch 8/300, Train Loss: 0.03111396458017517, Validation Loss: 0.034922684098315254
Epoch 9/300, Train Loss: 0.029330702322508534, Validation Loss: 0.03071325433053449
Epoch 10/300, Train Loss: 0.027905381127562606, Validation Loss: 0.028836127880167074
Epoch 11/300, Train Loss: 0.026621661994867887, Validation Loss: 0.02783529499840594
Epoch 12/300, Train Loss: 0.025533222536908515, Validation Loss: 0.027436711

In [None]:
# Test the model on test set
# test(model, test_loader, criterion)
dummy_test(model, test_loader, criterion)

In [17]:
# Saving model's state dictionary
torch.save(model.state_dict(), "model_cnrrt_dec11_new_weight.pth")