In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import os
import shutil
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
from IPython.display import clear_output
from torch.utils.data import Dataset, DataLoader, random_split
from scipy.signal import butter, filtfilt, find_peaks

In [2]:
## Low Pass Filter for Azure data
def butter_lowpass_filter(data, cutoff, order, fs=30.0):
    """Apply a zero-phase Butterworth low-pass filter to x/y/z trajectories.

    Parameters
    ----------
    data : array-like, shape (n_frames, >=3)
        Samples along axis 0; only columns 0, 1 and 2 are filtered.
    cutoff : float
        Cut-off frequency, in the same unit as ``fs``.
    order : int
        Order of the Butterworth filter.
    fs : float, optional
        Sampling rate of ``data``. The default of 30.0 reproduces the
        divisor of 15 (``fs / 2``) that used to be hard-coded here.

    Returns
    -------
    numpy.ndarray, shape (n_frames, 3)
        Forward-backward filtered copy of the first three columns.
    """
    data = np.asarray(data)
    # butter() expects the cut-off as a fraction of the Nyquist frequency.
    normal_cutoff = cutoff / (fs / 2.0)
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    # Fancy indexing keeps the original IndexError when fewer than three
    # columns are supplied; filtering along axis 0 handles x, y and z at once.
    return filtfilt(b, a, data[:, [0, 1, 2]], axis=0)

In [4]:
def ReadCSV(path, input_type=None):
    """Read one Kinect body-tracking CSV and build the per-leg model inputs.

    Parameters
    ----------
    path : str
        CSV file with one header row. Columns 1-100 are parsed as floats:
        joint coordinates stored as consecutive (x, y, z) triples, followed
        by four clock columns (hour, minute, second, millisecond). Column 101
        is read as text and treated as the foot-switch signal.
    input_type : int, optional
        Feature set to build (0-5, see the table near the end of the body).
        When omitted, the notebook-level global ``inputType`` is used, which
        is how existing callers select the feature set.

    Returns
    -------
    data_l, data_r : numpy.ndarray or None
        Per-frame features for the left / right leg. Both are ``None`` when
        the feature-set id is not one of 0-5.
    time_ : numpy.ndarray
        Frame timestamps in seconds, relative to the first frame.
    initialContactTime : float
        Timestamp of the first frame whose foot-switch entry is not ``' 0'``.
    safe_start, safe_end : int
        First frame whose raw pelvis z is below 3300 and, after that, the
        first frame whose pelvis z is below 1350 (0 when never reached).
    pevis2Lankle, pevis2Rankle : numpy.ndarray, shape (n_frames, 3)
        Filtered pelvis position minus filtered left / right ankle position.
    """
    if input_type is None:
        # Backward compatible path: the feature set comes from the notebook global.
        input_type = inputType

    # Count the leading ' 0' entries of the foot-switch column; the index of
    # the first other entry marks the initial contact frame.
    foot_switch = np.loadtxt(path, delimiter=',', dtype=str, skiprows=1, usecols=(101))
    idx_footSwitch = 0
    for entry in foot_switch:
        if entry != ' 0':
            break
        idx_footSwitch += 1

    # Find the first row with a missing value so that only complete rows are parsed.
    first_column = np.loadtxt(path, delimiter=',', dtype=str, skiprows=1, usecols=(1))
    line_num = first_column.shape[0]
    for i, v in enumerate(first_column):
        if v == " ":
            line_num = i
            break

    csv_data = np.loadtxt(path, delimiter=',', skiprows=1, max_rows=line_num, usecols=range(1, 101))

    # The last four columns hold the time at which each frame was recorded (Kinect clock).
    clock = csv_data[:, -4:]
    time_ = clock[:, 0] * 3600 + clock[:, 1] * 60 + clock[:, 2] + clock[:, 3] / 1000.0
    time_ = time_ - time_[0]
    initialContactTime = time_[idx_footSwitch]

    # Low-pass filter settings shared by every joint.
    cutoff = 6
    order = 30

    def raw_joint(joint_idx):
        # Each joint occupies three consecutive columns (x, y, z).
        return csv_data[:, joint_idx * 3:(joint_idx + 1) * 3]

    def filtered_joint(joint_idx):
        return butter_lowpass_filter(raw_joint(joint_idx), cutoff, order)

    def supplementary_angle(vec_a, vec_b):
        # 180 degrees minus the angle between the two vectors, one value per frame.
        unit_a = vec_a / np.linalg.norm(vec_a, axis=1, keepdims=True)
        unit_b = vec_b / np.linalg.norm(vec_b, axis=1, keepdims=True)
        cosine = np.sum(np.multiply(unit_a, unit_b), axis=-1)
        # Rounding can push the dot product marginally outside [-1, 1], which
        # would make arccos return NaN.
        cosine = np.clip(cosine, -1.0, 1.0)
        return np.expand_dims(180 - np.arccos(cosine) * 180 / np.pi, axis=-1)

    # Joint indices used by this function.
    pelvis_idx = 0
    spine_naval_idx = 1
    LHip_idx = 18
    LKnee_idx = 19
    LAnkle_idx = 20
    RHip_idx = 22
    RKnee_idx = 23
    RAnkle_idx = 24
    head_idx = 26

    Pelvis = raw_joint(pelvis_idx)  # the unfiltered pelvis drives the safe zone below

    Head_filtered = filtered_joint(head_idx)
    Pelvis_filtered = filtered_joint(pelvis_idx)
    Spine_naval_filtered = filtered_joint(spine_naval_idx)
    LHip_filtered = filtered_joint(LHip_idx)
    LKnee_filtered = filtered_joint(LKnee_idx)
    LAnkle_filtered = filtered_joint(LAnkle_idx)
    RHip_filtered = filtered_joint(RHip_idx)
    RKnee_filtered = filtered_joint(RKnee_idx)
    RAnkle_filtered = filtered_joint(RAnkle_idx)

    # Distance between the two ankles.
    AngkleDistance = np.linalg.norm(RAnkle_filtered - LAnkle_filtered, axis=1, keepdims=True)

    # Knee angles: between knee->hip and knee->ankle.
    angle_Lknee = supplementary_angle(LHip_filtered - LKnee_filtered, LAnkle_filtered - LKnee_filtered)
    angle_Rknee = supplementary_angle(RHip_filtered - RKnee_filtered, RAnkle_filtered - RKnee_filtered)

    # Hip angles: between pelvis->spine_naval and hip->knee. The filtered
    # pelvis / spine signals are reused instead of being filtered a second time.
    pelvis2naval = Spine_naval_filtered - Pelvis_filtered
    angle_LHip = supplementary_angle(pelvis2naval, LKnee_filtered - LHip_filtered)
    angle_RHip = supplementary_angle(pelvis2naval, RKnee_filtered - RHip_filtered)

    # Pelvis to ankle.
    pevis2Lankle = Pelvis_filtered - LAnkle_filtered
    pevis2Rankle = Pelvis_filtered - RAnkle_filtered

    # Safe zone: the pelvis z component decides which frames are usable. The
    # subject walks towards the camera, so safe_start is the far point
    # (z drops below 3300 mm) and safe_end the near point (z drops below 1350 mm).
    safe_start = 0
    safe_end = 0
    start_found = False  # explicit flag: frame 0 is a legitimate safe_start
    for i, z in enumerate(Pelvis[:, 2]):
        if not start_found and z < 3300:
            safe_start = i
            start_found = True
        if start_found and z < 1350:
            safe_end = i
            break

    # Head z component.
    Head_filtered_z = np.expand_dims(Head_filtered[:, -1], axis=-1)

    # Feature sets, as (left-leg columns, right-leg columns). The right-leg
    # version lists the same quantities with left and right swapped.
    feature_sets = {
        0: ((angle_LHip, angle_Lknee, angle_RHip, angle_Rknee, AngkleDistance),
            (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee, AngkleDistance)),
        1: ((angle_LHip, angle_Lknee, AngkleDistance),
            (angle_RHip, angle_Rknee, AngkleDistance)),
        2: ((angle_Lknee, angle_Rknee, AngkleDistance),
            (angle_Rknee, angle_Lknee, AngkleDistance)),
        3: ((LHip_filtered, LKnee_filtered, LAnkle_filtered, RHip_filtered, RKnee_filtered, RAnkle_filtered),
            (RHip_filtered, RKnee_filtered, RAnkle_filtered, LHip_filtered, LKnee_filtered, LAnkle_filtered)),
        4: ((angle_LHip, angle_Lknee, angle_RHip, angle_Rknee),
            (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee)),
        5: ((angle_LHip, angle_Lknee, angle_RHip, angle_Rknee, AngkleDistance, Head_filtered_z),
            (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee, AngkleDistance, Head_filtered_z)),
    }

    data_l = None
    data_r = None
    selected = feature_sets.get(input_type)
    if selected is not None:
        data_l = np.concatenate(selected[0], axis=-1)
        data_r = np.concatenate(selected[1], axis=-1)

    return data_l, data_r, time_, initialContactTime, safe_start, safe_end, pevis2Lankle, pevis2Rankle

# Example usage (ReadCSV picks the feature set from the notebook-level `inputType`):
#inputType = 4
#data_l, data_r, time_, initialContactTime, safe_start, safe_end, pevis2Lankle, pevis2Rankle = ReadCSV('Dataset/BYS/BYS_00.csv')
#print(data_l.shape)
# Column order of data_r for each inputType (data_l uses the same order with left and right swapped):
#0 : (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee, AngkleDistance)
#1 : (angle_RHip, angle_Rknee, AngkleDistance)
#2 : (angle_Rknee, angle_Lknee, AngkleDistance)
#3 : (RHip, RKnee, RAnkle, LHip, LKnee, LAnkle)
#4 : (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee)
#5 : (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee, AngkleDistance, Head_z)


In [5]:
## Zebris에서 제공하는 pressure 데이터를 읽기 위한 함수
# file은 각 trail 마다의 폴더경로
def ReadForceData(path):
    """Read the Zebris export of one trial.

    ``path`` is the trial folder. It must contain ``parameters.csv`` and the
    two ``butterfly_force_curve-{L,R}.csv`` files (one per foot).

    Returns ``(l_contact, l_off, r_contact, r_off, df_extracted)``: four lists
    of times taken from the first column of the force-curve files, and a
    DataFrame holding the selected gait-parameter columns.
    """

    def contact_and_off_times(curve_path):
        # The first four lines are skipped; columns 0 and 1 are parsed and
        # column 0 (time) is the only one used.
        samples = np.loadtxt(curve_path, delimiter=',', skiprows=4, usecols=range(2), encoding='utf-8')
        times = samples[:, 0]

        contacts = [times[0]]
        offs = []
        # A jump of more than 0.2 between consecutive samples separates two
        # contacts: the later sample starts a new contact and the earlier one
        # (plus 0.01) closes the previous contact, unless that earlier
        # timestamp is exactly 0.
        for earlier, later in zip(times[:-1], times[1:]):
            if later - earlier > 0.2:
                contacts.append(later)
                if earlier != 0:
                    offs.append(earlier + 0.01)
        # The last sample always closes the final contact.
        offs.append(times[-1] + 0.01)
        return contacts, offs

    # Gait parameters.
    wanted_columns = [
        'Step length L, cm',
        'Stance phase L, %',
        'Step length R, cm',
        'Stance phase R, %',
        'Stride length, cm',
        'Cadence, steps/min',
        'Velocity, km/h',
    ]
    parameters = pd.read_csv(os.path.join(path, "parameters.csv"))
    df_extracted = parameters[wanted_columns]

    # Initial-contact and toe-off times, left foot first, then right foot.
    l_contact, l_off = contact_and_off_times(os.path.join(path, "butterfly_force_curve-L.csv"))
    r_contact, r_off = contact_and_off_times(os.path.join(path, "butterfly_force_curve-R.csv"))

    return l_contact, l_off, r_contact, r_off, df_extracted

In [6]:
class CustomDataset(Dataset):
    """Sliding-window gait dataset: Kinect features labelled with force-plate contacts.

    ``root`` holds one sub-folder per person. Inside each, the sorted CSV files
    (read with ``ReadCSV``) are paired by position with the sorted trial
    folders (read with ``ReadForceData``). Every window of ``numFrame``
    consecutive frames inside the safe zone yields two samples, left leg then
    right leg, each labelled with the contact state (1 = foot on the ground
    according to the force data, 0 otherwise) of the window's last frame.

    Attributes
    ----------
    x_data : numpy.ndarray, shape (n_samples, numFrame, n_features)
    y_data : numpy.ndarray, shape (n_samples,)
    """

    def __init__(self, root, numFrame, drawing):
        """Build all samples eagerly.

        Parameters
        ----------
        root : str
            Dataset root folder.
        numFrame : int
            Number of frames per window.
        drawing : bool
            When true, the ``Inputs`` folder is recreated and one diagnostic
            figure per trial is saved into it.

        Side effect: the notebook-level global ``inputDims`` is set to the
        number of feature columns returned by ``ReadCSV``.
        """
        global inputDims
        self.root = root
        self.numFrame = numFrame
        self.names = list(next(os.walk(root))[1])
        self.errorFiles = []

        # Start from an empty folder for the figures.
        if drawing:
            if os.path.isdir("./Inputs"):
                shutil.rmtree("Inputs")
            os.mkdir("Inputs")

        # Samples are collected in lists and stacked once at the end, rather
        # than re-allocating the whole buffer for every window.
        windows = []
        targets = []
        feature_dims = 0

        for name in self.names:
            pathPerson = os.path.join(self.root, name)
            print(pathPerson)

            (_, Folders, csvFiles) = next(os.walk(pathPerson))
            Folders = sorted(Folders)
            csvFiles = sorted(csvFiles)

            if '.ipynb_checkpoints' in Folders:
                Folders.remove('.ipynb_checkpoints')
            if '.ipynb_checkpoints' in csvFiles:
                csvFiles.remove('.ipynb_checkpoints')
            if len(Folders) != len(csvFiles):
                # NOTE(review): this stops the whole scan, so every later
                # person is silently left out - confirm that is intended.
                print(name, ' is not matched')
                break

            for csvFile, folder in zip(csvFiles, Folders):
                pathCSV = os.path.join(pathPerson, csvFile)
                pathForceData = os.path.join(pathPerson, folder)

                data_l, data_r, time, initialContactTime, safe_start, safe_end, pevis2Lankle, pevis2Rankle = ReadCSV(pathCSV)
                l_contact, l_off, r_contact, r_off, parameters = ReadForceData(pathForceData)

                # The feature width is taken from the data itself instead of
                # probing a hard-coded reference file.
                feature_dims = data_l.shape[-1]
                inputDims = feature_dims

                # Synchronisation: align the first force-plate contact with the
                # Kinect foot-switch time so all event times use the Kinect clock.
                firstContact = min(r_contact[0], l_contact[0])
                timeGap = initialContactTime - firstContact

                # Ground-truth labels per frame.
                l_label, l_contact_, l_off_ = self._phase_labels(time, l_contact, l_off, timeGap, data_l.shape[0])
                r_label, r_contact_, r_off_ = self._phase_labels(time, r_contact, r_off, timeGap, data_r.shape[0])

                # One window per start frame such that the window's last frame
                # is still inside [safe_start, safe_end].
                for start in range(safe_start, safe_end - numFrame + 2):
                    stop = start + numFrame
                    ## L
                    windows.append(self._normalize_window(data_l[start:stop, :]))
                    targets.append(l_label[stop - 1])
                    ## R
                    windows.append(self._normalize_window(data_r[start:stop, :]))
                    targets.append(r_label[stop - 1])

                if drawing:
                    self._draw_trial(csvFile.split('.')[0], time, data_l, data_r, l_label, r_label,
                                     l_contact_, l_off_, r_contact_, r_off_, safe_start, safe_end)

        if windows:
            self.x_data = np.stack(windows, axis=0)
        else:
            self.x_data = np.zeros(shape=(0, numFrame, feature_dims), dtype=np.float64)
        print('input shape : ', self.x_data.shape)
        self.y_data = np.array(targets, dtype=np.float64)
        print('output shape : ', self.y_data.shape)
        print("Finish !")
        if len(self.errorFiles) > 0:
            print(self.errorFiles)

    @staticmethod
    def _normalize_window(window):
        """Min-max scale every feature column of one window to [0, 1]."""
        low = np.min(window, axis=0)
        span = np.max(window, axis=0) - low
        # A constant column would give 0/0 = NaN; it is mapped to 0 instead.
        span = np.where(span == 0, 1.0, span)
        return (window - low) / span

    @staticmethod
    def _phase_labels(time, contacts, offs, timeGap, num_frames):
        """Turn contact / off times into a per-frame 0/1 label array.

        Each event time is shifted by ``timeGap`` and snapped to the nearest
        frame of ``time``. Frames from a contact (inclusive) up to the
        matching off (exclusive) are labelled 1. Returns
        ``(label, contact_frames, off_frames)``.
        """
        contact_frames = [np.argmin(abs(time - (t + timeGap))) for t in contacts]
        off_frames = [np.argmin(abs(time - (t + timeGap))) for t in offs]

        label = np.zeros(num_frames)
        # Indexing by position keeps the IndexError when a contact has no matching off.
        for k in range(len(contact_frames)):
            label[contact_frames[k]:off_frames[k]] = 1
        return label, contact_frames, off_frames

    @staticmethod
    def _draw_trial(stem, time, data_l, data_r, l_label, r_label,
                    l_contact_, l_off_, r_contact_, r_off_, safe_start, safe_end):
        """Save a four-panel figure (signals and labels per leg) as Inputs/<stem>.jpg."""
        x_new = np.linspace(time[0], time[-1], len(time))
        fig, axs = plt.subplots(4, figsize=(18, 12))

        sides = (
            ('Left', 'L', data_l, l_label, l_contact_, l_off_),
            ('Right', 'R', data_r, r_label, r_contact_, r_off_),
        )
        for row, (title, tag, data, label, contact_frames, off_frames) in enumerate(sides):
            ## Input signals
            ax = axs[2 * row]
            ax.set_title(title)
            # NOTE(review): the 'Hip' / 'Knee' / 'AnkleDistance' names match the
            # column layout of inputType 0 and 5 only.
            ax.plot(x_new, data[:, 0], 'r-', label='Hip')
            ax.plot(x_new, data[:, 1], 'b-', label='Knee')
            if data.shape[1] > 4:
                # Narrower feature sets have no column 4; skipping it avoids an IndexError.
                ax.plot(x_new, data[:, 4], 'k-', label='AnkleDistance')
            for k, frame in enumerate(contact_frames):
                ax.axvline(x=time[frame], c='r', label=(tag + 'Contact') if k == 0 else None)
            for k, frame in enumerate(off_frames):
                ax.axvline(x=time[frame], c='b', label=(tag + 'Off') if k == 0 else None)
            ax.axvline(x=time[safe_start], label='safe Start = {}'.format(safe_start), c='g')
            ax.axvline(x=time[safe_end], label='safe End = {}'.format(safe_end), c='g')
            ax.legend()

            ## Labels: one artist per class instead of one per frame
            ax = axs[2 * row + 1]
            ax.set_title(title + '_label')
            stance = label == 1
            ax.plot(time[stance], np.full(int(stance.sum()), 0.7), 'r.', label='Stance')
            ax.plot(time[~stance], np.full(int((~stance).sum()), 0.3), 'b.', label='Swing')
            ax.axvline(x=time[safe_start], c='g')
            ax.axvline(x=time[safe_end], c='g')
            ax.legend()

        for ax in axs:
            ax.set_xlim([x_new[0], x_new[-1]])

        fig.tight_layout()
        fig.savefig(os.path.join('Inputs', stem + '.jpg'))
        plt.close(fig)

    def __len__(self):
        """Number of samples."""
        return len(self.x_data)

    def __getitem__(self, idx):
        """Return ``(x, y)``: a float tensor window and a one-element float tensor label."""
        x = torch.FloatTensor(self.x_data[idx, :, :])
        y = torch.FloatTensor([self.y_data[idx]])
        return x, y

In [7]:
def ruleBased(numFrame, safe_start, safe_end, pevis2Lankle, pevis2Rankle, PostProcessing_):
    """Rule-based gait-phase estimate from the pelvis-to-ankle signal.

    Only the last column of ``pevis2Lankle`` / ``pevis2Rankle`` is used, over
    frames ``safe_start + numFrame - 1`` to ``safe_end`` inclusive. Local
    maxima of that signal switch the state to 1 (contact) and local minima
    switch it to 0 (off).

    Parameters
    ----------
    numFrame : int
        Window length used by the model; the first ``numFrame - 1`` frames of
        the safe zone are skipped.
    safe_start, safe_end : int
        Frame range of the safe zone.
    pevis2Lankle, pevis2Rankle : numpy.ndarray, shape (n_frames, k)
        Pelvis-to-ankle vectors for the left / right leg.
    PostProcessing_ : bool
        When true, the raw sequences are smoothed with ``PostProcessing``
        (threshold 3).

    Returns
    -------
    finalAnswer_l, finalAnswer_r, l_answer, r_answer : list of int
        Final and raw per-frame states for each leg. Without post-processing
        the final lists are the raw lists themselves.
    """

    def phase_sequence(signal):
        contacts, _ = find_peaks(signal, distance=3)
        offs, _ = find_peaks(-signal, distance=3)

        # Initial state: if the first event is an "off", the foot must have
        # been in contact before it. Missing peaks no longer raise IndexError.
        if len(offs) == 0:
            state = 0
        elif len(contacts) == 0:
            state = 1
        else:
            state = 1 if contacts[0] > offs[0] else 0

        contact_frames = set(contacts.tolist())
        off_frames = set(offs.tolist())
        sequence = []
        for i in range(signal.shape[0]):
            if i in contact_frames:
                state = 1
            elif i in off_frames:
                state = 0
            sequence.append(state)
        return sequence

    # Only the data inside the safe zone is handled.
    first = safe_start + numFrame - 1
    l_answer = phase_sequence(pevis2Lankle[first:safe_end + 1, -1])
    r_answer = phase_sequence(pevis2Rankle[first:safe_end + 1, -1])

    if PostProcessing_:
        # Copies are passed so the raw answers returned below stay untouched
        # even if the smoothing step edits its argument in place.
        finalAnswer_l = PostProcessing(answer=list(l_answer), threshold=3)
        finalAnswer_r = PostProcessing(answer=list(r_answer), threshold=3)
    else:
        finalAnswer_l = l_answer
        finalAnswer_r = r_answer

    return finalAnswer_l, finalAnswer_r, l_answer, r_answer
            
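# Example: finalAnswer_l, finalAnswer_r, l_answer, r_answer = ruleBased(numFrame=10, safe_start=safe_start, safe_end=safe_end, pevis2Lankle=pevis2Lankle, pevis2Rankle=pevis2Rankle, PostProcessing_=True)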

In [8]:
def PostProcessing(answer, threshold):
    """Smooth a binary gait-phase sequence by removing short-lived phases.

    A run of equal values is "unstable" when its length is ``threshold`` or
    less. Consecutive unstable runs are merged into one range; the first half
    of each range takes the class on its left and the second half the class
    on its right. At the start / end of the sequence, where one neighbour is
    missing, the missing side is assumed to be the opposite (``1 - x``) of the
    existing neighbour.

    Parameters
    ----------
    answer : sequence of int (0 or 1)
        Per-frame predictions. The argument itself is not modified.
    threshold : int
        Longest run length that is still treated as unstable.

    Returns
    -------
    list
        A new list with the smoothed sequence. An empty input gives an empty
        list, and a sequence with no stable run at all is returned unchanged
        because there is nothing to anchor the correction to.
    """
    total = len(answer)
    if total == 0:
        return []

    # Pass 1: mark every frame with 0 (part of a stable run) or -1 (unstable run).
    # count : length of the current run; prev : phase of the previous frame.
    mark_unstable = []
    count = 0
    prev = answer[0]
    for answer_ in answer:
        if prev != answer_:
            # The phase changed: classify the run that just ended.
            mark_unstable += [0 if count > threshold else -1] * count
            count = 1
        else:
            count += 1
        prev = answer_
    # Classify the final run.
    mark_unstable += [0 if count > threshold else -1] * count

    # Pass 2: collect unstable ranges as [first, last] pairs, e.g. [[1, 3], [8, 13], ...].
    # For a range followed by a stable run, `last` is the first stable index;
    # for a range that reaches the end of the sequence it is the final index.
    unstable = False
    range_unstable = []
    tmp = []
    for i, v in enumerate(mark_unstable):
        if v == -1 and not unstable:
            tmp.append(i)
            unstable = True
        elif v != -1 and unstable:
            tmp.append(i)
            range_unstable.append(tmp)
            tmp = []
            unstable = False
        elif v == -1 and i == len(mark_unstable) - 1:
            tmp.append(i)
            range_unstable.append(tmp)
            tmp = []
            unstable = False

    # Work on a copy so the caller keeps its raw predictions.
    filtered_answer = list(answer)

    # Pass 3: overwrite each unstable range using the phases on both sides.
    for first, last in range_unstable:
        if first == 0 and last + 1 >= total:
            # The whole sequence is unstable: no neighbour on either side.
            continue

        if first - 1 < 0:
            left_class = 1 - filtered_answer[last + 1]
        else:
            left_class = filtered_answer[first - 1]

        if last + 1 == total:
            right_class = 1 - filtered_answer[first - 1]
        else:
            right_class = filtered_answer[last + 1]

        middle = int((first + last) / 2)
        for i in range(first, middle):
            filtered_answer[i] = left_class
        for i in range(middle, last + 1):
            filtered_answer[i] = right_class

    return filtered_answer

def ModelTest(numFrame, pathCSV, model, PostProcessing_):
    """Run the trained model over every safe-zone window of one recording.

    Parameters
    ----------
    numFrame : int
        Window length in frames.
    pathCSV : str
        Kinect CSV file, read with ``ReadCSV``.
    model : callable
        Called as ``model(x, h0, c0)`` with ``x`` of shape
        (1, numFrame, n_features) and zero initial states of shape
        (1, 1, hidden_size); an output above 0.5 is taken as class 1.
    PostProcessing_ : bool
        When true, the raw sequences are smoothed with ``PostProcessing``
        (threshold 3).

    Returns
    -------
    finalAnswer_l, finalAnswer_r, l_answer, r_answer : list of int
        Final and raw per-window predictions for the left / right leg.

    Relies on the notebook-level globals ``device`` and ``hidden_size``.
    """
    data_l, data_r, time, initialContactTime, safe_start, safe_end, pevis2Lankle, pevis2Rankle = ReadCSV(pathCSV)

    def classify(window):
        # Per-window min-max scaling, as used when the training set is built.
        low = np.min(window, axis=0)
        span = np.max(window, axis=0) - low
        # A constant column would give 0/0 = NaN; it is mapped to 0 instead.
        span = np.where(span == 0, 1.0, span)
        scaled = np.expand_dims((window - low) / span, axis=0)

        input_ = torch.FloatTensor(scaled).to(device)
        h0 = torch.zeros(1, 1, hidden_size).to(device)
        c0 = torch.zeros(1, 1, hidden_size).to(device)
        pred = model(input_, h0, c0)
        return 1 if torch.squeeze(pred) > 0.5 else 0

    l_answer = []
    r_answer = []

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        # One window per start frame whose last frame is still within safe_end.
        for start in range(safe_start, safe_end - numFrame + 2):
            stop = start + numFrame
            l_answer.append(classify(data_l[start:stop, :]))
            r_answer.append(classify(data_r[start:stop, :]))

    if PostProcessing_:
        # Copies are passed so the raw answers returned below stay untouched
        # even if the smoothing step edits its argument in place.
        finalAnswer_l = PostProcessing(answer=list(l_answer), threshold=3)
        finalAnswer_r = PostProcessing(answer=list(r_answer), threshold=3)
    else:
        finalAnswer_l = l_answer
        finalAnswer_r = r_answer

    return finalAnswer_l, finalAnswer_r, l_answer, r_answer
    

In [9]:
def PredictParamters(path, l_contact_predict, l_off_predict, r_contact_predict, r_off_predict, safe_start):
    """Estimate stride and step lengths from predicted contact frames.

    All lengths are absolute differences of the filtered ankle position along
    the last coordinate axis, averaged and divided by 10 (mm -> cm).

    Parameters
    ----------
    path : str
        Kinect CSV file (same layout as for ``ReadCSV``).
    l_contact_predict, r_contact_predict : sequence of int
        Predicted contact frames per foot; ``safe_start`` is added to each
        before indexing the recording.
        NOTE(review): confirm with the caller that these indices are relative
        to ``safe_start`` and not to the first model window.
    l_off_predict, r_off_predict : sequence of int
        Not used by this function; kept for interface compatibility.
    safe_start : int
        Frame offset added to every contact index.

    Returns
    -------
    LstrideLength, RstrideLength, stepLength_L, stepLength_R : float or None
        ``None`` when there are not enough contacts to form a measurement.
    """
    # Find the first row with a missing value so that only complete rows are parsed.
    first_column = np.loadtxt(path, delimiter=',', dtype=str, skiprows=1, usecols=(1))
    line_num = first_column.shape[0]
    for i, v in enumerate(first_column):
        if v == " ":
            line_num = i
            break

    csv_data = np.loadtxt(path, delimiter=',', skiprows=1, max_rows=line_num, usecols=range(1, 101))

    # Ankle trajectories.
    LAnkle_idx = 20
    RAnkle_idx = 24
    LAnkle = csv_data[:, LAnkle_idx * 3: (LAnkle_idx + 1) * 3]
    RAnkle = csv_data[:, RAnkle_idx * 3: (RAnkle_idx + 1) * 3]

    # Low-pass filter settings.
    cutoff = 6
    order = 30
    LAnkle_z = butter_lowpass_filter(LAnkle, cutoff, order)[:, -1]
    RAnkle_z = butter_lowpass_filter(RAnkle, cutoff, order)[:, -1]

    def stride_length(contacts, ankle_z):
        # Mean distance covered by one foot between consecutive contacts.
        if len(contacts) <= 1:
            return None
        total = 0
        for i in range(len(contacts) - 1, 0, -1):
            total += abs(ankle_z[contacts[i] + safe_start] - ankle_z[contacts[i - 1] + safe_start])
        total /= len(contacts) - 1
        total /= 10  # mm -> cm
        return total

    def step_length(lead_contacts, lead_z, follow_contacts, follow_z):
        # For every contact of the leading foot, pair it with the first
        # contact of the other foot at a later frame and average the distances.
        total = 0
        count = 0
        for lead in lead_contacts:
            for follow in follow_contacts:
                if follow > lead:
                    total += abs(follow_z[follow + safe_start] - lead_z[lead + safe_start])
                    count += 1
                    break
        if count == 0:
            return None
        total /= count
        total /= 10  # mm -> cm
        return total

    # strideLength
    LstrideLength = stride_length(l_contact_predict, LAnkle_z)
    RstrideLength = stride_length(r_contact_predict, RAnkle_z)

    # stepLength: a right step ends on a right contact that follows a left contact, and vice versa.
    stepLength_R = step_length(l_contact_predict, LAnkle_z, r_contact_predict, RAnkle_z)
    stepLength_L = step_length(r_contact_predict, RAnkle_z, l_contact_predict, LAnkle_z)

    return LstrideLength, RstrideLength, stepLength_L, stepLength_R

In [10]:
def _nearest_frames(time, event_times):
    """Map each event time onto the index of the closest entry of `time`."""
    return [np.argmin(abs(time - t)) for t in event_times]


def _stance_label(num_frames, contact_frames, off_frames):
    """Return a per-frame array that is 1 from each contact frame up to (not including) its toe-off frame, else 0."""
    label = np.zeros(num_frames)
    for i in range(len(contact_frames)):
        label[contact_frames[i]:off_frames[i]] = 1
    return label


def _phase_changes(sequence, shift=0):
    """Return (contacts, offs): positions (+ `shift`) where a 0/1 sequence switches 0->1 and 1->0."""
    contacts = []
    offs = []
    prev = sequence[0]
    for i, v in enumerate(sequence):
        if prev != v:
            if prev == 0:
                contacts.append(i + shift)
            elif prev == 1:
                offs.append(i + shift)
        prev = v
    return contacts, offs


def _nearest_event_error(pred_events, true_events):
    """Return (sum of frame distances from each predicted event to its closest true event, number of predicted events).

    Returns (0, 0) when either list is empty, so such trials do not contribute.
    """
    if len(pred_events) == 0 or len(true_events) == 0:
        return 0, 0
    total = 0
    for p in pred_events:
        total += min(abs(t - p) for t in true_events)
    return total, len(pred_events)


def _count_matches(label, answer):
    """Count the positions at which `label` and `answer` hold the same value."""
    return sum(1 for i in range(len(label)) if label[i] == answer[i])


def _mean_std(errors):
    """Return (mean, std) of a list of errors, or (0, 0) when the list is empty."""
    if len(errors) == 0:
        return 0, 0
    return np.mean(errors), np.std(errors)


def _format_metric(value):
    """Format a metric with three decimals; metrics that could not be computed are rendered as 'None'."""
    return 'None' if value is None else f'{value:.3f}'


def _plot_gait_figure(side, signals, mark_extrema, time, contact_frames, off_frames, label,
                      prediction, contact_pred, off_pred, safe_start, safe_end, numFrame, out_path):
    """Save the 3-panel diagnostic figure (input signals / true label / prediction) of one foot.

    side          : 'Left' or 'Right'; its first letter prefixes the event legend entries.
    signals       : list of (values, matplotlib format, legend label) drawn in the top panel.
    mark_extrema  : if True, clamp the top panel to the range of the first signal and mark
                    its maximum and minimum with dashed lines.
    contact_frames, off_frames : ground-truth event frames (indices into `time`).
    contact_pred, off_pred     : predicted event positions (indices into `prediction`).
    """
    tag = side[0]
    # Predictions are indexed from frame `safe_start + numFrame - 1` of the recording.
    shift = safe_start + numFrame - 1
    x_new = np.linspace(time[0], time[-1], len(time))

    fig, axs = plt.subplots(3, figsize=(18, 12))

    # Panel 0: input signals with the ground-truth events
    axs[0].set_title(side)
    for values, fmt, legend_label in signals:
        axs[0].plot(x_new, values, fmt, label=legend_label)
    for i in contact_frames:
        axs[0].axvline(x=time[i], c='r')
    for i in off_frames:
        axs[0].axvline(x=time[i], c='b')
    if mark_extrema:
        values = signals[0][0]
        axs[0].set_ylim([np.min(values), np.max(values)])
        axs[0].axvline(x=time[np.argmax(values)], c='k', ls='--')
        axs[0].axvline(x=time[np.argmin(values)], c='k', ls='--')
    axs[0].legend()

    # Panel 1: ground-truth stance/swing label
    axs[1].set_title('True Label')
    axs[1].axvline(x=time[safe_start], label='safe Start = {}'.format(safe_start), c='g')
    axs[1].axvline(x=time[safe_end], label='safe End = {}'.format(safe_end), c='g')
    for i in contact_frames:
        axs[1].axvline(x=time[i], label='{}Contact = {}'.format(tag, i), c='r')
    for i in off_frames:
        axs[1].axvline(x=time[i], label='{}Off = {}'.format(tag, i), c='b')
    for i, v in enumerate(label):
        if v == 1:
            if i == 0:
                # Both markers are drawn once on the first frame so that both legend
                # entries exist; the marker of the actual phase is drawn last (on top).
                axs[1].plot(time[i], 1, 'b.', label='Swing')
                axs[1].plot(time[i], 1, 'r.', label='Stance')
            else:
                axs[1].plot(time[i], 1, 'r.')
        else:
            if i == 0:
                axs[1].plot(time[i], 0, 'r.', label='Stance')
                axs[1].plot(time[i], 0, 'b.', label='Swing')
            else:
                axs[1].plot(time[i], 0, 'b.')
    axs[1].legend()

    # Panel 2: predicted phase and predicted events
    axs[2].set_title('Prediction')
    axs[2].axvline(x=time[safe_start], c='g')
    axs[2].axvline(x=time[safe_end], c='g')
    for i, v in enumerate(prediction):
        if v == 1:
            axs[2].plot(time[i + shift], 1, 'r.')
        else:
            axs[2].plot(time[i + shift], 0, 'b.')
    for i in contact_pred:
        axs[2].axvline(x=time[i + shift], label='{}Contact = {}'.format(tag, i), c='r')
    for i in off_pred:
        axs[2].axvline(x=time[i + shift], label='{}Off = {}'.format(tag, i), c='b')
    axs[2].legend()

    for ax in axs:
        ax.set_xlim([time[0], time[-1]])

    fig.tight_layout()
    fig.savefig(out_path)
    plt.close(fig)


def getResults(root, numFrame, drawing):
    """Evaluate the gait-phase model on every trial below `root` and write a summary report.

    For each sub-folder of `root` (one per subject) the CSV files and force-data folders
    are paired in sorted order. For each pair the function synchronises the force-plate
    events with the pose time base, builds the ground-truth stance label, runs `ModelTest`
    and compares predicted and true contact / toe-off events.

    Parameters
    ----------
    root : str
        Directory that contains one sub-folder per subject.
    numFrame : int
        Window length forwarded to `ModelTest`; the first prediction corresponds to
        frame `safe_start + numFrame - 1` of the recording.
    drawing : int or bool
        Falsy: no figures. 1 (or True): "Type 1" figures (model inputs).
        2: "Type 2" figures (pelvis-to-ankle signal). When truthy, an existing
        `testResult` directory is deleted and recreated.

    Returns
    -------
    tuple
        (classificationAcc_final, coverageAcc_contact_final, coverageAcc_off_final,
        FrameError_contact_final, FrameError_off_final). Accuracies are percentages;
        frame errors are the mean distance in frames from each predicted event to the
        closest true event. An entry is None when it could not be computed.

    Side effects
    ------------
    Prints progress, creates `testResult/`, optionally saves figures there and writes
    `testResult//_Result.txt`. Reads the notebook-level names `model`, `inputType` and
    `stopped_epoch`.
    """
    totalFrame = 0
    classification_positive = 0
    numTrial = 0
    numContact = 0
    numOff = 0

    FrameError_contact = 0
    FrameError_off = 0
    coverage_positive_contact = 0
    coverage_positive_off = 0
    coverage_negative_contact_list = []
    coverage_negative_off_list = []

    Error_strideLength_L = []
    Error_strideLength_R = []
    Error_stepLength_L = []
    Error_stepLength_R = []

    names = list(next(os.walk(root))[1])

    # Create the output folder (wiped first when figures are going to be regenerated)
    if not os.path.isdir("./testResult"):
        os.mkdir("testResult")
    elif drawing:
        shutil.rmtree("testResult")
        os.mkdir("testResult")

    for name in names:
        clear_output(wait=True)

        pathPerson = os.path.join(root, name)
        (_, Folders, csvFiles) = next(os.walk(pathPerson))
        Folders = sorted(Folders)
        csvFiles = sorted(csvFiles)

        if '.ipynb_checkpoints' in Folders:
            Folders.remove('.ipynb_checkpoints')
        if '.ipynb_checkpoints' in csvFiles:
            csvFiles.remove('.ipynb_checkpoints')
        if len(Folders) != len(csvFiles):
            # CSV files and force-data folders are paired by position, so a count
            # mismatch makes the pairing unreliable; the whole evaluation stops here.
            print(name, ' is not matched')
            break

        for csvFile, folder in zip(csvFiles, Folders):
            pathCSV = os.path.join(pathPerson, csvFile)
            pathForceData = os.path.join(pathPerson, folder)
            trialName = csvFile.split('.')[0]

            print(trialName)

            # Read data. `parameters` holds the gait parameters provided by Zebris;
            # they are not used for now.
            data_l, data_r, time, initialContactTime, safe_start, safe_end, pevis2Lankle, pevis2Rankle = ReadCSV(pathCSV)
            l_contact, l_off, r_contact, r_off, parameters = ReadForceData(pathForceData)

            # Synchronise: find the first contact in Zebris time and shift every
            # Zebris event onto the Kinect time base.
            firstContact = min(r_contact[0], l_contact[0])
            timeGap = initialContactTime - firstContact
            l_contact = l_contact + timeGap
            l_off = l_off + timeGap
            r_contact = r_contact + timeGap
            r_off = r_off + timeGap

            # Ground-truth labels
            l_contact_ = _nearest_frames(time, l_contact)
            l_off_ = _nearest_frames(time, l_off)
            l_label = _stance_label(data_l.shape[0], l_contact_, l_off_)

            r_contact_ = _nearest_frames(time, r_contact)
            r_off_ = _nearest_frames(time, r_off)
            r_label = _stance_label(data_r.shape[0], r_contact_, r_off_)

            ## Model result
            finalAnswer_l, finalAnswer_r, modelOutput_l, modelOutput_r = ModelTest(numFrame = numFrame, pathCSV = pathCSV, model = model, PostProcessing_ = True)

            ## Rule-based alternative
            #finalAnswer_l, finalAnswer_r, modelOutput_l, modelOutput_r = ruleBased(numFrame = numFrame, safe_start=safe_start, safe_end=safe_end, pevis2Lankle=pevis2Lankle, pevis2Rankle=pevis2Rankle, PostProcessing_=False)

            # Predicted gait events
            l_contact_predict, l_off_predict = _phase_changes(finalAnswer_l)
            r_contact_predict, r_off_predict = _phase_changes(finalAnswer_r)

            # Drawing (uses the full-length labels, i.e. before they are cropped below)
            if drawing == 1 or drawing == 2:
                if drawing == 1:
                    ## Type 1 : Deep learning Input + Label + Prediction
                    plot_type = 1
                    signals_l = [(data_l[:, 0], 'r-', 'Hip'), (data_l[:, 1], 'b-', 'Knee'), (data_l[:, 4], 'k-', 'AnkleDistance')]
                    signals_r = [(data_r[:, 0], 'r-', 'Hip'), (data_r[:, 1], 'b-', 'Knee'), (data_r[:, 4], 'k-', 'AnkleDistance')]
                else:
                    ## Type 2 : Rule Based Input + Label + Prediction
                    plot_type = 2
                    signals_l = [(pevis2Lankle[:, -1], 'k-', 'pevis2Lankle')]
                    signals_r = [(pevis2Rankle[:, -1], 'k-', 'pevis2Rankle')]

                per_side = (
                    ('Left', signals_l, l_contact_, l_off_, l_label, finalAnswer_l, l_contact_predict, l_off_predict),
                    ('Right', signals_r, r_contact_, r_off_, r_label, finalAnswer_r, r_contact_predict, r_off_predict),
                )
                for side, signals, contact_frames, off_frames, label, answer, contact_pred, off_pred in per_side:
                    out_path = 'testResult//' + trialName + '_{}_Type{}.jpg'.format(side, plot_type)
                    _plot_gait_figure(side, signals, plot_type == 2, time, contact_frames, off_frames, label,
                                      answer, contact_pred, off_pred, safe_start, safe_end, numFrame, out_path)

            # True gait events, restricted to the frames that have a prediction.
            # NOTE(review): true events are recorded one frame earlier (shift=-1) than
            # predicted ones (shift=0); kept as-is - confirm this offset is intended.
            l_label = l_label[safe_start + numFrame - 1:safe_end + 1]
            l_contact_true, l_off_true = _phase_changes(l_label, shift=-1)
            r_label = r_label[safe_start + numFrame - 1:safe_end + 1]
            r_contact_true, r_off_true = _phase_changes(r_label, shift=-1)

            # Coverage (same number of predicted and true events) and frame error.
            # Every predicted event is counted exactly once, against its nearest true event.
            for pred, true in ((l_contact_predict, l_contact_true), (r_contact_predict, r_contact_true)):
                if len(pred) == len(true):
                    coverage_positive_contact += 1
                else:
                    coverage_negative_contact_list.append(trialName)
                error, count = _nearest_event_error(pred, true)
                FrameError_contact += error
                numContact += count

            for pred, true in ((l_off_predict, l_off_true), (r_off_predict, r_off_true)):
                if len(pred) == len(true):
                    coverage_positive_off += 1
                else:
                    coverage_negative_off_list.append(trialName)
                error, count = _nearest_event_error(pred, true)
                FrameError_off += error
                numOff += count

            # Classification accuracy (per frame, both feet)
            totalFrame += len(l_label) + len(r_label)
            classification_positive += _count_matches(l_label, finalAnswer_l)
            classification_positive += _count_matches(r_label, finalAnswer_r)

            # Stride / step lengths computed from true events vs. from predicted events
            LstrideLength_true, RstrideLength_true, LstepLength_true, RstepLength_true = PredictParamters(pathCSV, l_contact_true, l_off_true, r_contact_true, r_off_true, safe_start)
            LstrideLength_pred, RstrideLength_pred, LstepLength_pred, RstepLength_pred = PredictParamters(pathCSV, l_contact_predict, l_off_predict, r_contact_predict, r_off_predict, safe_start)

            if LstrideLength_true is not None and LstrideLength_pred is not None:
                Error_strideLength_L.append(LstrideLength_true - LstrideLength_pred)
            if RstrideLength_true is not None and RstrideLength_pred is not None:
                Error_strideLength_R.append(RstrideLength_true - RstrideLength_pred)
            if LstepLength_true is not None and LstepLength_pred is not None:
                Error_stepLength_L.append(LstepLength_true - LstepLength_pred)
            if RstepLength_true is not None and RstepLength_pred is not None:
                Error_stepLength_R.append(RstepLength_true - RstepLength_pred)

            # One trial per foot
            numTrial += 2

    print('contact negative')
    print(coverage_negative_contact_list)
    print('off negative')
    print(coverage_negative_off_list)

    # Metrics stay None when nothing was evaluated instead of dividing by zero.
    classificationAcc_final = None
    if totalFrame != 0:
        classificationAcc_final = classification_positive * 100 / totalFrame
    coverageAcc_contact_final = None
    coverageAcc_off_final = None
    if numTrial != 0:
        coverageAcc_contact_final = coverage_positive_contact * 100 / numTrial
        coverageAcc_off_final = coverage_positive_off * 100 / numTrial
    FrameError_contact_final = None
    if numContact != 0:
        FrameError_contact_final = FrameError_contact / numContact
    FrameError_off_final = None
    if numOff != 0:
        FrameError_off_final = FrameError_off / numOff

    finalError_stepLength_L_mean, finalError_stepLength_L_std = _mean_std(Error_stepLength_L)
    finalError_stepLength_R_mean, finalError_stepLength_R_std = _mean_std(Error_stepLength_R)
    finalError_strideLength_L_mean, finalError_strideLength_L_std = _mean_std(Error_strideLength_L)
    finalError_strideLength_R_mean, finalError_strideLength_R_std = _mean_std(Error_strideLength_R)

    # The leading spaces of the continuation lines are part of the string literal; they
    # are kept so the layout of the report file does not change.
    tmp = f'classificationAcc_final : {_format_metric(classificationAcc_final)} \ncoverageAcc_contact_final : {_format_metric(coverageAcc_contact_final)} \
    \ncoverageAcc_off_final : {_format_metric(coverageAcc_off_final)} \nFrameError_contact_final : {_format_metric(FrameError_contact_final)} \
    \nFrameError_off_final : {_format_metric(FrameError_off_final)} \nfinalError_stepLength_L_mean(cm) : {finalError_stepLength_L_mean:.3f} \
    \nfinalError_stepLength_L_std(cm) : {finalError_stepLength_L_std:.3f} \nfinalError_stepLength_R_mean(cm) : {finalError_stepLength_R_mean:.3f} \
    \nfinalError_stepLength_R_std(cm) : {finalError_stepLength_R_std:.3f} \nfinalError_strideLength_L_mean(cm) : {finalError_strideLength_L_mean:.3f} \
    \nfinalError_strideLength_L_std(cm) : {finalError_strideLength_L_std:.3f} \nfinalError_strideLength_R_mean(cm) : {finalError_strideLength_R_mean:.3f} \
    \nfinalError_strideLength_R_std(cm) : {finalError_strideLength_R_std:.3f} \nInputType : {inputType} \nstopped_epoch : {stopped_epoch}'
    with open("testResult//_Result.txt", 'w') as f:
        f.write(tmp)

    return classificationAcc_final, coverageAcc_contact_final, coverageAcc_off_final, FrameError_contact_final, FrameError_off_final

## InputType 
#0 : (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee, AnkleDistance) \
#1 : (angle_RHip, angle_Rknee, AnkleDistance) \
#2 : (angle_Rknee, angle_Lknee, AnkleDistance) \
#3 : (RHip, RKnee, LHip, LKnee, AnkleDistance) \
#4 : (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee) \
#5 : (angle_RHip, angle_Rknee, angle_LHip, angle_Lknee, AnkleDistance, Head_z)

In [19]:
# Feature set fed to the model and its width (feature set 0 has 5 features).
inputType = 0
inputDims = 5
root = 'Dataset_old'
dataset = CustomDataset(root = root, numFrame = 10, drawing = False)

# 70 / 30 train / test split. The generator is seeded so that the split - and with it
# the early-stopping behaviour - is the same on every Restart & Run All.
# NOTE(review): if CustomDataset yields overlapping sliding windows, a per-sample random
# split puts near-duplicate windows in both subsets; consider splitting by subject.
SPLIT_SEED = 0
dataset_size = len(dataset)
train_size = int(0.7 * dataset_size)
test_size = dataset_size - train_size
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# Evaluation does not need shuffling; a fixed batch order keeps the per-batch averaged
# loss identical for identical weights.
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)



Dataset_old/LJH
Dataset_old/KGJ
Dataset_old/NGR
Dataset_old/CIY
Dataset_old/PKS
Dataset_old/KYW
Dataset_old/PBD
Dataset_old/YGP
Dataset_old/YGD
Dataset_old/KYK
Dataset_old/BGS
Dataset_old/LBN
Dataset_old/JSM
Dataset_old/LSH
Dataset_old/LHS
Dataset_old/SKY
Dataset_old/CJJ
Dataset_old/RGH
Dataset_old/JBB
Dataset_old/PSCD
Dataset_old/KCS
Dataset_old/KSJ
Dataset_old/HJJ
Dataset_old/LJO
input shape :  (84862, 10, 5)
output shape :  (84862,)
Finish !


In [20]:
# Device used for training. The CPU is forced on purpose; switch to
# `"cuda" if torch.cuda.is_available() else "cpu"` to train on a GPU when one is present.
device = "cpu"

class tcnPhase(nn.Module):
    """Three 1-D convolutions followed by a single-layer LSTM and a sigmoid output unit.

    The network maps a window of shape (batch, frames, features) to one value in (0, 1)
    per window, taken from the LSTM output of the last frame.

    Parameters
    ----------
    input_dims : int, optional
        Number of features per frame. When omitted, the notebook-level `inputDims`
        is used, so `tcnPhase()` keeps working as before.
    """

    def __init__(self, input_dims=None):
        super(tcnPhase, self).__init__()
        if input_dims is None:
            input_dims = inputDims

        # kernel_size=3 with padding=1 keeps the number of frames unchanged.
        self.conv1 = nn.Conv1d(in_channels=input_dims, out_channels=3, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(3, 3, 3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(3, 3, 3, stride=1, padding=1)

        self.input_size = 3
        self.hidden_size = 3
        self.num_layers = 1
        self.lstm = nn.LSTM(input_size=self.input_size, hidden_size=self.hidden_size, num_layers=self.num_layers, batch_first=True, bidirectional=False)

        self.fc = nn.Linear(3, 1)

    def forward(self, x, h0=None, c0=None):
        """Return the sigmoid output of shape (batch, 1) for a batch of windows.

        x  : tensor of shape (batch, frames, features).
        h0 : initial LSTM hidden state of shape (num_layers, batch, hidden_size);
             zeros on the device of `x` when omitted.
        c0 : initial LSTM cell state, same shape and default as `h0`.
        """
        # Conv1d expects (batch, channels, frames).
        x = torch.transpose(x, 1, 2)
        for conv in (self.conv1, self.conv2, self.conv3):
            x = torch.sigmoid(conv(x))
        # Back to (batch, frames, channels) for the batch_first LSTM.
        x = torch.transpose(x, 1, 2)

        if h0 is None:
            h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        if c0 is None:
            c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        out, _ = self.lstm(x, (h0, c0))
        # Only the output at the last frame feeds the classifier.
        out = out[:, -1, :]
        return torch.sigmoid(self.fc(out))

model = tcnPhase().to(device)

# Optional warm start: uncomment to continue from a previously saved checkpoint.
# PATH = 'Mymodel_datasetOld'
# model.load_state_dict(torch.load(PATH, map_location=device))
# model.to(device)

print(model)

tcnPhase(
  (conv1): Conv1d(5, 3, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(3, 3, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv3): Conv1d(3, 3, kernel_size=(3,), stride=(1,), padding=(1,))
  (lstm): LSTM(3, 3, batch_first=True)
  (fc): Linear(in_features=3, out_features=1, bias=True)
)


In [21]:
# Shape constants for the initial LSTM state built in train() / test().
# hidden_size is the one those functions read; proj_size and num_layers are not
# referenced in this part of the notebook.
hidden_size, proj_size, num_layers = 3, 1, 1

# Binary cross-entropy on the model's sigmoid output, optimised with Adam.
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over `dataloader`.

    Each batch is moved to the notebook-level `device`, fed to `model` together with a
    fresh all-zero LSTM state of width `hidden_size` (notebook-level), and used for a
    single optimizer step. The model is left in training mode. Returns None.
    """
    model.train(True)
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        # A new zero state per batch: windows are treated as independent sequences.
        h0 = torch.zeros(1, X.size(0), hidden_size, device=device)
        c0 = torch.zeros(1, X.size(0), hidden_size, device=device)
        pred = model(X, h0, c0)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
def test(dataloader, model, loss_fn):
    """Evaluate `model` on `dataloader` and return the loss averaged over batches.

    The model is switched to eval mode and run without gradient tracking. Each batch
    gets a fresh all-zero LSTM state of width `hidden_size` on the notebook-level
    `device`. The average is taken over the number of batches (every batch has the
    same weight, including a smaller last one) and is also printed.
    """
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            h0 = torch.zeros(1, X.size(0), hidden_size, device=device)
            c0 = torch.zeros(1, X.size(0), hidden_size, device=device)
            pred = model(X, h0, c0)
            test_loss += loss_fn(pred, y).item()
    test_loss /= num_batches
    print(f"Avg loss: {test_loss:>8f} \n")
    return test_loss

In [None]:
# Early-stopping training loop: stop once the evaluation loss has not improved for
# `patience` consecutive epochs; the best weights are checkpointed to PATH.
EPOCHS = 10000
patience = 10
stopped_epoch = 0
best = np.inf
wait = 0
PATH = 'Mymodel'

for t in range(EPOCHS):
    clear_output(wait=True)
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    testLoss = test(test_dataloader, model, loss_fn)

    if testLoss < best:
        best = testLoss
        torch.save(model.state_dict(), PATH)
        wait = 0
    else:
        wait += 1
        if wait >= patience:
            stopped_epoch = t
            break

# Restore the best checkpoint (whether training stopped early or ran out of epochs) and
# move the model to the GPU for evaluation when one is available, otherwise stay on CPU.
# Skipped when no checkpoint was written during this run.
if best < np.inf:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.load_state_dict(torch.load(PATH, map_location=device))
    model.to(device)

print("Done!")

Epoch 109
-------------------------------


In [None]:
# Evaluate the trained model on every trial below `root`, saving Type-1 figures.
(
    classificationAcc_final,
    coverageAcc_contact_final,
    coverageAcc_off_final,
    FrameError_contact_final,
    FrameError_off_final,
) = getResults(root=root, numFrame=10, drawing=1)

print(
    'classificationAcc_final : ', classificationAcc_final,
    ' coverageAcc_contact_final : ', coverageAcc_contact_final,
    ' coverageAcc_off_final : ', coverageAcc_off_final,
    ' FrameError_contact_final : ', FrameError_contact_final,
    ' FrameError_off_final : ', FrameError_off_final,
)