In [None]:
import torch
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
%matplotlib inline

# Parameters

In [None]:
import os

# Given parameters
circuit = 'nonlinear'  # 可选值 [nonlinear, differential]          # nonlinear : 非线性电路，          differential ： 微分电路
model_structure = 'CNN'  # 可选值 [CNN, REG] 
input_dim = 100  # 可选值 [25, 50, 100]

learning_rate = 0.0001

# Construct the model name
model_name = f'{circuit}_{input_dim}_{model_structure}'
model_filename = model_name + '.pth'
model_existed = False


# Define the folder path
folder_path = '../pretrained_network/'

# Check if the pretrained model exists
if os.path.exists(folder_path):
    files = os.listdir(folder_path)
    
    if model_filename in files:
        model_filename = os.path.join(folder_path, model_filename)
        # print(f"File '{model_filename}' exists in the folder '{folder_path}'.")
        model_existed = True


# If the file path exists then use the pretrained model
if model_existed:
    print(f"Model file path: {model_filename}")
else:
    print('Pretrained model does not exist')


# Preprocessing

## Remove quantity units from columns

In [None]:
# Eliminate units in dataset (msec, mV, uV, nV, pV, aV), Keep only their values
def remove_units(value):
    parts = value.split()
    if (len(parts) == 2 and parts[1] == 'mV') or (len(parts) == 2 and parts[1] == 'msec'):
        return float(parts[0]) / 1000
    elif (len(parts) == 2 and parts[1] == 'uV') or (len(parts) == 2 and parts[1] == 'usec'):
        return float(parts[0]) / 1e6
    elif (len(parts) == 2 and parts[1] == 'nV'):
        return float(parts[0]) / 1e9
    elif (len(parts) == 2 and parts[1] == 'pV'):
        return float(parts[0]) / 1e12
    elif (len(parts) == 2 and parts[1] == 'aV'):
        return float(parts[0]) / 1e18
    else:
        return float(parts[0])

## Read all dataset into a dataframe

In [None]:
if circuit == 'nonlinear':
    filenames = [
    r'nonlinear_data\nonlinear_50_sine.csv',        # Sine wave data
    r'nonlinear_data\nonlinear_50_cosine.csv',      # Cosine wave data    
    r'nonlinear_data\nonlinear_50_expoDec.csv',     # Exponentially decreasing waveform data
    r'nonlinear_data\nonlinear_50_expoInc.csv',     # Exponentially increasing waveform data
    r'nonlinear_data\nonlinear_50_combined.csv',    # Combined waveform data
    r'nonlinear_data\nonlinear_50_test.csv',        # Test set for evaluation
    # Uncomment the following line to use the updated test set, if use the updated test set, we need to comment original testset:
    # r'nonlinear_data\nonlinear_50_test_updated.csv'  # Updated Test Set: Lower Input Voltage Frequency to Activate Second JFET and Ensure Uncorrupted Waveform
    ]   
else: 
    filenames = [
        r'differential_data\differ_50_sine.csv',
        r'differential_data\differ_50_cosine.csv',
        r'differential_data\differ_50_expoDec.csv',
        r'differential_data\differ_50_expoInc.csv',
        r'differential_data\differ_50_combined.csv',
        r'differential_data\differ_50_test.csv'  # Test set
    ]

# Read, process, and store each file
dataframes = []
for filename in filenames:
    df = pd.read_csv(filename)
    
    # Drop the first unnamed column if it exists
    if 'Unnamed: 0' in df.columns:
        df = df.drop(columns='Unnamed: 0')

    # Drop the first column and rename columns
    df = df.drop(columns=df.columns[0])
    df.columns = ['time', 'v_in', 'v_out']

    # Convert units to base values
    df = df.map(remove_units)

    # Append the processed DataFrame to the list
    dataframes.append(df)

# Concatenate all DataFrames into one
overall_df = pd.concat(dataframes, ignore_index=True)
df = overall_df

# Print the first 10 rows of dataset
df.head(10)

## Normalized Dataset

In [None]:
# Dataframe to numpy
X = df['v_in'].to_numpy()
y = df['v_out'].to_numpy()

from sklearn.preprocessing import MinMaxScaler

def normalize_and_convert(arr):
    scaler = MinMaxScaler()
    # Reshape the array for MinMaxScaler
    arr = arr.reshape(-1, 1)
    # Normalize the entire array
    normalized_arr = scaler.fit_transform(arr)
    # Convert the normalized array to a PyTorch tensor
    return torch.from_numpy(normalized_arr).float()

# Normalize
X_normalized = normalize_and_convert(X)
y_normalized = normalize_and_convert(y)

## Separate Trainset and Testset

In [None]:
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

delta_t = 2e-2
batches = int(len(X_normalized) / input_dim)
t = np.arange(0, 2e-2*input_dim, delta_t)

# Separate into batches 
X_reshaped = X_normalized.view(batches, input_dim)  
y_reshaped = y_normalized.view(batches, input_dim)  

# Split the dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X_reshaped, y_reshaped, test_size=50/300, shuffle=False)
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

# Plot the waveform of each dataset
for batch_idx, (inputs, targets) in enumerate(train_dataset):
    plt.title('Train Set')
    plt.xlabel('Time')
    plt.ylabel('Input Voltage')
    plt.plot(t, inputs, alpha=0.5)
plt.show()

for batch_idx, (inputs, targets) in enumerate(train_dataset):
    plt.title('Train Set')
    plt.xlabel('Time')
    plt.ylabel('Output Voltage')
    plt.plot(t, targets)
plt.show()

for batch_idx, (inputs, targets) in enumerate(test_dataset):
    plt.title('Test Set')
    plt.xlabel('Time')
    plt.ylabel('Input Voltage')
    plt.plot(t, inputs)
plt.show()

for batch_idx, (inputs, targets) in enumerate(test_dataset):
    plt.title('Test Set')
    plt.xlabel('Time')
    plt.ylabel('Output Voltage')
    plt.plot(t, targets)
plt.show()

# Print the length of train and test sets
print(f"Length of Train Set: {len(train_dataset)}")
print(f"Length of Test Set: {len(test_dataset)}")

# Create DataLoaders for training and testing
train_loader = DataLoader(train_dataset, shuffle=False)
test_loader = DataLoader(test_dataset, shuffle=False)


# Model Structure and Training  

## CNN Model Structure

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Nerual Network : CNN 
class VoltageNetConv(nn.Module):
    def __init__(self, input_channels, input_dim):
        super(VoltageNetConv, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv1d(input_channels, 32, kernel_size=3, padding=1)    # Keeps the length the same
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding=1)                # Keeps the length the same
        self.flatten = nn.Flatten(start_dim=0)

        # Fully connected layers
        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(64 * input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, input_dim)

    def forward(self, x):
        # Apply convolutional layers with ReLU activation
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))

        # Flatten the output from the conv layers to feed into the fully connected layers
        x = self.flatten(x).unsqueeze(0)

        # Apply fully connected layers
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

## Regression Model Structure 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
# Nerual Network : Regression 
class VoltageNetReg(nn.Module):
    def __init__(self, input_dim):
        super(VoltageNetReg, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)  # Input layer to hidden layer
        self.relu = nn.ReLU()           # Activation function
        self.fc2 = nn.Linear(128, 64)  # Hidden layer to output layer
        self.fc3 = nn.Linear(64, input_dim)  # Hidden layer to output layer


    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        return x

## Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


if model_structure == 'CNN':
    model = VoltageNetConv(1 , input_dim) # There is only 1 channel input
else:
    model = VoltageNetReg(input_dim)
print(model)

loss = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Move network to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

num_epoch = 200
train_loss_values = []
test_loss_values = []
avg_loss_values = []

if not model_existed:
    # Training loop
    for epoch in range(num_epoch):
        # Set the model to training mode
        model.train()
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            optimizer.zero_grad()                   # Clear gradients for the next train
            outputs = model(inputs)                 # Forward pass
            train_loss = loss(outputs, targets)     # Calculate loss
            train_loss.backward()                   # Backward pass
            optimizer.step()                        # Optimize the weights

        # Calculate and store the average training loss for this epoch
        train_loss_values.append(train_loss.item())

        # Evaluate the model on the test data
        model.eval()                                # Set the model to evaluation mode
        with torch.no_grad():
            test_loss_sum = 0.0
            num_batches = 0
            for inputs, targets in test_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                outputs = model(inputs)             # Forward pass
                test_loss = loss(outputs, targets)  # Calculate loss
                test_loss_sum += test_loss.item()
                num_batches += 1

            # Calculate and store the average test loss for this epoch
            avg_test_loss = test_loss_sum / num_batches
            test_loss_values.append(avg_test_loss)

        # Print loss every 10 epochs
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch + 1}/{num_epoch}], Training Loss: {train_loss.item():.4f}, Test Loss: {avg_test_loss:.4f}')
        


    # Save trained network's parameters
    torch.save(model.state_dict(), model_filename)

    # Plotting the training and test loss over epochs
    plt.figure(figsize=(10, 5))
    plt.plot(train_loss_values, label='Training Loss')
    plt.plot(test_loss_values, label='Test Loss')
    plt.title('Training and Test Loss per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.show()


# Prediction & Plots

## Determine Test Loss

In [None]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

if model_existed:
    model.load_state_dict(torch.load(model_filename))
else: # if model does not exist, save the state of trained model
    torch.save(model.state_dict(), model_name + '.pth')

model.to(device)
model.eval()

# Test loss
loss = nn.MSELoss()
test_loss, test_loss_sum = 0, 0

# Containers for data
test_inputs = []
actual_outputs = []
predicted_outputs = []

with torch.no_grad():
    test_loss = 0
    for inputs, targets in test_loader:
        inputs = inputs
        outputs = model(inputs)
        targets = targets

        # Calculate the test loss
        test_inputs.append(inputs)
        test_loss = loss(outputs, targets)
        test_loss_sum += test_loss.item()

        # Store data for plotting
        actual_outputs.append(targets)
        predicted_outputs.append(outputs)

print(f'Test Loss: {test_loss_sum/50}')

## Ploting Prediction Graphs

In [None]:
# Converting lists of batches into a single tensor

test_inputs = torch.cat(test_inputs, dim=0).cpu()
actual_outputs = torch.cat(actual_outputs, dim=0).cpu()
predicted_outputs = torch.cat(predicted_outputs, dim=0).cpu()

# Compute and plot correlation coefficient and its CDF
correlation_coefficients = []

# Plotting
for idx in range(test_inputs.shape[0]):
    # Calculate the Pearson correlation coefficient for each sample
    if actual_outputs[idx].numel() > 1:  # Ensure there are enough data points to compute correlation
        correlation = np.corrcoef(actual_outputs[idx].numpy(), predicted_outputs[idx].numpy())[0, 1]
        correlation_coefficients.append(correlation)

    # Create a figure with two subplots side by side
    fig, axs = plt.subplots(1, 2, figsize=(12, 5))

    # Plot 1: Actual vs Predicted Output Voltage
    axs[0].plot(t, test_inputs[idx], label='Input Waveform')
    axs[0].plot(t, actual_outputs[idx], label='Actual Output Waveform')
    axs[0].plot(t, predicted_outputs[idx], linestyle='--', label='Predicted Output Waveform')
    axs[0].set_title(f'Sample {idx + 1} - Comparison of Actual and Predicted Waveform')
    axs[0].set_xlabel('Time')
    axs[0].set_ylabel('Voltage')
    axs[0].legend()
    axs[0].grid(True)

    # Plot 2: Correlation between Actual and Predicted Voltages
    axs[1].scatter(actual_outputs[idx].numpy(), predicted_outputs[idx].numpy(), alpha=0.5)
    axs[1].set_title(f'Sample {idx + 1} - Correlation between Actual and Predicted Waveform')
    axs[1].set_xlabel('Actual Waveform')
    axs[1].set_ylabel('Predicted Waveform')
    axs[1].grid(True)

    # Adjust layout to prevent overlap
    plt.tight_layout()

    # Show the figure containing both subplots
    plt.show()

## Draw CDF Graph

In [None]:
# Function to plot CDF
def plot_cdf(data, label, color):
    sorted_data = np.sort(data)
    cdf = np.arange(len(sorted_data)) / float(len(sorted_data))
    plt.plot(sorted_data, cdf, label=label, color=color)
    
# Plot CDF of correlation coefficients
plt.figure(figsize=(6, 6))
plot_cdf(np.array(correlation_coefficients), 'Correlation Coefficient', 'blue')
plt.title('CDF of Correlation Coefficients')
plt.xlabel('Correlation Coefficient')
plt.ylabel('Cumulative Proportion')
plt.legend()
plt.grid(True)
plt.show()