In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt

from sklearn import preprocessing
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix
from sklearn.metrics import jaccard_score

from skimage import filters

import generator
import data_manager as dm
import pixel_counter as pc

In [None]:
def preview_phantoms(phantom, processed_phantom, title):
    """Show two phantoms side by side in grayscale under a common title.

    Parameters
    ----------
    phantom : array
        Image shown in the left panel. When it has three dimensions only
        the slice ``[:, :, 0]`` is displayed.
    processed_phantom : array
        Image shown in the right panel; it is sliced the same way as
        ``phantom`` (the decision is made from ``phantom``'s shape only).
    title : str
        Figure-level title.
    """
    fig, axes = plt.subplots(1, 2, figsize=(20, 10))
    fig.suptitle(title)

    # The 2-D/3-D decision is taken once, from the first image only.
    if len(phantom.shape) == 3:
        def pick(volume):
            return volume[:, :, 0]
    else:
        def pick(image):
            return image

    for axis, source in zip(axes, (phantom, processed_phantom)):
        axis.imshow(pick(source), cmap='gray')


def show_phantoms_stats(phantom, processed_phantom, title):
    """Print the phantom porosity and plot histograms of the processed values.

    Porosity is the share of elements equal to 1 among the elements equal
    to 0 or 1 in ``phantom``. Three histograms (255 bins over [0, 1]) are
    drawn on one figure: all processed values (light gray, filled), the
    values where ``phantom == 1`` (red outline) and the values where
    ``phantom == 0`` (blue outline).

    Parameters
    ----------
    phantom : array
        Reference image with values 1 (counted as porous) and 0 (stone).
    processed_phantom : array
        Processed image; flattened and masked with the flattened ``phantom``.
    title : str
        Title of the histogram figure.
    """
    pore_count = (phantom == 1).sum()
    stone_count = (phantom == 0).sum()
    print(f'porosity: { pore_count / (pore_count + stone_count) }\n')

    truth = np.ravel(phantom)
    values = np.ravel(processed_phantom)
    # Split the processed values by the class of the corresponding true pixel.
    per_class = (
        (values[truth == 1], 'red'),
        (values[truth == 0], 'blue'),
    )

    plt.figure(figsize=(20, 10))
    plt.title(title)
    plt.hist(values, 255, [0, 1], color='lightgray')
    for class_values, outline in per_class:
        plt.hist(class_values, 255, [0, 1], histtype='step', color=outline)


def generate_phantoms(preview=False):
    """Create, preview and summarise a phantom for every parameter combination.

    Iterates over the module-level ``porosity``, ``blobiness`` and ``noise``
    collections (porosity outermost, noise innermost). The globals ``shape``,
    ``num_of_angles``, ``tag`` and ``noise_method`` are passed through to
    ``generator.create_phantom_and_process``.

    Parameters
    ----------
    preview : bool, optional
        Forwarded to ``generator.create_phantom_and_process``. The preview
        and statistics plots of this notebook are drawn regardless.
    """
    combinations = (
        (p, b, n)
        for p in porosity
        for b in blobiness
        for n in noise
    )
    for por, blob, noise_level in combinations:
        phantom, processed_phantom = generator.create_phantom_and_process(
            shape, por, blob, noise_level, num_of_angles, tag,
            preview=preview, noise_method=noise_method,
        )
        title = f'porosity: { por }, blobiness: { blob }, noise: { noise_level }'
        preview_phantoms(phantom, processed_phantom, title)
        show_phantoms_stats(phantom, processed_phantom, title)


In [None]:
# Phantom geometry: `dim` axes, each `size` elements long.
size = 100
dim = 3
shape = (size,) * dim

# Parameter grids iterated by generate_phantoms().
porosity = np.array([0.5])
blobiness = np.array([1])
noise = np.array([1000.0])

# Passed through to generator.create_phantom_and_process.
noise_method = 'poisson'
num_of_angles = 90

In [None]:
# generate_phantoms() reads `tag` as a global and forwards it to the generator,
# so the phantoms created by this cell are labelled 'train'.
tag = 'train'
generate_phantoms()

In [None]:
# Load the data-info table from data_manager; later cells index it by the
# 'tag', 'id_indx', 'porosity', 'blobiness' and 'noise' columns.
data_info = dm.show_data_info()
# Bare expression: displayed as the cell output.
data_info

In [None]:
# Draw a random subsample of flat pixel positions so the scatter plots stay
# readable: one position per `coeff` pixels of the image.
# NOTE: the draw is unseeded, so the sampled positions differ between runs.
im_lenght = int(np.prod(shape))
coeff = 100
# `np.int` was removed from NumPy (1.24); the builtin `int` is the replacement.
sample_lenght = int(im_lenght // coeff)
# Uniform values in [0, 1) scaled to [0, im_lenght) and truncated to integers;
# positions may repeat.
indices = (np.random.rand(sample_lenght) * im_lenght).astype(int)

In [None]:
def get_value_from_data_info(key, data_info, index):
    """Return the value of column ``key`` in the row at position ``index[0]``.

    Parameters
    ----------
    key : str
        Column name in ``data_info``.
    data_info : pandas.DataFrame
        Table to read from.
    index : tuple of int
        Index tuple as produced by ``np.ndenumerate``; only the first
        element is used.

    Notes
    -----
    The lookup is positional (``.iloc``). Callers obtain ``index`` by
    enumerating a column, so it is a row position, not a row label; a
    label-based lookup breaks on frames whose index is not 0..n-1 (for
    example after filtering by tag).
    """
    return data_info[key].iloc[index[0]]
    
    
def get_params_from_data_info(data_info, index):
    """Fetch the phantom parameters of one row of ``data_info``.

    Parameters
    ----------
    data_info : table with 'porosity', 'blobiness' and 'noise' columns
    index : tuple
        Row selector forwarded to ``get_value_from_data_info``.

    Returns
    -------
    tuple
        ``(porosity, blobiness, noise)`` for the selected row.
    """
    return tuple(
        get_value_from_data_info(column, data_info, index)
        for column in ('porosity', 'blobiness', 'noise')
    )


def scatter_plot(x, y, colors, title):
    """Draw a titled scatter plot on a new 10x10 figure with both axes fixed to [0, 1].

    Parameters
    ----------
    x, y : sequence
        Point coordinates.
    colors : sequence
        One colour per point, passed to ``scatter`` as ``c``.
    title : str
        Plot title.

    The new figure is left as the current pyplot figure, so follow-up
    ``plt.*`` calls draw on it.
    """
    _, axis = plt.subplots(figsize=(10, 10))
    axis.set_xlim(0, 1)
    axis.set_ylim(0, 1)
    axis.scatter(x, y, marker='.', c=colors)
    axis.set_title(title)


def scatter_plot_values(x, y, origin, title):
    """Scatter-plot a subsample of ``(x, y)`` coloured by ``origin``.

    The subsample is taken at the flat positions stored in the module-level
    ``indices`` array. Points whose ``origin`` value is truthy are red, the
    rest blue.

    Parameters
    ----------
    x, y : array
        Coordinates; sampled with ``np.take`` (flat indexing).
    origin : array
        Per-pixel class flags, sampled at the same positions.
    title : str
        Plot title.
    """
    sampled_x, sampled_y, sampled_origin = (
        np.take(values, indices) for values in (x, y, origin)
    )
    palette = ('blue', 'red')  # looked up by bool(flag): False -> blue, True -> red
    colors = [palette[bool(flag)] for flag in sampled_origin]
    scatter_plot(sampled_x, sampled_y, colors, title)


def otsu_bin(image):
    """Binarize ``image`` with the threshold from ``filters.threshold_otsu``.

    Returns
    -------
    tuple
        ``(mask, threshold)`` where ``mask`` is ``image > threshold``.
    """
    cutoff = filters.threshold_otsu(image)
    mask = image > cutoff
    return mask, cutoff


def li_bin(image):
    """Binarize ``image`` with the threshold from ``filters.threshold_li``.

    Returns
    -------
    tuple
        ``(mask, threshold)`` where ``mask`` is ``image > threshold``.
    """
    cutoff = filters.threshold_li(image)
    mask = image > cutoff
    return mask, cutoff


def local_bin(image):
    """Binarize with ``filters.threshold_local`` (second argument 15).

    A 2-D image is thresholded directly. A 3-D image is processed slice by
    slice along its first axis and the binary slices are stacked back into
    one array.

    Raises
    ------
    ValueError
        If ``image`` is neither 2-D nor 3-D.
    """

    def threshold_plane(plane):
        adaptive = filters.threshold_local(plane, 15)
        return plane > adaptive

    rank = len(image.shape)
    if rank not in (2, 3):
        raise ValueError('incorrect image shape')
    if rank == 2:
        return threshold_plane(image)

    planes = (image[k, :, :] for k in range(image.shape[0]))
    return np.asarray([threshold_plane(plane) for plane in planes])


def niblack_bin(image):
    """Return ``image`` compared element-wise against its Niblack threshold.

    The threshold comes from ``filters.threshold_niblack(image, 3)``.
    """
    niblack_threshold = filters.threshold_niblack(image, 3)
    return image > niblack_threshold


def binarize_image(image, orig_image, pp, npa):
    """Binarize ``image`` with four methods and report each against ``orig_image``.

    For Otsu, Li, local and Niblack thresholding, in that order:

    1. binarize ``image``;
    2. scatter-plot ``pp`` against ``npa`` coloured by the binary result;
    3. for Otsu and Li only, preview the result next to ``orig_image``;
    4. print the Jaccard score between the result and ``orig_image``.

    Parameters
    ----------
    image : array
        Processed phantom to binarize.
    orig_image : array
        Reference phantom the binarizations are scored against.
    pp, npa : array
        Scatter-plot coordinates, forwarded to ``scatter_plot_values``.
    """
    # (title, function returning the binary image, show side-by-side preview).
    # otsu_bin/li_bin also return the threshold, which is not needed here.
    # The preview stays disabled for the local and Niblack methods, as before.
    methods = (
        ('otsu binarization', lambda im: otsu_bin(im)[0], True),
        ('li binarization', lambda im: li_bin(im)[0], True),
        ('local binarization', local_bin, False),
        ('niblack binarization', niblack_bin, False),
    )

    for title, binarize, show_preview in methods:
        bin_image = binarize(image)
        scatter_plot_values(pp, npa, bin_image.flatten(), title)
        if show_preview:
            preview_phantoms(bin_image, orig_image, title)
        print(f'{ title } jaccard score: { jaccard_score(bin_image.flatten(), orig_image.flatten()) }')


def count_npa_vs_pp(images_tag):
    """Plot neighbour-average vs. pixel value and run the binarizations per phantom.

    For every row of the module-level ``data_info`` whose 'tag' equals
    ``images_tag``: compute the arrays with
    ``pc.count_neighbor_average_array_and_save``, scatter-plot them coloured
    by the original phantom, then call ``binarize_image``.

    Parameters
    ----------
    images_tag : str
        Value of the 'tag' column selecting the phantoms to process.
    """
    # np.ndenumerate yields row *positions* within the filtered frame.
    # Resetting the index makes position and label coincide, and the
    # parameters are read from this filtered frame -- not from the full
    # table, where the same position can belong to a different tag.
    df = data_info[data_info['tag'] == images_tag].reset_index(drop=True)

    for index, data_id in np.ndenumerate(df['id_indx']):

        porosity, blobiness, noise = get_params_from_data_info(df, index)
        title = f'porosity { porosity }, blobiness { blobiness }, noise { noise }'

        # pp — proc_phantom
        # npa — neighbor_pixel_average
        # op — orig_phantom

        pp, npa, op, proc_phantom, orig_phantom = pc.count_neighbor_average_array_and_save(dim, data_id, images_tag)
        scatter_plot_values(pp, npa, op, title)
        binarize_image(proc_phantom, orig_phantom, pp, npa)


# Process every phantom whose tag matches the current global `tag`.
count_npa_vs_pp(tag)

In [None]:
# Switch the global `tag` so the phantoms generated by this cell are labelled 'test'.
tag = 'test'
generate_phantoms()

In [None]:
# Reload the data-info table after the second generation run and display it.
data_info = dm.show_data_info()
data_info

In [None]:
# `tag` is 'test' at this point when the cells are run in order.
count_npa_vs_pp(tag)

In [None]:
# Split the data-info table by tag; train_and_test() reads both frames as globals.
train_data = data_info[data_info['tag'] == 'train']
test_data = data_info[data_info['tag'] == 'test']


def train_and_test():
    """Fit a logistic regression per training phantom and evaluate it on the test phantom.

    For every 'id_indx' in the module-level ``train_data`` that also occurs
    in ``test_data``:

    1. load the per-pixel training table and standardise its two features
       ('neighbor_average', 'proc_phantom_pixel_values');
    2. fit ``LogisticRegression(C=1, solver='liblinear')`` on the scaled
       features against 'pixel_real_value';
    3. scale the test features with the *training* scaler, predict, and
       print accuracy and Jaccard score;
    4. scatter-plot the test pixels coloured by true and by predicted class
       (with the decision boundary on the first of the two) and preview the
       predicted image next to the original phantom.

    Ids without a matching test entry are reported and skipped.
    """
    feature_columns = ['neighbor_average', 'proc_phantom_pixel_values']

    # np.ndenumerate yields row positions; with a reset index the position is
    # also the row label, so the parameter lookup hits the intended row even
    # when train_data is a filtered slice of a larger table.
    train_rows = train_data.reset_index(drop=True)

    for index, data_id in np.ndenumerate(train_rows['id_indx']):

        train_datum = train_data.query(f'id_indx == "{ data_id }"')
        test_datum = test_data.query(f'id_indx == "{ data_id }"')

        if train_datum.empty or test_datum.empty:
            print(f'can\'t train&test model at index { data_id }' )
            continue

        porosity, blobiness, noise = get_params_from_data_info(train_rows, index)
        title = f'porosity { porosity }, blobiness { blobiness }, noise { noise }'

        train_pixels_df = dm.get_data(dimension=dim, id_indx=data_id, what_to_return='csv', tag='train')
        X_train = np.asarray(train_pixels_df[feature_columns])
        Y_train = np.asarray(train_pixels_df['pixel_real_value'])
        train_scaler = preprocessing.StandardScaler()
        # transform() returns a new array; the result has to be kept and used,
        # otherwise the model is trained on unscaled features.
        X_train_scaled = train_scaler.fit_transform(X_train)
        LR = LogisticRegression(C=1, solver='liblinear').fit(X_train_scaled, Y_train)

        print(f'train: { title }')
        print(f'mean: { train_scaler.mean_ }, var: { train_scaler.var_ }, samples_seen: { train_scaler.n_samples_seen_ }')

        print('test:')
        pp, npa, op, proc_phantom, orig_phantom = pc.count_neighbor_average_array_and_save(dim, data_id, 'test')
        scatter_plot_values(pp, npa, op, title)

        test_pixels_df = dm.get_data(dimension=dim, id_indx=data_id, what_to_return='csv', tag='test')
        X_test = np.asarray(test_pixels_df[feature_columns])
        Y_test = np.asarray(test_pixels_df['pixel_real_value'])
        # Fitted on the test features for reporting only, so the printed
        # statistics describe the test set (previously fitted on X_train).
        test_scaler = preprocessing.StandardScaler()
        test_scaler.fit(X_test)
        # The model must see test data in the coordinates it was trained in,
        # i.e. scaled with the training statistics.
        X_test_scaled = train_scaler.transform(X_test)

        print(f'mean: { test_scaler.mean_ }, var: { test_scaler.var_ }, samples_seen: { test_scaler.n_samples_seen_ }')

        Y_predict = LR.predict(X_test_scaled)
        print(LR.coef_, LR.intercept_, LR.classes_)
        print(f'prediction score: { LR.score(X_test_scaled, Y_test) }')
        print(f'jaccard score: { jaccard_score(Y_predict, Y_test) }')
        print('\n')

        coef = LR.coef_
        intercept = LR.intercept_
        # Plots stay in the original feature units so they fit the fixed
        # [0, 1] axes of scatter_plot: x is the pixel value (feature 1),
        # y is the neighbour average (feature 0).
        x = X_test[:, 1]
        y = X_test[:, 0]
        mean = train_scaler.mean_
        scale = train_scaler.scale_

        def line(x0):
            """Decision boundary: neighbour average as a function of pixel value, in original units."""
            x0_scaled = (x0 - mean[1]) / scale[1]
            y_scaled = (-(x0_scaled * coef[0, 1]) - intercept[0]) / coef[0, 0]
            return y_scaled * scale[0] + mean[0]

        xmin, xmax = x.min(), x.max()

        scatter_plot_values(x, y, Y_test, title)
        # Drawn on the figure scatter_plot_values just created.
        plt.plot([xmin, xmax], [line(xmin), line(xmax)], color='gray')

        scatter_plot_values(x, y, Y_predict, title)

        predict_image = np.reshape(Y_predict, shape)
        preview_phantoms(predict_image, orig_phantom, f'predicted vs origin { title }')


# Requires the train_data/test_data frames defined at the top of this cell.
train_and_test()