In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [2]:
import numpy as np

class Graph():
    def __init__(self, layout='openpose', strategy='spatial'):
        self.get_edge(layout)
        self.hop_dis = self.get_hop_distance(self.num_node, self.edge)
        self.get_adjacency(strategy)

    def get_edge(self, layout):
        if layout == 'openpose':
            self.num_node = 18
            self_link = [(i, i) for i in range(self.num_node)]
            neighbor_link = [(4, 3), (3, 2), (7, 6), (6, 5), (13, 12), (12, 11),
                             (10, 9), (9, 8), (11, 5), (8, 2), (5, 1), (2, 1),
                             (0, 1), (15, 0), (14, 0), (17, 15), (16, 14)]
            self.edge = self_link + neighbor_link
            self.center = 1
        else:
            raise ValueError("Do not support this layout.")

    def get_hop_distance(self, num_node, edge, max_hop=1):
        A = np.zeros((num_node, num_node))
        for i, j in edge:
            A[j, i] = 1
            A[i, j] = 1

        hop_dis = np.zeros((num_node, num_node)) + np.inf
        transfer_mat = [np.linalg.matrix_power(A, d) for d in range(max_hop + 1)]
        arrive_mat = (np.stack(transfer_mat) > 0)
        for d in range(max_hop, -1, -1):
            hop_dis[arrive_mat[d]] = d
        return hop_dis

    def get_adjacency(self, strategy):
        valid_hop = range(0, self.num_node - 1)
        adjacency = np.zeros((self.num_node, self.num_node))
        for hop in valid_hop:
            adjacency[self.hop_dis == hop] = 1
        normalize_adjacency = self.normalize_digraph(adjacency)

        if strategy == 'uniform':
            A = np.zeros((1, self.num_node, self.num_node))
            A[0] = normalize_adjacency
            self.A = A
        elif strategy == 'distance':
            A = np.zeros((len(valid_hop), self.num_node, self.num_node))
            for i, hop in enumerate(valid_hop):
                A[i][self.hop_dis == hop] = normalize_adjacency[self.hop_dis == hop]
            self.A = A
        elif strategy == 'spatial':
            A = []
            for hop in valid_hop:
                a_root = np.zeros((self.num_node, self.num_node))
                a_close = np.zeros((self.num_node, self.num_node))
                a_further = np.zeros((self.num_node, self.num_node))
                for i in range(self.num_node):
                    for j in range(self.num_node):
                        if self.hop_dis[j, i] == hop:
                            if self.hop_dis[j, self.center] == self.hop_dis[i, self.center]:
                                a_root[j, i] = normalize_adjacency[j, i]
                            elif self.hop_dis[j, self.center] > self.hop_dis[i, self.center]:
                                a_close[j, i] = normalize_adjacency[j, i]
                            else:
                                a_further[j, i] = normalize_adjacency[j, i]
                if hop == 0:
                    A.append(a_root)
                else:
                    A.append(a_root + a_close)
                    A.append(a_further)
            A = np.stack(A)
            self.A = A
        else:
            raise ValueError("Do not support this strategy.")

    def normalize_digraph(self, A):
        Dl = np.sum(A, 0)
        num_node = A.shape[0]
        Dn = np.zeros((num_node, num_node))
        for i in range(num_node):
            if Dl[i] > 0:
                Dn[i, i] = Dl[i]**(-1)
        AD = np.dot(A, Dn)
        return AD

In [3]:
def normalize_digraph(A):
    Dl = np.sum(A, 0)
    num_node = A.shape[0]
    Dn = np.zeros((num_node, num_node))
    for i in range(num_node):
        if Dl[i] > 0:
            Dn[i, i] = Dl[i]**(-1)
    AD = np.dot(A, Dn)
    return AD



In [4]:
class ConvTemporalGraphical(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, t_kernel_size=1, t_stride=1, t_padding=0, t_dilation=1, bias=True):
        super().__init__()
        self.kernel_size = kernel_size
        self.conv = nn.Conv2d(in_channels, out_channels * kernel_size, kernel_size=(t_kernel_size, 1), 
                              padding=(t_padding, 0), stride=(t_stride, 1), dilation=(t_dilation, 1), bias=bias)

    def forward(self, x, A):
        assert A.size(0) == self.kernel_size
        x = self.conv(x)
        n, kc, t, v = x.size()
        x = x.view(n, self.kernel_size, kc//self.kernel_size, t, v)
        x = torch.einsum('nkctv,kvw->nctw', (x, A))
        return x.contiguous(), A



In [5]:
class STGCNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, dropout=0, residual=True):
        super().__init__()
        assert len(kernel_size) == 2
        assert kernel_size[0] % 2 == 1
        padding = ((kernel_size[0] - 1) // 2, 0)
        
        self.gcn = ConvTemporalGraphical(in_channels, out_channels, kernel_size[1])
        self.tcn = nn.Sequential(
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, (kernel_size[0], 1), (stride, 1), padding),
            nn.BatchNorm2d(out_channels),
            nn.Dropout(dropout, inplace=True)
        )
        
        if not residual:
            self.residual = lambda x: 0
        elif (in_channels == out_channels) and (stride == 1):
            self.residual = lambda x: x
        else:
            self.residual = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=(stride, 1)),
                nn.BatchNorm2d(out_channels)
            )
        
        self.relu = nn.ReLU(inplace=True)
    
    def forward(self, x, A):
        res = self.residual(x)
        x, A = self.gcn(x, A)
        x = self.tcn(x) + res
        return self.relu(x), A

In [6]:
class STGCN(nn.Module):
    def __init__(self, in_channels, graph_args, edge_importance_weighting, **kwargs):
        super().__init__()
        self.graph = Graph(**graph_args)
        A = torch.tensor(self.graph.A, dtype=torch.float32, requires_grad=False)
        self.register_buffer('A', A)
        
        spatial_kernel_size = A.size(0)
        temporal_kernel_size = 9
        kernel_size = (temporal_kernel_size, spatial_kernel_size)
        
        self.data_bn = nn.BatchNorm1d(in_channels * A.size(1))
        
        self.st_gcn_networks = nn.ModuleList((
            STGCNBlock(in_channels, 64, kernel_size, stride=1, residual=False),
            STGCNBlock(64, 64, kernel_size, stride=1),
            STGCNBlock(64, 64, kernel_size, stride=1),
            STGCNBlock(64, 64, kernel_size, stride=1),
            STGCNBlock(64, 128, kernel_size, stride=2),
            STGCNBlock(128, 128, kernel_size, stride=1),
            STGCNBlock(128, 128, kernel_size, stride=1),
            STGCNBlock(128, 256, kernel_size, stride=2),
            STGCNBlock(256, 256, kernel_size, stride=1),
            STGCNBlock(256, 256, kernel_size, stride=1)
        ))
        
        if edge_importance_weighting:
            self.edge_importance = nn.ParameterList([
                nn.Parameter(torch.ones(self.A.size()))
                for i in self.st_gcn_networks
            ])
        else:
            self.edge_importance = [1] * len(self.st_gcn_networks)
    
    def forward(self, x):
        N, C, T, V = x.size()
        x = x.permute(0, 3, 1, 2).contiguous()
        x = x.view(N, V * C, T)
        x = self.data_bn(x)
        x = x.view(N, V, C, T)
        x = x.permute(0, 2, 3, 1).contiguous()
        x = x.view(N, C, T, V)
        
        for gcn, importance in zip(self.st_gcn_networks, self.edge_importance):
            x, _ = gcn(x, self.A * importance)
        
        x = F.avg_pool2d(x, x.size()[2:])
        x = x.view(N, -1)
        
        return x



In [7]:
class ParkinsonsDetectionModel(nn.Module):
    def __init__(self, in_channels, graph_args, edge_importance_weighting, **kwargs):
        super().__init__()
        self.st_gcn = STGCN(in_channels, graph_args, edge_importance_weighting, **kwargs)
        self.angle_embedding = nn.Embedding(3, 64)  # 3 angles, embedding size 64
        self.fc1 = nn.Linear(256 + 64, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)  # Binary classification
        self.dropout = nn.Dropout(0.5)

    def forward(self, x, angle):
        x = self.st_gcn(x)
        angle_embed = self.angle_embedding(angle).squeeze(1)
        combined = torch.cat([x, angle_embed], dim=1)
        x = F.relu(self.fc1(combined))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return torch.sigmoid(x)  # Sigmoid for binary classification



In [8]:
# Example usage
graph_args = {'layout': 'openpose', 'strategy': 'spatial'}
model = ParkinsonsDetectionModel(in_channels=2, graph_args=graph_args, edge_importance_weighting=True)

# Example input
batch_size = 2
num_channels = 2  # X and Y coordinates
num_frames = 300
num_joints = 18
x = torch.randn(batch_size, num_channels, num_frames, num_joints)
angle = torch.LongTensor([0, 1])  # Example angles for each sample in the batch

# Forward pass
output = model(x, angle)
print(output.shape)  # Should be [batch_size, 1]

torch.Size([2, 1])


In [9]:
# Load the CSV file
df = pd.read_csv("../output_data/output_2D.csv")

# Convert the vector column from string to numpy array
df['vector'] = df['vector'].apply(eval).apply(np.array)


NameError: name 'pd' is not defined

In [None]:
df

Unnamed: 0,client,angle,parkinson,vector
0,1,0,1,"[[38.89351272583008, 17.675010681152344], [43...."
1,1,0,1,"[[85.9338607788086, 29.34123420715332], [92.69..."
2,1,0,1,"[[50.33542251586914, 20.45751190185547], [55.2..."
3,1,0,1,"[[44.83964920043945, 21.37966537475586], [50.3..."
4,1,0,1,"[[64.98465728759766, 29.381858825683594], [70...."
...,...,...,...,...
546,1,90,1,"[[157.67861938476562, 44.677371978759766], [15..."
547,1,90,1,"[[104.82242584228516, 46.87606430053711], [109..."
548,1,90,1,"[[135.35787963867188, 49.76270294189453], [138..."
549,1,90,1,"[[133.68704223632812, 43.090797424316406], [13..."


In [None]:
#get the shortest vector length
min_len = min([len(v) for v in df['vector']])
min_len

17

In [None]:
# Pad the vectors to the shortest length
df['vector'] = df['vector'].apply(lambda x: x[:min_len])

In [None]:
import pandas as pd
import numpy as np
import torch

# Prepare the input data
X = np.stack(df['vector'].values)
X = X.transpose(0, 2, 1)  # Reshape to (N, C, T, V)
X = np.expand_dims(X, axis=1)  # Add feature dimension
X = torch.FloatTensor(X)

# Prepare the labels
y = torch.LongTensor(df['parkinson'].values)

print("Input shape:", X.shape)
print("Labels shape:", y.shape)

Input shape: torch.Size([551, 1, 2, 17])
Labels shape: torch.Size([551])


In [None]:
# Assuming you've already defined the Graph and STGCN classes

# Create the graph
graph_args = {'layout': 'openpose', 'strategy': 'spatial'}

# Create the model
model = ParkinsonsDetectionModel(in_channels=1, num_class=2, graph_args=graph_args, edge_importance_weighting=True)

# If you have a GPU available, move the model and data to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
X = X.to(device)
y = y.to(device)

# Forward pass
outputs = model(X)

print("Output shape:", outputs.shape)

# If you want to train the model:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop (example for one epoch)
model.train()
optimizer.zero_grad()
outputs = model(X)
loss = criterion(outputs, y)
loss.backward()
optimizer.step()

print("Loss:", loss.item())

# For inference:
model.eval()
with torch.no_grad():
    outputs = model(X)
    _, predicted = torch.max(outputs, 1)
    accuracy = (predicted == y).float().mean()
    print("Accuracy:", accuracy.item())

TypeError: forward() missing 1 required positional argument: 'angle'

In [None]:
import pandas as pd
import numpy as np
import torch
import ast


# Function to parse the vector string and add a third dimension
def parse_vector(vector_str):
    vector = ast.literal_eval(vector_str)
    return np.array([[x, y, 0] for x, y, _, _ in vector])

# Parse and stack the pose data
pose_data = np.stack(df['vector'].apply(parse_vector).values)

# Reshape to [N, C, T, V]
# Here we're treating each sample as a single frame (T=1)
N, V, C = pose_data.shape
pose_data = pose_data.transpose(0, 2, 1).reshape(N, C, 1, V)

# Convert to torch tensors
pose_data_tensor = torch.FloatTensor(pose_data)
angles_tensor = torch.LongTensor(df['angle'].values)
labels_tensor = torch.FloatTensor(df['parkinson'].values)

print("Pose data shape:", pose_data_tensor.shape)
print("Angles shape:", angles_tensor.shape)
print("Labels shape:", labels_tensor.shape)

ValueError: malformed node or string: array([[ 38.89351273,  17.67501068],
       [ 43.85916138,  13.46286392],
       [ 35.01784515,  13.28335571],
       [ 51.71231079,  13.56864738],
       [ 29.73617935,  13.27781391],
       [ 62.44268417,  34.74855423],
       [ 19.77564049,  35.57767487],
       [ 76.11538696,  63.75419235],
       [  5.23509645,  66.12895966],
       [ 76.39914703,  95.90537262],
       [  4.93015718, 101.26876831],
       [ 54.76709366, 102.197052  ],
       [ 27.94917488, 102.59548187],
       [ 55.66150284, 156.74565125],
       [ 27.86084938, 157.37513733],
       [ 55.00213623, 207.13873291],
       [ 31.33620071, 208.5050354 ]])