# Débruitage de Signaux par ondelettes

Le but de ce TP est double :

1) Montrer que l'on peut supprimer une partie importante des parasites sur un signal 1D en effectuant un seuillage dans une base d'ondelettes appropriée. On appelle débruitage cette opération de suppression ou de diminution des parasites

2) Mettre en place un plan d'expériences adéquat en Python pour évaluer en un temps raisonnable les paramètres optimaux de la méthode de débruitage.

Dans ce TP on ajoutera nous même les parasites (le bruit) sur les signaux en ajoutant à des signaux de référence une réalisation d'un bruit blanc gaussien centré. On testera plusieurs variances. On pourrait bien évidemment considérer d'autres modèles de bruit, poissoniens, impulsionnel, salt and pepper ou autre en modifiant quelques lignes. 

Le débruitage par seuillage dans une base, orthornormée ou non, est une méthode classique 
et ses performances dépendent largement des capacités d'approximation non linéaire de la base relativement à la classe de signaux qu'on souhaite débruiter. Ainsi, si on souhaite atténuer les effet d'ajoût d'un bruit blanc gaussien sur des signaux uniformément réguliers par exemple $C^2$, un seuillage dans la base de Fourier fera très bien l'affaire. En revanche si on veut être efficace sur des signaux deux fois dérivable par morceaux, un seuillage en ondelettes sera plus approprié.      

La qualité du débruitage dépend de la régularité du signal, de la base d'ondelettes choisie, du niveau de bruit ajouté et du niveau du seuil. Nous verrons aussi qu'on peut ajouter une étape de translation pour améliorer encore les résultats. Il existe plusieurs moyens de mesurer le niveau de dégradation du signal bruité ainsi que la qualité de la reconstruction. Nous choisissons ici de considérer le PSNR qui est un outils à la fois classique de mesure de qualité et qui est facilement calculable. Il faut avoir conscience que cette mesure n'en est qu'une parmi d'autres. 
Pour des images, la qualité visuelle est important comme la qualité sonore l'est pour des sons. 
Quand l'étape de débruitage n'est un préprocessing d'un travail d'analyse, il est important de ne pas créer d'artefacts qui pourraient être interprétés comme significatif.

Une première étape du travail consiste d'abord à constuire une fonction qui prend en entrée ces différents paramètres, signal, niveau de bruit, ondelettes, niveau de seuil et qui renvoie un signal de sortie et un PSNR.

Une seconde étape consiste à créer un plan d'expériences, c'est à dire une série de tests en faisant varier les différents paramètres et de le visualiser de manière efficace.
Des bibliothèques Python permettent de le faire sans lancer les codes à la main un grand nombre de fois en faisant varier les paramètres à la main ni même de faire de boucles for. 

L'art d'utiliser ces méthodes réside dans le fait de faire une exploration intelligente des paramètres. 
Dans un premier temps, on propose de faire varier le signal de référence, le niveau de bruit, le choix de la base d'ondelettes et du seuil. Mais on ne traite ici que deux signaux et on ne teste pas la translation des bases d'ondelettes (ceci sera expliqué plus tard). En effet, le temps de calcul explose assez vite si on veut faire une recherche exhaustive. A l'issue de ce premier plan d'expériences, vous devriez pouvoir limiter le choix du seuil et des bases d'ondelettes à considérer. Vous pourrez aussi limiter le nombre de niveau de bruit que vous voulez tester pour essayer d'explorer l'influence d'autres paramètres. 

In [None]:
import numpy as np
import scipy as scp
import pylab as pyl
import matplotlib.pyplot as plt
import pywt
import scipy.io as sio
import pandas as pd
import holoviews as hv
import param
import panel as pn
from panel.pane import LaTeX
import itertools
import requests
from io import BytesIO
from PIL import Image
import shutil
pn.extension()
hv.extension('bokeh')

Les lignes suivantes permettent de charger les données à parit du fichier datatP.npy et d'inclure les signaux dans un dictionnaire SignauxRef. Si vous voulez utiliser d'autres signaux de référence, vous pouvez les ajouter ici. La dernière ligne permet de les afficher avec holoview.

In [None]:
local=0
if local:
    S3=np.load('Piece.npy')
    S4=np.load('Blocks.npy')   
else:
    url='https://plmlab.math.cnrs.fr/dossal/optimisationpourlimage/raw/master/img/Blocks.npy'
    response = requests.get(url, stream=True)
    with open('blocks.npy', 'wb') as fin:
        shutil.copyfileobj(response.raw, fin)
    S4=res=np.load('blocks.npy')
    url='https://plmlab.math.cnrs.fr/dossal/optimisationpourlimage/raw/master/img/Piece.npy'
    response = requests.get(url, stream=True)
    with open('piece.npy', 'wb') as fin:
        shutil.copyfileobj(response.raw, fin)
    S3=res=np.load('piece.npy')
pn.Row(hv.Curve(S3),hv.Curve(S4))

Le programme suivant définit le PSNR.

In [None]:
def psnr(Sref, Sd):
    mse = np.mean( (Sref - Sd) ** 2 )
    if mse == 0:
        return 100
    Val_MAX = max(Sref)
    return 20 * np.log10(Val_MAX / np.sqrt(mse))

Le programme suivant permet d'effectuer un seuillage d'un vecteur SB dans une base d'ondelettes définie par un filtre qmf et un nombre décompositions L et un seuil Seuil

In [None]:
def SeuillageOndelette(SB,qmf,L,Seuil):
        WTB= pywt.wavedecn(SB, qmf, mode='per', level=L)
        arr, coeff_slices = pywt.coeffs_to_array(WTB)
        WTS=arr*(np.abs(arr)>Seuil)
        coeffs_from_arr = pywt.array_to_coeffs(WTS, coeff_slices)
        Srec=pywt.waverecn(coeffs_from_arr,qmf,mode='per')
        return Srec

Le programme suivant ajoute un bruit à un signal et renvoie un signal débruité

Il prend en entrée 

1) un vecteur S non bruité

2) une chaine de caractère qmf qui est le nom d'une ondelette pour py wavelets,

3) une entier seednoise qui permettra d'initialiser le générateur aléatoire générant le bruit. 
Le fait de comparer des méthodes différentes avec la même seed du générateur assure une comparaison plus fair.

4) un réel sigma qui est l'écart type du bruit que l'on ajoute, 

5) un réel T est le niveau du seuil relatif, T=3 signifie par exemple qu'on met à 0 tous les coefficients 
de la transformée en ondelettes dont la valeur absolue sont inférieurs à 3*sigma

Il renvoie le signal bruité et le PSNR associé, le signal reconstruit et le PSNR associé.  

In [None]:
def Debruit(S,qmf,seednoise,sigma,T):
    N1=len(S)
    np.random.seed(seed=seednoise)
    bruit=np.random.normal(0,1,N1)
    Lmax=pywt.dwt_max_level(len(S),pywt.Wavelet(qmf).dec_len)
    SB=S+sigma*bruit
    Seuil=T*sigma
    Srec=SeuillageOndelette(SB,qmf,Lmax,Seuil)
    psnr1=psnr(S,SB)
    psnr2=psnr(S,Srec)
    return Srec,SB,psnr1,psnr2



Le programme suivant calcule un PSNR moyen du débruitage par seuillage par ondelettes sur N réalisations de bruit.
Les paramètres d'entrée sont les mêmes que précédemment à l'exception de la seed du générateur aléatoire qui varie dans le programme. Pour comparer des méthodes de débruitage, il est important d'effecuer la comparaison non pas sur une seule réalisation du bruit mais sur un nombre important. Dans la suite on se limitera à 100 pour avoir une premièreestimation correcte. Le fait de contrôler la seed du générateur aléaoire assure que toutes les méthodes seront comparées sur les mêmes réalisations de bruit.

In [None]:
def DebruitPSNR(S,qmf,N,sigma,T):
    seednoise=np.arange(N)
    N1=len(S)
    Lmax=pywt.dwt_max_level(len(S),pywt.Wavelet(qmf).dec_len)
    Seuil=T*sigma
    psnr1=np.zeros(N)
    for k in seednoise:
        np.random.seed(seed=seednoise)
        bruit=np.random.normal(0,1,N1)
        SB=S+sigma*bruit
        Srec=SeuillageOndelette(SB,qmf,Lmax,Seuil)
        psnr1[k]=psnr(S,Srec)
    return np.mean(psnr1)

In [None]:
SignauxRef= {"PieceRegular" : S3,"Blocks" : S4}
Test=DebruitPSNR(SignauxRef['PieceRegular'],"haar",100,1,3)
print(Test)

On définit ensuite une liste des ondelettes et des signaux que l'on souhaite tester.

In [None]:
wavelist = ['haar','db2','db3','db4','coif1','coif2','coif3']



In [None]:
Sigma=np.linspace(0.1,0.5,2)
T=np.linspace(1,6,6)
for s in Sigma:
    for t in T:
        for d in SignauxRef:
            p=DebruitPSNR(SignauxRef[d],"db2",100,s,t)
        print(p)

Le programme suivant permet d'explorer numériquement la fonction précédemment écrite Debruit.

Il crée un tableau de commandes (dashboard) avec des sliders qui permet de jouer sur les differents paramètres de cette fonction 
en temps réel. 

Il superpose dans une même figure le signal original, le signal bruité et le signal reconstruit et affiche à droite les deux PSNR des signaux bruité et débruité calculé par rapport au signal original.

In [None]:
class WaveDebruit(param.Parameterized):
    wave = param.ObjectSelector(default="haar",objects=wavelist)
    Signal = param.ObjectSelector(default="Blocks",objects=SignauxRef.keys())
    seednoise = param.Integer(1,bounds=(0,50))
    sigma=param.Number(0.01,bounds=(0,0.2))
    T=param.Number(3,bounds=(0,8))
    def view(self):
        #S=Name2Signal(self.Signal)
        S=SignauxRef[self.Signal]
        Srec,SB,p1,p2=Debruit(S,self.wave,self.seednoise,self.sigma,self.T)
        strp1="%2.2f" % p1
        strp2="%2.2f" % p2
        te1='PSNR signal bruité = '
        te2='PSNR signal reconstruit = '
        TN1=hv.Text(0.5,0.5,te1+strp1).opts(xaxis=None,yaxis=None,toolbar=None)
        TN2=hv.Text(0.5,0.3,te2+strp2).opts(xaxis=None,yaxis=None)
        #TN=pn.Column(LaTeX(te1,size=15,dpi=100)\
        #            ,LaTeX(strp1,size=15,dpi=100),LaTeX(te2,size=15,dpi=100),LaTeX(strp2,size=15,dpi=100))
        curve=hv.Curve(S,kdims='x',vdims='v').opts(width=500,color='r')
        curveB=hv.Curve(SB,kdims='x',vdims='v').opts(color='c')
        curveRec=hv.Curve(Srec,kdims='x',vdims='v').opts(color='k')
        m=-0.1
        M=1.1
        curve=curve.redim.range(x=(0,len(S)),v=(m,M))
        curveB=curveB.redim.range(x=(0,len(S)),v=(m,M))
        curveRec=curveRec.redim.range(x=(0,len(S)),v=(m,M))
        return pn.Column(curveB*curveRec*curve,TN1*TN2)

Les deux lignes suivantes permettent d'afficher le résultat de la focntion précédente. A gauche le tableau de commande avec les paramètres wavedebruit.param. A droite la sortie du programme précédent wavedebruit.view.

In [None]:
wavedebruit = WaveDebruit()
pn.Row(wavedebruit.param,wavedebruit.view)

Les lignes suivantes permettent de créer via la librairie Panda un plan d'expériences.
On considère 2 signaux, 7 ondelettes, 5 valeurs de Sigma et 8 valeurs de seuil... ce qui fait 560 jeux de paramètres différents qui vont être testé. Pour chacun d'entre eux on va évaluer un PSNR moyen sur 100 expériences, ce qui fait un total de 56000 débruitages par seuillage dans une base d'ondelettes.

La variable dfexp est un objet Panda, tableau ou base de données qui est construit à partir d'un objet expériences qui décrit les différents paramètres et les valeurs qu'ils prennent.

In [None]:
experiences = {'Si':SignauxRef.keys(),'sigma':np.linspace(0.1,0.5,5),'Th':np.linspace(1,8,8),'wave':wavelist}
dfexp = pd.DataFrame(list(itertools.product(*experiences.values())),columns=experiences.keys())

In [None]:
print(dfexp)

On peut accéder à une ligne spécifique de cette base de donnée de la manière suivante :

In [None]:
rowtest=  dfexp.iloc[6]
print(rowtest)

In [None]:
print(rowtest.Si)

In [None]:
print(rowtest.Th)

ATTENTION À NE PAS UTILISER UN CHAMP APPELÉ T DANS "expreriences". EN EFFET "experiences.T" EST LA TRANSPOSÉE DE "experiences". 

In [None]:
print(dfexp.T)

La fonction suivante prend en entrée une ligne d'une base de donnée et renvoie le PSNR associé à l'expérience définie par les paramètres contenue dans la ligne. La valeur 10 correspond au nombre de fois où on effectue le débruitage pour chaque jeu de paramètres pour obte,nir une valeur fiable du PSNR.

In [None]:
def row2PSNR(row):
    N=10
    p=DebruitPSNR(SignauxRef[row.Si],row.wave,N,row.sigma,row.Th)
    return {'PSNR':p}

il est possible de lancer cette fonction, directement sur une ligne :

In [None]:
result_test = row2PSNR(rowtest)
print(result_test)

La commande suivante permet de parcourir la base de données dfexp ligne par ligne (option axis=1) et d'appliquer à chaque ligne la fonction précédente. Le résultat est stocké dans une base de donnée avec une seule colonne appelée result.


In [None]:
result = dfexp.apply(row2PSNR,axis=1)


La commande suivante permet d'ajouter une colonne PSNR à la base de données dfexp et d'y mettre les valeurs contenues dans la base de données results.

In [None]:
dfexp[['PSNR']] = pd.DataFrame.from_records(result.values)

On peut vérifier qu'on a bien ajouté une colonne à la base de données dfexp. 

In [None]:
print(dfexp)

Les lignes suivantes permettent de visualiser la base de données dfexp à l'aide de la librairie holoviews.

On affiche le PSNR en fonction du seuil. 

L'option "by" permet d'afficher ces PSNR avec des couleurs différentes selon les ondelettes. On peut cliquer sur les les noms des ondelettespour faire la faire disparaitre momentanément de l'affichage.

L'option "kind" permet d'afficher les résultats avec des points et pas des courbes. Ce qui me semble ici le plus lisible. 

L'option "groupby" permet de créer un menu déroulant et un curseur sur la droite. Le choix est fait automatiquement en fonction du type de la variable, ici chaine de caractères ou réel. Si on enlève le groupby, la figure comportera plus de points de chaque couleur.



In [None]:
import hvplot.pandas
from bokeh.models import HoverTool
h = HoverTool()
dfexp.hvplot('Th','PSNR',by='wave',kind='scatter',groupby=['Si','sigma'])\
.opts(width=600,tools = [h]).redim.range(PSNR=(5,30),Th=(0,9))

Quelle base d'ondelettes semble la plus adaptée à chacun des deux signaux ?

Pouvez vous l'expliquer ?

Les résultats sont ils différents si on passe à 50 le nombre d'expériences pour évaluer chaque valeur de PSNR.

Proposez une valeur raisonnable pour ce nombre d'expériences, c'est-à-dire une valeur qui ganrantit une valeur fiable des PSNR et ne nécessite pas de calculs inutiles.

Le graphique précédent semble indiquer que le choix d'un seuil à $3\sigma$ est un choix raisonnable. Dans la suite, on pourra donc limiter le plan d'expériences à la valeur Th=3.

# Translations et débruitage en bases d'ondelettes.

On peut améliorer la performance du débruitage d'un seuillage simple en ondelettes en utilisant le fait que la transformée en ondelettes n'est pas invariante par translation. Ainsi si on effectue un shift circulaire sur les composantes d'un vecteur, on modifie l'amplitude des coefficients d'ondelettes, comme on peut le voir sur l'exemple suivant : 

On affiche sur une même figure, un vecteur et son image par une translation d'une part, et ses coefficients en ondelettes et les coefficients du vecteur translaté. Vous pouvez observer qu'on obtient pas toujours les coefficients en ondelettes du vecteur translaté par translation du vecteur original. Cette remarque est particulièrement bien illustrée pour les translations d'un nombre impair de points.

In [None]:
class WaveTranslation(param.Parameterized):
    trans = param.Integer(15,bounds=(0,31))
    wave = param.ObjectSelector(default="haar",objects=wavelist)
    Signal = param.ObjectSelector(default="Blocks",objects=SignauxRef.keys())
    
    def view(self):
        S=SignauxRef[self.Signal]
        W= pywt.wavedecn(S, self.wave, mode='per', level=3)
        arr, coeff_slices = pywt.coeffs_to_array(W)
        S1=np.roll(S,self.trans)
        W1=pywt.wavedecn(S1, self.wave, mode='per', level=3)
        arr1, coeff_slices = pywt.coeffs_to_array(W1)
        return pn.Column(hv.Curve(S)*hv.Curve(S1).opts(width=600),hv.Curve(arr)*hv.Curve(arr1).opts(width=600))

In [None]:
wavetranslation = WaveTranslation()
pn.Row(wavetranslation.param,wavetranslation.view)

On peut exploiter cette propriété pour le débruitage en effectuant un débruitage dun même signal dans des bases d'ondelettes translatées.

Le programme suivant réalise le débruitage pour un nombre "trans" de translations. 
La fonction renvoie la liste des PSNR obtenus en effectuant la moyenne sur les différentes translation, pour une réalisation de bruit donnée.

In [None]:
def DebruitTrans(S,qmf,seednoise,sigma,T,trans):
    N1=len(S)
    np.random.seed(seed=seednoise)
    bruit=np.random.normal(0,1,N1)
    Lmax=pywt.dwt_max_level(len(S),pywt.Wavelet(qmf).dec_len)
    SB=S+sigma*bruit
    Seuil=T*sigma
    SSum=0*SB
    P=np.zeros(trans)
    for k in np.arange(0,trans):
        SBtemp=np.roll(SB,k)
        Srectemp=SeuillageOndelette(SBtemp,qmf,Lmax,Seuil)
        Srectemp2=np.roll(Srectemp,-k)
        SSum=SSum+Srectemp2
        Srec=SSum/(k+1)
        P[k]=psnr(S,Srec)
    return Srec,P

On peut tester le programme de la façon suivante à la main : la courbe bleu est le Signal bruité, la rouge, celle obtenue par seuillage simple, la jaune en utilisant les translations. On doit observer que la reconstruction jaune est meilleure que la rouge.

In [None]:
S=SignauxRef['Blocks']
qmf='haar'
seednoise=3
sigma=0.1
T=3
trans=32
Srec,SB,psnr1,psnr2=Debruit(S,qmf,seednoise,sigma,T)
Srec1,P=DebruitTrans(S,qmf,seednoise,sigma,T,trans)
hv.Curve(SB)*hv.Curve(Srec)*hv.Curve(Srec1).opts(width=800)

On peut aussi comparer les PSNR : celui du signal bruité, du signal obtenu par seuillage simple et celui obtenu en exploitant les translations.

In [None]:
print(psnr1,psnr2,P[trans-1])

Si on veut mesurer l'impact en terme de gain de PSNR de ces translations, on peut réaliser un plan d'expériences comme on l'a fait précédemment. On va se limiter à des translations de 0 à 31 points. 

Pour cela on va d'abord créer une fonction qui calcule une moyenne des PSNR sur N différentes réalisations du bruit.


In [None]:
def DebruitTransMoyenne(S,qmf,sigma,T,trans,N):
    P=np.zeros(trans)
    for seednoise in np.arange(0,N):
        Srec,Ptemp=DebruitTrans(S,qmf,seednoise,sigma,T,trans)
        P=P+Ptemp
    P=P/N
    return P

On peut tester cette fonction à la main 

In [None]:
S=SignauxRef['Blocks']
qmf='haar'
seednoise=3
sigma=0.1
T=3
trans=32
N=50
P=DebruitTransMoyenne(S,qmf,sigma,T,trans,N)
print(P)

On peut remarquer que la suite des valeurs n'est pas strictement croissante, mais que les translations apportent un gain substantiel.

On crée ensuite un plan d'expériences. Ici je fais le choix de ne pas intégrer les translations dans le plan d'expériences mais on aurait pu procéder autrement. La raison de choix réside dans le fait qu'une approche aurait impliqué de faire de nombreux calculs plusieurs fois. En effet, tout calculer pour 7 translations puis le refaire pour 8 translations est extrêmement redondant. J'ai choisis ici de faire de calculer en une seule fois, les PSNR associés à toutes les translations et de manipuler a posteriori les bases de données.

In [None]:
experiences_trans = {'Si':SignauxRef.keys(),'sigma':np.linspace(0.1,0.5,5),'wave':wavelist}
dfexp_trans = pd.DataFrame(list(itertools.product(*experiences_trans.values())),columns=experiences_trans.keys())

In [None]:
print(dfexp_trans)

In [None]:
def row2PSNR_trans(row):
    T=3
    N=10
    trans=11
    P=DebruitTransMoyenne(SignauxRef[row.Si],row.wave,row.sigma,T,trans,N)
    return {'PSNR':P}

In [None]:
result2 = dfexp_trans.apply(row2PSNR_trans,axis=1)
dfexp_trans[['PSNR']] = pd.DataFrame.from_records(result2.values)

In [None]:
print(dfexp_trans)

Les lignes suivantes permettent de restructurer la base de données de manière à avoir une colonne translation qui correspond à ce que l'on souhaite.

In [None]:
df = dfexp_trans.copy()

df = pd.concat((df[['Si','sigma','wave']],pd.DataFrame(df.PSNR.values.tolist(),df.index)),axis=1)

df = df.melt(id_vars=['Si','sigma','wave'],var_name='translation',value_name='PSNR')

In [None]:
print(df)

On termine en affichant les PSNR en fonction du nombre de translations... 

In [None]:
import hvplot.pandas
from bokeh.models import HoverTool
h = HoverTool()
df.hvplot('translation','PSNR',by='wave',kind='scatter',groupby=['Si','sigma'])\
.opts(width=600,tools = [h]).redim.range(PSNR=(13,32),translation=(-0.5,10.5))

## Estimation de la variance du bruit par Ondelettes

Dans des situations réelles, on peut ne pas connaitre la variance du bruit. Cette dernière peut être estimée 
en utilisant les coefficients d'ondelettes. En effet on peut exploiter le fait qu'aux fines échelles, l'essentiel des coefficients sont dus au bruit. La moyenne des valeurs absolue des coefficients du signal bruité peut être lourdement impacté par les coefficients du signal mais pas la médiane. Comme on peut le constater en effectuant les lignes suivantes : 

In [None]:
s=SignauxRef["Blocks"]
n1=len(s)
b=0.01*np.random.randn(n1)
sb=s+b
wsb=pywt.wavedec(sb, 'haar', mode='per', level=9)
wb=pywt.wavedec(b, 'haar', mode='per', level=9)
print(np.median(np.abs(wsb[9])),np.median(np.abs(wb[9])))

On peut remarquer que si $X$ est une variable gaussienne centrée réduite, la médiane $m$ de sa valeur absolue vérifie 
$P(X>m)=0.25$.

En déduire que l'espérance de cette médiane des coefficients du bruit est donnée par 

In [None]:
print(0.01*np.sqrt(2)*scp.special.erfinv(0.5))

Et qu'on peut ainsi estimer l'écart type du bruit par la fonction suivante :

In [None]:
def EstimEcartTypeBruit(s,qmf):
    Lmax=pywt.dwt_max_level(len(s),pywt.Wavelet(qmf).dec_len)
    wsb=pywt.wavedec(sb, qmf, mode='per', level=Lmax)
    mt=np.sqrt(2)*scp.special.erfinv(0.5)
    return np.median(np.abs(wsb[Lmax]))/mt

In [None]:
print(EstimEcartTypeBruit(s,'haar'))