In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import numpy as np
import matplotlib.pyplot as plt

from tqdm.notebook import tqdm

# Define a sparse signal and a measurement matrix
Here, we consider that a sparse signal $x_0 \in \mathbb R^n$ with a support of size $s$ small. $x_0$ is observered through a measurement matrix $\Phi \in \mathcal M_{p, n}(\mathbb R)$: $y_0 = \Phi x_0$, where $p$ is the number of measurements. At first, we will consider a measurement matrix generated by a gaussian random process:
$$
    \Phi_{i, j} \sim \mathcal N(0, 1/p)  \quad \text{i.i.d.}
$$

In [None]:
n = 2**12
s = 2**3

def make_sparse_signal(n, s):
    """Return a signal of size n with s non-zero values equal to 1."""
    x = np.concatenate((np.random.randn(s), np.zeros(n-s)))
    x = np.random.permutation(x)
    return x

x0 = make_sparse_signal(n, s)

plt.figure()
plt.plot(x0, 'k')
plt.show()

In [None]:
p_th = int(np.ceil(2 * s * np.log(n)))
p = int(1.5 * p_th)
print(f"Number of measurements: {p}")
Phi = np.random.randn(p, n) / np.sqrt(p)
y0 = Phi @ x0

For maximal performance, the measurements should be as independant from each other as possible, hence the choice of a gaussian matrix. We plot the correlation matrix between the lines $\{\Phi_i\}_i$ of $\Phi$:
$$
    \Sigma_{i, j} = \frac{\langle \Phi_i, \Phi_j \rangle}{\| \Phi_i \|_2 \| \Phi_j \|_2} \in [-1, 1]
$$

In [None]:
scalar_prod = Phi @ Phi.T

Phi_line_norms = np.linalg.norm(Phi, axis=1)
renorm = Phi_line_norms.reshape(p, 1) * Phi_line_norms.reshape(1, p)

Sigma =scalar_prod / renorm

In [None]:
plt.figure()
plt.imshow(Sigma, cmap='coolwarm', vmin=-1, vmax=1)
plt.colorbar()
plt.show()

# Inverse problem

## $L_2$ error minimization
We want to recover $x_0$ from the observation of $y_0 = \Phi x_0$. Since $m < n$, this problem is ill-posed: if we define
$$
    \tilde x_0 = argmin_{x \in \mathbb R^n} \frac{1}{2}\|\Phi x - y_0\|_2^2,
$$
we get $\tilde x_0 = \Phi^+ y_0 = \Phi^T (\Phi \Phi^T)^{-1} y_0$ and no information can be recovered along the direction $Ker( \Phi )$ of dimension $n-p$. Since we will use the gradient later anyway, we will get this reconstruction with a gradient descent,

Note: Because $\Phi$ is randomly generated by a gaussian process and $m < n$, you can show that $\Phi \Phi^T$ is almost surely invertible.

In [None]:
def reconstruction_error(x):
    return .5 * np.linalg.norm(Phi@x - y0)**2

def reconstruction_error_gradient(x):
    return Phi.T @ (Phi@x - y0)

x = np.zeros_like(x0)
niter = 100
tau = 1/np.linalg.norm(Phi, 2)**2
err_optim = np.zeros(niter)
for itr in tqdm(range(niter)):
    x = x - tau * reconstruction_error_gradient(x)
    err_optim[itr] = reconstruction_error(x)
x_mp = x
    
err = np.linalg.norm(x0 - x_mp) / np.linalg.norm(x0)
plt.figure(figsize=(15, 6))

plt.subplot(1, 2, 1)
plt.plot(x0, 'k')
plt.plot(x_mp, 'r')
plt.title(r"L2 recovery of $x_0$: $\|\tilde x_0 - x_0\|_2 / \|x_0\|_2 = {:.5f}$".format(err))

plt.subplot(1, 2, 2)
plt.plot(err_optim)
plt.title("L2 error evolution during training")

plt.show()

## We need to use prior information in order to recover $x_0$: it is sparse !

To express this prior, we recover $x_0$ using a Lasso regularization:
$$
    \tilde x_\lambda = argmin_{x \in \mathbb R^n} \frac{1}{2}\|\Phi x - y_0\|_2^2 + \lambda \|x\|_1,
$$
which we solve using the Iterative Soft Thresholding Algorithm (ISTA).

Note: the number to remember from this is $2s\log(n)$: this is the order of the number of measurements theoretically required to be able to reconstruct the signal with good probability. Note that if we knew the support of $x_0$ in advance, we would need $s$ measurements to reconstruct it perfectly. This means that a factor $2\log(n)$ is required to find that support. For very sparse signals (e.g. time events, wavelet transforms of images), we will very quickly have $2s\log(n) \ll n$.

In [None]:
def f(x, la): return reconstruction_error(x) + la*np.linalg.norm(x,1)

In [None]:
# define step along the reconstruction error
def reconstruction_step(x, tau):
    return x - tau * reconstruction_error_gradient(x)

In [None]:
# define step along the regularization error
def regularization_step(x, T):
    return np.maximum(abs(x)-T, np.zeros_like(x)) * np.sign(x)

In [None]:
def ISTA(x, tau, la, niter):
    err_acc = np.zeros(niter)
    for itr in tqdm(range(niter)):
        x = reconstruction_step(x, tau)
        x = regularization_step(x, la*tau)
        err_acc[itr] = f(x, la)
    return x, err_acc

In [None]:
x = np.zeros_like(x0)

lmax = abs(Phi.transpose().dot(y0)).max()
la = lmax /10

niter = 2000
x_lasso, err_optim = ISTA(x, tau, la, niter)

In [None]:
plt.figure(figsize=(15, 6))

plt.subplot(1, 2, 1)
plt.plot(x0, 'k')
plt.plot(x_lasso, 'r')
err = np.linalg.norm(x0 - x_lasso) / np.linalg.norm(x0)
plt.title(r"Lasso recovery of $x_0$: $\|\tilde x_0 - x_0\|_2 / \|x_0\|_2 = {:.5f}$".format(err))

plt.subplot(1, 2, 2)
plt.plot(err_optim)
plt.title("Lasso-regularized error evolution during training")

plt.show()

## A better reconstruction: finding the support with Lasso then finding the best L2 reconstruction on that support

We find the support $I$ of $x_0$, then solve:
$$
    \tilde x_0 = argmin \left\{ \frac{1}{2}\|\Phi x - y_0\|_2^2 : x \in \mathbb R^n, i \notin I \implies x_i = 0 \right\}
$$
An easy way do to so is to project the reconstruction error gradient we used above on the space $Vect(\{x_i\}_{i \in I})$ at each step.

In [None]:
I = np.abs(x_lasso) > 1e-8

x = np.zeros_like(x0)
niter = 1000
tau = 1/np.linalg.norm(Phi, 2)**2
for itr in range(niter):
    grad = reconstruction_error_gradient(x)
    grad = grad * I
    x = x - tau * grad
x_support = x
    
err = np.linalg.norm(x0 - x_support) / np.linalg.norm(x0)
plt.figure()
plt.plot(x0, 'k')
plt.plot(x_support, 'r')
plt.title(r"L2 recovery of $x_0$: $\|\tilde x_0 - x_0\|_2 / \|x_0\|_2 = {:.5f}$".format(err))
plt.show()

# A concrete example: tomography reconstruction
The following code is imported from https://scikit-learn.org/stable/auto_examples/applications/plot_tomography_l1_reconstruction.html

MRI and ultrasounds use tomography i.e. we use penetrative waves and measure them after going through the humain body and interacting with the matter composing it. It can be though of as a linear operator acting on a single line, and measuring the signal which in this case would be matter density at each voxel.

In [None]:
from scipy import sparse
from scipy import ndimage
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge

In [None]:
# Author: Emmanuelle Gouillart <emmanuelle.gouillart@nsup.org>
# License: BSD 3 clause

def _weights(x, dx=1, orig=0):
    x = np.ravel(x)
    floor_x = np.floor((x - orig) / dx).astype(np.int64)
    alpha = (x - orig - floor_x * dx) / dx
    return np.hstack((floor_x, floor_x + 1)), np.hstack((1 - alpha, alpha))


def _generate_center_coordinates(l_x):
    X, Y = np.mgrid[:l_x, :l_x].astype(np.float64)
    center = l_x / 2.
    X += 0.5 - center
    Y += 0.5 - center
    return X, Y


def build_projection_operator(l_x, n_dir):
    """ Compute the tomography design matrix.

    Parameters
    ----------

    l_x : int
        linear size of image array

    n_dir : int
        number of angles at which projections are acquired.

    Returns
    -------
    p : sparse matrix of shape (n_dir l_x, l_x**2)
    """
    X, Y = _generate_center_coordinates(l_x)
    angles = np.linspace(0, np.pi, n_dir, endpoint=False)
    data_inds, weights, camera_inds = [], [], []
    data_unravel_indices = np.arange(l_x ** 2)
    data_unravel_indices = np.hstack((data_unravel_indices,
                                      data_unravel_indices))
    for i, angle in enumerate(angles):
        Xrot = np.cos(angle) * X - np.sin(angle) * Y
        inds, w = _weights(Xrot, dx=1, orig=X.min())
        mask = np.logical_and(inds >= 0, inds < l_x)
        weights += list(w[mask])
        camera_inds += list(inds[mask] + i * l_x)
        data_inds += list(data_unravel_indices[mask])
    proj_operator = sparse.coo_matrix((weights, (camera_inds, data_inds)))
    return proj_operator


def generate_synthetic_data():
    """ Synthetic binary data """
    rs = np.random.RandomState(0)
    n_pts = 36
    x, y = np.ogrid[0:l, 0:l]
    mask_outer = (x - l / 2.) ** 2 + (y - l / 2.) ** 2 < (l / 2.) ** 2
    mask = np.zeros((l, l))
    points = l * rs.rand(2, n_pts)
    mask[(points[0]).astype(np.int), (points[1]).astype(np.int)] = 1
    mask = ndimage.gaussian_filter(mask, sigma=l / n_pts)
    res = np.logical_and(mask > mask.mean(), mask_outer)
    return np.logical_xor(res, ndimage.binary_erosion(res))


# Generate synthetic images, and projections
l = 128
proj_operator = build_projection_operator(l, l // 7)
data = generate_synthetic_data()
proj = proj_operator * data.ravel()[:, np.newaxis]
proj += 0.15 * np.random.randn(*proj.shape)

# Reconstruction with L2 (Ridge) penalization
rgr_ridge = Ridge(alpha=0.2)
rgr_ridge.fit(proj_operator, proj.ravel())
rec_l2 = rgr_ridge.coef_.reshape(l, l)

# Reconstruction with L1 (Lasso) penalization
# the best value of alpha was determined using cross validation
# with LassoCV
rgr_lasso = Lasso(alpha=0.001)
rgr_lasso.fit(proj_operator, proj.ravel())
rec_l1 = rgr_lasso.coef_.reshape(l, l)

plt.figure(figsize=(8, 3.3))
plt.subplot(131)
plt.imshow(data, cmap=plt.cm.gray, interpolation='nearest')
plt.axis('off')
plt.title('original image')
plt.subplot(132)
plt.imshow(rec_l2, cmap=plt.cm.gray, interpolation='nearest')
plt.title('L2 penalization')
plt.axis('off')
plt.subplot(133)
plt.imshow(rec_l1, cmap=plt.cm.gray, interpolation='nearest')
plt.title('L1 penalization')
plt.axis('off')

plt.subplots_adjust(hspace=0.01, wspace=0.01, top=1, bottom=0, left=0,
                    right=1)

plt.show()