## Prepare Data

In [4]:
import os
import shutil

def create_combined_folder(base_path, combined_folder_name):
    combined_path = os.path.join(base_path, combined_folder_name)
    if not os.path.exists(combined_path):
        os.makedirs(combined_path)
    return combined_path

def copy_contents(src, dst):
    for root, dirs, files in os.walk(src):
        relative_path = os.path.relpath(root, src)
        target_dir = os.path.join(dst, relative_path)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        for file in files:
            shutil.copy2(os.path.join(root, file), os.path.join(target_dir, file))

def main():
    base_path = 'C:/Users/LENOVO/Downloads/KArSL-502'
    folders_to_combine = ['01', '02', '03']
    combined_folder_name = 'combined'

    combined_path = create_combined_folder(base_path, combined_folder_name)

    for folder in folders_to_combine:
        src_path = os.path.join(base_path, folder)
        copy_contents(src_path, combined_path)

    print(f"Combined contents of {folders_to_combine} into {combined_path}")

if __name__ == "__main__":
    main()

Combined contents of ['01', '02', '03'] into C:/Users/LENOVO/Downloads/KArSL-502\combined


In [43]:
import os

# Path to the directory containing the folders
directory_path = 'Data/train'

# List all the folders in the directory
folders = os.listdir(directory_path)

for folder in folders:
    # Check if the folder name starts with '0'
    if folder.startswith('0'):
        # New folder name without the leading zero
        new_folder_name = folder[1:]
        # Old folder path
        old_folder_path = os.path.join(directory_path, folder)
        # New folder path
        new_folder_path = os.path.join(directory_path, new_folder_name)
        # Rename the folder
        os.rename(old_folder_path, new_folder_path)

In [2]:
import os
import pandas as pd

def assign_labels(files_path, df):
    data_list = []

    folders = os.listdir(files_path)

    for i, j in enumerate(folders):
        folder_sign_number = int(j)
        
        if folder_sign_number in df['Sign Number'].values:
            row = df[df['Sign Number'] == folder_sign_number].iloc[0]
            folder_path = os.path.join(files_path, j)
            label = row['Arabic Sign']
            
            for file_name in os.listdir(folder_path):
                file_path = os.path.join(folder_path, file_name)
                
                data_list.append({'file_path': file_path, 'Label': label})

    data = pd.concat([pd.DataFrame(data_list)], ignore_index=True)
    data = data.sample(frac=1).reset_index(drop=True)
    data = pd.get_dummies(data, columns=['Label'])
    x = data['file_path']
    y = data.loc[:, data.columns != 'file_path']

    return x, y


## Load Data

In [67]:
import pandas as pd
import cv2
import mediapipe as mp
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor, as_completed
from torch_geometric.data import Data
from torch_geometric.utils import dense_to_sparse
from torch_geometric.nn import GCNConv
import torch
from tqdm import tqdm
from torch_geometric.loader import DataLoader
from sklearn.preprocessing import LabelEncoder
import pickle
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool
from torch_geometric.utils import add_self_loops, to_dense_adj
import time
from sklearn.metrics import accuracy_score    

In [3]:
train_df = pd.read_csv(r'C:\Users\LENOVO\Downloads\KArSL-502\train.csv')
test_df = pd.read_csv(r'C:\Users\LENOVO\Downloads\KArSL-502\test.csv')

In [4]:
X_train, y_train = train_df['file_path'], train_df['Label']
X_test, y_test = test_df['file_path'], test_df['Label']

## Preprocess Data

In [8]:
def sample_frames(video_path, T=48):
    cap = cv2.VideoCapture(video_path)
    N = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    frames = []
    if N < T:
        # Read all frames if there are fewer than T frames
        for i in range(N):
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
        # Interpolate frames to reach T frames
        frames = interpolate_frames(frames, T)
    else:
        # Sample T frames evenly from the video
        I = max(1, N // T)
        for i in range(0, N, I):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
            if len(frames) == T:
                break
    
    cap.release()
    return frames

In [9]:
def interpolate_frames(frames, T):
    num_existing_frames = len(frames)
    interpolated_frames = []
    
    for i in range(T):
        t = i * (num_existing_frames - 1) / (T - 1)
        lower_idx = int(np.floor(t))
        upper_idx = int(np.ceil(t))
        
        if lower_idx == upper_idx:
            interpolated_frames.append(frames[lower_idx])
        else:
            alpha = t - lower_idx
            frame = cv2.addWeighted(frames[lower_idx], 1 - alpha, frames[upper_idx], alpha, 0)
            interpolated_frames.append(frame)
    
    return interpolated_frames

In [10]:
def detect_landmarks(frames):
    mp_holistic = mp.solutions.holistic
    landmarks_list = []
    with mp_holistic.Holistic(static_image_mode=True) as holistic:
        for frame in frames:
            results = holistic.process(frame)
            if results.pose_landmarks and results.left_hand_landmarks and results.right_hand_landmarks:
                landmarks = []
                pose_indices = [0, 11, 12, 13, 14]
                for idx in pose_indices:
                    lm = results.pose_landmarks.landmark[idx]
                    landmarks.append((lm.x, lm.y, lm.z))
                hand_indices = [0, 4, 5, 8, 9, 12, 13, 16, 17, 20]
                for hand_landmarks in [results.left_hand_landmarks, results.right_hand_landmarks]:
                    for idx in hand_indices:
                        lm = hand_landmarks.landmark[idx]
                        landmarks.append((lm.x, lm.y, lm.z))
            else:
                # Append default values if landmarks are not detected
                landmarks = [(0.0, 0.0, 0.0)] * 25
            landmarks_list.append(landmarks)
    return landmarks_list

In [11]:
def normalize_landmarks(landmarks_list):
    normalized_landmarks_list = []
    for landmarks in landmarks_list:
        nose = landmarks[0] 
        normalized_landmarks = [(x - nose[0] + 1e-14, y - nose[1] + 1e-14, z - nose[2] + 1e-14) for x, y, z in landmarks]
        normalized_landmarks_list.append(normalized_landmarks)
    return normalized_landmarks_list

In [12]:
def construct_graph(normalized_landmarks_list):
    G = nx.Graph()
    T = len(normalized_landmarks_list)
    X = []  # Node features matrix

    # Define connections based on the landmarks
    body_connections = [
        # Upper Body
        (0, 1), (0, 2), (1, 2), # Nose to shoulders
        # Left Arm
        (1, 3), # Shoulder to Elbow
        # Right Arm
        (2, 4), # Shoulder to Elbow
    ]
    
    hand_connections = [
        # Thumb
        (1, 2), (2, 3), (3, 4),
        # Index finger
        (5, 6), (6, 7), (7, 8),
        # Middle finger
        (9, 10), (10, 11), (11, 12),
        # Ring finger
        (13, 14), (14, 15), (15, 16),
        # Pinky finger
        (17, 18), (18, 19), (19, 20),
        # Wrist to fingers
        (0, 1), (0, 5), (0, 9), (0, 13), (0, 17)
    ]

    for t in range(T):
        for i, landmark in enumerate(normalized_landmarks_list[t]):
            # Add node for each landmark in each frame with 3D position
            G.add_node((t, i), pos=(landmark[0], landmark[1], landmark[2]))  
            X.append([landmark[0], landmark[1], landmark[2]])  # Add to feature matrix
            if t > 0:
                # Connect the same landmark between consecutive frames (inter-frame edges)
                G.add_edge((t-1, i), (t, i))
        
        # Add intra-frame edges based on the body and hand structure
        for (i, j) in body_connections + hand_connections:
            G.add_edge((t, i), (t, j))
    
    return G, np.array(X)

In [13]:
def extract_features_as_tensor(normalized_landmarks_list):
    T = len(normalized_landmarks_list)
    V = len(normalized_landmarks_list[0])
    C = 3

    tensor = np.zeros((C, V, T))

    for t in range(T):
        for v in range(V):
            x, y, z = normalized_landmarks_list[t][v]
            tensor[0, v, t] = x
            tensor[1, v, t] = y
            tensor[2, v, t] = z

    return tensor

In [14]:
def get_adjacency_matrix(graph):
    adj_matrix = nx.adjacency_matrix(graph).todense()
    #adj_matrix = np.linalg.inv(np.sqrt(adj_matrix))
    return adj_matrix

In [15]:
def process_video(video_path, label, T=48):
    sampled_frames = sample_frames(video_path, T)
    detected_landmarks = detect_landmarks(sampled_frames)
    normalized_landmarks = normalize_landmarks(detected_landmarks)
    graph, X = construct_graph(normalized_landmarks)
    adj_matrix = get_adjacency_matrix(graph)
    
    edge_index, _ = dense_to_sparse(torch.tensor(adj_matrix, dtype=torch.float))
    edge_index = edge_index.long()
    x = torch.tensor(X, dtype=torch.float)
    
    # Include the label in the Data object
    y = torch.tensor(label, dtype=torch.long)
    
    data = Data(x=x, edge_index=edge_index, y=y)
    
    return data


In [16]:
def process_videos_parallel(video_paths, labels, T=48):
    data_list = []
    
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_video, video_path, label, T): (video_path, label) for video_path, label in zip(video_paths, labels)}
        
        # Use tqdm to display progress
        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing Videos"):
            video_path, label = futures[future]
            try:
                data = future.result()  # The `Data` object returned from `process_video`
                data_list.append(data)
            except Exception as e:
                print(f"Error processing video {video_path}: {e}")

    return data_list


## Feed data to model

In [14]:
# Initialize the label encoder
label_encoder = LabelEncoder()

# Fit and transform the labels to integer indices
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

In [15]:
train_data = process_videos_parallel(X_train, y_train_encoded, 48)
test_data = process_videos_parallel(X_test, y_test_encoded, 48)

Processing Videos: 100%|██████████| 2494/2494 [2:13:09<00:00,  3.20s/it]  
Processing Videos: 100%|██████████| 480/480 [25:47<00:00,  3.22s/it] 


## Model

In [15]:
from torch_geometric.nn import global_mean_pool

class GCN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, dropout_prob=0.5):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.bn1 = torch.nn.BatchNorm1d(hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.bn2 = torch.nn.BatchNorm1d(hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, out_channels)
        self.dropout = torch.nn.Dropout(dropout_prob)

    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = self.bn1(x)
        x = torch.relu(x)
        x = self.dropout(x)

        x = self.conv2(x, edge_index)
        x = self.bn2(x)
        x = torch.relu(x)

        # Apply global mean pooling to get a graph-level representation
        x = global_mean_pool(x, batch)

        x = self.fc(x)
        return x


In [4]:
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

In [16]:
model = GCNModelEnhanced(in_channels=3, hidden_channels=16, out_channels=20)  # Adjust output channels based on number of classes
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

## Train

In [7]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

In [8]:
def train(model, train_loader, optimizer, criterion, device, scheduler=None, grad_clip=None):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    correct = 0
    total = 0

    for data in train_loader:
        data = data.to(device)
        optimizer.zero_grad()  # Zero the gradients

        output = model(data.x, data.edge_index, data.batch)  # Forward pass
        loss = criterion(output, data.y)  # Calculate loss
        loss.backward()  # Backward pass (compute gradients)
        
        # Gradient clipping
        if grad_clip is not None:
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
        
        optimizer.step()  # Update weights

        running_loss += loss.item() * data.num_graphs
        _, predicted = torch.max(output, 1)
        correct += (predicted == data.y).sum().item()
        total += data.y.size(0)
    
    # Step the learning rate scheduler, if provided
    if scheduler is not None:
        scheduler.step(running_loss)

    epoch_loss = running_loss / len(train_loader.dataset)
    accuracy = correct / total
    return epoch_loss, accuracy

In [9]:
def evaluate(model, test_loader, criterion, device):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():  # No need to calculate gradients during evaluation
        for data in test_loader:
            data = data.to(device)

            output = model(data.x, data.edge_index, data.batch)  # Forward pass
            loss = criterion(output, data.y)  # Calculate loss

            running_loss += loss.item() * data.num_graphs
            _, predicted = torch.max(output, 1)
            correct += (predicted == data.y).sum().item()
            total += data.y.size(0)

    epoch_loss = running_loss / len(test_loader.dataset)
    accuracy = correct / total
    return epoch_loss, accuracy

In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

num_epochs = 10000
grad_clip = 1.0  # Gradient clipping threshold
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)



In [17]:
for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_loader, optimizer, criterion, device, scheduler, grad_clip)
    test_loss, test_accuracy = evaluate(model, test_loader, criterion, device)

    print(f'Epoch {epoch+1}/{num_epochs}, '
          f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, '
          f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

Epoch 1/10, Train Loss: 3.0018, Train Accuracy: 0.0449, Test Loss: 2.9980, Test Accuracy: 0.0500
Epoch 2/10, Train Loss: 2.9968, Train Accuracy: 0.0529, Test Loss: 2.9977, Test Accuracy: 0.0500
Epoch 3/10, Train Loss: 2.9964, Train Accuracy: 0.0445, Test Loss: 2.9979, Test Accuracy: 0.0500
Epoch 4/10, Train Loss: 2.9960, Train Accuracy: 0.0389, Test Loss: 2.9982, Test Accuracy: 0.0500
Epoch 5/10, Train Loss: 2.9960, Train Accuracy: 0.0473, Test Loss: 2.9983, Test Accuracy: 0.0500
Epoch 6/10, Train Loss: 2.9964, Train Accuracy: 0.0465, Test Loss: 2.9983, Test Accuracy: 0.0500
Epoch 7/10, Train Loss: 2.9966, Train Accuracy: 0.0425, Test Loss: 2.9980, Test Accuracy: 0.0500
Epoch 8/10, Train Loss: 2.9961, Train Accuracy: 0.0409, Test Loss: 2.9978, Test Accuracy: 0.0500
Epoch 9/10, Train Loss: 2.9964, Train Accuracy: 0.0433, Test Loss: 2.9984, Test Accuracy: 0.0500
Epoch 10/10, Train Loss: 2.9959, Train Accuracy: 0.0477, Test Loss: 2.9986, Test Accuracy: 0.0500


In [42]:
torch.save(train_data, "Saved/train_data.pth")
torch.save(test_data, "Saved/test_data.pth")
torch.save(model.state_dict(), "Saved/gcn_model.pth")
with open("saved/label_encoder.pkl", "wb") as f:
    pickle.dump(label_encoder, f)

## Testing and Inference

In [53]:
class GCNModel1(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GCNModel1, self).__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, out_channels)  # Final fully connected layer

    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = torch.relu(x)
        x = self.conv2(x, edge_index)
        x = global_mean_pool(x, batch)  # Pooling over all nodes in the graph
        x = self.fc(x)
        return x

# Second model
class GCNModel2(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, dropout_prob=0.5):
        super(GCNModel2, self).__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.bn1 = torch.nn.BatchNorm1d(hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.bn2 = torch.nn.BatchNorm1d(hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, out_channels)
        self.dropout = torch.nn.Dropout(dropout_prob)

    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = self.bn1(x)
        x = torch.relu(x)
        x = self.dropout(x)

        x = self.conv2(x, edge_index)
        x = self.bn2(x)
        x = torch.relu(x)

        x = global_mean_pool(x, batch)
        x = self.fc(x)
        return x

In [55]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Instantiate both models
model1 = GCNModel1(in_channels=3, hidden_channels=16, out_channels=20).to(device)  # Adjust in_channels, out_channels as needed
model2 = GCNModel2(in_channels=3, hidden_channels=16, out_channels=20, dropout_prob=0.5).to(device)

# Load saved weights
model1.load_state_dict(torch.load('Saved/gcn_model.pth'))
model2.load_state_dict(torch.load('Saved/gcn_model_1.pth'))

  model1.load_state_dict(torch.load('Saved/gcn_model.pth'))
  model2.load_state_dict(torch.load('Saved/gcn_model_1.pth'))


<All keys matched successfully>

In [63]:
def inference_pipeline(video_path, model, label_encoder, T=48):
    # Step 1: Preprocess the video
    sampled_frames = sample_frames(video_path, T)
    detected_landmarks = detect_landmarks(sampled_frames)
    normalized_landmarks = normalize_landmarks(detected_landmarks)
    graph, X = construct_graph(normalized_landmarks)
    adj_matrix = get_adjacency_matrix(graph)
    
    edge_index, _ = dense_to_sparse(torch.tensor(adj_matrix, dtype=torch.float))
    edge_index = edge_index.long()
    x = torch.tensor(X, dtype=torch.float)
    
    # Step 2: Create a Data object for the video
    data = Data(x=x, edge_index=edge_index)
    
    # Since this is a single example, we add a dummy batch dimension
    data.batch = torch.zeros(data.num_nodes, dtype=torch.long)
    
    # Step 3: Perform inference
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # No need to compute gradients for inference
        output = model(data.x, data.edge_index, data.batch)
        predicted_class = output.argmax(dim=1).item()  # Get the class with highest score
    
    # Step 4: Decode the predicted class to the original label
    predicted_label = label_encoder.inverse_transform([predicted_class])[0]
    
    return predicted_label

In [64]:
# Evaluate model1
model1_loss, model1_accuracy = evaluate(model1, test_loader, criterion, device)
print(f"Model 1 - Loss: {model1_loss:.4f}, Accuracy: {model1_accuracy:.4f}")

# Evaluate model2
model2_loss, model2_accuracy = evaluate(model2, test_loader, criterion, device)
print(f"Model 2 - Loss: {model2_loss:.4f}, Accuracy: {model2_accuracy:.4f}")

Model 1 - Loss: 0.8824, Accuracy: 0.7354
Model 2 - Loss: 0.8457, Accuracy: 0.7354


In [65]:
def evaluate_model_on_videos(model, X_test, y_test_encoded, label_encoder, T=48, num_videos=100):
    correct_predictions = 0
    total_time = 0
    total_videos = min(num_videos, len(X_test))  # Use the first 100 videos or fewer if X_test is smaller

    for i in range(total_videos):
        video_path = X_test[i]
        true_label = y_test_encoded[i]

        start_time = time.time()
        predicted_label = inference_pipeline(video_path, model, label_encoder, T)
        end_time = time.time()

        # Compare predicted label to the true label
        if predicted_label == label_encoder.inverse_transform([true_label])[0]:
            correct_predictions += 1

        # Calculate time taken for this inference
        total_time += (end_time - start_time)

    # Calculate accuracy and average inference time
    accuracy = correct_predictions / total_videos
    average_time_per_inference = total_time / total_videos

    return accuracy, average_time_per_inference

In [66]:
# Evaluate the first model
model1_accuracy, model1_avg_time = evaluate_model_on_videos(model1, X_test, y_test_encoded, label_encoder, T=48, num_videos=100)
print(f"Model 1 - Accuracy: {model1_accuracy:.4f}, Average Inference Time: {model1_avg_time:.4f} seconds")

# Evaluate the second model
model2_accuracy, model2_avg_time = evaluate_model_on_videos(model2, X_test, y_test_encoded, label_encoder, T=48, num_videos=100)
print(f"Model 2 - Accuracy: {model2_accuracy:.4f}, Average Inference Time: {model2_avg_time:.4f} seconds")

Model 1 - Accuracy: 0.7300, Average Inference Time: 5.2008 seconds
Model 2 - Accuracy: 0.6800, Average Inference Time: 5.1972 seconds


In [44]:
# Example usage
video_path = r"Data/train/181/03_02_0181_(25_05_17_17_36_14)_c.mp4"
predicted_label = inference_pipeline(video_path, model, label_encoder)
print(f"The predicted label for the video is: {predicted_label}")

The predicted label for the video is: يفكر


## Load model and data

In [1]:
import pandas as pd
import cv2
import mediapipe as mp
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor, as_completed
from torch_geometric.data import Data
from torch_geometric.utils import dense_to_sparse
from torch_geometric.nn import GCNConv
import torch
from tqdm import tqdm
from torch_geometric.loader import DataLoader
from sklearn.preprocessing import LabelEncoder
import pickle
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool
from torch_geometric.utils import add_self_loops, to_dense_adj
import time
from sklearn.metrics import accuracy_score    
from ruptures import Pelt
from ruptures.metrics import hausdorff
from ruptures.costs import CostL2

In [2]:
train_data = torch.load("Saved/train_data.pth")
test_data = torch.load("Saved/test_data.pth")
with open("Saved/label_encoder.pkl", "rb") as f:
    label_encoder = pickle.load(f)

  train_data = torch.load("Saved/train_data.pth")
  test_data = torch.load("Saved/test_data.pth")


In [5]:
class GCNModel(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GCNModel, self).__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, out_channels)  # Final fully connected layer

    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = torch.relu(x)
        x = self.conv2(x, edge_index)
        x = global_mean_pool(x, batch)  # Pooling over all nodes in the graph
        x = self.fc(x)
        return x
    

model = GCNModel(in_channels=3, hidden_channels=16, out_channels=20)
model.load_state_dict(torch.load('Saved/gcn_model.pth'))

  model.load_state_dict(torch.load('Saved/gcn_model.pth'))


<All keys matched successfully>

## Sliding Window

In [25]:
def detect_change_points(video_path, model, label_encoder, min_size=5, penalty=10):
    """
    Detects change points in a video and classifies segments.
    
    Parameters:
    - video_path: Path to the input video file.
    - model: The trained GCN model.
    - label_encoder: The label encoder used to encode/decode class labels.
    - min_size: Minimum segment size (in frames) for change point detection.
    - penalty: Penalty value for change point detection sensitivity.
    
    Returns:
    - A sentence representing the detected signs.
    """
    cap = cv2.VideoCapture(video_path)
    N = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_features = []

    # Feature extraction (e.g., pixel-wise differences between frames)
    _, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    for i in range(1, N):
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        diff = cv2.absdiff(prev_gray, gray)
        frame_features.append(np.mean(diff))
        prev_gray = gray

    cap.release()

    # Convert to numpy array
    frame_features = np.array(frame_features)

    # Change point detection using PELT
    model_cp = Pelt(model="l2", min_size=min_size).fit(frame_features)
    change_points = model_cp.predict(pen=penalty)

    # Add start and end points
    change_points = [0] + change_points + [N]

    predicted_labels = []
    cap = cv2.VideoCapture(video_path)
    
    for start, end in zip(change_points[:-1], change_points[1:]):
        sampled_frames = []
        for i in range(start, end):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret:
                sampled_frames.append(frame)

        if len(sampled_frames) < 2:
            continue
        
        # Apply the existing inference pipeline to each segment
        detected_landmarks = detect_landmarks(sampled_frames)
        normalized_landmarks = normalize_landmarks(detected_landmarks)
        graph, X = construct_graph(normalized_landmarks)
        adj_matrix = get_adjacency_matrix(graph)
        
        edge_index, _ = dense_to_sparse(torch.tensor(adj_matrix, dtype=torch.float))
        edge_index = edge_index.long()
        x = torch.tensor(X, dtype=torch.float)
        
        data = Data(x=x, edge_index=edge_index)
        data.batch = torch.zeros(data.num_nodes, dtype=torch.long)

        # Perform inference on the segment
        model.eval()
        with torch.no_grad():
            output = model(data.x, data.edge_index, data.batch)
            predicted_class = output.argmax(dim=1).item()

        # Decode the predicted class to the original label
        predicted_label = label_encoder.inverse_transform([predicted_class])[0]
        predicted_labels.append(predicted_label)
    
    cap.release()

    # Join the final labels into a sentence
    sentence = " ".join(predicted_labels)
    
    return sentence

In [26]:
video_path = r"C:\Users\LENOVO\Downloads\03-03-0171-21-03-17-22-04-56-c_CnFcgmB0.mp4"
predicted_sentence = detect_change_points(video_path, model, label_encoder)
print("Predicted Sentence:", predicted_sentence)

Predicted Sentence: يبني يبني يمشي
