In [1]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
from copy import deepcopy
import random
from pyclusterbdmseed.algorithms import H2SOM
import pyclusterbdmseed.core as core
import seaborn as sns
from sklearn.neighbors import NearestNeighbors
from scipy.spatial.distance import pdist, squareform

In [None]:
def create_empty_img(dframe):
    gx = dframe.index.get_level_values("grid_x").astype(int)
    gy = dframe.index.get_level_values("grid_y").astype(int)
    img = np.zeros((gy.max()+1, gx.max()+1))
    return img

def create_binary_img(dframe):
    gx = dframe.index.get_level_values("grid_x").astype(int)
    gy = dframe.index.get_level_values("grid_y").astype(int)
    img = np.zeros((gy.max()+1, gx.max()+1))
    img[(gy, gx)] = 1
    return img

def cart2polar(x,y):
    theta = np.arctan2(y,x)
    r = np.sqrt(x**2 + y**2)
    return theta, r

def polar2cart(theta, r):
    x = r * np.cos(theta)
    y = r * np.sin(theta)
    return x, y

def Sx(x,L,H,k,x0,dist_range):
    nmin = -10
    nmax = 10
    x = (nmax-nmin) * (x - dist_range[0]) / (dist_range[1] - dist_range[0]) + nmin
    def lower(x,L,k,x0):
        H=0
        return L + ((H-L) / (1+np.e**(-k*(x-x0))))
    def upper(x,H,k,x0):
        L=0
        return L + ((H-L) / (1+np.e**(-k*(x-x0))))
    limit = np.max(abs(x))
    xl = -(limit/2)
    xh = limit/2
    return [lower(p,L,k*2,xl) if p < 0 else upper(p,H,k*2,xh) for p in x]
x = np.array(range(-10,10))
y = Sx(x,-1,1,0.5,0,(-10,10))

In [2]:
def bygp_cmap():
    # blue - yellow - green - pink colormap
    ro = [51,255,102,255]
    lo = [255,51,170,255]
    ru = [51,85,255,255]
    lu = [255,204,51,255]
    rolo = np.array([ro,lo])/255
    lolu = np.array([lo,lu])/255
    luru = np.array([lu,ru])/255
    ruro = np.array([ru,ro])/255
    rolo_cmap = mpl.colors.LinearSegmentedColormap.from_list(".", rolo, 45)
    lolu_cmap = mpl.colors.LinearSegmentedColormap.from_list(".", lolu, 45)
    luru_cmap = mpl.colors.LinearSegmentedColormap.from_list(".", luru, 45)
    ruro_cmap = mpl.colors.LinearSegmentedColormap.from_list(".", ruro, 45)
    color_list = []
    for x in [rolo_cmap, lolu_cmap, luru_cmap, ruro_cmap]:
        for i in range(90):
            color_list.append(list(x(i)))
        
    color_list = np.array(color_list)
    color_list = color_list[:,:-1]
    return color_list

def circular_colormap_chooser(colormap_name):
    #"hsv", "cmocean_phase", "hue_L60", "erdc_iceFire", "nic_Edge", "cyclic_mrybm", "cyclic_mygbm"
    cyclic_mygbm_list = np.loadtxt("./cyclic_colormaps/cyclic_mygbm.txt")
    cyclic_mrybm_list = np.loadtxt("./cyclic_colormaps/cyclic_mrybm.txt")
    cmocean_phase_list = np.loadtxt("./cyclic_colormaps/cmocean_phase.txt")
    hue_L60_list = np.loadtxt("./cyclic_colormaps/hue_L60.txt")
    erdc_iceFire_list = np.loadtxt("./cyclic_colormaps/erdc_iceFire.txt")
    nic_Edge_list = np.loadtxt("./cyclic_colormaps/nic_Edge.txt")
    bygp_cmap_list = bygp_cmap()
    
    cyclic_mygbm_cmap = mpl.colors.ListedColormap(cyclic_mygbm_list)
    cyclic_mrybm_cmap = mpl.colors.ListedColormap(cyclic_mrybm_list)
    cmocean_phase_cmap = mpl.colors.ListedColormap(cmocean_phase_list)
    hue_L60_cmap = mpl.colors.ListedColormap(hue_L60_list)
    erdc_iceFire_cmap = mpl.colors.ListedColormap(erdc_iceFire_list)
    nic_Edge_cmap = mpl.colors.ListedColormap(nic_Edge_list)
    bygp_cmap_cmap = mpl.colors.ListedColormap(bygp_cmap_list)
    husl_cmap = mpl.colors.ListedColormap(sns.color_palette("husl",100))
    hsl_cmap = mpl.colors.ListedColormap(sns.hls_palette(2000, s=1))
    #hsl_cmap = mpl.colors.ListedColormap(np.roll(sns.hls_palette(2000, s=1), 900, axis=0))
    
    cmap_dict = {
        "mygbm": cyclic_mygbm_cmap,
        "mrybm": cyclic_mrybm_cmap,
        "phase": cmocean_phase_cmap,
        "L60": hue_L60_cmap,
        "icefire": erdc_iceFire_cmap,
        "edge": nic_Edge_cmap,
        "bygp": bygp_cmap_cmap,
        "husl": husl_cmap,
        "hsl": hsl_cmap,
        "hsv": "hsv",
        "twilight": "twilight",
        "twilight_shifted": "twilight_shifted"
    }
    
    if colormap_name not in ["hsl", "husl", "hsv", "twilight", "twilight_shifted", "mygbm", "mrybm", "phase", "bygp", "L60", "icefire", "edge"]:
        raise ValueError('Wrong colormap name, choose one of the following options: "hsl", "husl", "hsv", "twilight", "twilight_shifted", "mygbm", "mrybm", "phase", "bygp", "L60", "icefire", "edge"')
    
    return cmap_dict[colormap_name]

def unit_cicle_color_wheel(h2som, ring, colormap_name, newsom=None):
    # If displayed in Jupyter notebook
    #%matplotlib inline
    # Generate a figure with a polar projection
    fig = plt.figure(figsize=(6,6))
    ax = fig.add_axes([0.1,0.1,0.8,0.8], projection="polar")
    ax.set_theta_direction(1)
    # Plot a color mesh on the polar plot with the color set by the angle
    n = 2000 # The number of secants for the mesh
    t = np.linspace(-np.pi, np.pi, n) # Theta values
    r = np.linspace(0, 1, 2) # radius values, change 0 for non full circle
    rg, tg = np.meshgrid(r, t) # create a r,theta meshgrid
    c = tg # define color values as theta values
    norm = mpl.colors.Normalize(-np.pi, np.pi) # Define colomap normalization for 0 to 2*pi
    
    my_cmap = circular_colormap_chooser(colormap_name)
    
    im = ax.pcolormesh(t, r, c.T, cmap=my_cmap, norm=norm) # plot colormesh on axis with colormap
    ax.set_yticklabels([]) # turn of radial ticl labels (yticks)
    ax.tick_params(pad=15, labelsize=24) # cosmetic change to tick labels
    ax.spines["polar"].set_visible(False) # turn off the axis spine.
    ring_start = h2som._rings[ring-1][0]
    ring_end = h2som._rings[ring-1][1]+1
    d = np.arctan2(h2som._pos[ring_start:ring_end,1], h2som._pos[ring_start:ring_end,0])
    r = np.array([ring*0.3]*(ring_end-ring_start))
    #d = d[1:]
    #r = r[1:]
    ax.plot(d,r,"ko", markersize=5, markerfacecolor='none', markeredgewidth=1.5)
    for i in range(len(d)):
        ax.text(d[i], r[i]+0.3, i, color="black", fontsize=16)
        
    colors = im.to_rgba(d)
    if newsom:
        d2 = np.arctan2(newsom._pos[ring_start:ring_end,1], newsom._pos[ring_start:ring_end,0])
        r2 = np.array([ring*0.3]*(ring_end-ring_start))
        #d2 = d2[1:]
        #r2 = r2[1:]
        ax.plot(d2,r2,"ko", markersize=11, markerfacecolor='none', markeredgewidth=3)
        ax.plot(d2,r2,"ko", markersize=5, markerfacecolor='none', markeredgewidth=1)
        ax.plot(d2,r2,"wo", markersize=9, markerfacecolor='none', markeredgewidth=3)
        
        for i in range(len(d2)):
            ax.text(d2[i], r2[i]+0.2, i, color="white", fontsize=16)
        colors = im.to_rgba(d2)
    
    return d, r, colors

def spectral_cluster(data, bmu_matches, ring, h2som, dframe, colormap_name, newsom=None):
    grid_x = np.array(dframe.index.get_level_values("grid_x")).astype(int)
    grid_y = np.array(dframe.index.get_level_values("grid_y")).astype(int)
    img = np.zeros((np.amax(grid_y)+1, np.amax(grid_x)+1)) - 1
    proto_idx = np.array(range(np.amax(bmu_matches+1)))
    d,r,colors = unit_cicle_color_wheel(h2som, ring, colormap_name, newsom)
    colors = np.vstack([[0,0,0,1],colors])
    cm = mpl.colors.LinearSegmentedColormap.from_list("hsom_colors", colors)
    plt.figure()
    for i in proto_idx:
        idx = np.where(bmu_matches == i)[0]
        seg_x = grid_x[idx]
        seg_y = grid_y[idx]
        img[(seg_y, seg_x)] = i
        plt.imshow(img, cmap=cm)  

In [None]:
def calc_h2som(data, epsilon, sigma, n_iter):
    h2som = H2SOM(data, epsilon=epsilon, sigma=sigma, n_iter=n_iter, arch=[3,8])
    h2som.cluster()
    return h2som

def calc_memb(data, h2som, ring_idx):
    ring_idx = ring_idx-1
    bmu_matches = core.bmus(data, h2som._centroids[h2som._rings[ring_idx][0]:h2som._rings[ring_idx][1]+1])   
    return bmu_matches

In [None]:
def position_optimization(h2som, prototypes, variant, ring, stopfactor=1):
    def posdist_adjust(dist):
        return 1/(1+dist)
    h2som_cp = deepcopy(h2som)
    distances = 1 - squareform(pdist(prototypes, metric="correlation"))
    # Consider only distances of neighboring prototypes
    dist = np.diag(distances, k=1)
    # add last proto to first proto distance
    dist = np.append(dist,distances[0][-1])
    # Neighbors from the other side
    dist_rev = np.roll(dist,1)
    changes = []
    for i, d in enumerate(dist):
        ring_start = h2som_cp._rings[ring-1][0]
        ring_end = h2som_cp._rings[ring-1][1]+1
        pos_in_ring = h2som_cp._pos[ring_start:ring_end]
        d_next = d
        d_prev = dist_rev[i]
        if variant in ["winnertakeall", "winnertakealldulled"]:
            # A: let only the stronger similarity drag
            if d_next > d_prev:
                if i == len(dist)-1:
                    j = 0
                else:
                    j = i+1

                pos_dist = np.sqrt(((pos_in_ring[j] - pos_in_ring[i])**2).sum())
                if variant == "winnertakealldulled":
                    dullfactor = posdist_adjust(pos_dist)
                else:
                    dullfactor = 1

                if d_next < (posdist_adjust(pos_dist)*stopfactor):
                    change = np.array([0,0])
                else:
                    change = (pos_in_ring[j] - pos_in_ring[i]) * d_next * dullfactor
            else:
                if i == 0:
                    j = len(dist)-1
                else:
                    j = i-1

                pos_dist = np.sqrt(((pos_in_ring[j] - pos_in_ring[i])**2).sum())
                if variant == "winnertakealldulled":
                    dullfactor = posdist_adjust(pos_dist)
                else:
                    dullfactor = 1
                
                if d_prev < (posdist_adjust(pos_dist)*stopfactor):
                    change = np.array([0,0])
                else:
                    change = (pos_in_ring[j] - pos_in_ring[i]) * d_prev * dullfactor

        elif variant in ["tugofwar", "tugofwardulled"]:
            # B: let both forces drag against each other
            if i == 0:
                j = i+1
                k = len(dist)-1
            elif i == len(dist)-1:
                j = 0
                k = i-1
            else:
                j = i+1
                k = i-1

            pos_dist_ij = np.sqrt(((pos_in_ring[j] - pos_in_ring[i])**2).sum())
            pos_dist_ik = np.sqrt(((pos_in_ring[k] - pos_in_ring[i])**2).sum())
            if variant == "tugofwardulled":
                dullfactor_ij = posdist_adjust(pos_dist_ij)
                dullfactor_ik = posdist_adjust(pos_dist_ik)
            else:
                dullfactor_ij = 1
                dullfactor_ik = 1

            if d_next < (posdist_adjust(pos_dist_ij)*stopfactor):
                change_next = np.array([0,0])
            else:
                change_next = (pos_in_ring[j] - pos_in_ring[i]) * d_next * dullfactor_ij

            if d_prev < (posdist_adjust(pos_dist_ik)*stopfactor):
                change_prev = np.array([0,0])
            else:
                change_prev = (pos_in_ring[k] - pos_in_ring[i]) * d_prev * dullfactor_ik
                
            change = [change_next, change_prev]
            
        else:
            raise ValueError("Wrong variant!")

        changes.append(change)

    changes = np.array(changes)

    if variant in ["winnertakeall", "winnertakealldulled"]:
        h2som_cp._pos[ring_start:ring_end] = h2som_cp._pos[ring_start:ring_end] + (changes/2)
    elif variant in ["tugofwar", "tugofwardulled"]:
        h2som_cp._pos[ring_start:ring_end] = h2som_cp._pos[ring_start:ring_end] + (np.array(changes)[:,0,:]/2)
        h2som_cp._pos[ring_start:ring_end] = h2som_cp._pos[ring_start:ring_end] + (np.array(changes)[:,1,:]/2)

    return h2som_cp, np.sum(np.abs(changes))

In [None]:
path = "../../backend/datasets/barley_101.h5"

dframe = pd.read_hdf(path)
data = dframe.values
sigma = [12, 1]
epsilon = [1.1, 0.1]
n_iter = 10
m_iter = 2
h2som = calc_h2som(data, epsilon, sigma, n_iter)
rings = [1,2]
colorlist_name = "hsl"

In [None]:
for ring in rings:
    bmus = calc_memb(data ,h2som, ring)
    spectral_cluster(data, bmus, ring, h2som, dframe, colorlist_name)

In [None]:
# Position optimization (winnertakeall) after pre-defined steps
maxiter = m_iter
h2som_A = deepcopy(h2som)
posopt_method = "winnertakeall"
for ring in rings:
    prototypes = h2som_A._centroids[h2som_A._rings[ring-1][0]:h2som_A._rings[ring-1][1]+1]
    change = 1
    iterations = 0
    while change > 0 and iterations < maxiter:
        h2som_A, change = position_optimization(h2som_A, prototypes, posopt_method, ring)
        iterations += 1
    bmus = calc_memb(data ,h2som_A, ring)
    spectral_cluster(data, bmus, ring, h2som, dframe, colorlist_name, h2som_A)
    print(f"Number of interations: {iterations}")

In [None]:
# Position optimization (winnertakeall) until auto converge
maxiter = np.inf
h2som_A = deepcopy(h2som)
posopt_method = "winnertakeall"
for ring in rings:
    prototypes = h2som_A._centroids[h2som_A._rings[ring-1][0]:h2som_A._rings[ring-1][1]+1]
    change = 1
    iterations = 0
    while change > 0 and iterations < maxiter:
        h2som_A, change = position_optimization(h2som_A, prototypes, posopt_method, ring)
        iterations += 1
    bmus = calc_memb(data ,h2som_A, ring)
    spectral_cluster(data, bmus, ring, h2som, dframe, colorlist_name, h2som_A)
    print(f"Number of interations: {iterations}")