In [1]:
import numpy as np
from matplotlib import pyplot as plt
from sklearn.cross_decomposition import PLSRegression
import os.path
import glob
from scipy.io import loadmat
import pandas as pd
from scipy import signal
from scipy.signal import butter, sosfilt
from scipy.interpolate import interp1d
from scipy import signal
from scipy.signal import butter, sosfilt

In [None]:
import numpy as np
import os.path
import glob
import pandas as pd

from io import BytesIO
from sklearn.cross_decomposition import PLSRegression
from sklearn.model_selection import train_test_split
from scipy.io import loadmat
from sklearn.metrics import mean_absolute_error

from matplotlib import pyplot as plt

from scipy.stats import multivariate_normal
from scipy.stats import pearsonr

from scipy.stats import gamma
from scipy.stats import wishart

from sklearn.preprocessing import normalize
from sklearn.preprocessing import StandardScaler

In [None]:
title_font = {'family':'Arial', 'size': 20, 'color':'black', 'weight':'normal',
              'verticalalignment':'bottom'} # Bottom vertical alignment for more space
axis_font = {'family':'Arial', 'size': 16}


In [None]:
path_name_train = 'Data\\20100802S1_ECoG_Motion6'
path_name_test = 'Data\\20100726S1_ECoG_Motion6'

In [None]:
def get_ecog(dir):
    #n_ch - number of channels
    n_ch = len(glob.glob(os.path.join(dir, 'ECoG_ch*.mat')))
    ECoG = []
    for ch in range(1, n_ch + 1):
        ECoGData = loadmat(os.path.join(dir, 'ECoG_ch%d.mat' % ch))
        ECoGData = ECoGData['ECoGData_ch%d' % ch]
        ECoG.append(ECoGData[0])

    ECoG = np.array(ECoG)
    
    Time = loadmat(os.path.join(dir, 'ECoG_time.mat'))
    Time = Time['ECoGTime']

    return ECoG.T, Time.T

In [None]:
def get_ecog_and_motions(dir):
    n_ch = len(glob.glob(os.path.join(dir, 'ECoG_ch*.mat')))
    ECoG = []
    for ch in range(1, n_ch + 1):
        ECoGData = loadmat(os.path.join(dir, 'ECoG_ch%d.mat' % ch))
        ECoGData = ECoGData['ECoGData_ch%d' % ch]
        ECoG.append(ECoGData[0])

    ECoG = np.array(ECoG)

    Motion = loadmat(os.path.join(dir, 'Motion.mat'))
    MotionTime = Motion['MotionTime']
    Motion = Motion['MotionData']
        
    LSHO = Motion[0,:]
    LELB = Motion[1,:]
    LWRI = Motion[2,:]
    RSHO = Motion[3,:]
    RELB = Motion[4,:]
    RWRI = Motion[5,:]
    
    Time = loadmat(os.path.join(dir, 'ECoG_time.mat'))
    Time = Time['ECoGTime']

    return ECoG.T, LSHO[0], LELB[0], LWRI[0], RSHO[0], RELB[0], RWRI[0], Time.T, MotionTime.T

In [None]:
def extract_needed_data(ECoG, LSHO, LELB, LWRI, RSHO, RELB, RWRI, Time, MotionTime):
    
    body_center = (LSHO - RSHO)/2 #It is a static point in this experiment
    motion_left_hand = LWRI - body_center #centering motion
    
    return ECoG, motion_left_hand

In [None]:
def synchronize_interpol(signal_data, motion_data): 
    start = max(signal_data[1, 0],motion_data[1,0])
    end = min(signal_data[-1, 0],motion_data[-1,0])

    #cutting signal and motion, only overlapping time left
    signal_data = signal_data[:,:][(signal_data[:,0]>=start)]
    signal_data = signal_data[:,:][(signal_data[:,0]<=end)]
    motion_data = motion_data[:,:][motion_data[:,0]>= start] 
    motion_data = motion_data[:,:][motion_data[:,0]<= end]
    M = []
    #signal and motion have different time stamps, we need to synchronise them
    #interpolating motion and calculating arm position in moments of "signal time"
    for i in range(1,motion_data.shape[1]):
        interpol = interp1d(motion_data[:,0],motion_data[:,i],kind="cubic")
        x = interpol(signal_data[:,0])
        M.append(x)

    #downsampling in 10 times to get faster calcultions

    ecog_signal = signal_data[::10,1:]
    motion = np.array(M).T[::10,:]
    time = signal_data[::10,0]
    
    
    #self.signal = signal_data[:,1:]
    #self.motion = np.array(M).T[:,:]
    #self.time = signal_data[:,0]

    return ecog_signal, motion, time

In [None]:
#signal filtering (not sure that it works correctly)
def bandpass_filter(ecog_signal, lowcut, highcut, fs = 100, order=7):
    nyq =  fs
    low = lowcut / nyq
    high = highcut / nyq
    sos = signal.butter(order,  (low, high), btype='band',analog=False,output='sos')
    filtered_signal = np.array([sosfilt(sos, ecog_signal[:,i]) for i in range(ecog_signal.shape[1])])

    return filtered_signal.T

In [None]:
 #Generating a scalogram by wavelet transformation 
def scalo(ecog_signal, motion, time, window, freqs,start,end, step = 100): #window in sec,freqs in Hz, step in ms
    #div = 1
    X = ecog_signal[start:end,:]
    div = 10 #downsampling
    window_len = int(((window * 1000 // step) + 2) * step//div)
    scalo = np.empty((X.shape[0]-window_len,X.shape[1],freqs.shape[0],(window * 1000 // step) + 2))
    for i in range(X.shape[1]):
        for j in range(window_len,X.shape[0]):
            scalo[j-window_len,i,:,:] = signal.cwt(data = X[j-window_len:j,i],
                                                    wavelet=signal.morlet,widths = freqs)[:,::step//div] **2
    return scalo, motion[start+window_len:end,:], time[start+window_len:end]
    

In [None]:
def get_mean(ECoG, time):
    mean = []
    intens = 0
    for j in range(len(time)):
        intens_sum = 0
        x, y = 0, 0
        for i in range(64):
            center_x, center_y = centers[i]
            intens = abs(ECoG[j,i])
            intens_sum += intens
            x += center_x * intens
            y += center_y * intens
            
        mean.append([x / intens_sum, y / intens_sum])
        
    return np.array(mean)

In [None]:

def get_disp(mean, ECoG, time):
    disp = []
    for j in range(len(time)):
        mean_x, mean_y = mean[j]
        intens_sum = 0
        disp_x, disp_y = 0, 0
        for i in range(64):
            center_x, center_y = centers[i]
            intens = abs(ECoG[j, i])
            intens_sum += intens
            disp_x += (center_x - mean_x)**2 * intens
            disp_y += (center_y - mean_y)**2 * intens

        disp.append([disp_x / intens_sum, disp_y / intens_sum])
    return np.array(disp)

In [None]:
def get_intens(mean, disp, mv, ECoG, time):
    intens = []
    for j in range(len(time)):
        mean_x, mean_y = mean[j]
        # Adjust our mean to integer points we have in centers
        x = int(round(mean_x))
        y = int(round(mean_y))
        if (x, y) in centers:
            i = centers.index((x,y))
        else:
            if y<17:
                y = y+1
            else:
                y = y-1
            i = centers.index((x,y))
        i = 0;
        
        pred_int = mv[j].pdf(centers[0])
        intens.append(ECoG[j,i]/pred_int)
    return intens

In [None]:
def plotting_gauss_standardized(mv, intens_multiplier):
    mae = []
    for channel in range(len(centers)):
        point = centers[channel]
        rv = []

        for j in range(len(mv)):
                p = mv[j].pdf(point)
                if intens_multiplier == True:
                    rv.append(p*intens[j])
                else:
                    rv.append(p)
                
        mae.append(mean_absolute_error(rv, data_n[:N, channel]))     
        fig, ax1 = plt.subplots(1,1, figsize = (14, 8))
        color = 'tab:green'
        ax1.set_xlabel('time (ms)', **axis_font)
        ax1.set_ylabel('predicted', color=color, **axis_font)
        ax1.plot(time[:N], rv, color=color, alpha = 0.5)
        ax1.tick_params(axis='y', labelcolor=color)

        ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis

        color = 'tab:blue'
        ax2.set_ylabel('true', color=color, **axis_font) 
        if intens_multiplier == True:
            ax2.plot(time[:N], data[:N, channel], color=color, alpha = 0.5)
        else:
            ax2.plot(time[:N], data_n[:N, channel], color=color, alpha = 0.5)    
        ax2.tick_params(axis='y', labelcolor=color)

        fig.tight_layout()  # otherwise the right y-label is slightly clipped
        plt.title('Gauss local model for first 100000 entries \n channel #%i' %channel , **title_font)
        if intens_multiplier == True:
                    plt.savefig(r'C:\Users\User\Documents\diploma_plots\Gauss_intens'+str(channel))
                else:
                    plt.savefig(r'C:\Users\User\Documents\diploma_plots\Gauss_no_intens'+str(channel))
        plt.close()
    return mae

In [None]:
def plotting_gauss_normalized(mv, intens_multiplier):
    mae = []
    for channel in range(len(centers)):
        point = centers[channel]
        rv = []

        for j in range(len(mv_normed)):
                p = mv_normed[j].pdf(point)
                rv.append(p)
        mae.append(mean_absolute_error(rv, data_normed[:N, channel]))     
        fig, ax1 = plt.subplots(1,1, figsize = (14, 8))
        color = 'tab:green'
        ax1.set_xlabel('time (ms)', **axis_font)
        ax1.set_ylabel('predicted', color=color, **axis_font)
        ax1.plot(time[:N], rv, color=color, alpha = 0.5)
        ax1.tick_params(axis='y', labelcolor=color)

        ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis

        color = 'tab:blue'
        ax2.set_ylabel('true', color=color, **axis_font) 
        ax2.plot(time[:N], data_normed[:N, channel], color=color, alpha = 0.5)    
        ax2.tick_params(axis='y', labelcolor=color)

        fig.tight_layout()  # otherwise the right y-label is slightly clipped
        plt.title('Gauss local model for first 100000 entries \n channel #%i' %channel , **title_font)
        plt.savefig(r'C:\Users\User\Documents\diploma_plots\Gauss_normed'+str(channel))
        plt.close()
    return mae

In [None]:
def plotting_gamma():
    mae = []
    for channel in range(64):
        #point = centers[channel]
        g_d = []
        for j in range(len(m)):
            p = wishart.pdf(point, 2, [[m[j, 0], 0], [0, m[j, 1]]])
            g_d.append(p)
        mae.append(mean_absolute_error(g_d, data_n[:N, channel])) 
        fig, ax1 = plt.subplots(1,1, figsize = (14, 5))
        color = 'tab:green'
        ax1.set_xlabel('time (ms)', **axis_font)
        ax1.set_ylabel('predicted', color=color, **axis_font)
        ax1.plot(time[:N], (np.array(g_d)[:N]), color='green', alpha = 0.5)
        #ax1.plot(time[20000:40000], (np.array(g_d)[20000:40000]), color='green', alpha = 0.5)
        ax1.tick_params(axis='y', labelcolor=color)

        ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis

        color = 'tab:blue'
        ax2.set_ylabel('true', color=color, **axis_font) 
        ax2.plot(time[:N], (data_n[:N, channel]), color='blue', alpha = 0.5)
        #ax2.plot(time[20000:40000], (data[20000:40000, channel]), color='blue', alpha = 0.5)
        ax2.tick_params(axis='y', labelcolor=color)

        fig.tight_layout()  # otherwise the right y-label is slightly clipped

        plt.title('Intens local model for first %i entries \n' %N , **title_font)

        plt.show()
    return mae    