In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import random
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
import math
import scipy.stats as stats
from sklearn.preprocessing import MinMaxScaler
import time

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Echo the selected backend name ('cuda' or 'cpu') as the cell output.
DEVICE.type

In [3]:
class MyDataset(Dataset):
    """In-memory dataset that pairs feature samples with their targets.

    Both inputs are copied into ``float32`` tensors at construction time and
    indexed along their first dimension.

    Args:
        data_x: array-like of samples; the first axis enumerates samples.
        data_y: array-like of targets, aligned with ``data_x`` on the first axis.
    """

    def __init__(self, data_x, data_y):
        # Convert features first, then targets, both to float32 tensors.
        self.data_x, self.data_y = (
            torch.tensor(values, dtype=torch.float32) for values in (data_x, data_y)
        )

    def __len__(self):
        """Return the number of samples (length of the feature tensor)."""
        sample_count = len(self.data_x)
        return sample_count

    def __getitem__(self, idx):
        """Return the ``(features, target)`` pair stored at position ``idx``."""
        features = self.data_x[idx]
        target = self.data_y[idx]
        return features, target

In [4]:
class CNN_LSTM(nn.Module):
    """Two Conv1d blocks followed by two stacked LSTMs and a linear head.

    ``forward`` takes a tensor laid out as ``(batch, time, 9)`` and returns one
    value per sequence, shape ``(batch, 1)``.
    """

    def __init__(self):
        super().__init__()
        # Convolutional front end: 9 -> 32 -> 32 channels. kernel_size=11 with
        # padding=5 keeps the sequence length unchanged.
        self.conv1 = nn.Conv1d(9, 32, kernel_size=11, padding=5)
        self.bn1 = nn.BatchNorm1d(32)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(32, 32, kernel_size=11, padding=5)
        self.bn2 = nn.BatchNorm1d(32)
        # Non-overlapping max pooling: shortens the sequence by a factor of 3.
        self.pool = nn.MaxPool1d(3)
        # Recurrent back end: two batch-first LSTMs, each followed by dropout.
        self.lstm1 = nn.LSTM(32, 64, batch_first=True)
        self.dropout1 = nn.Dropout(0.5)
        self.lstm2 = nn.LSTM(64, 64, batch_first=True)
        self.dropout2 = nn.Dropout(0.2)
        # Regression head applied to the final time step.
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        """Run the network on ``x`` (batch, time, channels) -> (batch, 1)."""
        # Conv1d wants (batch, channels, time).
        features = x.permute(0, 2, 1)
        features = self.relu(self.bn1(self.conv1(features)))
        features = self.relu(self.bn2(self.conv2(features)))
        # Back to (batch, time, channels) for the batch-first LSTMs.
        sequence = self.pool(features).permute(0, 2, 1)
        sequence, _ = self.lstm1(sequence)
        sequence, _ = self.lstm2(self.dropout1(sequence))
        # Dropout is applied to the whole sequence; only the last step is used.
        last_step = self.dropout2(sequence)[:, -1, :]
        return self.fc(last_step)


In [None]:
"亂數種子"
# Seed every RNG the notebook draws from so a Restart & Run All is repeatable.
# NumPy drives the subject/sample shuffles; PyTorch drives weight
# initialisation and dropout (previously left unseeded). Exact GPU
# determinism may still depend on the backend.
SEED = 20
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)

def mkdir(path):
    """Create directory ``path`` (and any missing parents) if it is absent.

    Calling it on an already existing directory is a no-op. Creation is done
    in a single ``os.makedirs`` call instead of a separate existence check, so
    there is no window between the check and the creation.

    Args:
        path: directory path to create.

    Raises:
        FileExistsError: if ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

filename = []
# Subject IDs excluded from the experiment. Original author's notes: IDs <= 137
# are the "Qingfu" elders and IDs <= 67 are elders aged 65+; this list also
# removes outlier recordings.
# Earlier list (before removing outliers): [1,2,10,30,49,52,66,83,97,101,114,117,121]
bad_dataset = [1,2,10,30,49,52,66,83,94,97,101,107,114,117,121,124]
y_data = pd.read_csv('ScaleScore/BBS.csv')

# Collect the CSV recordings, leaving out the problematic subjects.
DatasetPath = "BalanceDataSet_150/"

# os.listdir returns entries in arbitrary order; sort them so that the seeded
# subject shuffle (and therefore the fold assignment) is reproducible.
filepath = sorted(os.listdir(DatasetPath))
excluded_ids = set(bad_dataset)
for files in filepath:
    if not files.endswith(".csv"):  # keep .csv files only
        continue
    try:
        subject_id = int(files[0:3])  # the first three characters are the subject ID
    except ValueError:
        # Not a "<3-digit id>....csv" recording; skip it instead of crashing.
        continue
    # Elder-only variant (61 subjects): 65 < subject_id < 137 and not excluded.
    # Current selection: young adults plus "Jiuzhuang" elders (120 subjects).
    if subject_id < 137 and subject_id not in excluded_ids:
        filename.append(files)

"從IMU 1 跑到 IMU 7"
def _first_true_run(mask):
    """Return ``(start, stop)`` of the first contiguous run of True in ``mask``.

    ``start`` is the index of the first True element and ``stop`` is the
    exclusive end of that run (``len(mask)`` when the run reaches the end).
    Returns ``None`` when ``mask`` contains no True element.
    """
    mask = np.asarray(mask, dtype=bool)
    hits = np.flatnonzero(mask)
    if hits.size == 0:
        return None
    start = int(hits[0])
    gaps = np.flatnonzero(~mask[start:])
    stop = start + int(gaps[0]) if gaps.size else int(mask.size)
    return start, stop


def _flatten_subject_windows(subject_windows, subject_scores):
    """Expand per-subject window stacks into flat sample and label arrays.

    Every window of subject ``s`` is paired with ``subject_scores[s]``.
    Returns ``(samples, labels)`` as NumPy arrays.
    """
    samples = [window for windows in subject_windows for window in windows]
    labels = [
        score
        for windows, score in zip(subject_windows, subject_scores)
        for _ in windows
    ]
    return np.array(samples), np.array(labels)


# ---- Loop-invariant experiment settings ------------------------------------
task_ID = 'N'            # only the first contiguous segment of this task is used
# Window length in samples. The original note gives a 50 Hz sampling rate,
# which would make 150 samples about 3 s rather than the 1 s it stated.
# TODO confirm the sampling rate.
window_size = 150
# Hop between consecutive windows: 50 % overlap -> 75 samples.
window_stride = int(window_size * ((100 - 50) / 100))
BBS_MAX_SCORE = 56       # upper end of the BBS score range

# Run the whole experiment once per IMU (sensor 1 .. 7).
for sensor in range(1, 8):
    sensor_prefix = "BID" + str(sensor)

    subject_gait_data_x = []      # per subject: stacked windows of the task segment
    subject_gait_data_y = []      # per subject: scale score (array of shape (1,))

    for files in filename:
        data = pd.read_csv(DatasetPath + files)
        print(DatasetPath + files)

        # Drop the joint-angle columns ("Right*"/"Left*") and the columns of
        # every sensor except the one selected for this run.
        unwanted_columns = [
            col for col in data.columns
            if col[0:5] == "Right" or col[0:4] == "Left"
            or (col[0:3] == "BID" and col[0:4] != sensor_prefix)
        ]
        data = data.drop(columns=unwanted_columns)

        # Look up this subject's scale score.
        scale_score = y_data.loc[y_data['ID'] == int(files[0:3]), 'score'].values
        print(scale_score)
        if scale_score.size != 1:
            # A missing or duplicated ID would otherwise fail later with an
            # obscure shape/truth-value error.
            raise ValueError(
                "expected exactly one score for " + files
                + ", found " + str(scale_score.size)
            )

        # Locate the first contiguous block of rows whose task label
        # (column 1) equals task_ID.
        task_span = _first_true_run((data.iloc[:, 1] == task_ID).to_numpy())
        if task_span is None:
            raise ValueError("task " + repr(task_ID) + " not found in " + DatasetPath + files)
        start, end = task_span    # end is exclusive

        # Signal channels start at column 5; only the selected sensor's
        # columns remain after the drop above.
        training_set = data.iloc[start:end, 5:].values

        # Z-score standardisation, per column, within this recording.
        training_set = stats.zscore(training_set)

        # Subjects below the maximum score get their sequence repeated 4x,
        # which yields roughly 4x as many windows (presumably to offset class
        # imbalance - TODO confirm). Windows that straddle a repetition seam
        # contain a discontinuity.
        # NOTE(review): this is applied before the train/test split, so
        # held-out subjects are oversampled as well.
        if scale_score[0] != BBS_MAX_SCORE:
            training_set = np.tile(training_set, (4, 1))

        # Slice the sequence into fixed-length, overlapping windows.
        gait_x = np.array([
            training_set[j:j + window_size]
            for j in range(0, training_set.shape[0] - window_size + 1, window_stride)
        ])
        subject_gait_data_x.append(gait_x)
        subject_gait_data_y.append(scale_score)

    # Shuffle the subject order (features and scores together).
    r = list(zip(subject_gait_data_x, subject_gait_data_y))
    np.random.shuffle(r)
    subject_gait_data_x, subject_gait_data_y = zip(*r)

    # Normalise the scale scores to [0, 1] using the fixed BBS range.
    subject_gait_data_y = np.array(subject_gait_data_y)
    scaler = MinMaxScaler(feature_range=(0, 1)).fit(np.array([[0], [BBS_MAX_SCORE]]))
    subject_gait_data_y = scaler.transform(subject_gait_data_y)

    # ---- CNN-LSTM experiments ----------------------------------------------
    n = 0
    epoch_range = [500]            # epoch counts to try
    batch_size_range = [64]        # batch sizes to try
    for epochs in epoch_range:
        for batch_size in batch_size_range:
            n = n + 1
            date_tag = time.strftime('%Y%m%d', time.localtime(time.time()))
            savepath = 'save/S2S(TaskN)/' + date_tag + 'v' + str(n) + '_sensor' + str(sensor) + '/'
            os.makedirs(savepath, exist_ok=True)   # also creates the parent folders

            val_times = 5          # k-fold cross validation, split by subject
            MAE_average = 0

            subject_size = len(subject_gait_data_x)
            for fold in range(val_times):
                # Subjects [test_lo, test_hi) are held out; all others train.
                test_lo = math.floor(fold / val_times * subject_size)
                test_hi = math.floor((fold + 1) / val_times * subject_size)

                x_test_subject = subject_gait_data_x[test_lo:test_hi]
                y_test_subject = subject_gait_data_y[test_lo:test_hi]
                x_train_subject = subject_gait_data_x[:test_lo] + subject_gait_data_x[test_hi:]
                y_train_subject = np.concatenate(
                    [subject_gait_data_y[:test_lo], subject_gait_data_y[test_hi:]]
                )

                x_train, y_train = _flatten_subject_windows(x_train_subject, y_train_subject)
                x_test, y_test = _flatten_subject_windows(x_test_subject, y_test_subject)

                # Shuffle the training samples once.
                index = np.random.permutation(x_train.shape[0])
                x_train = x_train[index]
                y_train = y_train[index]

                # Shuffle the test samples (does not affect the MAE; kept so
                # the NumPy random stream is consumed as before).
                index = np.random.permutation(x_test.shape[0])
                x_test = x_test[index]
                y_test = y_test[index]

                print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)
                train_dataset = MyDataset(x_train, y_train)
                test_dataset = MyDataset(x_test, y_test)

                # Use the batch size under test (it was hard-coded to 64, so
                # the experiment variable had no effect).
                train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
                test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

                # ---- Training ----
                model = CNN_LSTM().to(DEVICE)
                criterion = nn.MSELoss()
                optimizer = optim.Adam(model.parameters(), lr=0.0005)

                for epoch in range(epochs):
                    model.train()
                    for x_batch, y_batch in train_loader:
                        inputs = x_batch.to(DEVICE)
                        targets = y_batch.to(DEVICE)
                        optimizer.zero_grad()
                        loss = criterion(model(inputs), targets)
                        loss.backward()
                        optimizer.step()

                # ---- Evaluation ----
                # Accumulate absolute errors over all test samples so the MAE
                # is sample-weighted; averaging per-batch means over-weights a
                # short final batch.
                model.eval()
                abs_error_sum = 0.0
                sample_count = 0
                with torch.no_grad():
                    for x_batch, y_batch in test_loader:
                        outputs = model(x_batch.to(DEVICE))
                        # Map targets and predictions back to the original
                        # score scale; predictions are rounded to whole points.
                        y_test_tran = scaler.inverse_transform(y_batch.cpu().numpy())
                        y_pred_tran = np.around(
                            scaler.inverse_transform(outputs.cpu().numpy())
                        )
                        abs_error_sum += float(np.abs(y_pred_tran - y_test_tran).sum())
                        sample_count += y_test_tran.shape[0]
                test_loss = abs_error_sum / sample_count
                print(f'Test Loss: {test_loss:.4f}')
                MAE_average = MAE_average + test_loss

            print("BBS MAE:")
            print(MAE_average / val_times)

            # Append this run's settings and result to the log file.
            with open(savepath + '實驗結果.txt', 'a', encoding='utf-8') as result_file:
                result_file.write('window size = ' + str(window_size) + '\n')
                result_file.write('epoch = ' + str(epochs) + '\n')
                result_file.write('batch size = ' + str(batch_size) + '\n')
                result_file.write('MAE = ' + str(MAE_average / val_times) + '\n')