In [1]:
import os
import pickle
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv,GATConv
from sklearn.metrics import accuracy_score, recall_score, f1_score, classification_report
import torch.nn.functional as F
import random
import mediapipe as mp
from pymongo import MongoClient
from neo4j import GraphDatabase
from collections import defaultdict
from collections import defaultdict, Counter
import numpy as np

In [2]:
# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    static_image_mode=False,
    model_complexity=2,  
    smooth_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)
mp_drawing = mp.solutions.drawing_utils

In [3]:
 #MongoDB connection
mongo_client = MongoClient("mongodb://admin:password@localhost:27017/")
mongo_db = mongo_client["SportsAnalysis"]
labels_collection = mongo_db["metadata"]

# Neo4j connection
neo4j_driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

In [4]:
def fetch_all_video_ids():
        return [doc['video_id'] for doc in labels_collection.find({}, {'video_id': 1})]

def fetch_label(video_id):
        doc = labels_collection.find_one({'video_id': video_id})
        return doc['label'] if doc else -1

In [5]:

def fetch_graphs_from_neo4j(video_id):
        with neo4j_driver.session() as session:
            # Get all unique time steps
            result = session.run("""
                MATCH (n:PoseNode {video_id: $video_id})
                RETURN DISTINCT n.time_index AS timestep
                ORDER BY timestep ASC
            """, video_id=video_id)
            time_steps = [record["timestep"] for record in result]

            graphs = []
            for t in time_steps:
                # Fetch nodes
                node_query = session.run("""
                    MATCH (n:PoseNode {video_id: $video_id, time_index: $t})
                    RETURN n.node_index AS idx, n.angle AS angle, n.time AS time
                    ORDER BY idx
                """, video_id=video_id, t=t)

                node_data = []
                time_value = 0
                for record in node_query:
                    node_data.append(float(record["angle"]))
                    time_value = float(record["time"])

                x = torch.tensor(node_data, dtype=torch.float).view(-1, 1)

                # Fetch edges
                edge_query = session.run("""
                    MATCH (a:PoseNode {video_id: $video_id, time_index: $t})-[r:CONNECTED_TO]->(b:PoseNode)
                    RETURN a.node_index AS src, b.node_index AS dst, r.weight AS weight
                """, video_id=video_id, t=t)

                edge_index = []
                edge_attr = []
                for record in edge_query:
                    edge_index.append([int(record["src"]), int(record["dst"])])
                    edge_attr.append([float(record["weight"])])

                edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
                edge_attr = torch.tensor(edge_attr, dtype=torch.float)

                graphs.append({
                    "edge_index": edge_index,
                    "edge_attr": edge_attr,
                    "angle_features": x,
                    "time": time_value,
                    "source_video": video_id,
                    "label": fetch_label(video_id),
                    "node_mapping": {},  # Optional: mapping if you have remapped indices
                    "reverse_mapping": {},
                    "node_features": x.clone()
                })
            return graphs

In [6]:
def load_graph_sequences_from_db():
    video_ids = fetch_all_video_ids()
    all_data = []

    for vid in video_ids:
        try:
            graph_sequence = fetch_graphs_from_neo4j(vid)
            if graph_sequence:
                all_data.append(graph_sequence)
        except Exception as e:
            print(f"Error loading video {vid}: {e}")

    print(f"✅ Loaded {len(all_data)} videos from DB")
    return all_data


In [7]:
mp_pose = mp.solutions.pose

class FreeThrowDataset(Dataset):
    def __init__(self, matrix_data):
        self.data = matrix_data
        # The angle nodes are automatically mapped in our enhanced processing
        # But we can still keep track of which landmarks are associated with angles
        self.angle_node_landmarks = [
            mp_pose.PoseLandmark.LEFT_ELBOW.value,
            mp_pose.PoseLandmark.RIGHT_ELBOW.value,
            mp_pose.PoseLandmark.LEFT_SHOULDER.value,
            mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            mp_pose.PoseLandmark.LEFT_KNEE.value,
            mp_pose.PoseLandmark.RIGHT_KNEE.value,
            mp_pose.PoseLandmark.LEFT_HIP.value,
            mp_pose.PoseLandmark.RIGHT_HIP.value
        ]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sequence = self.data[idx]
        data_sequence = []
        
        for timestep in sequence:
            # Check if timestep is a dictionary (as expected)
            if not isinstance(timestep, dict):
                raise TypeError(f"Expected dictionary, got {type(timestep)}. Value: {timestep}")
                
            try:
                edge_index = timestep['edge_index']
                edge_attr = timestep['edge_attr']
                label = timestep['label']
                angle_features = timestep['angle_features']
                
                # Convert label to tensor if it's not already
                if not isinstance(label, torch.Tensor):
                    y = torch.tensor([label], dtype=torch.float)
                else:
                    y = label
                
                # Create graph data object with already remapped indices and features
                data = Data(
                    x=angle_features,  # Use angle features as node features
                    edge_index=edge_index,
                    edge_attr=edge_attr,
                    y=y,
                    num_nodes=angle_features.size(0)
                )
                
                # Store additional information for reference
                data.original_to_new_mapping = timestep['node_mapping']
                data.new_to_original_mapping = timestep['reverse_mapping']
                data.positional_features = timestep['node_features']  # Store original position features
                data.time = timestep['time']
                data.source_video = timestep['source_video']
                
                data_sequence.append(data)
            except KeyError as e:
                # Print detailed error info for debugging
                print(f"KeyError: {e} not found in timestep. Available keys: {list(timestep.keys())}")
                raise
            except Exception as e:
                print(f"Error processing timestep: {e}")
                raise
            
        return data_sequence

def collate_fn(batch):
    """
    Custom collate function for batching sequences.
    Each batch item is a sequence of frames, and we want to
    maintain these sequences.
    """
    return batch

In [8]:
def stratified_split(graphdata, train_ratio=0.7, random_seed=42):
    """
    Split graph data while maintaining class distribution in both train and test sets.
    
    Args:
        graphdata: List of graph sequences
        train_ratio: Proportion of data for training (default: 0.7)
        random_seed: Random seed for reproducibility
    
    Returns:
        train_matrix, test_matrix: Stratified splits of the data
    """
    # Set random seed for reproducibility
    random.seed(random_seed)
    np.random.seed(random_seed)
    
    # Group data by labels
    label_to_sequences = defaultdict(list)
    
    for sequence in graphdata:
        # Get label from first timestep (all timesteps in a sequence have same label)
        if sequence and len(sequence) > 0:
            label = sequence[0]['label']
            label_to_sequences[label].append(sequence)
    
    # Print class distribution
    print("Class distribution in dataset:")
    for label, sequences in label_to_sequences.items():
        print(f"  Class {label}: {len(sequences)} sequences ({len(sequences)/len(graphdata)*100:.1f}%)")
    
    train_matrix = []
    test_matrix = []
    
    # Split each class proportionally
    for label, sequences in label_to_sequences.items():
        # Shuffle sequences for this class
        random.shuffle(sequences)
        
        # Calculate split point
        n_train = int(len(sequences) * train_ratio)
        
        # Ensure at least one sample in test if possible
        if len(sequences) > 1 and n_train == len(sequences):
            n_train = len(sequences) - 1
        
        # Split the sequences
        train_sequences = sequences[:n_train]
        test_sequences = sequences[n_train:]
        
        train_matrix.extend(train_sequences)
        test_matrix.extend(test_sequences)
        
        print(f"  Class {label}: {len(train_sequences)} train, {len(test_sequences)} test")
    
    # Final shuffle to mix classes
    random.shuffle(train_matrix)
    random.shuffle(test_matrix)
    
    print(f"\nFinal split: {len(train_matrix)} train, {len(test_matrix)} test")
    
    # Verify class distribution is maintained
    train_labels = [seq[0]['label'] for seq in train_matrix if seq]
    test_labels = [seq[0]['label'] for seq in test_matrix if seq]
    
    print(f"\nTrain set class distribution:")
    train_counter = Counter(train_labels)
    for label, count in sorted(train_counter.items()):
        print(f"  Class {label}: {count} ({count/len(train_labels)*100:.1f}%)")
    
    print(f"\nTest set class distribution:")
    test_counter = Counter(test_labels)
    for label, count in sorted(test_counter.items()):
        print(f"  Class {label}: {count} ({count/len(test_labels)*100:.1f}%)")
    
    return train_matrix, test_matrix

In [9]:
from collections import defaultdict, Counter
import random
import numpy as np

def stratified_balanced_split(graphdata, train_ratio=0.7, random_seed=42):
    """
    Perform a balanced stratified split: equal number of sequences from each class
    in both training and testing, limited by the smallest class count.
    
    Args:
        graphdata: List of graph sequences
        train_ratio: Proportion of data to use for training
        random_seed: Seed for reproducibility
        
    Returns:
        train_matrix, test_matrix: Balanced stratified splits
    """
    random.seed(random_seed)
    np.random.seed(random_seed)
    
    # Group sequences by label
    label_to_sequences = defaultdict(list)
    for sequence in graphdata:
        if sequence and len(sequence) > 0:
            label = sequence[0]['label']
            label_to_sequences[label].append(sequence)

    # Display original class distribution
    print("Original class distribution:")
    for label, seqs in label_to_sequences.items():
        print(f"  Class {label}: {len(seqs)} sequences")

    # Determine how many sequences to use per class (based on minority class)
    min_class_count = min(len(seqs) for seqs in label_to_sequences.values())
    print(f"\nBalancing to {min_class_count} sequences per class...")

    train_matrix = []
    test_matrix = []

    for label, sequences in label_to_sequences.items():
        # Shuffle and trim to balanced size
        random.shuffle(sequences)
        balanced_sequences = sequences[:min_class_count]

        # Split into train/test
        n_train = int(train_ratio * min_class_count)
        if min_class_count > 1 and n_train == min_class_count:
            n_train = min_class_count - 1

        train_sequences = balanced_sequences[:n_train]
        test_sequences = balanced_sequences[n_train:]

        train_matrix.extend(train_sequences)
        test_matrix.extend(test_sequences)

        print(f"  Class {label}: {len(train_sequences)} train, {len(test_sequences)} test")

    # Shuffle final sets
    random.shuffle(train_matrix)
    random.shuffle(test_matrix)

    print(f"\nFinal split: {len(train_matrix)} train, {len(test_matrix)} test")

    # Check class distributions
    train_labels = [seq[0]['label'] for seq in train_matrix]
    test_labels = [seq[0]['label'] for seq in test_matrix]

    print(f"\nTrain set class distribution:")
    for label, count in sorted(Counter(train_labels).items()):
        print(f"  Class {label}: {count} ({count/len(train_labels)*100:.1f}%)")

    print(f"\nTest set class distribution:")
    for label, count in sorted(Counter(test_labels).items()):
        print(f"  Class {label}: {count} ({count/len(test_labels)*100:.1f}%)")

    return train_matrix, test_matrix


In [10]:
def create_stratified_datasets(graphdata, train_ratio=0.7, random_seed=42):
    """
    Create stratified train/test datasets with proper class balance.
    """
    train_matrix, test_matrix = stratified_balanced_split(graphdata, train_ratio, random_seed)
    
    train_dataset = FreeThrowDataset(train_matrix)
    test_dataset = FreeThrowDataset(test_matrix)
    
    train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)
    
    return train_dataset, test_dataset, train_loader, test_loader

In [11]:
class GCN_LSTM(nn.Module):
    def __init__(self, in_channels, hidden_channels, lstm_hidden, num_classes):
        super().__init__()
        self.gcn = GCNConv(in_channels, hidden_channels)
        self.lstm = nn.LSTM(hidden_channels, lstm_hidden, batch_first=True)
        self.classifier = nn.Linear(lstm_hidden, num_classes)

    def forward(self, sequence):
        gcn_outputs = []
        for data in sequence:
            x = self.gcn(data.x, data.edge_index)
            x = torch.relu(x)
            pooled = x.mean(dim=0)  # Global mean pooling
            gcn_outputs.append(pooled)

        gcn_outputs = torch.stack(gcn_outputs).unsqueeze(0)  # [1, T, F]
        lstm_out, _ = self.lstm(gcn_outputs)
        out = self.classifier(lstm_out[:, -1, :])  # Use last time step
        return out


In [12]:
class GCN_LSTM_SpatialAttention(nn.Module):
    def __init__(self, in_channels, hidden_channels, lstm_hidden, num_classes, num_heads=4):
        super().__init__()
        self.gat = GATConv(in_channels, hidden_channels, heads=num_heads, concat=False, dropout=0.1)
        self.lstm = nn.LSTM(hidden_channels, lstm_hidden, batch_first=True)
        self.classifier = nn.Linear(lstm_hidden, num_classes)
        self.dropout = nn.Dropout(0.1)

    def forward(self, sequence):
        gcn_outputs = []
        for data in sequence:
            x = self.gat(data.x, data.edge_index)
            x = torch.relu(x)
            x = self.dropout(x)
            pooled = x.mean(dim=0)  # Global mean pooling
            gcn_outputs.append(pooled)

        gcn_outputs = torch.stack(gcn_outputs).unsqueeze(0)  # [1, T, F]
        lstm_out, _ = self.lstm(gcn_outputs)
        out = self.classifier(lstm_out[:, -1, :])  # Use last time step
        return out

In [13]:
class TemporalAttention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )
    
    def forward(self, lstm_output):
        # Compute attention weights for each time step
        attn_weights = self.attention(lstm_output)  # [batch_size, seq_len, 1]
        attn_weights = F.softmax(attn_weights, dim=1)  # Normalize over time
        
        # Weighted sum over time dimension
        attended_output = torch.sum(lstm_output * attn_weights, dim=1)  # [batch_size, hidden_dim]
        
        return attended_output, attn_weights.squeeze(-1)

In [14]:
class AttentionPooling(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.attention = nn.Sequential(
            nn.Linear(input_dim, input_dim // 2),
            nn.Tanh(),
            nn.Linear(input_dim // 2, 1)
        )
    
    def forward(self, x):
        # Compute attention weights for each node
        attn_weights = self.attention(x)  # [num_nodes, 1]
        attn_weights = F.softmax(attn_weights, dim=0)  # Normalize
        
        # Weighted sum of node features
        pooled = torch.sum(x * attn_weights, dim=0)  # [input_dim]
        return pooled

In [15]:
class GCN_LSTM_TemporalAttention(nn.Module):
    def __init__(self, in_channels, hidden_channels, lstm_hidden, num_classes):
        super().__init__()
        self.gcn = GCNConv(in_channels, hidden_channels)
        self.lstm = nn.LSTM(hidden_channels, lstm_hidden, batch_first=True)
        self.temporal_attention = TemporalAttention(lstm_hidden)
        self.classifier = nn.Linear(lstm_hidden, num_classes)
        self.dropout = nn.Dropout(0.1)

    def forward(self, sequence):
        gcn_outputs = []
        for data in sequence:
            x = self.gcn(data.x, data.edge_index)
            x = torch.relu(x)
            x = self.dropout(x)
            pooled = x.mean(dim=0)
            gcn_outputs.append(pooled)

        gcn_outputs = torch.stack(gcn_outputs).unsqueeze(0)  # [1, T, F]
        lstm_out, _ = self.lstm(gcn_outputs)
        
        # Apply temporal attention instead of using just last time step
        attended_output, attention_weights = self.temporal_attention(lstm_out)
        out = self.classifier(attended_output)
        
        return out


In [16]:
class GCN_LSTM_FullAttention(nn.Module):
    def __init__(self, in_channels, hidden_channels, lstm_hidden, num_classes, num_heads=4):
        super().__init__()
        # Spatial attention with GAT
        self.gat = GATConv(in_channels, hidden_channels, heads=num_heads, concat=False, dropout=0.1)
        
        # Attention pooling
        self.attention_pool = AttentionPooling(hidden_channels)
        
        # LSTM with temporal attention
        self.lstm = nn.LSTM(hidden_channels, lstm_hidden, batch_first=True)
        self.temporal_attention = TemporalAttention(lstm_hidden)
        
        # Classification
        self.classifier = nn.Linear(lstm_hidden, num_classes)
        self.dropout = nn.Dropout(0.1)
        
        # Layer normalization
        self.layer_norm1 = nn.LayerNorm(hidden_channels)
        self.layer_norm2 = nn.LayerNorm(lstm_hidden)

    def forward(self, sequence):
        gcn_outputs = []
        
        for data in sequence:
            # Spatial attention with GAT
            x = self.gat(data.x, data.edge_index)
            x = self.layer_norm1(x)
            x = F.relu(x)
            x = self.dropout(x)
            
            # Attention-based pooling
            pooled = self.attention_pool(x)
            gcn_outputs.append(pooled)

        gcn_outputs = torch.stack(gcn_outputs).unsqueeze(0)  # [1, T, F]
        
        # LSTM processing
        lstm_out, _ = self.lstm(gcn_outputs)
        lstm_out = self.layer_norm2(lstm_out)
        
        # Temporal attention
        attended_output, temporal_attention_weights = self.temporal_attention(lstm_out)
        
        # Final classification
        out = self.classifier(attended_output)
        
        return out

In [17]:
graphdata=load_graph_sequences_from_db()

✅ Loaded 109 videos from DB


In [18]:

train_dataset,test_dataset,train_loader,test_loader=create_stratified_datasets(graphdata,0.7,42)

Original class distribution:
  Class 1: 79 sequences
  Class 0: 30 sequences

Balancing to 30 sequences per class...
  Class 1: 21 train, 9 test
  Class 0: 21 train, 9 test

Final split: 42 train, 18 test

Train set class distribution:
  Class 0: 21 (50.0%)
  Class 1: 21 (50.0%)

Test set class distribution:
  Class 0: 9 (50.0%)
  Class 1: 9 (50.0%)


# TGNN

In [19]:
model = GCN_LSTM(in_channels=1, hidden_channels=32, lstm_hidden=16, num_classes=1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.BCEWithLogitsLoss()

In [20]:
for epoch in range(20):
    model.train()
    total_loss = 0
    for batch in train_loader:
        sequence = batch[0]  # batch size = 1
        target = sequence[0].y
        output = model(sequence)
        loss = loss_fn(output.view(-1), target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} - Loss: {total_loss:.4f}")


Epoch 1 - Loss: 30.5084
Epoch 2 - Loss: 29.4286
Epoch 3 - Loss: 29.2687
Epoch 4 - Loss: 29.2798
Epoch 5 - Loss: 29.5883
Epoch 6 - Loss: 29.2298
Epoch 7 - Loss: 29.3196
Epoch 8 - Loss: 29.2072
Epoch 9 - Loss: 29.8774
Epoch 10 - Loss: 29.1038
Epoch 11 - Loss: 29.1798
Epoch 12 - Loss: 29.2795
Epoch 13 - Loss: 29.1435
Epoch 14 - Loss: 29.1717
Epoch 15 - Loss: 29.1438
Epoch 16 - Loss: 29.1392
Epoch 17 - Loss: 29.1835
Epoch 18 - Loss: 29.0743
Epoch 19 - Loss: 29.1531
Epoch 20 - Loss: 29.1389


In [21]:
model.eval()
correct = 0
total = 0
all_predictions = []
all_targets = []

with torch.no_grad():
    for batch in test_loader:
        sequence = batch[0]
        target = int(sequence[0].y.item())
        output = model(sequence)
        prediction = (torch.sigmoid(output) > 0.5).int().item()
        
        correct += int(prediction == target)
        total += 1
        
        all_predictions.append(prediction)
        all_targets.append(target)

# Calculate metrics
accuracy_manual = correct / total
accuracy = accuracy_score(all_targets, all_predictions)
recall = recall_score(all_targets, all_predictions, average='binary')  # for binary classification
f1 = f1_score(all_targets, all_predictions, average='binary')  # for binary classification

# Print results
print(f"Test Accuracy (manual): {correct}/{total} = {accuracy_manual:.2%}")
print(f"Accuracy (sklearn): {accuracy:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Optional: Print detailed classification report
print('\nDetailed Classification Report:')
print(classification_report(all_targets, all_predictions))

Test Accuracy (manual): 9/18 = 50.00%
Accuracy (sklearn): 0.5000
Recall: 1.0000
F1 Score: 0.6667

Detailed Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         9
           1       0.50      1.00      0.67         9

    accuracy                           0.50        18
   macro avg       0.25      0.50      0.33        18
weighted avg       0.25      0.50      0.33        18



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


# GNN Spatial Attention

In [22]:
model = GCN_LSTM_SpatialAttention(in_channels=1, hidden_channels=32, lstm_hidden=16, num_classes=1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.BCEWithLogitsLoss()



In [23]:
for epoch in range(20):
    model.train()
    total_loss = 0
    for batch in train_loader:
        sequence = batch[0]  # batch size = 1
        target = sequence[0].y
        output = model(sequence)
        loss = loss_fn(output.view(-1), target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} - Loss: {total_loss:.4f}")



Epoch 1 - Loss: 29.5730
Epoch 2 - Loss: 29.2203
Epoch 3 - Loss: 29.3043
Epoch 4 - Loss: 29.2643
Epoch 5 - Loss: 29.3031
Epoch 6 - Loss: 29.2665
Epoch 7 - Loss: 29.4636
Epoch 8 - Loss: 29.3315
Epoch 9 - Loss: 29.2497
Epoch 10 - Loss: 29.1001
Epoch 11 - Loss: 29.3126
Epoch 12 - Loss: 29.1332
Epoch 13 - Loss: 29.3020
Epoch 14 - Loss: 29.1122
Epoch 15 - Loss: 29.1615
Epoch 16 - Loss: 29.1709
Epoch 17 - Loss: 29.1272
Epoch 18 - Loss: 29.0364
Epoch 19 - Loss: 29.1871
Epoch 20 - Loss: 29.2570


In [24]:
model.eval()
correct = 0
total = 0
all_predictions = []
all_targets = []

with torch.no_grad():
    for batch in test_loader:
        sequence = batch[0]
        target = int(sequence[0].y.item())
        output = model(sequence)
        prediction = (torch.sigmoid(output) > 0.5).int().item()
        
        correct += int(prediction == target)
        total += 1
        
        all_predictions.append(prediction)
        all_targets.append(target)

# Calculate metrics
accuracy_manual = correct / total
accuracy = accuracy_score(all_targets, all_predictions)
recall = recall_score(all_targets, all_predictions, average='binary')  # for binary classification
f1 = f1_score(all_targets, all_predictions, average='binary')  # for binary classification

# Print results
print(f"Test Accuracy (manual): {correct}/{total} = {accuracy_manual:.2%}")
print(f"Accuracy (sklearn): {accuracy:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Optional: Print detailed classification report
print('\nDetailed Classification Report:')
print(classification_report(all_targets, all_predictions))

Test Accuracy (manual): 11/18 = 61.11%
Accuracy (sklearn): 0.6111
Recall: 0.7778
F1 Score: 0.6667

Detailed Classification Report:
              precision    recall  f1-score   support

           0       0.67      0.44      0.53         9
           1       0.58      0.78      0.67         9

    accuracy                           0.61        18
   macro avg       0.62      0.61      0.60        18
weighted avg       0.62      0.61      0.60        18



# GNN Temporal Attention

In [25]:
model = GCN_LSTM_TemporalAttention(in_channels=1, hidden_channels=32, lstm_hidden=16, num_classes=1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.BCEWithLogitsLoss()

In [26]:
for epoch in range(20):
    model.train()
    total_loss = 0
    for batch in train_loader:
        sequence = batch[0]  # batch size = 1
        target = sequence[0].y
        output = model(sequence)
        loss = loss_fn(output.view(-1), target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} - Loss: {total_loss:.4f}")



Epoch 1 - Loss: 29.8273
Epoch 2 - Loss: 29.4909
Epoch 3 - Loss: 29.3906
Epoch 4 - Loss: 29.2631
Epoch 5 - Loss: 29.1875
Epoch 6 - Loss: 29.3073
Epoch 7 - Loss: 29.2141
Epoch 8 - Loss: 29.1474
Epoch 9 - Loss: 29.1572
Epoch 10 - Loss: 29.1387
Epoch 11 - Loss: 29.1356
Epoch 12 - Loss: 29.1852
Epoch 13 - Loss: 29.1417
Epoch 14 - Loss: 29.1125
Epoch 15 - Loss: 29.1085
Epoch 16 - Loss: 29.0941
Epoch 17 - Loss: 29.1915
Epoch 18 - Loss: 29.0976
Epoch 19 - Loss: 29.0824
Epoch 20 - Loss: 29.0970


In [27]:
model.eval()
correct = 0
total = 0
all_predictions = []
all_targets = []

with torch.no_grad():
    for batch in test_loader:
        sequence = batch[0]
        target = int(sequence[0].y.item())
        output = model(sequence)
        prediction = (torch.sigmoid(output) > 0.5).int().item()
        
        correct += int(prediction == target)
        total += 1
        
        all_predictions.append(prediction)
        all_targets.append(target)

# Calculate metrics
accuracy_manual = correct / total
accuracy = accuracy_score(all_targets, all_predictions)
recall = recall_score(all_targets, all_predictions, average='binary')  # for binary classification
f1 = f1_score(all_targets, all_predictions, average='binary')  # for binary classification

# Print results
print(f"Test Accuracy (manual): {correct}/{total} = {accuracy_manual:.2%}")
print(f"Accuracy (sklearn): {accuracy:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Optional: Print detailed classification report
print('\nDetailed Classification Report:')
print(classification_report(all_targets, all_predictions))

Test Accuracy (manual): 9/18 = 50.00%
Accuracy (sklearn): 0.5000
Recall: 0.0000
F1 Score: 0.0000

Detailed Classification Report:
              precision    recall  f1-score   support

           0       0.50      1.00      0.67         9
           1       0.00      0.00      0.00         9

    accuracy                           0.50        18
   macro avg       0.25      0.50      0.33        18
weighted avg       0.25      0.50      0.33        18



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


# GNN Temporal + Spatial Attention

In [28]:
model = GCN_LSTM_FullAttention(in_channels=1, hidden_channels=32, lstm_hidden=16, num_classes=1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.BCEWithLogitsLoss()

In [29]:
for epoch in range(20):
    model.train()
    total_loss = 0
    for batch in train_loader:
        sequence = batch[0]  # batch size = 1
        target = sequence[0].y
        output = model(sequence)
        loss = loss_fn(output.view(-1), target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} - Loss: {total_loss:.4f}")



Epoch 1 - Loss: 31.1536
Epoch 2 - Loss: 29.6593
Epoch 3 - Loss: 29.4238
Epoch 4 - Loss: 29.6290
Epoch 5 - Loss: 29.0253
Epoch 6 - Loss: 28.9259
Epoch 7 - Loss: 29.2086
Epoch 8 - Loss: 28.9513
Epoch 9 - Loss: 28.1176
Epoch 10 - Loss: 28.4098
Epoch 11 - Loss: 28.3028
Epoch 12 - Loss: 28.1333
Epoch 13 - Loss: 27.9079
Epoch 14 - Loss: 27.2741
Epoch 15 - Loss: 26.6381
Epoch 16 - Loss: 26.9597
Epoch 17 - Loss: 28.5227
Epoch 18 - Loss: 25.8668
Epoch 19 - Loss: 26.7569
Epoch 20 - Loss: 26.2356


In [30]:
model.eval()
correct = 0
total = 0
all_predictions = []
all_targets = []

with torch.no_grad():
    for batch in test_loader:
        sequence = batch[0]
        target = int(sequence[0].y.item())
        output = model(sequence)
        prediction = (torch.sigmoid(output) > 0.5).int().item()
        
        correct += int(prediction == target)
        total += 1
        
        all_predictions.append(prediction)
        all_targets.append(target)

# Calculate metrics
accuracy_manual = correct / total
accuracy = accuracy_score(all_targets, all_predictions)
recall = recall_score(all_targets, all_predictions, average='binary')  # for binary classification
f1 = f1_score(all_targets, all_predictions, average='binary')  # for binary classification

# Print results
print(f"Test Accuracy (manual): {correct}/{total} = {accuracy_manual:.2%}")
print(f"Accuracy (sklearn): {accuracy:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Optional: Print detailed classification report
print('\nDetailed Classification Report:')
print(classification_report(all_targets, all_predictions))

Test Accuracy (manual): 8/18 = 44.44%
Accuracy (sklearn): 0.4444
Recall: 0.2222
F1 Score: 0.2857

Detailed Classification Report:
              precision    recall  f1-score   support

           0       0.46      0.67      0.55         9
           1       0.40      0.22      0.29         9

    accuracy                           0.44        18
   macro avg       0.43      0.44      0.42        18
weighted avg       0.43      0.44      0.42        18

