In [None]:
"""
automatically calculates annotation (aAnno) from EMG signal from database
adjust third cell for parallel processing
displaycell for analysis and plotting
"""

In [None]:
from os import walk
import matplotlib.transforms
import csv
import ast
import pyedflib
import numpy as np
import matplotlib.pyplot as plt
import math
import scipy
from scipy.signal import oaconvolve
import time
import copy

In [None]:
startfilename = "acq_000058160.edf"
numberOfFiles = 6218
fileoffset = 0 


Annopath = "C:\\Users\\aaron\\OneDrive\\Dokumente\\Uni\\Studienarbeit\\exampleAnnos\\"
MAnnopath = "C:\\Users\\aaron\\OneDrive\\Dokumente\\Uni\\Studienarbeit\\manualAnnos\\"
path = "\\\\vs-grp06.zih.tu-dresden.de\\gl4psgdata\\tsm-retro-lm\\"


In [None]:
Annofilenames = next(walk(Annopath), (None, None, []))[2] 
filenames = next(walk(path), (None, None, []))[2] 

startAtFileNr = filenames.index(startfilename)+ fileoffset
stopAtFileNr = startAtFileNr + numberOfFiles



In [None]:
'''
detector helperfunctions
'''

#merges two LMs that overlap, iterates through signal until no LM overlap
def clean_LM_overlap(sig):
    cSig = []
    if(len(sig) > 2):
        
        
        cleanedOne = False
        
       
        idx = 0
            
        while idx < len(sig)-1:
            if(sig[idx+1][0] <= sig[idx][1] and sig[idx][0] < sig[idx+1][1]):#overlap
                cSig.append([sig[idx][0], sig[idx+1][1]])
                idx += 1
                cleanedOne = True
            else: 
                cSig.append(sig[idx])
            idx += 1
            
        if(cleanedOne):
            cSig = clean_LM_overlap(cSig)
        
       
    return cSig


#moving average with no boundary effect
def moving_average(signal, window):
    dividend = [min((window)/2 + 1 + index, window+1, len(signal)-index+(window)/2) for index, _ in enumerate(signal)]
    return [e/dividend[index] for index, e in enumerate(scipy.signal.oaconvolve(signal, np.ones(window+1), 'same'))]
     

#root mean sqare with no boundary effect
def RMS(data, window):
    dividend = [min((window)/2 + 1 + index, window+1, len(data)-index+(window)/2) for index, _ in enumerate(data)]
    return [np.sqrt(e/dividend[index]) for index, e in enumerate(scipy.signal.oaconvolve(np.power(data,2), np.ones(window+1), 'same'))]





#plots two annotaitonsignals with shape [[starLM1,endLM1],[starLM2,endLM2],...]
def plotAnnotation(signals, labels,  lightOff, lightOn, plotwindow):
    fig = plt.figure(figsize=(16,2))
    plt.rcParams["figure.autolayout"] = True
    plt.margins(x=0.1, y=0.1)
    for sidx, signal in enumerate(signals):

        annoBuffer = np.zeros(lightOff, dtype=int)
        
        for i in range(len(signal)):
            annoBuffer = np.append(annoBuffer, np.zeros(signal[i][0] - len(annoBuffer), dtype=int))
            annoBuffer = np.append(annoBuffer, np.ones(signal[i][1] - len(annoBuffer), dtype=int))
        annoBuffer = np.append(annoBuffer, np.zeros(lightOn - len(annoBuffer)))
        annoBuffer = ["kein LM" if annoSample == 0 else "LM" for annoSample in annoBuffer]
        
        #offset of signals for better view
        offset = matplotlib.transforms.Affine2D().translate(0, 3*sidx)
        
        shadow_transform = fig.axes[0].transData + offset
        plt.plot(annoBuffer[plotwindow[0]:plotwindow[1]], label = labels[sidx], transform = shadow_transform)
        
    plt.xticks(np.arange(0,(plotwindow[1]-plotwindow[0])*11/10, (plotwindow[1]-plotwindow[0])/10))
    plt.xlabel("Zeit in ms")
    plt.legend(loc = "upper right")

def calculateNoiseFloor(absSignal, window, U, L):
    noiseFloor = moving_average(absSignal, window)
    alpha = [n*math.log(n+1)+U if n <= 50 else math.inf for idx,n in enumerate(noiseFloor)]
    beta = [a*L/U for idx,a in enumerate(alpha)]
    psi = [a*(1+L/U)/2 for idx,a in enumerate(alpha)]
    return noiseFloor, alpha, beta, psi

#finds candidates for LM 
def calculateLMTimes(RMS_signal, noiseFloor, alpha, beta, downtime):
    LM_candidateTimes = []
    
    n = 0
    while n < len(RMS_signal) - downtime:
        #startcondition signal over alpha (upper threshold)
        if(RMS_signal[n] > noiseFloor[n] + alpha[n]):
            LM_candidateTimes.append([n])
            #find next index at which signal is downtime under beta (lower threshold)
            while len(LM_candidateTimes[-1]) != 2:
                n += 1
                if(all(e < noiseFloor[index] + beta[index] for index, e in enumerate(RMS_signal[n:n+downtime]))):
                    LM_candidateTimes[-1].append(n-1)
        n += 1
    #if signal ends on a positive
    if(len(LM_candidateTimes[-1]) == 1):
        remove(LM_candidateTimes[-1])
    return LM_candidateTimes


#merges on EMG level by adding both samplevalues
def mergeEMG(leg1, leg2):
    mergedEMG = []
    for i in range(len(leg1)):
        mergedEMG.append(leg1[i] + leg2[i])
    return mergedEMG      
        




#post processing of the annotationsignal 
def clean_LM_timerequirements(signal, LM_minlen, LM_maxlen, bridgetime, mergetime):
    #bridges all LM that are seperated by 0.1s or less
    bridgedSignal = [signal[0][:]]
    b = 1
    while b < len(signal):
        if(signal[b][0] - bridgedSignal[-1][1] <= bridgetime):
            bridgedSignal[-1][1] = signal[b][1]
        else: bridgedSignal.append(signal[b][:])
        b += 1
        
    #rejects all LM that are shorter than 0.75s
    minSignal = []
    for i in range(len(bridgedSignal)):
        if(bridgedSignal[i][1]-bridgedSignal[i][0] >= LM_minlen):
            minSignal.append(bridgedSignal[i][:])
    
    if(len(minSignal) == 0):
        return []
    
    #merges all LM that are seperated by 2s or less
    mergedSignal = [minSignal[0][:]]
    m = 1
    while m < len(minSignal):
        if(minSignal[m][0] - mergedSignal[-1][1] <= mergetime):
            mergedSignal[-1][1] = minSignal[m][1]
        else: mergedSignal.append(minSignal[m][:])
        m += 1
    
    #rejects all LM that are longer than 10s
    cleanSignal = []
    for c in range(len(mergedSignal)):
        if(mergedSignal[c][1]-mergedSignal[c][0] <= LM_maxlen):
            cleanSignal.append(mergedSignal[c][:])
    
    return cleanSignal


#postprocessing reject LM that dont dont match certain area under absolute Signal 
def clean_AUC(anno, absSignal, alpha):
    cleanedAnno = []
    for LM in anno:
        if(np.trapz(absSignal[LM[0]:LM[1]]) >= alpha[LM[0]]/2):
            cleanedAnno.append(LM)
    return cleanedAnno
    
    
#cant plot overlapping annotations
def clean_for_plot(signal):
    return [LM for LM in clean_LM_overlap(signal) if LM[0] > lightOff and LM[1] < lightOn]


In [None]:
totalstarttime = time.time()

#for filename in [startfilename]:
for filename in [filename for filename in filenames[startAtFileNr:stopAtFileNr] if filename not in [path[0:-4]for path in Annofilenames]]:
    filestarttime = time.time()
    print("working on file " + filename)
    print("Nr: "+ str(filenames.index(filename)) + " noch " +str(stopAtFileNr- filenames.index(filename)))
    saveAnnodir = 'C:\\Users\\aaron\\OneDrive\\Dokumente\\Uni\\Studienarbeit\\exampleAnnos\\'+ filename + '.csv'
    saveMetricdir = 'C:\\Users\\aaron\\OneDrive\\Dokumente\\Uni\\Studienarbeit\\exampleMetrics\\'+ filename + '.csv'
    saveManualAnnodir = 'C:\\Users\\aaron\\OneDrive\\Dokumente\\Uni\\Studienarbeit\\manualAnnos\\'+ filename + '.csv'
    f = pyedflib.EdfReader(path + filename)

    Leg1 = f.readSignal(0)
    Leg2 = f.readSignal(1)
    samplefreq = f.getSampleFrequency(0)

    Annotimes = samplefreq*f.readAnnotations()[0][2:] 

    if(len(Annotimes)==0):
        print("no manual Annotation")
        
        with open(saveManualAnnodir, "w") as csvfile:
            swriter = csv.writer(csvfile, delimiter=',')
            swriter.writerow('')

        continue
    Annolens = samplefreq*f.readAnnotations()[1][2:]
    lightOn = int(samplefreq*f.readAnnotations()[0][0])
    lightOff = int(samplefreq*f.readAnnotations()[0][1])
    
    Annotimes = Annotimes.astype(int)
    Annolens = Annolens.astype(int)

    #create manual annotation
    mAnno = []
    for idx,LMstart in enumerate(Annotimes):
        mAnno.append([LMstart, LMstart+Annolens[idx]])
        
    mAnno = [LM for LM in clean_LM_overlap(mAnno) if LM[0] > lightOff and LM[1] < lightOn]

    f.close()

    
    with open(saveManualAnnodir, "w") as csvfile:
        swriter = csv.writer(csvfile, delimiter=',')
        swriter.writerow(mAnno)
    
    #constants
    U = 8 #original upper threhsold 
    L = 2 #original lower threhsold 
    noiseWindow = int(20 * samplefreq) #window over wich noisefloor is calculated
    RMSwindow = int(0.15 * samplefreq)  #window over wich noisefloor is calculated
    LM_minlen = int(0.75 * samplefreq) #length of shortest LM
    LM_maxlen = int(10 * samplefreq) #length of longest LM
    LM_bridgetime = int(0.1 * samplefreq) #time in which LM are bridged
    LM_downtime = int(0.05 * samplefreq) #downtime after passing lower threshold
    LM_mergetime = int(2 * samplefreq) #time in which LM are merged
    
    
    mergedEMG = mergeEMG(Leg1,Leg2)

    absSignal = np.asarray([abs(e) for e in mergedEMG], dtype = np.single)
    

    RMS_signal = RMS(absSignal, RMSwindow)
    
    print("first pass noisefloor")
    #first pass - calculating noisefloor with LM in signal 
    noiseFloor1, alpha1, beta1, _ = calculateNoiseFloor(absSignal, noiseWindow, U, L)
    
    print("first pass candidates")
    #first pass LM detection (5a.)
    LM_candidatesTimes = calculateLMTimes(RMS_signal, noiseFloor1, alpha1, beta1, LM_downtime)
   
    #for second pass of signal: "deleting" LM to get a better approximation of the noise floor (4.)
    absSignal_LMdel = absSignal
    for LM in LM_candidatesTimes:
        absSignal_LMdel[LM[0]:LM[1]] = [b/2 for b in beta1[LM[0]:LM[1]]]
    
    
    print("second pass noisefloor")
    #second pass to get more accurate thresholds
    noiseFloor2, alpha2, beta2, _ = calculateNoiseFloor(absSignal_LMdel, noiseWindow, U, L)
    
    
    
    #final detected LM after lightoff (5b +6)
    print("calculating aAnno")
    
    #before post processing
    aAnno_dirty = clean_LM_overlap([LM for LM in calculateLMTimes(RMS_signal, noiseFloor2, alpha2, beta2, LM_downtime) if LM[0] > lightOff and LM[1] < lightOn])
    
    aAnno_cleaned = clean_LM_timerequirements(aAnno_dirty, LM_minlen, LM_maxlen, LM_bridgetime, LM_mergetime)
    #final annotation
    aAnno = clean_AUC(aAnno_cleaned, absSignal, alpha2)
    
    print("saving aAnno")
    
    with open(saveAnnodir, "w") as csvfile:
        swriter = csv.writer(csvfile, delimiter=',')
        swriter.writerow(aAnno)
        
        
    fileendtime = time.time()
    print("runningtime: "+'{:5.3f}s'.format(fileendtime-filestarttime))
totalendtime = time.time()
print("total runningtime: "+'{:5.3f}s'.format(totalendtime-totalstarttime))


In [None]:
'''cell for plotting any of the used signals in any timeframe'''

sectionStart = int(1e6)
samples = 10000#lightOn - sectionStart
savefig = False
savename = "highNoiseFloor"



fig1 = plt.figure(figsize=(16,8))
plt.plot(RMS_signal[sectionStart:sectionStart+samples],zorder=0, label = "RMS_Signal")
plt.xticks(np.arange(0, samples*11/10, samples/10))
plt.yticks(np.arange(0,max(RMS_signal[sectionStart:sectionStart+samples])*1.1, 10))

plt.plot(alpha2[sectionStart:sectionStart+samples],zorder=0, label = "alpha")
plt.margins(x=0, y=0)
plt.xlabel("Zeit in ms")
plt.ylabel("Elektrodenspannung in µV")
plt.legend(loc = "upper right")
if(savefig):
    plt.savefig(savename + "EMG" + filename + str(sectionStart) + "," + str(samples)+".jpg", dpi = 400.0)

#clean_for_plot()
plotAnnotation([mAnno,LM_candidatesTimes[440:443]],["manuelle Annotation","automatische Annotation"], lightOff, lightOn, [sectionStart,sectionStart+samples])


if(savefig):
    plt.savefig(savename + filename + str(sectionStart) + "," + str(samples)+".jpg", dpi = 400.0)
