In [None]:
import plotly.offline as py
py.init_notebook_mode()

import os, glob, cv2
from ict.particle_tracking.io import load_dm4_file, binarize_file_list
from ict.particle_tracking.image_processing import ImageEnhance, ImageMerge, ImageBinarization
from ict.particle_tracking.gui.image_visualization import ImageVisualizationPlotly
from ict.particle_tracking.gui.parameter_tuner import ImageEnhanceTuner, ImageBinarizationTuner, ImageMergeTuner
import ipywidgets

In [None]:
from namedlist import namedlist
from itertools import chain
import numpy as np
import json
import ipyparallel as ipp
from copy import deepcopy
from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from deap.tools.crossover import cxUniform, cxBlend
from deap.tools.mutation import mutGaussian, mutFlipBit
import random
import seaborn as sns
%matplotlib inline

In [None]:
img_dm4 = cv2.imread("avg_pic/Minute00_Second_08/Capture1_Hour_00_Minute_00_Second_08_Frame_0080.png", cv2.IMREAD_GRAYSCALE)
img_plot_size = ImageVisualizationPlotly.infer_plot_size_from_image(img_dm4, 500)
half_img_plot_size = ImageVisualizationPlotly.infer_plot_size_from_image(img_dm4, 250)
vis = ImageVisualizationPlotly(img_plot_size)

In [None]:
test_series = sorted(list(glob.glob("avg_pic" + "/**/*.png", recursive=True)))[:9]
opt_processors = [ImageEnhance("bal_params.json"), ImageBinarization("bin_params.json")]
img_mer, img_marked, trust_ratio = binarize_file_list(test_series, 4, 
    processors=opt_processors + [ImageMerge("mer_params.json")], 
    out_file="bin_pic/merged_bin.png",
    record_file="record_pic/merged_9frames.png",
    estimate_error=True)
print("Detection Trust Ratio", trust_ratio)

In [None]:
enhance_tuner = ImageEnhanceTuner(img_dm4, img_plot_size, processor=opt_processors[0])
bin_tuner = ImageBinarizationTuner(img_dm4, img_plot_size, processor=opt_processors[1])
mer_tuner = ImageMergeTuner(test_series, img_plot_size, pre_processors=opt_processors, processor=ImageMerge("mer_params.json"))

In [None]:
def int_list_func(x):
        return [int(item) for item in x]
    
def get_type_func_dict(processors):
    type_func_dict = dict() 
    for proc in processors:
        for k, v in proc.params.items():
            if isinstance(v, bool):
                type_func_dict[k] = bool
            elif isinstance(v, float):
                type_func_dict[k] = float
            elif isinstance(v, int):
                type_func_dict[k] = int
            elif isinstance(v, list):
                type_func_dict[k] = int_list_func
            else:
                print(k, "not know what func should use to type it")
    return type_func_dict

type_func_dict = get_type_func_dict(opt_processors)  

In [None]:
FloatParam = namedlist("FloatParam", ["name", "value", "min", "max", "step", "type_func"])
BoolParam = namedlist("BoolParam", ["name", "value", "type_func"])
GaParam = namedlist("GaParam", ["float_params", "range_params", "bool_params"])
initial_ga_params = GaParam([], [], [])
for tuner in [enhance_tuner, bin_tuner]:
    for k, v in sorted(tuner.target_param_and_widgets.items()):
        if k in ['reverse_color'] or 'sobel_size' in k:
            continue
        if k == 'bilat_r_space':
            v.max = 15
        if v.__class__ in [ipywidgets.widgets.IntSlider, ipywidgets.widgets.FloatSlider]:
            initial_ga_params.float_params.append(FloatParam(k, v.value, v.min, v.max, v.step, type_func_dict[k]))
        elif v.__class__ is ipywidgets.widgets.IntRangeSlider:
            initial_ga_params.range_params.append(FloatParam(k, list(v.value), v.min, v.max, v.step, type_func_dict[k]))
        elif v.__class__ is ipywidgets.widgets.Checkbox:
            initial_ga_params.bool_params.append(BoolParam(k, v.value, type_func_dict[k]))
        else:
            print(v.__class__, "not recognized")
num_widget_params = sum([len(tuner.target_param_and_widgets.items())
                         for tuner in [enhance_tuner, bin_tuner]]) 
num_opt_params = len(set([p.name for p in chain(*initial_ga_params)]))
assert num_widget_params == num_opt_params + 1 + 3

In [None]:
def update_processor_params(processors, ga_params):
    opt_params = {param.name: param for param in 
                  chain(ga_params.float_params, ga_params.range_params, ga_params.bool_params)}
    for proc in processors:
        common_keys = set(proc.params.keys()) & set(opt_params.keys())
        d_new = {k: opt_params[k] for k in common_keys}
        d_new = {k: v.type_func(v.value) for k, v in d_new.items()}
        proc.params.update(d_new)

update_processor_params(opt_processors, initial_ga_params)

In [None]:
fn_list = glob.glob("avg_pic/Minute00_Second_08/*png")
assert len(fn_list) == 9
img_orig_list = [load_dm4_file(fn) for fn in fn_list]
img_bin_list = [opt_processors[1].process(opt_processors[0].process(img)) for img in img_orig_list]
img_ground_truth = ImageMerge("mer_params.json").process(img_bin_list)

In [None]:
def avg_iou(ground_truth, pred):
    assert set(ground_truth.ravel()) == {0, 255}
    if set(pred.ravel()) < {0, 255}:
        return 0.0
    assert set(pred.ravel()) == {0, 255}
    g2 = ground_truth // 255 * 113
    p2 = pred // 255 * 137
    overlay = g2 + p2
    contours_overlay, _ = cv2.findContours(overlay, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    iou_list = []
    for cnt in contours_overlay:
        x,y,w,h = cv2.boundingRect(cnt)
        img_cnt = overlay[y:y+h, x:x+w]
        sub_cnt = cnt - [x, y]
        mask = np.zeros(img_cnt.shape,np.uint8)
        cv2.drawContours(mask,[sub_cnt],0,255,-1)
        pixelpoints = img_cnt[np.nonzero(mask)]
        values, counts = np.unique(pixelpoints, return_counts=True)
        if values[-1] > 200:
            num_intersection = counts[-1]
        else:
            num_intersection = 0
        num_union = counts.sum()
        iou = num_intersection / num_union
        iou_list.append(iou)
    iou_array = np.array(iou_list)
    return iou_array.mean()

In [None]:
c = ipp.Client()

print(c.ids)

with c[:].sync_imports():
    import cv2
    from namedlist import namedlist
    from itertools import chain
    from deap import creator
    from deap import base
%px import numpy as np
%px from copy import deepcopy
%px FloatParam = namedlist("FloatParam", ["name", "value", "min", "max", "step", "type_func"])
%px BoolParam = namedlist("BoolParam", ["name", "value", "type_func"])
%px GaParam = namedlist("GaParam", ["float_params", "range_params", "bool_params"])
%px creator.create("FitnessMax", base.Fitness, weights=(1.0,) * 9)
%px creator.create("Individual", object, fitness=creator.FitnessMax)
c[:].push(dict(avg_iou=avg_iou,
               update_processor_params=update_processor_params,
               int_list_func=int_list_func),
          block=True)

In [None]:
def init_detection_params(container, base_ga_params, sigma=0.2, bool_revserve_perc=0.05):
    new_ga_params = GaParam([], [], [])
    for p in base_ga_params.bool_params:
        r = random.uniform(0.0, 1.0)
        if r < bool_revserve_perc:
            value = not p.value
        else:
            value = p.value
        new_ga_params.bool_params.append(BoolParam(p.name, p.value, p.type_func))
    for p in base_ga_params.float_params:
        r = random.gauss(mu=0, sigma=sigma)
        value = p.value + r * (p.max - p.min)
        while value > p.max or value < p.min:
            r = random.gauss(mu=0, sigma=sigma)
            value = p.value + r * (p.max - p.min)
        new_ga_params.float_params.append(FloatParam(p.name, value, p.min, p.max, p.step, p.type_func))
    for p in base_ga_params.range_params:
        value = []
        for val in p.value:
            r = random.gauss(mu=0, sigma=sigma)
            v2 = val + r * (p.max - p.min)
            while v2 > p.max or v2 < p.min:
                r = random.gauss(mu=0, sigma=sigma)
                v2 = val + r * (p.max - p.min)
            value.append(v2)
        new_ga_params.range_params.append(FloatParam(p.name, value, p.min, p.max, p.step, p.type_func))
    ind = container()
    ind.float_params = new_ga_params.float_params
    ind.range_params = new_ga_params.range_params
    ind.bool_params = new_ga_params.bool_params
    return ind


def eval_detect_params(individual, ground_truth, img_orig_list, processors):
    processors = deepcopy(processors)
    update_processor_params(processors, individual)
    enhance_processor, bin_processor = processors
    img_bin_list = [bin_processor.process(enhance_processor.process(img)) for img in img_orig_list]
    result_ious = [avg_iou(ground_truth, img) for img in img_bin_list]
    return result_ious


def hybrid_mate(ind1, ind2, alpha, indpb):
    ind1.bool_params, ind2.bool_params = cxUniform(ind1.bool_params, ind2.bool_params, indpb)
    f_value1, f_value2 = [[p.value for p in params]
                          for params in [ind1.float_params, ind2.float_params]]
    f_value1, f_value2 = cxBlend(f_value1, f_value2, alpha)
    for params, vals in [[ind1.float_params, f_value1], 
                         [ind1.float_params, f_value1]]:
        for p, v in zip(params, vals):
            p.value = v
    for p1, p2 in zip(ind1.range_params, ind2.range_params):
        p1.value, p2.value = cxBlend(p1.value, p2.value, alpha)
    return ind1, ind2


def hybrid_mut(individual, mu, sigma, indpb):
    b_values = [p.value for p in individual.bool_params]
    b_values = mutFlipBit(b_values, indpb)[0]
    for p, val in zip(individual.bool_params, b_values):
        p.value = val
    f_values = [p.value for p in individual.float_params]
    f_sigma = [sigma * (p.max - p.min) for p in individual.float_params]
    f_values = mutGaussian(f_values, mu, sigma, indpb)[0]
    for p, val in zip(individual.float_params, f_values):
        p.value = val
    for p in individual.range_params:
        p.value = mutGaussian(p.value, mu, sigma * (p.max - p.min), indpb)[0]
    return individual,


def checkBounds():
    def decorator(func):
        def wrapper(*args, **kargs):
            offspring = func(*args, **kargs)
            def bound_child(child):
                for p in child.bool_params:
                    assert isinstance(p.value, bool)
                for p in child.float_params:
                    nsteps = round((p.value - p.min) / p.step)
                    p.value = p.min + nsteps * p.step
                    if p.value > p.max:
                        p.value = p.max
                    if p.value < p.min:
                        p.value = p.min
                    p.value = p.type_func(p.value)
                for p in child.range_params:
                    value = []
                    for val in p.value:
                        nsteps = round((val - p.min) / p.step)
                        val= p.min + nsteps * p.step
                        if val > p.max:
                            val = p.max
                        if val < p.min:
                            val = p.min
                        value.append(val)
                    p.value = p.type_func(sorted(value))
            if isinstance(offspring, creator.Individual):
                bound_child(offspring)
            elif isinstance(offspring[0], creator.Individual):
                for child in offspring:
                    bound_child(child)
            else:
                raise Exception("must be an Individual or a list of Individual")
            return offspring
        return wrapper
    return decorator


def opt_detection_params(initial_ga_params, img_ground_truth, img_orig_list, processors,
                         init_sigma=0.4, mat_alpha=0.1, mat_indpb=0.5, mut_sigma=0.05, mut_indpb=2/num_opt_params,
                         ea_cxpb=0.6, ea_mutpb=0.4,
                         n_pop=200, ea_mu=600, tournsize=3, ea_ngen=200, 
                         #n_pop=5, ea_mu=10, tournsize=2, ea_ngen=3, 
                         verbose=True, map_func=None):   
    creator.create("FitnessMax", base.Fitness, weights=(1.0,) * 9)
    creator.create("Individual", object, fitness=creator.FitnessMax)
    toolbox = base.Toolbox()
    
    if map_func is not None:
        toolbox.register("map", map_func)
    toolbox.register("individual", init_detection_params, creator.Individual, base_ga_params=initial_ga_params, sigma=init_sigma)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", eval_detect_params, ground_truth=img_ground_truth, img_orig_list=img_orig_list, processors=processors)
    toolbox.register("mate", hybrid_mate, alpha=mat_alpha, indpb=mat_indpb)
    toolbox.register("mutate", hybrid_mut, mu=0, sigma=mut_sigma, indpb=mut_indpb)
    toolbox.register("select", tools.selTournament, tournsize=tournsize)
    
    toolbox.decorate("individual", checkBounds())
    toolbox.decorate("mate", checkBounds())
    toolbox.decorate("mutate", checkBounds())
    

    pop = toolbox.population(n=n_pop)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)
    
    pop, log = algorithms.eaMuPlusLambda(pop, toolbox, mu=ea_mu, lambda_=n_pop, cxpb=ea_cxpb, mutpb=ea_mutpb, 
                                         ngen=ea_ngen, stats=stats, halloffame=hof, verbose=verbose)
    return hof[-1], toolbox, pop

best_param, toolbox, pop = opt_detection_params(initial_ga_params, img_ground_truth, img_orig_list,
                                                processors=opt_processors, map_func=c[:].map)

In [None]:
update_processor_params(opt_processors, best_param)
for fn, p in zip(["best_enhance_params.json", "best_bin_params.json"], opt_processors):
    with open(fn, "w") as f:
        json.dump(p.params, f, indent=4, sort_keys=True)