In [None]:
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118      

In [None]:
# Check whether a CUDA-capable GPU is visible to this notebook's kernel.
import torch
# Last expression of the cell, so the notebook displays the returned boolean.
torch.cuda.is_available()

In [None]:
##################################################### Data Collection & Processing ##########################################################

In [None]:
%pip install comet-ml ultralytics 

In [None]:
# Step 1: Hand Detection
from comet_ml import Experiment
from comet_ml.integration.pytorch import log_model
from ultralytics import YOLO
import torch
import os
import yaml 
from pathlib import Path

# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime" abort that
# can occur when several packages bundle their own copy of the library - confirm it is still needed.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
# Release cached GPU memory held by PyTorch before training starts.
torch.cuda.empty_cache()

def main():
    """
    Fine-tune the pretrained "yolov8n.pt" checkpoint on the dataset named in config.yaml.

    Reads config.yaml from the current working directory, resolves its "data_path"
    entry against that directory, trains for 200 epochs, runs validation and
    prints the device that was used.
    """
    project_root = Path.cwd()

    with (project_root / "config.yaml").open("r") as config_file:
        settings = yaml.safe_load(config_file)

    dataset_yaml = project_root / settings["data_path"]
    num_epochs = 200

    # Prefer the GPU whenever one is visible to PyTorch.
    target_device = 'cpu'
    if torch.cuda.is_available():
        target_device = 'cuda'
    print("Device used: ", target_device)

    weights_dir = project_root / "yolo_finetuned_weight_output_path"

    detector = YOLO("yolov8n.pt").to(target_device)
    detector.train(data=dataset_yaml, epochs=num_epochs, workers=0, save_dir=weights_dir)
    detector.val()
    print("Model is using device:", detector.device)


if __name__ == "__main__":
    main()



In [None]:
Now we have a fine-tuned YOLO model under runs/detect/train{?}. The training details are stored there, but most importantly, the weights are saved under weights/,
which are the core brain of the model. 

In [None]:
# Step 2: Hand Segmentation
import os
import cv2 as cv
import numpy as np

def expand_box(box_shape, expansion_factor, frame_width, frame_height):
    """
    Grow a bounding box outward on every side and clamp it to the frame.

    args:
    box_shape(x1, y1, x2, y2): top-left and bottom-right corners of the box.
    expansion_factor(float): fraction of the box width/height added on each side.
    frame_width(int): width of the frame the box lives in.
    frame_height(int): height of the frame the box lives in.

    return:
    (int, int, int, int): the expanded (x1, y1, x2, y2), truncated to integers.
    """
    left, top, right, bottom = box_shape

    # Padding is proportional to the box's own size, not to the frame.
    pad_x = expansion_factor * (right - left)
    pad_y = expansion_factor * (bottom - top)

    grown_left = int(left - pad_x)
    grown_top = int(top - pad_y)
    grown_right = int(right + pad_x)
    grown_bottom = int(bottom + pad_y)

    return (
        max(0, grown_left),
        max(0, grown_top),
        min(frame_width, grown_right),
        min(frame_height, grown_bottom),
    )
    
def get_target_detection_result(results, target_class):
    """
    Keep only the detections whose first predicted class matches target_class.

    args:
    results: iterable of detections; each must expose `boxes.cls.tolist()`
             (class indices) and `names` (index -> class name mapping).
    target_class(str): class name to keep.

    return:
    list: the matching detections, in their original order.
    """
    matches = []
    for detection in results:
        class_ids = detection.boxes.cls.tolist()
        if not class_ids:
            # Nothing was detected in this entry.
            continue
        # Only the first class index of each entry is inspected.
        if detection.names[int(class_ids[0])] == target_class:
            matches.append(detection)
    return matches

    
def segment_hand(model, input_folder, output_folder, expansion_factor = 0.1, target_class = "left"):
    """
    Segment the target hand from the background in every .mp4 video of a folder.

    Each frame is run through the detector; every matching detection box is expanded
    and painted into a mask. The mask is accumulated (bitwise OR) over the frames of a
    video, so a previously detected area stays visible even when the detector misses
    the hand, and the visible area "grows" when the hand moves outside of it.
    Pixels outside the cumulative mask are blacked out in the written video.

    args:
    model: loaded detection model exposing `predict(frame)` (main() passes a YOLO instance).
    input_folder(str): folder containing the input .mp4 videos.
    output_folder(str): folder receiving the segmented videos (same file names); created if missing.
    expansion_factor(float): fraction of the box size added on each side of a detection.
    target_class(str): class name to detect and segment.
    """
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        if not filename.endswith(".mp4"):
            continue

        input_video_path = os.path.join(input_folder, filename)
        output_video_path = os.path.join(output_folder, filename)

        cap = cv.VideoCapture(input_video_path)

        if not cap.isOpened():
            # Skip unreadable files instead of writing an empty video from bogus metadata.
            print(f"Error: Could not open input video file {input_video_path}.")
            cap.release()
            continue

        frame_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
        frame_rate = cap.get(cv.CAP_PROP_FPS)

        fourcc = cv.VideoWriter_fourcc(*'mp4v')
        out = cv.VideoWriter(output_video_path, fourcc, frame_rate, (frame_width, frame_height))

        cumulative_mask = None

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                results = model.predict(frame)

                # frame.shape is (height, width, channels); the mask is single-channel.
                mask = np.zeros(frame.shape[:2], dtype=np.uint8)

                for result in results:
                    for detection in get_target_detection_result(result, target_class):
                        box = detection.boxes.xyxy[0]  # bounding box corners (x1, y1, x2, y2)
                        x1, y1, x2, y2 = expand_box(box, expansion_factor, frame_width, frame_height)
                        cv.rectangle(mask, (int(x1), int(y1)), (int(x2), int(y2)), (255), cv.FILLED)

                if cumulative_mask is None:
                    cumulative_mask = mask
                else:
                    # Guard against a frame whose size differs from the first one.
                    if mask.shape != cumulative_mask.shape:
                        mask = cv.resize(mask, (cumulative_mask.shape[1], cumulative_mask.shape[0]))
                    cumulative_mask = cv.bitwise_or(cumulative_mask, mask)

                masked_frame = cv.bitwise_and(frame, frame, mask = cumulative_mask)
                out.write(masked_frame)
        finally:
            # Always free the capture/writer, even if detection raises mid-video.
            cap.release()
            out.release()

In [None]:
from ultralytics import YOLO
from pathlib import Path
import yaml 

def main():
    """
    Run hand segmentation on every class folder of the action dataset.

    Reads config.yaml from the current working directory and uses its
    "detection_weight_path", "action_input_data_path" and
    "segmentation_output_data_path" entries (all relative to that directory).
    Each sub-folder of the input folder is processed by segment_hand and written
    to a sub-folder of the same name under the output folder.
    """
    # This cell does not import os itself; import locally so it does not depend on an earlier cell.
    import os

    BASE_DIR = Path.cwd()

    with open(BASE_DIR / "config.yaml", "r") as f:
        config = yaml.safe_load(f)

    weights_path = BASE_DIR / config["detection_weight_path"]
    input_folder = BASE_DIR / config["action_input_data_path"]
    output_folder = BASE_DIR / config["segmentation_output_data_path"]

    model = YOLO(weights_path)

    for folder in os.listdir(input_folder):
        input_file_path = os.path.join(input_folder, folder)
        if not os.path.isdir(input_file_path):
            # Stray files would make segment_hand fail in os.listdir.
            continue
        output_file_path = os.path.join(output_folder, folder)
        segment_hand(model, input_file_path, output_file_path)

if __name__ == "__main__":
    main()

In [None]:
After this, we have videos under output_folder in which the hand is segmented out of the background. 

In [None]:
# Step 3: Optical Flow Extraction
import cv2 as cv
from pathlib import Path
import yaml 
import numpy as np 
import matplotlib.pyplot as plt

# Resolve project paths relative to the notebook's working directory.
BASE_DIR = Path.cwd()

# Load the shared project configuration.
with open(BASE_DIR / "config.yaml", "r") as f:
    config = yaml.safe_load(f)

# Root folder under which create_histgram_of_motion saves the per-direction histogram images.
histogram_output_folder_path = BASE_DIR / config["histogram_of_motion_output_data_path"]

# check_fret_movement reports fret movement when BOTH direction counts reach this value.
MAX_FRET_MOVEMENT_THRESHOLD = 18 
# calculate_direction assigns displacements shorter than this to bin 8 ("No Direction").
MIN_MOVEMENT_DISTANCE = 0.01

def check_fret_movement(down_direction, up_direction):
    """
    Decide whether a frame's motion looks like fret movement rather than finger movement.

    args:
    down_direction(int): number of tracked points counted for the first direction.
    up_direction(int): number of tracked points counted for the opposite direction.

    return:
    boolean: True only when both counts reach MAX_FRET_MOVEMENT_THRESHOLD, otherwise False.
    """
    # Equivalent to "neither count is below the threshold".
    smaller_count = min(down_direction, up_direction)
    return not (smaller_count < MAX_FRET_MOVEMENT_THRESHOLD)
    
def eliminate_displacement(displacement, threshold):
    """
    Suppress displacements that are too small to count as real motion.

    args:
    displacement(number): magnitude of a displacement vector.
    threshold(number): minimum magnitude that is kept.

    return:
    0 when displacement is strictly below threshold, otherwise displacement unchanged.
    """
    return 0 if displacement < threshold else displacement
    
def calculate_direction(old_points, new_points):
    """
    Compute how far a point moved and which of the 8 direction sectors it moved in.

    param
    old_points: Coordinates of the old point as a tuple (x, y).
    new_points: Coordinates of the new point as a tuple (x, y).

    return: (distance, direction_section)
    distance: Euclidean length of the displacement.
    direction_section: int in 0..7, the 45-degree sector of the displacement angle
                       (angle measured with arctan2(dy, dx), mapped to [0, 360)),
                       or 8 when the distance is below MIN_MOVEMENT_DISTANCE.
    """
    displacement = np.array(new_points) - np.array(old_points)
    distance = np.linalg.norm(displacement)

    if distance < MIN_MOVEMENT_DISTANCE:
        return distance, 8

    direction_rad = np.arctan2(displacement[1], displacement[0])
    direction_deg = np.degrees(direction_rad)

    if direction_deg < 0:
        direction_deg += 360

    # A tiny negative angle plus 360 can round to exactly 360.0, which would yield
    # sector 8 ("No Direction") for a real movement; wrap it back to sector 0.
    direction_section = int(direction_deg / 45) % 8

    return distance, direction_section

def create_histgram_of_motion(dictionary, class_name, case_num = None, frame_count = None):
    """
    Save one bar-chart image per direction describing the displacement magnitudes.

    For every direction the distances are binned into 20 equal-width bins between 0
    and the largest distance of that direction; bar heights are the bin counts
    divided by frame_count. Images are written to
    <histogram_output_folder_path>/<class_name>/<case_num-1>/<direction>.png.

    Direction keys:
    0 : Right
    1 : Up Right
    2 : Up
    3 : Up Left
    4 : Left
    5 : Down Left
    6 : Down
    7 : Down Right
    8 : No Direction

    args:
    dictionary(python dict): a python dictionary containing distances for each direction.
    class_name(str): a class of the action type
    case_num(int): 1-based case number of the video (required despite the None default)
    frame_count(int): a total number of frames within a video (required despite the None default)

    raises:
    TypeError: if case_num or frame_count is None.
    """
    # This cell does not import os itself; import locally so it does not depend on an earlier cell.
    import os

    if case_num is None or frame_count is None:
        raise TypeError("create_histgram_of_motion requires both case_num and frame_count")

    # A video without counted frames has no distances either; avoid dividing by zero.
    total_size = max(frame_count, 1)

    # The target folder is the same for all directions, so create it once.
    case_folder = f'{histogram_output_folder_path}/{class_name}/{case_num-1}/'
    os.makedirs(case_folder, exist_ok=True)

    for key, distances in dictionary.items():
        max_distance = max(distances, default=0)

        hist, _ = np.histogram(distances, bins=20, range=(0, max_distance))
        relative_frequencies = hist / total_size

        plt.bar(np.arange(20), relative_frequencies, color='blue', edgecolor='black', width=0.8)
        plt.ylim(0, 1)

        plt.savefig(f'{case_folder}{key}.png')
        # Reset the current figure so the next direction starts from a blank canvas.
        plt.clf()
        
    
def segmentation_coordinations(frame):
    """
    A method for finding a bounding box (segmented area) information (x, y, w, h) out of black background.

    args:
    frame(image): a BGR image frame with segmented area and black background.

    return:
    the grayscale version of the frame.
    a list [x, y, w, h] bounding the largest non-black region, or None when the
    frame contains no non-black region.
    """
    gray_frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)  # grayscale makes the binary threshold straightforward
    # Every pixel brighter than 1 counts as part of the segmented area.
    _, binary_image = cv.threshold(gray_frame, 1, 255, cv.THRESH_BINARY)
    contours, _ = cv.findContours(binary_image, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)

    if not contours:
        # Keep the two-value return shape so callers can always unpack the result.
        return gray_frame, None

    segmented_box = max(contours, key=cv.contourArea)
    x, y, w, h = cv.boundingRect(segmented_box)
    return gray_frame, [x, y, w, h]
    
def block_features(frame, coordinates, n_blocks, debug_mode=False):
    """
    A method for generating a block-based feature points to track during optical flow extraction.

    The bounding box is divided by an (n_blocks + 1) x (n_blocks + 1) grid and the
    n_blocks * n_blocks interior grid intersections become the tracking points,
    ordered row by row.

    Args:
        frame (image): An image frame with segmentation (only used for the debug view).
        coordinates ([int]): [x, y, w, h] of the bounding box (segmentation area) in black background.
        n_blocks (int): Number of tracking points per row and per column.
        debug_mode (bool): When True, show the offending frame in a blocking OpenCV
            window if the points cannot be generated. Off by default so the
            notebook never blocks on a GUI window.

    Returns:
        bool: Boolean value to check if the process was successful.
        np.ndarray: float32 array of shape (n_blocks * n_blocks, 1, 2) with the
            feature point coordinates, or None on failure.
    """
    try:
        start_x, start_y, width, height = coordinates

        step_x = int(width / (n_blocks + 1))
        step_y = int(height / (n_blocks + 1))

        points = [
            [int(start_x + (col + 1) * step_x), int(start_y + (row + 1) * step_y)]
            for row in range(n_blocks)
            for col in range(n_blocks)
        ]

        # Shape (N, 1, 2) float32 is the point layout consumed by calcOpticalFlowPyrLK below.
        result = np.array(points, dtype=np.float32).reshape(n_blocks * n_blocks, 1, 2)

    except (TypeError, ValueError, ZeroDivisionError) as e:
        # Only data problems are handled here; anything else (e.g. KeyboardInterrupt) propagates.
        print("Error at Block Feature Generation function:", e)

        if debug_mode:
            cv.imshow("error", frame)
            cv.waitKey(0)
            cv.destroyAllWindows()

        return False, None

    return True, result
                    
def extract_optical_flow(input_folder, output_folder, class_name, size=None, threshold=2):
    """
    A method for processing a video to draw/track optical flow. 
    """
    os.makedirs(output_folder, exist_ok=True)

    if(size is None):
        size = len(os.listdir(input_folder))

    #Lukas-kanade optical flow parameters
    lk_params = dict(winSize=(50,50), maxLevel=2, criteria=(cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03))
    color = (0, 255, 0)
    dictionary = {0:[],1:[],2:[],3:[],4:[],5:[],6:[],7:[],8:[]} #directions 0 - 8 which is top, right top, ... etc.
    frame_count = 0
    num_videos_processed = 0
    case_num = 0

    while num_videos_processed < size:
        video_list = os.listdir(input_folder)
        for video_file_name in video_list:
            video_file_path = os.path.join(input_folder, video_file_name)
            cap = cv.VideoCapture(video_file_path)

            if not cap.isOpened():
                print(f"Error: Could not open {video_file_path}")
                case_num = case_num - 1
                continue

            fourcc = cv.VideoWriter_fourcc(*'mp4v')
            output_video_path = os.path.join(output_folder, f"optical_flow_{video_file_name}")
            output_writer = cv.VideoWriter(output_video_path, fourcc, 20.0, (640, 480))

            ok, frame = cap.read()

            if np.all(frame==0):
                print("frame skipped due to mal-detection. No hand segmentation was happened on this frame.")
                continue

            # prev_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
            
            first_grayscale_frame, coordinates = segmentation_coordinations(frame)

            boo, first_feature_points = block_features(frame, coordinates, 6)

            mask = np.zeros_like(frame)

            while cap.isOpened():
                ok, frame = cap.read()
                
                if not ok:
                    break
                    
                frame_count += 1
                
                if np.all(frame==0):
                    prinit("Blank frame is detected")
                    continue
                    
                cap.set(cv.CAP_PROP_FRAME_WIDTH, 640)
                cap.set(cv.CAP_PROP_FRAME_HEIGHT, 480)
                next_grayscale_frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)

                # calculates sparse optical flow by Lucas-Kanade method
                next_feature_points, status, error = cv.calcOpticalFlowPyrLK(first_grayscale_frame, next_grayscale_frame, first_feature_points, None, **lk_params)
                good_old = first_feature_points[status==1]
                good_new = next_feature_points[status==1]

                direction_detector = {0:0,1:0,2:0,3:0,4:0,5:0,6:0,7:0,8:0} #used for fret movement elimination (hopefully removed later improvement)
                sub_dictionary = {0:[],1:[],2:[],3:[],4:[],5:[],6:[],7:[],8:[]}

                for i, (new, old) in enumerate(zip(good_new, good_old)):
                    new_x, new_y = new.ravel()
                    old_x, old_y = old.ravel()

                    displacement, direction = calculate_direction((old_x, old_y), (new_x, new_y))

                    displacement = eliminate_displacement(displacement, threshold)

                    if displacement != 0: 
                        direction_detector[direction] = direction_detector[direction]+1
                        sub_dictionary[direction].append(displacement)

                    mask_with_flow = cv.line(mask, (int(new_x), int(new_y)), (int(old_x),int(old_y)), color, 1)
                    frame = cv.circle(frame, (int(new_x), int(new_y)), 1, color, -1)

                if not check_fret_movement(direction_detector[2], direction_detector[6]):
                    for key, values in sub_dictionary.items():
                        for element in values:
                            dictionary[key].append(element)

                #update frame and feature points to next frame info
                first_grayscale_frame = next_grayscale_frame.copy()
                first_feature_points = good_new.reshape(-1, 1, 2)

                #write processed frame 
                output_frame = cv.add(frame, mask)
                output_writer.write(output_frame)

            # Release the resources
            cap.release()
            output_writer.release()
            cv.destroyAllWindows()

            print(f"Sparse optical flow video saved: {output_video_path}")
                    
            num_videos_processed += 1
            case_num += 1

            # create histogram for each direction
            print("Iteration at: ", class_name)
            print("Current Number: ",num_videos_processed)
            print("case number: ", case_num)
            create_histgram_of_motion(dictionary, class_name, case_num, frame_count)

            # Initialize dictionary and frame count for next video
            dictionary = {0:[],1:[],2:[],3:[],4:[],5:[],6:[],7:[],8:[]}
            frame_count = 0  
        

In [None]:
from pathlib import Path
import yaml 
import os 

def main():
    """
    Extract optical flow for every action class folder except "norm".

    Reads config.yaml from the current working directory; the
    "segmentation_output_data_path" entry is the input root and
    "optical_flow_output_data_path" the output root.
    """
    project_root = Path.cwd()

    with (project_root / "config.yaml").open("r") as config_file:
        settings = yaml.safe_load(config_file)

    segmented_root = project_root / settings["segmentation_output_data_path"]
    flow_root = project_root / settings["optical_flow_output_data_path"]

    # "norm" is deliberately left out of optical flow extraction.
    class_folders = [name for name in os.listdir(segmented_root) if name != "norm"]

    for class_name in class_folders:
        extract_optical_flow(
            os.path.join(segmented_root, class_name),
            os.path.join(flow_root, class_name),
            class_name,
        )


if __name__ == '__main__':
    main()

In [None]:
Now all preprocessing is done. We have access to the optical flow data under datasets/actions_dataset/data/optical_flow,
and to the histograms of motion for each action type under datasets/histgram_of_motion/. Each action contains multiple examples, 
and each example contains 9 histograms of motion in total, one for each direction. 

In [None]:
##################################################### Classification Model ###################################################################

In [None]:
We have two choices for building the classification model: 1. use histogram images for training, or 2. use histogram tensors.
I originally used histogram images because of the technical limitations I had at the time I was building this for my research, 
but I have since realized: why not use the histogram tensors directly? That is the more natural choice now that I have learned 
more about this area. (I hope to have more time for technical discussions with peers the next time I collaborate with someone.)
I first present the revised original version, which is the image-based classifier. 
(By the way, I even had to revise this implementation from the original one to make sure it is correct.)

In [None]:
# Step 1: Build a multi-stream LeNet model (Using Histogram Images)
#Revised
import torch
import torch.nn as nn
from torch.nn import Module, Conv2d, Linear, MaxPool2d, ReLU, ModuleList, LogSoftmax
from torch import flatten

class LeNet(Module):
    """
    LeNet-style feature extractor: two conv/ReLU/max-pool stages followed by one
    fully connected layer with ReLU.

    fc1 expects 1250 flattened features, which is what 32x32 inputs produce
    (50 channels x 5 x 5 after the two stages).

    Args:
        numChannels: number of channels of the input images.
        feature_size: length of the feature vector produced per image.
    """

    def __init__(self, numChannels, feature_size=500):
        super().__init__()

        # Stage 1: numChannels -> 20 feature maps, spatial size halved by pooling.
        self.conv1 = Conv2d(numChannels, 20, kernel_size=(5,5))
        self.relu1 = ReLU()
        self.maxpool1 = MaxPool2d(kernel_size=(2,2), stride=(2,2))

        # Stage 2: 20 -> 50 feature maps, spatial size halved again.
        self.conv2 = Conv2d(20, 50, kernel_size=(5,5))
        self.relu2 = ReLU()
        self.maxpool2 = MaxPool2d(kernel_size=(2,2), stride=(2,2))

        # Head: flattened maps -> feature vector.
        self.fc1 = Linear(1250, feature_size)
        self.relu3 = ReLU()

    def forward(self, x):
        """Return the (batch, feature_size) feature tensor for a batch of images."""
        stage1 = self.maxpool1(self.relu1(self.conv1(x)))
        stage2 = self.maxpool2(self.relu2(self.conv2(stage1)))
        return self.relu3(self.fc1(flatten(stage2, 1)))


class MultiStreamCNN(Module):
    """
    Multi-stream classifier: one independent LeNet per input stream, whose feature
    vectors are concatenated and fed to a single linear classifier.

    Args:
        numChannels: number of channels of every input image.
        num_classes: number of output classes (size of the returned logits).
        feature_size: length of the feature vector produced by each stream.
        num_streams: number of input streams; defaults to 9, one per motion
            direction histogram.
    """

    def __init__(self, numChannels, num_classes, feature_size=500, num_streams=9):
        super(MultiStreamCNN, self).__init__()

        self.streams = ModuleList([
            LeNet(numChannels, feature_size) for _ in range(num_streams)
        ])

        self.classifier = Linear(feature_size * num_streams, num_classes)

    def forward(self, histograms):
        """
        Args:
            histograms: sequence with one batch tensor per stream, in stream order.

        Returns:
            Tensor of unnormalised class scores, one row per sample.

        Raises:
            ValueError: if the number of inputs differs from the number of streams.
        """
        if len(histograms) != len(self.streams):
            # Fail early with a clear message instead of an IndexError or a shape error later.
            raise ValueError(
                f"Expected {len(self.streams)} input streams, got {len(histograms)}."
            )

        features = [stream(histogram) for stream, histogram in zip(self.streams, histograms)]

        combined_features = torch.cat(features, dim=1)

        return self.classifier(combined_features)
        
# Build a single-channel, 3-class instance and print its layer structure as a sanity check.
model = MultiStreamCNN(numChannels=1, num_classes=3)
print(model)

In [None]:
##################################################### Training & Testing & Inference #####################################################################

In [None]:
%pip install scikit-learn

In [None]:
# Step 1: Training & Testing
import torch 
import matplotlib.pyplot as plt
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from torch import nn, optim
from torchvision.transforms import ToTensor, Compose, Resize, Normalize, Grayscale
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support

class CustomDataset(Dataset):
    """
    Dataset of histogram-of-motion images laid out as
    <root_dir>/<class name>/<case folder>/<0..8>.png.

    Each sample is (list of 9 histogram images, class index). All images are
    loaded into memory when the dataset is constructed; the transform, if any,
    is applied per image on access.

    Args:
        root_dir: folder containing one sub-folder per class.
        classes: class names; their order defines the class indices.
        transform: optional callable applied to every histogram image.
    """

    def __init__(self, root_dir, classes, transform=None):
        self.root_dir = root_dir
        self.classes = classes
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        self.samples = self._load_samples()
        self.transform = transform

    def _load_samples(self):
        """Collect (histograms, class index) for every case folder of every class."""
        samples = []
        for cls in self.classes:
            class_dir = os.path.join(self.root_dir, cls)
            # Sorted so the sample order (and therefore a seeded split) is reproducible.
            for case_dir in sorted(os.listdir(class_dir)):
                case_path = os.path.join(class_dir, case_dir)
                if not os.path.isdir(case_path):
                    # Ignore stray files next to the case folders.
                    continue
                histograms = self._load_histograms(case_path)
                samples.append((histograms, self.class_to_idx[cls]))
        return samples

    def _load_histograms(self, case_path):
        """Load the 9 direction histograms (0.png .. 8.png) of one case folder."""
        histograms = []
        for i in range(9):
            hist_path = os.path.join(case_path, f"{i}.png")
            # Copy the pixel data and close the file right away; keeping the lazily
            # opened image would hold one open file handle per histogram.
            with Image.open(hist_path) as image:
                histograms.append(image.copy())
        return histograms

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        histograms, label = self.samples[idx]
        if self.transform:
            histograms = [self.transform(hist) for hist in histograms]
        return histograms, label

def evaluate(model_path, val_loader, num_classes):
    """
    Load a saved MultiStreamCNN checkpoint and report its metrics on a data loader.

    Prints the accuracy, the per-class precision/recall/F1 and their weighted averages.

    Args:
        model_path: Path to a state_dict saved with torch.save(model.state_dict(), ...).
        val_loader: Data loader yielding (list of per-stream batches, labels).
        num_classes: Number of classes the checkpoint was trained with.

    Returns:
        A tuple (accuracy, mean loss per batch).

    Raises:
        ValueError: if val_loader yields no samples.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
    # NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
    state_dict = torch.load(model_path, map_location=device)

    model = MultiStreamCNN(numChannels=1, num_classes=num_classes)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()

    criterion = nn.CrossEntropyLoss()

    num_correct = 0
    num_samples = 0
    num_batches = 0
    loss_val = 0.0
    all_predictions = []
    all_ground_truths = []

    with torch.no_grad():
        for hist_streams, labels in val_loader:
            hist_streams = [stream.to(device) for stream in hist_streams]
            labels = labels.to(device)

            outputs = model(hist_streams)
            _, predicted = torch.max(outputs, 1)
            num_correct += (predicted == labels).sum().item()
            num_samples += labels.size(0)
            loss_val += criterion(outputs, labels).item()
            num_batches += 1

            all_predictions.extend(predicted.cpu().numpy())
            all_ground_truths.extend(labels.cpu().numpy())

    if num_samples == 0:
        raise ValueError("val_loader yielded no samples; nothing to evaluate.")

    accuracy = num_correct / num_samples
    mean_loss = loss_val / num_batches

    precision, recall, f1_score, _ = precision_recall_fscore_support(
        all_ground_truths, all_predictions, average=None)

    print(f'Evaluation Accuracy: {accuracy:.4f}')
    for i in range(len(precision)):
        print(f'Class {i}:')
        print(f'  Precision: {precision[i]:.4f}')
        print(f'  Recall: {recall[i]:.4f}')
        print(f'  F1-Score: {f1_score[i]:.4f}')
        print()

    print("##########################################################################\n")
    print()

    precision, recall, f1_score, _ = precision_recall_fscore_support(
        all_ground_truths, all_predictions, average="weighted")

    print(f'Evaluation Precision (Weighted Average): {precision:.4f}')
    print(f'Evaluation Recall (Weighted Average): {recall:.4f}')
    print(f'Evaluation F1-Score (Weighted Average): {f1_score:.4f}')

    return accuracy, mean_loss

    
def train(dataset_root, output_root, classes):
    """
    Train the image-based MultiStreamCNN on the histogram-of-motion dataset.

    The dataset is split 70/30 into training and validation parts (random_state=42).
    After training, the weights are saved to <output_root>/multi_stream_cnn.pth, the
    per-epoch loss/accuracy curves to <output_root>/train_mult_loss_acc_with_norm.png,
    and the saved model is evaluated on the validation part.

    Args:
        dataset_root: folder laid out as expected by CustomDataset.
        output_root: folder receiving the weights and the plot; created if missing.
        classes: list of class names (sub-folder names of dataset_root).
    """
    # Subset is not imported by this cell's import block.
    from torch.utils.data import Subset

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    batch_size = 64
    learning_rate = 0.001
    num_epochs = 40
    num_classes = len(classes)

    # 32x32 single-channel inputs scaled to [-1, 1].
    transform = Compose([
        transforms.Resize((32,32)),
        Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])
    ])

    dataset = CustomDataset(root_dir = dataset_root, classes = classes, transform = transform)

    # Split sample indices rather than the dataset itself, so no sample is materialised twice.
    train_indices, val_indices = train_test_split(
        list(range(len(dataset))), test_size=0.30, random_state=42)
    train_data = Subset(dataset, train_indices)
    val_data = Subset(dataset, val_indices)

    print(f'Number of training samples: {len(train_indices)}')
    print(f'Number of validation samples: {len(val_indices)}')

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

    model = MultiStreamCNN(numChannels=1, num_classes=num_classes).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    train_loss = []      # mean loss of each epoch
    train_accuracy = []  # training-set accuracy after each epoch

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for i, (hist_streams, labels) in enumerate(train_loader):
            hist_streams = [stream.to(device) for stream in hist_streams]
            labels = labels.to(device)

            outputs = model(hist_streams)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if (i+1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

        train_loss.append(running_loss / max(len(train_loader), 1))

        # One accuracy pass per epoch (not per batch), with its own loop variables.
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for eval_streams, eval_labels in train_loader:
                eval_streams = [stream.to(device) for stream in eval_streams]
                eval_labels = eval_labels.to(device)
                _, predicted = torch.max(model(eval_streams), 1)
                total += eval_labels.size(0)
                correct += (predicted == eval_labels).sum().item()
        train_accuracy.append(correct / total if total else 0.0)

    os.makedirs(output_root, exist_ok=True)
    model_path = os.path.join(output_root, "multi_stream_cnn.pth")
    torch.save(model.state_dict(), model_path)

    fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2)

    loss_ax.plot(train_loss)
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Training Loss')
    loss_ax.set_title('Training Loss Curve')

    accuracy_ax.plot(train_accuracy)
    accuracy_ax.set_xlabel('Epoch')
    accuracy_ax.set_ylabel('Training Accuracy')
    accuracy_ax.set_title('Training Accuracy Curve')

    fig.tight_layout()

    fig.savefig(f"{output_root}/train_mult_loss_acc_with_norm.png")

    evaluate(model_path, val_loader, num_classes)

In [None]:
from pathlib import Path
import yaml 
import os 

def main():
    """
    Train the image-based classifier on the "ham", "pull" and "slide" classes.

    Reads config.yaml from the current working directory; the
    "histogram_of_motion_output_data_path" entry is the dataset root and
    "model_weight_output_path" the output folder.
    """
    project_root = Path.cwd()

    with (project_root / "config.yaml").open("r") as config_file:
        settings = yaml.safe_load(config_file)

    histogram_root = project_root / settings["histogram_of_motion_output_data_path"]
    weights_root = project_root / settings["model_weight_output_path"]

    train(histogram_root, weights_root, ["ham", "pull", "slide"])


if __name__ == "__main__":
    main()

In [None]:
%pip install torchsummary

In [None]:
# Step 2: Summary
from torchsummary import summary

def custom_summary(model, input_size):
    """
    Prints a summary of the MultiStreamCNN model architecture.

    Lists the parameter count of every tensor in the first stream, the total of
    that stream, the number of trainable parameters of the whole model and the
    names of the model's direct child modules.

    Args:
        model: The MultiStreamCNN model instance (must expose `streams`).
        input_size: A tuple representing the input size (e.g., (num_channels, 32, 32));
            only echoed in the printout.
    """
    lenet = model.streams[0]

    print("---------- LeNet Summary ----------")
    print(f"Input Size: {input_size}")

    total_params_lenet = 0
    for name, param in lenet.named_parameters():
        # numel() already is the full element count of a weight or bias tensor.
        params = param.numel()
        total_params_lenet += params
        print(f"{name}: {params} parameters")

    print(f"Total Parameters (LeNet): {total_params_lenet}")

    print("\n---------- MultiStreamCNN Summary ----------")
    # Count every trainable tensor directly, so the classifier bias is included.
    total_params_multistream = sum(
        param.numel() for param in model.parameters() if param.requires_grad
    )
    print(f"Total Trainable Parameters: {total_params_multistream}")

    print("\n---------- Model Layers ----------")
    for name, _ in model.named_children():
        print(name)

    print("-" * 80)

# Summarise a freshly built single-channel, 3-class model for 32x32 inputs.
model = MultiStreamCNN(numChannels=1, num_classes=3)
custom_summary(model, input_size=(1, 32, 32))


In [None]:
# Step 3: Inference (Maybe Implement later)
class CustomDatasetForPrediction(Dataset):
    """
    Single-sample dataset for inference: the 9 histogram images (0.png .. 8.png)
    found directly inside root_dir, without a label.

    Args:
        root_dir: case folder containing the 9 histogram images.
        transform: optional callable applied to every histogram image on access.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.samples = self._load_samples()
        self.transform = transform

    def _load_samples(self):
        """Return a one-element list holding the histograms of root_dir."""
        return [self._load_histograms(self.root_dir)]

    def _load_histograms(self, case_path):
        """Load the 9 direction histograms (0.png .. 8.png) of one case folder."""
        histograms = []
        for i in range(9):
            hist_path = os.path.join(case_path, f"{i}.png")
            # Copy the pixel data and close the file right away instead of keeping
            # the lazily opened image (and its file handle) alive.
            with Image.open(hist_path) as image:
                histograms.append(image.copy())
        return histograms

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        histograms = self.samples[idx]
        if self.transform:
            histograms = [self.transform(hist) for hist in histograms]
        return histograms

def predict(folder_path, num_classes, model_path="./model/multi_stream_cnn.pth"):
    """
    Predict the class of the case stored in folder_path with a trained MultiStreamCNN.

    Args:
        folder_path: The path to the folder containing the histogram images
            (0.png ... 8.png) of the case to classify.
        num_classes: Number of output classes the saved model was trained with.
        model_path: The path to the saved model state_dict.

    Returns:
        A tensor holding the predicted class index for the case, or None if
        the data loader yields nothing.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
    state_dict = torch.load(model_path, map_location=device)

    model = MultiStreamCNN(numChannels=1, num_classes=num_classes)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()

    transform = Compose([
        transforms.Resize((32, 32)),
        Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])
    ])

    dataset = CustomDatasetForPrediction(folder_path, transform=transform)
    data_loader = DataLoader(dataset, batch_size=1, shuffle=False)

    predicted = None
    with torch.no_grad():
        for hist_streams in data_loader:
            hist_streams = [stream.to(device) for stream in hist_streams]

            outputs = model(hist_streams)
            # Index of the highest score along the class dimension.
            _, predicted = torch.max(outputs, 1)
    return predicted
        

In [None]:
The optical flow extraction step needs to change, because the previous implementation only saved the
histograms as images. Instead, the histogram of motion for each direction should be saved as a numeric
array (stored as a `.npy` file and turned into a PyTorch tensor at load time). We also need to build a
different model and data loader for the training/testing/evaluation process.

In [None]:
# Revised version of the histogram extraction method: the normalized histogram is stored as an array at the end.

from pathlib import Path
import yaml 
import numpy as np 

# Resolve every path relative to the directory the notebook is run from.
BASE_DIR = Path.cwd()

# config.yaml supplies the data/output locations used below.
with open(BASE_DIR / "config.yaml", "r") as f:
    config = yaml.safe_load(f)

# Root folder under which the per-direction histogram .npy files are written.
histogram_output_folder_path = BASE_DIR / config["histogram_of_motion_output_tensor_data_path"]

# Number of bins in each histogram of motion.
HISTOGRAM_BIN_SIZE=20
# NOTE(review): not referenced in the visible part of this notebook; presumably intended
# as a fixed upper bound for the histogram range — confirm before relying on it.
FIXED_MAX_DISTANCE=10.0
def create_histgram_of_motion(dictionary, class_name, case_num = None, frame_count = None):
    """
    Create a normalized histogram for each direction and save it as a .npy array.
    The x axis is magnitude and the y axis is relative frequency.
    0 : Right
    1 : Up Right
    2 : Up
    3 : Up Left
    4 : Left
    5 : Down Left
    6 : Down
    7 : Down Right
    8 : No Direction

    Each histogram is written to
    <histogram_output_folder_path>/<class_name>/<case_num - 1>/<key>.npy as float32.

    args:
    dictionary(python dict): a python dictionary containing distances for each direction.
    class_name(str): a class of the action type
    case_num(int): a case number of the video; required whenever dictionary is not empty
    frame_count(int): a total number of frames within a video (currently unused)

    raises:
    TypeError: if dictionary is not empty and case_num is None.
    """
    if not dictionary:
        # Nothing to write; keep the original no-op behaviour for an empty input.
        return

    if case_num is None:
        # The output directory is derived from case_num, so fail with a clear message
        # instead of the opaque "NoneType - int" error.
        raise TypeError("create_histgram_of_motion() needs case_num to build the output directory")

    output_dir = os.path.join(str(histogram_output_folder_path), str(class_name), str(case_num - 1))
    os.makedirs(output_dir, exist_ok=True)

    for key, distances in dictionary.items():
        # The range is scaled to this direction's own maximum. The upper edge is kept
        # strictly positive: with range=(0, 0) NumPy widens the range to (-0.5, 0.5)
        # and zero magnitudes would land in a middle bin instead of the first one.
        if len(distances) > 0:
            max_distance = max(max(distances), 1e-6)
        else:
            max_distance = 1e-6

        hist, _ = np.histogram(distances, bins=HISTOGRAM_BIN_SIZE, range=(0, max_distance))

        # Normalize counts to relative frequencies; an empty direction stays all zeros.
        total_count = hist.sum()
        if total_count > 0:
            normalized_hist = hist / total_count
        else:
            normalized_hist = np.zeros_like(hist, dtype=np.float32)

        np.save(os.path.join(output_dir, f'{key}.npy'), normalized_hist.astype(np.float32))



In [None]:
# Run the optical flow extraction again.

from pathlib import Path
import yaml 
import os 

def main():
    """Run optical flow extraction on every entry of the segmentation output except "norm".

    Input and output roots are read from config.yaml in the current working
    directory; each entry name is also passed on as the third argument.
    """
    working_dir = Path.cwd()

    with open(working_dir / "config.yaml", "r") as config_file:
        settings = yaml.safe_load(config_file)

    segmentation_root = working_dir / settings["segmentation_output_data_path"]
    optical_flow_root = working_dir / settings["optical_flow_output_data_path"]

    for entry in os.listdir(segmentation_root):
        if entry == "norm":
            continue
        extract_optical_flow(
            os.path.join(segmentation_root, entry),
            os.path.join(optical_flow_root, entry),
            entry,
        )

if __name__ == '__main__':
    main()

In [None]:
Next, a new version of the model that takes the histogram tensors as input.

In [None]:
# Step 2: Build a Multi-Stream CNN (Using histogram tensors)
import torch
import torch.nn as nn

class HistogramProcessor(nn.Module):
    """Feature extractor for one 1D histogram: two Conv1d/ReLU/MaxPool1d stages, then a linear layer."""

    def __init__(self, hist_size, feature_size=128):
        """
        Args:
            hist_size: Number of bins in the input histogram
            feature_size: Length of the feature vector produced per histogram
        """
        super().__init__()

        self.conv1 = nn.Conv1d(1, 16, kernel_size=5, padding=2)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool1d(2)

        self.conv2 = nn.Conv1d(16, 32, kernel_size=5, padding=2)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool1d(2)

        # The padded convolutions keep the length and each pooling halves it
        # (rounding down), leaving hist_size // 4 positions over 32 channels.
        flattened_features = (hist_size // 4) * 32

        self.fc1 = nn.Linear(flattened_features, feature_size)
        self.relu3 = nn.ReLU()

    def forward(self, x):
        # [batch_size, hist_size] -> [batch_size, 1, hist_size]
        signal = x.unsqueeze(1)

        signal = self.pool1(self.relu1(self.conv1(signal)))
        signal = self.pool2(self.relu2(self.conv2(signal)))

        # Collapse channels and positions into one feature axis per sample.
        return self.relu3(self.fc1(signal.flatten(1)))

class MultiStreamHistogramCNN(nn.Module):
    def __init__(self, hist_size, num_classes, feature_size=128, num_streams=9):
        """
        Multi-stream CNN for processing directional histograms (9 by default)

        Args:
            hist_size: Size of each histogram (number of bins)
            num_classes: Number of output classes
            feature_size: Size of feature vectors from each stream
            num_streams: Number of histograms per sample, one stream each
        """
        super(MultiStreamHistogramCNN, self).__init__()

        self.streams = nn.ModuleList([
            HistogramProcessor(hist_size, feature_size) for _ in range(num_streams)
        ])

        self.classifier = nn.Linear(feature_size * num_streams, num_classes)

    def forward(self, histograms):
        """
        Process one histogram per stream and classify the concatenated features

        Args:
            histograms: List of tensors, one per stream, each with shape [batch_size, hist_size]

        Returns:
            Tensor of class scores with shape [batch_size, num_classes]

        Raises:
            ValueError: If the number of histograms differs from the number of streams
        """
        if len(histograms) != len(self.streams):
            # Fail early with a clear message rather than an IndexError or a
            # shape mismatch inside the classifier.
            raise ValueError(
                f"expected {len(self.streams)} histograms, got {len(histograms)}"
            )

        features = [
            stream(histogram) for stream, histogram in zip(self.streams, histograms)
        ]

        # Concatenate per-stream features along the feature axis.
        combined = torch.cat(features, dim=1)

        return self.classifier(combined)
        
# Build the histogram model with one input value per histogram bin and three
# output classes, then print its layer structure.
hist_size = HISTOGRAM_BIN_SIZE  
model = MultiStreamHistogramCNN(hist_size=hist_size, num_classes=3)

print(model)


In [None]:
##################################################### Training & Testing #####################################################################

In [None]:
# Step 1: Training & Testing
import torch 
import matplotlib.pyplot as plt
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from torch import nn, optim
from torchvision.transforms import ToTensor, Compose, Resize, Normalize, Grayscale
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
import numpy as np 

class CustomDataset(Dataset):
    """Dataset of per-case direction histograms stored as .npy files.

    Expected layout: ``root_dir/<class>/<case>/{0..8}.npy``. Each item is a
    tuple ``(histograms, label)`` where ``histograms`` is a list of nine arrays
    and ``label`` is the index of the class within ``classes``.
    """

    def __init__(self, root_dir, classes, transform=None):
        self.root_dir = root_dir
        self.classes = classes
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        self.transform = transform
        self.samples = self._load_samples()

    def _load_samples(self):
        """Load every case directory of every class into memory."""
        samples = []
        for cls in self.classes:
            class_dir = os.path.join(self.root_dir, cls)
            # os.listdir returns entries in arbitrary order; sorting (by name)
            # keeps the sample order, and any seeded split of it, reproducible.
            for case_dir in sorted(os.listdir(class_dir)):
                case_path = os.path.join(class_dir, case_dir)
                if not os.path.isdir(case_path):
                    # Skip stray files such as .DS_Store or notebook checkpoints.
                    continue
                histograms = self._load_histograms(case_path)
                samples.append((histograms, self.class_to_idx[cls]))
        return samples

    def _load_histograms(self, case_path):
        """Load ``0.npy`` ... ``8.npy`` from ``case_path`` as a list of arrays."""
        return [np.load(os.path.join(case_path, f"{i}.npy")) for i in range(9)]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        histograms, label = self.samples[idx]
        if self.transform:
            histograms = [self.transform(hist) for hist in histograms]
        return histograms, label

def evaluate(model_path, val_loader, num_classes):
    """
    Load saved weights into a MultiStreamHistogramCNN and report its performance on val_loader.

    Prints the overall accuracy and average loss, the per-class precision,
    recall and F1-score, and their weighted averages.

    Args:
        model_path: Path to the state_dict written with torch.save.
        val_loader: Data loader yielding (hist_streams, labels) batches.
        num_classes: Number of output classes the saved model was built with.

    Returns:
        A tuple (accuracy, average_loss); average_loss is the mean of the per-batch losses.

    Raises:
        ValueError: If val_loader yields no samples.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
    state_dict = torch.load(model_path, map_location=device)

    model = MultiStreamHistogramCNN(hist_size=HISTOGRAM_BIN_SIZE, num_classes=num_classes)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()

    criterion = nn.CrossEntropyLoss()

    num_correct = 0
    num_samples = 0
    num_batches = 0
    loss_val = 0.0
    all_predictions = []
    all_ground_truths = []

    with torch.no_grad():
        for hist_streams, labels in val_loader:
            hist_streams = [stream.to(device) for stream in hist_streams]
            labels = labels.to(device)

            outputs = model(hist_streams)
            _, predicted = torch.max(outputs, 1)
            num_correct += (predicted == labels).sum().item()
            num_samples += labels.size(0)
            loss_val += criterion(outputs, labels).item()
            num_batches += 1

            all_predictions.extend(predicted.cpu().tolist())
            all_ground_truths.extend(labels.cpu().tolist())

    if num_samples == 0:
        # Avoid an opaque ZeroDivisionError when the loader is empty.
        raise ValueError("val_loader yielded no samples to evaluate")

    accuracy = num_correct / num_samples
    average_loss = loss_val / num_batches

    # average=None yields one precision/recall/F1 value per class.
    precision, recall, f1_score, _ = precision_recall_fscore_support(
        all_ground_truths, all_predictions, average=None)

    print(f'Evaluation Accuracy: {accuracy:.4f}')
    print(f'Evaluation Loss: {average_loss:.4f}')
    for i in range(len(precision)):
        print(f'Class {i}:')
        print(f'  Precision: {precision[i]:.4f}')
        print(f'  Recall: {recall[i]:.4f}')
        print(f'  F1-Score: {f1_score[i]:.4f}')
        print()

    print("##########################################################################\n")
    print()

    precision, recall, f1_score, _ = precision_recall_fscore_support(
        all_ground_truths, all_predictions, average="weighted")

    print(f'Evaluation Precision (Weighted Average): {precision:.4f}')
    print(f'Evaluation Recall (Weighted Average): {recall:.4f}')
    print(f'Evaluation F1-Score (Weighted Average): {f1_score:.4f}')

    return accuracy, average_loss

def train(dataset_root, output_root, classes):
    """
    Train a MultiStreamHistogramCNN on the histogram dataset, save it and evaluate it.

    The dataset is split 70/30 into training and validation samples with a
    fixed seed. After training, the weights are saved to
    ``output_root/multi_stream_histogram_cnn.pth``, the per-epoch training loss
    and accuracy curves are saved to
    ``output_root/train_mult_hist_loss_acc_with_norm.png``, and the saved model
    is evaluated on the validation samples.

    Args:
        dataset_root: Root folder of the histogram dataset (one sub-folder per class).
        output_root: Folder that receives the weights and the training plot;
            created if it does not exist.
        classes: List of class folder names; their order defines the label indices.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    batch_size = 64
    learning_rate = 0.001
    num_epochs = 50
    num_classes = len(classes)

    dataset = CustomDataset(root_dir = dataset_root, classes = classes)

    # Split sample indices, then materialize each subset as a list of
    # (histograms, label) items that DataLoader can batch directly.
    train_indices, val_indices = train_test_split(
        list(range(len(dataset))), test_size=0.30, random_state=42)
    train_data = [dataset[i] for i in train_indices]
    val_data = [dataset[i] for i in val_indices]

    print(f'Number of training samples: {len(train_data)}')
    print(f'Number of validation samples: {len(val_data)}')

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

    model = MultiStreamHistogramCNN(hist_size=HISTOGRAM_BIN_SIZE, num_classes=num_classes).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    train_loss = []      # mean batch loss, one entry per epoch
    train_accuracy = []  # accuracy on the training set, one entry per epoch

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for hist_streams, labels in train_loader:
            hist_streams = [stream.to(device) for stream in hist_streams]
            labels = labels.to(device)

            outputs = model(hist_streams)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        epoch_loss = running_loss / len(train_loader)
        train_loss.append(epoch_loss)

        # Measure training accuracy once per epoch. Doing this inside the batch
        # loop would re-scan the whole training set after every optimizer step.
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for hist_streams, labels in train_loader:
                hist_streams = [stream.to(device) for stream in hist_streams]
                labels = labels.to(device)
                outputs = model(hist_streams)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        epoch_accuracy = correct / total
        train_accuracy.append(epoch_accuracy)

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')

    # torch.save does not create missing directories.
    os.makedirs(output_root, exist_ok=True)
    model_path = os.path.join(output_root, "multi_stream_histogram_cnn.pth")
    torch.save(model.state_dict(), model_path)

    # Draw on a fresh figure so repeated calls do not plot over each other.
    fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2)

    loss_ax.plot(train_loss)
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Training Loss')
    loss_ax.set_title('Training Loss Curve')

    accuracy_ax.plot(train_accuracy)
    accuracy_ax.set_xlabel('Epoch')
    accuracy_ax.set_ylabel('Training Accuracy')
    accuracy_ax.set_title('Training Accuracy Curve')
    fig.tight_layout()

    fig.savefig(f"{output_root}/train_mult_hist_loss_acc_with_norm.png")

    evaluate(model_path, val_loader, num_classes)

In [None]:
from pathlib import Path
import yaml 
import os 

def main():
    """Read the dataset and output locations from config.yaml and train on the three action classes."""
    working_dir = Path.cwd()

    with open(working_dir / "config.yaml", "r") as config_file:
        settings = yaml.safe_load(config_file)

    train(
        working_dir / settings["histogram_of_motion_output_tensor_data_path"],
        working_dir / settings["model_weight_output_path"],
        ["ham", "pull", "slide"],
    )

if __name__ == "__main__":
    main()

In [None]:
# Step 2: Summary
from torchsummary import summary

def custom_summary(model, input_size):
  """
  Prints a summary of a multi-stream model: the parameters of its first
  stream, the total number of trainable parameters, and its top-level layers.

  Args:
      model: A model instance exposing a `streams` sequence of sub-modules.
      input_size: A tuple describing the input size; it is only printed.
  """

  first_stream = model.streams[0]

  print("---------- LeNet Summary ----------")
  print(f"Input Size: {input_size}")

  total_params_lenet = 0
  for name, param in first_stream.named_parameters():
    # numel() is already the full element count of the tensor, for weights
    # and biases alike; no further scaling is needed.
    params = param.numel()
    total_params_lenet += params
    print(f"{name}: {params} parameters")

  print(f"Total Parameters (LeNet): {total_params_lenet}")

  print("\n---------- MultiStreamCNN Summary ----------")
  # Count every trainable tensor of the model directly, so that all streams
  # and the classifier (weight and bias) are included.
  total_params_multistream = sum(
    param.numel() for param in model.parameters() if param.requires_grad
  )
  print(f"Total Trainable Parameters: {total_params_multistream}")

  print("\n---------- Model Layers ----------")
  for name, _ in model.named_children():
    print(name)

  print("-" * 80)


# Summarize the histogram model. Each stream receives one channel of
# HISTOGRAM_BIN_SIZE bins, so report that as the input size.
model = MultiStreamHistogramCNN(hist_size=HISTOGRAM_BIN_SIZE, num_classes=3)
custom_summary(model, input_size=(1, HISTOGRAM_BIN_SIZE))

In [None]:
############################################################ Furthermore ##############################################################################

In [None]:
Notes to self:
- try MediaPipe for the optical feature extraction, for better action data retrieval
- try more data
- try a vision transformer for better recognition
- try applying multi-head attention to finger movement
- try 