In [None]:
import numpy as np
from matplotlib import pyplot as plt
from scipy.io import loadmat
from skimage.io import imread

In [None]:
rootfolder = ".."


Useful function for plot the dictionary


In [None]:
def get_dictionary_img(D):
    M, N = D.shape
    p = int(round(np.sqrt(M)))
    nnn = int(np.ceil(np.sqrt(N)))
    bound = 2
    img = np.ones((nnn * p + bound * (nnn - 1), nnn * p + bound * (nnn - 1)))
    for i in range(N):
        m = np.mod(i, nnn)
        n = int((i - m) / nnn)
        m = m * p + bound * m
        n = n * p + bound * n
        atom = D[:, i].reshape((p, p))
        if atom.min() < atom.max():
            atom = (atom - atom.min()) / (atom.max() - atom.min())
        img[m : m + p, n : n + p] = atom

    return img

Set all the parameters for the anomaly detection


In [None]:
# patch size (tha patch is square)
p = 15

# number of patches in the training set for dictionary learning
npatch_dictionary = 10000

# number of patches to estimate the confidence region
npatch_region = 1000

# parameters for the dictionary learning using the KSVD
niter_dl = 10
natom = int(np.round(p**2 * 1.5))
L = 4

# regularization parameters for the l1 sparse coding
lmbda = 0.18

## Construct the training and validation sets


In [None]:
# load the training image and rescale it in [0,1]
img = imread(f"{rootfolder}/data/img_normal.png") / 255

# extract random patches from the image and store them in matrices S, V
imsz = img.shape
M = p**2

# Training patches S
S = np.zeros((M, npatch_dictionary))
for n in range(npatch_dictionary):
    i = np.random.randint(0, imsz[0] - p + 1)
    j = np.random.randint(0, imsz[1] - p + 1)
    patch = img[i : i + p, j : j + p]
    S[:, n] = patch.flatten()

# Validation patches V
V = np.zeros((M, npatch_region))
for n in range(npatch_region):
    i = np.random.randint(0, imsz[0] - p + 1)
    j = np.random.randint(0, imsz[1] - p + 1)
    patch = img[i : i + p, j : j + p]
    V[:, n] = patch.flatten()

## Dictionary Learning

Perform preprocessing on the patches in $S$


In [None]:
# PREPROCESSING: exclude black patches from S
v = np.median(S, axis=0)
S = S[:, v > 0.06]

In [None]:
# PREPROCESSING: remove the mean from each patch
S = S - np.mean(S, axis=0, keepdims=True)

Perform dictionary learning via KSVD or MOD


In [None]:
# You can use KSVD or load a precomputed dictionary
D = loadmat(f"{rootfolder}/data/dict_anom_det.mat")["D"]

# Or implement KSVD if you prefer:
# D = ksvd(S, M, natom, niter_dl, S.shape[1], L)

Show the learned dictionary


In [None]:
img_dict = get_dictionary_img(D)
plt.imshow(img_dict, cmap="gray")
plt.show()

## Confidence region estimation / density estimation


In [None]:
# PREPROCESSING: exclude black patches
v = np.median(V, axis=0)
V = V[:, v > 0.06]

In [None]:
# PREPROCESSING: remove the mean from each patch
V = V - np.mean(V, axis=0, keepdims=True)

In [None]:
def OMP(s, D, L, tau):
    _, N = D.shape
    r = s.copy()  # initial residual
    omega = []  # support set
    x_OMP = np.zeros(N)  # final sparse code

    while len(omega) < L and np.linalg.norm(r) > tau:
        # SWEEP STEP: compute correlations between residual and dictionary atoms
        e = np.zeros(N)
        for j in range(N):
            e[j] = D[:, j].T @ r

        # find the column index with maximum correlation
        jStar = np.argmax(np.abs(e))

        # UPDATE support set
        if jStar not in omega:
            omega.append(jStar)

        # update coefficients using least squares
        D_omega = D[:, omega]
        x_omega, _, _, _ = np.linalg.lstsq(D_omega, s, rcond=None)

        # update residual
        r = s - D_omega @ x_omega

    # construct full sparse vector
    for i, idx in enumerate(omega):
        x_OMP[idx] = x_omega[i]

    return x_OMP

In [None]:
# sparse coding of each patch in V
X = np.zeros((natom, npatch_region))
for i in range(V.shape[1]):
    # Use IRLS or OMP for sparse coding
    X[:, i] = OMP(V[:, i], D, L, 1e-6)

In [None]:
# computing the anomaly indicators (l1 norm, reconstruction error) for each
# patch in V

A = np.zeros(
    (2, V.shape[1])
)  # each column contains the values of the anomaly_scores for a patch

for i in range(V.shape[1]):
    # Anomaly indicator 1: L1 norm of sparse coefficients
    # Anomaly indicator 2: Reconstruction error
    reconstruction = D @ X[:, i]
    A[0, i] = np.sum(np.abs(X[:, i]))  # L1 norm
    A[1, i] = np.linalg.norm(V[:, i] - reconstruction) ** 2  # Reconstruction error

# Estimation of mean and covariance
mu = np.mean(A, axis=1)
Sigma = np.cov(A)

In [None]:
# estimation of the threshold that gives the desired false positive rate
# using the patches in V

FPR_target = 0.1

# compute the Mahalanobis distance for each indicator vector in A
mahal_dist = np.zeros(V.shape[1])
for i in range(A.shape[1]):
    diff = A[:, i] - mu
    mahal_dist[i] = np.sqrt(diff.T @ np.linalg.inv(Sigma) @ diff)

# set the threshold
threshold = np.percentile(mahal_dist, (1 - FPR_target) * 100)

## Test phase


In [None]:
# load the test image
img_test = imread(f"{rootfolder}/data/img_anom.png") / 255

imsz = img_test.shape

STEP = 7
# initialize the estimated image
heatmap = np.zeros_like(img)

# initialize the weight matrix
weights = np.zeros_like(img)

In [None]:
for i in range(0, imsz[0] - p + 1, STEP):
    for j in range(0, imsz[1] - p + 1, STEP):
        # extract the patch with the top left corner at pixel (i, j)
        s = img_test[i : i + p, j : j + p].flatten()

        # if the median of s is too small set the anomaly score to 0:
        if np.median(s) <= 0.06:
            score = 0
        else:
            # subtract the mean from the patch
            s = s - np.mean(s)

            # perform the sparse coding
            x = OMP(s, D, L, 1e-6)  # or use IRLS

            # compute the anomaly indicators vector
            reconstruction = D @ x
            a = np.array([np.sum(np.abs(x)), np.linalg.norm(s - reconstruction) ** 2])

            # compute the anomaly score (Mahalanobis distance)
            diff = a - mu
            score = np.sqrt(diff.T @ np.linalg.inv(Sigma) @ diff)

        # update the heatmap
        heatmap[i : i + p, j : j + p] += score

        # update the weight matrix
        weights[i : i + p, j : j + p] += 1

In [None]:
# normalize the heatmap
heatmap = heatmap / (weights + 1e-10)

In [None]:
# plot the heatmap
plt.imshow(heatmap)

In [None]:
# build the detection mask, that has the same size of the test image
# each pixel in the mask has value 1 if the corresponding patch has been
# detected as anomalous, otherwise it has value 0
mask = (heatmap > threshold).astype(float)

In [None]:
## show the results
plt.figure(3), plt.imshow(img_test, cmap="gray"), plt.title("Test Image")
plt.figure(4), plt.imshow(mask, cmap="gray"), plt.title("Mask")
plt.show()

In [None]:
# combine the mask and the test image
img_color = np.zeros([img_test.shape[0], img_test.shape[1], 3])
img_temp = img_test.copy()
img_temp[mask > 0] = 1
img_color[:, :, 0] = img_temp
img_temp = img_test.copy()
img_temp[mask > 0] = 0
img_color[:, :, 1] = img_temp
img_temp = img_test.copy()
img_temp[mask > 0] = 0
img_color[:, :, 2] = img_temp

plt.figure(5), plt.imshow(img_color), plt.title("Detections")
plt.show()