In [None]:
%pip install face-alignment

In [None]:
import cv2 as cv
import math
from face_alignment import FaceAlignment
from face_alignment import LandmarksType
import matplotlib.pyplot as plt
import cv2 as cv
import numpy as np
from PIL import Image
import os
import torch
from torchvision.transforms import functional as TF
import torchvision.transforms as transforms

## Data analysis

- Show the number of frames contained in each sequence

- Show the sequence frames along with the Prkachin and Solomon Pain Intensity (PSPI) score of each frame in a graph

- Show the number of the sequence that are labeled as no pain, medium pain and severe pain in a histogram




In [None]:
import os
import cv2
import matplotlib.pyplot as plt
from scipy.interpolate import make_interp_spline
from matplotlib.offsetbox import OffsetImage, AnnotationBbox

In [None]:
data_root = "D:\\Library\\Documents\\UM Lecture Notes & Tutorial\\FYP\\Dataset\\UNBC-McMaster Shoulder Pain Data"

if os.path.exists(data_root):
    print("Path is found.",sep=' ', end=' ', flush=True)
    if os.scandir(data_root):
        print("Folder is not empty.")


In [None]:
# Define the directory where your data is stored
img_dir = os.path.join(data_root, "Images")
images = os.listdir(img_dir)


The total count of the PSPI from 0 - 10


In [None]:
PSPI_label = data_root + '\\Frame_Labels\\PSPI'

label_count = {}

for subject in os.listdir(PSPI_label):
    for sequence in os.listdir(os.path.join(PSPI_label, subject)):
        for label in os.listdir(os.path.join(PSPI_label,subject, sequence)):
            with open(os.path.join(PSPI_label,subject, sequence, label), 'r') as label_file:
                vas = int(float(label_file.read().strip()))
                if label_count.get(vas) is None:
                    label_count[vas] = 1
                else:
                    temp = label_count.get(vas)
                    #update count value
                    label_count[vas] += 1

print(label_count)


In [None]:
# Extract labels and counts from the dictionary
sorted_label_counts = dict(sorted(label_count.items()))

labels = list(sorted_label_counts.keys())
counts = list(sorted_label_counts.values())

# Create a bar plot (histogram)
plt.bar(labels, counts,tick_label=labels)

# Adding labels and title
plt.xlabel('Labels')
plt.ylabel('Counts')
plt.title('PSPI Label Counts Histogram')

# # Display the plot
plt.tight_layout()

for x, y in zip(labels, counts):
    plt.annotate(f'{y}', (x, y), textcoords="offset points", xytext=(0, 100), ha='center')

plt.show()


Frames vs PSPI Score

In [None]:
from matplotlib.layout_engine import ConstrainedLayoutEngine
def print_img_PSPI(img_dir, subject, sequence):
    image_filenames = []
    pain_intensity_labels = []
    average_intensity_scores = []

    sequence_dir = os.path.join(img_dir, subject , sequence)
    image_label_dir = os.path.join(data_root, "Frame_Labels","PSPI", subject, sequence)

    for filename in sorted(os.listdir(sequence_dir)):
        if filename.endswith('.png'):  # Adjust the file extension as needed
            # Load the image
            img = cv2.imread(os.path.join(sequence_dir, filename))
            image_filenames.append(img)

            # Load the pain intensity label from the corresponding text file
            label_filename = os.path.splitext(filename)[0] + '_facs.txt'

            with open(os.path.join(image_label_dir, label_filename), 'r') as label_file:
                pain_intensity = int(float(label_file.read().strip()))
                pain_intensity_labels.append(pain_intensity)

   # Calculate average intensity scores for each batch (for demonstration purposes)
    num_batches = 15
    batch_size = len(image_filenames) // num_batches


    for batch_index in range(num_batches):
        start_index = batch_index * batch_size
        end_index = start_index + batch_size


        # Calculate the average intensity score for the current batch
        average_intensity = np.mean(pain_intensity_labels[start_index:end_index])
        average_intensity_scores.append(pain_intensity_labels[start_index])

    # Create a smooth curve fitted graph
    x = np.arange(0, num_batches)
    y = average_intensity_scores

    # Use make_interp_spline for a smooth curve fit
    x_smooth = np.linspace(x.min(), x.max(), 300)
    y_smooth = make_interp_spline(x, y, k=3)(x_smooth)

    fig = plt.figure(figsize=(16, 10),constrained_layout= True)

    gs1 = fig.add_gridspec(2, num_batches,height_ratios=[1,3])

    # Create subplots for the images in the first row
    for i in range(num_batches):
        ax = fig.add_subplot(gs1[0, i])
        ax.imshow(cv2.cvtColor(image_filenames[i * batch_size], cv2.COLOR_BGR2RGB))
        ax.axis('off')

    # Plot the smooth curve in the second row
    ax = fig.add_subplot(gs1[1, :num_batches])
    ax.plot(x_smooth, y_smooth, label='Smooth Curve', linewidth=2)
    ax.scatter(x, y, color='red', marker='o', label='Data Points')
    ax.set_xlabel('Batch Index')
    ax.set_ylabel('Average Pain Intensity')
    ax.set_title('Smooth Curve Fitted Graph - ' + sequence)
    ax.legend()
    ax.grid(True)

    plt.show()



In [None]:
# Initialize lists to store image filenames and pain intensity labels
for subject in images:
    for sequence in os.listdir(img_dir+'/'+subject):
        print_img_PSPI(img_dir, subject, sequence)

    break


## Preprocess the data
- enhance the appearance of the face
- align the face
- crop the face

In [None]:
from tqdm import trange, tqdm_notebook
from time import sleep 

class preprocess():

    def __init__(self, frames, subject_code, sequence_code):
        super(preprocess, self).__init__()
        self.frames = frames
        self.subject_code = subject_code
        self.sequence_code = sequence_code
        # Do landmark detection on the input frames for face recognition proposes
        # self.landmarkDetection()

        self.readLandmarkLabel()

        # Mask the non-face area with black pixels
        self.frames = self.maskFace()

        # Tilt and align the face at centre, then crop the frames according to the face region
        self.frames = self.tiltAlign()

        self.frames = self.resize_frames()

        self.savePreprocessedimages()

    def resize_frames(self):
        frames = []

        for img in self.frames:
            img = cv.resize(img, (120,120))
            frames.append(img)

        return frames
    def savePreprocessedimages(self):
        print("Saving...")
        path = "D:\\Library\\Documents\\UM Lecture Notes & Tutorial\\FYP\\Dataset\\Preprocessed"
        target_dir =""
        # check filepath exist
        if not os.path.exists(path):
            print("checking root...")
            os.mkdir(path)
            
        subject_path = os.path.join(path, self.subject_code)

        if not os.path.exists(subject_path):
            print("checking subject folder")
            os.mkdir(subject_path)

        sequence_path = os.path.join(subject_path, self.sequence_code)

        if not os.path.exists(sequence_path):
            print("checking sequence path")
            os.mkdir(sequence_path)
            target_dir = sequence_path

        print(target_dir)
        for i in trange((len(self.frames)), desc="Saving_" +self.sequence_code ):
            n = f'{i+1:03}'
            cv.imwrite(f'{target_dir}\{self.sequence_code}{n}.png', cv.cvtColor(self.frames[i], cv.COLOR_BGR2RGB))
            sleep(0.01)

    def readLandmarkLabel(self):
        label_path = os.path.join(data_root, "AAM_landmarks", self.subject_code, self.sequence_code)
        all_frames_landmark = []
        frames = self.frames
        n=0
        for frames_label in os.listdir(label_path):
            
            landmarks = []
            landmarks_tuple = []

            img = (frames[n])
            img = img.copy()
            
            if frames_label.endswith(".txt"):
                landmark_file_path = os.path.join(label_path, frames_label)

                with open(landmark_file_path,  'r') as file:
                    landmarks = [list(map(float, line.strip().split())) for line in file]

            if landmarks is not None:
                # Iterate over the detected faces
                for pred in landmarks:
                    
                    x, y = pred
                    landmarks_tuple.append((int(x), int(y)))
                    if 0 <= x < img.shape[1] and 0 <= y < img.shape[0]:
                        cv.circle(img, (int(x), int(y)), 2, (0, 255, 0), -1)
            all_frames_landmark.append(landmarks_tuple)
            n += 1

        self.framesLandmark=all_frames_landmark

    def delete_files_in_directory(self, directory_path):
        try:
            with os.scandir(directory_path) as entries:
                for entry in entries:
                    if entry.is_file():
                        os.unlink(entry.path)
                print("All files deleted successfully.")
        except OSError:
            print("Error occurred while deleting files.")

    def landmarkDetection(self):
        print("Detecting landmark...")
        frames = self.frames
        output = []
        framesLandmark = []
        model = FaceAlignment(landmarks_type=LandmarksType.TWO_D, face_detector='blazeface',
                              face_detector_kwargs={'back_model': True}, device='cpu')
        
        for n in trange(len(frames), desc="Detecting landmark_" + self.sequence_code):
            img = (frames[n])
            img = img.copy()
            landmarks = model.get_landmarks(img)
            landmarks_tuple = []
            if landmarks is not None:
                # Iterate over the detected faces
                for pred in landmarks:
                    # Draw landmarks on the frame
                    for point in pred:
                        x, y = point
                        landmarks_tuple.append((int(x), int(y)))
                        if 0 <= x < img.shape[1] and 0 <= y < img.shape[0]:
                            cv.circle(img, (int(x), int(y)), 2, (0, 255, 0), -1)

            framesLandmark.append(landmarks_tuple)
            output.append(img)
            sleep(0.01)

        self.framesLandmark = framesLandmark

    def tiltAlign(self):
        print("Tilting and aligning...")
        frames = self.frames
        output =[]
        for i in trange(len(frames), desc = "Tilt_align_" + self.sequence_code):
            
            img = frames[i]
            landmarkTuple = self.framesLandmark[i]
            # Landmark index of reight eye and left eye are
            right_eye_cood = [(landmarkTuple[39][0] + landmarkTuple[36][0])/2, (landmarkTuple[39][1] + landmarkTuple[36][1])/2]
            left_eye_cood = [(landmarkTuple[45][0] + landmarkTuple[42][0])/2, (landmarkTuple[45][1] + landmarkTuple[42][1])/2]
            x1, y1 = right_eye_cood
            x2, y2 = left_eye_cood

            a = abs(y1 - y2)
            b = abs(x2 - x1)
            c = math.sqrt(a * a + b * b)

            cos_alpha = (b * b + c * c - a * a) / (2 * b * c)

            alpha = np.arccos(cos_alpha)
            alpha = (alpha * 180) / math.pi
            img = Image.fromarray(img)
            if y1>y2 :
                alpha = -alpha
            img = np.array(img.rotate(alpha))
            plt.imshow(cv.cvtColor(img, cv.COLOR_BGR2RGB))
            output.append(img)
            
            sleep(0.01)
        return output

    def maskFace(self):
        print("Masking...")
        routes = [i for i in range (16,-1,-1)] + [i for i in range (17,26+1)]

        frames = self.frames
        output = []
        for n in trange(len(frames), desc = "Masking_" + self.sequence_code):
            
            routes_cod = []
            mask = None
            out = None
            landmarks_tuple = self.framesLandmark[n]
            img = (frames[n])
            img = img.copy()
            img2 = img.copy()
            for i in range (0, len(routes)-1):
                source_point = routes[i]
                target_point = routes[i+1]

                source_cod = landmarks_tuple[source_point]
                target_cod = landmarks_tuple[target_point]
                routes_cod.append(source_cod)
                cv.line(img, (source_cod), (target_cod),(255,255,255),2)

            routes_cod = routes_cod+[routes_cod[0]]

            mask = np.zeros((img.shape[0], img.shape[1]))
            mask = cv.fillConvexPoly(mask, np.array(routes_cod),1)
            mask = mask.astype(np.bool_)
            out = np.zeros_like(img)
            out[mask] = img2[mask]
            # plt.imshow(cv.cvtColor(out, cv.COLOR_BGR2RGB))
            output.append(cv.cvtColor(self.cropFaceArea(out, mask), cv.COLOR_BGR2RGB))
            
            sleep(0.01)
        return output

    def cropFaceArea(self, frame, mask):

        gray = cv.cvtColor(frame,cv.COLOR_BGR2GRAY)
        contours, _ = cv.findContours(gray, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)

        # Get the bounding box of the largest contour

        largest_contour = max(contours, key=cv.contourArea)
        x, y, w, h = cv.boundingRect(largest_contour)

        # Crop the image to the size of the masked face
        cropped_image = frame[y:y+h, x:x+w]

        return cropped_image

    def padding_normalization(self, target_length):
        print("padding and normalising...")
        """
        Preprocesses a sequence of images and pads them to a target length.

        Args:
            images (list): List of PIL images.
            target_length (int): Desired length of the sequence after padding.

        Returns:
            torch.Tensor: Tensor of preprocessed and padded images.
        """
        # Resize the images to a consistent size
        array_images = self.frames
        images =[]

        for image in array_images:
            images.append((Image.fromarray(image)))

        resized_images = [TF.resize((img), [224, 224]) for img in images]

        # Convert the images to tensors
        tensor_images = [TF.to_tensor(img) for img in resized_images]

        # Stack the tensor images along a new dimension (sequence dimension)
        tensor_sequence = torch.stack(tensor_images)

        # Calculate the current length of the sequence
        current_length = tensor_sequence.size(0)

        # Pad the sequence if necessary
        if current_length < target_length:
            padding_length = target_length - current_length
            padding = torch.zeros(padding_length, *tensor_sequence.shape[1:])
            tensor_sequence = torch.cat((tensor_sequence, padding))

        # Normalize the tensor sequence
        # Define the mean and standard deviation values for normalization
        mean = [0.485, 0.456, 0.406]
        std = [0.229, 0.224, 0.225]

        # Apply normalization to the tensor sequence
        normalize = transforms.Normalize(mean=mean, std=std)
        normalized_sequence = normalize(tensor_sequence)

        return normalized_sequence



In [None]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import trange
from tqdm.notebook import trange, tqdm

from time import sleep 

# Load the images into a array
# The structure of the dataset folder
# root > Images > Subject > Sequences > *frame


images_data_dir = img_dir # Path that contains all the subject frames of sequences
labels_dir = os.path.join(data_root, 'Sequence_Labels', "VAS")

# Create lists to store preprocessed images and labels
preprocessed_images = []
preprocessed_labels = []

image_subject = os.listdir(images_data_dir)

# Iterate through subject folders (assuming subject folders are labeled with integers)
for i in trange(len(image_subject), desc = "Preprocess progress" ) :
    subject_folder = image_subject[i]
    subject_path = os.path.join(images_data_dir, subject_folder)

    # Skip non-directory files
    if not os.path.isdir(subject_path):
        continue

    # Iterate through sequence folders
    all_sequence_list = os.listdir(subject_path)
    for i in trange(len(all_sequence_list), desc = "Subject_"+subject_folder):
        
        sequence_folder = all_sequence_list[i]
        if os.path.exists(os.path.join("D:\\Library\\Documents\\UM Lecture Notes & Tutorial\\FYP\\Dataset\\Preprocessed",
                                       subject_folder, sequence_folder)):
            continue
        sequence_path = os.path.join(subject_path, sequence_folder)

        # Initialize a list to store image frames in the current sequence
        sequence_images = []

        # Skip non-directory files
        if not os.path.isdir(sequence_path):
            continue
        
        allframes_path = sorted(os.listdir(sequence_path))

        # Iterate through frames in the sequence
        for i in trange(len(allframes_path),desc='Reading Images_'+sequence_folder):
            frame_filename = allframes_path[i]

            if not frame_filename.endswith('.png'):
                continue

            frame_path = os.path.join(sequence_path, frame_filename)

            # Load the image
            img = cv2.imread(frame_path)

            # Append the image to the images list
            sequence_images.append(img)
            sleep(0.01)


        preprocessing = preprocess(sequence_images, subject_folder, sequence_folder)
        
        '''
        # Stack the image frames to create a sequence tensor
        sequence_tensor = np.stack(preprocessing.tensor)

        # Append the sequence tensor to the list of preprocessed images
        preprocessed_images.append(sequence_tensor)

        # Read the label for the current sequence from the Labels directory
        label_file_path = os.path.join(labels_dir, subject_folder, sequence_folder + '.txt')
        with open(label_file_path, 'r') as label_file:
            sequence_label = int(float(label_file.read().strip()))

        # Append the label to the list of preprocessed labels
        preprocessed_labels.append(sequence_label)
        '''
        sleep(0.01)
    sleep(0.01)

        

# Convert the lists of preprocessed images and labels to NumPy arrays
preprocessed_images = np.array(preprocessed_images)
preprocessed_labels = np.array(preprocessed_labels)


In [None]:
from tqdm import tnrange, tqdm_notebook
from time import sleep 

sequence_path = "D:\\Library\\Documents\\UM Lecture Notes & Tutorial\\FYP\\Dataset\\UNBC-McMaster Shoulder Pain Data\\Images\\043-jh043\\jh043t1aeaff"
allframes_path = os.listdir(sequence_path)
sequence_images=[]
for i in tnrange(len(allframes_path),desc='progress'):
    frame_filename = allframes_path[i]
    frame_path = os.path.join(sequence_path, frame_filename)
    # Load the image
    img = cv2.imread(frame_path)
    # Append the image to the images list
    sequence_images.append(img)
    sleep(0.01)

print(len(sequence_images))
preprocessing = preprocess(sequence_images, subject_folder, sequence_folder)


## Load data and do subset for the data 

Select the frames from 48k images where consist of the PSPI from 0 to 10 (PSPI >10 is considered as 10) 

The data should be selected evenly from 25 subject, and each class should have 400 frames for each classes 

Total number of frames for each class
{0: 40029, 1: 2909, 2: 2351, 4: 802, 5: 242, 6: 270, 3: 1412, 7: 53, 8: 79, 9: 32, 10: 67, 11: 76, 12: 48, 13: 22, 14: 1, 15: 5}

There will be total of 4000 images consist in the dataset

In [10]:
from time import sleep
from tqdm.notebook import trange
import os 
subset_image_dir = {}
subset_label= {}
class_count = {x:0 for x in range(11)}
read_file = {}

dataset_root = "D:\\Library\Documents\\UM Lecture Notes & Tutorial\\FYP\\Dataset\\UNBC-McMaster Shoulder Pain Data"
img_folder_dir = os.path.join(dataset_root, "Images")
PSPI_folder_dir = os.path.join(dataset_root, "Frame_Labels", "PSPI")
subject_id = os.listdir(img_folder_dir)
PSPI_score = 0
while True:
	count = class_count.get(PSPI_score)
	for subject in subject_id:
		temp = {}
		PSPI_sequence_dir = os.path.join(PSPI_folder_dir, subject)
		i = 0
		for sequence_id in os.listdir(PSPI_sequence_dir):
			PSPI_sequence_id_folder = os.path.join(PSPI_sequence_dir, sequence_id)
			temp_img_dir_list =[]
			temp_label_list = []
			for frames in os.listdir(PSPI_sequence_id_folder):
				if i<=16:
					pain_intensity = 0
					with open(os.path.join(PSPI_sequence_id_folder, frames), 'r') as label_file:
						pain_intensity = int(float(label_file.read().strip()))
					
					append_flag = read_file.get(os.path.join(PSPI_sequence_id_folder, frames))
					if pain_intensity == PSPI_score and append_flag is None:
						temp_img_dir_list.append(os.path.join(img_folder_dir,subject, sequence_id, frames))
						temp_label_list.append(os.path.join(PSPI_sequence_id_folder, frames))
						read_file[os.path.join(PSPI_sequence_id_folder, frames)] = False
						i+=1
						count+=1
				else:
					break
			if subset_image_dir.get(subject) is None:
				subset_image_dir[subject] = temp_img_dir_list
				subset_label[subject] = temp_label_list
			else:
				update_img_list = subset_image_dir.get(subject)
				update_img_list.extend(temp_img_dir_list)
				subset_image_dir.update({subject:(update_img_list)})

				update_label_list = subset_label.get(subject)
				update_label_list.extend(temp_label_list)
				subset_label.update({subject:(update_label_list)})
			if i == 17:
				break
		if i ==17:
			break

	if count<400 and not count==class_count.get(PSPI_score):
		class_count.update({PSPI_score:count})
		print(class_count)
		print("continue")
		continue
	elif PSPI_score < 14 :
		class_count.update({PSPI_score:count})
		print(class_count)
		print("next")
		PSPI_score+=1
	else:
		break

{0: 17, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 34, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 51, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 68, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 85, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 102, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 119, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 136, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 153, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 170, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 187, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 204, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0}
continue
{0: 221, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 

KeyboardInterrupt: 

In [None]:
import os
import torch
import cv2
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import GroupKFold
from sklearn.utils.class_weight import compute_class_weight
from imblearn.over_sampling import SMOTE
from torchvision import transforms
from sklearn.metrics import accuracy_score

from tqdm import tnrange, tqdm_notebook
from time import sleep


def load_subject_data(subject_name):
    subject_dir = os.path.join("/content/drive/MyDrive/Colab Notebooks/Dataset/Preprocessed",subject_name)
    frames = []  # Load frames from the subject's directory
    pain_intensity_labels = []  # Load corresponding labels
    all_subject_sequence = sorted(os.listdir(subject_dir))
    for i in trange(len(all_subject_sequence),desc='Subject level progress'):
        sequence = all_subject_sequence[i]
        sequence_dir = os.path.join(subject_dir,sequence)
        allframes_ofsequence = os.listdir(sequence_dir)
        for n in trange(len(allframes_ofsequence),desc= (sequence + ' - Sequence level progress')):
            filename = allframes_ofsequence[n]
            if filename.endswith(".png"):
                try:
                    # Load the pain intensity label from the corresponding text file
                    label_filename = os.path.splitext(filename)[0] + '_facs.txt'
                    label_path = os.path.join("/content/drive/MyDrive/Colab Notebooks/Dataset/Frame_Labels/PSPI", subject_name, sequence)
                    with open(os.path.join(label_path, label_filename), 'r') as label_file:
                        pain_intensity = int(float(label_file.read().strip()))
                        pain_intensity_labels.append(pain_intensity)

                    # Load the image
                    img = cv2.imread(os.path.join(sequence_dir, filename))
                    img = cv2.resize(img, (120,120))
                    frames.append(img)
                except Exception as e:
                    print(e)
                finally:
                    sleep(0.01)
                    continue
                
        sleep(0.01)


    return frames, pain_intensity_labels

In [None]:
# Define a custom dataset
class CustomDataset(Dataset):
    def __init__(self, root_dir):
        self.root_dir = root_dir
        self.image_dir = os.path.join(root_dir,"Images")
        self.subjects = os.listdir(self.image_dir)
        self.transform = transforms.Compose([])

    def __len__(self):
        return len(self.subjects)

    def __getitem__(self, idx):
        subject = self.subjects[idx]

        frames, labels = load_subject_data(subject) # Load labels from labels_path

        frames = self.transform(frames)

        return frames, labels

# Define a simple CNN model
class PainScorePredictor(nn.Module):
    def __init__(self):
        super(PainScorePredictor, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(64 * 120 * 120 // 4, 128)
        self.fc2 = nn.Linear(128, 1)  # Assuming pain score is a scalar value

    def forward(self, x):
        x = self.pool(nn.functional.F.relu(self.conv1(x)))
        x = self.pool(nn.functional.F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 120 * 120 // 4)
        x = nn.functional.F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Assuming your dataset is structured as described, with an "images" folder and a "labels" folder
dataset = CustomDataset(root_dir='/content/drive/MyDrive/Colab Notebooks/Dataset')

# Set device to CUDA if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 10

# Step 1: Group K-Fold Cross-Validation
gkf = GroupKFold(n_splits=25)


In [None]:
for fold, (train_idx, val_idx) in enumerate(gkf.split(dataset, y=None, groups=dataset.subjects)):

    model = PainScorePredictor().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_dataset = torch.utils.data.Subset(dataset, train_idx)
    val_dataset = torch.utils.data.Subset(dataset, val_idx)

     # Set up DataLoader for training and validation
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
    val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
    print(train_dataset)
    print(train_idx)
    print(val_dataset)
    print(val_idx)

    # Step 2: Compute Class Weights
    train_labels = [label for _, labels in train_dataset for label in labels]
    class_weights = compute_class_weight('balanced', classes=torch.unique(train_labels), y=train_labels)
    class_weight_dict = dict(zip(torch.unique(train_labels), class_weights))

    # Step 3: Resample with SMOTE (or other methods) using class weights
    smote = SMOTE(random_state=42)

    # Step 4: Train your model with class weights
    model.to(device)
    model.train()

    # Assuming you have a DataLoader for training
    train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)


    for epoch in range(num_epochs):
        for inputs, labels in train_dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            # Resample using SMOTE and class weights
            inputs, labels = smote.fit_resample(torch.stack(inputs).reshape(-1, (120*120*3)), labels)
            inputs = inputs.reshape(-1, len(inputs), (120*120*3))
            optimizer.zero_grad()

            # Forward pass
            outputs = model(*inputs)

            # Compute loss
            loss = criterion(outputs, labels)

            # Backward pass
            loss.backward()

            # Optimizer step
            optimizer.step()

        # Validation loop
        model.eval()
        total_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        with torch.no_grad():
            for inputs, labels in val_dataloader:
                inputs = [frame.to(device) for frame in inputs]
                labels = labels.to(device)

                # Forward pass
                outputs = model(*inputs)

                # Compute loss
                loss = criterion(outputs, labels)
                total_loss += loss.item()

                # Compute accuracy (or other metrics)
                _, predicted = torch.max(outputs, 1)
                correct_predictions += (predicted == labels).sum().item()
                total_samples += labels.size(0)

        # Calculate metrics
        average_loss = total_loss / len(val_dataloader)
        accuracy = correct_predictions / total_samples

        # Print or log metrics
        print(f'Epoch {epoch + 1}/{num_epochs}, Validation Loss: {average_loss:.4f}, Accuracy: {accuracy * 100:.2f}%')

