In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

In [2]:
# Ignore warnings
# NOTE: 'ignore' is installed without a category or module restriction, so it
# matches every warning (deprecation notices included) raised after this cell
# runs, until the warning filters are changed again.
import warnings
warnings.filterwarnings('ignore')

In [3]:
# Load data
X = np.load('X.npy')
y = np.load('y.npy')
X_new = np.load('X_new.npy')
y_new = np.load('y_new.npy')

# Reshape to (n_samples, 1, 10000). Using -1 lets NumPy infer the number of
# samples, so the cell keeps working if the files grow or shrink; for the
# current files this yields the same (120, 1, 10000) / (30, 1, 10000) shapes.
X = np.reshape(X, (-1, 1, 10000))
X_new = np.reshape(X_new, (-1, 1, 10000))

# Fail early if signals and labels are out of step.
assert len(X) == len(y), "X.npy and y.npy disagree on the number of samples"
assert len(X_new) == len(y_new), "X_new.npy and y_new.npy disagree on the number of samples"

In [4]:
X.shape, y.shape, X_new.shape, y_new.shape

((120, 1, 10000), (120,), (30, 1, 10000), (30,))

In [5]:
# Convert to tensors: signals become float32, labels become int64
X, X_new = (torch.from_numpy(signals).float() for signals in (X, X_new))
y, y_new = (torch.from_numpy(labels).long() for labels in (y, y_new))

In [6]:
# Split into training and testing datasets
from sklearn.model_selection import train_test_split

# test_size=0.2 holds out 20% of the samples for testing; the fixed
# random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

tuple(len(part) for part in (X_train, X_test, y_train, y_test))

(96, 24, 96, 24)

In [7]:
# Set up device-agnostic code
# NOTE(review): both branches of this conditional are "cpu", so `device` is
# always "cpu" whether or not CUDA is available. If GPU execution is intended,
# the first branch would have to be "cuda" -- but the cells that feed CPU
# tensors straight into the model, or call `.numpy()` on its results, would
# then need explicit device handling as well.
device = "cpu" if torch.cuda.is_available() else "cpu"

In [8]:
# Build a 1-D CNN model
class ConvModelV0(nn.Module):
    """1-D convolutional classifier.

    Three Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d stages feed a dense head
    (Flatten -> Dropout -> Linear -> ReLU -> Linear).

    Args:
        in_shape: Number of channels of the input signal.
        hidden_shape: Number of channels produced by every convolution.
        out_shape: Number of classes (size of the output vector).
        input_length: Length of the input along its last axis. It is only used
            to size the first ``Linear`` layer; the default of 10000 together
            with ``hidden_shape=10`` gives the previously hard-coded 760
            features, so existing checkpoints keep loading.

    Raises:
        ValueError: If ``input_length`` is too short to leave at least one
            time step after the three stages.

    Note:
        ``forward`` returns raw logits. ``nn.CrossEntropyLoss`` applies
        log-softmax itself, so feeding it softmax outputs squashes the
        gradients. Use ``predict_proba`` when class probabilities are needed;
        ``argmax`` over logits and over probabilities selects the same class.
    """

    # (kernel_size, stride, padding) of the convolution in each stage.
    _CONV_SPECS = ((124, 16, 2), (6, 1, 1), (3, 1, 1))

    def __init__(self, in_shape, hidden_shape, out_shape, input_length=10000):
        super().__init__()
        first, second, third = self._CONV_SPECS
        # Blocks are created in the same order and with the same layer layout
        # as before, so parameter names and seeded initialisation are unchanged.
        self.conv_block_1 = self._make_conv_block(in_shape, hidden_shape, *first)
        self.conv_block_2 = self._make_conv_block(hidden_shape, hidden_shape, *second)
        self.conv_block_3 = self._make_conv_block(hidden_shape, hidden_shape, *third)

        flat_features = hidden_shape * self._output_length(input_length)
        self.dense_block = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=flat_features,
                      out_features=64),
            nn.ReLU(),
            nn.Linear(in_features=64,
                      out_features=out_shape))

        # Kept for predict_proba; deliberately NOT applied inside forward.
        self.softmax = nn.Softmax(dim=1)

    @staticmethod
    def _make_conv_block(in_channels, out_channels, kernel_size, stride, padding):
        """Build one Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2, 2) stage."""
        return nn.Sequential(
            nn.Conv1d(in_channels=in_channels,
                      out_channels=out_channels,
                      kernel_size=kernel_size,
                      stride=stride,
                      padding=padding),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2))

    @classmethod
    def _output_length(cls, input_length):
        """Return the number of time steps left after the three conv stages."""
        length = input_length
        for kernel_size, stride, padding in cls._CONV_SPECS:
            # Conv1d output length (dilation 1), then halving by MaxPool1d(2, 2).
            length = (length + 2 * padding - kernel_size) // stride + 1
            length //= 2
            if length < 1:
                raise ValueError(
                    f"input_length={input_length} is too short for the convolutional stack")
        return length

    def forward(self, x):
        """Return class logits of shape (batch, out_shape) for a (batch, in_shape, length) input."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        return self.dense_block(x)

    def predict_proba(self, x):
        """Return softmax-normalised class probabilities for ``x``."""
        return self.softmax(self(x))

In [9]:
# Create a model
# Seed first so the randomly initialised weights are the same on every run.
torch.manual_seed(42)
model0 = ConvModelV0(in_shape=1, hidden_shape=10, out_shape=3)
model0 = model0.to(device)

In [10]:
model0(X_train[2].unsqueeze(0)).dtype

torch.float32

In [11]:
X_train[2].unsqueeze(0).shape

torch.Size([1, 1, 10000])

In [12]:
# Loss function: cross-entropy over the model's class scores
loss_fn = nn.CrossEntropyLoss()

# Optimizer: plain SGD over every parameter of model0
optimizer = torch.optim.SGD(params=model0.parameters(), lr=1e-4)

# Accuracy metric for the three-class problem, moved to the working device
from torchmetrics import Accuracy
accuracy_metric = Accuracy(num_classes=3, task='multiclass').to(device)

In [13]:
# Create Dataloaders
from torch.utils.data import TensorDataset, DataLoader

# Number of samples per mini-batch
batch_size = 8

# Pair each signal tensor with its label tensor
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

# Only the training loader shuffles; the test loader keeps dataset order
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size)

In [14]:
# Training Function
def train_model(model, dataloader, loss_fn, accuracy_metric, device, optimizer=None):
    """Run one optimisation pass over ``dataloader`` and report mean loss/accuracy.

    Args:
        model: Module being trained; called as ``model(X)``.
        dataloader: Sized iterable of ``(X, y)`` batches.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.
        accuracy_metric: Callable ``accuracy_metric(predicted_labels, targets)``.
        device: Device each batch is moved to before the forward pass.
        optimizer: Optimizer that updates ``model``. When omitted, the
            notebook-level variable named ``optimizer`` is used, which keeps
            existing five-argument calls working.

    Returns:
        ``(train_loss, train_acc)``: per-batch loss and accuracy averaged over
        the number of batches.

    Raises:
        ValueError: If no optimizer is passed and no global ``optimizer`` exists.
    """
    if optimizer is None:
        # Backward compatibility with callers that relied on the global.
        optimizer = globals().get("optimizer")
        if optimizer is None:
            raise ValueError(
                "train_model needs an optimizer: pass optimizer=... or define a global `optimizer`")

    train_acc, train_loss = 0, 0
    for X, y in dataloader:

        X, y = X.to(device), y.to(device)

        y_pred = model(X)

        loss = loss_fn(y_pred, y)
        # Accumulate a detached copy so the running total does not keep every
        # batch's autograd graph alive.
        train_loss += loss.detach()
        train_acc += accuracy_metric(y_pred.argmax(dim=1), y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss /= len(dataloader)
    train_acc /= len(dataloader)

    print(f"Training Loss: {train_loss:6.2f} | Training Accuracy: {train_acc*100:6.2f}%,", end=' ')
    return train_loss, train_acc

# Testing function
def test_model(model, dataloader, loss_fn, accuracy_metric, device):
    test_loss, acc = 0, 0
    with torch.inference_mode():
        for X_test, y_test in dataloader:
            X_test, y_test = X_test.to(device), y_test.to(device)
            y_pred_test = model(X_test)

            test_loss += loss_fn(y_pred_test, y_test)
            acc += accuracy_metric(y_pred_test.argmax(dim=1), y_test)

        test_loss /= len(dataloader)
        acc /= len(dataloader)
        
    print(f"Testing Loss: {test_loss:6.2f} | Test Accuracy: {acc*100:6.2f}%")
    return test_loss, acc

In [15]:
# Model performance evaluator
torch.manual_seed(42)

def eval_model(model, dataloader, loss_fn, acc_fn, device):
    """Return the mean loss and accuracy of ``model`` over ``dataloader``.

    The model is switched to eval mode and the batches are processed under
    ``torch.inference_mode()``. A ``tqdm`` progress bar wraps the loop when a
    name ``tqdm`` is available in the notebook; otherwise the batches are
    iterated directly.

    Args:
        model: Module being evaluated; called as ``model(X)``.
        dataloader: Sized iterable of ``(X, y)`` batches.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.
        acc_fn: Callable ``acc_fn(predicted_labels, targets)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        ``(loss, acc)`` averaged over the number of batches.
    """
    try:
        progress = tqdm
    except NameError:
        # tqdm is not imported in this notebook; fall back to plain iteration
        # instead of failing on the first call.
        def progress(iterable):
            return iterable

    loss, acc = 0, 0
    model.eval()
    with torch.inference_mode():
        for X, y in progress(dataloader):
            X, y = X.to(device), y.to(device)
            y_preds = model(X)

            loss += loss_fn(y_preds, y)
            acc += acc_fn(y_preds.argmax(dim=1), y)

        loss /= len(dataloader)
        acc /= len(dataloader)

    return loss, acc

In [16]:
# Timing our experiments
from timeit import default_timer as timer

def print_training_time(start, end, device=None):
    """Print how long training took and return that duration.

    Args:
        start: Timestamp taken before training.
        end: Timestamp taken after training.
        device: Label of the device, interpolated into the printed message.

    Returns:
        The difference ``end - start``.
    """
    total_time = end - start
    report = f"\n The training time on {device} is {total_time:.2f} seconds"
    print(report)
    return total_time

In [17]:
# Training a CNN model

# Per-epoch metric histories, stored as NumPy values for later plotting
train_loss_history = []
train_accuracy_history = []
test_loss_history = []
test_accuracy_history = []

start_time = timer()
epochs = 1

for epoch in range(epochs):
    print(f"\n Epoch: {epoch}\n----------")
    model0.train()
    train_loss, train_acc = train_model(model=model0,
                                       dataloader=train_dataloader,
                                       loss_fn=loss_fn,
                                       accuracy_metric=accuracy_metric,
                                       device=device)

    model0.eval()
    test_loss, test_acc = test_model(model=model0,
                           dataloader=test_dataloader,
                           loss_fn=loss_fn,
                           accuracy_metric=accuracy_metric,
                           device=device)

    # .cpu() is a no-op for CPU tensors, but .numpy() raises for tensors that
    # live on another device, so move them explicitly before converting.
    train_loss_history.append(train_loss.detach().cpu().numpy())
    train_accuracy_history.append(train_acc.detach().cpu().numpy())
    test_loss_history.append(test_loss.detach().cpu().numpy())
    test_accuracy_history.append(test_acc.detach().cpu().numpy())

end_time = timer()
# Print experiment duration
cnn_training_time = print_training_time(start=start_time,
                                       end=end_time,
                                       device=device)


 Epoch: 0
----------
Training Loss:   1.09 | Training Accuracy:  29.17%, Testing Loss:   1.09 | Test Accuracy:  50.00%

 The training time on cpu is 0.12 seconds


In [18]:
# Plot metrics
def plot_metrics(train_loss_history,
                 train_accuracy_history,
                 test_loss_history,
                 test_accuracy_history):
    """Plot loss and accuracy curves side by side, save the figure and show it.

    The left panel holds the training/testing loss, the right panel the
    training/testing accuracy. The figure is written to "output_figure.png"
    before being displayed.

    Args:
        train_loss_history: Sequence of training-loss values, one per epoch.
        train_accuracy_history: Sequence of training-accuracy values.
        test_loss_history: Sequence of testing-loss values.
        test_accuracy_history: Sequence of testing-accuracy values.
    """
    # Note: plt.rc changes Matplotlib's global font settings, not just this figure.
    plt.rc('font', family='Times New Roman', size=16)
    plt.figure(figsize=(20, 6))

    # One entry per panel: (train curve, train label, test curve, test label,
    # y-axis label, panel title).
    panels = (
        (train_loss_history, 'Training Loss',
         test_loss_history, 'Testing Loss',
         'Loss', 'Training Loss and Testing Loss'),
        (train_accuracy_history, 'Training Accuracy',
         test_accuracy_history, 'Testing Accuracy',
         'Accuracy', 'Training Accuracy and Testing Accuracy'),
    )

    for column, panel in enumerate(panels, start=1):
        train_curve, train_label, test_curve, test_label, y_label, heading = panel
        plt.subplot(1, 2, column)
        plt.plot(train_curve, label=train_label, linewidth=1, color='red')
        plt.plot(test_curve, label=test_label, linewidth=1, color='blue')
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.title(heading)
        plt.legend()

    plt.savefig("output_figure.png", dpi=100)
    plt.show()
    

In [19]:
# from scipy.signal import savgol_filter, gaussian

# window_size = 9  # Must be an odd number
# poly_order = 3

# train_loss_smooth = savgol_filter(train_loss_history, window_size, poly_order)
# train_accuracy_smooth = savgol_filter(train_accuracy_history, window_size, poly_order)

In [20]:
# Per-epoch sum of the recorded training loss and testing loss
signal = [train_value + test_value
          for train_value, test_value in zip(train_loss_history, test_loss_history)]

In [21]:
# Pack the per-epoch sums into a NumPy array
signal_np = np.asarray(signal)

In [22]:
# Distance of every entry from the largest entry (the maximum itself maps to 0).
# Re-running this cell transforms the already-transformed array again.
largest = signal_np.max()
signal_np = np.abs(signal_np - largest)

In [23]:
signal_np.max()

0.0

In [24]:
# plot_metrics(train_loss_history,
#              train_accuracy_smooth,
#              test_loss_history,
#              test_accuracy_history)

In [25]:
# # Save model
# from pathlib import Path

# save_path = Path("Models")
# save_path.mkdir(parents=True, exist_ok=True)
# model = model0

# model_filename = "model0.pth"  # Adjust the filename as needed
# model_path = save_path / model_filename
# torch.save(model.state_dict(), model_path)

# print(f"Model saved to {model_path}")

In [26]:
# # Load model
# model0_loaded = ConvModelV0(in_shape=1, hidden_shape = 10, out_shape = 3)  # must match the saved model0 (hidden_shape = 10)
# model0_loaded.load_state_dict(torch.load("Models/model0.pth"))

In [27]:
from torchinfo import summary

# A fresh instance is built here purely to inspect per-layer output shapes and
# parameter counts for a batch of 8 single-channel signals of length 10000.
batch_size = 8
model = ConvModelV0(in_shape=1, hidden_shape=10, out_shape=3)
summary(model, input_size=(batch_size, 1, 10000))

Layer (type:depth-idx)                   Output Shape              Param #
ConvModelV0                              [8, 3]                    --
├─Sequential: 1-1                        [8, 10, 309]              --
│    └─Conv1d: 2-1                       [8, 10, 618]              1,250
│    └─BatchNorm1d: 2-2                  [8, 10, 618]              20
│    └─ReLU: 2-3                         [8, 10, 618]              --
│    └─MaxPool1d: 2-4                    [8, 10, 309]              --
├─Sequential: 1-2                        [8, 10, 153]              --
│    └─Conv1d: 2-5                       [8, 10, 306]              610
│    └─BatchNorm1d: 2-6                  [8, 10, 306]              20
│    └─ReLU: 2-7                         [8, 10, 306]              --
│    └─MaxPool1d: 2-8                    [8, 10, 153]              --
├─Sequential: 1-3                        [8, 10, 76]               --
│    └─Conv1d: 2-9                       [8, 10, 153]              310
│    └─Bat