In [68]:
from typing import List
import os
import abc
import dataclasses
from dataclasses import dataclass
import scipy.stats
import numpy as np
import pandas as pd
import PIL
import functools
from PIL import Image
import matplotlib.pyplot as plt
import time
import datetime

## Utilities

In [2]:
def load_grayscale_image_as_numpy_array(path):
    return np.asarray(Image.open(path).convert(mode='L')).astype(np.float32)


def load_binary_image_as_numpy_array(path):
    image = load_grayscale_image_as_numpy_array(path)
    return 255.0 * (image > 127.5)


def show_image(image, figsize=(12, 8), cmap='gray'):
    plt.figure(figsize=figsize)
    plt.tick_params(labelbottom=False, labelleft=False)
    plt.imshow(image, cmap=cmap)

In [3]:
def pad_ising_model(input_image):
    return np.pad(input_image, pad_width=1, mode='symmetric')


def unpad_ising_model(input_image):
    return input_image[1:-1, 1:-1]


def convert_image_to_ising_model(input_image):
    return pad_ising_model(input_image / 127.5 - 1.0)


def convert_ising_model_to_image(input_image):
    return unpad_ising_model((input_image + 1.0) * 127.5)

## Test data

In [77]:
BINARY_IMAGES_PATH = "binary_images/300x300_easy_noise10"
GRAYSCALE_IMAGES_PATH = "grayscale_images/size800_noise10_independent_distance100"

In [78]:
BINARY_IMAGE_OBSEVATION = load_binary_image_as_numpy_array(
    path=os.path.join(BINARY_IMAGES_PATH, "image_0_observation.png")
)
BINARY_IMAGE_GROUND_TRUTH = load_binary_image_as_numpy_array(
    path=os.path.join(BINARY_IMAGES_PATH, "image_0_ground_truth.png")
)
GRAYSCALE_IMAGE_OBSERVATION = load_grayscale_image_as_numpy_array(
    path=os.path.join(GRAYSCALE_IMAGES_PATH, "image_1_observation.png")
)
GRAYSCALE_IMAGE_GROUND_TRUTH = load_grayscale_image_as_numpy_array(
    path=os.path.join(GRAYSCALE_IMAGES_PATH, "image_1_ground_truth.png")
)

## Interface for noise reducers

In [6]:
@dataclass
class NoiseReducerStatistics(object):
    evaluation_losses: List[float] = dataclasses.field(default_factory=list)
    computation_times: List[float] = dataclasses.field(default_factory=list)


@dataclass
class NoiseReducerResult(object):
    original_image: np.ndarray
    obsevation: np.ndarray
    reduced_image: np.ndarray
    statistics: NoiseReducerStatistics

In [37]:
class SupervisedNoiseReducer(object):
    def __init__(self, iterations_count, iterations_per_evaluation):
        self._iterations_count = iterations_count
        self._iterations_per_evaluation = iterations_per_evaluation
        self._average_statistics = NoiseReducerStatistics()

    @property
    def average_statistics(self):
        return self._average_statistics

    @abc.abstractmethod
    def _sampler_step(self, observation, current_state):
        pass
    
    def _evaluate_noise_reduction(self, original_image, reduced_image):
        return np.mean(np.abs(original_image - reduced_image))
    
    def _update_average_statistics(self, image_statistics):
        evaluation_loss = image_statistics.evaluation_losses[-1]
        self.average_statistics.evaluation_losses.append(evaluation_loss)
        average_computation_time = np.mean(image_statistics.computation_times)
        self.average_statistics.computation_times.append(average_computation_time)        

    def reduce_noise(self, original_image, observation):
        initial_state = convert_image_to_ising_model(observation)
        current_state = initial_state.copy()
        statistics = NoiseReducerStatistics()
        for iteration in range(1, self._iterations_count + 1):
            start_time = time.time()          
            self._sampler_step(initial_state, current_state)
            statistics.computation_times.append(1000 * (time.time() - start_time))
            if iteration % self._iterations_per_evaluation == 0:
                statistics.evaluation_losses.append(self._evaluate_noise_reduction(
                    original_image=original_image,                    
                    reduced_image=convert_ising_model_to_image(current_state),
                ))
        self._update_average_statistics(statistics)
        return NoiseReducerResult(
            original_image=original_image,
            obsevation=observation,
            reduced_image=convert_ising_model_to_image(current_state),
            statistics=statistics,
        )

## BinaryGibbsNoiseReducer

In [101]:
class BinaryGibbsNoiseReducer(SupervisedNoiseReducer):
    def __init__(self, noise_level_prior, observation_strength, coupling_strength, **kwargs):
        self._noise_level_prior = noise_level_prior
        self._observation_strength = observation_strength
        self._coupling_strength = coupling_strength
        super(BinaryGibbsNoiseReducer, self).__init__(**kwargs)

    def _observation_potential(self, observation_pbty):
        return self._observation_strength * np.log(observation_pbty)

    @functools.lru_cache(maxsize=None)
    def _positive_observation_potential(self, observation_value):
        if observation_value >= 0.0:
            return self._observation_potential(1.0 - self._noise_level_prior)
        return self._observation_potential(self._noise_level_prior)

    @functools.lru_cache(maxsize=None)    
    def _negative_observation_potential(self, observation_value):
        if observation_value < 0.0:
            return self._observation_potential(1.0 - self._noise_level_prior)
        return self._observation_potential(self._noise_level_prior)

    def _sampler_step(self, observation, current_state):
        row = np.random.randint(low=1, high=(current_state.shape[0] - 1))
        column = np.random.randint(low=1, high=(current_state.shape[1] - 1))
        neighbours_sum = np.sum([
            current_state[row - 1, column], current_state[row + 1, column],
            current_state[row, column - 1], current_state[row, column + 1],
        ])
        positive_observation_potential = self._positive_observation_potential(
            observation[row, column]
        )
        negative_observation_potential = self._negative_observation_potential(
            observation[row, column]
        )
        positive_coupling = self._coupling_strength * 2.0 * neighbours_sum
        negative_coupling = self._coupling_strength * 2.0 * -neighbours_sum
        positive_potential = np.exp(positive_observation_potential + positive_coupling)
        negative_potential = np.exp(negative_observation_potential + negative_coupling)
        positive_pbty = positive_potential / (positive_potential + negative_potential)
        if np.random.uniform() <= positive_pbty:
            current_state[row, column] = 1.0
        else:
            current_state[row, column] = -1.0

In [None]:
reducer = BinaryGibbsNoiseReducer(
    noise_level_prior=0.1,
    observation_strength=1.0,
    coupling_strength=4.0,
    iterations_count=3_000_000,
    iterations_per_evaluation=100,
)
reducer_result = reducer.reduce_noise(BINARY_IMAGE_GROUND_TRUTH, BINARY_IMAGE_OBSEVATION)
reducer.average_statistics

In [None]:
show_image(reducer_result.reduced_image)