In [73]:
from scipy.io import loadmat
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
from tqdm import tqdm


import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import datasets, transforms
import torch.nn.functional as F
from torchmetrics.classification import Accuracy, Precision, Recall, F1Score, ConfusionMatrix


In [74]:
device = torch.device("cuda")

In [75]:
path = "data\weizmann_dataset\classification_masks.mat"
data = loadmat(path)

In [76]:
def remove_trailing_digits(s:str):
    if s in ["wave1" or "wave" or "wave2"]:
        return s
    while s and s[-1].isdigit():
        s = s[:-1]
    return s

labels = [name.split('_')[-1] for name, _ in data['aligned_masks'].dtype.descr]
labels = [remove_trailing_digits(s) for s in labels]

class_names = np.unique(labels)

In [77]:
train_data = [data['aligned_masks'][0][0][idx] for idx in range(93)]

In [78]:
dataset, class_labels = [], []

for idx, video in enumerate(train_data):
    video_frames = np.array([cv2.resize(frame, (30, 70)) for frame in video])

    # Parameters
    group_size = 4
    stride = 3

    # Calculate the number of groups
    num_groups = (video_frames.shape[0] - 1) // stride + 1

    # Initialize an array to store the groups
    groups = np.empty((num_groups, group_size, 70, 30), dtype=video_frames.dtype)

    # Create the groups with the specified stride
    for i in range(num_groups):
        start_index = i * stride
        end_index = start_index + group_size

        if end_index <= video_frames.shape[0]:
            # If enough frames are available, take the group directly
            groups[i] = video_frames[start_index:end_index]
        else:
            # If not enough frames are left, pad with zeros
            frames_left = video_frames[start_index:]
            padding = np.zeros((group_size - frames_left.shape[0], 70, 30), dtype=video_frames.dtype)
            groups[i] = np.vstack((frames_left, padding))

    dataset.append(groups)
    class_labels.extend([np.where(class_names == labels[idx])[0][0]]*num_groups)

dataset = np.concatenate(dataset, axis=0)
class_labels = np.array(class_labels)

mask = np.mean(dataset, axis=(1, 2, 3)) > 0.12
dataset = dataset[mask]
class_labels = class_labels[mask]

dataset = (
    dataset[:, 3, :, :]*1000+
    dataset[:, 2, :, :]*100+
    dataset[:, 1, :, :]*10+
    dataset[:, 0, :, :]

)



In [79]:

# Set the split ratios
train_ratio = 0.5
val_ratio = 0.2
test_ratio = 0.3

# Ensure the split ratios add up to 1
assert train_ratio + val_ratio + test_ratio == 1

# Shuffle the dataset and labels with the same permutation
num_samples = dataset.shape[0]
indices = np.random.permutation(num_samples)
shuffled_dataset = dataset[indices]
shuffled_labels = class_labels[indices]

# Calculate split indices
train_end = int(train_ratio * num_samples)
val_end = train_end + int(val_ratio * num_samples)

# Split the data and labels
train_data, val_data, test_data = (
    shuffled_dataset[:train_end],
    shuffled_dataset[train_end:val_end],
    shuffled_dataset[val_end:],
)
train_labels, val_labels, test_labels = (
    shuffled_labels[:train_end],
    shuffled_labels[train_end:val_end],
    shuffled_labels[val_end:],
)

# Print the shapes of the splits
print("Train data shape:", train_data.shape)
print("Validation data shape:", val_data.shape)
print("Test data shape:", test_data.shape)
print("Train labels shape:", train_labels.shape)
print("Validation labels shape:", val_labels.shape)
print("Test labels shape:", test_labels.shape)

Train data shape: (1100, 70, 30)
Validation data shape: (440, 70, 30)
Test data shape: (661, 70, 30)
Train labels shape: (1100,)
Validation labels shape: (440,)
Test labels shape: (661,)


In [80]:
# Dictionary of all the rows of each mask-set in every file 
json_data = {
    "365nm":{
        "I1":range(0, 5),
        "I2":range(10, 15),
        "I3":range(18, 23),
        "I4":range(25, 30)
    },
    "455nm":{
        "I1":range(0, 5),
        "I2":range(7, 12),
        "I3":range(14, 19),
        "I4":range(21, 26)
    },
    "White":{
        "I1":range(0, 5),
        "I2":range(9, 14),
        "I3":range(16, 21),
        "I4":range(24, 29)
    }
}

In [81]:
combined_tables = []
for filename in ["White", "365nm", "455nm"]:
    path = "data/"+filename+".xlsx" 
    df = pd.read_excel(path, usecols='B:Q') # Read the excel sheet
    tables = [df.iloc[json_data[filename][key]].copy().reset_index(drop=True) for key in list(json_data[filename].keys())]
    combined_table = pd.concat(tables, axis=0)
    combined_tables.append(combined_table)
    del(df, tables, combined_table)

combined_tables[1] = combined_tables[1].reindex(columns=combined_tables[0].columns)

In [82]:
NUMBER_OF_MASKS = 8
optical_range = np.array([0, 1, 2])  # Optical masks: 0, 1, 2
electrical_range = np.array([0, 1, 2, 3])  # Electrical masks: 0, 1, 2, 3

# Generate a meshgrid of all combinations, so that we don't sample same pair twice
optical_masks, electrical_masks = np.meshgrid(optical_range, electrical_range, indexing='ij')
all_pairs = np.column_stack((optical_masks.ravel(), electrical_masks.ravel()))

unique_indices = np.random.choice(all_pairs.shape[0], size=NUMBER_OF_MASKS, replace=False)

selected_pairs = all_pairs[unique_indices]
optical_masks = selected_pairs[:, 0]
electrical_masks = selected_pairs[:, 1]


# Use this to override previous values,  if you want a specific set of masks
# optical_masks = np.array([0, 1, 2])
# electrical_masks = np.array([0, 1, 2])

# Device masks

number_of_devices = np.multiply(*dataset.shape[-2:])

# device_mask = np.random.randint(0, 5, (NUMBER_OF_MASKS, number_of_devices))

device_mask = np.ones((NUMBER_OF_MASKS, number_of_devices))



In [83]:
print(optical_masks, electrical_masks, device_mask.shape)

[2 2 0 1 1 1 1 0] [2 0 2 1 2 3 0 1] (8, 2100)


In [84]:
class CustomDataset(Dataset):
    def __init__(self, dataset:np.array, 
                 labels:np.array, 
                 combined_tables:np.array, 
                 optical_masks:np.array, 
                 electrical_masks:np.array, 
                 device_mask:np.array):
        self.processed_data = []
        self.labels = []

        for idx in tqdm(range(len(dataset))):
            image, label = dataset[idx], labels[idx]
            # image = (
            #     images[0]*1000+
            #     images[1]*100+
            #     images[2]*10+
            #     images[3]
            # )
            image = image.flatten()
            
            column_indices = combined_tables[0].columns.get_indexer(image.tolist())
            x = []
            for j, (optical_mask, electrical_mask) in enumerate(zip(optical_masks, electrical_masks)):
                required_table = combined_tables[optical_mask].iloc[electrical_mask*5:(electrical_mask+1)*5].iloc[device_mask[j]]
                
                x.append(required_table.values[np.arange(device_mask.shape[-1]), column_indices]*1e9)
            
            x = np.concatenate(x, axis=0)

            # print(y)
            
            self.processed_data.append(x)
            self.labels.append(label)

        
        
        self.processed_data = torch.tensor(self.processed_data).to(device=device)
        self.labels = torch.tensor(self.labels).to(device=device)

    
    def __len__(self):
        return self.processed_data.shape[0]

    def __getitem__(self, idx):
        return self.processed_data[idx], self.labels[idx]
    
# train_dataset = CustomDataset(dataset, class_labels, combined_tables, optical_masks, electrical_masks, device_mask)

    
    

In [85]:

BATCH_SIZE = 256
class ReadoutLayer(nn.Module):
    def __init__(self, input_size:int):
        super(ReadoutLayer, self).__init__()
        self.fc = nn.Linear(input_size, 10)
        # self.fc2 = nn.Linear(50, 10)
        self.dropout = nn.Dropout(0.2)
        self.activation = nn.functional.leaky_relu
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        # x = self.activation(self.fc(x))
        # x = self.softmax(x)
        # x = x.reshape(BATCH_SIZE, 1, NUMBER_OF_MASKS*144, 180)
        # x = self.dropout(x)
        x = self.fc(x)
        return x

In [86]:
train_dataset = CustomDataset(train_data, train_labels, combined_tables, optical_masks, electrical_masks, device_mask)
validation_dataset = CustomDataset(val_data, val_labels, combined_tables, optical_masks, electrical_masks, device_mask)
test_dataset = CustomDataset(test_data, test_labels, combined_tables, optical_masks, electrical_masks, device_mask)


train_loader = DataLoader(train_dataset, BATCH_SIZE, shuffle=True)
val_loader = DataLoader(validation_dataset, BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, BATCH_SIZE, shuffle=False)



100%|██████████| 1100/1100 [00:02<00:00, 482.12it/s]
100%|██████████| 440/440 [00:00<00:00, 471.67it/s]
100%|██████████| 661/661 [00:01<00:00, 513.63it/s]


In [87]:

EPOCHS = 1000
learning_rate = 0.001

model = ReadoutLayer(number_of_devices*NUMBER_OF_MASKS).to(device=device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


val_accuracy, val_precision, val_recall, val_fscore = [], [], [], []

accuracy = Accuracy(task="multiclass", num_classes=10).to(device)
precision = Precision(task="multiclass", num_classes=10, average='macro').to(device)
recall = Recall(task="multiclass", num_classes=10, average='macro').to(device)
f1_score = F1Score(task="multiclass", num_classes=10, average='macro').to(device)

confusion_matrix = ConfusionMatrix(task="multiclass", num_classes=10).to(device)

In [88]:
for epoch in range(EPOCHS):
    model.train()
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)  # Move to device
        outputs = model(images.float())  # Forward pass
        loss = criterion(outputs, labels)  # Loss calculation
        optimizer.zero_grad()
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights
    
    
    # Validation phase
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images, labels
            outputs = model(images.float())
            preds = outputs.argmax(dim=1)

            # Update metrics
            accuracy.update(preds, labels)
            precision.update(preds, labels)
            recall.update(preds, labels)
            f1_score.update(preds, labels)

        # Print validation metrics

        if (epoch+1)%10 == 0:
            print(f'Epoch [{epoch+1}/{EPOCHS}], Loss: {loss.item():.4f}', end=" ")
            print(f'Validation - Accuracy: {accuracy.compute().item():.4f} Precision: {precision.compute().item():.4f} ', end=" ")
            print(f'Recall: {recall.compute().item():.4f} F1 Score: {f1_score.compute().item():.4f}')

        # Updating the list to save current metrics
        val_accuracy.append(accuracy.compute().item())
        val_precision.append(precision.compute().item())
        val_recall.append(recall.compute().item())
        val_fscore.append(f1_score.compute().item())

        # Reset metrics for the next epoch
        accuracy.reset()
        precision.reset()
        recall.reset()
        f1_score.reset()
        confusion_matrix.reset()

Epoch [10/1000], Loss: 1.1524 Validation - Accuracy: 0.4705 Precision: 0.5282  Recall: 0.4804 F1 Score: 0.4766
Epoch [20/1000], Loss: 0.5827 Validation - Accuracy: 0.5364 Precision: 0.5859  Recall: 0.5528 F1 Score: 0.5410
Epoch [30/1000], Loss: 0.6317 Validation - Accuracy: 0.5591 Precision: 0.6097  Recall: 0.5757 F1 Score: 0.5607
Epoch [40/1000], Loss: 0.4200 Validation - Accuracy: 0.5636 Precision: 0.5805  Recall: 0.5698 F1 Score: 0.5670
Epoch [50/1000], Loss: 0.2605 Validation - Accuracy: 0.5591 Precision: 0.5855  Recall: 0.5679 F1 Score: 0.5597
Epoch [60/1000], Loss: 0.2697 Validation - Accuracy: 0.5591 Precision: 0.5901  Recall: 0.5591 F1 Score: 0.5635
Epoch [70/1000], Loss: 0.4258 Validation - Accuracy: 0.5545 Precision: 0.5874  Recall: 0.5575 F1 Score: 0.5529
Epoch [80/1000], Loss: 0.3173 Validation - Accuracy: 0.5432 Precision: 0.5744  Recall: 0.5500 F1 Score: 0.5447
Epoch [90/1000], Loss: 0.2897 Validation - Accuracy: 0.5523 Precision: 0.5923  Recall: 0.5677 F1 Score: 0.5586
E

In [89]:

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for images, labels in train_loader:
        # Move images and labels to GPU
        images, labels = images.to(device), labels.to(device)
        outputs = model(torch.tensor(images, dtype=torch.float32))
        _, predicted = torch.max(outputs, 1)

        # Append predictions and labels for metric calculations
        all_preds.append(predicted)
        all_labels.append(labels)


# Concatenate all predictions and labels
all_preds = torch.cat(all_preds).to(device)
all_labels = torch.cat(all_labels).to(device)

# Calculate metrics
test_accuracy = accuracy(all_preds, all_labels)
test_precision = precision(all_preds, all_labels)
test_recall = recall(all_preds, all_labels)
test_f1 = f1_score(all_preds, all_labels)
test_confusion_matrix = confusion_matrix(all_preds, all_labels)

print(f'Test Accuracy: {test_accuracy * 100:.2f}%')
print(f'Test Precision: {test_precision*100:.4f}%')
print(f'Test Recall: {test_recall*100:.4f}%')
print(f'Test F1 Score: {test_f1:.4f}')
print("Confusion Matrix:")
print(test_confusion_matrix)

        

Test Accuracy: 99.45%
Test Precision: 99.3672%
Test Recall: 99.4520%
Test F1 Score: 0.9940
Confusion Matrix:
tensor([[100,   0,   0,   0,   0,   0,   0,   0,   0,   1],
        [  0, 104,   0,   0,   0,   0,   0,   0,   0,   0],
        [  0,   0, 122,   0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0, 105,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0, 121,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0, 109,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0, 132,   1,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0, 111,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0,   0, 103,   4],
        [  0,   0,   0,   0,   0,   0,   0,   0,   0,  87]], device='cuda:0')


  outputs = model(torch.tensor(images, dtype=torch.float32))
