In [1]:
import copy
import tensorflow as tf

from keras import backend as K
from keras.models import Model
from keras.layers import Input, Lambda, Dense, Flatten, Conv2D, MaxPooling2D

In [2]:
def get_hyperopt_emb():
    """Build the convolutional embedding (encoder) sub-network.

    Three Conv2D + MaxPooling2D stages (64, 32 and 12 filters) are followed by
    a Flatten layer and a 4096-unit ReLU Dense layer. The input shape is read
    from the module-level ``sh``.

    Returns:
        A Keras ``Model`` named "embedding" mapping an image to its feature vector.
    """
    # (number of filters, kernel size) of each convolutional stage, in order
    conv_stages = ((64, (5, 5)), (32, (5, 5)), (12, (3, 3)))

    inputs = Input(sh)
    features = inputs
    for n_filters, kernel_size in conv_stages:
        features = Conv2D(n_filters, kernel_size, padding="same", activation="relu")(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    # Flatten the feature maps and project them onto the embedding space
    features = Flatten()(features)
    features = Dense(4096, activation='relu')(features)

    return Model(inputs, features, name="embedding")

def get_siamese_net():
    """Build the siamese network that scores the similarity of two images.

    Both inputs (shape ``sh``) go through the same encoder returned by
    ``get_hyperopt_emb``; the element-wise absolute difference (L1) of the two
    embeddings feeds a single sigmoid unit named "preds".

    Returns:
        A Keras ``Model`` taking ``[left_input, right_input]`` and producing
        one sigmoid output per pair.
    """
    embedding = get_hyperopt_emb()  # encoder sub-network shared by both branches

    # One input tensor per image of the pair
    left_input = Input(sh)
    right_input = Input(sh)

    # Feature vectors of the two images (same weights for both)
    left_features = embedding(left_input)
    right_features = embedding(right_input)

    # Element-wise L1 distance; an L2 variant would use K.square instead of K.abs
    l1_distance = Lambda(lambda pair: K.abs(pair[0] - pair[1]))
    merged = l1_distance([left_features, right_features])

    # Final layer: similarity score
    prediction = Dense(1, activation='sigmoid', name='preds')(merged)

    return Model(inputs=[left_input, right_input], outputs=prediction)

In [3]:
import os
import cv2
import copy
import random
import numpy as np

def get_n_rnd_element_from_list(lst, n):
    """Return a new list with ``n`` elements drawn from ``lst`` without replacement.

    Thin wrapper around ``random.sample``, so it raises ``ValueError`` when
    ``n`` is larger than ``len(lst)``.
    """
    picked = random.sample(lst, n)
    return picked

def get_n_rnd_element_from_list_excluding_parameter(lst, exception, n):
    """Draw ``n`` random elements from ``lst``, skipping every item equal to ``exception``.

    The filtered candidates are handed to ``get_n_rnd_element_from_list``,
    which performs the actual sampling.
    """
    allowed = list(filter(lambda item: item != exception, lst))
    return get_n_rnd_element_from_list(allowed, n)

def add_channel_dimension(img):
    """Append a trailing axis of length 1 to ``img`` (the `channels` axis).

    Convolution layers expect inputs shaped
    (`batch_size`, `height`, `width`, `channels`); a grayscale image without a
    channel axis gets one here.

    ``np.newaxis`` is used instead of ``tf.newaxis``: both are simply ``None``,
    so the result is the same, but this image helper no longer depends on
    TensorFlow.

    Args:
        img: array-like supporting ``img[..., None]`` indexing
            (e.g. a NumPy array as returned by OpenCV).

    Returns:
        ``img`` with one extra last dimension of size 1.
    """
    return img[..., np.newaxis]

In [4]:
# Root folder of the image dataset; the batch builders read `src/<alphabet>/<character>/<sample>`
src = 'images'
# Alphabet folder names evaluated in the "evaluation" run
val_classes = ['Atlantean', 'Anglo-Saxon_Futhorc', 'Greek', 'Japanese_(katakana)', 'Sylheti'] 
# Alphabet folder names evaluated in the "test" run
test_classes = ['Hebrew', 'Tengwar', 'Latin', 'Oriya', 'Sanskrit']

In [5]:
# Input shape used for the Keras Input layers and for the attribution maps
sh = (105, 105, 1)
# Substring stripped from character folder names when building the textual labels
cp = 'character' # character placeholder

In [6]:
# Build the siamese architecture, then load the saved weights from the .h5 file
model = get_siamese_net()
model.load_weights('models/siamese_net_1ch5kiter.h5')

Metal device set to: Apple M1


2023-02-27 23:30:21.630047: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-02-27 23:30:21.630140: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Val/Test accuracy performance

In [7]:
def one_shot_validate(model, alphabets, C, src, runs_per_alphabet = 300):
    """Estimate the one-shot accuracy of ``model`` over a list of alphabets.

    For every alphabet, ``runs_per_alphabet`` random tasks are requested from
    ``get_one_shot_batch``. A task counts as correct when the pair with the
    highest predicted probability is the one labelled 1; tasks where all the
    probabilities are identical (and there is more than one) count as wrong.

    NOTE: this notebook defines ``get_one_shot_batch`` twice; this function
    needs the variant that returns the ``(X, y, c)`` triple.

    Args:
        model: object exposing ``predict_on_batch([left, right])``.
        alphabets: alphabet names to evaluate.
        C: number of ways, forwarded to ``get_one_shot_batch`` and printed.
        src: dataset root folder, forwarded to ``get_one_shot_batch``.
        runs_per_alphabet: number of random tasks per alphabet.

    Returns:
        Mean accuracy over all alphabets and runs.
    """
    print('Making ' + str(runs_per_alphabet) + ' random tasks (per alphabet) on ' + str(C) + '-Way One-Shot support sets...')
    global_hits = 0

    for alphabet in alphabets:
        alphabet_hits = 0

        for _ in range(runs_per_alphabet):
            X, y, c = get_one_shot_batch(alphabet, alphabets, C, src)
            probabilities = model.predict_on_batch([X[:, 0], X[:, 1]])

            # A constant output over several pairs carries no ranking information
            degenerate = len(set(probabilities.flatten())) == 1 and len(probabilities) != 1
            if degenerate:
                print("All probabilities are equal, setting accuracy to 0.0")
                score = 0.0
            else:
                score = 1.0 if np.argmax(probabilities) == np.argmax(y) else 0.0

            alphabet_hits += score
            global_hits += score

        alphabet_accuracy = alphabet_hits / runs_per_alphabet
        print(alphabet + ' Alphabet' + ', accuracy: ' + str(alphabet_accuracy))

    global_accuracy = global_hits / (len(alphabets) * runs_per_alphabet)
    print('Mean global accuracy: ' + str(global_accuracy))
    print('----------------------------------------------------------------')
    return global_accuracy

In [8]:
def get_one_shot_batch(query_aplhabet, alphabets, C, src):
    """Build one C-way one-shot task as a batch of (query, candidate) image pairs.

    A random character of ``query_aplhabet`` provides the query image and, from
    a different sample file of the same character, the single positive
    candidate. ``C - 1`` other alphabets are then drawn from ``alphabets`` and
    each contributes one random character sample as a negative candidate.
    (The number of negatives used to be hard-coded to 4, i.e. ``C`` was ignored;
    for the usual ``C == 5`` the behaviour is unchanged.)

    NOTE: a later cell of this notebook defines another function with the same
    name and a different return value; once that cell has run, this version is
    shadowed.

    Args:
        query_aplhabet: alphabet folder name (under ``src``) the query is taken from.
        alphabets: list of alphabet folder names; it must contain at least
            ``C - 1`` entries different from ``query_aplhabet``.
        C: number of candidates of the task (1 positive + ``C - 1`` negatives).
        src: dataset root laid out as ``src/<alphabet>/<character>/<sample image>``.

    Returns:
        Tuple ``(X, y, c)`` of NumPy arrays with one entry per pair:
        ``X`` the ``[query_img, candidate_img]`` pairs, ``y`` the labels
        (1 for the positive pair, which comes first, 0 otherwise) and ``c`` the
        ``"alphabet:character"`` labels of query and candidate.
        Assumes all images share the same size so the pairs stack into one
        array (TODO confirm for the dataset in use).

    Raises:
        ValueError: from ``random.sample`` when fewer than ``C - 1`` other
            alphabets are available or the chosen character has one sample only.
    """
    def read_gray(alphabet, char, sample):
        # Load the file as grayscale and append the trailing channel axis
        bgr = cv2.imread(os.path.join(src, alphabet, char, sample))
        return add_channel_dimension(cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY))

    def tag(alphabet, char):
        # "alphabet:character" label with the placeholder `cp` stripped
        return alphabet + ":" + char.replace(cp, "")

    X, y, c = [], [], []

    # Query: random character of the query alphabet, then a random sample of it
    rnd_char = get_n_rnd_element_from_list(os.listdir(os.path.join(src, query_aplhabet)), 1)[0]
    charsamples = os.listdir(os.path.join(src, query_aplhabet, rnd_char))
    query_sample = get_n_rnd_element_from_list(charsamples, 1)[0]
    query_img = read_gray(query_aplhabet, rnd_char, query_sample)
    query_tag = tag(query_aplhabet, rnd_char)

    # Positive pair: a different sample of the same character
    pos_sample = get_n_rnd_element_from_list_excluding_parameter(charsamples, query_sample, 1)[0]
    pos_img = read_gray(query_aplhabet, rnd_char, pos_sample)
    X.append([query_img, pos_img])
    y.append(1)
    c.append([query_tag, query_tag])

    # Negative pairs: one random character sample from each of C - 1 other alphabets
    negative_aplhabets = get_n_rnd_element_from_list_excluding_parameter(alphabets, query_aplhabet, C - 1)
    for neg_alpha in negative_aplhabets:
        neg_char = get_n_rnd_element_from_list(os.listdir(os.path.join(src, neg_alpha)), 1)[0]
        neg_samples = os.listdir(os.path.join(src, neg_alpha, neg_char))
        neg_sample = get_n_rnd_element_from_list(neg_samples, 1)[0]
        neg_img = read_gray(neg_alpha, neg_char, neg_sample)

        X.append([query_img, neg_img])
        y.append(0)
        c.append([query_tag, tag(neg_alpha, neg_char)])

    return np.array(X), np.array(y), np.array(c)

In [None]:
# Run the one-shot evaluation first on the validation alphabets, then on the test ones;
# the number of ways C is set to the number of alphabets of each list.
print("Validate on evaluation alphabets...")
one_shot_validate(model, val_classes, len(val_classes), src)
print("Validate on test alphabets...")
one_shot_validate(model, test_classes, len(test_classes), src)

In [9]:
def get_one_shot_batch(query_aplhabet, alphabets, C, src):
    """Build one C-way one-shot task and return the query with its support set.

    NOTE: this definition replaces the earlier ``get_one_shot_batch`` of this
    notebook. It samples the task in the same way but returns
    ``(query_img, S, X, c)`` instead of ``(X, y, c)``, so code written for the
    earlier return value (e.g. ``one_shot_validate``) must run before this cell.

    A random character of ``query_aplhabet`` provides the query image and, from
    a different sample file of the same character, the positive support image.
    ``C - 1`` other alphabets are then drawn from ``alphabets`` and each
    contributes one random character sample as a negative support image.
    (The number of negatives used to be hard-coded to 4, i.e. ``C`` was ignored;
    for the usual ``C == 5`` the behaviour is unchanged.)

    Args:
        query_aplhabet: alphabet folder name (under ``src``) the query is taken from.
        alphabets: list of alphabet folder names; it must contain at least
            ``C - 1`` entries different from ``query_aplhabet``.
        C: size of the support set (1 positive + ``C - 1`` negatives).
        src: dataset root laid out as ``src/<alphabet>/<character>/<sample image>``.

    Returns:
        Tuple ``(query_img, S, X, c)``: the query image, the support images as
        a NumPy array (positive first), the ``[query_img, support_img]`` pairs
        and the ``"alphabet:character"`` labels of query and support image.
        Assumes all images share the same size so they stack into one array
        (TODO confirm for the dataset in use).

    Raises:
        ValueError: from ``random.sample`` when fewer than ``C - 1`` other
            alphabets are available or the chosen character has one sample only.
    """
    def read_gray(alphabet, char, sample):
        # Load the file as grayscale and append the trailing channel axis
        bgr = cv2.imread(os.path.join(src, alphabet, char, sample))
        return add_channel_dimension(cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY))

    def tag(alphabet, char):
        # "alphabet:character" label with the placeholder `cp` stripped
        return alphabet + ":" + char.replace(cp, "")

    S, X, c = [], [], []

    # Query: random character of the query alphabet, then a random sample of it
    rnd_char = get_n_rnd_element_from_list(os.listdir(os.path.join(src, query_aplhabet)), 1)[0]
    charsamples = os.listdir(os.path.join(src, query_aplhabet, rnd_char))
    query_sample = get_n_rnd_element_from_list(charsamples, 1)[0]
    query_img = read_gray(query_aplhabet, rnd_char, query_sample)
    query_tag = tag(query_aplhabet, rnd_char)

    # Positive pair: a different sample of the same character
    pos_sample = get_n_rnd_element_from_list_excluding_parameter(charsamples, query_sample, 1)[0]
    pos_img = read_gray(query_aplhabet, rnd_char, pos_sample)
    X.append([query_img, pos_img])
    S.append(pos_img)
    c.append([query_tag, query_tag])

    # Negative pairs: one random character sample from each of C - 1 other alphabets
    negative_aplhabets = get_n_rnd_element_from_list_excluding_parameter(alphabets, query_aplhabet, C - 1)
    for neg_alpha in negative_aplhabets:
        neg_char = get_n_rnd_element_from_list(os.listdir(os.path.join(src, neg_alpha)), 1)[0]
        neg_samples = os.listdir(os.path.join(src, neg_alpha, neg_char))
        neg_sample = get_n_rnd_element_from_list(neg_samples, 1)[0]
        neg_img = read_gray(neg_alpha, neg_char, neg_sample)

        X.append([query_img, neg_img])
        S.append(neg_img)
        c.append([query_tag, tag(neg_alpha, neg_char)])

    return query_img, np.array(S), np.array(X), np.array(c)

In [10]:
def expand_dimension(s):
    """Return a copy of ``s`` with a new leading axis of length 1.

    Used to turn a single sample into a one-element batch for
    ``predict_on_batch``. The input is deep-copied first, so the result never
    aliases the caller's data.
    """
    duplicate = copy.deepcopy(s)
    return np.expand_dims(duplicate, 0)

def predict_similarity(f, x, s):
    """Return the score ``f`` predicts for the single pair ``(x, s)``.

    Both samples are wrapped into one-element batches with
    ``expand_dimension``; the value at position ``[0][0]`` of the prediction is
    returned.
    """
    pair_batch = [expand_dimension(x), expand_dimension(s)]
    scores = f.predict_on_batch(pair_batch)
    return scores[0][0]

The setup below is initialised in a similar way to what is done for SINEX.

In [11]:
from skimage import morphology
from skimage.filters import sobel
from skimage.color import rgb2gray
from skimage.segmentation import felzenszwalb, slic, quickshift, watershed
from skimage.segmentation import mark_boundaries, find_boundaries

In [12]:
def stack_image(img):
    """Replicate the first channel of ``img`` three times along a new last axis.

    ``img`` is indexed as ``img[:, :, 0]``, so it needs at least three
    dimensions; the result has 3 channels that are copies of channel 0.
    """
    first_channel = img[:, :, 0]
    return np.stack([first_channel] * 3, axis=-1)

In [13]:
def preprocess_img(img):
    """Return a 3-channel version of ``img``.

    * 2-D input: a channel axis is added and replicated three times.
    * 3-D input whose last axis is not of size 3: the first channel is
      replicated three times.
    * 3-D input that already has 3 channels: returned unchanged. (This case
      used to fall through and return ``None``.)

    Args:
        img: image array with 2 or 3 dimensions.

    Returns:
        An array with 3 channels on the last axis.
    """
    if len(img.shape) == 2:
        # Two dimensions found: add a fake channel, then stack it in each RGB dimension
        return stack_image(add_channel_dimension(img))

    if img.shape[2] != 3:
        # Third dimension does not have 3 channels: stack the first one in each RGB dimension
        return stack_image(img)

    # Already 3 channels: nothing to do
    return img

#### Visualization

In [14]:
from matplotlib.colors import LinearSegmentedColormap

# Diverging colormap registered under the name 'shaplike_gradient': five colour stops
# going from blue (0.0) through white (0.5) to red (1.0).
shapelike_cmap = LinearSegmentedColormap.from_list('shaplike_gradient', (
    # Edit this gradient at https://eltos.github.io/gradient/#1E88E5-91C5F2-FFFFFF-FF6395-FF0052
    (0.000, (0.118, 0.533, 0.898)),
    (0.250, (0.569, 0.773, 0.949)),
    (0.500, (1.000, 1.000, 1.000)),
    (0.750, (1.000, 0.388, 0.584)),
    (1.000, (1.000, 0.000, 0.322)))
)

In [16]:
import matplotlib.pyplot as plt
# plt.rcParams.update({'font.size': 17})
# Global matplotlib default font size for the figures drawn below
plt.rcParams.update({'font.size': 22})

In [17]:
def sub_script(n, label='s'):
    """Return ``label`` followed by the Unicode subscript digit for ``n``.

    Only 1 to 5 are supported; any other value yields the string 'null'.
    """
    subscript_digits = (
        '\N{SUBSCRIPT ONE}',
        '\N{SUBSCRIPT TWO}',
        '\N{SUBSCRIPT THREE}',
        '\N{SUBSCRIPT FOUR}',
        '\N{SUBSCRIPT FIVE}',
    )
    for value, glyph in enumerate(subscript_digits, start=1):
        if n == value:
            return label + glyph
    return 'null'

In [18]:
def plot_explanation_omniglet(x, S, labels, norm_attributions, probabilities, cmap):
    """Plot the query, the support images and their attribution maps.

    The figure is a 2 x (N + 1) grid, N being ``len(norm_attributions)``:
    the top row shows the attribution maps, the bottom row shows the query
    (first column) followed by the support images. One colorbar for the
    attribution maps is added to the right of the grid.

    Args:
        x: query image, shown in grayscale.
        S: support images; ``S[i]`` is drawn under ``norm_attributions[i]``.
        labels: per-pair labels; ``labels[0][0]`` titles the query and
            ``labels[i][1]`` the i-th attribution map.
        norm_attributions: attribution maps, drawn with a fixed [-1, 1] colour range.
        probabilities: ``probabilities[i][0]`` is the similarity of the i-th
            pair. The title shows the last two characters of its value
            rounded to 2 decimals.
        cmap: currently unused; the maps are always drawn with 'coolwarm'.
    """
    N = len(norm_attributions)

    # plt.subplots creates its own figure: the plt.figure() call that used to
    # precede it only left an extra, empty figure behind on every call.
    f, axarr = plt.subplots(2, N + 1, figsize=(30, 10))

    # Bottom-left cell: the query
    axarr[1, 0].imshow(x, cmap='gray')
    axarr[1, 0].set_axis_off()
    axarr[1, 0].set_title('Query x;\n y = ' + str(labels[0][0]))

    for i in range(N):
        sim = round(probabilities[i][0], 2)

        # Top row: normalized attributions
        im3 = axarr[0, i+1].imshow(norm_attributions[i], cmap='coolwarm', interpolation='none', vmin=-1, vmax=1)
        axarr[0, i+1].set_axis_off()
        subscripth = sub_script(i+1, 'h')
        axarr[0, i+1].set_title(subscripth + '; y = ' + str(labels[i][1]))

        # Bottom row: original support images
        axarr[1, i+1].imshow(S[i], cmap='gray')
        axarr[1, i+1].set_axis_off()
        subscripts = 's' + str(i+1)
        axarr[1, i+1].set_title(subscripts + "; sim = " + str(sim)[-2:])

    # Top-left cell is intentionally left empty
    axarr[0, 0].set_axis_off()
    axarr[0, 0].text(0.0, 0.5, '')

    f.subplots_adjust(right=0.95)  # making some room for cbar
    # get_points() yields the lower-left and upper-right corners of an axes;
    # the colorbar spans from the bottom of the lower row to the top of the upper row.
    [[_, _], [_, y01]] = axarr[0, N].get_position().get_points()
    [[_, y10], [x11, _]] = axarr[1, N].get_position().get_points()

    pad = 0.01; width = 0.005
    cbar_ax = f.add_axes([x11+pad, y10, width, y01-y10])
    f.colorbar(im3, cax=cbar_ax)

    plt.show()

In [19]:
stack_required = 1 # TODO: set this somewhere in SINEX at initialisation time; the value depends on the dataset

In [20]:
def segment_input(img_to_segment):
    """Segment an image with the configured algorithm.

    The behaviour is driven by module-level settings read at call time:
    ``stack_required`` (convert to 3 channels with ``preprocess_img``),
    ``sobel_required`` (replace the image by the Sobel filter of its grayscale
    version, used for Watershed), ``mask_required`` (compute a foreground mask
    for MaskSLIC), ``algo`` (the segmentation callable) and ``aparams`` (its
    keyword arguments). When a mask is computed it is stored in
    ``aparams['mask']``, i.e. the shared dict is modified.

    Args:
        img_to_segment: image to segment.

    Returns:
        Tuple ``(R, uR)``: the label map returned by ``algo`` and the sorted
        unique labels found in it.
    """
    image = img_to_segment

    if stack_required:
        image = preprocess_img(image)

    if sobel_required:  # Watershed
        image = sobel(rgb2gray(image))

    if mask_required:  # MaskSLIC
        dark_pixels = rgb2gray(image) < 0.5
        without_specks = morphology.remove_small_objects(ar = dark_pixels, min_size = 200)
        without_holes = morphology.remove_small_holes(ar = without_specks, area_threshold = 500)
        aparams['mask'] = morphology.opening(without_holes)

    R = algo(image=image, **aparams)  # label map of the segments
    return R, np.unique(R)

In [21]:
def find_drawn_segments(img, segments):
    """Return the ids of the segments that contain drawn (non-white) pixels.

    A segment counts as drawn when the mean of ``img`` over its pixels differs
    from 255, i.e. when it is not entirely white.

    Args:
        img: image indexed with the pixel coordinates of each segment.
        segments: label map assigning a segment id to every pixel.

    Returns:
        List of the drawn segment ids, in ascending order.
    """
    drawn_ids = []

    for seg_id in np.unique(segments).tolist():
        seg_mean = np.mean(img[np.where(segments == seg_id)])

        # debugging purposes
        if np.isnan(seg_mean):
            print("Segment: " + str(seg_id) + " mean is nan.")

        # keep written areas only
        if seg_mean != 255:
            drawn_ids.append(seg_id)

    return drawn_ids

In [22]:
def is_segment_drawn(img, seg, segments):
    """Tell whether segment ``seg`` contains drawn (non-white) pixels.

    Returns ``True`` when the mean of ``img`` over the pixels labelled ``seg``
    in ``segments`` differs from 255, ``False`` otherwise.
    """
    segment_mean = np.mean(img[np.where(segments == seg)])
    return bool(segment_mean != 255)

In [24]:
# Differs from the current state of SINEX; check whether the two can be made uniform.
def perturb_image_by_disabling(img, segments, segments_to_disable, replace_value = 0):
    """Return a copy of ``img`` where the given segments are overwritten.

    Every pixel whose label in ``segments`` is listed in
    ``segments_to_disable`` is set to ``replace_value``; ``img`` itself is not
    modified. Only the perturbed image is returned (no pixel coordinates).
    Original author's note: at the time of writing this was called with a
    single segment.
    """
    perturbed = copy.deepcopy(img)  # work on a deep copy of the image

    for seg_id in segments_to_disable:
        # coordinates of the segment's pixels
        perturbed[np.where(segments == seg_id)] = replace_value

    return perturbed

In [25]:
def perturb_image_by_enabling(empty_img, segments, segments_to_enable, replace_value = 0):
    """Return a copy of ``empty_img`` where the given segments are filled in.

    Every pixel whose label in ``segments`` is listed in
    ``segments_to_enable`` is set to ``replace_value``; ``empty_img`` itself is
    not modified.
    """
    canvas = copy.deepcopy(empty_img)  # work on a deep copy of the image

    for seg_id in segments_to_enable:
        seg_pixels = np.where(segments == seg_id)  # coordinates of the segment's pixels
        canvas[seg_pixels] = replace_value

    return canvas

# SINEX

In [26]:
def explain_sinex(f, x, S, verbose=False):
    """Compute per-segment attributions of every support image (SINEX).

    For each support image ``si`` the similarity ``v`` with the query ``x`` is
    predicted, the image is segmented and every drawn segment (segment 0
    excluded) is perturbed on its own: with the module-level ``mode == 1`` only
    that segment is drawn on a white canvas, otherwise that segment is whited
    out of ``si``. The contribution of a segment is ``(v - u) / n`` with ``u``
    the similarity of the perturbed image and ``n`` the number of pixels of the
    segment.

    Args:
        f: model passed to ``predict_similarity``.
        x: query sample.
        S: sequence of support samples.
        verbose: print intermediate values and show the intermediate images.

    Returns:
        Tuple ``(E, C)``: ``E`` holds one attribution array of shape ``sh`` per
        support image (pixels of segments that were not analysed keep ``-inf``);
        ``C`` maps the support index to
        ``{'R': label map, 'contr_seg_map': [(contribution, segment), ...]}``.
    """
    attributions = []
    contributions = {}  # { sidx: {'R': ..., 'contr_seg_map': [...]} }

    for sidx, si in enumerate(S):
        v = predict_similarity(f, x, si)  # similarity of the unperturbed pair

        if verbose:
            print("Analyzig support set index:", sidx, "Predicted similarity:", v)

        R, uR = segment_input(si.copy())  # segments of the support image

        # Attribution map of the current support image
        hi = np.full(sh, float("-inf"))

        if verbose:
            plt.imshow(mark_boundaries(preprocess_img(si), R))
            plt.show()

        segment_contributions = []
        contributions[sidx] = {'R': R, 'contr_seg_map': segment_contributions}

        # Forced removal for MaskSLIC: segment 0 is the mask (or the sum of all segments)
        uR = np.delete(uR, np.where(uR == 0))

        for seg in uR:
            # only drawn segments are switched on / off
            if not is_segment_drawn(si, seg, R):
                continue

            if mode == 1:
                zi = perturb_image_by_enabling(np.full(sh, 255), R, [seg], 0)
            else:
                # disable the current segment only
                zi = perturb_image_by_disabling(si, R, [seg], 255)

            idxs = np.where(R == seg)
            pxl = si[idxs]

            u = predict_similarity(f, x, zi)  # similarity after the perturbation
            d = v - u                         # drop of similarity
            c = d / len(pxl)                  # contribution per pixel
            hi[idxs] = c

            if verbose:
                print("New sim:", u)
                print("Delta:", d)
                print("Segment:", seg)
                print("Len:", len(pxl))
                plt.imshow(zi, cmap='gray')
                plt.show()
                print("-------------------")

            # contribution value of the current segment
            segment_contributions.append((c, seg))

        attributions.append(hi)

    return attributions, contributions

# SINEXC

In [27]:
def get_segments_to_disable_sinexc(curr_seg, drawn_segments):
    """Pick the segments to perturb together with ``curr_seg`` (SINEXC).

    The size of the group is ``percentage_of_segments_to_disable`` percent
    (module-level setting) of the drawn segments, segment 0 excluded, with a
    minimum of 2. The group is made of ``curr_seg`` plus segments drawn at
    random among the *other* drawn segments.

    Fixes over the previous version:
    * a missing segment 0 no longer raises ``ValueError`` (it was removed with
      ``list.remove``);
    * ``curr_seg`` can no longer be drawn again as one of the random extras,
      which made the group smaller than requested;
    * when there are fewer other segments than requested, all of them are used.

    Args:
        curr_seg: id of the segment under analysis; always part of the result.
        drawn_segments: ids of the drawn segments; left unmodified.

    Returns:
        List of segment ids: the random extras followed by ``curr_seg``.
    """
    # Forced removal for MaskSLIC: segment 0 is the mask (or the sum of all
    # segments), per the original author's note.
    drawn = [seg for seg in drawn_segments if seg != 0]

    total_segs_to_disable = round((len(drawn) * percentage_of_segments_to_disable) / 100)

    if total_segs_to_disable <= 1:
        # force 2, so that exactly one segment is chosen besides the current one
        total_segs_to_disable = 2

    # Extras are drawn among the other segments only
    candidates = [seg for seg in drawn if seg != curr_seg]
    n_extra = min(total_segs_to_disable - 1, len(candidates))

    random_sample = random.sample(candidates, n_extra)
    random_sample.append(curr_seg)

    return random_sample

In [28]:
def explain_sinexc(f, x, S, verbose=False):
    """Compute per-segment attributions of every support image (SINEXC).

    Like SINEX, but every drawn segment (segment 0 excluded) is perturbed
    together with a random group of other drawn segments, ``P`` times
    (module-level setting). With the module-level ``mode == 1`` only the group
    is drawn on a white canvas, otherwise the group is whited out of ``si``.
    The contribution of a segment is ``(v - mean(u)) / n`` with ``v`` the
    similarity of the unperturbed pair, ``u`` the similarities of the ``P``
    perturbed images and ``n`` the number of pixels of the segment.

    Fix over the previous version: with ``verbose=True`` the preview of the
    segment under analysis plotted the name ``_``, whose assignment had been
    commented out; the image with that segment disabled is now computed
    explicitly for the preview.

    Args:
        f: model passed to ``predict_similarity``.
        x: query sample.
        S: sequence of support samples.
        verbose: print intermediate values and show the intermediate images.

    Returns:
        Tuple ``(E, C)``: ``E`` holds one attribution array of shape ``sh`` per
        support image (pixels of segments that were not analysed keep ``-inf``);
        ``C`` maps the support index to
        ``{'R': label map, 'contr_seg_map': [(contribution, segment), ...]}``.
    """
    E = []
    C = {}

    for sidx in range(len(S)):
        si = S[sidx]  # current support sample

        v = predict_similarity(f, x, si)  # similarity of the unperturbed pair

        if verbose:
            print("Analyzig support set index:", sidx, "Predicted similarity:", v)

        R, uR = segment_input(si.copy())    # segments of the support image
        dsidx = find_drawn_segments(si, R)  # ids of the drawn segments

        # Attribution map of the current support image
        hi = np.full(sh, float("-inf"))

        C[sidx] = {'R': R, 'contr_seg_map': []}

        # Forced removal for MaskSLIC: segment 0 is the mask (or the sum of all segments)
        uR = np.delete(uR, np.where(uR == 0))

        for seg in uR:
            if not is_segment_drawn(si, seg, R):
                continue

            if verbose:
                print("Segment in analysis:", seg)
                # preview: the support image with only the current segment disabled
                plt.imshow(perturb_image_by_disabling(si, R, [seg], 255), cmap='gray')
                plt.show()

            sims = 0
            for _ in range(P):
                segs_to_disable = get_segments_to_disable_sinexc(seg, dsidx)

                if mode == 1:
                    # the current segment stays on together with the rest of the group:
                    # here the group holds the only segments that are drawn
                    segs_to_enable = segs_to_disable.copy()
                    zi = perturb_image_by_enabling(np.full(sh, 255), R, segs_to_enable, 0)
                else:
                    # the current segment is removed together with the rest of the group
                    zi = perturb_image_by_disabling(si, R, segs_to_disable, 255)

                u = predict_similarity(f, x, zi)  # similarity after the perturbation
                sims += u

            # indices and pixels of the segment under analysis
            segidx = np.where(R == seg)
            pixels = si[segidx]

            d = v - (sims / P)  # mean drop of similarity
            c = d / len(pixels)
            hi[segidx] = c

            # contribution value of the current segment
            C[sidx]['contr_seg_map'].append((c, seg))

            if verbose:
                # u and zi refer to the last of the P perturbations
                print("New sim:", u)
                print("Delta:", d)
                print("Segment:", seg)
                plt.imshow(zi, cmap='gray')
                plt.show()
                print("-------------------")

        # attribution map of the current support sample
        E.append(hi)

    return E, C