In [3]:
import skimage
import numpy as np
import ct_utils
import optimizers
import matplotlib.pyplot as plt
import ipywidgets as widgets
from ipywidgets import interactive
from IPython.display import display
import pywt


def todo():
    raise NotImplementedError("In dieser Zelle gibt es noch mindestens ein TODO!")

# Teil II: Modellbasierte Regularisierung

Eine beliebte Klasse von Regularisierungsmethoden lässt sich als Variationsproblem formulieren, bei dem die Rekonstruktion $x^*$ einer verrauschten Messung $y^{\epsilon}$ als Lösung von
$$ \min_x \frac{1}{2}\|Ax-y^{\varepsilon}\|^2_2 + \lambda J(x), $$
gesucht wird.

Die Funktion $J$ heißt Regularisierungsfunktion und bestraft unerwünschtes Verhalten von $x$. In diesem Teil des Tutorials werden wir uns typische Beispiele für $J$ ansehen und eine Vorstellung davon bekommen, wie der Minimierer des obigen Problems gefunden werden kann.

In [4]:
dim = 128

# Definiere eine ground truth (gt) - ein "Phantom"-Bild, siehe [https://de.wikipedia.org/wiki/Shepp-Logan-Phantom] (ctrl/cmd + click)
x_gt = skimage.img_as_float(skimage.data.shepp_logan_phantom())
x_gt = skimage.transform.resize(x_gt, (dim, dim))

# Definiere den Vorwärtsoperator - bei CT ist das die Radontransformation
theta = np.linspace(0, 180, endpoint=False, num=dim)
A = ct_utils.Radon(theta)

# Clean data
y_clean = A(x_gt)

## Tikhonov-Regularisierung

Die erste Regularisierungsfunktion, die wir betrachten, ist $J(x) = \frac{1}{2}\|x\|^2_2 $. Eine einfache Interpretation davon wäre, dass **Fehler klein gehalten werden, indem wir die Norm von x reduzieren**. Das Variationsproblem dafür lautet
$$ x^* = \operatorname*{arg\ min}_x \frac{1}{2}\|Ax-y^{\varepsilon}\|^2_2 + \frac{\lambda}{2} \|x\|^2_2.$$

Analytisch lässt sich die Lösung des obigen Problems wäre $ x^* = (A^*A + \lambda \operatorname{Id})^{-1} A^*y^{\varepsilon}$. Hier ist $A^*$ die Adjungierte von $A$ und $\operatorname{Id}$ der Identitätsoperator.

Da wir nicht einfach so auf die Inverse $(A^*A + \lambda \operatorname{Id})^{-1}$ zugreifen können, müssen wir als Ersatz eine iterative Optimierungsmethode wie z.B. gradient descent mit Schrittweite $\eta > 0$ verwenden. Gradient descent ist ein iteratives Verfahren zum Lösen von Optimierungsproblemen der Form $\min_x f(x)$: Wir starten bei einem Anfangsbild $x^0$ und berechnen iterativ für $k\ge 1$ die Schritte
$$ x^{k+1} = x^k - \eta \nabla f(x^k).$$
In unserem speziellen Fall heißt das, wir iterieren
$$ x^{k+1} = x^k - \eta (A^*Ax^{k} + \lambda x^k - A^*y^{\varepsilon} ).\tag{$\ast$}$$
Für $k \to \infty$ können wir (falls $\eta$ richtig gewählt wird) erwarten, dass die Iterierten $x^k$ zur Lösung $x^*$ konvergieren. In der Praxis bricht man normalerweise nach einigen Schritten die Iteration ab und hofft, nahe genug an $x^*$ gelangt zu sein.

In [11]:
class gradient_descent(optimizers.optimizer):
    def __init__(self, A, x, y, eta=0.1, lamda=1.0, **kwargs):
        super().__init__(**kwargs)
        self.A = A
        self.x = x
        self.y = y
        self.eta = eta
        self.lamda = lamda

        def energy_fun(x):
            return 0.5 * np.linalg.norm(A(x) - y) ** 2 + lamda * 0.5 * np.linalg.norm(
                x, ord=1
            )

        self.energy_fun = energy_fun

    def step(self):
        # Implementiert hier den Gradient aus (*)
        gradient = todo() # Hinweis: Die Adjungierte A^* von self.A ist implementiert als self.A.adjoint
        # Hier sollte das Update
        self.x = todo()

In [12]:
pre_compute_tikh = {}


def plot_tikh_reco(lamda, noise_lvl, max_angle):
    num_theta = int(np.ceil(max_angle / 180 * dim))
    theta = np.linspace(0, max_angle, endpoint=False, num=num_theta)
    R = ct_utils.Radon(theta=theta)
    if (lamda, noise_lvl, max_angle) in pre_compute_tikh:
        x = pre_compute_tikh[(lamda, noise_lvl, max_angle)]
    else:
        y_noisy = R(x_gt) + np.random.normal(
            loc=0, scale=noise_lvl, size=[dim, num_theta]
        )

        gd = gradient_descent(
            R,
            R.inv(y_noisy), # Wir starten bei einem educated guess x^0, der FBP von y_noisy
            y_noisy,
            eta=1 / (noise_lvl * lamda * 20000) if (noise_lvl * lamda > 0) else 0.0001,
            lamda=lamda,
            verbosity=0,
            max_it=50,
        )
        gd.solve()

        x = gd.x
        pre_compute_tikh[(lamda, noise_lvl, max_angle)] = x

    plt.figure()
    plt.imshow(x, vmin=0, vmax=1)


l_slider = widgets.FloatSlider(
    min=0, max=0.5, step=0.01, value=0, continuous_update=False, readout_format=".3f"
)
s_slider = widgets.FloatSlider(
    min=0.001,
    max=0.01,
    step=0.001,
    value=0.001,
    continuous_update=False,
    readout_format=".3f",
)
t_slider = widgets.IntSlider(
    min=1, max=180, step=10, value=180, continuous_update=False
)
interactive_plot = interactive(
    plot_tikh_reco, lamda=l_slider, noise_lvl=s_slider, max_angle=t_slider
)
display(interactive_plot)

interactive(children=(FloatSlider(value=0.0, continuous_update=False, description='lamda', max=0.5, readout_fo…

## Sparsity-fördernde Regularisierung
Eine weitere Anforderung an die Regularisierungen könnte sein, dass die erhaltenen Rekonstruktionen in gewisser Weise **einfach/aus wenigen Vektoren einer passenden Basis zusammensetzbar** sein sollten. 

Die beiden Komponenten, die wir benötigen, um dies zu erreichen, sind
* ein Operator $D$, der $x$ in die Basis zerlegt, beispielsweise eine Wavelet-Zerlegung
* die $\|\cdot\|_1$-Norm, die die Sparsity fördert, d. h. Nicht-Null-Einträge von $Dx$ bestraft.

Das resultierende Variationsproblem lautet dann
$$ x^* = \operatorname*{arg\ min}_x \frac{1}{2}\|Ax-y^{\varepsilon}\|^2_2 + \lambda \|Dx\|_1.$$

Da die Regularisierungsfunktion im Allgemeinen nicht differenzierbar ist, verwenden wir statt gradient descent jetzt proximal gradient descent. Die proximale Abbildung einer Funktion $J$ mit dem Parameter $\lambda$ ist definiert als
$$ \operatorname{prox}_{\lambda J}(x) = \operatorname*{arg\ min}_z \frac{1}{2}\|x-z\|_2^2 + \lambda J(z),$$
und ist im Spezialfall der $\|\cdot\|_1$-Norm leicht berechenbar. Das Update von proximalem Gradientenabstieg lautet dann 
$$ x^{k+1} = \operatorname{prox}_{t\lambda J} (x^k - t \cdot (A^*Ax^k - A^*y^\varepsilon )).$$


In [5]:
pre_compute_sparse = {}


def plot_sparse_reco(D_idx, lamda, noise_lvl, max_angle):
    num_theta = int(np.ceil(max_angle / 180 * dim))
    theta = np.linspace(0, max_angle, endpoint=False, num=num_theta)
    R = ct_utils.Radon(theta=theta)
    if (D_idx, lamda, noise_lvl, max_angle) in pre_compute_sparse:
        x = pre_compute_sparse[(D_idx, lamda, noise_lvl, max_angle)]
    else:
        sinogram = R(x_gt) + np.random.normal(
            loc=0, scale=noise_lvl, size=[dim, num_theta]
        )
        opti = None
        if D_idx == 0: # Dünnbesetztheit der Pixel der Lösung
            opti = optimizers.ista_L1(
                R,
                R.inv(sinogram),
                sinogram,
                eta=1 / (noise_lvl * lamda * 100000)
                if (noise_lvl * lamda > 0)
                else 0.0001,
                max_it=50,
                lamda=lamda,
                verbosity=0,
            )
        elif D_idx == 1: # Dünnbesetztheit der Lösung in Haar-Wavelets
            opti = optimizers.ista_wavelets(
                R,
                R.inv(sinogram),
                sinogram,
                wave=pywt.Wavelet("haar"),
                eta=1 / (noise_lvl * lamda * 100000)
                if (noise_lvl * lamda > 0)
                else 0.0001,
                max_it=50,
                lamda=lamda,
                verbosity=0,
            )
        elif D_idx == 2: # Dünnbesetztheit der Lösung in Daubechies-Wavelets
            opti = optimizers.ista_wavelets(
                R,
                R.inv(sinogram),
                sinogram,
                wave=pywt.Wavelet("db4"),
                eta=1 / (noise_lvl * lamda * 100000)
                if (noise_lvl * lamda > 0)
                else 0.0001,
                max_it=50,
                lamda=lamda,
                verbosity=0,
            )

        opti.solve()

        x = opti.x

        pre_compute_sparse[(D_idx, lamda, noise_lvl, max_angle)] = x

    plt.figure()
    plt.imshow(x, vmin=0, vmax=1)


idx_toggle = widgets.ToggleButtons(
    options=[("Identity", 0), ("Haar-Wavelet", 1), ("Daubechies4-Wavelet", 2)],
    description="Decomposing operator D",
    disabled=False,
)
l_slider = widgets.FloatSlider(
    min=0, max=0.2, step=0.004, value=0, continuous_update=False, readout_format=".3f"
)
s_slider = widgets.FloatSlider(
    min=0.001,
    max=0.01,
    step=0.001,
    value=0.001,
    continuous_update=False,
    readout_format=".3f",
)
t_slider = widgets.IntSlider(
    min=1, max=180, step=10, value=180, continuous_update=False
)
interactive_plot = interactive(
    plot_sparse_reco,
    D_idx=idx_toggle,
    lamda=l_slider,
    noise_lvl=s_slider,
    max_angle=t_slider,
)

display(interactive_plot)

interactive(children=(ToggleButtons(description='Decomposing operator D', options=(('Identity', 0), ('Haar-Wav…

# TV Regularisierung

Aufgabe: Wie genau regularisiert TV die Lösung $x$?

In [6]:
precompute_tv = {}


def plot_tv_reco(lamda, noise_lvl, max_angle):
    lamda, noise_lvl, max_angle = (round(lamda, 4), round(noise_lvl, 4), int(max_angle))

    num_theta = int(np.ceil(max_angle / 180 * dim))
    theta = np.linspace(0, max_angle, endpoint=False, num=num_theta)
    R = ct_utils.Radon(theta=theta)
    if (lamda, noise_lvl, max_angle) in precompute_tv:
        x = pre_compute_sparse[(lamda, noise_lvl, max_angle)]
    else:
        y_noisy = R(x_gt) + np.random.normal(
            loc=0, scale=noise_lvl, size=[dim, num_theta]
        )

        # tv = Total Variation
        tv = optimizers.TV()

        def energy_fun(x):
            return np.linalg.norm(R(x) - y_noisy) ** 2 + lamda * tv(x)

        sBTV = optimizers.split_Bregman_TV(
            R,
            y_noisy,
            R.inv(y_noisy),
            energy_fun=energy_fun,
            lamda=lamda,
            max_it=10,
            max_inner_it=2,
            verbosity=0,
        )
        sBTV.solve()

        x = sBTV.x

        precompute_tv[(lamda, noise_lvl, max_angle)] = x

    plt.figure()
    plt.imshow(x, vmin=0, vmax=1)


l_slider = widgets.FloatSlider(
    min=0, max=0.01, step=0.001, value=0, continuous_update=False, readout_format=".3f"
)
s_slider = widgets.FloatSlider(
    min=0.001,
    max=0.01,
    step=0.001,
    value=0.0,
    continuous_update=False,
    readout_format=".3f",
)
t_slider = widgets.IntSlider(
    min=1, max=180, step=10, value=180, continuous_update=False
)
interactive_plot = interactive(
    plot_tv_reco, lamda=l_slider, noise_lvl=s_slider, max_angle=t_slider
)

display(interactive_plot)

interactive(children=(FloatSlider(value=0.0, continuous_update=False, description='lamda', max=0.01, readout_f…