#### <p style="text-align:right";> *Projet de Compressed Sensing - ENSAE ParisTech - 2017/2018*</p>  <p style="text-align:right";> Kolia Iakovlev - Samuel Ritchie </p>

# <p style="text-align:center";><span style="color: #fb4141">Application du Compressed Sensing au cas des images satellites : </span></p> <p style="text-align:center";><span style="color: #fb4141"> un exemple d'implémentation</span></p>

In [None]:
from jyquickhelper import add_notebook_menu
add_notebook_menu()

## 0. Importation des modules utiles au projet

Nous commencons par importer les différents modules utiles au projet. Outre numpy pour le calcul numérique, nous utilisons la librairie Pillow pour le chargement des images, ainsi que la librairie cvxpy pour l'optimisation.

In [None]:
import numpy as np
from PIL import Image
from cvxpy import *
from tqdm import tqdm 
tqdm.monitor_interval = 1
import matplotlib.pyplot as plt
%matplotlib inline
import time
import os
from copy import deepcopy
from scipy.fftpack import fft, dct, idct

## 1. Chargement et prétraitement de l'image satellite

In [None]:
image = Image.open("image_satellite.jpg")
image = np.array(image)
image = image[:,:,0]
plt.imshow(image,  cmap=plt.cm.gray)

On rogne l'image en un carré de 300*300 pixels

In [None]:
image = image[100:200,100:200]
plt.imshow(image,  cmap=plt.cm.gray)

In [None]:
full_coef = []
for value in image:
    full_coef.append(value)
full_coef = np.hstack(full_coef)
plt.figure(figsize=(15,5))
plt.hist(np.abs(full_coef), range=[np.min(np.abs(full_coef)), np.max(np.abs(full_coef))], bins=100)
plt.title('Values of the coefficients in the origin space')
plt.show()  

Passage dans la base des cosinus

In [None]:
image_in_dct = dct(image)
plt.imshow(np.asarray(idct(image_in_dct)), cmap=plt.cm.gray)

In [None]:
full_coef = []
for value in image_in_dct:
    full_coef.append(value)
full_coef = np.hstack(full_coef)
plt.figure(figsize=(15,5))
plt.hist(np.abs(full_coef), range=[np.min(np.abs(full_coef)), np.max(np.abs(full_coef))], bins=100)
plt.title('Values of the coefficients in the cosine space')
plt.show()  

## 2. Calcul des matrices de mesures

Pour calculer les matrices de mesures, nous aurons besoin de matrices gaussiennes

In [None]:
def gaussian_matrix(M, N_col): #M : nombre de mesures N_col : nombre de colonnes
    return np.random.randn(M, N_col)/M

Comme suggéré dans l'article, nous allons créer une nouvelle matrice de mesure pour chaque ligne de la matrice de pixels. On stocke a la fois les *sensing matrix* 

$$\Phi \quad tq \quad \Phi^i \in \mathbb{R}^{M \times N_{col}} \quad et \quad (\Phi^i)_{kj} \sim \mathcal{N}(0,\frac{1}{M}) \quad où \quad k=1..M \quad et \quad j=1..N_{col}$$

Ensuite, nous prenons M mesures linéaires de $(X)_i$. Ceci formera les lignes de notre matrice de mesure $Y \in \mathbb{R}^{N_{row} \times M}$ : 

$$ (Y)_i^T = \Phi^i (X)_i^T $$

In [None]:
def measurement_matrixes(X,M):
    Phi =[]
    Y=[]
    for i in range(0,X.shape[0]):
        Phi.append(gaussian_matrix(M,X.shape[1]))
        Y.append((Phi[i].dot(X[i,:].T)).T)
    return Y,Phi

## 3. Initialisation et définition des filtres de prédiction linéaire

On initialise la matrice comme proposé dans l'article

In [None]:
def initialisation(Y,Phi):
    F =[]
    for i in tqdm(range(len(Y)), desc = 'Initialising...', ncols= 50):
        x = Variable(Phi[i].shape[1])
        A = Phi[i]
        y = Y[i]
        eps = 1
        obj = Minimize(sum(abs(x)))
        constraints = [norm(A*x - y) <= eps]
        prob = Problem(obj, constraints)
        prob.solve()
        m = np.squeeze(np.asarray(prob.variables()[0].value))
        F.append(m)
    return F

On crée par ailleurs les différents filtres de prédiction linéaire proposés par les auteurs

In [None]:
class PredictionFilters():
    
    def line_average_prediction_filter(self, x , y, indice):
        return (x[indice] + y[indice])/2
    
    def average_prediction_filter(self, x, y, indice):
        u=len(x)
        if indice ==0:
            return((x[0]+x[1]+y[0]+y[1])/4)
        elif indice==u-1:
            return((x[u-1]+x[u-2]+y[u-2]+y[u-1])/4)
        else:
            return((x[indice]+x[indice+1]+x[indice-1]+y[indice]+y[indice-1]+y[indice+1])/6)
        
    def weighted_average_prediction_filter(self, x, y, indice):
        a=(2-np.sqrt(2))/4
        b=(np.sqrt(2)-1)/2
        u=len(x)
        if indice ==0:
            return((x[0]+y[0])*(3/2)/(2*np.sqrt(2)+3) + (x[1]+y[1])*np.sqrt(2)/(2*np.sqrt(2)+3))
        elif indice==u-1:
            return((x[u-1]+y[u-1])*(3/2)/(2*np.sqrt(2)+3) + (x[u-2]+y[u-2])*np.sqrt(2)/(2*np.sqrt(2)+3))
        else:
            return((x[indice+1]+x[indice-1]+y[indice-1]+y[indice+1])*a+(x[indice]+y[indice])*b)    

## 4. Problème de minimisation $l_1$

Ci après est défini le problème de minimisation $l_1$ classique utilisé lors de l'étape de reconstruction

In [None]:
def arg_min(image,Phi_i,e_y):
    e = Variable(image.shape[1])
    obj = Minimize(sum(abs(e)))
    constraints = [Phi_i*e == e_y]
    prob = Problem(obj,constraints)
    prob.solve()
    m = np.squeeze(np.asarray(prob.variables()[0].value))
    return m

## 5. Reconstruction de l'image

In [None]:
def reconstruction_image_2D(image,F_0,M,N_iter,prediction):

    print("RECONSTRUCTING IMAGE WITH M = %s, %s PREDICTION FILTER AND %s ITERATIONS" %(M,prediction,N_iter))
    time.sleep(0.5)
          
    X_1 = deepcopy(F_0)
    X_0 = deepcopy(F_0)
    
    length=len(X_0[0])
    MSE=[]
    
    if prediction == 'P1':
        prediction_filter = PredictionFilters().line_average_prediction_filter
    elif prediction == 'P2':
        prediction_filter = PredictionFilters().average_prediction_filter
    elif prediction == 'P3' :
        prediction_filter = PredictionFilters().weighted_average_prediction_filter
    
    print('Starting to iterate...')
    for n in range(1,N_iter+1):
        
        time.sleep(1)
        for i in tqdm(range(length), desc='Iteration %s/%s' %(n,N_iter)):
            if (i==0) | (i==length-1):
                x_p = X_0[i]
            else:
                x_p = [prediction_filter(X_0[i-1],X_0[i+1],j) for j in range(length)]
            y_p = Phi[i].dot(x_p)
            e_y = Y[i] - y_p
            
            e_theta = arg_min(image,Phi[i],e_y)
            #e_x = Psi(e_theta)
            X_1[i] = x_p + e_theta 
        MSE.append(np.square(np.subtract(image,np.array(X_1))).mean()/(image.shape[0]*image.shape[1]))
        X_0 = X_1
 
    os.makedirs('./outputs', exist_ok=True)
    new_image = np.array(X_1)
    plt.figure(figsize = (10,10))
    plt.subplot(1, 2, 1)
    plt.imshow(image,  cmap=plt.cm.gray)
    plt.title('Original image')
    plt.subplot(1, 2, 2)
    plt.imshow(new_image,cmap = plt.cm.gray)
    plt.title('Reconstructed image')
    plt.savefig('outputs/reconstructed_image_%s_filter_%s_iterations_%s_measurements'%(p,N_iter,M))
    plt.figure()
    plt.xlabel('Number of iterations')
    plt.ylabel('Mean Standard Error')
    plt.plot(MSE)
    plt.savefig('outputs/MSE_reconstruced_image_%s_filter_%s_measurements' %(p,M))
    return X_1,MSE



On fait désormais varier le nombre de mesures ainsi que le type de filtre utilisé. Tous les résultats (image reconstruite et MSE associé) sont exportés dans le fichier output à la racine du projet

In [None]:
prediction_methods =['P1','P2','P3']
measurements = list(range(100,300,50))
for m in measurements:
    print('Initialising measurement matrix for M = %s'%m)
    time.sleep(0.2)
    Y,Phi = measurement_matrixes(image,m)    
    F_0 = initialisation(Y,Phi)  
    for p in prediction_methods:
        reconstruction_image_2D(image = image, F_0 = F_0, M = m, N_iter = 5, prediction = p)