In [None]:
# Run in desktop

In [None]:
import os
import json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import math
import sys
import cv2
import tqdm
import seaborn as sns
from scipy import signal
from jupyterthemes import jtplot

jtplot.style(theme='monokai', context='notebook', ticks=True, grid=False)
%matplotlib inline

In [None]:
# Define the path to the dataset
DATASET_PATH = r'C:\Users\User\Desktop\FYP2\Alphapose_results'

# Dictionary that maps body keypoints to their respective indices 
# refer to -> https://github.com/Fang-Haoshu/Halpe-FullBody
keypoints_code = {"Nose" : 0, "LShoulder" : 5,  "RShoulder" : 6, "LElbow" : 7, "RElbow" : 8, "LWrist" : 9, 
                  "RWrist" : 10,"LHip" : 11, "RHip" : 12, "LKnee" : 13, "RKnee" : 14,"LAnkle" : 15,
                  "RAnkle" : 16, "Head" : 17, "Neck" : 18, "Hip" : 19, "LBigToe" : 20, "RBigToe" : 21, "LSmallToe" : 22,
                  "RSmallToe" : 23, "LHeel" : 24, "RHeel" : 25}

# Convert dict key to list
keypoints_list = list(keypoints_code.keys())

## Reading data

In [None]:
def read_data(*CLASSES_NAME):
    all_x, all_y, videos_path, frames= [], [], [], []
    for CLASS_NAME in CLASSES_NAME:
        print('Reading data of class %s...' % CLASS_NAME)
        # Get into class folder
        files = os.listdir(os.path.join(DATASET_PATH, CLASS_NAME))
        with tqdm.tqdm(total=len(files), ascii=' >=', file=sys.stdout) as pbar:
            for file in files:
                json_path = os.path.join(DATASET_PATH, CLASS_NAME, file, 'alphapose-results.json')
                video_path = os.path.join(DATASET_PATH, CLASS_NAME, file, f'AlphaPose_{file}')
                f = open(json_path)
                all_data = json.load(f)
                f.close()
                x, y, frame = [], [], []
                previous_frame = '' # Variable that stores the image ID of the previous frame
                previous_score = 0 # Variable that stores the score of the previous frame
                for data in all_data:
                    # Extract the image ID and score for the current frame
                    current_frame = data['image_id']
                    current_score = data['score']
                    
                    # Check if the current frame is the same as the previous frame
                    if current_frame == previous_frame:
                        # If the current score is higher than the previous score,
                        # remove the last data
                        if current_score > previous_score:
                            x.pop()
                            y.pop()
                        else:
                            # If the current score is not higher, skip processing
                            continue
                    else:
                        frame.append(current_frame.split('.')[0])
                    # Set current frame as previous frame
                    previous_frame = current_frame
                    previous_score = current_score
                    keypoints = data['keypoints']
                    filtered_x, filtered_y = [], []
                    # Read only the keypoints we want
                    for keypoint_code in keypoints_code.values():
                        filtered_x.append(keypoints[keypoint_code*3])
                        filtered_y.append(keypoints[keypoint_code*3+1])
                    x.append(filtered_x)
                    y.append(filtered_y)
                all_x.append(x)
                all_y.append(y)
                frames.append(frame)
                videos_path.append(video_path)
                pbar.update(1)
    return all_x, all_y, videos_path, frames

In [None]:
all_x_pd, all_y_pd, videos_path_pd, detected_frames_pd = read_data('SEVERE', 'MODERATE', 'MILD')
all_x_normal, all_y_normal, videos_path_normal, detected_frames_normal = read_data('NORMAL')

Reading data of class SEVERE...
Reading data of class MODERATE...
Reading data of class MILD...
Reading data of class NORMAL...


## Gait features extraction

### Stride segmentation

In [None]:
## All extraction functions are writen in batch mode

# low pass filter to reduce frequency of points
def butter_lowpass(cutoff, fs, order=5):
    return signal.butter(order, cutoff, fs=fs, btype='lowpass', analog=False)

def butter_lowpass_filter(data, cutoff, fs, order=5):
    b, a = butter_lowpass(cutoff, fs, order=order)
    y = signal.filtfilt(b, a, data)
    return y
    

In [None]:
def stride_segmentation(all_y, videos_path=None, graph=False):
    total_person = len(all_y)
    all_peaks = []
    # For display graph purpose
    if graph:
        cols = 2
        rows = int(math.ceil((total_person/4)/cols))
        fig = plt.figure(layout='constrained', figsize=[6.4*cols*2, 4.8*rows+4])
        subfigs = fig.subfigures(rows, cols, hspace=0.07, wspace=0.07)
        j=0
    for i in range(total_person):
        # Get the y-coordinate of heel and apply low pass
        y1 = [frame[keypoints_list.index('RHeel')] for frame in all_y[i]]
        y1 = butter_lowpass_filter(y1, 5, 24)
        y2 = [frame[keypoints_list.index('LHeel')] for frame in all_y[i]]
        y2 = butter_lowpass_filter(y2, 5, 24)
        
        # Get local minimal
        peaks_right, _ = signal.find_peaks([p for p in y1], distance = 24)
        peaks_left, _ = signal.find_peaks([p for p in y2], distance = 24)
        all_peaks.append([peaks_right,peaks_left])
        
        # For display graph
        if graph:
            if i % 4 == 0:
                subfigs[int(j/2),j%2].suptitle('_'.join(videos_path[i].split('\\')[-1].split('.')[0].split('_')[1:]) + f'\ni = {i}', fontsize = 20)
                axs = subfigs[int(j/2),j%2].subplots(1, 2)
                x1 = list(range(0,len(y1)))
                axs[0].plot(x1, y1)
                axs[0].scatter([x1[peak] for peak in peaks_right], [y1[peak] for peak in peaks_right], 
                               marker = 'X', color='red', label='Stride (n): %d' % int(peaks_right.shape[0]-1))
                axs[0].invert_yaxis()
                axs[0].set_title('Right Stride')
                axs[0].legend()

                x2 = list(range(0,len(y2)))
                axs[1].plot(x2, y2, color='green')
                axs[1].scatter([x2[peak] for peak in peaks_left], [y2[peak] for peak in peaks_left], 
                               marker = 'X', color='red', label='Stride (n): %d' % int(peaks_left.shape[0]-1))
                axs[1].invert_yaxis()
                axs[1].set_title('Left Stride')
                axs[1].legend()
                j += 1
    if graph:
        plt.show()        
    return all_peaks

In [None]:
all_peaks_pd = stride_segmentation(all_y_pd, videos_path=videos_path_pd)
all_peaks_normal = stride_segmentation(all_y_normal, videos_path=videos_path_normal)

### Stride ratio

In [None]:
def mean(lst):
    return sum(lst)/len(lst)

def get_x_ratio(all_x, all_y, peaks):
    # Get x-coordinate of detected strides
    xR = [[all_x[i][peak][keypoints_list.index('RHeel')] for peak in peaks[i][0]] for i in range(len(all_x))]
    xL = [[all_x[i][peak][keypoints_list.index('LHeel')] for peak in peaks[i][1]] for i in range(len(all_x))]
    
    # Get x-coordinate of nose and hip
    yN = [[frame[keypoints_list.index('Nose')] for frame in person] for person in all_y]
    yH = [[frame[keypoints_list.index('Hip')] for frame in person] for person in all_y]
    
    # Calculate the distance in pixels of x-coord of detected strides
    diffsR = [[abs(j-i) for i, j in zip(x[:-1], x[1:])] for x in xR]
    diffsL = [[abs(j-i) for i, j in zip(x[:-1], x[1:])] for x in xL]
    
    # Calculate the mean of nose to hip distance
    nth = [mean([abs(y1-y2) for y1,y2 in zip(yN[i], yH[i])]) for i in range(len(all_y))]
    
     # Calculate the mena of ratio between the distance of detected strides and the nose to hip distance
    ratioR = [mean(diff)/nth_dist for diff, nth_dist in zip(diffsR, nth)]
    ratioL = [mean(diff)/nth_dist for diff, nth_dist in zip(diffsR, nth)]
    return ratioR, ratioL
    
ratioR_pd, ratioL_pd = get_x_ratio(all_x_pd, all_y_pd, all_peaks_pd)
ratioR_normal, ratioL_normal = get_x_ratio(all_x_normal, all_y_normal, all_peaks_normal)

In [None]:
def get_y_ratio(x, y, peaks):
    # Apply same method with get_x_ratio
    # Only difference is to get y-coord of Heel
    yR = [y[peak][keypoints_list.index('RHeel')] for peak in peaks[0]]
    yL = [y[peak][keypoints_list.index('LHeel')] for peak in peaks[1]]
    yN = [frame[keypoints_list.index('Nose')] for frame in y]
    yH = [frame[keypoints_list.index('Hip')] for frame in y]
    diffsR = [abs(j-i) for i, j in zip(yR[:-1], yR[1:])]
    diffsL = [abs(j-i) for i, j in zip(yL[:-1], yL[1:])]
    nth = mean([abs(y1-y2) for y1,y2 in zip(yN, yH)])
    ratioR = mean(diffsR)/nth
    ratioL = mean(diffsL)/nth
    return ratioR, ratioL

In [None]:
# If the right stride ratio of a normal person is less than the
# mean of the right stride ratios of PD patients
# try apply get_y_ratio
pd_mean = mean(ratioR_pd)
for i,ratio in enumerate(ratioR_normal):
    if ratio < pd_mean:
        ratioR, ratioL = get_y_ratio(all_x_normal[i], all_y_normal[i], all_peaks_normal[i])
        # Only replace the original ratio if result of get_y_ratio > original ratio
        if ratioR > ratio:
            ratioR_normal[i] = ratioR
            ratioL_normal[i] = ratioL

### Cadence

In [None]:
def get_cadence(all_peaks):
    all_cadence = []
    for peaks in all_peaks:
        # numbers of strides - 1 equal to steps since two consecutive stride form a step
        step_numbers = len(peaks[0])+len(peaks[1])-2
        # time = frame number of last stride - frame number of first stride
        time = ((max(peaks[0][-1], peaks[1][-1]) - min(peaks[0][0], peaks[1][0]))/24)/60
        cadence = round(step_numbers/time)
        all_cadence.append(cadence)
    return all_cadence
all_cadence_pd = get_cadence(all_peaks_pd)
all_cadence_normal = get_cadence(all_peaks_normal)

### Mean speed

In [None]:
def get_mean_speed(all_peaks, ratioR, ratioL):
    all_mean_speed = []
    for i, peaks in enumerate(all_peaks):
        # Get walking time in second
        time = ((max(peaks[0][-1], peaks[1][-1]) - min(peaks[0][0], peaks[1][0]))/24)
        mean_ratio = (ratioR[i] + ratioL[i])/2
        mean_speed = mean_ratio/time
        all_mean_speed.append(mean_speed)
    return all_mean_speed
mean_speed_pd = get_mean_speed(all_peaks_pd, ratioR_pd, ratioL_pd)
mean_speed_normal = get_mean_speed(all_peaks_normal, ratioR_normal, ratioL_normal)

### Turning features

In [None]:
def show_frame(cap):
    # Capture frames in the video
    success, frame = cap.read()
    if success:
        # describe the type of font
        # to be used.
        font = cv2.FONT_HERSHEY_SIMPLEX
        cv2.putText(frame, str(int(cap.get(cv2.CAP_PROP_POS_FRAMES))) + ' / ' 
                    + str(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))), (0,20), 
                    font, 0.6, (255,255,255), 2, cv2.LINE_AA)

        # Display the resulting frame
        cv2.imshow('Turning determination', frame)
        return True
    else:
        return False

# A small program to help to define the frame number of turning period
# press 'r' to record the current frame number
def video_determine_turning(video_path):
    
    cap = cv2.VideoCapture(video_path)
    fps = 24
    delay = int((1 / int(fps)) * 1000)
    pause = False
    turning_frame = []
    
    while True:

        if not pause:
            success = show_frame(cap)
            if not success:
                break
            key = cv2.waitKey(delay)
        else:
            key = cv2.waitKey(-1)
        
        # next video
        if key == ord('q'):
            break
        elif key == ord('r'):
            turning_frame.append(cap.get(cv2.CAP_PROP_POS_FRAMES)-1)
            print(turning_frame)
            if len(turning_frame) >= 2:
                break
        # show next frame
        elif key == ord('x'):
            show_frame(cap)
        # show previous frame
        elif key == ord('z'):
            f = 2 if pause else 3
            cap.set(cv2.CAP_PROP_POS_FRAMES, cap.get(cv2.CAP_PROP_POS_FRAMES) - f)
            show_frame(cap)
        # show previous 10 frames
        elif key == ord('c'):
            cap.set(cv2.CAP_PROP_POS_FRAMES, cap.get(cv2.CAP_PROP_POS_FRAMES) - 10)
            show_frame(cap)
        # show next 10 frames
        elif key == ord('v'):
            cap.set(cv2.CAP_PROP_POS_FRAMES, cap.get(cv2.CAP_PROP_POS_FRAMES) + 10)
            show_frame(cap)
        # pause video
        elif key == ord(' '):
            pause = not pause
        # quit program
        elif key == ord('w'):
            cap.release()
            cv2.destroyAllWindows()
            raise Exception
    
    cap.release()
    cv2.destroyAllWindows()
    return turning_frame

In [None]:
def extract_video_name(video_path):
    return '_'.join(video_path.split('\\')[-1].split('.')[0].split('_')[1:])

def define_turning(videos_path, csv_file_path):
    # read csv file that stored frame numbers of turning period
    if os.path.exists(csv_file_path):
        df = pd.read_csv(csv_file_path)
        # Only define turning for the video that not in csv file
        videos_path = [video_path for video_path in videos_path if extract_video_name(video_path) not in df['video_name'].values]
    else:
        df = pd.DataFrame(columns=['video_name', 'start', 'end'])
    
    for video_path in videos_path:
        try:
            turning_frame = video_determine_turning(video_path)
            turning_frame.insert(0, extract_video_name(video_path))
            # print selected frame number
            if len(turning_frame) > 1:
                print(turning_frame)
                print('')
            else:
                # If there is no turning period, insert nan value after video name
                turning_frame = turning_frame + [None, None]
            df.loc[len(df.index)] = turning_frame  
        except:
            break
    # Save csv file
    df.to_csv(csv_file_path, index=False)

In [None]:
csv_file_pd = 'turning_frames_pd.csv'
csv_file_normal = 'turning_frames_normal.csv'

In [None]:
define_turning(videos_path_pd, csv_file_pd)
define_turning(videos_path_normal, csv_file_normal)

In [None]:
# Read frame numbers of turning period from csv file
def read_turning(videos_path, csv_file_path):
    if os.path.exists(csv_file_path):
        df = pd.read_csv(csv_file_path)
        def replace_name(name):
            # Helper function to replace video name to the correspond index in keypoints data
            for i, video_path in enumerate(videos_path):
                if name in video_path:
                    return i
        df['video_name'] = df['video_name'].apply(lambda x: replace_name(x))
        print('Total number of videos with turning: ', (df.notnull().sum(axis=1) == len(df.columns)).sum())
        return df
    else:
        raise Exception('Cannot find the csv file.')

In [None]:
df_turning_pd = read_turning(videos_path_pd, csv_file_pd)
df_turning_normal = read_turning(videos_path_normal, csv_file_normal)

Total number of videos with turning:  82
Total number of videos with turning:  11


In [None]:
def compute_turning_duration(df):
    # Calculate duration in second for turning
    df['duration'] = (df['end'] - df['start'])/24
    return df['duration']
df_turning_duration_pd = compute_turning_duration(df_turning_pd)
df_turning_duration_normal = compute_turning_duration(df_turning_normal)

In [None]:
# Combine two dataframe for turning duration
df_turning_duration = df_turning_duration_pd.append(df_turning_duration_normal, ignore_index=True)
df_turning_duration

0           NaN
1           NaN
2           NaN
3      9.291667
4           NaN
         ...   
287         NaN
288         NaN
289         NaN
290         NaN
291         NaN
Name: duration, Length: 292, dtype: float64

### Turning steps

In [None]:
def turning_steps_segmentation(df, all_y):
    df['step_numbers'] = np.nan
    for index, row in df.iterrows():
        i = row['video_name']
        start = row['start']
        end = row['end']
        if not math.isnan(start):
            # use stride_segmentation function to segment strides in turning period
            peaks = stride_segmentation([all_y[int(i)][int(start):int(end)]])
            step_numbers = len(peaks[0][0])+len(peaks[0][1])
            df.loc[index, 'step_numbers'] = step_numbers
    return df['step_numbers']

df_step_numbers_pd = turning_steps_segmentation(df_turning_pd, all_y_pd)
df_step_numbers_normal = turning_steps_segmentation(df_turning_normal, all_y_normal)

In [None]:
df_step_numbers = df_step_numbers_pd.append(df_step_numbers_normal, ignore_index=True)
df_step_numbers

0       NaN
1       NaN
2       NaN
3      14.0
4       NaN
       ... 
287     NaN
288     NaN
289     NaN
290     NaN
291     NaN
Name: step_numbers, Length: 292, dtype: float64

In [None]:
# Create label
label = [1]*len(ratioR_pd) + [0]*len(ratioR_normal)
# Create dataframe
df = pd.DataFrame(
    {'ratioR': np.concatenate((ratioR_pd, ratioR_normal)),
     'ratioL': np.concatenate((ratioL_pd, ratioL_normal)),
     'cadence': all_cadence_pd + all_cadence_normal,
     'mean_speed': mean_speed_pd + mean_speed_normal,
     'turning_duration': df_turning_duration,
     'turning_steps': df_step_numbers,
     'parkinson': label
    })
df

Unnamed: 0,ratioR,ratioL,cadence,mean_speed,turning_duration,turning_steps,parkinson
0,0.028705,0.028705,90,0.001587,,,1
1,0.079651,0.079651,86,0.006309,,,1
2,0.142220,0.142220,83,0.009326,,,1
3,0.186680,0.186680,84,0.006188,9.291667,14.0,1
4,0.092608,0.092608,84,0.008082,,,1
...,...,...,...,...,...,...,...
287,0.146164,0.146164,73,0.017898,,,0
288,0.148651,0.148651,77,0.019181,,,0
289,0.504445,0.504445,92,0.069981,,,0
290,0.645283,0.645283,79,0.071040,,,0


In [None]:
# Save dataframe to csv file
df.to_csv('dataset.csv', mode='a', index=False, header=False)