# Initialization

In [1]:
import pandas as pd
import warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
import time
from tqdm import tqdm
import pandas as pd
import numpy as np
import pathlib
import os
import pickle
import pydicom
from matplotlib import pyplot as plt
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score
import seaborn as sns
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.impute import KNNImputer

In [2]:
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torchvision import transforms

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Using device:', device)

# On CUDA, also report the name of GPU 0 and its current memory counters (in GiB).
if device.type == 'cuda':
    bytes_per_gib = 1024**3
    print(torch.cuda.get_device_name(0))
    print('Memory Usage:')
    for label, query in (('Allocated:', torch.cuda.memory_allocated),
                         ('Cached:   ', torch.cuda.memory_reserved)):
        print(label, round(query(0)/bytes_per_gib,1), 'GB')

Using device: cuda
NVIDIA GeForce RTX 4090
Memory Usage:
Allocated: 0.0 GB
Cached:    0.0 GB


In [3]:
from Transformer.attention import attention
from Transformer.DecoderLayer import Decoder_Layer
from Transformer.multiHeadAttention import MultiHeadAttention
from Transformer.utils import positional_encoding
import torchvision.transforms as transforms

In [10]:
from getParameters import get_gait_parameters_insole
from getParameters import gait_aligned_jnt
from pre import butter_lowpass_filter
from DataManager import DataManager
import scipy.io as sio
from scipy.signal import resample

# Load Data

In [11]:
#%%
# Locate the recordings through the project DataManager.
DM = DataManager()
available_ID_list = DM.ID_list  # all IDs reported by the DataManager
# Manual override: this rebinding restricts the run to the two IDs below.
available_ID_list = ['GAIT102424-01', 'GAIT102424-02']

valid_files = DM.get_valid_data_file_dict(available_ID_list)

# Feature (matrix) names: the seven right-foot entries followed by the seven left-foot entries.
right_names = ['foot_trace_r', 'cop_x_r', 'cop_y_r', 'cont_area_r', 'pp_r', 'pp_x_r', 'pp_y_r']
left_names = ['foot_trace_l', 'cop_x_l', 'cop_y_l', 'cont_area_l', 'pp_l', 'pp_x_l', 'pp_y_l']
names = right_names + left_names

# Empty accumulators for the computed feature rows and ankle-angle rows.
FM_all, ankle_all = [], []

In [12]:
# Build, for one recording, a gait-cycle-resampled insole feature matrix and the
# matching ankle-angle matrix.
# NOTE(review): the trailing `break` stops after the first entry of valid_files and
# both np.save calls are commented out, so only the first file is processed and
# nothing is written to disk.
for file in valid_files:
    # Per-file accumulators; these rebind the FM_all / ankle_all lists created in the previous cell.
    FM_all = []
    ankle_all = []
    # Load the .mat recording and cast the insole arrays and their time vectors to float64.
    data = sio.loadmat(file['path'])
    insoleAll_l = data['insoleAll_l'].astype(np.float64)
    insoleAll_r = data['insoleAll_r'].astype(np.float64)
    t_insole_l = data['t_insole_l'].astype(np.float64)
    t_insole_r = data['t_insole_r'].astype(np.float64)

    # Tracker time vector plus joint angle / position arrays for both sides.
    t_trackers = data['t_trackers'].astype(np.float64)
    jnt_angles_all_l = np.array(data['jnt_angles_all_l'])
    jnt_angles_all_r = np.array(data['jnt_angles_all_r'])
    jnt_pos_all_l = np.array(data['jnt_pos_all_l'])
    jnt_pos_all_r = np.array(data['jnt_pos_all_r'])

    # feature matrix
    # g is indexed by string keys below (the seven per-foot features and 'strike_l').
    g = get_gait_parameters_insole(insoleAll_r, insoleAll_l, t_insole_r, t_insole_l)
    FM_r = [g['foot_trace_r'],g['cop_x_r'],g['cop_y_r'],g['cont_area_r'],g['pp_r'],g['pp_x_r'],g['pp_y_r']]
    FM_l = [g['foot_trace_l'],g['cop_x_l'],g['cop_y_l'],g['cont_area_l'],g['pp_l'],g['pp_x_l'],g['pp_y_l']]
    # Stack the per-foot features as columns (presumably one column per feature,
    # i.e. each g[...] entry is 1-D — TODO confirm).
    FM_r, FM_l = np.column_stack(FM_r), np.column_stack(FM_l)

    # resample
    # Each interval between consecutive g['strike_l'] entries is resampled to 100 rows.
    # The first 10 and last 10 strikes are skipped, and BOTH feet are cut with the
    # left-foot strike indices.
    fm = []
    step_num = len(g['strike_l'])
    for step in range(10,step_num - 10):
        start_i, end_i = g['strike_l'][step], g['strike_l'][step+1]
        fm_l=resample(FM_l[start_i:end_i,:], 100, axis=0)
        fm_r=resample(FM_r[start_i:end_i, :], 100, axis=0)
        # Left-foot columns come first, then right-foot columns.
        # NOTE(review): `names` in the previous cell lists right-foot names first — confirm which order is intended.
        fm.append(np.hstack((fm_l, fm_r)))
    # One block of 100 rows per kept step; np.vstack raises if no step was kept (step_num <= 20).
    FM = np.vstack(fm)

    joint = gait_aligned_jnt(g, jnt_angles_all_l, jnt_angles_all_r, jnt_pos_all_l, jnt_pos_all_r, t_trackers)
    # Rows 900..10899 of the left ankle angles (at most 10000 rows).
    # NOTE(review): this window is hard-coded and independent of step_num, while FM has
    # 100*(step_num-20) rows; the row counts only line up when step_num == 120. It also
    # starts at block index 9 whereas FM starts at strike index 10 — confirm against
    # gait_aligned_jnt's indexing.
    angle = joint['resampled_angles_l']['ankle'][100*9:109*100,:]

    # Pair rows by position; raises IndexError if `angle` has fewer rows than FM,
    # and silently ignores surplus `angle` rows otherwise.
    for i in range(FM.shape[0]):
        FM_all.append(FM[i, :])
        ankle_all.append(angle[i, :])

    feature_matrix = np.array(FM_all)
    ankle_matrix = np.array(ankle_all)
    
    # NOTE(review): trail_num is assigned but not used in this cell (name presumably meant "trial").
    trail_num = file['meta_data']['Trial'][0]
    # Output paths next to the source file: '<stem>_feature.npy' and '<stem>_ankle_angle.npy'.
    feature_path = file['path'].with_name(file['path'].stem + '_feature.npy')
    ankle_angle_path = file['path'].with_name(file['path'].stem + '_ankle_angle.npy')
    
    # Saving is currently disabled.
    # np.save(feature_path, feature_matrix)
    # np.save(ankle_angle_path, ankle_matrix)

    # Stop after the first file.
    break

# VisTransformer

In [45]:
class VisualTransformer(nn.Module):
    
    """
    Patch-based transformer mapping a batch of pre-cut patches to `d_output` values.

    Each patch is flattened and linearly embedded to `d_model`, a positional
    encoding is added, the sequence is passed through a stack of
    `Decoder_Layer` blocks with an all-ones mask, and the concatenated patch
    embeddings are projected by one linear layer.

    Parameters
    ----------
    image_dim:
        Indexable pair; `image_dim[0]` and `image_dim[1]` are each divided by
        `n_patch` (truncated to int) to get the patch width and patch height.
    n_patch:
        Number of patches per sample expected by `forward` (dimension 1 of its
        input); the output layer is sized `n_patch * d_model`.
        NOTE(review): the same value is also used as the per-axis divisor of
        `image_dim`, so the patches fed to the model do not necessarily tile
        the whole image (e.g. (64, 16) with n_patch=4 gives 16x4 patches but
        only 4 of them are consumed) — confirm this is intended.
    d_output:
        Size of the prediction vector produced for each sample
    d_model:
        Dimension of the input vector (post embedding)
    nhead:
        Number of heads, passed to every `Decoder_Layer`
    num_decoder_layers:
        Number of decoder layers to stack
    dropout:
        The dropout value, passed to every `Decoder_Layer`
    """

    def __init__(self,
                 image_dim,
                 n_patch,
                 d_output,
                 d_model = 32,
                 nhead = 4,
                 num_decoder_layers = 4,
                 dropout = 0.2):
        super().__init__()

        # Patch size along each image axis (truncating division).
        self.patch_width = int(image_dim[0]/n_patch)
        self.patch_height = int(image_dim[1]/n_patch)

        self.n_patch = n_patch

        self.decoder_layers = nn.ModuleList([Decoder_Layer(d_model,nhead,dropout)
                                             for _ in range(num_decoder_layers)])

        # Kept for backward compatibility; not used in forward().
        self.RELU = nn.ReLU()

        # Flattened patch -> d_model embedding.
        self.patch_embedding = nn.Linear(self.patch_width*self.patch_height, d_model)

        # Concatenated patch embeddings -> prediction.
        self.pred = nn.Linear(n_patch*d_model, d_output)


    def forward(self, patches):
        """
        patches: size of batch*num_patches*patch_width*patch_height

        Returns a tensor of size batch*d_output.
        """

        flatten_patches = patches.reshape(patches.shape[0], patches.shape[1], self.patch_width*self.patch_height)

        # Patch embedding to model dimension
        x = self.patch_embedding(flatten_patches)

        # Positional embedding using index of patches; moved to the input's
        # device so the model does not depend on a notebook-global `device`.
        x = x + positional_encoding(x.shape[0], x.shape[1], x.shape[2]).to(x.device)

        # Identical mask (no information is masked), created on the same
        # device as the activations rather than on a global device.
        mask = torch.ones((x.shape[0], x.shape[1], x.shape[1]), device=x.device)

        # Decoder layers: each layer receives the current sequence twice plus the mask
        for layer in self.decoder_layers:
            x = layer(x, x, mask)

        # Flatten the per-patch embeddings into one vector per sample;
        # reshape (unlike view) also accepts a non-contiguous tensor.
        x = x.reshape(x.size(0), -1)

        # Output layer
        x = self.pred(x)

        return x

# Example

In [46]:
# Random stand-in data for a smoke test of the model.
# Input layout follows VisualTransformer.forward: batch * n_patch * patch_width * patch_height
# (16 x 4 patches correspond to image_dim = (64, 16) with n_patch = 4).
insole_input = torch.randn(10,4,16,4).to(device) # batch * n_patch * patch_width * patch_height
insole_output = torch.randn(10,1).to(device) # batch * joint_angles

In [48]:
# Instantiate the transformer, move it to the selected device, and run a single
# forward pass on the random batch (the result is displayed as the cell output).
model = VisualTransformer(
    image_dim=(64, 16),
    n_patch=4,
    d_output=1,
    d_model=32,
)
model = model.to(device)
model(insole_input)

tensor([[-0.9346],
        [-0.3099],
        [-0.6195],
        [ 0.3713],
        [-0.5137],
        [-0.4942],
        [-0.1884],
        [-0.5465],
        [-0.1173],
        [-0.8626]], device='cuda:0', grad_fn=<AddmmBackward0>)