# This .ipynb file converts the UCI dataset into training and test sample sets in .npz format
1. Columns 1-4 are sEMG data preprocessed with z-score normalization: biceps femoris (BF), rectus femoris (RF), vastus medialis (VM), and semitendinosus (SEM), and column 5 is the knee angle in the sagittal plane.
2. The UCI dataset is used in two scenarios: the Intra-Subject scenario and the Inter-Subject scenario; the latter in turn comprises three different tasks: 'N→N', 'A→A' and 'N→A'.

In [1]:
import os
# If you are using the web version of Jupyter Notebook, you will need to add the current directory to the path in the sys.path list
# import sys
# path = os.path.join(os.getcwd())
# sys.path.append(path) 
import math
import numpy as np
import pandas as pd
from utils.tools import make_dir
import warnings
warnings.filterwarnings("ignore")

In [2]:
## withTransfer=False: the Intra-Subject scenario
#  Return: Target domain subject sample set, label 1 is the motion class, label 2 is the knee joint angle of the upcoming Pl ms.
## withTransfer=True: the Inter-Subject scenario.
#  Return: Source and target domain subject sample sets, label 1 is the motion class, label 2 is the knee joint angle of the upcoming Pl ms.
#  Please refer to our paper for the definition of Pl and the definition of three tasks of 'N→N', 'A→A' and 'N→A'.

class GetSourceTargetDataset:
    """Cut per-subject recordings into sliding windows and optionally save them as .npz.

    Raw recordings are read from ``<cwd>/rawData/<subject><motion>.csv`` (no header
    row). Every window is ``window_length + pl_length`` rows long and is labelled
    with the index of its motion in ``self.motions``.
    NOTE(review): the extra ``pl_length`` rows presumably let downstream code read
    the knee angle ``pl_length`` samples ahead of the window -- confirm against
    the training code.

    Args:
        targetSubject: Target-domain subject id such as ``'1N'`` or ``'3A'``.
        window_length: Number of rows of the input window.
        step: Stride, in rows, between the starts of consecutive windows.
        pl_length: Number of rows appended to each window after ``window_length``.
        withTransfer: ``False`` builds only the target set (Intra-Subject);
            ``True`` also builds the source set (Inter-Subject).
        task: ``'N-N'``, ``'A-A'`` or ``'N-A'``; only consulted when
            ``withTransfer`` is true.
        save_flag: Whether ``build()`` writes the sample sets under
            ``<cwd>/trainData``.
    """

    def __init__(self, targetSubject, window_length, step, pl_length, withTransfer, task=None, save_flag=False):
        self.withTransfer = withTransfer
        # Stored unconditionally so that return_source_target_sub() reports a
        # missing/invalid task with ValueError rather than AttributeError.
        self.task = task  # 'N-N', 'A-A', 'N-A'
        self.targetSubject = targetSubject  # Target domain subject
        self.window_length = window_length
        self.PredictLength = pl_length  # The knee joint angle of the upcoming Pl ms.
        self.Length = self.window_length + self.PredictLength
        self.step = step
        self.motions = ['walking', 'sitting', 'standing']  # All motion classes
        self.sub_type = ['N', 'A']  # Type of subjects
        self.sub_num = 11  # Number of subjects of the same type
        self.channel = 4  # Number of EMG channels
        self.save_flag = save_flag  # Whether to save the sample set

    def return_source_target_sub(self):
        """Return the list of source-domain subject ids for ``self.task``.

        'N-N' and 'A-A' use every subject of that type except the target;
        'N-A' uses all 'N' subjects.

        Raises:
            ValueError: If ``self.task`` is not one of the three tasks, or if the
                target subject does not belong to the pool it must be removed from.
        """
        task = self.task
        if task in ('N-N', 'N-A'):
            pool_type = 'N'
        elif task == 'A-A':
            pool_type = 'A'
        else:
            raise ValueError('task error!')

        pool = [''.join([str(i + 1), pool_type]) for i in range(self.sub_num)]
        if task == 'N-A':
            return pool

        if self.targetSubject not in pool:
            raise ValueError(
                'target subject {!r} is not a valid subject for task {!r}'.format(self.targetSubject, task)
            )
        pool.remove(self.targetSubject)
        return pool

    def get_different_rawdata(self, subject):
        """Load all motion recordings of ``subject`` and slice them into windows.

        Returns:
            ``(Data, MotionLabel)``: the stacked windows and, for each window, the
            index of its motion in ``self.motions``.
        """
        span, stride = self.Length, self.step
        windows, labels = [], []
        for label, motion in enumerate(self.motions):
            csv_path = os.path.join(os.getcwd(), 'rawData', ''.join([subject, motion, '.csv']))
            samples = np.array(pd.read_csv(csv_path, header=None).values)
            # Missing and infinite readings are replaced by zero.
            samples[np.isnan(samples)] = 0
            samples[np.isinf(samples)] = 0
            # NOTE: this count is kept as-is so generated datasets stay unchanged.
            # It leaves out the last window that would still fit (the one starting
            # at stride * n_windows), so a recording of exactly `span` rows
            # produces no sample.
            n_windows = math.floor((samples.shape[0] - span) / stride)
            for j in range(n_windows):
                start = stride * j
                windows.append(samples[start:start + span, :])
                labels.append(label)
        Data = np.array(windows)
        MotionLabel = np.array(labels)
        print('Data.shape: ', Data.shape, ', MotionLabel.shape: ', MotionLabel.shape)

        return Data, MotionLabel

    @staticmethod
    def _concat(parts):
        """Join per-subject arrays along the first axis, skipping empty ones."""
        non_empty = [part for part in parts if len(part)]
        return np.concatenate(non_empty) if non_empty else np.array([])

    @staticmethod
    def _save_npz(path, data, labels):
        """Write one sample set to ``path`` under the keys ``data`` and ``motionLabel``."""
        with open(path, 'wb') as f:
            np.savez(f, data=data, motionLabel=labels)

    def build(self):
        """Build the target (and, with transfer, source) sample sets.

        Prints the resulting shapes and, when ``save_flag`` is set, writes the
        sets below ``<cwd>/trainData/Intra-Subject`` or
        ``<cwd>/trainData/Inter-Subject/<task>/{targetSubjects,sourceSubjects}``.
        """
        ## Get the target domain data
        tarSubject = self.targetSubject
        print('tarSubject: ', tarSubject)
        targetData, targetMotionLabel = self.get_different_rawdata(tarSubject)
        target_file = ''.join([tarSubject, 'targetTrainData.npz'])

        if not self.withTransfer:
            if self.save_flag:
                target_save_dir = os.path.join(os.getcwd(), 'trainData', 'Intra-Subject')
                make_dir(target_save_dir)
                self._save_npz(os.path.join(target_save_dir, target_file), targetData, targetMotionLabel)
            print('targetData: ', targetData.shape, ', targetMotionLabel.shape: ', targetMotionLabel.shape)
            return

        ## Get the source domain data
        sourceSubjects = self.return_source_target_sub()
        print('sourceSubjects: ', sourceSubjects)
        data_parts, label_parts = [], []
        for sourceSubject in sourceSubjects:
            print('souSubject: ', sourceSubject)
            souData, souMotionLabel = self.get_different_rawdata(sourceSubject)
            data_parts.append(souData)
            label_parts.append(souMotionLabel)
        sourceData = self._concat(data_parts)
        sourceMotionLabel = self._concat(label_parts)

        if self.save_flag:
            task_dir = os.path.join(os.getcwd(), 'trainData', 'Inter-Subject', self.task)
            target_save_dir = os.path.join(task_dir, 'targetSubjects')
            source_save_dir = os.path.join(task_dir, 'sourceSubjects')
            make_dir(target_save_dir)
            make_dir(source_save_dir)
            self._save_npz(os.path.join(target_save_dir, target_file), targetData, targetMotionLabel)
            if self.task == 'N-A':
                # Every 'A' target shares the same source set (all 'N' subjects),
                # so it is written once. An existing file is never overwritten:
                # delete it by hand after changing the window parameters.
                source_saveName = os.path.join(source_save_dir, 'sourceTrainData.npz')
                if not os.path.isfile(source_saveName):
                    self._save_npz(source_saveName, sourceData, sourceMotionLabel)
            else:
                # The source set excludes the target, so it is stored per target.
                source_saveName = os.path.join(source_save_dir, ''.join([tarSubject, 'sourceTrainData.npz']))
                self._save_npz(source_saveName, sourceData, sourceMotionLabel)
        print('targetData.shape: ', targetData.shape, ', targetMotionLabel.shape: ', targetMotionLabel.shape)
        print('sourceData.shape: ', sourceData.shape, ', sourceMotionLabel.shape: ', sourceMotionLabel.shape)

In [3]:
# Windowing parameters and subject ids shared by both scenarios
subject_numer = 11
window_length = 256
pl_length = 64
step_length = int(0.75*window_length)  # consecutive windows overlap by a quarter of a window
Nlist = [str(n) + 'N' for n in range(1, subject_numer + 1)]
Alist = [str(n) + 'A' for n in range(1, subject_numer + 1)]
all_sub_lists = Nlist+Alist

# Intra-Subject: one target-only sample set per subject (both 'N' and 'A')
for targetSubject in all_sub_lists:
    print(''.join(['****'*6, 'targetSubject: ', targetSubject, '****'*6]))
    builder = GetSourceTargetDataset(
        targetSubject,
        window_length,
        step_length,
        pl_length,
        withTransfer=False,
        task=None,
        save_flag=True,
    )
    builder.build()

# Inter-Subject: for each task, one source/target pair per target subject.
# Only 'N-N' targets 'N' subjects; 'A-A' and 'N-A' both target 'A' subjects.
tasks=['N-N', 'A-A', 'N-A']
for task in tasks:
    tar_lists = Nlist if task=='N-N' else Alist
    for targetSubject in tar_lists:
        print(''.join(['****'*6, 'task: ', task, ', targetSubject: ', targetSubject, '****'*6]))
        builder = GetSourceTargetDataset(
            targetSubject,
            window_length,
            step_length,
            pl_length,
            withTransfer=True,
            task=task,
            save_flag=True,
        )
        builder.build()