Run these 2 cells first:

In [1]:
import numpy as np
import os
import json
import pandas as pd
import re
from tqdm import tqdm
import shutil

In [3]:
#Split to images data and skeleton data
#images > folder
#skeleton > h5
HEIGHT = 480
WIDTH = 640
Actions = ['Idle',
           'Pickup_item',
           'Use_item',
           'Aim',
           'Shoot'
           ]
def get_skeleton_json(json_file):
    landmarks = []
    handness = []
    frame_ids = []
    #labels 
    with open(json_file) as f:
        data = json.load(f)
        for frame in data:
            for hand in data[frame]:
                keypoint = data[frame][hand]['landmarks']
                keypoint_arr = np.array([[kp[1],kp[2]] for kp in keypoint])
                landmarks.append(keypoint_arr)
                handness.append(int(hand))
                frame_ids.append(int(frame))
    labels = -1
    for i in range(len(Actions)):
        if Actions[i] in json_file:
            labels = [i] * len(landmarks)
            break
    if labels == -1:
        print("Error: No label found")
        return None, None, None
    return landmarks, handness, labels, frame_ids

def extract_numbers(s):
    return re.findall(r'\d+', s)



# Preprocessing and dataset creation (skeleton)

In [13]:

def process_folder(folder,pd_database):
    if not len(os.listdir(folder)):
        return None
    elif len(os.listdir(folder)) == 1:
        folder = os.path.join(folder, os.listdir(folder)[0])
    #print(f'Processing folder {folder}')
    base_name = os.path.basename(folder)
    #process json
    fid = str(extract_numbers(base_name)[0])[-4:]
    #print(os.listdir(folder))
    for file in os.listdir(folder):
        if file.endswith('.json') and 'detail' not in file:
            landmarks, handness, labels, frame_ids = get_skeleton_json(os.path.join(folder, file))
            if landmarks is None:
                continue
            #append to pd_database with columns: landmarks, handness, labels, fid
            pd_database.append(pd.DataFrame({'landmarks':landmarks,
                                             'handness':handness,
                                             'labels':labels,
                                             'fid':fid,
                                             'frame_ids':frame_ids}))
            #print(f'Processing {file}')
    return pd_database

def process_folders(data_folder):
    pd_database = []
    for folder in tqdm(os.listdir(data_folder)):
        #if folder is a folder
        if os.path.isdir(os.path.join(data_folder, folder)) and not '[discarded]' in folder:
            _ = process_folder(os.path.join(data_folder, folder),pd_database)
            if _ is not None:
                pd_database = _
    return pd_database

def process_skeleton(data_folder,save_path):
    pd_database = process_folders(data_folder)
    pd_database = pd.concat(pd_database)
    pd_database.to_hdf(save_path, key='df', mode='w')

process_skeleton('/work/21010294/HandGesture/Dataset/','skeleton.h5')


100%|██████████| 42/42 [00:05<00:00,  7.71it/s]
your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block1_values] [items->Index(['landmarks', 'fid'], dtype='object')]

  pd_database.to_hdf(save_path, key='df', mode='w')


In [14]:
#load skeleton.h5 and display
df = pd.read_hdf('skeleton.h5')
#print(df['landmarks'][df['fid'] == '0013'].to_numpy()[0])

# Training skeleton model

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report



df = pd.read_hdf('skeleton.h5')
# Custom Dataset class
class HandPostureDataset(Dataset):
    def __init__(self, dataframe,flip_hand=True,angle_hand=False):
        self.data = dataframe['landmarks'].values
        self.handness = dataframe['handness'].values
        self.labels = dataframe['labels'].values  
        self.flip_hand = flip_hand
        self.angle_hand = angle_hand

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        sample = self.feature_extraction(sample,self.handness[idx])
        #normalize to 0-1
        sample = (sample - sample.min()) / (sample.max() - sample.min())
        #print(sample.shape)
        sample = torch.tensor(sample, dtype=torch.float)
        label = torch.tensor(self.labels[idx], dtype=torch.long)

        return sample, label
    def feature_extraction(self,landmarks,handness):
        PARSE_LANDMARKS_JOINTS = [    
        [0, 1], [1, 2], [2, 3], [3, 4], # thumb
        [0, 5],[5, 6], [6, 7], [7, 8], # index finger
        [5, 9],[9,10],[10, 11], [11, 12],# middle finger
        [9, 13],[13, 14],[14, 15],[15, 16], # ring finger
        [13, 17],  [17, 18], [18, 19], [19,20]   # little finger
        ]   
        def calculate_angle(landmark1, landmark2):
            return np.math.atan2(np.linalg.det([landmark1, landmark2]), np.dot(landmark1, landmark2))
        if self.flip_hand and handness == 1:
            landmarks = landmarks * np.array([-1,1])
        if self.angle_hand:
            angles = []
            for joint in PARSE_LANDMARKS_JOINTS:
                angle = calculate_angle(landmarks[joint[0]],landmarks[joint[1]])
                angles.append(angle)
            return np.array([angles,angles]).T
        return landmarks
            


# Model definition
class HandPostureModel(nn.Module):
    def __init__(self):
        super(HandPostureModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=2, out_channels=64, kernel_size=3)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3)
        self.lstm = nn.LSTM(input_size=128, hidden_size=100, batch_first=True)
        self.fc1 = nn.Linear(100, 50)
        self.fc2 = nn.Linear(50, len(Actions))  

    def forward(self, x):
        x = x.permute(0, 2, 1)  
        x = torch.relu(self.conv1(x))
        x = self.pool(x)
        x = torch.relu(self.conv2(x))
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        x, _ = self.lstm(x)
        x = x[:, -1, :] 
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# Load data
df = pd.read_hdf('skeleton.h5')


# Manually set test and validation IDs
test_ids = ['1584', '0246', '0110', '0013']
val_ids = ['2089', '1596']

# Get unique IDs
unique_fids = df['fid'].unique()

# Determine training IDs by excluding test and validation IDs
train_ids = [fid for fid in unique_fids if fid not in test_ids + val_ids]

# Create dataframes for each set
train_df = df[df['fid'].isin(train_ids)]
val_df = df[df['fid'].isin(val_ids)]
test_df = df[df['fid'].isin(test_ids)]

# Verify the splits
print(f"Train set size: {len(train_df)}")
print(f"Validation set size: {len(val_df)}")
print(f"Test set size: {len(test_df)}")


train_dataset = HandPostureDataset(train_df)
val_dataset = HandPostureDataset(val_df)
test_dataset = HandPostureDataset(test_df)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Initialize model, loss function, and optimizer
model = HandPostureModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training loop
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    
    # Step the scheduler
    scheduler.step()
    
    # Validation loop
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            val_loss += criterion(outputs, labels).item()
            pred = outputs.argmax(dim=1, keepdim=True)
            correct += pred.eq(labels.view_as(pred)).sum().item()
    
    val_loss /= len(val_loader.dataset)
    val_accuracy = 100. * correct / len(val_loader.dataset)
    print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

# Test the model and calculate metrics
model.eval()
test_loss = 0
correct = 0
all_labels = []
all_preds = []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        test_loss += criterion(outputs, labels).item()
        pred = outputs.argmax(dim=1, keepdim=True)
        correct += pred.eq(labels.view_as(pred)).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(pred.cpu().numpy())

test_loss /= len(test_loader.dataset)
test_accuracy = 100. * correct / len(test_loader.dataset)
precision = precision_score(all_labels, all_preds, average='weighted')
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')

print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')
print(f'Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1:.2f}')
print(classification_report(all_labels, all_preds))

# Export results to a CSV file
results = {
    'Test Loss': [test_loss],
    'Test Accuracy': [test_accuracy],
    'Precision': [precision],
    'Recall': [recall],
    'F1 Score': [f1]
}
results_df = pd.DataFrame(results)
results_df.to_csv('test_results.csv', index=False)
#save model
torch.save(model.state_dict(), 'hand_posture_model.pth')
print("Training complete.")


Using device: cuda
Train set size: 71431
Validation set size: 21980
Test set size: 31949
Epoch 1/20, Validation Loss: 0.0398, Validation Accuracy: 63.22%
Epoch 2/20, Validation Loss: 0.0382, Validation Accuracy: 68.26%
Epoch 3/20, Validation Loss: 0.0374, Validation Accuracy: 70.84%
Epoch 4/20, Validation Loss: 0.0377, Validation Accuracy: 69.78%
Epoch 5/20, Validation Loss: 0.0376, Validation Accuracy: 70.13%
Epoch 6/20, Validation Loss: 0.0372, Validation Accuracy: 71.23%
Epoch 7/20, Validation Loss: 0.0373, Validation Accuracy: 70.88%
Epoch 8/20, Validation Loss: 0.0372, Validation Accuracy: 71.06%
Epoch 9/20, Validation Loss: 0.0372, Validation Accuracy: 71.13%
Epoch 10/20, Validation Loss: 0.0372, Validation Accuracy: 71.13%
Epoch 11/20, Validation Loss: 0.0372, Validation Accuracy: 71.11%
Epoch 12/20, Validation Loss: 0.0372, Validation Accuracy: 71.13%
Epoch 13/20, Validation Loss: 0.0372, Validation Accuracy: 71.13%
Epoch 14/20, Validation Loss: 0.0372, Validation Accuracy: 71.

# Preprocessing and dataset creation (images)

In [11]:

def process_folder(folder,save_path,type='train'):
    print(folder)
    folder = str(folder)
    if not len(os.listdir(folder)):
        return None
    elif len(os.listdir(folder)) == 1:
        folder = os.path.join(folder, str(os.listdir(folder)[0]))
    #print(f'Processing folder {folder}')
    base_name = os.path.basename(folder)
    #process json
    fid = str(extract_numbers(base_name)[0])[-4:]
    #print(os.listdir(folder))
    
    for c in os.listdir(folder):
        #if f is folder
        if os.path.isdir(os.path.join(folder, c)):
            os.makedirs(os.path.join(save_path,type,c),exist_ok=True)
            for file in os.listdir(os.path.join(folder, c)):
                if file.endswith('.jpg') and '_drawed.jpg' not in file:
                    shutil.copy(os.path.join(folder, c, file),os.path.join(save_path,type,c,f"{fid}_{file}"))
            


def process_imgs(data_folder,save_path):
    folders = []
    #fid = str(extract_numbers(base_name)[0])[-4:]
    for folder in tqdm(os.listdir(data_folder)):
        #if folder is a folder
        if os.path.isdir(os.path.join(data_folder, folder)) and not '[discarded]' in folder:
            folders.append(os.path.join(data_folder, folder))
    #.7 folder train, .1 folder val, .2 folder test
    np.random.seed(12)
    #np.random.shuffle(folders)
    train_folders, val_folders, test_folders = np.split(folders, [int(.7*len(folders)), int(.8*len(folders))])
    for folder in list(train_folders):
        process_folder(folder,save_path,type='train')
    for folder in list(val_folders):
        process_folder(folder,save_path,type='val')
    for folder in list(test_folders):
        process_folder(folder,save_path,type='test')
    print('Done')
    


process_imgs('/work/21010294/HandGesture/Dataset/','imgs_data')


100%|██████████| 42/42 [00:00<00:00, 63595.95it/s]


/work/21010294/HandGesture/Dataset/21011611_VuDuongKhang
/work/21010294/HandGesture/Dataset/VepleyAI_dataset_23010111_tongnguyenbaolong
/work/21010294/HandGesture/Dataset/VepleyAI_dataset_22010117_PhungHoangAnh
/work/21010294/HandGesture/Dataset/21013192_NongNgocHuan
/work/21010294/HandGesture/Dataset/22010952_Nguyenvanbang
/work/21010294/HandGesture/Dataset/23010570_TranBuiNguyenDuong
/work/21010294/HandGesture/Dataset/22010057_VuQuangDuong
/work/21010294/HandGesture/Dataset/VepleyAI_dataset_21012089_NguyenAnhQuan
/work/21010294/HandGesture/Dataset/22010402_NGUYENHUYHOANG
/work/21010294/HandGesture/Dataset/VepleyAI_dataset_21010610_Phan_Huy_Duong
/work/21010294/HandGesture/Dataset/VepleyAI_dataset_22010127_NguyenHuuTan
/work/21010294/HandGesture/Dataset/21012089_NguyenAnhQuan
/work/21010294/HandGesture/Dataset/VepleyAI_dataset_21011596_PhanMinhDuc
/work/21010294/HandGesture/Dataset/22010246_NguyenTheDuy
/work/21010294/HandGesture/Dataset/VepleyAI_dataset_21011584_TranManhCuong
/work/2

In [12]:
for root, dirs, files in os.walk('imgs_data'):
    print(root, len(files))

imgs_data 0
imgs_data/test 0
imgs_data/test/Pickup_item 3099
imgs_data/test/Use_item 3100
imgs_data/test/Aim 4100
imgs_data/test/Shoot 3099
imgs_data/test/Idle 3100
imgs_data/train 0
imgs_data/train/Pickup_item 9096
imgs_data/train/Use_item 7097
imgs_data/train/Aim 7095
imgs_data/train/Shoot 7198
imgs_data/train/Idle 8098
imgs_data/val 0
imgs_data/val/Pickup_item 1999
imgs_data/val/Use_item 1999
imgs_data/val/Aim 1999
imgs_data/val/Shoot 3999
imgs_data/val/Idle 1998


# Training images model

In [15]:
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import numpy as np
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report

base_model = 'resnet50'

if base_model == 'resnet50':
    model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50', pretrained=True)
    #transfer learning to 5 classes
    model.fc = nn.Linear(model.fc.in_features, len(Actions))
elif base_model == 'mobilenet_v2':
    model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)
    model.classifier[1] = nn.Linear(model.classifier[1].in_features, len(Actions))
elif base_model == 'vgg16':
    model = torch.hub.load('pytorch/vision:v0.10.0', 'vgg16', pretrained=True)
    model.classifier[6] = nn.Linear(model.classifier[6].in_features, len(Actions))
elif base_model == 'densenet121':
    model = torch.hub.load('pytorch/vision:v0.10.0', 'densenet121', pretrained=True)
    model.classifier = nn.Linear(model.classifier.in_features, len(Actions))
else:
    print('Invalid model specified')
    exit()

#read dataset
data_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_dataset = ImageFolder('imgs_data/train', transform=data_transform)
val_dataset = ImageFolder('imgs_data/val', transform=data_transform)
test_dataset = ImageFolder('imgs_data/test', transform=data_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# Initialize model, loss function, and optimizer
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training loop
num_epochs = 20
for epoch in tqdm(range(num_epochs),desc='Epoch'):
    model.train()
    disp_loss = 0
    for inputs, labels in tqdm(train_loader,desc=f"Epoch {epoch+1}/{num_epochs}: L={disp_loss:.4f}"):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        disp_loss += loss.item()
        loss.backward()
        optimizer.step()
    
    # Step the scheduler
    scheduler.step()
    
    # Validation loop
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            val_loss += criterion(outputs, labels).item()
            pred = outputs.argmax(dim=1, keepdim=True)
            correct += pred.eq(labels.view_as(pred)).sum().item()
    
    val_loss /= len(val_loader.dataset)
    val_accuracy = 100. * correct / len(val_loader.dataset)
    print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

# Test the model and calculate metrics
model.eval()
test_loss = 0
correct = 0
all_labels = []
all_preds = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        test_loss += criterion(outputs, labels).item()
        pred = outputs.argmax(dim=1, keepdim=True)
        correct += pred.eq(labels.view_as(pred)).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(pred.cpu().numpy())

test_loss /= len(test_loader.dataset)
test_accuracy = 100. * correct / len(test_loader.dataset)
precision = precision_score(all_labels, all_preds, average='weighted')
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')

print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')
print(f'Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1:.2f}')
print(classification_report(all_labels, all_preds))




Using cache found in /home/21010294/.cache/torch/hub/pytorch_vision_v0.10.0


Using device: cuda


Epoch 1/20: L=0.0000: 100%|██████████| 1206/1206 [05:28<00:00,  3.67it/s]
Epoch:   5%|▌         | 1/20 [06:48<2:09:14, 408.14s/it]

Epoch 1/20, Validation Loss: 0.0840, Validation Accuracy: 42.28%


Epoch 2/20: L=0.0000:  63%|██████▎   | 761/1206 [03:10<01:51,  4.00it/s]
Epoch:   5%|▌         | 1/20 [09:58<3:09:31, 598.49s/it]


KeyboardInterrupt: 

# Preprocessing and dataset creation (hand images)

In [None]:

def process_folder(folder,save_path,type='train'):
    if not len(os.listdir(folder)):
        return None
    elif len(os.listdir(folder)) == 1:
        folder = os.path.join(folder, os.listdir(folder)[0])
    #print(f'Processing folder {folder}')
    base_name = os.path.basename(folder)
    #process json
    fid = str(extract_numbers(base_name)[0])[-4:]
    #print(os.listdir(folder))
    for file in os.listdir(folder):
        if file.endswith('.json') and 'detail' not in file:
            landmarks, handness, labels = get_skeleton_json(os.path.join(folder, file))
            if landmarks is None:
                continue
            #append to pd_database with columns: landmarks, handness, labels, fid
            pd_database.append(pd.DataFrame({'landmarks':landmarks,
                                             'handness':handness,
                                             'labels':labels,
                                             'fid':fid}))
            #print(f'Processing {file}')
    return pd_database
def process_folder(folder,save_path,type='train'):
    print(folder)
    folder = str(folder)
    if not len(os.listdir(folder)):
        return None
    elif len(os.listdir(folder)) == 1:
        folder = os.path.join(folder, str(os.listdir(folder)[0]))
    #print(f'Processing folder {folder}')
    base_name = os.path.basename(folder)
    #process json
    fid = str(extract_numbers(base_name)[0])[-4:]
    #print(os.listdir(folder))
    
    for c in os.listdir(folder):
        #if f is folder
        if os.path.isdir(os.path.join(folder, c)):
            if not os.path.exists(os.path.join(folder, f"{c}.json")):
                continue
            bounding_box_file = os.path.join(folder, f"{c}.json")
            os.makedirs(os.path.join(save_path,type,c),exist_ok=True)
            for file in os.listdir(os.path.join(folder, c)):
                if file.endswith('.jpg') and '_drawed.jpg' not in file:
                    shutil.copy(os.path.join(folder, c, file),os.path.join(save_path,type,c,f"{fid}_{file}"))


def process_imgs(data_folder,save_path):
    folders = []
    #fid = str(extract_numbers(base_name)[0])[-4:]
    for folder in tqdm(os.listdir(data_folder)):
        #if folder is a folder
        if os.path.isdir(os.path.join(data_folder, folder)) and not '[discarded]' in folder:
            folders.append(os.path.join(data_folder, folder))
    #.7 folder train, .1 folder val, .2 folder test
    np.random.seed(12)
    #np.random.shuffle(folders)
    train_folders, val_folders, test_folders = np.split(folders, [int(.7*len(folders)), int(.8*len(folders))])
    for folder in list(train_folders):
        process_folder(folder,save_path,type='train')
    for folder in list(val_folders):
        process_folder(folder,save_path,type='val')
    for folder in list(test_folders):
        process_folder(folder,save_path,type='test')
    print('Done')
    


process_imgs('/work/21010294/HandGesture/Dataset/','imgs_data')


# Train late fusion model

In [None]:
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import numpy as np
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report
from PIL import Image
data_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
#load hand_posture_model.pth
class HandPostureDataset(Dataset):
    def __init__(self, dataframe,imgs_path,flip_hand=True,angle_hand=False):
        self.data = dataframe['landmarks'].values
        self.handness = dataframe['handness'].values
        self.labels = dataframe['labels'].values  
        self.frame_ids = dataframe['frame_ids'].values
        self.fid = dataframe['fid'].values
        self.flip_hand = flip_hand
        self.angle_hand = angle_hand
        self.imgs_path = imgs_path

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        sample = self.feature_extraction(sample,self.handness[idx])
        #normalize to 0-1
        sample = (sample - sample.min()) / (sample.max() - sample.min())
        #print(sample.shape)
        sample = torch.tensor(sample, dtype=torch.float)
        label = torch.tensor(self.labels[idx], dtype=torch.long)

        sample_frame_name = os.path.join(self.imgs_path,Actions[self.labels[idx]],f"{self.fid[idx]}_{self.frame_ids[idx]}.jpg")
        
        img = Image.open(sample_frame_name)
        img = data_transform(img)
        return sample, img, label
        

    def feature_extraction(self,landmarks,handness):
        PARSE_LANDMARKS_JOINTS = [    
        [0, 1], [1, 2], [2, 3], [3, 4], # thumb
        [0, 5],[5, 6], [6, 7], [7, 8], # index finger
        [5, 9],[9,10],[10, 11], [11, 12],# middle finger
        [9, 13],[13, 14],[14, 15],[15, 16], # ring finger
        [13, 17],  [17, 18], [18, 19], [19,20]   # little finger
        ]   
        def calculate_angle(landmark1, landmark2):
            return np.math.atan2(np.linalg.det([landmark1, landmark2]), np.dot(landmark1, landmark2))
        if self.flip_hand and handness == 1:
            landmarks = landmarks * np.array([-1,1])
        if self.angle_hand:
            angles = []
            for joint in PARSE_LANDMARKS_JOINTS:
                angle = calculate_angle(landmarks[joint[0]],landmarks[joint[1]])
                angles.append(angle)
            return np.array([angles,angles]).T
        return landmarks
            


# Model definition
class HandPostureModel(nn.Module):
    def __init__(self):
        super(HandPostureModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=2, out_channels=64, kernel_size=3)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3)
        self.lstm = nn.LSTM(input_size=128, hidden_size=100, batch_first=True)
        self.fc1 = nn.Linear(100, 50)
        self.fc2 = nn.Linear(50, len(Actions))  

    def forward(self, x):
        x = x.permute(0, 2, 1)  
        x = torch.relu(self.conv1(x))
        x = self.pool(x)
        x = torch.relu(self.conv2(x))
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        x, _ = self.lstm(x)
        x = x[:, -1, :] 
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)
    
model_hand = HandPostureModel()
model_hand.load_state_dict(torch.load('hand_posture_model.pth'))

#to fit with late fusion withh image, change fc2  from 5 to 1000
model_hand.fc2 = nn.Linear(50, 1000)
base_model = 'resnet50'

if base_model == 'resnet50':
    model_image = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50', pretrained=True)
    #transfer learning to 5 classes
    model_image.fc = nn.Linear(model_image.fc.in_features, len(Actions))
    #load 'resnet50.pth'
    model_image.load_state_dict(torch.load('resnet50.pth'))

    model_image.fc = nn.Linear(model_image.fc.in_features, 1000)
elif base_model == 'mobilenet_v2':
    model_image = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)
    model_image.classifier[1] = nn.Linear(model_image.classifier[1].in_features, len(Actions))
    #load 'mobilenet_v2.pth'
    model_image.load_state_dict(torch.load('mobilenet_v2.pth'))
    model_image.classifier[1] = nn.Linear(model_image.classifier[1].in_features, 1000)
elif base_model == 'vgg16':
    model_image = torch.hub.load('pytorch/vision:v0.10.0', 'vgg16', pretrained=True)
    model_image.classifier[6] = nn.Linear(model_image.classifier[6].in_features, len(Actions))
    #load 'vgg16.pth'
    model_image.load_state_dict(torch.load('vgg16.pth'))
    model_image.classifier[6] = nn.Linear(model_image.classifier[6].in_features, 1000)
elif base_model == 'densenet121':
    model_image = torch.hub.load('pytorch/vision:v0.10.0', 'densenet121', pretrained=True)
    model_image.classifier = nn.Linear(model_image.classifier.in_features, len(Actions))
    #load 'densenet121.pth'
    model_image.load_state_dict(torch.load('densenet121.pth'))
    model_image.classifier = nn.Linear(model_image.classifier.in_features, 1000)
else:
    print('Invalid model specified')
    exit()


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# Load data
df = pd.read_hdf('skeleton.h5')


# Manually set test and validation IDs
test_ids = ['1584', '0246', '0110', '0013']
val_ids = ['2089', '1596']

# Get unique IDs
unique_fids = df['fid'].unique()

# Determine training IDs by excluding test and validation IDs
train_ids = [fid for fid in unique_fids if fid not in test_ids + val_ids]

# Create dataframes for each set
train_df = df[df['fid'].isin(train_ids)]
val_df = df[df['fid'].isin(val_ids)]
test_df = df[df['fid'].isin(test_ids)]

# Verify the splits
print(f"Train set size: {len(train_df)}")
print(f"Validation set size: {len(val_df)}")
print(f"Test set size: {len(test_df)}")


train_dataset = HandPostureDataset(train_df,'imgs_data/train/')
val_dataset = HandPostureDataset(val_df,'imgs_data/val/')
test_dataset = HandPostureDataset(test_df,'imgs_data/test/')

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

#init a model with late fusion
class LateFusionModel(nn.Module):
    def __init__(self, model_hand, model_image):
        super(LateFusionModel, self).__init__()
        self.model_hand = model_hand
        self.model_image = model_image
        self.fc = nn.Linear(2000, len(Actions))

    def forward(self, x_hand, x_image):
        x_hand = self.model_hand(x_hand)
        x_image = self.model_image(x_image)
        x = torch.cat((x_hand, x_image), dim=1)
        x = self.fc(x)
        return torch.softmax(x, dim=1)
    
model = LateFusionModel(model_hand, model_image).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training loop
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    for inputs_hand, inputs_image, labels in train_loader:
        inputs_hand, inputs_image, labels = inputs_hand.to(device), inputs_image.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs_hand, inputs_image)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    
    # Step the scheduler
    scheduler.step()
    
    # Validation loop
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for inputs_hand, inputs_image, labels in val_loader:
            inputs_hand, inputs_image, labels = inputs_hand.to(device), inputs_image.to(device), labels.to(device)
            outputs = model(inputs_hand, inputs_image)
            val_loss += criterion(outputs, labels).item()
            pred = outputs.argmax(dim=1, keepdim=True)
            correct += pred.eq(labels.view_as(pred)).sum().item()
    
    val_loss /= len(val_loader.dataset)
    val_accuracy = 100. * correct / len(val_loader.dataset)
    print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

# Test the model and calculate metrics
model.eval()
test_loss = 0
correct = 0
all_labels = []
all_preds = []
with torch.no_grad():
    for inputs_hand, inputs_image, labels in test_loader:
        inputs_hand, inputs_image, labels = inputs_hand.to(device), inputs_image.to(device), labels.to(device)
        outputs = model(inputs_hand, inputs_image)
        test_loss += criterion(outputs, labels).item()
        pred = outputs.argmax(dim=1, keepdim=True)
        correct += pred.eq(labels.view_as(pred)).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(pred.cpu().numpy())

test_loss /= len(test_loader.dataset)
test_accuracy = 100. * correct / len(test_loader.dataset)
precision = precision_score(all_labels, all_preds, average='weighted')
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')

print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')
print(f'Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1:.2f}')
print(classification_report(all_labels, all_preds))
