In [None]:
import cv2
import mediapipe as mp

mp_drawing = mp.solutions.drawing_utils
mp_face_mesh = mp.solutions.face_mesh

def detect_landmarks(image):
    with mp_face_mesh.FaceMesh(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as face_mesh:
        
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = face_mesh.process(image)

        image.flags.writeable = True
        image_with_landmarks = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                mp_drawing.draw_landmarks(
                    image=image_with_landmarks,
                    landmark_list=face_landmarks,
                    connections=mp_face_mesh.FACEMESH_CONTOURS,
                    landmark_drawing_spec=mp_drawing.DrawingSpec(color=(0,255,0), thickness=1, circle_radius=1),
                    connection_drawing_spec=mp_drawing.DrawingSpec(color=(255,0,0), thickness=1))
                    
        return image_with_landmarks

# Load an input image using OpenCV
image = cv2.imread('/Users/Shared/D/Applied AI Solutions/DL2/FER/data/train/happy/happy_24.jpg')
# increase the size of the image
image = cv2.resize(image, (256, 256))

# Call the detect_landmarks function to obtain the image with landmark overlay
image_with_landmarks = detect_landmarks(image)

# Display the image with landmark overlay
cv2.imshow('MediaPipe FaceMesh', image_with_landmarks)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
smile_threshold = 0.5  # adjust this as needed


cap = cv2.VideoCapture(0)

while cap.isOpened():
    success, image = cap.read()
    if not success:
        print("Ignoring empty camera frame.")
        continue

    image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = mp_face_mesh.process(image)

    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    if results.multi_face_landmarks:
        for face_landmarks in results.multi_face_landmarks:
            # extract the mouth landmarks
            mouth_landmarks = face_landmarks.landmark[mp_face_mesh.FACEMESH_CONTOURS[13:23]]

            # compute the distance between the corners of the mouth
            mouth_width = (mouth_landmarks[12].x - mouth_landmarks[4].x) * image.shape[1]

            # detect a smile if the mouth is open wider than a certain threshold
            if mouth_width > smile_threshold:
                cv2.putText(image, "Smiling", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            mp_drawing.draw_landmarks(
                image=image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh.FACEMESH_CONTOURS,
                landmark_drawing_spec=mp_drawing.DrawingSpec(color=(0,255,0), thickness=1, circle_radius=1),
                connection_drawing_spec=mp_drawing.DrawingSpec(color=(255,0,0), thickness=1))

    cv2.imshow('MediaPipe FaceMesh', image)
    if cv2.waitKey(1) == ord('q'):
        break

# Started from here!

In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import cv2
import mediapipe as mp
import numpy as np
import os
from PIL import Image

objc[62928]: Class CaptureDelegate is implemented in both /Users/sertanavdan/miniconda3/envs/torch/lib/python3.10/site-packages/cv2/cv2.abi3.so (0x1637365a0) and /Users/sertanavdan/miniconda3/envs/torch/lib/python3.10/site-packages/mediapipe/.dylibs/libopencv_videoio.3.4.16.dylib (0x1602d4860). One of the two will be used. Which one is undefined.
objc[62928]: Class CVWindow is implemented in both /Users/sertanavdan/miniconda3/envs/torch/lib/python3.10/site-packages/cv2/cv2.abi3.so (0x1637365f0) and /Users/sertanavdan/miniconda3/envs/torch/lib/python3.10/site-packages/mediapipe/.dylibs/libopencv_highgui.3.4.16.dylib (0x137d3ca68). One of the two will be used. Which one is undefined.
objc[62928]: Class CVView is implemented in both /Users/sertanavdan/miniconda3/envs/torch/lib/python3.10/site-packages/cv2/cv2.abi3.so (0x163736618) and /Users/sertanavdan/miniconda3/envs/torch/lib/python3.10/site-packages/mediapipe/.dylibs/libopencv_highgui.3.4.16.dylib (0x137d3ca90). One of the two will be

In [2]:
test_dir = 'data/test/'
train_dir = 'data/train/'
classes = os.listdir(train_dir)
num_classes = len(classes)
print(classes)

['happy', 'sad', 'fear', 'surprise', 'neutral', 'angry', 'disgust']


In [3]:
mp_face_mesh = mp.solutions.face_mesh
def detect_landmarks(image):
    # Initialize the FaceMesh model with default parameters
    with mp_face_mesh.FaceMesh(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as face_mesh:
        
        # Convert the image to RGB format and feed it to the model
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(image)
        
        # Initialize an empty tensor to store the landmarks
        landmarks = torch.zeros(1, 468, 2, dtype=torch.float32)
        
        # If a face is detected, extract its landmarks and save them in the tensor
        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                for i, landmark in enumerate(face_landmarks.landmark):
                    landmarks[0][i][0] = landmark.x
                    landmarks[0][i][1] = landmark.y
        
        # Convert the RGB image back to BGR format
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        # Return the image and landmarks as separate PyTorch tensors
        return torch.tensor(image, dtype=torch.float32), landmarks

In [4]:
# test
image = cv2.imread('data/test/angry/angry_1.jpg')
image, landmarks = detect_landmarks(image)

print(image.shape) #torch.Size([48, 48, 3])
print(landmarks.shape) #torch.Size([1, 468, 2])

torch.Size([48, 48, 3])
torch.Size([1, 468, 2])


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [5]:
class LandmarkDataset(Dataset):
    def __init__(self, path, transform=None):
        self.path = path
        self.transform = transform
        self.classes = os.listdir(self.path)
        self.files = []

        for c in self.classes:
            class_folder = os.path.join(self.path, c)
            if os.path.isdir(class_folder):  # Add this line to check if the item is a directory
                for file in os.listdir(class_folder):
                    self.files.append((os.path.join(class_folder, file), self.classes.index(c)))

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img_path, label = self.files[idx]
        img = cv2.imread(img_path)
        img, landmarks = detect_landmarks(img)
        img = Image.fromarray(img.numpy().astype(np.uint8))
        if self.transform:
            img = self.transform(img)
        return img, landmarks, label

In [6]:
# class MultiHeadAttention(nn.Module):
#     def __init__(self, d_model, num_heads):
#         super().__init__()
#         self.d_model = d_model
#         self.num_heads = num_heads
#         self.head_dim = d_model // num_heads

#         self.W_q = nn.Linear(d_model + 468 * 2, d_model, bias=False)
#         self.W_k = nn.Linear(d_model + 468 * 2, d_model, bias=False)
#         self.W_v = nn.Linear(d_model + 468 * 2, d_model, bias=False)
#         self.fc_out = nn.Linear(d_model, d_model)

#     def forward(self, x, landmarks):
#         N = x.shape[0]
        
#         # Concatenate the features with the landmarks
#         x = torch.cat((x, landmarks), dim=1)

#         Q = self.W_q(x).view(N, self.num_heads, self.head_dim)
#         K = self.W_k(x).view(N, self.num_heads, self.head_dim)
#         V = self.W_v(x).view(N, self.num_heads, self.head_dim)

#         energy = torch.einsum("nqhd,nkhd->nhqk", [Q, K])
#         attention = torch.softmax(energy / (self.d_model ** 0.5), dim=3)
#         out = torch.einsum("nhql,nlhd->nqhd", [attention, V]).contiguous()
#         out = out.view(N, self.d_model)
#         out = self.fc_out(out)
#         return out


In [7]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.W_q = nn.Linear(d_model + 468 * 2, d_model, bias=False)
        self.W_k = nn.Linear(d_model + 468 * 2, d_model, bias=False)
        self.W_v = nn.Linear(d_model + 468 * 2, d_model, bias=False)
        self.fc_out = nn.Linear(d_model, d_model)

    def forward(self, x, landmarks):
        N = x.shape[0]
        
        # Concatenate the features with the landmarks
        x = torch.cat((x, landmarks), dim=1)

        Q = self.W_q(x).view(N, self.num_heads, self.head_dim)
        K = self.W_k(x).view(N, self.num_heads, self.head_dim)
        V = self.W_v(x).view(N, self.num_heads, self.head_dim)

        """
            This part has no idea
        """
        # Compute attention scores using matrix multiplication
        scores = torch.matmul(Q, K.transpose(-2, -1))
        # Scale the scores
        scaled_scores = scores / (self.d_model ** 0.5)
        # Apply softmax to get the attention weights
        attention_weights = torch.softmax(scaled_scores, dim=-1)
        # Compute the output using matrix multiplication
        out = torch.matmul(attention_weights, V)
        # Concatenate the heads and apply the output linear layer
        out = out.transpose(0, 1).reshape(N, -1)


        out = self.fc_out(out)
        
        return out


# Notes
1. Paralell branching on the CNN model can be tested!
2. Balance of the dataset needs to be checked!


In [8]:
class LandmarkAttentionCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()

        # Block-1
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(32),
            nn.Conv2d(32, 32, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.2)
        )

        # Block-2
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 64, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.2)
        )

        # Block-3
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 128, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.2)
        )

        # Block-4
        self.conv4 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.Conv2d(256, 256, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.2)
        )

        # Block-5
        self.fc1 = nn.Sequential(
            nn.Linear(256 * 4 * 4, 128),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(128),
            nn.Dropout(0.5)
        )

        # Attention Layer
        self.attention = MultiHeadAttention(128, 8)

        # Block-6
        self.fc2 = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(64),
            nn.Dropout(0.5)
        )

        # Block-7
        self.fc3 = nn.Linear(64, num_classes)

    def forward(self, x, landmarks):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)

        x = x.view(-1, 256 * 4 * 4)
        x = self.fc1(x)
        x = self.attention(x, landmarks.view(-1, 468 * 2))  # Pass landmarks as input to the MultiHeadAttention module
        x = F.relu(x)
        x = self.fc2(x)
        class_output = self.fc3(x)
        return class_output


num_classes = 7  # Update this value based on your specific problem
model = LandmarkAttentionCNN(num_classes)


In [10]:
import numpy as np

input_size = [(3, 64, 64), (468, 2)]
batch_size = 8

total_input_size = sum([np.prod(size) for size in input_size]) * batch_size * 4. / (1024 ** 2.)
print(f"Total input size: {total_input_size:.2f} MB")

Total input size: 0.40 MB


In [11]:
# visualize the model before training
model = LandmarkAttentionCNN(7)
model.eval()

LandmarkAttentionCNN(
  (conv1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): ReLU(inplace=True)
    (2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (4): ReLU(inplace=True)
    (5): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Dropout(p=0.2, inplace=False)
  )
  (conv2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): ReLU(inplace=True)
    (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (4): ReLU(inplace=True)
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_st

In [16]:
# Train the model
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

num_classes = len(os.listdir(train_dir))

train_dataset = LandmarkDataset(train_dir, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)

val_dataset = LandmarkDataset(test_dir, transform=transform)
val_dataloader = DataLoader(val_dataset, batch_size=4, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LandmarkAttentionCNN(num_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

In [17]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (data, landmarks, targets) in enumerate(dataloader):
        data = data.to(device)
        targets = targets.to(device)
        # print(data.shape)
        # forward
        outputs = model(data, landmarks)
        loss = criterion(outputs, targets)
        # backward
        optimizer.zero_grad()
        loss.backward()
        # gradient descent or adam step
        optimizer.step()
        # update metrics
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    return running_loss / len(dataloader), correct / total

In [18]:
def validate_epoch(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, landmarks, targets) in enumerate(dataloader):
            data = data.to(device)
            targets = targets.to(device)

            #print(data.shape)
            # forward
            outputs = model(data, landmarks)
            loss = criterion(outputs, targets)

            # update metrics
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    return running_loss / len(dataloader), correct / total

In [19]:
print(device)

cpu


In [20]:
num_epochs = 10
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_dataloader, criterion, optimizer, device)
    val_loss, val_acc = validate_epoch(model, val_dataloader, criterion, device)
    scheduler.step(val_loss)

    print(f"Epoch: {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")