In [None]:
import pickle
import pandas as pd
import numpy as np
import scipy.spatial.distance as spd
import libmr
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from sklearn.metrics import f1_score

In [None]:
DIR = './data'
MASKING_VER = 'masked_02_drop'

# Training table and submission template; the first CSV column becomes the index.
train = pd.read_csv(f'{DIR}/train_{MASKING_VER}.csv', index_col=0)
submission = pd.read_csv(f'{DIR}/sample_submission.csv', index_col=0)

# The three padded splits follow one file-naming scheme, so read them in a loop.
# NOTE(review): pickle.load can execute arbitrary code -- only open trusted files.
_padded = {}
for _split in ('train', 'valid', 'test'):
    with open(f"{DIR}/{_split}_padded_{MASKING_VER}.pickle", "rb") as f:
        _padded[_split] = pickle.load(f)

train_padded = _padded['train']
valid_padded = _padded['valid']
test_padded = _padded['test']

# Labels come from the `level` column; `to_categorical` builds their class matrix.
y_data = train['level']
y_train = to_categorical(y_data)

  mask |= (ar1 == a)


In [None]:
MODEL_SAVE_FOLDER_PATH = './model'

def get_model(file_name):
    """Load a saved Keras model by the stem of its checkpoint file.

    The model is read from ``{MODEL_SAVE_FOLDER_PATH}/{file_name}.hdf5`` and
    returned as-is from ``tf.keras.models.load_model``.
    """
    checkpoint_path = f'{MODEL_SAVE_FOLDER_PATH}/{file_name}.hdf5'
    return tf.keras.models.load_model(checkpoint_path)

In [None]:
# Load the saved checkpoint, then build a second model that shares its input but
# stops at the second-to-last layer; that model's predictions become `scores`.
# NOTE(review): presumably layers[-2] yields one activation per class (the code
# below takes an argmax over it) -- confirm against the saved architecture.
model = get_model('BI_LSTM_32_16- 9-0.00558-0.99525')
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

In [None]:
# Class labels iterated over by the calibration code: the integers 0 through 6.
categories = range(0, 7)
# Labels from the `level` column, compared against the argmax of `scores` later on.
y_true = train['level']
# Outputs of the truncated model for the padded training data.
scores = new_model.predict(train_padded)

# Functions

## EVT Meta-Recognition

In [None]:
def compute_distances(mean_vec, features, eu_scale=200.):
    """Measure how far each feature vector lies from ``mean_vec``.

    Input:
    --------------------------------
    mean_vec : reference vector the distances are measured from
    features : iterable of vectors, each compared against ``mean_vec``
    eu_scale : divisor applied to every Euclidean distance

    Output:
    --------------------------------
    distances : dict with the keys 'eucos', 'cosine' and 'euclidean'; each
                value is a list with one distance per feature, in input order.
                'eucos' is the scaled Euclidean distance plus the cosine
                distance.
    """
    # Walk the features exactly once, computing both base distances per vector.
    per_feature = [
        (spd.euclidean(mean_vec, vector) / eu_scale, spd.cosine(mean_vec, vector))
        for vector in features
    ]

    scaled_euclidean = [scaled for scaled, _ in per_feature]
    cosine = [angular for _, angular in per_feature]
    combined = [scaled + angular for scaled, angular in per_feature]

    return {'eucos': combined, 'cosine': cosine, 'euclidean': scaled_euclidean}

def compute_mean_distances(categories, y_true, scores, eu_scale=200.):
    """Compute the mean activation vector and distance distributions per category.

    A sample counts as correctly classified when the argmax of its score row
    equals its true label. For every category, only its correctly classified
    samples contribute to the mean vector and to the distances.

    Input:
    --------------------------------
    categories : iterable of category labels to process
    y_true : true label of every sample (array-like, aligned with ``scores``)
    scores : 2-D array of activations, one row per sample
    eu_scale : divisor applied to the Euclidean distances; forwarded to
               ``compute_distances``

    Output:
    --------------------------------
    mean_vectors : dict mapping category -> mean of its correctly classified rows
    distributions : dict keyed by 'eucos', 'cosine', 'euclidean'; each maps
                    category -> list of distances from the mean vector to
                    every correctly classified row of that category
    """
    mean_vectors = {}
    distributions = {'eucos': {}, 'cosine': {}, 'euclidean': {}}

    # Work on plain arrays so the boolean masks below are purely positional.
    y_true = np.asarray(y_true)
    scores = np.asarray(scores)

    y_pred = np.argmax(scores, axis=-1)
    correctly_classified = y_pred == y_true

    for category in categories:
        print('=' * 100)
        print(category)
        print('=' * 100)

        correct_features = scores[(y_true == category) & correctly_classified]

        # NOTE: a category without any correctly classified sample yields a NaN mean.
        mean_vec = np.mean(correct_features, axis=0)
        mean_vectors[category] = mean_vec
        print(f'Mean Activation Vector : {mean_vec}')

        # Forward eu_scale so a caller-supplied scale is actually applied.
        distances = compute_distances(mean_vec, correct_features, eu_scale=eu_scale)
        for key, distribution in distributions.items():
            distribution[category] = distances[key]

        print('=' * 100)

    return mean_vectors, distributions

def weibull_tailfitting(categories, means, distances, tailsize=20):
    """ Fit one libmr model per category on the largest distances.

    Input:
    --------------------------------
    categories : category list
    means : pre-computed mean vector for every category
    distances : pre-computed distances from the mean vector, per category
    tailsize : how many of the largest distances are used for the fit

    Output:
    --------------------------------
    weibull_model : dict mapping category -> dict with the keys
                    'distances' (the distances as given),
                    'mean_vec' (the category's mean vector) and
                    'weibull_model' (the ``libmr.MR`` object fitted on the tail)
    """
    fitted = {}

    for label in categories:
        label_distances = distances[label]
        label_mean = means[label]

        # The tail is the `tailsize` largest distances, in ascending order.
        tail = sorted(label_distances)[-tailsize:]
        meta_recognition = libmr.MR()
        meta_recognition.fit_high(tail, len(tail))

        fitted[label] = {
            'distances': label_distances,
            'mean_vec': label_mean,
            'weibull_model': meta_recognition,
        }

    return fitted

## OpenMax

In [None]:
def compute_distance(query_vec, mean_vec, distance_type='eucos', eu_scale=200.):
    """ Compute the specified distance type between mean vector and query data.

    Input:
    --------
    query_vec: activation vector of query data
    mean_vec: mean activation vector
    distance_type: 'eucos', 'euclidean', 'cosine'
    eu_scale: divisor applied to the Euclidean distance

    Output:
    --------
    query_distance : Distance between mean vector and query data.
                     'euclidean' is the Euclidean distance divided by
                     ``eu_scale``, 'cosine' is the cosine distance and
                     'eucos' is the sum of the two.

    Raises:
    --------
    ValueError : if ``distance_type`` is not one of the supported names
    """
    if distance_type == 'eucos':
        return spd.euclidean(mean_vec, query_vec) / eu_scale + spd.cosine(mean_vec, query_vec)
    if distance_type == 'euclidean':
        return spd.euclidean(mean_vec, query_vec) / eu_scale
    if distance_type == 'cosine':
        return spd.cosine(mean_vec, query_vec)

    # A bare `assert "<message>"` never fails, so reject unknown types explicitly.
    raise ValueError(
        f"distance type not known: {distance_type!r}; "
        "enter either of eucos, euclidean or cosine"
    )

def compute_softmax(scores):
    """Return the softmax of ``scores`` as a plain Python list.

    The normalisation runs over all entries of ``scores`` at once.
    """
    logits = np.array(scores)
    # Subtract the maximum before exponentiating so exp() cannot overflow.
    exponentials = np.exp(logits - logits.max())
    return (exponentials / exponentials.sum()).tolist()

def recalibrate_scores(weibull_model, categories, scores, alpha=10, distance_type='eucos', eu_scale=200.):
    """ Recalibrate one sample's scores and append a score for "unknown".

    Input:
    --------
    weibull_model: dict mapping category -> dict with at least 'mean_vec' and
                   'weibull_model' (an object exposing ``w_score(distance)``)
    categories: category list; assumed to be in the same order as the entries
                of ``scores`` (one score per category)
    scores: 1-D activation vector of a single sample
    alpha: number of top-ranked scores to recalibrate; values larger than the
           number of categories are clamped to that number
    distance_type: 'eucos', 'euclidean', 'cosine'
    eu_scale: divisor for the Euclidean distance, forwarded to
              ``compute_distance``

    Output:
    --------
    Softmax (as a list) over the recalibrated scores followed by one extra
    entry: the total amount removed from the original scores.
    """
    scores = np.asarray(scores)
    n_categories = len(categories)

    # There cannot be more ranked classes than classes; without the clamp the
    # weight assignment below fails with a shape mismatch (e.g. alpha=10, 7 classes).
    alpha = min(alpha, n_categories)

    # The highest score gets weight 1, the next (alpha - 1) / alpha, ... down to 1 / alpha;
    # every class outside the top alpha keeps weight 0 and is left unchanged.
    top_alpha = scores.argsort()[::-1][:alpha]
    alpha_weights = np.zeros(n_categories)
    alpha_weights[top_alpha] = [((alpha + 1) - i) / alpha for i in range(1, alpha + 1)]

    modified_scores = []
    score_unknown = 0

    for idx, category in enumerate(categories):
        category_weibull = weibull_model[category]

        distance = compute_distance(
            scores,
            category_weibull['mean_vec'],
            distance_type=distance_type,
            eu_scale=eu_scale,
        )
        w_score = category_weibull['weibull_model'].w_score(distance)
        modified_score = scores[idx] * (1 - w_score * alpha_weights[idx])

        modified_scores.append(modified_score)
        # Whatever is taken away from a known class is credited to "unknown".
        score_unknown += (scores[idx] - modified_score)

    modified_scores.append(score_unknown)

    return compute_softmax(modified_scores)

# Calibration

In [None]:
# How many of the largest distances per category feed the Weibull fit
# (passed to weibull_tailfitting as `tailsize`).
WEIBULL_TAIL_SIZE = 20
# How many top-ranked scores are recalibrated (passed to recalibrate_scores as `alpha`).
ALPHA_RANK = 3

In [None]:
# Per-category mean vectors and distance distributions from the training scores.
mean_vectors, distributions = compute_mean_distances(categories, y_true, scores)

In [None]:
# For every distance type, print its name followed by the mean distance of each category.
for key, values in distributions.items():
    print(key)
    for d in values.values():
        print(np.mean(d))

In [None]:
# Fitted per-category models, keyed by distance type ('eucos', 'cosine', 'euclidean').
weibulls = {}

for dist_type, distribution in distributions.items():
    weibulls[dist_type] = weibull_tailfitting(categories, mean_vectors, distribution, tailsize=WEIBULL_TAIL_SIZE)
    
print(weibulls.keys())

In [None]:
# Distance measure to calibrate with; it selects the matching entry of `weibulls`.
DIST_TYPE = 'eucos'

weibull_model = weibulls[DIST_TYPE]
# Recalibrate only the first row of `scores`.
openmax = recalibrate_scores(weibull_model, categories, scores[0], alpha=ALPHA_RANK, distance_type=DIST_TYPE)

# Show the plain softmax next to the recalibrated output for the same row.
print(f"Softmax Scores {compute_softmax(scores[0])}")
print(f"Openmax Scores {openmax}")