# Toy examples

In [1]:
import numpy as np
import torch
import random
import os

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
from data.data_loading import *
from warnings import simplefilter

simplefilter(action='ignore')


## benchmarking

In [2]:
def seed_torch(seed=12345):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    
seed_torch()

In [3]:
loader = simulated_dataset # toy_1, toy_2, toy_3, simulated_dataset
scaler = StandardScaler()
split_ratios = [0.75, 0.15, 0.10]

X_train, X_val, X_test, y_train, y_val, y_test = scaled_data(loader, scaler, split_ratios)

# Mean pred. and full pred.

In [None]:
y_mean_pred = np.mean(y_train) * np.ones(y_test.shape)
mean_loss = np.mean((y_test - y_mean_pred)**2)
print(f'Mean Prediction Loss: {mean_loss}')

reg = SVR(C=1.0, epsilon=0.2).fit(X_train, y_train)
y_hat_full = reg.predict(X_test)
full_loss = np.mean((y_test - y_hat_full)**2)
print(f'Full Model Loss: {full_loss}')

# Dimensionality reduction

In [None]:
import umap
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.decomposition import PCA

for perc in [0.25, 0.5, 0.75]:
    print("-----------", perc, "-----------")
    n_components = np.min((X_train.shape[0], int(
        X_train.shape[1]*perc), X_test.shape[0]))
    estims = [umap.UMAP(n_components=n_components), PCA(n_components=n_components)]

    for estim, est_n in zip(estims, ["UMAP", "LDA", "PCA"]):
        X_transf_train = estim.fit_transform(X_train, y=y_train)
        X_transf_test = estim.transform(X_test)

        reg = SVR(C=1.0, epsilon=0.2).fit(X_transf_train, y_train)

        yhat_test = reg.predict(X_transf_test)
        test_loss = np.mean((y_test - y_mean_pred)**2)
        print(est_n, ": ", test_loss)

# Feature selection

In [None]:
from sklearn import feature_selection

estims_names = ['GenericUnivariateSelect-f', 'SelectPercentile-f', 'SelectFpr-f', 'SelectFdr-f', 'SelectFwe-f', 'SelectKBest-f25', 'SelectKBest-f50', 'SelectKBest-f75']
score_func = feature_selection.f_classif

f1 = feature_selection.GenericUnivariateSelect(score_func=score_func, param=1.) # Univariate feature selector with configurable strategy.
f2 = feature_selection.SelectPercentile(score_func=score_func) # Select features according to a percentile of the highest scores.
f3 = feature_selection.SelectFpr(score_func=score_func, alpha=5e-2) # Filter: Select the pvalues below alpha based on a FPR test.
f4 = feature_selection.SelectFdr(score_func=score_func, alpha=.975) # Filter: Select the p-values for an estimated false discovery rate.
f5 = feature_selection.SelectFwe(score_func=score_func, alpha=.975) # Filter: Select the p-values corresponding to Family-wise error rate.
f6 = feature_selection.SelectKBest(score_func=score_func, k=int(X_train.shape[-1]*0.25)) # Select features according to the k highest scores.
f7 = feature_selection.SelectKBest(score_func=score_func, k=int(X_train.shape[-1]*0.5)) # Select features according to the k highest scores.
f8 = feature_selection.SelectKBest(score_func=score_func, k=int(X_train.shape[-1]*0.75)) # Select features according to the k highest scores.

estims = [f1, f2, f3, f4, f5, f6, f7, f8]

X_cut = []
a = []
for estim in estims:
    X_train_transf = estim.fit_transform(X_train, y_train)
    X_val_transf = estim.transform(X_val)
    X_test_transf = estim.transform(X_test)        
    
    try:
        reg = SVR(C=1.0, epsilon=0.2).fit(X_train_transf, y_train)
        yhat = reg.predict(X_test_transf).squeeze()
        print(np.mean((y_test - yhat)**2),str(estim)[:10])

    except:
        print("no feature selected", str(estim)[:10])


# FOCI

In [None]:
from xicorpy import select_features_using_foci
selected = select_features_using_foci(y_train, X_train)

X_train_transf_foci = X_train[:,selected]
X_test_transf_foci = X_test[:,selected]
# SVR
model = SVR(C=1.0, epsilon=0.2)
model.fit(X_train_transf_foci, y_train)
yhat = model.predict(X_test_transf_foci).squeeze()
print(np.mean((y_test - yhat)**2), "FOCI")

# difFOCI

In [8]:
import torch.nn as nn
import torch.optim as optim
from difFOCI._utils import one_layer_net, two_layer_net
from difFOCI.conditional_dependence import conditional_dependence

p = X_train.shape[1]
lr = 1e-1
wd = 1e-1
num_epochs = 1000

version = "vec" # vec, nn

if version == "vec":
    param = torch.tensor(torch.normal(1, .1, (1, p)), requires_grad=True)
    optimizer = optim.Adam([param], lr=lr, weight_decay=wd)
else:
    param = one_layer_net(p, 10, 1) # one_layer_net(p, d, 1), two_layer_net(p,d1,d2,1)
    optimizer = optim.Adam(param.parameters(), lr=lr, weight_decay=wd)

X_train, y_train = torch.Tensor(X_train), torch.Tensor(y_train)

In [9]:
def criterion(param, version):
    if version == "vec":
        x_ = param * X_train
    else:
        x_ = param(X_train)
    return -conditional_dependence(x=x_, y=y_train)

In [10]:
for epoch in range(num_epochs):
        loss = criterion(param, version)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [None]:
if version == "vec":
    np_param = param.detach().numpy().squeeze()
    X_train = X_train.numpy()
    X_train_transf = (np_param * X_train)[:, np.argwhere(np.abs(np_param) > 0.1).squeeze()]
    X_val_transf = (np_param * X_val)[:, np.argwhere(np.abs(np_param) > 0.1).squeeze()]
    X_test_transf = (np_param * X_test)[:, np.argwhere(np.abs(np_param) > 0.1).squeeze()]
else:
    X_val, X_test = torch.Tensor(X_val), torch.Tensor(X_test)
    X_train_transf = (param(X_train)).detach().numpy()
    X_val_transf = (param(X_val)).detach().numpy()
    X_test_transf = (param(X_test)).detach().numpy()
 

if len(X_train_transf.shape) == 1:
    X_train_transf = X_train_transf.reshape(-1, 1)
    X_val_transf = X_train_transf.reshape(-1, 1)
    X_test_transf = X_test_transf.reshape(-1, 1)

y_train = y_train.numpy()
reg = SVR(C=1.0, epsilon=0.2).fit(X_train_transf, y_train)

train_loss = np.mean((reg.predict(X_train_transf)-y_train)**2)
val_loss = np.mean((reg.predict(X_val_transf)-y_val)**2)
test_loss = np.mean((reg.predict(X_test_transf)-y_test)**2)

print("Train Loss: ", train_loss)
print("Val Loss: ", val_loss)
print("Test Loss: ", test_loss)