In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from matplotlib.widgets import TextBox, Button
import sys
import pandas as pd
import os
import hfda

In [None]:
from random import uniform
from scipy.fftpack import rfft,irfft,fft, ifft

def Random_phase_shuffling(signal):
    fft_signal = rfft(signal)
    phase_fs = np.arctan2(fft_signal[2::2],fft_signal[1:-1:2])
    mag = np.sqrt((fft_signal[1:-1:2])**2+(fft_signal[2::2])**2)
    ##phase shuffler:
    rng_phase = phase_fs.copy()
    np.random.shuffle(rng_phase)
    #rng_phase = np.random.uniform(-np.pi,np.pi,len(phase_fs))
    ##new signal : 
    #fsrp = mag[:, np.newaxis] * np.c_[np.cos(phase_fs+rng_phase), np.sin(phase_fs+rng_phase)]
    fsrp = mag[:, np.newaxis] * np.c_[np.cos(rng_phase), np.sin(rng_phase)]
    fsrp = np.r_[fft_signal[0], fsrp.ravel(), fft_signal[-1]]
    tsr = irfft(fsrp)
    return tsr

In [None]:
###plot of one attractor:

path = "/workspaces/maitrise/data/attractors"
df = pd.read_csv(path + "/lorenz__0.csv")
df_n = df.to_numpy()
print(df_n.shape)
xyzs = df_n[:,1:4]
t = df_n[:,0]
print(t)



In [None]:
##Plot attractor:
def Plot_attractors(xyzs_attractor):
    ax = plt.figure().add_subplot(projection='3d')

    ax.plot(*xyzs_attractor.T, lw=0.5)
    ax.set_xlabel("X Axis")
    ax.set_ylabel("Y Axis")
    ax.set_zlabel("Z Axis")
    ax.set_title("Lorenz Attractor")

    plt.show()

In [None]:
##Plot attractor:
Plot_attractors(xyzs)

In [None]:
##Adding noise yith specific SNR : 

def add_observational_noise(sig,SNR):
    fft_signal = rfft(sig)
    Power_sig = (fft_signal[1:-1:2])**2+(fft_signal[2::2])**2
    sd_noise = np.sqrt(Power_sig/SNR)
    noise = np.random.normal(0,np.average(sd_noise),len(sig))
    sig_noisy = sig+noise
    return sig_noisy




In [None]:
##I1 and I2 thingy

def Mean_taux(signal,taux,hprime,h=0):
    N = len(signal)
    if taux+hprime+h<N and taux+h < N:
        return (1/hprime)*np.sum((signal[int(h+taux):int(taux+hprime+h)]))
    elif taux+hprime+h>N and taux+h < N:
        return (1/hprime)*np.sum((signal[int(h+taux):-1]))
    else: 
        return 0


def Variance_taux(signal,taux,hprime,h=0):
    
    N = len(signal)
    if taux+hprime+h<=N and taux+h < N:
        return (1/hprime)*np.sum((signal[int(h+taux):int(taux+hprime+h)]-Mean_taux(signal,taux,hprime,h))**2)
    elif taux+hprime+h>N and taux+h < N:
        return (1/hprime)*np.sum((signal[int(taux):-1]-Mean_taux(signal,taux,hprime,0))**2)
    else: 
        return 0

def I1(c,signal,fs,tab,h,hprime,step_c,t0=0):
    if (t0>c or t0<0):
        return print("error : t0 is outside the range you want to calculate")
    elif (c>len(signal) or c<0):
        return "error : c is outside the range of your signal"
    else:   
        if len(tab) ==0:
            I1c = (1/(h*len(signal)))*step_c*np.abs(Mean_taux(signal,t0*fs,hprime*len(signal),h*len(signal))-Mean_taux(signal,t0*fs,hprime*len(signal)))
        else:
            I1c = tab[-1]
            I1c += (1/(h*len(signal))*step_c)*np.abs(Mean_taux(signal,c*fs,hprime*len(signal),h*len(signal))-Mean_taux(signal,c*fs,hprime*len(signal)))

    return I1c

def I2(c,signal,fs,tab,h,hprime,step_c,t0=0):
    if (t0>c or t0<0):
        return "error : t0 is outside the range you want to calculate"
    elif (c>len(signal) or c<0):
        return "error : c is outside the range of your signal"
    else: 
        if len(tab) ==0:
            I2c = (1/(h*len(signal)))*step_c*np.abs(Variance_taux(signal,t0*fs,hprime*len(signal),h*len(signal))-Variance_taux(signal,t0*fs,hprime*len(signal)))
        else : 
            I2c = tab[-1]
            I2c += (1/(h*len(signal)))*step_c*step_c*np.abs(Variance_taux(signal,c*fs,hprime*len(signal),h*len(signal))-Variance_taux(signal,c*fs,hprime*len(signal)))

    return I2c

def discrepancies_mean_curve(signal_tot,fs,h,hprime,step,t0=0):
    c =  np.arange(t0,(len(signal_tot)/fs)+t0,step)
    I1_val = np.array([])
    I2_val = np.array([])
    for j in c:
            I1_val = np.append(I1_val,I1(j,signal_tot,fs,I1_val,h,hprime,step,t0))
            I2_val = np.append(I2_val,I2(j,signal_tot,fs,I2_val,h,hprime,step,t0))
    return I1_val,I2_val,c


    

In [None]:
##TSD code : 
def Lm_q(signal,m,k,fs):
    N = len(signal)
    n = np.floor((N-m)/k).astype(np.int64)
    norm  = ((N-1)/(n*k))/(k*(1/fs))
    sum = np.sum(np.abs(np.diff(signal[m::k], n=1)))
    Lmq = (sum*norm)
    return Lmq

def Lq_k(signal,k,fs):
    calc_L_series = np.frompyfunc(lambda m: Lm_q(signal,m,k,fs), 1, 1)
    L_average = np.average(calc_L_series(np.arange(1, k+1)))
    return L_average


def Dq(signal,kmax,fs):
    calc_L_average_series = np.frompyfunc(lambda k: Lq_k(signal,k,fs), 1, 1)

    k = np.arange(1, kmax+1)
    L = calc_L_average_series(k).astype(np.float64)

    D, _ = - np.polyfit(np.log2(k), np.log2(L), 1)

    return D

In [None]:
##Plotting I1 and I2
fs_l = 1/(t[1]-t[0])
print(fs_l)
dico_xyzs = {}
name = ["x","y","z"]
dico_xyzs[name[0]] = xyzs[:,0]
dico_xyzs[name[1]] = xyzs[:,1]
dico_xyzs[name[2]] = xyzs[:,2]


###Plot of signal first : 

for i in name:
    plt.plot(t,dico_xyzs[i],label = i)
plt.xlabel("time")
plt.ylabel("function value")
plt.legend(loc = "best", bbox_to_anchor=(1, 1))
plt.grid()
plt.plot()


for i in name:
    I1_c,I2_c,c = discrepancies_mean_curve(dico_xyzs[i],fs_l,0.001,0.005,1/fs_l,t0 = int(t[0]))
    fig,ax = plt.subplots(nrows=1,ncols=2,figsize=(20,10))
    ax[0].set_title(i)
    ax[0].plot(c,I1_c)
    ax[0].set_xlabel("signal legnth [s]")
    ax[0].set_ylabel("I1 value")
    ax[0].grid()
    ax[1].set_title(i)
    ax[1].plot(c,I2_c)
    ax[1].set_xlabel("signal legnth [s]")
    ax[1].set_ylabel("I2 value")
    ax[1].grid()

eps1 = np.linspace(1,9.5,100)
eps2 = np.linspace(0.01,0.095,100)
I1_cx, I2_cx, c = discrepancies_mean_curve(dico_xyzs[name[0]],fs_l,0.001,0.005,1/fs_l,t0 = int(t[0]))
f1 = np.polyfit(I1_cx,c,1)
f2 = np.polyfit(I2_cx,c,1)
l1 = np.poly1d(f1)
l2 = np.poly1d(f2)
c1 = l1(1)-t[0]
c2 = l2(0.05)-t[0]
c = np.minimum(c1,c2)
print(f"Wololo les distances (c1,c2) : {c1,c2}")
print("Vision of smallest c* : ",int(np.minimum(c1,c2)))
print("Praise the highest length short time series : ",int((c-t[0])*fs_l))


In [None]:
##TSD for no noise

dico_xyzs = {}
name = ["x","y","z"]
dico_xyzs[name[0]] = xyzs[:,0]
dico_xyzs[name[1]] = xyzs[:,1]
dico_xyzs[name[2]] = xyzs[:,2]
fs = 1/(t[1]-t[0])

def TSD_ECG(dico_lead,name_lead,segment_length,fs):

    D_lead = {}
    for i in name_lead:
        w=1
        Ds = np.array([])
        sig = dico_lead[i]
        while (w*segment_length*fs)<=len(sig):
            sig_c  = sig[int((w-1)*segment_length*fs):int((w)*segment_length*fs)]
            Ds = np.append(Ds,hfda.measure(sig_c,10))
            w+=1
        D_lead[i] = Ds

    w_length = [w*segment_length for w in range(0,int((len(t)/fs)*(1/segment_length)))]

    for i in name_lead:
        if i.decode('utf8') == "V6":
            plt.plot(w_length,D_lead[i],"--r",label = i.decode('utf8'))
        elif i.decode('utf8') == "V5":
            plt.plot(w_length,D_lead[i],"--b",label = i.decode('utf8'))
        else :
            plt.plot(w_length,D_lead[i],label = i.decode('utf8'))
        plt.xlabel(" Time interval")
        plt.ylabel("TSD value")
    plt.legend(loc = "best", bbox_to_anchor=(1, 1))
    plt.grid()
    plt.show()



TSD_ECG(dico_xyzs,name,6,fs_l)



In [None]:
###TSD for observational noise : 
dico_obsnoise_xyzs = {}
name = ["x","y","z"]
dico_obsnoise_xyzs[name[0]] = add_observational_noise(xyzs[:,0],0.8)
dico_obsnoise_xyzs[name[1]] = add_observational_noise(xyzs[:,1],0.8)
dico_obsnoise_xyzs[name[2]] = add_observational_noise(xyzs[:,2],0.8)

for i in name:
    plt.plot(t,dico_obsnoise_xyzs[i],label = i)
plt.xlabel("time")
plt.ylabel("function value")
plt.legend(loc = "best", bbox_to_anchor=(1, 1))
plt.grid()
plt.plot()


for i in name:
    I1_c,I2_c,c = discrepancies_mean_curve(dico_obsnoise_xyzs[i],fs_l,0.005,0.001,1/fs_l,t0 = int(t[0]))
    fig,ax = plt.subplots(nrows=1,ncols=2,figsize=(20,10))
    ax[0].set_title(i)
    ax[0].plot(c,I1_c)
    ax[0].set_xlabel("signal legnth [s]")
    ax[0].set_ylabel("I1 value")
    ax[0].grid()
    ax[1].set_title(i)
    ax[1].plot(c,I2_c)
    ax[1].set_xlabel("signal legnth [s]")
    ax[1].set_ylabel("I2 value")
    ax[1].grid()

eps1 = np.linspace(1,9.5,100)
eps2 = np.linspace(0.01,0.095,100)
I1_cx, I2_cx, c = discrepancies_mean_curve(dico_obsnoise_xyzs[name[0]],fs_l,0.005,0.001,1/fs_l,t0 = int(t[0]))
f1 = np.polyfit(I1_cx,c,1)
f2 = np.polyfit(I2_cx,c,1)
l1 = np.poly1d(f1)
l2 = np.poly1d(f2)
c1 = l1(1)
c2 = l2(0.05)
c = np.minimum(c1,c2)
print(f"Wololo les distances (c1,c2) : {c1,c2}")
print("Vision of smallest c* : ",int(np.minimum(c1,c2)))
print("Praise the highest length short time series : ",int((c-t[0])*fs_l))



In [None]:
##TSD observational noise:

fs_l = 1/(t[1]-t[0])

TSD_ECG(dico_obsnoise_xyzs,name,5,fs_l)

In [None]:
###shuffling time!!!!!
def The_Big_shufflers(xyzs_clean):
    xyzs_shuffled=np.empty((df_n.shape[0],3))
    x = xyzs[:,0]
    y = xyzs[:,1]
    z = xyzs[:,2]
    x_noisy = Random_phase_shuffling(x)
    y_noisy = Random_phase_shuffling(y)
    z_noisy = Random_phase_shuffling(z)
    xyzs_shuffled[:,0] = x_noisy
    xyzs_shuffled[:,1] = y_noisy
    xyzs_shuffled[:,2] = z_noisy
    return xyzs_shuffled

    


In [None]:
##plot shuffled attractor
xyzs_n = The_Big_shufflers(xyzs)
Plot_attractors(xyzs_n)

In [None]:
###I1c and I2c for dynamical noise:
fs_l = 1/(t[1]-t[0])
dico_dnoise_xyzs = {}
name = ["x","y","z"]
dico_dnoise_xyzs[name[0]] = xyzs_n[:,0]
dico_dnoise_xyzs[name[1]] = xyzs_n[:,1]
dico_dnoise_xyzs[name[2]] = xyzs_n[:,2]


###Plot of signal first : 

for i in name:
    plt.plot(t,dico_dnoise_xyzs[i],label = i)
plt.xlabel("time")
plt.ylabel("function value")
plt.legend(loc = "best", bbox_to_anchor=(1, 1))
plt.grid()
plt.plot()


for i in name:
    I1_c,I2_c,c = discrepancies_mean_curve(dico_dnoise_xyzs[i],fs_l,0.005,0.001,1/fs_l,t0 = int(t[0]))
    fig,ax = plt.subplots(nrows=1,ncols=2,figsize=(20,10))
    ax[0].set_title(i)
    ax[0].plot(c,I1_c)
    ax[0].set_xlabel("signal legnth [s]")
    ax[0].set_ylabel("I1 value")
    ax[0].grid()
    ax[1].set_title(i)
    ax[1].plot(c,I2_c)
    ax[1].set_xlabel("signal legnth [s]")
    ax[1].set_ylabel("I2 value")
    ax[1].grid()

eps1 = np.linspace(1,9.5,100)
eps2 = np.linspace(0.01,0.095,100)
I1_cx, I2_cx, c = discrepancies_mean_curve(dico_dnoise_xyzs[name[0]],fs_l,0.001,0.005,1/fs_l,t0 = int(t[0]))
f1 = np.polyfit(I1_cx,c,1)
f2 = np.polyfit(I2_cx,c,1)
l1 = np.poly1d(f1)
l2 = np.poly1d(f2)
c1 = l1(1)
c2 = l2(9)
c = np.minimum(c1,c2)
print(f"Wololo les distances (c1,c2) : {c1,c2}")
print("Vision of smallest c* : ",int(np.minimum(c1,c2)))
print("Praise the highest length short time series : ",int((c-t[0])*fs_l))


In [None]:
TSD_ECG(dico_dnoise_xyzs,name,5,fs_l)