In [80]:
import os
import random
from glob import glob

import cv2
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

In [81]:
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image

class FaceEncodingModule(nn.Module):
    """CNN that encodes the face region of one image into a 256-d vector.

    ``forward`` takes a single raw image as an ``H x W x 3`` array (for
    example the result of ``cv2.imread``), crops a face found by an OpenCV
    Haar cascade, resizes the crop to 224x224 and runs it through five
    conv / batch-norm / ReLU / max-pool stages followed by a linear layer.
    The result has shape ``(1, 256)``.
    """

    # Haar cascade shared by every instance. It is loaded lazily on first use
    # so the XML file is parsed once instead of on every forward pass, and it
    # lives on the class so it is not part of an instance's pickled state.
    _face_cascade = None

    def __init__(self):
        super(FaceEncodingModule, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu = nn.ReLU(inplace=True)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv5 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(256)
        self.pool5 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Five stride-2 pools turn the 224x224 input into a 7x7 map.
        self.fc = nn.Linear(256 * 7 * 7, 256)

    def forward(self, x):
        """Encode the face found in image ``x``.

        Args:
            x: one image as an ``H x W x 3`` array; uint8 pixels, or floats
                in ``[0, 1]`` which are rescaled to ``0..255``.

        Returns:
            Tensor of shape ``(1, 256)``.
        """
        # Face detection and cropping
        x = self.detect_faces_and_blacken_boxes(x)
        # Preprocessing (resize, to tensor, add batch dimension)
        x = self.preprocess_image(x)
        # The preprocessed tensor is created on the CPU; follow the weights.
        x = x.to(self.fc.weight.device)
        # Convolutional stages
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
            (self.conv5, self.bn5, self.pool5),
        )
        for conv, bn, pool in stages:
            x = pool(self.relu(bn(conv(x))))
        # Flatten the feature map and project it to 256 dimensions
        x = torch.flatten(x, 1)
        return self.fc(x)

    @classmethod
    def _get_face_cascade(cls):
        """Return the shared frontal-face Haar cascade, loading it on first use."""
        if cls._face_cascade is None:
            cls._face_cascade = cv2.CascadeClassifier(
                cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
            )
        return cls._face_cascade

    @staticmethod
    def _to_uint8_image(image):
        """Return ``image`` as a contiguous uint8 array.

        Float input whose maximum is at most 1.0 is treated as ``[0, 1]``
        scaled pixels and multiplied by 255 first; a plain ``np.uint8`` cast
        would truncate such an image to (almost) all zeros.
        """
        pixels = np.asarray(image)
        if pixels.dtype == np.uint8:
            return np.ascontiguousarray(pixels)
        if np.issubdtype(pixels.dtype, np.floating) and pixels.size and pixels.max() <= 1.0:
            pixels = pixels * 255.0
        return np.ascontiguousarray(np.clip(np.rint(pixels), 0, 255).astype(np.uint8))

    @staticmethod
    def _crop_largest_face(image, faces):
        """Return the crop of the largest ``(x, y, w, h)`` box in ``faces``.

        When ``faces`` is empty the whole image is returned, so the encoder
        always receives a valid image instead of an empty list.
        """
        if len(faces) == 0:
            return image
        x, y, w, h = max(faces, key=lambda box: box[2] * box[3])
        return image[y:y + h, x:x + w]

    def detect_faces_and_blacken_boxes(self, image):
        """Detect faces in ``image`` and return the crop of the largest one.

        Despite its name this method crops; it does not blacken anything.
        Falls back to the full image when no face is detected.
        """
        image = self._to_uint8_image(image)
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        faces = self._get_face_cascade().detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
        )
        return self._crop_largest_face(image, faces)

    def preprocess_image(self, cropped_image):
        """Resize ``cropped_image`` to 224x224 and return a ``(1, 3, 224, 224)`` float tensor."""
        # Convert image to PIL Image
        pil_image = Image.fromarray(cropped_image)
        # Apply transformations
        transform = transforms.Compose([
            transforms.Resize((224, 224)),  # the linear layer assumes a 224x224 input
            transforms.ToTensor(),
        ])
        image = transform(pil_image)
        # If the image has only one channel, expand it to three channels
        if image.shape[0] == 1:
            image = torch.cat([image] * 3)
        # Add batch dimension
        return image.unsqueeze(0)

# # Example usage:
# # Load image
# image = cv2.imread('/content/COCO_train2014_000000004180.jpg')
# # Create an instance of the FaceEncodingModule
# face_encoding_model = FaceEncodingModule()
# # Forward pass
# output = face_encoding_model(image)


In [82]:
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image

class ContextEncodingStream(nn.Module):
    """Four conv / batch-norm / ReLU stages that encode the context image.

    The channel widths grow 32 -> 64 -> 128 -> 256. Every stage except the
    last is followed by a 2x2 max-pool, so the spatial size shrinks by 8.
    """

    def __init__(self, input_channels):
        super().__init__()
        stage_widths = (32, 64, 128, 256)
        last_stage = len(stage_widths) - 1

        layers = []
        in_width = input_channels
        for stage_index, out_width in enumerate(stage_widths):
            layers.append(nn.Conv2d(in_width, out_width, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm2d(out_width))
            layers.append(nn.ReLU(inplace=True))
            # The final stage keeps its resolution: no pooling after it.
            if stage_index != last_stage:
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            in_width = out_width

        self.conv_layers = nn.Sequential(*layers)

    def forward(self, x):
        """Return the ``(N, 256, H/8, W/8)`` feature map for input ``x``."""
        return self.conv_layers(x)

class AttentionInferenceModule(nn.Module):
    """Reduce a feature map to a single-channel, non-negative attention map.

    Two 3x3 conv / batch-norm / ReLU units: ``input_channels -> 128 -> 1``.
    The spatial size is unchanged.
    """

    def __init__(self, input_channels):
        super().__init__()
        self.conv_layers = nn.Sequential(
            *self._conv_unit(input_channels, 128),
            *self._conv_unit(128, 1),
        )

    @staticmethod
    def _conv_unit(in_width, out_width):
        """Return the layers of one 3x3 conv / batch-norm / ReLU unit."""
        return [
            nn.Conv2d(in_width, out_width, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_width),
            nn.ReLU(inplace=True),
        ]

    def forward(self, x):
        """Return the ``(N, 1, H, W)`` attention map for feature map ``x``."""
        return self.conv_layers(x)

class ContextAttentionModule(nn.Module):
    """Encode the scene around the faces into one attention-pooled vector.

    ``forward`` takes a single raw image as an ``H x W x 3`` array, paints
    every detected face black so only the context remains, encodes the result
    with ``ContextEncodingStream`` and pools the feature map with a spatial
    attention map from ``AttentionInferenceModule``. The result has shape
    ``(1, 256)``.
    """

    # Haar cascade shared by every instance. It is loaded lazily on first use
    # so the XML file is parsed once instead of on every forward pass, and it
    # lives on the class so it is not part of an instance's pickled state.
    _face_cascade = None

    def __init__(self, input_channels):
        super(ContextAttentionModule, self).__init__()
        self.context_stream = ContextEncodingStream(input_channels)
        # The context stream ends with 256 channels.
        self.attention_inference = AttentionInferenceModule(256)

    def forward(self, x):
        """Return the attention-pooled context features for image ``x``.

        Args:
            x: one image as an ``H x W x 3`` array; uint8 pixels, or floats
                in ``[0, 1]`` which are rescaled to ``0..255``.

        Returns:
            Tensor of shape ``(1, 256)``.
        """
        # Face detection and blackening boxes
        x = self.detect_faces_and_blacken_boxes(x)
        # Preprocessing (resize, to tensor, add batch dimension)
        x = self.preprocess_image(x)
        # The preprocessed tensor is created on the CPU; follow the weights.
        x = x.to(next(self.parameters()).device)
        context_features = self.context_stream(x)
        attention_logits = self.attention_inference(context_features)
        return self._attend(context_features, attention_logits)

    @staticmethod
    def _attend(context_features, attention_logits):
        """Pool ``context_features`` with a softmax over all spatial positions.

        The previous code squeezed the channel axis and applied softmax over
        ``dim=1``, which normalized each image column across rows only. Here
        the weights of one sample sum to 1 over the whole ``H x W`` map, and
        the pooled vector is the attention-weighted average of the features.

        Args:
            context_features: tensor of shape ``(N, C, H, W)``.
            attention_logits: tensor of shape ``(N, 1, H, W)``.

        Returns:
            Tensor of shape ``(N, C)``.
        """
        batch, _, height, width = attention_logits.shape
        weights = F.softmax(attention_logits.flatten(start_dim=1), dim=1)
        weights = weights.view(batch, 1, height, width)
        return (weights * context_features).sum(dim=(2, 3))

    @classmethod
    def _get_face_cascade(cls):
        """Return the shared frontal-face Haar cascade, loading it on first use."""
        if cls._face_cascade is None:
            cls._face_cascade = cv2.CascadeClassifier(
                cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
            )
        return cls._face_cascade

    @staticmethod
    def _to_uint8_image(image):
        """Return ``image`` as a contiguous uint8 array.

        Float input whose maximum is at most 1.0 is treated as ``[0, 1]``
        scaled pixels and multiplied by 255 first; a plain ``np.uint8`` cast
        would truncate such an image to (almost) all zeros.
        """
        pixels = np.asarray(image)
        if pixels.dtype == np.uint8:
            return np.ascontiguousarray(pixels)
        if np.issubdtype(pixels.dtype, np.floating) and pixels.size and pixels.max() <= 1.0:
            pixels = pixels * 255.0
        return np.ascontiguousarray(np.clip(np.rint(pixels), 0, 255).astype(np.uint8))

    @staticmethod
    def _blacken_boxes(image, boxes):
        """Return a copy of ``image`` with every ``(x, y, w, h)`` box set to black."""
        masked = image.copy()
        for (x, y, w, h) in boxes:
            masked[y:y + h, x:x + w] = 0
        return masked

    def detect_faces_and_blacken_boxes(self, image):
        """Detect faces in ``image`` and return a copy with them painted black."""
        image = self._to_uint8_image(image)
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        faces = self._get_face_cascade().detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
        )
        return self._blacken_boxes(image, faces)

    def preprocess_image(self, new_image):
        """Resize ``new_image`` to 224x224 and return a ``(1, 3, 224, 224)`` float tensor."""
        # Convert image to PIL Image
        pil_image = Image.fromarray(new_image)
        # Apply transformations
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
        ])
        image = transform(pil_image)
        # If the image has only one channel, expand it to three channels
        if image.shape[0] == 1:
            image = torch.cat([image] * 3)
        # Add batch dimension
        return image.unsqueeze(0)

# Example usage:
# # Load image
# image = cv2.imread('/content/COCO_train2014_000000004180.jpg')
# # Create an instance of the ContextAttentionModule
# context_attention_model = ContextAttentionModule(input_channels=3)
# # Forward pass
# output = context_attention_model(image)

# output.shape

In [83]:
class AdaptiveFusionNetwork2D(nn.Module):
    """Fuse face and context features with learned weights, then classify.

    The face encoder and the context attention module each produce a 256-d
    vector for the image. A small 1x1-conv head looks at the concatenated
    vectors and predicts two weights (face, context) that sum to 1; the
    re-weighted vectors are concatenated again and classified.

    Args:
        input_channels: channel count of the concatenated face + context
            features (512 with the encoders used here).
        num_classes: number of output classes.
    """

    def __init__(self, input_channels, num_classes):
        super(AdaptiveFusionNetwork2D, self).__init__()
        # Fusion-attention head. It ends in TWO channels, one score per
        # stream, so the softmax over dim=1 yields complementary weights.
        # With a single output channel the softmax is always exactly 1.0 and
        # the fusion weights carry no information.
        self.conv1 = nn.Conv2d(in_channels=input_channels, out_channels=128, kernel_size=1)
        self.conv2 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=1)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=2, kernel_size=1)
        self.conv4 = nn.Conv2d(in_channels=2, out_channels=2, kernel_size=1)
        # Classification head; its input is the re-weighted concatenation,
        # which has the same channel count as the fusion head's input.
        self.conv5 = nn.Conv2d(in_channels=input_channels, out_channels=128, kernel_size=1)
        self.conv6 = nn.Conv2d(in_channels=128, out_channels=num_classes, kernel_size=1)
        # ReLU and dropout layers
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        # Softmax across the two stream scores
        self.softmax = nn.Softmax(dim=1)
        # Feature extractors; the context module receives 3-channel images.
        self.face_encoder = FaceEncodingModule()
        self.context_attention = ContextAttentionModule(input_channels=3)

    def _fusion_weights(self, x_concat):
        """Return ``(N, 2, 1, 1)`` weights: channel 0 for face, channel 1 for context."""
        hidden = self.relu(self.conv1(x_concat))
        hidden = self.relu(self.conv2(hidden))
        hidden = self.relu(self.conv3(hidden))
        return self.softmax(self.conv4(hidden))

    def forward(self, image):
        """Classify ``image``.

        Args:
            image: a single raw image in the format accepted by the face
                encoder and the context attention module.

        Returns:
            Tensor of class logits with shape ``(N, num_classes)``.
        """
        # Extract both feature vectors and give them 1x1 spatial dimensions
        # so that the 1x1 convolutions can be applied.
        face_features = self.face_encoder(image).unsqueeze(2).unsqueeze(3)
        context_attention = self.context_attention(image).unsqueeze(2).unsqueeze(3)

        # Concatenate along the channel dimension and predict fusion weights
        x_concat = torch.cat((face_features, context_attention), dim=1)
        fusion_weights = self._fusion_weights(x_concat)
        lambda_face = fusion_weights[:, 0:1]
        lambda_context = fusion_weights[:, 1:2]

        # Re-weight each stream and concatenate again
        x_final = torch.cat((face_features * lambda_face, context_attention * lambda_context), dim=1)

        # Final classification
        x_final = self.relu(self.conv5(x_final))
        x_final = self.dropout(x_final)
        x_final = self.conv6(x_final)
        return x_final.view(x_final.size(0), -1)  # Flatten to (N, num_classes)

Defining model

In [84]:
# Build the fusion model. 512 input channels = 256 face features + 256 context
# features (the output widths of the two encoders); 26 output classes.
fusion_network = AdaptiveFusionNetwork2D(input_channels=512, num_classes=26)

Upload Dataset

In [85]:
# from google.colab import files
# uploaded = files.upload()

In [86]:
# import zipfile
# import os

# # Specify the path to your zip file
# zip_file_path = "/content/Dataset.zip"  # Update this with your file path

# # Specify the directory where you want to extract the contents
# extracted_dir_path = "Dataset"  # Update this with your desired directory path

# # Check if the directory exists, if not create it
# if not os.path.exists(extracted_dir_path):
#     os.makedirs(extracted_dir_path)

# # Unzip the file
# with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
#     zip_ref.extractall(extracted_dir_path)

# print("Extraction complete.")

Random images

In [87]:
# images = glob("/content/Dataset/Dataset/train/**/**")
# for i in range(9):
#     image = random.choice(images)
#     plt.figure(figsize=(12,12))
#     plt.subplot(331+i)
#     plt.imshow(cv2.imread(image));plt.axis('off')

In [88]:
from torch import nn
from torchvision import transforms

!pip install -q torchinfo
from torchinfo import summary

import matplotlib.pyplot as plt

In [89]:
# Pick the compute device: CUDA when available, otherwise the CPU.
# NOTE(review): nothing in the visible notebook moves the model or the batches
# to this device yet.
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [90]:
# ImageFolder and ToTensor are imported here so that this cell runs on a fresh
# kernel; they were previously only imported in a later cell.
from torchvision.datasets import ImageFolder
from torchvision.transforms import Resize, ToTensor

# Resize every image to 224x224 (height, width) and convert it to a float
# tensor in [0, 1].
resize_transform = Resize((224, 224))
image_transform = transforms.Compose([resize_transform, ToTensor()])

# Create the PyTorch datasets from the class-per-folder directory layout.
# NOTE(review): test_data reads the *train* directory, so the reported "test"
# accuracy is measured on training images. Point it at a held-out split once
# one is available.
train_data = ImageFolder("/content/Dataset/Dataset/train", transform=image_transform)
test_data = ImageFolder("/content/Dataset/Dataset/train", transform=image_transform)

print(f"Train data:\n{train_data}\nTest data:\n{test_data}")

Train data:
Dataset ImageFolder
    Number of datapoints: 6892
    Root location: /content/Dataset/Dataset/train
    StandardTransform
Transform: Compose(
               Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
           )
Test data:
Dataset ImageFolder
    Number of datapoints: 6892
    Root location: /content/Dataset/Dataset/train
    StandardTransform
Transform: Compose(
               Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
           )


In [91]:
# Inspect the first training sample: tensor values, shape, dtype and label.
img, label = train_data[0]
print(
    f"Image tensor:\n{img}",
    f"Image shape: {img.shape}",
    f"Image datatype: {img.dtype}",
    f"Image label: {label}",
    f"Label datatype: {type(label)}",
    sep="\n",
)

Image tensor:
tensor([[[0.3882, 0.3882, 0.4039,  ..., 0.0902, 0.0980, 0.1529],
         [0.3882, 0.3922, 0.4039,  ..., 0.1137, 0.1137, 0.0863],
         [0.4000, 0.3922, 0.4039,  ..., 0.2471, 0.1255, 0.0706],
         ...,
         [0.7529, 0.7490, 0.7490,  ..., 0.6863, 0.7569, 0.8588],
         [0.7490, 0.7569, 0.7529,  ..., 0.6745, 0.7216, 0.7882],
         [0.7412, 0.7569, 0.7451,  ..., 0.6706, 0.6902, 0.7804]],

        [[0.6078, 0.6078, 0.5961,  ..., 0.1059, 0.1294, 0.1961],
         [0.6078, 0.6078, 0.6000,  ..., 0.1294, 0.1412, 0.1255],
         [0.6118, 0.6118, 0.6118,  ..., 0.2588, 0.1451, 0.0941],
         ...,
         [0.7020, 0.6980, 0.6980,  ..., 0.6392, 0.6549, 0.6784],
         [0.6980, 0.7059, 0.7020,  ..., 0.6431, 0.6510, 0.6353],
         [0.6902, 0.7059, 0.6941,  ..., 0.6549, 0.6392, 0.6549]],

        [[0.8039, 0.8039, 0.7529,  ..., 0.0588, 0.0824, 0.1451],
         [0.8000, 0.8039, 0.7686,  ..., 0.0431, 0.0706, 0.0667],
         [0.8000, 0.8078, 0.8000,  ..., 0.12

In [92]:
# Class names as exposed by ImageFolder's `.classes` attribute. Per the
# torchvision documentation these come from the sub-directory names and the
# position in this list is the integer label of the class.
class_names = train_data.classes
class_names

['Affection',
 'Anger',
 'Annoyance',
 'Anticipation',
 'Aversion',
 'Confidence',
 'Confusion',
 'Disapproval',
 'Disconnection',
 'Disquietment',
 'Embarrassment',
 'Engagement',
 'Esteem',
 'Excitement',
 'Fatigue',
 'Fear',
 'Happiness',
 'Pain',
 'Peace',
 'Pleasure',
 'Sadness',
 'Sensitivity',
 'Suffering',
 'Surprise',
 'Sympathy',
 'Yearning']

In [93]:
from torch.utils.data import DataLoader
# os.cpu_count() can return None when the count is undetermined; fall back to
# loading in the main process (0 workers) in that case.
num_workers = os.cpu_count() or 0

# The model takes ONE image per forward pass, so the loaders must yield
# batches of size 1. This is the DataLoader default; it is spelled out here
# because the training loop depends on it.
train_dataloader = DataLoader(dataset=train_data,
                              batch_size=1,
                              num_workers=num_workers,
                              shuffle=True)
test_dataloader = DataLoader(dataset=test_data,
                             batch_size=1,
                             num_workers=num_workers,
                             shuffle=False)
train_dataloader, test_dataloader

(<torch.utils.data.dataloader.DataLoader at 0x7ce6cfb7be80>,
 <torch.utils.data.dataloader.DataLoader at 0x7ce6cfb7b2b0>)

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor
from torchvision.datasets import ImageFolder
import numpy as np

# Fresh model for this training run (replaces any earlier `fusion_network`).
fusion_network = AdaptiveFusionNetwork2D(input_channels=512, num_classes=26)

# Multi-class classification loss; it expects raw logits and integer labels.
criterion = nn.CrossEntropyLoss()

# Adam over every parameter of the fusion network, including both encoders.
optimizer = optim.Adam(fusion_network.parameters() , lr=0.001)

# Start in training mode (enables dropout and batch-norm statistics updates).
fusion_network.train()

def calculate_accuracy(outputs, labels):
    """Return the fraction of rows of ``outputs`` whose arg-max equals ``labels``.

    Args:
        outputs: tensor of per-class scores with shape ``(N, num_classes)``.
        labels: tensor of integer class indices with shape ``(N,)``.

    Returns:
        Accuracy as a Python float in ``[0, 1]``.
    """
    predicted_classes = outputs.argmax(dim=1)
    num_correct = predicted_classes.eq(labels).sum().item()
    return num_correct / labels.size(0)

# Number of passes over the training set. Lower this for a quick smoke run;
# the loop and the progress message both use this single value.
num_epochs = 25


def to_model_input(images):
    """Convert a batch-of-one image tensor into the array the model expects.

    Args:
        images: float tensor of shape ``(1, 3, H, W)`` with values in
            ``[0, 1]``, as produced by ``ToTensor`` and a batch size of 1.

    Returns:
        ``H x W x 3`` uint8 NumPy array with values in ``0..255``. The model's
        face detector works on 8-bit pixels; casting the ``[0, 1]`` floats
        directly to uint8 would give an almost entirely black image.
    """
    image = images.squeeze(0).permute(1, 2, 0)
    return (image * 255).round().clamp(0, 255).to(torch.uint8).cpu().numpy()


# Training loop
for epoch in range(num_epochs):
    # Print epoch number
    print(f'Epoch [{epoch+1}/{num_epochs}]')

    # Iterate through each batch in the train_dataloader
    fusion_network.train()
    for images, labels in train_dataloader:
        optimizer.zero_grad()

        outputs = fusion_network(to_model_input(images))

        # Compute the loss
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

    fusion_network.eval()

    total_accuracy = 0
    total_batches = 0

    # Iterate through each batch in the test_dataloader. Gradients are not
    # needed for evaluation, and the batches need the same conversion as the
    # training batches.
    with torch.no_grad():
        for images, labels in test_dataloader:
            # Forward pass: Pass the images through the fusion_network
            outputs = fusion_network(to_model_input(images))

            # Accumulate accuracy and update total batches
            total_accuracy += calculate_accuracy(outputs, labels)
            total_batches += 1

    # Compute average accuracy over all batches in the test dataset
    average_accuracy = total_accuracy / total_batches if total_batches else 0.0

    # Print or store the evaluation metrics
    print(f'Test accuracy: {average_accuracy}')

    # Set the model back to training mode for the next epoch
    fusion_network.train()

<!-- This is the extra work: error handling and other parts. -->

In [None]:
# fusion_network = AdaptiveFusionNetwork2D(input_channels=512, num_classes=26)
# model = fusion_network(train_dataloader)

In [None]:
# summary(model=model,
#         input_size=(64, 3, 224, 224),
#        col_names=["input_size", "output_size", "num_params", "trainable"],
#        col_width=20,
#        row_settings=["var_names"])

In [None]:
# loss_fn = nn.BCEWithLogitsLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# !pip install torcheval
# import torcheval
# from torcheval.metrics import BinaryAccuracy
# def accuracy_fn(y_true: torch.Tensor, y_pred: torch.Tensor):
#     metric = BinaryAccuracy(threshold=0.5)
#     metric.update(y_pred, y_true)
#     return metric.compute().item()

In [None]:
# img, label = next(iter(train_dataloader))
# pred = model(img.to(device))
# print(pred.squeeze())
# print(torch.sigmoid(pred.squeeze()))
# pred.squeeze().shape, label.shape

In [None]:
# def train_step(model: torch.nn.Module,
#               dataloader: torch.utils.data.DataLoader,
#               loss_fn: torch.nn.Module,
#               optimizer: torch.optim.Optimizer,
#               accuracy_fn):

#     model.train()

#     train_loss, train_acc = 0, 0

#     for batch, (X, y) in enumerate(dataloader):

#         X, y = X.to(device), y.to(device)

#         y_pred = model(X).squeeze()

#         loss = loss_fn(y_pred, y.float())
#         train_loss += loss.item()

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         train_acc += accuracy_fn(y_true=y, y_pred=torch.sigmoid(y_pred))

#     train_loss /= len(dataloader)
#     train_acc /= len(dataloader)

#     return train_loss, train_acc

# def test_step(model: torch.nn.Module,
#               dataloader: torch.utils.data.DataLoader,
#               loss_fn: torch.nn.Module,
#               accuracy_fn):

#     model.eval()

#     test_loss, test_acc = 0, 0

#     with torch.inference_mode():
#         for batch, (X, y) in enumerate(dataloader):
#             X, y = X.to(device), y.to(device)

#             test_pred_logits = model(X).squeeze()

#             loss = loss_fn(test_pred_logits, y.float())
#             test_loss += loss.item()

#             test_acc += accuracy_fn(y_true=y, y_pred=torch.sigmoid(test_pred_logits))

#     test_loss /= len(dataloader)
#     test_acc /= len(dataloader)

#     return test_loss, test_acc

In [None]:
# from tqdm.auto import tqdm

# def train(model: torch.nn.Module,
#          loss_fn: torch.nn.Module,
#          optimizer: torch.optim.Optimizer,
#          train_dataloader: torch.utils.data.DataLoader,
#          test_dataloader: torch.utils.data.DataLoader,
#          accuracy_fn,
#          epochs: int = 5):

#     results = {
#         "train_loss": [],
#         "train_acc": [],
#         "test_loss": [],
#         "test_acc": []
#     }

#     for epoch in tqdm(range(epochs)):

#         train_loss, train_acc = train_step(model=model,
#                                           dataloader=train_dataloader,
#                                           loss_fn=loss_fn,
#                                           optimizer=optimizer,
#                                           accuracy_fn=accuracy_fn)
#         test_loss, test_acc = test_step(model=model,
#                                         dataloader=test_dataloader,
#                                         loss_fn=loss_fn,
#                                         accuracy_fn=accuracy_fn)

#         print(
#             f"Epoch: {epoch+1} | "
#             f"train_loss: {train_loss:.4f} | "
#             f"train_acc: {train_acc:.4f} | "
#             f"test_loss: {test_loss:.4f} | "
#             f"test_acc: {test_acc:.4f}"
#         )

#         results["train_loss"].append(train_loss)
#         results["train_acc"].append(train_acc)
#         results["test_loss"].append(test_loss)
#         results["test_acc"].append(test_acc)

#     return results

In [None]:
# torch.manual_seed(42)
# torch.cuda.manual_seed(42)

# from timeit import default_timer as timer

# start_time = timer()

# results = train(model=model,
#                loss_fn=loss_fn,
#                optimizer=optimizer,
#                accuracy_fn=accuracy_fn,
#                train_dataloader=train_dataloader,
#                test_dataloader=test_dataloader,
#                epochs=10)

# end_time = timer()

# print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")