<h1> Title <h1>

<h3> Step 1 - Setup </h3>

imports, declaring directories, and utility functions:

load_data(filepath, fraction)
- Reads a dataset from a file.
- Extracts only a specified fraction of the data.
- Returns the extracted portion as a NumPy array.

plot_two_lines()
- Plots two lines on the same graph using Plotly.
- Assigns different colors to each line (blue and red).
- Adds labels for the axes and a title.
- Enables interactive features like hovering over points.
- Displays the plot in a window.

In [8]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import TensorDataset, DataLoader, Dataset
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import optuna
import plotly.graph_objects as go
import json
import os

directory = r"C:\Users\alexa\PycharmProjects\CapstoneProject\.venv"
data_dir = os.path.join(directory, 'data', 'Constellation', '')
results_dir = os.path.join(directory, 'model results', 'Constellation')

def load_data(filepath, fraction):
    # Load the entire dataset
    full_data = pd.read_csv(filepath, sep=r',', header=None).to_numpy()

    # Calculate the number of rows to load
    num_rows = int(len(full_data) * fraction)

    # Select the first `num_rows` rows (contiguous block)
    sampled_data = full_data[:num_rows]

    return sampled_data

def plot_two_lines(x1, y1, x2, y2, title, x_label, y_label, label1="Line 1", label2="Line 2"):
    """
    Creates an interactive Plotly plot for two solid lines over the same x-axis.

    Parameters:
    - x1, y1: Data for the first line
    - x2, y2: Data for the second line
    - title: Plot title
    - x_label: Label for the x-axis
    - y_label: Label for the y-axis
    - label1: Legend name for the first line (default: "Line 1")
    - label2: Legend name for the second line (default: "Line 2")
    """

    # Create an interactive figure
    fig = go.Figure()

    # Add first line trace (solid blue)
    fig.add_trace(go.Scatter(
        x=x1, y=y1,
        mode='lines', name=label1,
        line=dict(color='blue', width=2)  # Solid line, blue color
    ))

    # Add second line trace (solid red)
    fig.add_trace(go.Scatter(
        x=x2, y=y2,
        mode='lines', name=label2,
        line=dict(color='red', width=2)  # Solid line, red color
    ))

    # Configure layout for better visibility
    fig.update_layout(
        title=title,
        xaxis_title=x_label,
        yaxis_title=y_label,
        template="plotly_white",
        hovermode="x",  # Enables hover tool on x-axis
    )

    # Show the interactive plot
    fig.show()

AttributeError: partially initialized module 'torch' has no attribute 'version' (most likely due to a circular import)

Step 2

In [None]:
input_data = load_data(os.path.join(data_dir, '256QAM_input_complex.csv'), 1)
output_data = load_data(os.path.join(data_dir, '256QAM_output_complex.csv'), 1)


class ModelDataset(Dataset):
    def __init__(self, data, window_size, step_size):
        """
        Args:
        - data (numpy array): 2D array of shape (N, 5) with columns [Time, I_in, Q_in, I_out, Q_out]
        - window_size (int): Number of time steps in each window
        - step_size (int): Step size between windows
        """
        self.data = data
        self.window_size = window_size
        self.step_size = step_size
        self.num_windows = (len(self.data) - self.window_size) // self.step_size + 1

    def __len__(self):
        return self.num_windows

    def __getitem__(self, idx):
        """
        Returns a single window of data.

        - X (torch.Tensor): Full input sequence (window_size,)
        - y (torch.Tensor): Target (middle value of output signal)
        - t (float): Time index of the middle point
        """
        start_idx = idx * self.step_size
        end_idx = start_idx + self.window_size
        window = self.data[start_idx:end_idx]

        # Extract input, output, and time signals
        time_signal = window[:, 0]  # Extract time column
        input_I = window[:, 1]
        input_Q = window[:, 2]
        output_I = window[:, 3]
        output_Q = window[:, 4]

        # Find middle index
        middle_index = len(output_I) // 2
        middle_time = time_signal[middle_index]  # Time at middle of the window

        # Normalize each window independently
        input_I_mean, input_I_std = input_I.mean(), input_I.std()
        input_Q_mean, input_Q_std = input_Q.mean(), input_Q.std()
        output_I_mean, output_I_std = output_I.mean(), output_I.std()
        output_Q_mean, output_Q_std = output_Q.mean(), output_Q.std()

        if input_I_std != 0:
            input_I = (input_I - input_I_mean) / input_I_std
        if input_Q_std != 0:
            input_Q = (input_Q - input_Q_mean) / input_Q_std
        if output_I_std != 0:
            output_I = (output_I - output_I_mean) / output_I_std
        if output_Q_std != 0:
            output_Q = (output_Q - output_Q_mean) / output_Q_std

        # Convert to PyTorch tensors
        input_tensor = torch.tensor(np.stack([input_I, input_Q], axis = 1), dtype=torch.float32)  # Full input sequence
        target_tensor = torch.tensor([output_I[middle_index], output_Q[middle_index]], dtype=torch.float32)  # Middle output

        return input_tensor, target_tensor, middle_time

# Align data sizes by truncating to the minimum length
if len(input_data) != len(output_data):
    min_length = min(len(input_data), len(output_data))
    input_data = input_data[:min_length]
    output_data = output_data[:min_length]
else:
    print("no truncating necessary")

# Replace NaN values with the mean of the column
for i in range(1, 3):  # Columns 1 and 2 (I and Q)
    if np.isnan(input_data[:, i]).any():
        input_mean = np.nanmean(input_data[:, i])
        input_data[np.isnan(input_data[:, i]), i] = input_mean
    else:
        print("no NaN values found in input data")

for i in range(1, 3):  # Columns 1 and 2 (I and Q)
    if np.isnan(output_data[:, i]).any():
        output_mean = np.nanmean(output_data[:, i])
        output_data[np.isnan(output_data[:, i]), i] = output_mean
    else:
        print("no NaN values found in output data")



# Convert NumPy arrays to lists for Plotly compatibility
time_input_list = input_data[:, 0].tolist()
I_input_list = input_data[:, 1].tolist()
Q_input_list = input_data[:, 2].tolist()

time_output_list = output_data[:, 0].tolist()
I_output_list = output_data[:, 1].tolist()
Q_output_list = output_data[:, 2].tolist()

plot_two_lines(
    x1=time_input_list, y1=I_input_list,
    x2=time_output_list, y2=I_output_list,
    title="raw in-phase input and output data ",
    x_label="time", y_label="amplitude",
    label1="input", label2="output"
)

plot_two_lines(
    x1=time_input_list, y1=Q_input_list,
    x2=time_output_list, y2=Q_output_list,
    title="raw quadrature input and output data ",
    x_label="time", y_label="amplitude",
    label1="input", label2="output"
)

# convert lists to numpy arrays
# input_data = np.vstack(input_data)
# output_data = np.vstack(output_data)

data = np.column_stack((
    input_data[:, 0],  # Time
    input_data[:, 1],  # Input I
    input_data[:, 2],  # Input Q
    output_data[:, 1], # Output I
    output_data[:, 2]  # Output Q
))

# Define window and step sizes
window_size = 256
step_size = 1  # Adjust step size to control overlap

# Create dataset instance
dataset = ModelDataset(data, window_size, step_size)

# Split dataset into train, validation, and test
train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])

# Define DataLoaders with shuffle only for training
batch_size = 512
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)

# Verify DataLoader
print(f"Total Training Batches: {len(train_loader)}")
for batch_idx, (X_batch, y_batch, _) in enumerate(train_loader):
    print(f"Batch {batch_idx + 1} - X shape: {X_batch.shape}, y shape: {y_batch.shape}")
    break  # Print only the first batch

step3

model 0:

In [None]:
# Define the basic FNN model
class FiberOpticFNN0(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout):
        super(FiberOpticFNN0, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),  # Batch normalization
            nn.ReLU(),
            nn.Dropout(dropout),  # Dropout for regularization
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),  # Batch normalization
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim)
        )

    def forward(self, x):
        return self.fc(x)

model 1

In [None]:
# Define the deeper model
class FiberOpticFNN1(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout):
        super(FiberOpticFNN1, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim)
        )

    def forward(self, x):
        return self.fc(x)

model 2

In [None]:
# Define the wider model
class FiberOpticFNN2(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout):
        super(FiberOpticFNN2, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, hidden_dim * 2),
            nn.BatchNorm1d(hidden_dim * 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim)
        )
    def forward(self, x):
        return self.fc(x)

model 3

In [None]:
# Define the dynamic model
class FiberOpticFNN3(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout):
        super(FiberOpticFNN3, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, int(hidden_dim * 0.75)),
            nn.BatchNorm1d(int(hidden_dim * 0.75)),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(int(hidden_dim * 0.75), int(hidden_dim * 0.5)),
            nn.BatchNorm1d(int(hidden_dim * 0.5)),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(int(hidden_dim * 0.5), output_dim)
        )
    def forward(self, x):
        return self.fc(x)

model 4

In [None]:
# Define the noise-resilient model
class FiberOpticFNN4(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout):
        super(FiberOpticFNN4, self).__init__()

        # Initial feature extraction
        self.feature_extractor = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU()
        )

        # Noise-focused branch (captures small deviations)
        self.noise_branch = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.Tanh(),
            nn.Dropout(dropout)
        )

        # Residual connection for refined outputs
        self.residual = nn.Linear(hidden_dim, hidden_dim)

        # Final layer combining noise and refined features
        self.combined = nn.Sequential(
            nn.Linear(hidden_dim + hidden_dim // 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        # Extract features
        features = self.feature_extractor(x)

        # Process noise-sensitive features
        noise_features = self.noise_branch(features)

        # Add residual connection
        refined_features = features + self.residual(features)

        # Combine noise-sensitive and refined features
        combined_input = torch.cat((refined_features, noise_features), dim=1)

        # Final output
        output = self.combined(combined_input)
        return output

model 5

In [None]:
# Define the Residual Connections model
class FiberOpticFNN5(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(FiberOpticFNN5, self).__init__()
        self.input_layer = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
        )
        self.hidden_layer = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
        )
        self.output_layer = nn.Sequential(
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        x = self.input_layer(x)
        x = x + self.hidden_layer(x)  # Residual connection
        return self.output_layer(x)

step 4

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100, patience=10):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    train_losses = []
    val_losses = []
    best_weights = None

    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        for X_batch, y_batch, _ in train_loader:

            # Move data to GPU
            X_batch = X_batch.to(device).view(X_batch.shape[0], -1)
            y_batch = y_batch.to(device)

            optimizer.zero_grad()
            predictions = model(X_batch)
            loss = criterion(predictions, y_batch)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        train_losses.append(running_loss / len(train_loader))

        # Validation phase
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X_batch, y_batch, _ in val_loader:

                # Move data to GPU
                X_batch = X_batch.to(device).view(X_batch.shape[0], -1)
                y_batch = y_batch.to(device)

                predictions = model(X_batch)
                loss = criterion(predictions, y_batch)
                val_loss += loss.item()

        val_losses.append(val_loss / len(val_loader))

        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}")

        # Early stopping
        if val_losses[-1] < best_val_loss:
            best_val_loss = val_losses[-1]
            patience_counter = 0
            best_weights = {"model weights": model.state_dict()}
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered")
                break

    return train_losses, val_losses, best_weights

step 5

In [None]:
def objective(trial, model_name, model_class):

    best_val_loss = float("inf")

    # Define model subdirectory
    model_dir = os.path.join(results_dir, '256QAM', model_name, 'saved model')
    os.makedirs(model_dir, exist_ok=True)

    # Hyperparameters to tune
    hidden_dim = trial.suggest_int("hidden dim", 256, 640, step=32)
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    weight_decay = trial.suggest_float("weight decay", 1e-6, 1e-3, log=True)

    # Model arguments dictionary
    model_args = {
        "input_dim": window_size * 2,
        "hidden_dim": hidden_dim,
        "output_dim": 2
    }

    if "dropout" in model_class.__init__.__code__.co_varnames:
        dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)
        model_args["dropout"] = dropout_rate

    model = model_class(**model_args)
    criterion = nn.MSELoss()  # Define loss function
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)  # Define optimizer

    # Train the model
    train_losses, val_losses, best_weights = train_model(
        model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        num_epochs=100,
        patience=10
    )

    if min(val_losses) < best_val_loss:
        best_val_loss = min(val_losses)

        #Save model weights
        weights_path = os.path.join(model_dir, "model_weights_windowed.pth")
        torch.save(best_weights, weights_path)

        # Save best params
        best_params = {
            "hidden_dim": hidden_dim,
            "lr": lr,
            "weight_decay": weight_decay,
            "train_losses": train_losses,
            "val_losses": val_losses
        }
        if "dropout" in model_args:
            best_params["dropout"] = dropout_rate

        params_path = os.path.join(model_dir, "best_params_windowed.json")
        with open(params_path, "w") as json_file:
            json.dump(best_params, json_file, indent=4)
        print(f"Files saved successfully: {weights_path}, {params_path}")

    return best_val_loss



In [None]:

# Dictionary mapping model names to their respective classes
model_info = {
    "Basic": FiberOpticFNN0,
    "Deeper": FiberOpticFNN1,
    "Wider": FiberOpticFNN2,
    "Dynamic": FiberOpticFNN3,
    "Noise Resilient": FiberOpticFNN4,
     "Residual": FiberOpticFNN5
}

# Run Optuna study for each model
for model_name, model_class in model_info.items():
    print(f"🔄 Running Optuna for {model_name} model...")

    # Create an Optuna study
    study = optuna.create_study(direction="minimize")

    # Optimize using lambda to pass additional arguments
    study.optimize(lambda trial: objective(trial, model_name, model_class), n_trials=15)

    print(f"✅ Finished optimization for {model_name}\n")
