In [None]:
import numpy as np
import matplotlib.pyplot as plt
import sys
import pandas as pd
import os
from random import uniform
from scipy.fftpack import rfft,irfft
sys.path.append(os.path.join(os.getcwd(), ".."))
from attractor_gen import attractor_generator
from metrics.methods import tsd_metrics as TSD
from shared_utils import gen_sde as Gen_dyn

In [None]:
# Compute a "lorenz" attractor with num_steps=10000 through the project's
# Attractor_Generator and hand it to generate_data_file (presumably written to
# disk under the "lorenz_attractors" name — confirm against attractor_gen).
a = attractor_generator.Attractor_Generator()
a.compute_attractor("lorenz",num_steps=10000)
# NOTE(review): the target folder is ".../results", whereas the loading cell
# reads from ".../data" — confirm the two locations are meant to differ.
a.generate_data_file("/workspaces/ecg_evaluation/results","lorenz_attractors")

In [None]:
# Same procedure for the "rossler" system: a fresh generator (the name `a` is
# rebound), num_steps=10000, handed to generate_data_file under
# "rossler_attractors".
a = attractor_generator.Attractor_Generator()
a.compute_attractor("rossler",num_steps=10000)
# NOTE(review): absolute, environment-specific output folder (".../results").
a.generate_data_file("/workspaces/ecg_evaluation/results","rossler_attractors")

In [None]:
def Plot_attractors(xyzs_attractor,title="lorenz Attractor"):
    """Draw a trajectory as a thin line on a new 3-D matplotlib axes.

    Parameters
    ----------
    xyzs_attractor : array with a ``.T`` attribute
        Trajectory samples. The array is transposed and unpacked into
        ``ax.plot``, so it is expected to hold one point per row and the
        three coordinates in columns — TODO confirm against the CSV layout.
    title : str, optional
        Title of the figure. Defaults to ``"lorenz Attractor"`` (the value
        that used to be hard-coded) so existing calls keep their output;
        pass another title when plotting a different system.

    Returns
    -------
    None
        The figure is shown with ``plt.show()``.
    """
    ax = plt.figure().add_subplot(projection='3d')

    # Rows of the transposed array are passed as the x, y and z series.
    ax.plot(*xyzs_attractor.T, lw=0.5)
    ax.set_xlabel("X Axis")
    ax.set_ylabel("Y Axis")
    ax.set_zlabel("Z Axis")
    ax.set_title(title)

    plt.show()

def system_coordinates_reader(Path_to_data,Attractor_name,num_attractor=0):
    """Load one stored trajectory and split it into coordinates and time.

    The file read is
    ``<Path_to_data>/<Attractor_name>_attractors/<Attractor_name>__<num_attractor>.csv``.

    Parameters
    ----------
    Path_to_data : str
        Folder containing the ``<Attractor_name>_attractors`` sub-folder.
    Attractor_name : str
        System name used in both the folder and the file name.
    num_attractor : int, optional
        Index of the run to load (default 0).

    Returns
    -------
    (xyzs, t)
        ``xyzs`` holds CSV columns 1 to 3 and ``t`` holds column 0, both taken
        from the whole table converted to a NumPy array.
    """
    csv_file = Path_to_data+f"/{Attractor_name}_attractors/{Attractor_name}__{num_attractor}.csv"
    table = pd.read_csv(csv_file).to_numpy()
    # Column 0 is returned as the time axis, the next three as coordinates.
    time_axis, coordinates = table[:,0], table[:,1:4]
    return coordinates,time_axis

# Root folder that system_coordinates_reader extends with
# "/<name>_attractors/<name>__<k>.csv".
# NOTE(review): absolute, environment-specific path — consider making it configurable.
Path = "/workspaces/ecg_evaluation/data"
# Load run 0 of the "lorenz" system; xyzs and t stay in the global namespace
# and are reused by the cells below.
xyzs,t = system_coordinates_reader(Path,"lorenz",0)
Plot_attractors(xyzs)

In [None]:
def add_observational_noise(sig,SNR,rng=None):
    """Return ``sig`` plus zero-mean Gaussian noise scaled to a target SNR.

    The noise variance is ``mean(|sig|**2) / 10**(SNR/10)``, i.e. the signal
    power divided by the linear value of an SNR expressed in dB.

    Parameters
    ----------
    sig : array-like
        Signal to corrupt. Any shape is accepted; the noise is drawn with the
        same shape (a 1-D input behaves exactly as before).
    SNR : float
        Target signal-to-noise ratio in dB.
    rng : numpy.random.Generator, optional
        Source of randomness. When omitted the legacy global ``np.random``
        state is used, so ``np.random.seed`` keeps working for existing calls.

    Returns
    -------
    ``sig + noise``; an all-zero signal is returned unchanged because the
    noise scale is then 0.
    """
    Power_sig = np.mean(np.abs(sig)**2)
    # Linear form of "P_dB - SNR": avoids taking log10 of a zero power.
    noise_power = Power_sig/(10**(SNR/10))
    sampler = np.random if rng is None else rng
    noise = sampler.normal(0,np.sqrt(noise_power),np.shape(sig))
    return sig+noise

def add_observational_noise_val(sig,SNR):
    """Draw the Gaussian noise matching a target SNR without adding it to ``sig``.

    Parameters
    ----------
    sig : array-like
        Reference signal; only its mean power and its length are used.
    SNR : float
        Target signal-to-noise ratio in dB.

    Returns
    -------
    numpy.ndarray
        Zero-mean noise of shape ``(len(sig), 3)`` drawn from the global
        ``np.random`` state.
    """
    signal_power_db = 10*np.log10(np.mean(np.abs(sig)**2))
    # Noise power in dB is the signal power minus the SNR; convert back to linear.
    noise_power = 10**((signal_power_db - SNR)/10)
    return np.random.normal(0,np.sqrt(noise_power),(len(sig),3))

In [None]:
# I1 / I2 study on the noise-free trajectory.
# Sampling frequency taken from the spacing of the first two time stamps.
fs_l = 1/(t[1]-t[0])
print(fs_l)

# Column k of xyzs is stored under name[k].
name = ["x","y","z"]
dico_xyzs = {name[k]: xyzs[:,k] for k in range(3)}

# Overlay the three raw coordinates before computing anything on them.
for i in name:
    plt.plot(t,dico_xyzs[i],label = i)
plt.legend(loc = "best", bbox_to_anchor=(1, 1))
plt.xlabel("time")
plt.ylabel("function value")
plt.grid()
plt.plot()


# One figure per coordinate: I1 on the left panel, I2 on the right, both
# plotted against the third array returned by discrepancies_mean_curve.
for i in name:
    I1_c,I2_c,c = TSD.discrepancies_mean_curve(dico_xyzs[i],fs_l,0.0001,0.0005,1/fs_l,t0 = t[0])
    fig,ax = plt.subplots(nrows=1,ncols=2,figsize=(20,10))
    for panel,(curve,ylabel) in enumerate(((I1_c,"I1 value"),(I2_c,"I2 value"))):
        ax[panel].set_title(i)
        ax[panel].plot(c,curve)
        ax[panel].set_xlabel("signal length [s]")
        ax[panel].set_ylabel(ylabel)
        ax[panel].grid()

# Recompute the I1 / I2 curves for the x coordinate only.
I1_cx, I2_cx, c = TSD.discrepancies_mean_curve(dico_xyzs[name[0]],fs_l,0.0001,0.0005,1/fs_l,t0 = int(t[0]))
# Values lying in (0, 1]. Each mask is applied to the array it was built from:
# the previous version indexed I1 with the I2 mask and read the leftovers of
# the plotting loop (last coordinate) instead of the x curves computed above.
I1c_r = I1_cx[np.logical_and(I1_cx>0,I1_cx<=1)]
I2c_r = I2_cx[np.logical_and(I2_cx>0,I2_cx<=1)]
# Entries of c where I1 (resp. I2) is within atol of its target value.
c1 = c[np.isclose(I1_cx,[0.01],atol = 0.0001)]
c2 = c[np.isclose(I2_cx,[0.0005],atol = 0.0001)]
mean_c1 = np.mean(c1)
mean_c2 = np.mean(c2)
# c* is the smaller of the two mean values.
cs = np.minimum(mean_c1,mean_c2)
print(f"Wololo les distances (c1,c2) : {mean_c1,mean_c2}")
print("Vision of smallest c* : ",cs)
# Offset from the first time stamp, converted to a sample count with fs_l.
print("Praise the highest length short time series : ",(cs-t[0])*fs_l)


In [None]:
# TSD on the noise-free signal.

# Rebuild the coordinate dictionary: column k of xyzs goes under name[k].
name = ["x","y","z"]
dico_xyzs = {axis: xyzs[:,col] for col,axis in enumerate(name)}
# Sampling frequency from the first two time stamps.
fs = 1/(t[1]-t[0])

def TSD_plot(dico_lead,name_lead,segment_length,fs):
    """Plot a per-segment TSD value for each requested series.

    Every series is cut into consecutive, non-overlapping windows of
    ``segment_length*fs`` samples (a trailing incomplete window is dropped).
    For each window the value
    ``(log(lq_k(window, 1, fs)) - log(lq_k(window, 2, fs))) / log(2)``
    is computed with ``TSD.lq_k`` and plotted against the window start,
    ``w*segment_length`` for ``w = 0, 1, ...``.

    Parameters
    ----------
    dico_lead : mapping
        Series indexed by name.
    name_lead : iterable
        Keys of ``dico_lead`` to process; also used as legend labels.
    segment_length : float
        Window length, multiplied by ``fs`` to obtain a sample count.
    fs : float
        Sampling frequency of the series.

    Returns
    -------
    None
        The figure is shown with ``plt.show()``.
    """
    D_lead = {}
    for lead in name_lead:
        sig = dico_lead[lead]
        Ds = np.array([])
        w = 1
        while (w*segment_length*fs)<=len(sig):
            sig_c = sig[int((w-1)*segment_length*fs):int(w*segment_length*fs)]
            # Use the project's lq_k through the TSD module alias; the bare
            # name Lq_k is not defined in this notebook.
            L1 = TSD.lq_k(sig_c,1,fs)
            L2 = TSD.lq_k(sig_c,2,fs)
            Dv = (np.log(L1)-np.log(L2))/(np.log(2))
            Ds = np.append(Ds,Dv)
            w += 1
        D_lead[lead] = Ds

    for lead in name_lead:
        # Build the x axis from the number of windows actually computed for
        # this series instead of from a global time vector, so x and y always
        # have the same length.
        w_length = segment_length*np.arange(len(D_lead[lead]))
        plt.plot(w_length,D_lead[lead],label = lead)
    plt.xlabel("Time interval")
    plt.ylabel("TSD value")
    plt.legend(loc = "best", bbox_to_anchor=(1, 1))
    plt.grid()
    plt.show()



# TSD curves of the clean coordinates with segment_length=3.
# NOTE(review): this calls TSD_plot from the imported tsd_metrics module (alias
# TSD), not the TSD_plot function defined in this notebook cell — confirm which
# one is intended.
TSD.TSD_plot(dico_xyzs,name,3,fs_l)


In [None]:
# Observational noise: each coordinate is corrupted independently at SNR = 20.
name = ["x","y","z"]
dico_obsnoise_xyzs = {
    axis: add_observational_noise(xyzs[:,col],20) for col,axis in enumerate(name)
}

# Overlay the three noisy coordinates.
for i in name:
    plt.plot(t,dico_obsnoise_xyzs[i],label = i)
plt.legend(loc = "best", bbox_to_anchor=(1, 1))
plt.xlabel("time")
plt.ylabel("function value")
plt.grid()
plt.plot()


# One figure per noisy coordinate: I1 on the left panel, I2 on the right.
for i in name:
    I1_c,I2_c,c = TSD.discrepancies_mean_curve(dico_obsnoise_xyzs[i],fs_l,0.001,0.005,1/fs_l,t0 = int(t[0]))
    fig,ax = plt.subplots(nrows=1,ncols=2,figsize=(20,10))
    for panel,(curve,ylabel) in enumerate(((I1_c,"I1 value"),(I2_c,"I2 value"))):
        ax[panel].set_title(i)
        ax[panel].plot(c,curve)
        ax[panel].set_xlabel("signal legnth [s]")
        ax[panel].set_ylabel(ylabel)
        ax[panel].grid()

# Recompute the I1 / I2 curves for the noisy x coordinate only.
I1_cx, I2_cx, c = TSD.discrepancies_mean_curve(dico_obsnoise_xyzs[name[0]],fs_l,0.001,0.005,1/fs_l,t0 = int(t[0]))
# Values lying in (0, 1]. Each mask is applied to the array it was built from:
# the previous version indexed I1 with the I2 mask and read the leftovers of
# the plotting loop (last coordinate) instead of the x curves computed above.
I1c_r = I1_cx[np.logical_and(I1_cx>0,I1_cx<=1)]
I2c_r = I2_cx[np.logical_and(I2_cx>0,I2_cx<=1)]
# Entries of c where I1 (resp. I2) is within atol of its target value.
c1 = c[np.isclose(I1_cx,[1.0],atol = 0.001)]
c2 = c[np.isclose(I2_cx,[0.05],atol = 0.001)]
mean_c1 = np.mean(c1)
mean_c2 = np.mean(c2)
# c* is the smaller of the two mean values.
cs = np.minimum(mean_c1,mean_c2)
print(f"Wololo les distances (c1,c2) : {mean_c1,mean_c2}")
print("Vision of smallest c* : ",cs)
# Offset from the first time stamp, converted to a sample count with fs_l.
print("Praise the highest length short time series : ",(cs-t[0])*fs_l)



In [None]:
# Sampling frequency from the first two time stamps (recomputed so the cell
# does not depend on the discrepancy cell having run).
fs_l = 1/(t[1]-t[0])

# TSD curves of the noisy coordinates with segment_length=3.
# NOTE(review): this uses the notebook-level TSD_plot, whereas the clean-signal
# cell calls TSD.TSD_plot from the imported module — confirm which is intended.
TSD_plot(dico_obsnoise_xyzs,name,3,fs_l)

In [None]:
## TSD vs noise level

# 50 evenly spaced SNR values from -10 to 100 (plotted later as "SNR (dB)").
SNR_level = np.linspace(-10,100,50)
# Run 0 of each system. NOTE(review): xyzs_l/t_l and xyzs_r/t_r are not used
# by any code visible in this notebook.
xyzs_l,t_l = system_coordinates_reader(Path,"lorenz",0)
xyzs_r,t_r = system_coordinates_reader(Path,"rossler",0)
# Systems included in the sweep; the names must match the "<name>_attractors" folders.
The_attractor=["lorenz","rossler"]

def TSD_mean_calculator(signal,segment_length = 100,dt = 0.001):
    """Mean and standard deviation of the per-segment TSD value of a signal.

    The signal is cut into consecutive, non-overlapping segments of
    ``segment_length`` samples (a trailing incomplete segment is ignored).
    For each segment the value
    ``(log(lq_k(seg, 1, 1/dt)) - log(lq_k(seg, 2, 1/dt))) / log(2)``
    is computed with ``TSD.lq_k``.

    Parameters
    ----------
    signal : sequence
        Samples to analyse.
    segment_length : int, optional
        Number of samples per segment (default 100).
    dt : float, optional
        Sampling period; ``1/dt`` is passed to ``TSD.lq_k`` (default 0.001).

    Returns
    -------
    (mean, std)
        ``np.mean`` and ``np.std`` of the per-segment values.
    """
    estimates = np.array([])
    log_two = np.log(2)
    index = 1
    while index*segment_length <= len(signal):
        start = int((index-1)*segment_length)
        stop = int(index*segment_length)
        chunk = signal[start:stop]
        log_l1 = np.log(TSD.lq_k(chunk,1,1/dt))
        log_l2 = np.log(TSD.lq_k(chunk,2,1/dt))
        estimates = np.append(estimates,(log_l1-log_l2)/log_two)
        index += 1
    return np.mean(estimates),np.std(estimates)
    
def TSDvsNoiseLevel_array(noise_level,path_to_data,list_attractor=["lorenz","rossler"]):
    """Mean TSD value and quartile spread per system for each SNR level.

    For every SNR in ``noise_level`` and every system in ``list_attractor``,
    each stored run (one per entry of ``<path_to_data>/<name>_attractors``) is
    loaded, its first coordinate is corrupted with ``add_observational_noise``
    and summarised with ``TSD_mean_calculator`` on 100-sample segments.

    Parameters
    ----------
    noise_level : sequence of float
        SNR values to sweep.
    path_to_data : str
        Folder containing the ``<name>_attractors`` sub-folders.
    list_attractor : list of str, optional
        System names to process.

    Returns
    -------
    (Dmean, SD_D)
        ``Dmean[name]`` is the mean over runs, one value per SNR level.
        ``SD_D[name]`` has shape ``(2, len(noise_level))``: row 0 is
        ``|mean - 25th percentile|`` and row 1 is ``|mean - 75th percentile|``.
    """
    Dmean = {system: np.array([]) for system in list_attractor}
    SD_D = {system: np.empty([2,len(noise_level)]) for system in list_attractor}

    for column,snr in enumerate(noise_level):
        for system in list_attractor:
            # Every entry of the folder is assumed to be one run file.
            run_count = len(os.listdir(path_to_data+f"/{system}_attractors"))
            run_means = np.array([])
            for run in range(0,run_count):
                coord,_ = system_coordinates_reader(path_to_data,system,run)
                noisy_first_coord = add_observational_noise(coord[:,0].copy(),snr)
                run_mean,_ = TSD_mean_calculator(noisy_first_coord,100)
                run_means = np.append(run_means,run_mean)

            centre = np.mean(run_means)
            lower_gap = np.abs(centre-np.percentile(run_means,25))
            upper_gap = np.abs(centre-np.percentile(run_means,75))
            Dmean[system] = np.append(Dmean[system],centre)
            SD_D[system][:,column] = np.array([lower_gap,upper_gap])
    return Dmean,SD_D

def plt_TSDvsNoise(noise_lev,path_to_data,attractors_sel):
    """Plot the mean TSD value against SNR for the selected systems.

    Two figures are produced from the output of ``TSDvsNoiseLevel_array``:
    a row of error-bar panels (one per system, using the asymmetric spread it
    returns as ``yerr``) and a single plot overlaying the mean curves.

    Parameters
    ----------
    noise_lev : sequence of float
        SNR values used as the x axis and passed to ``TSDvsNoiseLevel_array``.
    path_to_data : str
        Data folder passed to ``TSDvsNoiseLevel_array``.
    attractors_sel : sequence of str
        Names of the systems to plot; any non-zero number is supported.

    Returns
    -------
    None
        The figures are shown with ``plt.show()``.
    """
    Great_mean,Great_SD= TSDvsNoiseLevel_array(noise_lev,path_to_data,attractors_sel)
    # One panel per system on a single row. The former "(len-1, 2)" grid was
    # only valid for exactly two systems. squeeze=False keeps the axes
    # container two-dimensional even when a single system is selected.
    fig,ax = plt.subplots(1,len(attractors_sel),figsize = (20,10),squeeze=False)
    for j,i in enumerate(attractors_sel):
        panel = ax[0][j]
        panel.errorbar(noise_lev,Great_mean[i],yerr = Great_SD[i],fmt = "o",color='red',
             ecolor='black', elinewidth=3, capsize=0)
        panel.set_xlabel("SNR (dB)")
        panel.set_ylabel("mean TSD value")
        panel.set_title(f"TSD vs SNR (db) for {i} system")
        panel.grid()

    # Second figure: all mean curves on the same axes for direct comparison.
    plt.figure()
    for i in attractors_sel:
        plt.plot(noise_lev,Great_mean[i])
    plt.legend(list(attractors_sel))
    plt.title("Mean TSD value evolution with SNR (dB) for both system")
    plt.xlabel("SNR (dB)")
    plt.ylabel("mean TSD value")
    plt.grid()
    plt.show()

# Run the SNR sweep for the selected systems and draw the figures.
plt_TSDvsNoise(SNR_level,Path,The_attractor)
# Debug leftover: direct call to inspect the raw mean / spread dictionaries.
# a,b = TSDvsNoiseLevel_array(SNR_level,Path,The_attractor)    
# print(a)
# print(b)


In [None]:
### I1c and I2c for dynamical noise
# Sampling frequency from the first two time stamps.
fs_l = 1/(t[1]-t[0])
dico_dnoise_xyzs = {}
name = ["x","y","z"]
# NOTE(review): xyzs_n is not defined in any cell visible here; presumably it
# is the trajectory with dynamical noise produced with Gen_dyn — confirm, and
# add the generating cell so the notebook survives Restart & Run All.
dico_dnoise_xyzs[name[0]] = xyzs_n[:,0]
dico_dnoise_xyzs[name[1]] = xyzs_n[:,1]
dico_dnoise_xyzs[name[2]] = xyzs_n[:,2]


### Plot of the signal first

for i in name:
    # The last sample is dropped before plotting against t; this assumes
    # xyzs_n holds exactly one more sample than t — TODO confirm.
    plt.plot(t,dico_dnoise_xyzs[i][:-1],label = i)
plt.xlabel("time")
plt.ylabel("function value")
plt.legend(loc = "best", bbox_to_anchor=(1, 1))
plt.grid()
# NOTE(review): plt.plot() without arguments draws nothing; plt.show() may have been intended.
plt.plot()


# One figure per coordinate of the dynamically-noised signal: I1 on the left
# panel, I2 on the right, both drawn with round markers.
for i in name:
    I1_c,I2_c,c = TSD.discrepancies_mean_curve(dico_dnoise_xyzs[i],fs_l,0.005,0.001,1/fs_l,t0 = int(t[0]))
    fig,ax = plt.subplots(nrows=1,ncols=2,figsize=(20,10))
    for panel,(curve,ylabel) in enumerate(((I1_c,"I1 value"),(I2_c,"I2 value"))):
        ax[panel].set_title(i)
        ax[panel].plot(c,curve,"o")
        ax[panel].set_xlabel("signal legnth [s]")
        ax[panel].set_ylabel(ylabel)
        ax[panel].grid()

# NOTE(review): eps1 and eps2 are not used by any code visible in this notebook.
eps1 = np.linspace(1,9.5,100)
eps2 = np.linspace(0.01,0.095,100)
# I1 / I2 curves for the x coordinate of the dynamically-noised signal.
# NOTE(review): the two tolerance arguments are (0.001, 0.005) here but
# (0.005, 0.001) in the plotting loop of this cell — confirm which order is intended.
I1_cx, I2_cx, c = TSD.discrepancies_mean_curve(dico_dnoise_xyzs[name[0]],fs_l,0.001,0.005,1/fs_l,t0 = int(t[0]))
# NOTE(review): both lines read I1_c / I2_c, i.e. the leftovers of the plotting
# loop (last coordinate), not I1_cx / I2_cx computed just above; the second line
# also indexes I1_c with a mask built from I2_c, which looks like a copy-paste slip.
I1c_r = I1_c[np.logical_and(I1_c>0,I1_c<=1)]
I2c_r = I1_c[np.logical_and(I2_c>0,I2_c<=1)]
# NOTE(review): the masks come from the filtered arrays, whose length need not
# match len(c), and they rely on exact float equality; the other cells use
# np.isclose on I1_cx / I2_cx instead — confirm the intended selection.
c1 = c[I1c_r == 1]
c2 = c[I2c_r ==0.01]
# Element-wise minimum of the two selections; note that this rebinds c.
c = np.minimum(c1,c2)
print(f"Wololo les distances (c1,c2) : {c1,c2}")
print("Vision of smallest c* : ",np.minimum(c1,c2))
# Offset from the first time stamp, converted to a sample count with fs_l.
print("Praise the highest length short time series : ",(c-t[0])*fs_l)

In [None]:
# TSD curves of the dynamically-noised coordinates with segment_length=1,
# using TSD_plot from the imported tsd_metrics module (alias TSD).
TSD.TSD_plot(dico_dnoise_xyzs,name,1,fs_l)