In [53]:
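# This class produces a CSV file (named "features_" + the input file name)
# containing 7 features for each of the 8 channels of an EMG signal, with
# timestamps (NOTE: the timestamp column is not written by the current code).
# The format of the output data is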
#
#     t1,    MAV1, ..., CHP1,    MAV2, ..., CHP2,    ...,    MAV8, ..., CHP8,
#     t2,    MAV1, ..., CHP1,    MAV2, ..., CHP2,    ...,    MAV8, ..., CHP8,
#     ..,    ...............,    ...............,    ...,    ...............,
#     ..,    ...............,    ...............,    ...,    ...............,
#     ..,    ...............,    ...............,    ...,    ...............,
#     tN,    MAV1, ..., CHP1,    MAV2, ..., CHP2,    ...,    MAV8, ..., CHP8,
#
# (no whitespace included). Input files should have the following format:
#
#     EMG0;    EMG1;    EMG2;    ...;    EMG7;    CLASS0;
#     EMG0;    EMG1;    EMG2;    ...;    EMG7;    CLASS1;
#     ....;    ....;    ....;    ...;    ....;    ......;
#     ....;    ....;    ....;    ...;    ....;    ......;
#     ....;    ....;    ....;    ...;    ....;    ......;
#     EMG0;    EMG1;    EMG2;    ...;    EMG7;    CLASSN;
#


class converter:
    """Extract per-window features from 8-channel EMG recordings.

    A window is made of ``L`` consecutive segments of ``l`` samples each,
    i.e. ``L * l`` samples, which lasts ``L * l / H`` seconds at a sampling
    frequency of ``H``. Consecutive windows are shifted by one segment
    (``l`` samples), so they overlap by ``(L - 1) * l`` samples.

    For every window and every channel the feature vector
    ``[MAV, RMS, SSC, WL, AHP, MHP, CHP]`` is produced.

    NOTE: the individual ``get_*`` feature methods are still placeholders
    that return fixed constants (0..6); see their docstrings.
    """

    # Number of EMG channels, read from the leading columns of each input row.
    NUM_CHANNELS = 8

    def __init__(self, l, L, H, *args):
        """Store the windowing parameters and the input file addresses.

        Args:
            l: number of samples in one segment (~20).
            L: number of segments in one window (~10).
            H: sampling frequency; stored, but not used by the methods of
                this class.
            *args: addresses of input files; kept in ``self.addresses``.
                ``process_file`` does not read this list - it has to be
                called explicitly for every file.
        """
        self.l = l
        self.L = L
        self.H = H

        # save addresses of files
        self.addresses = list(args)

    def process_file(self, address):
        """Compute the features of one input file and write them to disk.

        Every non-blank input line is split on ``;``; the element after the
        last ``;`` is dropped and the remaining fields are parsed as floats.
        The first ``NUM_CHANNELS`` values of a row are the EMG samples (any
        further column, such as the class label, is parsed but not used).

        One output line is written per window: the 7 features of channel 0,
        then of channel 1, and so on, every value followed by a comma. The
        output file is created next to the input file and is named
        ``"features_" + <input file name>``.

        Args:
            address: path of the input file.

        Raises:
            OSError: if the input cannot be read or the output not written.
            ValueError: if a field cannot be parsed as a float.
            IndexError: if a row inside a window has fewer than
                ``NUM_CHANNELS`` values.
        """
        # Local import: this file has no top-level import block to extend.
        import os

        # Read the data as a list of rows of floats. Blank lines (typically
        # the one produced by a trailing newline) are skipped so that they
        # neither count as samples nor end up inside a window.
        rows = []
        with open(address, "r") as f:
            for line in f:
                if not line.strip():
                    continue
                fields = line.rstrip("\n").split(";")
                fields.pop()  # drop the element after the final ';'
                rows.append([float(value) for value in fields])

        # Number of complete windows (= number of output lines).
        windows_num = max(0, int(len(rows) // int(self.l) - self.L + 1))
        window_len = self.L * self.l

        # Prefix only the file name, so that addresses containing a
        # directory part still yield a valid path.
        directory, name = os.path.split(address)
        out_address = os.path.join(directory, "features_" + name)

        with open(out_address, "w") as out:
            for i in range(windows_num):
                start = i * self.l
                window = rows[start:start + window_len]

                pieces = []
                for channel in range(self.NUM_CHANNELS):
                    samples = [row[channel] for row in window]
                    for feature in self.process_1_channel(samples):
                        pieces.append(str(feature) + ",")

                out.write("".join(pieces) + "\n")

    def process_1_channel(self, w):
        """Return the feature list for one window of one EMG channel.

        Args:
            w: sequence of ``l * L`` raw samples of a single channel. It is
                not modified.

        Returns:
            ``[MAV, RMS, SSC, WL, AHP, MHP, CHP]`` computed on the samples
            divided by 128 (presumably to map signed 8-bit readings into
            [-1, 1) - TODO confirm the sensor range).
        """
        # Normalize into a new list instead of overwriting the caller's data.
        samples = [float(value) / 128 for value in w]

        return [
            self.get_MAV(samples),
            self.get_RMS(samples),
            self.get_SSC(samples),
            self.get_WL(samples),
            self.get_AHP(samples),
            self.get_MHP(samples),
            self.get_CHP(samples),
        ]

    def get_MAV(self, w):
        """Placeholder for the MAV feature of the normalized window ``w``.

        TODO: not implemented yet; always returns the constant 0.
        """
        MAV = 0
        return MAV

    def get_RMS(self, w):
        """Placeholder for the RMS feature of the normalized window ``w``.

        TODO: not implemented yet; always returns the constant 1.
        """
        RMS = 1
        return RMS

    def get_SSC(self, w):
        """Placeholder for the SSC feature of the normalized window ``w``.

        TODO: not implemented yet; always returns the constant 2.
        """
        SSC = 2
        return SSC

    def get_WL(self, w):
        """Placeholder for the WL feature of the normalized window ``w``.

        TODO: not implemented yet; always returns the constant 3.
        """
        WL = 3
        return WL

    def get_AHP(self, w):
        """Placeholder for the AHP feature of the normalized window ``w``.

        TODO: not implemented yet; always returns the constant 4.
        """
        AHP = 4
        return AHP

    def get_MHP(self, w):
        """Placeholder for the MHP feature of the normalized window ``w``.

        TODO: not implemented yet; always returns the constant 5.
        """
        MHP = 5
        return MHP

    def get_CHP(self, w):
        """Placeholder for the CHP feature of the normalized window ``w``.

        TODO: not implemented yet; always returns the constant 6.
        """
        CHP = 6
        return CHP

In [55]:
# Build a converter with l=5 samples per segment, L=3 segments per window
# and H=200; "example1.txt" is stored in m_converter.addresses.
m_converter = converter(5, 3, 200, "example1.txt")

# Extract the features of "example2.txt".
# NOTE(review): process_file only uses the address passed to it, so the
# "example1.txt" registered above is never processed - confirm which file
# is intended.
m_converter.process_file("example2.txt")