In [None]:
from ImportLocalData import loadData

In [2]:
from BalanceClassDistribution import AdjustClassSamples, NumberOfSamplesClass

In [None]:
# Import one of the custom STKG files
# Call OutlierEnhancement to handle outlier if you need
# Please change path names based on your local files 
data = loadData('.../node_features.txt', '.../edges.txt', '.../edge_features.txt', '.../node_labels.txt')

Import required packages

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import softmax
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, top_k_accuracy_score
import numpy as np

TRGLayer definition

In [4]:
#We implemented "TRG:Temporal reasoning graph for activity recognition" study for comparatve anaysis with our dataset and report it through top-1 / top-5 accuracy

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# TRG layer with multi-head edge-aware temporal resoning compatibple with our STKGs data
class TRGLayer(MessagePassing):
    def __init__(self, in_dim, num_heads=4):
        super().__init__(aggr='mean')  # mean aggregation as in the original TRG paper
        self.num_heads = num_heads
        self.in_dim = in_dim
# multi-head attention mechanism
        self.attention_weights = nn.Parameter(torch.Tensor(num_heads, in_dim))
        nn.init.xavier_uniform_(self.attention_weights)
# maps raw edge features to head-specific attention scores
        self.edge_encoder = nn.Sequential(
            nn.Linear(1, in_dim),
            nn.ReLU(),
            nn.Linear(in_dim, num_heads))
        self.skip = nn.Linear(in_dim, in_dim)
    def forward(self, x, edgeIndex, edgeFeature):
        self.edge_index = edgeIndex
        self.edge_attr = edgeFeature
        return self.propagate(edgeIndex, x=x)

    
    def message(self, x_i, x_j):
# temporal edge-based attention by using edge features
        edgeFeatures = self.edge_encoder(self.edge_attr)  
        # to ompute pairwise attention scores per head
        atn_scores = (x_i.unsqueeze(1) * self.attention_weights.unsqueeze(0)) * (x_j.unsqueeze(1) * self.attention_weights.unsqueeze(0))
        atn_scores = torch.sum(atn_scores, dim=-1) 
        atn_scores = atn_scores + edgeFeatures
        alpha = softmax(atn_scores, index=self.edge_index[1], dim=0)
        return (x_j.unsqueeze(1) * alpha.unsqueeze(-1)).mean(dim=1) # for attention to neighbors


TRGModel Definition

In [None]:
# multi-layer TRG + semantic aggregation + MLP classifier to create comtible model structure with our STKGs data and proposed methods in the paper
class TRGModel(nn.Module):
    def __init__(self, in_channels, hidden_channels, num_classes, num_layers=3, num_heads=4):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(in_channels, hidden_channels),
            nn.BatchNorm1d(hidden_channels),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_channels, hidden_channels),
            nn.BatchNorm1d(hidden_channels),
            nn.GELU())

# multi-head edge-aware graph convolution
        self.layers = nn.ModuleList([TRGLayer(hidden_channels, num_heads) for _ in range(num_layers)])
        # Semantic aggregatorfrom all TRG layers
        self.aggregator = nn.Sequential(nn.Linear(hidden_channels * num_layers, hidden_channels),
            nn.BatchNorm1d(hidden_channels),nn.GELU(),nn.Dropout(0.4))
        self.classifier = nn.Sequential(
            nn.Linear(hidden_channels, hidden_channels // 2),
            nn.BatchNorm1d(hidden_channels // 2),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_channels // 2, num_classes))
    def forward(self, data):
        x = self.encoder(data.x)
        layerOutputs = []
        for layer in self.layers:
            x = layer(x, data.edge_index, data.edge_attr)
            layerOutputs.append(x)
# Concatnate multi-level outputs
        x = torch.cat(layer_outputs, dim=-1)
        x = self.aggregator(x)
        return self.classifier(x)


Calculation of top-1 and top-5 accuracy

In [None]:

def evaluate(model, data, mask):
    model.eval()
    with torch.no_grad():
        out = model(data)
        pred = out[mask].argmax(dim=1)
        acc = accuracy_score(data.y[mask].cpu().numpy(), pred.cpu().numpy())
        top5_acc = top_k_accuracy_score(
            data.y[mask].cpu().numpy(),
            out[mask].cpu().numpy(),k=5,labels=np.arange(out.size(1))) * 100
# small noise added to input
    aug_acc = 0
    aug_top5 = 0
    for _ in range(5):
        noise = torch.randn_like(data.x) * 0.03
        outAug = model(Data(x=data.x + noise, edge_index=data.edge_index, edge_attr=data.edge_attr, y=data.y))
        predAug = outAug[mask].argmax(dim=1)
        augAcc += accuracy_score(data.y[mask].cpu().numpy(), predAug.cpu().numpy())
        aug_top5 += top_k_accuracy_score(data.y[mask].cpu().numpy(), outAug[mask].cpu().numpy(),
            k=5,labels=np.arange(outAug.size(1)))
    return (max(acc, augAcc / 5) * 100, max(top5_acc, (aug_top5 / 5) * 100))

Training and evaluation phase

In [None]:

if __name__ == "__main__":
    idx = np.arange(data.num_nodes)
    train_idx, rest = train_test_split(idx, test_size=0.2, stratify=data.y.cpu().numpy(), random_state=42)
    val_idx, test_idx = train_test_split(rest, test_size=0.5, stratify=data.y[rest].cpu().numpy(), random_state=42)
    data.train_mask = torch.zeros(data.num_nodes, dtype=torch.bool, device=device)
    data.val_mask = torch.zeros(data.num_nodes, dtype=torch.bool, device=device)
    data.test_mask = torch.zeros(data.num_nodes, dtype=torch.bool, device=device)
    data.train_mask[train_idx] = True
    data.val_mask[val_idx] = True
    data.test_mask[test_idx] = True
    data = data.to(device)

    model = TRGModel(in_channels=data.num_node_features,hidden_channels=128, num_classes=len(data.y.unique())).to(device)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, nesterov=True, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer, max_lr=0.01, steps_per_epoch=1, epochs=4000, pct_start=0.3, anneal_strategy='cos')
    loss_Cefn = nn.CrossEntropyLoss()

    for epoch in range(4000):
        model.train()
        out = model(data)
        loss = loss_Cefn(out[data.train_mask], data.y[data.train_mask])
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        if (epoch + 1) % 10 == 0:
            valAcc, valTop5 = evaluate(model, data, data.val_mask)
            print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Val Acc: {val_acc:.2f}%, "f"Val Top-5: {val_top5:.2f}%, LR: {scheduler.get_last_lr()[0]:.6f}")

# final report
    testAcc, testTop5 = evaluate(model, data, data.test_mask)
    print(f"\nFinal Test Top-1 Accuracy: {test_acc:.2f}%")
    print(f"Final Test Top-5 Accuracy: {test_top5:.2f}%")