In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from mpl_toolkits.mplot3d import Axes3D  # For 3D surface plot
from matplotlib import cm  # For color maps

# Define the Multilayer Feedforward Neural Network (MLFFNN)
class MLFFNN(nn.Module):
    """Multilayer feed-forward network: Linear+Tanh hidden layers, linear output.

    The layers are stored in ``self.network`` (an ``nn.Sequential``) in the
    order ``Linear, Tanh, Linear, Tanh, ..., Linear``; the final module is
    always the output ``Linear`` layer with no activation after it.

    Args:
        input_size: Number of input features.
        output_size: Number of output units.
        hidden_layers: Sequence with the width of each hidden layer, in order.
            An empty sequence yields a single ``Linear(input_size,
            output_size)`` layer.
    """

    # NOTE: the constructor must be spelled ``__init__`` (double underscores);
    # with ``_init_`` Python never calls it and ``MLFFNN(2, 1, [8])`` fails.
    def __init__(self, input_size, output_size, hidden_layers):
        super().__init__()

        # Widths of every layer that feeds a Linear module: input + hidden.
        widths = [input_size, *hidden_layers]

        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.Tanh())
        # Output layer is linear (no activation), fed by the last width.
        layers.append(nn.Linear(widths[-1], output_size))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the whole layer stack and return the output."""
        return self.network(x)

# Function to train the network and capture training loss
def train_model(model, criterion, optimizer, train_loader, epochs=100):
    """Train ``model`` and return the mean batch loss of every epoch.

    Args:
        model: Module to optimise; it is switched to training mode.
        criterion: Callable ``criterion(outputs, targets)`` returning the loss.
        optimizer: Optimizer that updates the parameters of ``model``.
        train_loader: Sized iterable yielding ``(inputs, targets)`` batches.
        epochs: Number of full passes over ``train_loader``.

    Returns:
        A list with one float per epoch: the sum of the batch losses divided
        by ``len(train_loader)``.
    """
    model.train()
    history = []

    for epoch_number in range(1, epochs + 1):
        batch_losses = []
        for batch_inputs, batch_targets in train_loader:
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_targets)
            batch_loss.backward()
            optimizer.step()
            batch_losses.append(batch_loss.item())

        # Average over the number of batches the loader reports.
        epoch_loss = sum(batch_losses, 0.0) / len(train_loader)
        history.append(epoch_loss)
        print(f'Epoch [{epoch_number}/{epochs}], Loss: {epoch_loss:.6f}')

    return history

# Function to plot training error vs. epoch
def plot_training_error(losses):
    """Show a line plot of the training loss against the 1-based epoch number.

    Args:
        losses: Sequence with one loss value per epoch.
    """
    epoch_numbers = range(1, len(losses) + 1)

    plt.figure()
    axes = plt.gca()  # axes of the figure that was just created
    axes.plot(epoch_numbers, losses, label='Training Error')
    axes.set_xlabel('Epoch')
    axes.set_ylabel('Error (MSE)')
    axes.set_title('Training Error vs. Epoch')
    axes.legend()
    plt.show()

# Function to scatter plot training and test data with predictions
def scatter_plot_data(model, X_train, y_train, X_test, y_test):
    """Show scatter plots of targets and model predictions for both data splits.

    One figure is drawn for the training split and one for the test split.
    In each, the points are placed at the first two input columns and coloured
    by the target value (default marker) and by the model prediction
    (``'x'`` marker).

    Args:
        model: Trained module; it is switched to evaluation mode.
        X_train, X_test: Input tensors; columns 0 and 1 are used as the axes.
        y_train, y_test: Target tensors, flattened for colouring.
    """
    model.eval()

    # Compute both prediction arrays up front, without tracking gradients.
    with torch.no_grad():
        train_pred = model(X_train).detach().numpy()
        test_pred = model(X_test).detach().numpy()

    panels = (
        (X_train, y_train, train_pred,
         'Training Data', 'Train Prediction',
         'Training Data Scatter Plot with Predictions'),
        (X_test, y_test, test_pred,
         'Test Data', 'Test Prediction',
         'Test Data Scatter Plot with Predictions'),
    )

    for features, targets, predicted, data_label, pred_label, title in panels:
        first_col, second_col = features[:, 0], features[:, 1]
        plt.figure()
        plt.scatter(first_col, second_col, c=targets.flatten(), label=data_label)
        plt.scatter(first_col, second_col, c=predicted.flatten(), marker='x', label=pred_label)
        plt.title(title)
        plt.legend()
        plt.show()

# Function to draw 3D surface plots of two hidden-node activations and the network output
def _add_surface_subplot(fig, position, x1_grid, x2_grid, values, cmap, title, zlabel):
    """Add one 3D surface of ``values`` over the (x1, x2) grid to ``fig``."""
    ax = fig.add_subplot(position, projection='3d')
    ax.plot_surface(x1_grid, x2_grid, values, cmap=cmap)
    ax.set_title(title)
    ax.set_xlabel('X1')
    ax.set_ylabel('X2')
    ax.set_zlabel(zlabel)
    return ax


def plot_surface(model, X, epoch, node_idx=(0, 1), grid_size=100):
    """Show surfaces of two hidden-node activations and of the network output.

    The surfaces are evaluated on a regular ``grid_size x grid_size`` grid
    spanning the min/max of the first two columns of ``X``.

    Args:
        model: Module exposing a sliceable ``network`` attribute; it is
            switched to evaluation mode. ``model.network[:-1]`` (everything
            except the last module) supplies the hidden activations.
        X: 2-D tensor whose columns 0 and 1 define the plotted input range.
        epoch: Value shown in the plot titles only; it does not affect what
            is computed.
        node_idx: Indexable with the two hidden-node columns to plot.
            The default is a tuple rather than a list so that no mutable
            object is shared between calls.
        grid_size: Number of grid points along each input axis.
    """
    model.eval()

    # Regular mesh over the range covered by the first two input columns.
    x1_min, x1_max = X[:, 0].min().item(), X[:, 0].max().item()
    x2_min, x2_max = X[:, 1].min().item(), X[:, 1].max().item()
    x1_grid, x2_grid = np.meshgrid(np.linspace(x1_min, x1_max, grid_size),
                                   np.linspace(x2_min, x2_max, grid_size))

    # One row per grid point, columns (x1, x2), as the model input.
    grid_input = np.c_[x1_grid.ravel(), x2_grid.ravel()]
    grid_input_tensor = torch.tensor(grid_input, dtype=torch.float32)

    with torch.no_grad():
        # Output of every module except the last one in ``model.network``.
        hidden_activations = model.network[:-1](grid_input_tensor).numpy()
        output_activations = model(grid_input_tensor).numpy()

    first_node, second_node = node_idx[0], node_idx[1]
    grid_shape = x1_grid.shape

    # Figure 1: the two selected hidden nodes side by side.
    fig = plt.figure(figsize=(12, 6))
    for position, node in ((121, first_node), (122, second_node)):
        _add_surface_subplot(
            fig, position, x1_grid, x2_grid,
            hidden_activations[:, node].reshape(grid_shape),
            cm.coolwarm,
            f'Hidden Node {node} Activations (Epoch {epoch})',
            f'Activation Node {node}',
        )

    # Figure 2: the network output (reshape assumes one value per grid point).
    fig2 = plt.figure(figsize=(8, 6))
    _add_surface_subplot(
        fig2, 111, x1_grid, x2_grid,
        output_activations.reshape(grid_shape),
        cm.viridis,
        f'Output Node Activation (Epoch {epoch})',
        'Output',
    )

    plt.show()

# # Surface plot for hidden nodes and output after 1, 10, 50, and final epoch
# for epoch in [1, 10, 50, 100]:
#     plot_surface(model, X_train, epoch)

# Function to load and normalize the data
def load_and_normalize_data(train_file_path, test_file_path):
    """Read the train/test CSV files and return standardised float32 tensors.

    Both files must contain the columns ``x1``, ``x2`` and ``output``.
    The feature and target scalers are fitted on the training data only and
    then applied to the test data.

    Args:
        train_file_path: Path of the training CSV file.
        test_file_path: Path of the test CSV file.

    Returns:
        ``(X_train, y_train, X_test, y_test)`` as ``torch.float32`` tensors;
        the feature tensors have two columns and the target tensors one.
    """
    feature_columns = ['x1', 'x2']
    target_column = 'output'

    train_frame = pd.read_csv(train_file_path)
    test_frame = pd.read_csv(test_file_path)

    train_features = train_frame[feature_columns].values
    train_targets = train_frame[target_column].values.reshape(-1, 1)
    test_features = test_frame[feature_columns].values
    test_targets = test_frame[target_column].values.reshape(-1, 1)

    feature_scaler = StandardScaler()
    target_scaler = StandardScaler()

    # Fit on the training split only; reuse the fitted statistics for test.
    train_features = feature_scaler.fit_transform(train_features)
    train_targets = target_scaler.fit_transform(train_targets)
    test_features = feature_scaler.transform(test_features)
    test_targets = target_scaler.transform(test_targets)

    def as_float_tensor(array):
        return torch.tensor(array, dtype=torch.float32)

    # Targets are already column-shaped (n, 1), so no unsqueeze is needed.
    return (
        as_float_tensor(train_features),
        as_float_tensor(train_targets),
        as_float_tensor(test_features),
        as_float_tensor(test_targets),
    )

In [None]:
# Hyperparameters
input_size = 2  # Two input features (x1, x2)
output_size = 1  # Single output (function approximation)
hidden_layers = [8]  # Example: 1 hidden layer with 8 nodes
learning_rate = 0.2
total_epochs = 100
# Epochs after which the hidden/output surfaces are plotted (ascending,
# ending at total_epochs so the whole training budget is used).
surface_epochs = [1, 10, 50, total_epochs]

# Create the model
model = MLFFNN(input_size, output_size, hidden_layers)

# Define the loss function and optimizer
criterion = nn.MSELoss()  # Mean Squared Error for function approximation
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

# File paths for your dataset
# NOTE: these are absolute, machine-specific paths; adjust them before running elsewhere.
train_file_path = r"C:\Users\deben\OneDrive\Desktop\IITM\PRML\Assignment\Assignment 3\dataset1\Train-2a-25.csv"
test_file_path = r"C:\Users\deben\OneDrive\Desktop\IITM\PRML\Assignment\Assignment 3\dataset1\Test-50.csv"

# Load and normalize the data
X_train, y_train, X_test, y_test = load_and_normalize_data(train_file_path, test_file_path)

# Create DataLoader for batch processing
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Train the model in segments and draw the surfaces at each checkpoint.
# Plotting only after training has finished would show the same (final)
# network four times under different epoch titles. The model, optimizer and
# loader are reused across segments, so training simply continues where the
# previous segment stopped.
losses = []
epochs_done = 0
for checkpoint in surface_epochs:
    # train_model numbers its printed epochs per call, so announce the range.
    print(f'Training epochs {epochs_done + 1}-{checkpoint} of {total_epochs}')
    losses.extend(
        train_model(model, criterion, optimizer, train_loader, epochs=checkpoint - epochs_done)
    )
    epochs_done = checkpoint

    # Surface plot for hidden nodes and output after this many epochs
    plot_surface(model, X_train, checkpoint)

# Plot the training error vs. epoch
plot_training_error(losses)

# Scatter plot the training and test data with predictions
scatter_plot_data(model, X_train, y_train, X_test, y_test)