In [2]:
import pandas as pd
# Read the RNA table and order it by the 'sn' column, then by 'period', so
# that rows sharing an 'sn' value are contiguous and sorted by 'period'.
# The index is rebuilt so positional order and index labels agree.
rna_data = (
    pd.read_csv("rna_common_complete.csv")
    .sort_values(by=['sn', 'period'])
    .reset_index(drop=True)
)

In [3]:
# Everything except the four bookkeeping columns is treated as a feature.
feature_frame = rna_data.drop(['sn', 'group', 'caarms_status', 'period'], axis=1)
X_og_shape = feature_frame.values

# Fold the flat (rows, features) matrix into (unique 'sn' values, 3, features).
# This only lines up if every 'sn' contributes exactly 3 consecutive rows
# (presumably one per 'period' value -- verify against the CSV); a row count
# that is not 3 * n_subjects makes reshape raise.
n_subjects = len(set(rna_data['sn']))
n_features = X_og_shape.shape[1]
X_reshaped = X_og_shape.reshape(n_subjects, 3, n_features)

# One label per row with period == 24: group 'C' maps to 0, any other group to 1.
labels_group = rna_data.loc[rna_data['period'] == 24, 'group'].values
labels = [1 if group_code != 'C' else 0 for group_code in labels_group]

In [4]:
import torch
import numpy as np
from neucube.utils import SNR
from neucube.utils import interpolate
from neucube.encoder import Delta

# Score every feature with SNR on the first slice along axis 1, then keep the
# 20 features with the highest score.
# NOTE(review): the ranking is computed from all labels, before the data is
# split into cross-validation folds -- confirm this is intended.
first_slice = X_reshaped[:, 0, :]
ratios = SNR(first_slice, labels)
top_idx = torch.argsort(ratios, descending=True)[:20]
X_reshaped_topidx = X_reshaped[:, :, top_idx]

# Resample the selected features to 104 points before spike encoding.
interpolated_X = interpolate(X_reshaped_topidx, num_points=104)

# Encode the interpolated signals with the Delta encoder (threshold 0.008)
# and turn the integer labels into a tensor.
encoder = Delta(threshold=0.008)
X = encoder.encode_dataset(interpolated_X)
y = torch.tensor(labels)

In [5]:
import nevergrad as ng
import numpy as np
from functools import partial
from neucube.sampler import TemporalBinning
from neucube.utils import SeparationIndex
from tqdm import tqdm

# Lookup table: neuron-type key -> {'a': ..., 'b': ..., 'c': ..., 'd': ...}.
# These values are later passed to the reservoir's update_parms(a=, b=, c=, d=);
# presumably they are Izhikevich-model presets -- confirm the meaning of each
# key with the authors.
neuron_parm_dict = {
    type_key: dict(zip('abcd', values))
    for type_key, values in (
        # key       a     b    c   d
        ('ffs',   (0.1,  0.2, -65, 8)),
        ('ibfs',  (0.02, 1,   -65, 8)),
        ('ibvfs', (0.02, 1,   -65, 2)),
        ('ms',    (0.03, 0.2, -65, 8)),
        ('mch',   (0.05, 0.2, -50, 4)),
        ('rs',    (0.02, 0.2, -65, 8)),
        ('ib',    (0.02, 0.2, -55, 4)),
        ('ch',    (0.02, 0.2, -50, 2)),
        ('vfs',   (1,    0.4, -65, 2)),
    )
}

def objective_function(res_, X_stimuli, labels, params, sampler):
    """Score one candidate neuron-type mixture on the given reservoir.

    ``params`` is a length-5 vector of unconstrained scores. A softmax turns
    it into sampling probabilities over the neuron types
    ``['ffs', 'ch', 'mch', 'ibfs', 'rs']`` (in that order). Each of the
    ``res_.n_neurons`` neurons is assigned a type drawn at random with those
    probabilities, the matching ``a``/``b``/``c``/``d`` values from
    ``neuron_parm_dict`` are pushed into the reservoir, the reservoir is
    simulated on ``X_stimuli`` and the sampled state vectors are scored with
    ``SeparationIndex``.

    Args:
        res_: Reservoir to configure and simulate. It must expose
            ``n_neurons``, ``update_parms(a=, b=, c=, d=)`` and ``simulate``.
            Its parameters are overwritten in place.
        X_stimuli: Input passed unchanged to ``res_.simulate``.
        labels: Class labels passed unchanged to ``SeparationIndex``.
        params: Sequence of 5 real numbers (anything ``torch.tensor`` accepts).
        sampler: Object whose ``sample(spikes)`` yields the state vectors.

    Returns:
        The separation index as a float rounded to 5 decimals.

    Note:
        Neuron types are drawn from NumPy's global random state, so repeated
        calls with the same ``params`` can return different scores.
    """
    probs = torch.nn.functional.softmax(torch.tensor(params), dim=0).numpy()
    print(probs)
    # Use the reservoir that was passed in (not a notebook-level global) so the
    # function works for any reservoir and on a fresh kernel.
    n_type = np.random.choice(['ffs','ch','mch','ibfs','rs'], res_.n_neurons, replace=True, p=probs)
    # One tensor per parameter, holding that parameter for every neuron.
    a, b, c, d = [
        torch.tensor([neuron_parm_dict[kind][key] for kind in n_type])
        for key in ['a', 'b', 'c', 'd']
    ]
    res_.update_parms(a=a, b=b, c=c, d=d)
    out_spikes = res_.simulate(X_stimuli, mem_thr=30, train=False, verbose=False)
    state_vec = sampler.sample(out_spikes)
    return np.round(SeparationIndex(state_vec, labels).item(), 5)

def run_opt(optimizer, objective_):
    """Drive an ask/tell optimizer for ``optimizer.budget`` iterations.

    Each iteration asks for a candidate, evaluates ``objective_`` on the
    candidate's ``value`` (passed as the keyword ``params``) and reports the
    negated result back as the loss, so the objective is maximised. A progress
    line is printed after every second iteration.

    Args:
        optimizer: Object exposing ``budget``, ``ask()`` and ``tell(x, loss)``.
        objective_: Callable accepting ``params=...`` and returning a number.

    Returns:
        None. The optimizer is updated in place.
    """
    for step in range(1, optimizer.budget + 1):
        candidate = optimizer.ask()
        loss = -objective_(params=candidate.value)
        optimizer.tell(candidate, loss)
        if step % 2 != 0:
            continue
        print(f"Iteration {step}/{optimizer.budget}, Current loss: {loss}")

def train_dyanmics(reservoir_, X_, y_, sampler_, budget=50):
    """Search for the neuron-type mixture that maximises the separation index.

    Runs nevergrad's ``TwoPointsDE`` over a 5-element real vector, scoring each
    candidate with ``objective_function`` on the supplied reservoir and data.

    Args:
        reservoir_: Reservoir forwarded to ``objective_function`` as ``res_``.
        X_: Stimuli forwarded as ``X_stimuli``.
        y_: Labels forwarded as ``labels``.
        sampler_: State sampler forwarded as ``sampler``.
        budget: Number of objective evaluations the optimizer may spend.
            Defaults to 50, the value that used to be hard-coded.

    Returns:
        The recommended 5-element vector (``recommendation.value``). These are
        raw, unnormalised scores, not probabilities.
    """
    # The search space is left unbounded: objective_function passes the vector
    # through a softmax, so any real values map to valid probabilities.
    instrumentation = ng.p.Array(shape=(5,))

    optimizer = ng.optimizers.TwoPointsDE(parametrization=instrumentation, budget=budget, num_workers=1)
    # Bind everything except `params`, which run_opt supplies per candidate.
    partial_objective_function = partial(
        objective_function,
        res_=reservoir_,
        X_stimuli=X_,
        labels=y_,
        sampler=sampler_,
    )

    run_opt(optimizer, partial_objective_function)
    recommendation = optimizer.provide_recommendation()
    return recommendation.value

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn import svm, metrics
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer

from neucube import IzhReservoir
from neucube.sampler import TemporalBinning, SpikeCount
from neucube.utils import SeparationIndex

# Outer cross-validation: for every fold, optimise the reservoir's neuron-type
# mixture on the training part, turn spikes into state vectors, tune an SVM
# with an inner grid search and evaluate on the held-out part.
# NOTE(review): KFold is used without shuffling and no random seed is set
# anywhere visible, so folds follow row order and the random neuron-type draws
# differ between runs -- confirm this is intended.
num_folds = 10
kf = KFold(n_splits=num_folds)
sampler = TemporalBinning(bin_size=10)

# Loop-invariant SVM search configuration, built once instead of per fold.
param_grid = {'C': [2, 3, 4, 5, 6, 7, 8], 'gamma': [0.1, 0.01, 0.001], 'kernel': ['rbf', 'linear', 'poly']}
mcc_scorer = make_scorer(metrics.matthews_corrcoef)

# Accumulators filled fold by fold.
true_labels = []        # held-out labels, pooled over folds (plain ints)
predicted_labels = []   # matching predictions, pooled over folds (plain ints)
separation_values = []  # per fold: train separation index, then test separation index
accuracy_values = []    # per-fold test accuracy
mcc_values = []         # per-fold test Matthews correlation coefficient

# kf.split() is a generator without a length, so tqdm needs the fold count
# to show real progress instead of "0it".
for train_index, test_index in tqdm(kf.split(X), total=num_folds):
    X_train_fold, X_test_fold = X[train_index], X[test_index]
    y_train_fold, y_test_fold = y[train_index], y[test_index]

    # Fresh reservoir per fold; only the training part drives the optimisation.
    izh_res = IzhReservoir(inputs=X.shape[2], c=0.7, l=0.18, input_conn_prob=0.85)
    opt_parms = train_dyanmics(izh_res, X_train_fold, y_train_fold, sampler)

    # Convert the recommended raw scores into probabilities and draw one
    # neuron type per reservoir neuron, then load the matching parameters.
    opt_parms = torch.nn.functional.softmax(torch.tensor(opt_parms), dim=0).numpy()
    n_type = np.random.choice(['ffs','ch','mch','ibfs','rs'], izh_res.n_neurons, replace=True, p=opt_parms)
    a, b, c, d = [torch.tensor(list(map(lambda x: neuron_parm_dict[x][i], n_type))) for i in ['a', 'b', 'c', 'd']]
    izh_res.update_parms(a=a, b=b, c=c, d=d)
    print(pd.Series(n_type).value_counts())

    X_train_opt_spike = izh_res.simulate(X_train_fold, mem_thr=30, train=False, verbose=False)
    X_test_opt_spike = izh_res.simulate(X_test_fold, mem_thr=30, train=False, verbose=False)
    X_train_state_vec = sampler.sample(X_train_opt_spike)
    X_test_state_vec = sampler.sample(X_test_opt_spike)

    # Inner 10-fold grid search on the training fold; the best model by
    # accuracy is refit and used for the held-out predictions.
    svm_model = svm.SVC()
    grid_search = GridSearchCV(estimator=svm_model, param_grid=param_grid, cv=10, scoring={'accuracy': 'accuracy', 'mcc': mcc_scorer}, refit='accuracy')
    grid_search.fit(X_train_state_vec, y_train_fold)
    y_pred = grid_search.best_estimator_.predict(X_test_state_vec)

    # Store plain Python numbers rather than 0-d tensors / NumPy scalars so the
    # pooled lists can be handed to sklearn metrics and printed cleanly.
    true_labels.extend(y_test_fold.tolist())
    predicted_labels.extend(np.asarray(y_pred).tolist())
    separation_values.extend([
        SeparationIndex(X_train_state_vec, y_train_fold).item(),
        SeparationIndex(X_test_state_vec, y_test_fold).item(),
    ])
    accuracy_values.append(accuracy_score(y_test_fold, y_pred))
    mcc_values.append(metrics.matthews_corrcoef(y_test_fold, y_pred))

0it [00:00, ?it/s]

[0.49832896 0.00810037 0.08040706 0.03526498 0.37789863]
[0.01140512 0.03769752 0.89877574 0.02025139 0.03187023]


In [None]:
# Pooled metrics: computed once over the held-out predictions of all folds.
accuracy = accuracy_score(true_labels, predicted_labels)
mcc = metrics.matthews_corrcoef(true_labels, predicted_labels)
for caption, pooled_score in (
    ("10-Fold Cross-Validation Accuracy:", accuracy),
    ("10-Fold Cross-Validation MCC:", mcc),
):
    print(caption, pooled_score)
print(confusion_matrix(true_labels, predicted_labels))

# Per-fold values, in the order they were collected.
for per_fold_values in (accuracy_values, mcc_values, separation_values):
    print(per_fold_values)