This Notebook contains the main Processor pipeline: normalization of the image, filtering and binarization. It is meant to be imported and used in other notebooks.

# Technical Stuff

#### Packages Imports

In [None]:
import os
import itertools
import random
import h5py
import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import List
from math import log10, sqrt

import cv2
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib

import skimage.morphology as morph
import skimage.filters as filters
import skimage.feature as feature
import skimage.restoration as restoration
import skimage.exposure as exposure

from sklearn.neighbors import NearestNeighbors

# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.wiener.html#id1
from scipy.signal import wiener
# import scipy
# scipy.special.seterr(all='ignore')
# wiener = scipy.signal.wiener

# The Preprocessor Pipeline

#### PSNR

In [None]:
""" Calculate Peak Signal-to-Noise Ratio. """
def PSNR(original, compressed, LH=True):
    if LH:
        lower_half = lambda img: img[img.shape[0]//2-50:, :]
        mse = np.mean((lower_half(original) - lower_half(compressed)) ** 2)
    else:
        mse = np.mean((original - compressed) ** 2)
    if(mse == 0):
        return 100
    max_pixel = 1.0
    psnr = 20 * log10(max_pixel / sqrt(mse))
    return round(psnr, 2)

#### Existing Ensembles

In [None]:
def choose_ensemble( paramset ):

    ei = paramset['ENSEMBLE_ID']

    if ei == 0:
        return {
            f'Wiener Filter': lambda img: wiener(img, (2*paramset['WIENER_SIZE'], paramset['WIENER_SIZE'])),
            f'TV Chambolle Filter': lambda img: restoration.denoise_tv_chambolle(img, weight=paramset['TV_WEIGHT']), }
    
    elif ei == 1:
        return {
            f'Wiener Filter': lambda img: wiener(img, (2*paramset['WIENER_SIZE'], paramset['WIENER_SIZE'])),
            f'Median Filter, disk = {paramset["MEDIAN_DISK_SIZE"]}': lambda i: filters.median(i, morph.disk(paramset["MEDIAN_DISK_SIZE"])), }

    elif ei == 2:
        return {
            f'Wavelet Filter': lambda img: restoration.denoise_wavelet( img, wavelet=paramset['WAVELET_TYPE'], rescale_sigma=True ),
            f'TV Chambolle Filter': lambda img: restoration.denoise_tv_chambolle(img, weight=paramset['TV_WEIGHT'] ), }

    elif ei == 3:
        return {
            f'Wavelet Filter': lambda img: restoration.denoise_wavelet( img, wavelet=paramset['WAVELET_TYPE'], rescale_sigma=True ),
            f'Median Filter, disk = {paramset["MEDIAN_DISK_SIZE"]}': lambda i: filters.median(i, morph.disk(paramset["MEDIAN_DISK_SIZE"])), }


#### Normalization Stage

In [None]:
""" Remove small artifacts on the bottom. """
def cut_artifacts( img ):

    i = img.shape[0] - 1
    row = img[i, :]
    th = row.max()//2
    img[i, row > th] = row.min()

    for _ in range(10):
        i -= 1
        row = img[i, :]
        img[i, row > th] = row.min()

    return img

In [None]:
""" Tilts the image so that the bottom half is brighter,
and the top one is darker. """
def tilt_image( img, amp=1.0, bottom='brighter' ):

    if amp==0:
        return img

    # Flip if needed
    if bottom == 'darker':
        img = np.flip(img)

    # Skew coefficients
    coeffs = np.linspace( -amp, amp, img.shape[0] )

    # Application
    skew = lambda pv, n: pv + n*(1-pv)*pv
    columns = [skew( pv=img[:,col], n=coeffs ) for col in range(img.shape[1])]
    app = np.stack( columns ).T

    # Normalization
    app = (app - app.min()) / (app.max() - app.min())

    # Flip back if needed
    if bottom == 'darker':
        app = np.flip(app)

    return app

#### Binarization Stage

In [None]:
""" Remove all pixels from the upper half of the image. """
def remove_upper_half(img, offset=40):
    img[:img.shape[0]//2-offset, :] = 0.0
    return img

In [None]:
""" Turn grayscale image into a binary one. """
def binarization( img, method='otsu' ):
    methods = {
        'otsu': lambda img: filters.threshold_otsu( img ),
        'yen': lambda img: filters.threshold_yen( img ),
        'adaptive': lambda img: filters.threshold_local(img, block_size=15) }
    th = methods[method]( img )
    res = img.copy()
    res[img > th] = 1.0
    res[img <= th] = 0.0
    return res

#### Actual Filtering

In [None]:
""" Filters the image and turns it into Binary one. """
def filter_and_binarize( img, paramset, calculate_psnr=False ):

    app = img.copy()

    # Normalization Stage
    app = cut_artifacts(app)
    app = tilt_image(app, amp=paramset['TILT_AMP'], bottom=paramset['TILT_BOTTOM'])
    if calculate_psnr:
        prefilter = app.copy()

    # Filtering Stage
    for filter_name, filter_function in choose_ensemble(paramset).items():
        app = filter_function(app)

    if calculate_psnr:
        psnr = PSNR( prefilter, app )

    # Binarization Stage
    app = binarization(app, method='otsu')
    app = remove_upper_half(app)

    if calculate_psnr:
        return app, psnr
    else:
        return app


#### Parameters Estimation

In [None]:
columns_filter_parameters = [ 'TILT_AMP', 'TILT_BOTTOM', 'WIENER_SIZE', 'MEDIAN_DISK_SIZE', 'WAVELET_TYPE', 'TV_WEIGHT', 'ENSEMBLE_ID' ]
columns_image_characteristics = [ 'LH_MEDIAN', 'LH_SUM', 'LH_STD' ]

In [None]:
def plot_feature_space( df, gt_lenth, radius=0.5 ):

    with plt.style.context('dark_background'):
        fig, ax = plt.subplots(1, 1, figsize=(6, 6), dpi=90)

        colors = {1.0:'red', 2.0:'green', 3.0:'blue', 0.0:'yellow'}

        f1 = 'LH_SUM'
        f2 = 'LH_MEDIAN'
        ax.scatter(df[:gt_lenth][f1], df[:gt_lenth][f2], c=df[:gt_lenth]['LABEL'].map(colors))
        
        for new_row in df.iloc[gt_lenth:].iterrows():
            ax.scatter(new_row[1][f1], new_row[1][f2], color='white')
            circle = plt.Circle((new_row[1][f1], new_row[1][f2]), radius, color='white', fill=False)
            ax.add_patch(circle)


        ax.set_title( f'{f1} vs. {f2}' )
        ax.set_xlabel( f1 )
        ax.set_ylabel( f2 )

In [None]:
def find_KNN( df, radius=0.5, K=3, display=False ):
    
    for col in columns_image_characteristics:
        vec = df[col]
        df[col] = (vec - vec.min()) / (vec.max() - vec.min())

    X = df.iloc[0:-1]
    x = list(df.iloc[-1])

    neigh = NearestNeighbors(n_neighbors=K, radius=radius)
    neigh.fit(X[columns_image_characteristics])

    feat_len = len(columns_image_characteristics)
    distances, result = neigh.kneighbors([x[1:feat_len+1]], 3, return_distance=True)
    result = result[0]
    distances = distances[0]

    neigh_ids = []
    for i, d in enumerate(distances):
        if d <= radius:
            neigh_ids.append( result[i] )

    if display:
        plot_feature_space( df, X.shape[0], radius )

    X['WIENER_SIZE'] = X['WIENER_SIZE'].astype('int')
    
    return list(X.iloc[neigh_ids][['FRAME_NUMBER']+columns_filter_parameters].to_dict('index').values())

In [None]:
""" Original Image + KNN -> Paramset. """
lower_half = lambda img: img[img.shape[0]//2-50:, :]
def estimate_filter_parameters( img, df, patient_id, frame_id ):

    """ Add the new datapoint on the feature space. """
    img = cv2.resize(img, (0, 0), fx=480/img.shape[1], fy=1)
    half = lower_half( img )
    row = [None for _ in range(12)]
    row[0] = frame_id
    row[1] = np.median(half)
    row[2] = int(half.sum())
    row[3] = np.std(half)
    row[-1] = patient_id
    dfnorm = df.copy()
    dfnorm.loc[len(dfnorm.index)] = row

    """ Normalize and find KNN. """
    paramsets = find_KNN(dfnorm)
    return paramsets
    

# The Preprocessor Function

In [None]:
""" Original Image -> Binarization OR None. """
def preprocessor(
        img,         # Original Frame
        df,          # The Reference Table
        frame_id,    # Frame Number
        patient_id,  # Patient Number
        display=True ):
    
    paramsets = estimate_filter_parameters( img, df, patient_id, frame_id )
    N = len(paramsets)
    if display:
        print(f'Paramsets found: {N}')
    if N == 0:
        if display:
            print(f'Frame #{frame_id} is too noisy, no structure recognized.')
        return None
    
    best = {'psnr': 0, 'binimg': None, 'paramset': None}
    for paramset in paramsets:
        binimg, psnr = filter_and_binarize( img, paramset, calculate_psnr=True )
        if psnr > best['psnr']:
            best['psnr'] = psnr
            best['binimg'] = binimg
            best['paramset'] = paramset
    if display:
        shows(imgs=[img, best['binimg']], titles=[f'Frame #{frame_id}', f'After Preprocessing, PSNR={best["psnr"]}'], dpi=120)
    return best['binimg']

In [None]:
""" Usage. """
# binimg = preprocessor( img, df, FRAME_ID, PATIENT_ID, display=False )