**Imported Libraries**

In [1]:
import pandas as pd

from sklearn.model_selection import train_test_split
from torchvision import transforms
import torch.nn as nn

import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import torch.nn.functional as F
import torchvision.models as models
import matplotlib.pyplot as plt
import torchvision.models as models
from albumentations.pytorch import ToTensorV2
import os
from PIL import Image

import cv2
import numpy as np

from torch.utils.data import DataLoader

import albumentations as A
from albumentations.pytorch import ToTensorV2

from torchvision.transforms import ToPILImage

  Referenced from: <2D1B8D5C-7891-3680-9CF9-F771AE880676> /Users/mclevesluna/anaconda3/envs/condaenv/lib/python3.9/site-packages/torchvision/image.so
  warn(


**Load Images**

In [2]:
#Load images from each folder (test and train)
TrainImagePaths = "./V3Data/train"
TestImagePaths = "./V3Data/test"

In [3]:
def load_Trainimages_from_folder(folder):
    TrainImages = []
    TrainLabelsList=[]
    for file in os.listdir(folder):
        TrainImages.append(os.path.join(folder,file))
        TrainLabelsList.append(int(file.split(os.path.sep)[-1][-5])-1)
    return TrainImages, TrainLabelsList

In [4]:
def load_Testimages_from_folder(folder):
    TestImages = []
    TestLabelsList=[]
    for file in os.listdir(folder):
        TestImages.append(os.path.join(folder,file))
        TestLabelsList.append(int(file.split(os.path.sep)[-1][-5])-1)
    return TestImages, TestLabelsList

**Transform Images and Prepare for Training**

In [2]:

train_transform = A.Compose(
    [
        A.HorizontalFlip(p=0.5),
        A.SmallestMaxSize(max_size=256),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=60, p=0.5),
        A.RandomCrop(height=224, width=224),
        A.RGBShift(r_shift_limit=5, g_shift_limit=5, b_shift_limit=5, p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        ToTensorV2(),
    ]
)

val_transform = A.Compose(
    [
        A.SmallestMaxSize(max_size=256),
        A.CenterCrop(height=224, width=224),
        ToTensorV2(),
    ]
)

In [None]:
TrainImages, TrainLabels = load_Trainimages_from_folder(TrainImagePaths)
TestImages, TestLabels = load_Testimages_from_folder(TestImagePaths)

**Create Model Classess**

In [3]:
class ImageDataset(Dataset):
    
    def __init__(self,data_paths,labels,transform=None,mode='train'):
         self.data=data_paths
         self.labels=labels
         self.transform=transform
         self.mode=mode
    def __len__(self):
       return len(self.data)
    
    def __getitem__(self,idx):
        img_name = self.data[idx]
        img = cv2.imread(img_name)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # img=Image.fromarray(img)
        # if self.transform is not None:
        img = self.transform(image=img)["image"]/255.
        img = img.cuda()
        
        labels = torch.tensor(self.labels[idx]).cuda()

        return img, labels
            


In [None]:
train_dataset=ImageDataset(data_paths=TrainImages,labels=TrainLabels,transform=train_transform)
val_dataset=ImageDataset(data_paths=TestImages,labels=TestLabels,transform=val_transform)

train_loader=DataLoader(train_dataset,batch_size=16,shuffle=True)
val_loader=DataLoader(val_dataset,batch_size=1,shuffle=False)

In [4]:
# Implementing attention layer

class AttentionBlock(nn.Module):
    def __init__(self, in_features_l, in_features_g, attn_features, up_factor, normalize_attn=True):
        super(AttentionBlock, self).__init__()
        self.up_factor = up_factor
        self.normalize_attn = normalize_attn
        self.W_l = nn.Conv2d(in_channels=in_features_l, out_channels=attn_features, kernel_size=1, padding=0, bias=False)
        self.W_g = nn.Conv2d(in_channels=in_features_g, out_channels=attn_features, kernel_size=1, padding=0, bias=False)
        self.phi = nn.Conv2d(in_channels=attn_features, out_channels=1, kernel_size=1, padding=0, bias=True)
    def forward(self, l, g):
        N, C, W, H = l.size()
        l_ = self.W_l(l)
        g_ = self.W_g(g)
        if self.up_factor > 1:
            g_ = F.interpolate(g_, scale_factor=self.up_factor, mode='bilinear', align_corners=False)
        c = self.phi(F.relu(l_ + g_)) # batch_sizex1xWxH
        
        # compute attn map
        if self.normalize_attn:
            a = F.softmax(c.view(N,1,-1), dim=2).view(N,1,W,H)
        else:
            a = torch.sigmoid(c)
        # re-weight the local feature
        f = torch.mul(a.expand_as(l), l) # batch_sizexCxWxH
        if self.normalize_attn:
            output = f.view(N,C,-1).sum(dim=2) # weighted sum
        else:
            output = F.adaptive_avg_pool2d(f, (1,1)).view(N,C) # global average pooling
        return a, output

In [5]:
class AttnVGG(nn.Module):
    def __init__(self, num_classes, normalize_attn=False, dropout=None):
        super(AttnVGG, self).__init__()
        net = models.vgg16_bn(pretrained=True)
        self.conv_block1 = nn.Sequential(*list(net.features.children())[0:6])
        self.conv_block2 = nn.Sequential(*list(net.features.children())[7:13])
        self.conv_block3 = nn.Sequential(*list(net.features.children())[14:23])
        self.conv_block4 = nn.Sequential(*list(net.features.children())[24:33])
        self.conv_block5 = nn.Sequential(*list(net.features.children())[34:43])
        self.pool = nn.AvgPool2d(7, stride=1)
        self.dpt = None
        if dropout is not None:
            self.dpt = nn.Dropout(dropout)
        self.cls = nn.Linear(in_features=512+512+256, out_features=num_classes, bias=True)
        
       # initialize the attention blocks defined above
        self.attn1 = AttentionBlock(256, 512, 256, 4, normalize_attn=normalize_attn)
        self.attn2 = AttentionBlock(512, 512, 256, 2, normalize_attn=normalize_attn)
        
       
        self.reset_parameters(self.cls)
        self.reset_parameters(self.attn1)
        self.reset_parameters(self.attn2)
    def reset_parameters(self, module):
        for m in module.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0.)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1.)
                nn.init.constant_(m.bias, 0.)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0., 0.01)
                nn.init.constant_(m.bias, 0.)
    def forward(self, x):
        block1 = self.conv_block1(x)       # /1
        pool1 = F.max_pool2d(block1, 2, 2) # /2
        block2 = self.conv_block2(pool1)   # /2
        pool2 = F.max_pool2d(block2, 2, 2) # /4
        block3 = self.conv_block3(pool2)   # /4
        pool3 = F.max_pool2d(block3, 2, 2) # /8
        block4 = self.conv_block4(pool3)   # /8
        pool4 = F.max_pool2d(block4, 2, 2) # /16
        block5 = self.conv_block5(pool4)   # /16
        pool5 = F.max_pool2d(block5, 2, 2) # /32
        N, __, __, __ = pool5.size()
        
        g = self.pool(pool5).view(N,512)
        a1, g1 = self.attn1(pool3, pool5)
        a2, g2 = self.attn2(pool4, pool5)
        g_hat = torch.cat((g,g1,g2), dim=1) # batch_size x C
        if self.dpt is not None:
            g_hat = self.dpt(g_hat)
        out = self.cls(g_hat)

        return [out, a1, a2]

In [6]:
model = AttnVGG(num_classes=5, normalize_attn=True)



**Adjust Loss and Optimizer for Created Model**

In [38]:
# changed focal to cross enthropy loss
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

**Train Model**

In [44]:
import time
start_time = time.time()

epochs = 100

train_losses = []
train_auc=[]
val_auc=[]

for i in range(epochs):

    train_preds=[]
    train_targets=[]
    auc_train=[]
    loss_epoch_train=[]
    running_loss = 0
    running_loss_val = 0
    # Run the training batches
    for b, (X_train, y_train) in enumerate(train_loader):
        
        b+=1
        y_pred,_,_=model(X_train)
        loss = criterion(y_pred, y_train)   
        loss_epoch_train.append(loss.item())
        # For plotting purpose
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(running_loss/len(train_loader))   
    acc = 0
    # Run the testing batches
    with torch.no_grad():
        for b, (X_test, y_test) in enumerate(val_loader):
            
            y_val,_,_ = model(X_test)
            loss = criterion(y_val, y_test)
            running_loss_val += loss.item()

            res = F.softmax(y_val)

            pred = torch.argmax(res, 1)
            acc += torch.sum(pred==y_test)

    if acc/len(val_loader)>=0.88
        break

    print(running_loss_val/len(val_loader), acc/len(val_loader))

0.20810227132425074


  res = F.softmax(y_val)


0.6071600172473909 tensor(0.7516, device='cuda:0')
0.18783490977636197
0.5145018588495898 tensor(0.7908, device='cuda:0')
0.24882987986614064
0.5870162542130238 tensor(0.7778, device='cuda:0')
0.17253897143755018
0.5104692158185773 tensor(0.8105, device='cuda:0')
0.1527851317077875
0.5158046749633511 tensor(0.8431, device='cuda:0')
0.15163079690097309
0.5469482316775551 tensor(0.7974, device='cuda:0')
0.1257767251170263
0.4739392288455289 tensor(0.8366, device='cuda:0')
0.18556353431649325
0.6216215875632818 tensor(0.7843, device='cuda:0')
0.17946387972773575
0.8399625208374917 tensor(0.6993, device='cuda:0')
0.17238183115131972
0.6467660231412284 tensor(0.7778, device='cuda:0')
0.12173517025643732
0.5325753948791256 tensor(0.7974, device='cuda:0')
0.11708921021441134
0.5075109527138932 tensor(0.7974, device='cuda:0')
0.10205929751348931
0.5654188533012262 tensor(0.7908, device='cuda:0')
0.16200134944079853
0.45296109472310137 tensor(0.8431, device='cuda:0')
0.13793422018245952
0.59605

In [48]:
# Save model
torch.save(model.state_dict(), os.path.join(save_dir, "model.pth"))


In [7]:
# Load the model on your CPU AND SET IT TO EVALUATION MODE
model_path = "/Users/mclevesluna/Documents/ML&AI/Henshim/V3SavedModel/model.pth"
model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
model.eval()  # Set the model to evaluation mode

AttnVGG(
  (conv_block1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
  )
  (conv_block2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
  )
  (conv_block3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(256, eps=1e-05, momentum=0

In [8]:
#Take Picture
# Initialize the camera
cam_port = 0
cam = cv2.VideoCapture(cam_port)

while True:
    # Read the input from the camera
    ret, frame = cam.read()

    # Display the captured frame
    cv2.imshow('Press q to capture', frame)

    # Check for the 'q' key to capture the frame
    if cv2.waitKey(1) & 0xFF == ord('q'):
        # Save the captured frame as an image
        cv2.imwrite('captured_image.png', frame)
        print("Image captured successfully!")
        
        # Release the camera and close the window
        cam.release()
        cv2.destroyAllWindows()
        break

# Release the camera and close the window (additional safety measure)
cam.release()
cv2.destroyAllWindows()


Image captured successfully!


In [9]:
# Make a prediction for an image!
def predict(input_path, model, transform):
    # Load the saved image
    input_image = cv2.imread(input_path)
    if input_image is None:
        print("Error: Unable to load the image from the specified path.")
        return None, None

    input_image = cv2.cvtColor(input_image, cv2.COLOR_BGR2RGB)

    # Apply the validation transform
    input_transformed = transform(image=input_image)['image']
    input_transformed = input_transformed.unsqueeze(0)  # Makes the shape of the tensor [1, 3, 224, 224]

    # Ensure input tensor data type matches the model's parameters
    input_tensor = input_transformed.to(torch.float32)

    # Convert the transformed input to a tensor and move it to GPU if available
    input_tensor = input_tensor.cuda() if torch.cuda.is_available() else input_tensor

    # Perform inference
    with torch.no_grad():
        # Make prediction
        output, _, _ = model(input_tensor)
        # Convert the output tensor to a probability distribution
        probabilities = torch.softmax(output, dim=1)
        # Get the predicted class index
        predicted_class = torch.argmax(probabilities, dim=1).item()

    return predicted_class, probabilities.cpu().numpy()

# Example usage
input_path = 'captured_image.png'
predicted_class, probabilities = predict(input_path, model, val_transform)
print(f'Predicted Class: {predicted_class}')
print(f'Probabilities: {probabilities}')


Predicted Class: 0
Probabilities: [[1. 0. 0. 0. 0.]]


In [10]:
import pygame

# Initialize pygame
pygame.init()

# Define the sound files corresponding to each class label
sound_files = {
    0: 'sound0.wav',
    1: 'sound1.wav',
    2: 'sound2.wav',
    3: 'sound3.wav',
    4: 'sound4.wav'
}

# Load the sounds
sounds = {label: pygame.mixer.Sound(sound_file) for label, sound_file in sound_files.items()}

# Make a prediction for an image!
def predict(input_path, model, transform):
    # Load the saved image
    input_image = cv2.imread(input_path)
    if input_image is None:
        print("Error: Unable to load the image from the specified path.")
        return None, None

    input_image = cv2.cvtColor(input_image, cv2.COLOR_BGR2RGB)

    # Apply the validation transform
    input_transformed = transform(image=input_image)['image']
    input_transformed = input_transformed.unsqueeze(0)  # Makes the shape of the tensor [1, 3, 224, 224]

    # Ensure input tensor data type matches the model's parameters
    input_tensor = input_transformed.to(torch.float32)

    # Convert the transformed input to a tensor and move it to GPU if available
    input_tensor = input_tensor.cuda() if torch.cuda.is_available() else input_tensor

    # Perform inference
    with torch.no_grad():
        # Make prediction
        output, _, _ = model(input_tensor)
        # Convert the output tensor to a probability distribution
        probabilities = torch.softmax(output, dim=1)
        # Get the predicted class index
        predicted_class = torch.argmax(probabilities, dim=1).item()

    # Play the corresponding sound
    sounds[predicted_class].play()

    return predicted_class, probabilities.cpu().numpy()

# Example usage
input_path = 'captured_image.png'
predicted_class, probabilities = predict(input_path, model, val_transform)
print(f'Predicted Class: {predicted_class}')
print(f'Probabilities: {probabilities}')


pygame 2.5.2 (SDL 2.28.3, Python 3.9.18)
Hello from the pygame community. https://www.pygame.org/contribute.html
Predicted Class: 0
Probabilities: [[1. 0. 0. 0. 0.]]


In [27]:
# Initialize pygame
pygame.init()

# Define the sound files corresponding to each class label
sound_files = {
    0: 'sound0.wav',
    1: 'sound1.wav',
    2: 'sound2.wav',
    3: 'sound3.wav',
    4: 'sound4.wav'
}

# Load the sounds
sounds = {label: pygame.mixer.Sound(sound_file) for label, sound_file in sound_files.items()}

# Function to adjust volume based on speech command
def adjust_volume():
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("Listening for volume commands (say 'up' or 'down')...")
        recognizer.adjust_for_ambient_noise(source)
        audio = recognizer.listen(source)

    try:
        command = recognizer.recognize_google(audio).lower()
        if 'up' in command:
            pygame.mixer.music.set_volume(min(pygame.mixer.music.get_volume() + 0.2, 1.0))
            print("Volume increased.")
        elif 'down' in command:
            pygame.mixer.music.set_volume(max(pygame.mixer.music.get_volume() - 0.2, 0.0))
            print("Volume decreased.")
        else:
            print("Unknown command. Please say 'up' or 'down'.")
    except sr.UnknownValueError:
        print("Speech recognition could not understand audio.")
    except sr.RequestError as e:
        print("Could not request results from Google Speech Recognition service; {0}".format(e))

# Function to make a prediction and play sound with adjusted volume
def predict_and_play(input_path, model, transform):
    # Make a prediction
    predicted_class, _ = predict(input_path, model, transform)
    # Play sound with adjusted volume
    sounds[predicted_class].set_volume(pygame.mixer.music.get_volume())
    sounds[predicted_class].play()

# Example usage
input_path = 'captured_image.png'
predict_and_play(input_path, model, val_transform)

# Adjust volume based on speech command
adjust_volume()


Listening for volume commands (say 'up' or 'down')...
Speech recognition could not understand audio.


In [None]:
# Function to start recording
# Initialize the camera
cam_port = 0
cam = cv2.VideoCapture(cam_port)

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('captured_video.avi', fourcc, 20.0, (640, 480))

# Start capturing video
while True:
    # Read the input from the camera
    ret, frame = cam.read()

    # Display the captured frame
    cv2.imshow('Recording... Press q to stop', frame)

    # Write the frame to the video file
    out.write(frame)

    # Check for the 'q' key to stop recording
    if cv2.waitKey(1) & 0xFF == ord('q'):
        print("Recording stopped!")
        break

# Release the camera and video writer, and close the window
cam.release()
out.release()
cv2.destroyAllWindows()

In [38]:
def predict_from_video(video_path, model, transform):
    # Initialize video capture object
    cap = cv2.VideoCapture(video_path)
    
    # Initialize variables to store predictions
    predictions = []
    all_probabilities = []
    
    # Loop through video frames
    while cap.isOpened():
        ret, frame = cap.read()
        
        if not ret:
            break
        
        # Convert frame to RGB
        input_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Apply transformation
        input_transformed = transform(image=input_image)['image']
        input_transformed = input_transformed.unsqueeze(0)  # Add batch dimension
        
        # Ensure input tensor data type matches the model's parameters
        input_tensor = input_transformed.to(torch.float32)
        
        # Move tensor to GPU if available
        input_tensor = input_tensor.cuda() if torch.cuda.is_available() else input_tensor
        
        # Perform inference
        with torch.no_grad():
            output, _, _ = model(input_tensor)
            probabilities = torch.softmax(output, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
        
        # Append predictions and probabilities
        predictions.append(predicted_class)
        all_probabilities.append(probabilities.cpu().numpy())
    
    # Release video capture object
    cap.release()
    
    return predictions, all_probabilities

# Example usage
video_path = 'captured_video.avi'  # Replace with the path to your captured video
predicted_classes, all_probabilities = predict_from_video(video_path, model, val_transform)
print(f'Predicted Classes: {predicted_classes}')
print(f'Probabilities: {all_probabilities}')

Predicted Classes: []
Probabilities: []


In [8]:
# Define the prediction function for live webcam feed
def predict_from_webcam(model, transform):
    # Initialize video capture object for webcam
    cap = cv2.VideoCapture(0)  # Use 0 as the argument for the default webcam
    
    # Define class labels
    class_labels = ['0', '1', '2', '3', '4']  # Class labels corresponding to class indices 0, 1, 2, 3, 4
    
    while cap.isOpened():
        ret, frame = cap.read()
        
        if not ret:
            break
        
        # Convert frame to RGB
        input_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Apply transformation
        input_transformed = transform(image=input_image)['image']
        input_transformed = input_transformed.unsqueeze(0)  # Add batch dimension
        
        # Ensure input tensor data type matches the model's parameters
        input_tensor = input_transformed.to(torch.float32)
        
        # Move tensor to GPU if available
        input_tensor = input_tensor.cuda() if torch.cuda.is_available() else input_tensor
        
        # Perform inference
        with torch.no_grad():
            output, _, _ = model(input_tensor)
            probabilities = torch.softmax(output, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
        
        # Map predicted class to human-readable label
        predicted_label = class_labels[predicted_class]
        
        # Display the frame with prediction
        cv2.putText(frame, f'Predicted Class: {predicted_label}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('Webcam Feed', frame)
        
        # Break the loop if 'q' key is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    # Release video capture object and close all OpenCV windows
    cap.release()
    cv2.destroyAllWindows()

# Example usage
predict_from_webcam(model, val_transform)