In [1]:
import matplotlib
from sklearn.model_selection import ShuffleSplit 
matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt
from src.utils import *
from src.InstrumentalVariable import InstrumentalVariable
from sklearn.metrics import r2_score
from scipy.special import softmax

In [2]:
def cor_selector(X, y):
    """Score every column of ``X`` by its Pearson correlation with ``y``.

    :param X: 2-D array; each column is scored independently.
    :param y: 1-D array with one value per row of ``X``.
    :return: list with one correlation coefficient per column of ``X``;
        a coefficient that ``np.corrcoef`` reports as NaN is returned as 0.
    """
    _, n_metrics = X.shape
    # Off-diagonal entry of the 2x2 correlation matrix is corr(column, y).
    scores = (np.corrcoef(X[:, col], y)[0, 1] for col in range(n_metrics))
    # replace NaN with 0
    return [0 if np.isnan(score) else score for score in scores]

In [30]:
def run_manifold_tests(X, y, max_outlier=0.2, min_p_value=80, max_p_value=95, bootstrap=True, min_l2_reg=0,
                       max_l2_reg=50, n_tests=100):
    """Fit ``n_tests`` randomised InstrumentalVariable models and collect their coefficients.

    Each experiment independently
      1. draws ``p_value``, ``nu`` and ``l2_reg`` uniformly from the given ranges,
      2. makes a single 90/10 shuffle split of the rows,
      3. samples a feature subset without replacement, weighted by the absolute
         correlation of ``X_train[:, :, 0]`` with ``y_train``,
      4. drops the training rows flagged by ``get_outlier_experiments``,
      5. optionally resamples the training rows with replacement,
      6. fits the model and scores it on the held-out rows with R^2.

    NOTE(review): outlier detection reads the notebook-level globals
    ``short_metrics`` and ``long_metrics`` instead of ``X``; they must be
    row-aligned with ``X`` for ``train_index`` to select matching rows.

    :param X: 3-D array; axis 0 is split into train/test rows and axis 1 holds
        the candidate features.
    :param y: target array indexed along the same first axis as ``X``.
    :param max_outlier: upper bound for the uniformly drawn ``nu``; when it is
        not positive, ``nu=None`` is passed to ``get_outlier_experiments``.
    :param min_p_value: lower bound for the first ``InstrumentalVariable`` argument.
    :param max_p_value: upper bound for the first ``InstrumentalVariable`` argument.
    :param bootstrap: resample the training rows with replacement before fitting.
    :param min_l2_reg: lower bound for the second ``InstrumentalVariable`` argument.
    :param max_l2_reg: upper bound for the second ``InstrumentalVariable``
        argument; when it is not positive, ``None`` is passed instead.
    :param n_tests: number of experiments to run.
    :return: ``(experiment_coefs, experiment_meta_info)`` where
        ``experiment_coefs`` has shape ``(n_tests, n_features)`` (zero for
        features not sampled in an experiment) and ``experiment_meta_info`` is
        a list with one dict per experiment (keys ``p_value``, optionally
        ``nu`` and ``l2_reg``, ``features`` and ``r2``).
    """
    _, n_features, _ = X.shape
    experiment_coefs = np.zeros((n_tests, n_features))
    experiment_meta_info = []
    # At least 8 features are sampled per experiment; the bound is capped so
    # that inputs with fewer than 8 features do not make randint fail.
    min_feature_count = min(8, n_features)

    # Every row yielded here is a view, so np.put below writes into experiment_coefs.
    for experiment in experiment_coefs:
        meta_info = {}
        p_value = np.random.uniform(min_p_value, max_p_value)
        meta_info['p_value'] = p_value

        if max_outlier > 0:
            nu = np.random.uniform(0, max_outlier)
            meta_info['nu'] = nu
        else:
            nu = None

        if max_l2_reg > 0:
            l2_reg = np.random.uniform(min_l2_reg, max_l2_reg)
            meta_info['l2_reg'] = l2_reg
        else:
            l2_reg = None

        iv_model = InstrumentalVariable(p_value, l2_reg)

        # n_splits=1, so the splitter yields exactly one train/test partition.
        sh_split = ShuffleSplit(n_splits=1, test_size=0.1)
        train_index, test_index = next(sh_split.split(X, y))
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        feature_probs = np.abs(cor_selector(X_train[:, :, 0], y_train))
        total = sum(feature_probs)
        if total > 0:
            feature_probs = feature_probs / total
        else:
            # No feature correlates with the target: sample uniformly instead
            # of handing NaN probabilities to np.random.choice.
            feature_probs = None
        # feature_probs = softmax(feature_probs)
        feature_size = np.random.randint(min_feature_count, n_features + 1)
        feature_inds = np.random.choice(n_features, feature_size, replace=False, p=feature_probs)
        feature_inds = sorted(feature_inds)
        meta_info['features'] = feature_inds

        X_train, X_test = X_train[:, feature_inds], X_test[:, feature_inds]

        outliers = get_outlier_experiments(np.append(short_metrics[train_index], long_metrics[train_index],
                                                     axis=1), nu=nu)
        # Builtin bool: the np.bool alias was removed in NumPy 1.24.
        mask = np.ones(len(X_train), dtype=bool)
        mask[outliers] = False
        X_train, y_train = X_train[mask], y_train[mask]

        if bootstrap:
            bootstrap_inds = np.random.choice(len(X_train), len(X_train))
            X_train, y_train = X_train[bootstrap_inds], y_train[bootstrap_inds]

        iv_model.fit(X_train, y_train)
        y_pred = iv_model.predict(X_test)
        meta_info['r2'] = r2_score(y_test, y_pred)
        # Scatter the fitted coefficients into the columns of the sampled features.
        np.put(experiment, feature_inds, iv_model.coef_)
        experiment_meta_info.append(meta_info)
    return experiment_coefs, experiment_meta_info

In [31]:
def meta_to_text(meta_info):
    """Turn every experiment's meta-info dict into a one-line text summary.

    :param meta_info: iterable of dicts whose values are numbers or lists of numbers.
    :return: list of strings, one per dict, made of ``"key: value "`` chunks
        in dict order; numbers are rounded to 2 decimals and list items are
        joined with ``", "``.
    """
    def render(value):
        # Lists are rendered item by item; everything else is treated as a number.
        if isinstance(value, list):
            return ", ".join(str(round(item, 2)) for item in value)
        return str(round(value, 2))

    return [
        "".join("{}: {} ".format(key, render(value)) for key, value in exp_info.items())
        for exp_info in meta_info
    ]

In [32]:
def filter_metrics(coefs):
    """Return the column indices whose coefficients have a consistent sign.

    For every column the strictly positive and strictly negative entries are
    counted. A column is kept when one of the two counts is zero, or when the
    smaller count is less than 20% of the larger one.

    :param coefs: 2-D array of shape ``(n_tests, n_features)``.
    :return: 1-D integer array of the kept column indices in ascending order.
        The dtype is integer even when nothing is kept, so the result can
        always be used to index ``coefs``.
    """
    coefs = np.asarray(coefs)
    positive_counts = np.count_nonzero(coefs > 0, axis=0)
    negative_counts = np.count_nonzero(coefs < 0, axis=0)
    minority = np.minimum(positive_counts, negative_counts)
    majority = np.maximum(positive_counts, negative_counts)
    # minority / majority equals min(pos/neg, neg/pos); where the division is
    # skipped (majority == 0) the ratio stays 0 and the column is kept.
    ratio = np.divide(minority, majority,
                      out=np.zeros(minority.shape, dtype=float),
                      where=majority > 0)
    return np.flatnonzero(ratio < 0.2)

In [33]:
def plot_coefficients(coefs, meta_info, metric_map=None):
    """Show one jittered scatter strip of coefficients per feature, with hover details.

    Every column of ``coefs`` gets its own subplot: the x position is the
    coefficient, the y position is random jitter in [-1, 1) and the colour
    encodes the experiment's ``r2``. Hovering over a point shows the text
    summary of the corresponding experiment(s).

    :param coefs: 2-D array of shape ``(n_tests, n_features)``.
    :param meta_info: list with one dict per experiment (row of ``coefs``);
        each dict must contain an ``'r2'`` entry.
    :param metric_map: optional sequence mapping a column position in ``coefs``
        to the label used in the subplot title; defaults to the column position.
    """
    def update_annot(ind):
        # ind["ind"] lists the indices of the points under the cursor.
        text = "\n".join(text_info[point] for point in ind["ind"])
        annot.set_text(text)

    def hover(event):
        visible = annot.get_visible()
        cont, ind = False, None
        # Stop at the first scatter collection that contains the cursor.
        for col in collections:
            cont, ind = col.contains(event)
            if cont:
                break
        if cont:
            update_annot(ind)
            annot.set_visible(True)
            fig.canvas.draw_idle()
        elif visible:
            annot.set_visible(False)
            fig.canvas.draw_idle()

    n_tests, n_features = coefs.shape
    if metric_map is None:
        # No mapping supplied: label each subplot with its column position.
        metric_map = range(n_features)
    text_info = meta_to_text(meta_info)
    r2_values = [experiment['r2'] for experiment in meta_info]

    # squeeze=False keeps `axes` a 2-D array even when there is a single feature.
    fig, axes = plt.subplots(nrows=n_features, sharex=True, squeeze=False)
    axes = axes[:, 0]
    fig.suptitle("2_metric_stability")
    collections = []
    for i, metric_coefs in enumerate(coefs.T):
        ax = axes[i]
        ax.set_title('Weights for short_term_' + str(metric_map[i]), loc='left')
        # Vertical red line marking a coefficient of zero.
        ax.plot([0, 0], [-1, 1], 'r')
        col = ax.scatter(metric_coefs, np.random.rand(n_tests) * 2 - 1, c=r2_values,
                         cmap=plt.get_cmap("RdBu"), picker=5, s=50)
        collections.append(col)

    # A single annotation shared by all subplots; it is positioned in figure
    # pixels and attached to the last axes, which is the one the hover
    # callbacks used before.
    annot = axes[-1].annotate("", xy=(50, 50), xycoords='figure pixels', xytext=(50, 50),
                              textcoords='figure pixels', bbox=dict(boxstyle="round", fc="w"))
    annot.set_visible(False)

    fig.canvas.mpl_connect("motion_notify_event", hover)
    plt.show()

In [34]:
# Load the 3-D short-term and long-term metric arrays.
# NOTE(review): the meaning of the last axis is defined by read_data — confirm there.
short_metrics_p, long_metrics_p = read_data(shift=True)
# Index 0 of the last axis; run_manifold_tests reads these two names as
# globals when it looks for outlier experiments.
short_metrics = short_metrics_p[:, :, 0]
long_metrics = long_metrics_p[:, :, 0]

# Choose the long-term target metric by changing the middle index (0, 1, 2 or 3).
target_metric_p = long_metrics_p[:, 0, :]   # <--- here you can choose target (0, 1, 2, 3)
target_metric = target_metric_p[:, 0]

In [35]:
# Main part of the sandbox: change the ranges below to change the constraints
# of the randomised experiments.

# Seed NumPy's global RNG so that the random draws made through np.random
# (and by ShuffleSplit, which falls back to it) repeat on every run.
np.random.seed(42)

coefs, info = run_manifold_tests(short_metrics_p, target_metric, max_outlier=0.1,
                                 min_l2_reg=70, max_l2_reg=200,
                                 min_p_value=80, max_p_value=95, n_tests=1000)

In [36]:
# Keep only the metrics whose coefficient sign is consistent across experiments.
# NOTE: this cell overwrites `coefs`, so it is not idempotent — re-run the
# run_manifold_tests cell before executing it a second time, otherwise
# `clear_metrics` will index the already filtered columns.
clear_metrics = filter_metrics(coefs)
coefs = coefs[:, clear_metrics]

In [37]:
# Mean held-out R^2 over all experiments.
print(np.mean([experiment['r2'] for experiment in info]))

0.48210860176855946


In [38]:
# clear_metrics is passed as metric_map so that each subplot title shows the
# metric's original column index rather than its position after filtering.
plot_coefficients(coefs, info, clear_metrics)
