In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Import Libraries

In [None]:
import os
import numpy as np
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from matplotlib import pyplot as plt
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, random_split
from skimage.io import imread, imshow, imsave
from skimage.transform import resize
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score

# Dataset

In [None]:
# A dictionary mapping from shape names to numerical labels.
shape2number = {'circle': 0, 'square': 1, 'triangle': 2, 'pentagon': 3}

# Dataset Path
dataset_path = '/content/drive/MyDrive/SF2/Imagefusion_Dataset-2'

# Define the folders for each sensor
img1_folder = os.path.join(dataset_path, 'img1')
img2_folder = os.path.join(dataset_path, 'img2')
img3_folder = os.path.join(dataset_path, 'img3')
fused_folder = os.path.join(dataset_path, 'fused_dataset')


# Create the fused folder if it doesn't exist
if not os.path.exists(fused_folder):
    os.makedirs(fused_folder)



# Get a list of all image files in each folder
img1_files = [f for f in os.listdir(img1_folder) if f.endswith('.png')]
img2_files = [f for f in os.listdir(img2_folder) if f.endswith('.png')]
img3_files = [f for f in os.listdir(img3_folder) if f.endswith('.png')]


# Low-Level Fusion Methods

In [None]:
def average_fusion(image1, image2, image3):
    if len(image1.shape) == 3:
        image1 = image1.mean(axis = 2)
    if len(image2.shape) == 3:
        image2 = image2.mean(axis = 2)
    if len(image3.shape) == 3:
        image3 = image3.mean(axis = 2)
    return (image1 + image2 + image3) / 3

def max_selection_fusion(image1, image2, image3):
    if len(image1.shape) == 3:
        image1 = image1.max(axis = 2)
    if len(image2.shape) == 3:
        image2 = image2.max(axis = 2)
    if len(image3.shape) == 3:
        image3 = image3.max(axis = 2)
    return np.maximum(np.maximum(image1, image2), image3)

def min_selection_fusion(image1, image2, image3):
    if len(image1.shape) == 3:
        image1 = image1.min(axis = 2)
    if len(image2.shape) == 3:
        image2 = image2.min(axis = 2)
    if len(image3.shape) == 3:
        image3 = image3.min(axis = 2)
    return np.minimum(np.minimum(image1, image2), image3)

fused_images= []

# Apply Fusion to all images
for i, (img1_file, img2_file, img3_file) in enumerate(zip(img1_files, img2_files, img3_files)):
    image1 = imread(os.path.join(img1_folder, img1_file)).astype(float)
    image2 = imread(os.path.join(img2_folder, img2_file)).astype(float)
    image3 = imread(os.path.join(img3_folder, img3_file)).astype(float)

    fused_average = average_fusion(image1, image2, image3)
    fused_max = max_selection_fusion(image1, image2, image3)
    fused_min = min_selection_fusion(image1, image2, image3)

    # Store the fused average images in the list
    fused_images.append(fused_average)

    # Save the fused average images to the specified directory
    fused_image_filename = os.path.join(fused_folder, f'{str(i).zfill(5)}.png')
    imsave(fused_image_filename, fused_average.astype(np.uint8))


    # Show Result
    plt.figure(figsize=(10, 12))
    plt.suptitle(f'Image No. {str(i+1).zfill(5)}', fontsize=16)

    plt.subplot(3, 2, 1)
    plt.title('Gradient')
    plt.axis('off')
    plt.imshow(image1.astype(np.uint8))

    plt.subplot(3, 2, 2)
    plt.title('Noise')
    plt.axis('off')
    plt.imshow(image2.astype(np.uint8), cmap=None)

    plt.subplot(3, 2, 3)
    plt.title('Spotlight')
    plt.axis('off')
    plt.imshow(image3.astype(np.uint8), cmap=None)

    plt.subplot(3, 2, 4)
    plt.title('Average Fusion')
    plt.axis('off')
    plt.imshow(fused_average.astype(np.uint8), cmap=None)

    plt.subplot(3, 2, 5)
    plt.title('Max Selection Fusion')
    plt.axis('off')
    plt.imshow(fused_max.astype(np.uint8), cmap=None)

    plt.subplot(3, 2, 6)
    plt.title('Min Selection Fusion')
    plt.axis('off')
    plt.imshow(fused_min.astype(np.uint8), cmap=None)

    plt.show()

# Define Classes for handelling individual sensor dataset and Image fusion dataset

In [None]:
class SingleSensorDataset(Dataset):
    """
    A PyTorch Dataset class for handling images from a single sensor along with their labels.

    Attributes:
        img_dir (str): Directory containing the images.
        label_dir (str): Directory containing the labels.
        transform (callable, optional): Optional transform to be applied on a sample.
        image_files (list): List of image filenames.
        labels (list): List of labels corresponding to the images.
    """
    def __init__(self, img_dir, label_dir, transform=None):
        """
        Initialize the SingleSensorDataset.

        Args:
            img_dir (str): Directory containing the images.
            label_dir (str): Directory containing the labels.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.transform = transform
        self.image_files = sorted(os.listdir(img_dir))
        self.labels = [self.load_label(os.path.join(label_dir, f"{os.path.splitext(img)[0]}.txt")) for img in self.image_files]

    def load_label(self, idx):
        """
        Load the label for a given image.

        Args:
            idx (int): Index of the image.

        Returns:
            int: Label associated with the image.
        """
        label_path = os.path.join(self.label_dir, f"{idx}")
        with open(label_path, 'r') as file:
            label = file.readline().strip()
        return shape2number[label]

    def __len__(self):
        """
        Get the length of the dataset.

        Returns:
            int: Number of samples in the dataset.
        """
        return len(self.image_files)

    def __getitem__(self, idx):
        """
        Get an item from the dataset.

        Args:
            idx (int): Index of the item.

        Returns:
            tuple: A tuple containing the image and its label.
        """
        img_path = os.path.join(self.img_dir, self.image_files[idx])
        label = self.labels[idx]

        img = Image.open(img_path)

        if self.transform:
            img = self.transform(img)

        return img, label

class ImageFusionDataset(Dataset):
    """
    A PyTorch Dataset class for handling fused images from multiple sensors along with their labels.

    Attributes:
        img1_dir (str): Directory containing the first set of images.
        img2_dir (str): Directory containing the second set of images.
        img3_dir (str): Directory containing the third set of images.
        label_dir (str): Directory containing the labels.
        transform (callable, optional): Optional transform to be applied on a sample.
        image_files (list): List of image filenames.
        labels (list): List of labels corresponding to the images.
    """
    def __init__(self, fused_dir, label_dir, transform=None):
        """
        Initializes the dataset by setting the image and label directories for three different sensors,
        applying transformations if any, and loading image filenames and corresponding labels.

        Args:
            img1_dir (str): Path to the directory containing the first set of images.
            img2_dir (str): Path to the directory containing the second set of images.
            img3_dir (str): Path to the directory containing the third set of images.
            label_dir (str): Path to the directory containing labels.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.img1_dir = img1_dir
        self.img2_dir = img2_dir
        self.img3_dir = img3_dir
        self.label_dir = label_dir
        self.transform = transform
        self.image_files = sorted(os.listdir(img1_dir))
        self.labels = [self.load_label(os.path.join(label_dir, f"{os.path.splitext(img)[0]}.txt")) for img in self.image_files]

    def load_label(self, idx):
        """
        Loads the label for a given image.

        Args:
            idx (int): Index of the image.

        Returns:
            int: Label associated with the image.
        """
        label_path = os.path.join(self.label_dir, f"{idx}")
        with open(label_path, 'r') as file:
            label = file.readline().strip()
        return shape2number[label]

    def __len__(self):
        """
        Returns the number of samples in the dataset.

        Returns:
            int: Number of samples in the dataset.
        """
        return len(self.image_files)

    def __getitem__(self, idx):
        """
        Returns a sample from the dataset.

        Args:
            idx (int): Index of the sample.

        Returns:
            tuple: A tuple containing the combined image and its label.
        """
        img1_path = os.path.join(self.img1_dir, self.image_files[idx])
        img2_path = os.path.join(self.img2_dir, self.image_files[idx])
        img3_path = os.path.join(self.img3_dir, self.image_files[idx])
        label = self.labels[idx]

        img1 = Image.open(img1_path)
        img2 = Image.open(img2_path)
        img3 = Image.open(img3_path)

        if self.transform:

            img1 = self.transform(img1)
            img2 = self.transform(img2)
            img3 = self.transform(img3)

        combined_img = torch.cat((img1, img2, img3), dim=0)
        return combined_img, label



# Define modified LeNet-5 model architecture

In [None]:
# Define the LeNet-5 model
class LeNet5(nn.Module):
    """
    A PyTorch implementation of the LeNet-5 architecture for image classification.

    Attributes:
        conv1 (nn.Conv2d): First convolutional layer with 6 output channels.
        conv2 (nn.Conv2d): Second convolutional layer with 16 output channels.
        fc1 (nn.Linear): First fully connected layer.
        fc2 (nn.Linear): Second fully connected layer.
        fc3 (nn.Linear): Third fully connected layer for output.
    """

    def __init__(self, num_classes=4, input_channels=1):
        """
        Initializes the LeNet-5 model.

        Args:
            num_classes (int): Number of output classes. Default is 4.
            input_channels (int): Number of input channels. Default is 1 (grayscale images).
        """
        super(LeNet5, self).__init__()
        self.conv1 = nn.Conv2d(input_channels, 6, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5, stride=1)
        self.fc1 = nn.Linear(16*6*6, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        """
        Defines the forward pass of the model.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: Output tensor after passing through the network.
        """
        x = F.relu(self.conv1(x))  # First convolution + ReLU activation
        x = F.max_pool2d(x, 2)     # Max pooling with 2x2 window
        x = F.relu(self.conv2(x))  # Second convolution + ReLU activation
        x = F.max_pool2d(x, 2)     # Max pooling with 2x2 window
        x = x.view(-1, 16*6*6)     # Flatten the tensor
        x = F.relu(self.fc1(x))    # First fully connected layer + ReLU activation
        x = F.relu(self.fc2(x))    # Second fully connected layer + ReLU activation
        x = self.fc3(x)            # Third fully connected layer (output layer)
        return x


# Training models 1,2,3 without fusion and model_fusion with low level Pixel based fusion

In [None]:
# Function to train the model
def train_model(model, train_loader, criterion, optimizer, epochs=10):
    """
    Trains the given model using the provided data loader, loss function, and optimizer.

    Args:
        model (nn.Module): The neural network model to train.
        train_loader (DataLoader): DataLoader for the training data.
        criterion (nn.Module): Loss function.
        optimizer (optim.Optimizer): Optimizer.
        epochs (int): Number of epochs to train. Default is 10.
    """
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for images, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(train_loader)}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """
    Evaluates the given model using the provided test data loader.

    Args:
        model (nn.Module): The neural network model to evaluate.
        test_loader (DataLoader): DataLoader for the test data.

    Returns:
        float: Accuracy of the model on the test data.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    return accuracy

def save_model_weights(model, path):
    torch.save(model.state_dict(), path)

# Define the transformations
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((32, 32)),
    transforms.ToTensor()
])

# Create the datasets
img1_dir = os.path.join(dataset_path, 'img1')
img2_dir = os.path.join(dataset_path, 'img2')
img3_dir = os.path.join(dataset_path, 'img3')
fused_dir = os.path.join(dataset_path, 'fused_dataset')
label_dir = os.path.join(dataset_path, 'label')

dataset1 = SingleSensorDataset(img1_dir, label_dir, transform)
dataset2 = SingleSensorDataset(img2_dir, label_dir, transform)
dataset3 = SingleSensorDataset(img3_dir, label_dir, transform)
fused_dataset = ImageFusionDataset(fused_dir, label_dir, transform)


# Split the datasets into training and testing sets
train_size = int(0.8 * len(dataset1))
test_size = len(dataset1) - train_size

train_dataset1, test_dataset1 = random_split(dataset1, [train_size, test_size])
train_dataset2, test_dataset2 = random_split(dataset2, [train_size, test_size])
train_dataset3, test_dataset3 = random_split(dataset3, [train_size, test_size])
train_dataset_fused, test_dataset_fused = random_split(fused_dataset, [train_size, test_size])

# Create data loaders
batch_size = 40

train_loader1 = DataLoader(train_dataset1, batch_size=batch_size, shuffle=True)
test_loader1 = DataLoader(test_dataset1, batch_size=batch_size, shuffle=False)

train_loader2 = DataLoader(train_dataset2, batch_size=batch_size, shuffle=True)
test_loader2 = DataLoader(test_dataset2, batch_size=batch_size, shuffle=False)

train_loader3 = DataLoader(train_dataset3, batch_size=batch_size, shuffle=True)
test_loader3 = DataLoader(test_dataset3, batch_size=batch_size, shuffle=False)

train_loader_fused = DataLoader(train_dataset_fused, batch_size=batch_size, shuffle=True)
test_loader_fused = DataLoader(test_dataset_fused, batch_size=batch_size, shuffle=False)

# Instantiate models for each sensor dataset and fusion dataset
model1 = LeNet5(num_classes=4)
model2 = LeNet5(num_classes=4)
model3 = LeNet5(num_classes=4)
model_fusion = LeNet5(num_classes=4, input_channels=3)

# Define loss function and optimizer with learning rate 0.001
criterion = nn.CrossEntropyLoss()
optimizer1 = optim.Adam(model1.parameters(), lr=0.001)
optimizer2 = optim.Adam(model2.parameters(), lr=0.001)
optimizer3 = optim.Adam(model3.parameters(), lr=0.001)
optimizer_fusion = optim.Adam(model_fusion.parameters(), lr=0.001)

# Train the models
print("Training model 1...")
train_model(model1, train_loader1, criterion, optimizer1, epochs=50)
print("Training model 2...")
train_model(model2, train_loader2, criterion, optimizer2, epochs=50)
print("Training model 3...")
train_model(model3, train_loader3, criterion, optimizer3, epochs=50)
print("Training fused model...")
train_model(model_fusion, train_loader_fused, criterion, optimizer_fusion, epochs=50)

# Save model weights
save_model_weights(model1, '/content/drive/MyDrive/SF2/Lenet_model1_weights.pth')
save_model_weights(model2, '/content/drive/MyDrive/SF2/Lenet_model2_weights.pth')
save_model_weights(model3, '/content/drive/MyDrive/SF2/Lenet_model3_weights.pth')
save_model_weights(model_fusion, '/content/drive/MyDrive/SF2/Lenet_fusion_model_weights.pth')


# High Level Fusion- Majority Voting

In [None]:
# Function to perform majority voting with tie breaker
def majority_voting(models, test_loader):
    """
    Performs majority voting on the predictions of multiple models with a tie-breaking mechanism.

    Args:
        models (list): List of trained models [model1, model2, model3, model_fusion].
        test_loader (DataLoader): DataLoader for the test data.

    Returns:
        float: Accuracy of the majority voting method on the test data.
        list: List of predictions made by the majority voting method.
    """
    model1, model2, model3, model_fusion = models
    model1.eval()
    model2.eval()
    model3.eval()
    model_fusion.eval()

    correct = 0
    total = 0
    predictions_all = []

    with torch.no_grad():
        for images, labels in test_loader:
            batch_size = images.size(0)
            predictions = torch.zeros(batch_size, dtype=torch.long)
            image1 = images[:, 0:1, :, :]
            image2 = images[:, 1:2, :, :]
            image3 = images[:, 2:3, :, :]

            output1 = model1(image1)
            output2 = model2(image2)
            output3 = model3(image3)
            output_fusion = model_fusion(images)

            _, predicted1 = torch.max(output1, 1)
            _, predicted2 = torch.max(output2, 1)
            _, predicted3 = torch.max(output3, 1)
            _, predicted_fusion = torch.max(output_fusion, 1)

            stacked_preds = torch.stack((predicted1, predicted2, predicted3), dim=1)

            for i in range(batch_size):
                vote_counts = torch.bincount(stacked_preds[i])
                max_votes = torch.max(vote_counts)
                candidates = torch.where(vote_counts == max_votes)[0]
                if len(candidates) > 1:
                    predictions[i] = predicted_fusion[i]  # Prioritize fused classifier in case of a tie
                else:
                    predictions[i] = candidates[0]

            predictions_all.extend(predictions.tolist())

            total += batch_size
            correct += (predictions == labels).sum().item()

    accuracy = 100 * correct / total
    return accuracy, predictions_all

# List of your already trained models
models = [model1, model2, model3, model_fusion]

#Evaluate Models and Calculate Accuracy

In [None]:
# Evaluate the models
accuracy1 = evaluate_model(model1, test_loader1)
accuracy2 = evaluate_model(model2, test_loader2)
accuracy3 = evaluate_model(model3, test_loader3)
accuracy_fusion = evaluate_model(model_fusion, test_loader_fused)

# Perform majority voting
accuracy_majority_voting_fusion, _ = majority_voting(models, test_loader_fused)


print('Accuracy of model 1:', accuracy1)
print('Accuracy of model 2:', accuracy2)
print('Accuracy of model 3:', accuracy3)
print('Accuracy with low-level fusion:', accuracy_fusion)
print('Accuracy with low and high level image fusion:', accuracy_majority_voting_fusion)


# Plotting Results with Confusion Matrices

In [None]:
def get_predictions(model, loader):
  model.eval()
  predictions = []
  true_labels = []
  with torch.no_grad():
      for data, labels in loader:
          outputs = model(data)
          _, preds = torch.max(outputs, 1)
          predictions.extend(preds.cpu().numpy())
          true_labels.extend(labels.cpu().numpy())
  return predictions, true_labels

# Plot confusion matrices for each model
def plot_confusion_matrix(true_labels, predictions, classes, title):
  """
    Plot a confusion matrix for the given true labels and predictions.

    Args:
        true_labels (list): True labels.
        predictions (list): Model predictions.
        classes (list): List of class names.
        title (str): Title for the plot.
    """
  cm = confusion_matrix(true_labels, predictions)
  disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
  disp.plot(cmap=plt.cm.Blues)
  plt.title(title)
  plt.xlabel('Predicted label')
  plt.ylabel('True label')
  plt.show()


class_labels = ['Circle', 'Square', 'Triangle', 'Pentagon']

# Get predictions and true labels for each model
predictions_model1, true_labels_model1 = get_predictions(model1, test_loader1)
predictions_model2, true_labels_model2 = get_predictions(model2, test_loader2)
predictions_model3, true_labels_model3 = get_predictions(model3, test_loader3)
predictions_fusion, true_labels_fusion = get_predictions(model_fusion, test_loader_fused)
accuracy_majority_voting_fusion, predictions_all = majority_voting(models, test_loader_fused) # Assign output of majority_voting to predictions_all

# Calculate accuracy for each model
accuracy_model1 = accuracy_score(true_labels_model1, predictions_model1)
accuracy_model2 = accuracy_score(true_labels_model2, predictions_model2)
accuracy_model3 = accuracy_score(true_labels_model3, predictions_model3)
accuracy_fusion = accuracy_score(true_labels_fusion, predictions_fusion)

# Plot Confusion Matrix for each model
plot_confusion_matrix(true_labels_model1, predictions_model1, class_labels, "Model 1")
plot_confusion_matrix(true_labels_model2, predictions_model2, class_labels, "Model 2")
plot_confusion_matrix(true_labels_model3, predictions_model3, class_labels, "Model 3")
plot_confusion_matrix(true_labels_fusion, predictions_all, class_labels, "Model Fusion")

