In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import os
import random

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Load your data
file_path = 'dataset.csv'  # Replace with the correct file path
data = pd.read_csv(file_path)

# Print the column names to understand their format
print(data.columns)

# Ensure the columns are strings before removing degree symbols
data['Rx_θr'] = data['Rx_θr'].astype(str).str.replace('°', '').astype(float)
data['Rx_Φr'] = data['Rx_Φr'].astype(str).str.replace('°', '').astype(float)
data['RIS_θₙ'] = data['RIS_θₙ'].astype(str).str.replace('°', '').astype(float)
data['RIS_Φₙ'] = data['RIS_Φₙ'].astype(str).str.replace('°', '').astype(float)

# Extract only the angle columns
angle_columns = [col for col in data.columns if 'θᵣ=' in col]
angles = [float(col.split('=')[1]) for col in angle_columns]
angles_rad = np.radians(angles)

# Extract azimuth and elevation angles for the receiver
receiver_angles = data[['Rx_θr', 'Rx_Φr']].values.astype(np.float32)

# Extract beam angles
beam_angles = data[['RIS_θₙ', 'RIS_Φₙ']].values.astype(np.float32)

# Normalize the data if necessary
data_normalized = (data[angle_columns] - data[angle_columns].mean()) / data[angle_columns].std()

# Combine receiver angles and beam angles with the normalized data
X = np.hstack((receiver_angles, beam_angles, data_normalized.values.astype(np.float32)))
y = data_normalized.values.astype(np.float32)  # For this example, we'll use the same data as targets

# Print shapes to verify
print(f'X shape: {X.shape}')
print(f'y shape: {y.shape}')

# Reshape X to include a sequence dimension
seq_length = 1  # Assuming each sample is a sequence of length 1
X_reshaped = X.reshape((X.shape[0], seq_length, X.shape[1]))

# Convert to PyTorch tensors
X_tensor = torch.tensor(X_reshaped, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32)

class CNNModel(nn.Module):
    def __init__(self, input_dim, output_dim, seq_length, dropout_rate=0.2):
        super(CNNModel, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)

        # Layer normalization
        self.layer_norm = nn.LayerNorm(128)

        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

        # Dummy input to calculate fc_input_size
        dummy_input = torch.rand(1, input_dim, seq_length)
        dummy_output = self.conv2(self.conv1(dummy_input))
        self.fc_input_size = dummy_output.view(dummy_output.size(0), -1).size(1)

        # Fully connected layers
        self.fc1 = nn.Linear(self.fc_input_size, 256)
        self.fc2 = nn.Linear(256, output_dim)

        # Activation
        self.relu = nn.ReLU()

    def forward(self, x):
        x = x.permute(0, 2, 1)  # (batch_size, in_channels, seq_length)
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = x.permute(0, 2, 1)  # (batch_size, seq_length, in_channels)
        x = self.layer_norm(x)
        x = self.dropout(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.relu(self.fc1(x))
        return self.fc2(x)

# k-Fold Cross-Validation
k = 5
kfold = KFold(n_splits=k, shuffle=True, random_state=42)

# Lists to store the history of losses
train_losses = []
val_losses = []
train_maes = []
val_maes = []
train_rmses = []
val_rmses = []
train_mses = []
val_mses = []

# DataFrame to store evaluation metrics
evaluation_df = pd.DataFrame(columns=['Fold', 'Epoch', 'Train Loss', 'Val Loss', 'Train MAE', 'Val MAE', 'Train RMSE', 'Val RMSE', 'Train MSE', 'Val MSE'])

# Create a directory to save the plots
output_dir = 'beam_pattern_plots_combined_20'
os.makedirs(output_dir, exist_ok=True)

# Perform k-fold cross-validation
for fold, (train_ids, val_ids) in enumerate(kfold.split(X_tensor)):
    print(f'Fold {fold + 1}')
    # Split the data into training and validation sets
    X_train, X_val = X_tensor[train_ids], X_tensor[val_ids]
    y_train, y_val = y_tensor[train_ids], y_tensor[val_ids]

    # Initialize the model, loss function, and optimizer
    model = CNNModel(input_dim=X_tensor.shape[2], output_dim=y_tensor.shape[1], seq_length=seq_length, dropout_rate=0.2)
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)

    # Lists to store the history of losses for this fold
    fold_train_losses = []
    fold_val_losses = []
    fold_train_maes = []
    fold_val_maes = []
    fold_train_rmses = []
    fold_val_rmses = []
    fold_train_mses = []
    fold_val_mses = []

    # Train the model
    num_epochs = 300
    for epoch in range(num_epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

        # Calculate training loss, MAE, RMSE, and MSE
        train_loss = loss.item()
        train_mae = mean_absolute_error(y_train.detach().numpy(), outputs.detach().numpy())
        train_rmse = np.sqrt(mean_squared_error(y_train.detach().numpy(), outputs.detach().numpy()))
        train_mse = mean_squared_error(y_train.detach().numpy(), outputs.detach().numpy())

        # Calculate validation loss, MAE, RMSE, and MSE
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val)
            val_loss = criterion(val_outputs, y_val).item()
            val_mae = mean_absolute_error(y_val.detach().numpy(), val_outputs.detach().numpy())
            val_rmse = np.sqrt(mean_squared_error(y_val.detach().numpy(), val_outputs.detach().numpy()))
            val_mse = mean_squared_error(y_val.detach().numpy(), val_outputs.detach().numpy())

        # Store the losses for this epoch
        fold_train_losses.append(train_loss)
        fold_val_losses.append(val_loss)
        fold_train_maes.append(train_mae)
        fold_val_maes.append(val_mae)
        fold_train_rmses.append(train_rmse)
        fold_val_rmses.append(val_rmse)
        fold_train_mses.append(train_mse)
        fold_val_mses.append(val_mse)

        # Append the evaluation metrics to the DataFrame
        evaluation_df = pd.concat([evaluation_df, pd.DataFrame({
            'Fold': fold + 1,
            'Epoch': epoch + 1,
            'Train Loss': train_loss,
            'Val Loss': val_loss,
            'Train MAE': train_mae,
            'Val MAE': val_mae,
            'Train RMSE': train_rmse,
            'Val RMSE': val_rmse,
            'Train MSE': train_mse,
            'Val MSE': val_mse
        }, index=[0])], ignore_index=True)

        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Train MAE: {train_mae:.4f}, Val MAE: {val_mae:.4f}, Train RMSE: {train_rmse:.4f}, Val RMSE: {val_rmse:.4f}, Train MSE: {train_mse:.4f}, Val MSE: {val_mse:.4f}')

    # Store the losses for this fold
    train_losses.append(fold_train_losses)
    val_losses.append(fold_val_losses)
    train_maes.append(fold_train_maes)
    val_maes.append(fold_val_maes)
    train_rmses.append(fold_train_rmses)
    val_rmses.append(fold_val_rmses)
    train_mses.append(fold_train_mses)
    val_mses.append(fold_val_mses)

# Calculate the average losses across all folds
avg_train_losses = np.mean(train_losses, axis=0)
avg_val_losses = np.mean(val_losses, axis=0)
avg_train_maes = np.mean(train_maes, axis=0)
avg_val_maes = np.mean(val_maes, axis=0)
avg_train_rmses = np.mean(train_rmses, axis=0)
avg_val_rmses = np.mean(val_rmses, axis=0)
avg_train_mses = np.mean(train_mses, axis=0)
avg_val_mses = np.mean(val_mses, axis=0)

# Plot training vs. validation loss
plt.figure(figsize=(10, 6))
plt.plot(avg_train_losses, label='Training Loss')
plt.plot(avg_val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training vs. Validation Loss')
plt.legend()
plt.grid(True)
plt.savefig(os.path.join(output_dir, 'training_vs_validation_loss.png'))
plt.show()

# Plot training vs. validation MAE
plt.figure(figsize=(10, 6))
plt.plot(avg_train_maes, label='Training MAE')
plt.plot(avg_val_maes, label='Validation MAE')
plt.xlabel('Epoch')
plt.ylabel('Mean Absolute Error')
plt.title('Training vs. Validation MAE')
plt.legend()
plt.grid(True)
plt.savefig(os.path.join(output_dir, 'training_vs_validation_mae.png'))
plt.show()

# Plot training vs. validation RMSE
plt.figure(figsize=(10, 6))
plt.plot(avg_train_rmses, label='Training RMSE')
plt.plot(avg_val_rmses, label='Validation RMSE')
plt.xlabel('Epoch')
plt.ylabel('Root Mean Squared Error')
plt.title('Training vs. Validation RMSE')
plt.legend()
plt.grid(True)
plt.savefig(os.path.join(output_dir, 'training_vs_validation_rmse.png'))
plt.show()

# Plot training vs. validation MSE
plt.figure(figsize=(10, 6))
plt.plot(avg_train_mses, label='Training MSE')
plt.plot(avg_val_mses, label='Validation MSE')
plt.xlabel('Epoch')
plt.ylabel('Mean Squared Error')
plt.title('Training vs. Validation MSE')
plt.legend()
plt.grid(True)
plt.savefig(os.path.join(output_dir, 'training_vs_validation_mse.png'))
plt.show()

# Save the evaluation metrics to a CSV file
evaluation_df.to_csv(os.path.join(output_dir, 'evaluation_metrics.csv'), index=False)

# Make predictions on the test set
model.eval()
with torch.no_grad():
    predictions = model(X_tensor).numpy()

# Check shapes
print(f'y_test shape: {y_tensor.shape}')
print(f'predictions shape: {predictions.shape}')
print(f'angles_rad shape: {len(angles_rad)}')
'''
# Visualize the predictions for all test samples
for i in range(len(y_tensor)):  # Visualize all predictions
    plt.figure(figsize=(8, 8))
    ax = plt.subplot(111, polar=True)
    ax.plot(angles_rad, y_tensor[i].numpy(), label='Measurement')
    ax.plot(angles_rad, predictions[i], label='DNN Model', linestyle='--')
    ax.fill(angles_rad, y_tensor[i].numpy(), alpha=0.3)  # Optional: fill under the true curve
    ax.fill(angles_rad, predictions[i], alpha=0.3)  # Optional: fill under the predicted curve

    # Add receiver location
    rx_theta = np.radians(X_tensor[i][0][0].item())
    rx_phi = X_tensor[i][0][1].item()  # Elevation angle in degrees
    ax.plot(rx_theta, 1, 'ro', markersize=10, label='Receiver')  # Mark the receiver location

    # Add beam angles to the title
    ris_theta = X_tensor[i][0][2].item()
    ris_phi = X_tensor[i][0][3].item()
    ax.set_title(f"Beam Pattern Prediction\nRx_θr: {X_tensor[i][0][0].item():.2f}°, Rx_Φr: {X_tensor[i][0][1].item():.2f}°\nRIS_θₙ: {ris_theta:.2f}°, RIS_Φₙ: {ris_phi:.2f}°", va='bottom', fontsize=14)
    plt.legend(loc="upper right", bbox_to_anchor=(1.1, 1.1), title="Legend")

    # Save the plot to a file
    plot_filename = os.path.join(output_dir, f'beam_pattern_{i}.png')
    plt.savefig(plot_filename)
    plt.close()
'''
print(f'Plots saved to {output_dir}')

## Define a wrapper function for the model
#def model_predict(X):
#    model.eval()
#    with torch.no_grad():
#        return model(torch.tensor(X, dtype=torch.float32)).numpy()
#
## Use a smaller subset of the data for SHAP explanations
#X_shap = X_tensor[:100].numpy()
#
## SHAP values
#explainer = shap.Explainer(model_predict, X_shap)
#shap_values = explainer(X_shap)
#
## Plot SHAP values
#feature_names = ['Rx_θr', 'Rx_Φr', 'RIS_θₙ', 'RIS_Φₙ'] + angle_columns
#shap.summary_plot(shap_values, X_shap, feature_names=feature_names)
#plt.savefig(os.path.join(output_dir, 'shap_summary.png'))
#plt.show()

# Save MSE and other evaluation parameters in a separate CSV file
mse_df = pd.DataFrame({
    'Fold': evaluation_df['Fold'].unique(),
    'Train MSE': [np.mean(fold_train_mses) for fold_train_mses in train_mses],
    'Val MSE': [np.mean(fold_val_mses) for fold_val_mses in val_mses]
})
mse_df.to_csv(os.path.join(output_dir, 'mse_evaluation_metrics.csv'), index=False)
