In [13]:
# from preproc_tools import get_fr_by_sample, to_python_hdf5
import glob
import os
import numpy as np
from joblib import Parallel, delayed
from tqdm import tqdm
import json
from pathlib import Path
import h5py
from sklearn.svm import SVC
from scipy.spatial.distance import pdist
import pickle
import pandas as pd
from datetime import datetime
from ephysvibe.structures.population_data import PopulationData
from ephysvibe.structures.neuron_data import NeuronData
from ephysvibe.trials import  select_trials
from ephysvibe.trials.spikes import firing_rate
from typing import Dict, List

In [None]:
def get_neu_align_sample_test(path, params, sp_sample=False):
    """Load a neuron and attach sample- and test-aligned spike arrays to it.

    For each position ("in" and "out") the trials of block
    ``params["select_block"]`` are aligned with ``align_on`` (``error_type=0``)
    on ``sample_on`` and on ``test_on_1``. The aligned arrays are cut along
    axis 1 and stored on the neuron as ``sp_sample_<pos>`` and
    ``sp_test_<pos>``; the mask returned by the sample alignment is stored as
    ``mask_<pos>``. ``time_before_sample`` and ``time_before_test`` are stored
    as integer arrays.

    Args:
        path: Location of the neuron file, forwarded to
            ``NeuronData.from_python_hdf5``.
        params (dict): Must provide ``select_block``, ``time_before_sample``,
            ``time_after_sample``, ``time_before_test``, ``time_after_test``,
            ``dtype_sp`` and ``dtype_mask``.
        sp_sample (bool, optional): If False (default) the ``sp_samples``
            attribute is replaced by an empty array; if True it is left
            untouched. Defaults to False.

    Returns:
        The object returned by ``NeuronData.from_python_hdf5`` with the
        attributes described above set on it.
    """
    neu = NeuronData.from_python_hdf5(path)
    # Number of columns kept after alignment: time before + time after the event.
    endsample = params["time_before_sample"] + params["time_after_sample"]
    endtest = params["time_before_test"] + params["time_after_test"]

    for inout in ["in", "out"]:
        sp_sample_on, mask_s = neu.align_on(
            select_block=params["select_block"],
            select_pos=inout,
            event="sample_on",
            time_before=params["time_before_sample"],
            error_type=0,
        )
        # The mask of the test alignment is not stored; only the sample mask is kept.
        sp_test_on, _ = neu.align_on(
            select_block=params["select_block"],
            select_pos=inout,
            event="test_on_1",
            time_before=params["time_before_test"],
            error_type=0,
        )
        setattr(
            neu,
            "sp_sample_" + inout,
            np.array(sp_sample_on[:, :endsample], dtype=params["dtype_sp"]),
        )
        setattr(neu, "mask_" + inout, np.array(mask_s, dtype=params["dtype_mask"]))
        setattr(
            neu,
            "sp_test_" + inout,
            np.array(sp_test_on[:, :endtest], dtype=params["dtype_sp"]),
        )

    neu.time_before_sample = np.array(params["time_before_sample"], dtype=int)
    neu.time_before_test = np.array(params["time_before_test"], dtype=int)

    # `not`, not `~`: on a Python bool `~` is bitwise inversion (-1 / -2), which
    # is always truthy and used to wipe `sp_samples` even when sp_sample=True.
    if not sp_sample:
        neu.sp_samples = np.array([])
    return neu

In [6]:
# Scratch calculation: 75 % of 40, which evaluates to 30.0.
40*75/100

30.0

In [8]:
def check_number_of_trials(xdict,samples,min_ntr):
    """Tell whether every requested entry of ``xdict`` has enough rows.

    Args:
        xdict: Mapping from key to an array-like exposing ``.shape``.
        samples: Keys of ``xdict`` to inspect.
        min_ntr: Minimum accepted size of the first axis.

    Returns:
        bool: False as soon as one ``xdict[key].shape[0]`` is below
        ``min_ntr``; True otherwise (also when ``samples`` is empty).
    """
    too_few = (xdict[name].shape[0] < min_ntr for name in samples)
    return not any(too_few)

In [7]:
def color_data(fr_samples:Dict,min_ntr:int):
    """Pool the per-sample arrays into the two groups 'c1' and 'c5'.

    Args:
        fr_samples: Mapping with at least the keys '11', '15', '51' and '55'.
        min_ntr: Minimum number of rows each of those four entries must have.

    Returns:
        None when any of the four entries has fewer than ``min_ntr`` rows,
        otherwise a dict where 'c1' stacks '11' and '51' and 'c5' stacks
        '15' and '55' along axis 0.
    """
    required = ['11','15','51','55']
    if not check_number_of_trials(fr_samples,required,min_ntr):
        return None
    return {
        'c1': np.concatenate([fr_samples['11'],fr_samples['51']],axis=0),
        'c5': np.concatenate([fr_samples['15'],fr_samples['55']],axis=0),
    }

def orient_data(fr_samples:Dict,min_ntr:int):
    """Pool the per-sample arrays into the two groups 'o1' and 'o5'.

    Args:
        fr_samples: Mapping with at least the keys '11', '15', '51' and '55'.
        min_ntr: Minimum number of rows each of those four entries must have.

    Returns:
        None when any of the four entries has fewer than ``min_ntr`` rows,
        otherwise a dict where 'o1' stacks '11' and '15' and 'o5' stacks
        '51' and '55' along axis 0.
    """
    required = ['11','15','51','55']
    if not check_number_of_trials(fr_samples,required,min_ntr):
        return None
    return {
        'o1': np.concatenate([fr_samples['11'],fr_samples['15']],axis=0),
        'o5': np.concatenate([fr_samples['51'],fr_samples['55']],axis=0),
    }

def sampleid_data(fr_samples:Dict,min_ntr:int):
    """Return ``fr_samples`` unchanged if samples '11', '15', '51', '55' have enough rows.

    Args:
        fr_samples: Mapping with at least the keys '11', '15', '51' and '55'.
        min_ntr: Minimum number of rows each of those four entries must have.

    Returns:
        The same ``fr_samples`` object, or None when any of the four entries
        has fewer than ``min_ntr`` rows.
    """
    required = ['11','15','51','55']
    has_enough = check_number_of_trials(fr_samples,required,min_ntr)
    return fr_samples if has_enough else None

def neutral_data(fr_samples:Dict,min_ntr:int):
    """Split the per-sample arrays into sample '0' ('n') versus all others ('nn').

    Args:
        fr_samples: Mapping with at least the keys '0', '11', '15', '51' and '55'.
        min_ntr: Minimum number of rows each of those five entries must have.

    Returns:
        None when any of the five entries has fewer than ``min_ntr`` rows,
        otherwise a dict where 'n' is ``fr_samples['0']`` and 'nn' stacks
        '11', '15', '51' and '55' (in that order) along axis 0.
    """
    required = ['0','11','15','51','55']
    if not check_number_of_trials(fr_samples,required,min_ntr):
        return None
    non_neutral = [fr_samples[key] for key in ('11','15','51','55')]
    return {
        'n': fr_samples['0'],
        'nn': np.concatenate(non_neutral,axis=0),
    }

In [None]:
def preproc_for_decoding(neu:NeuronData,params:Dict,to_decode:str,min_ntr:int,start_sample:int,end_sample:int,start_test:int,end_test:int,avgwin:int=100):
    """Build the per-condition firing-rate arrays of one neuron for decoding.

    The sample-aligned and test-aligned arrays are smoothed with
    ``firing_rate.moving_average`` (window ``avgwin``, step 1), cut to the
    requested windows, concatenated along axis 1 and split by sample id with
    ``select_trials.get_sp_by_sample``. The split is then regrouped according
    to ``to_decode``.

    Args:
        neu: Neuron object; ``sample_id`` and the attributes named in
            ``params`` are read from it.
        params: Maps the keys 'time_before_son', 'time_before_t1on',
            'sampleon', 't1on' and 'maskson' to the names of the attributes of
            ``neu`` to use.
        to_decode: One of 'color', 'orient', 'sampleid' or 'neutral'.
        min_ntr: Minimum number of trials per sample, forwarded to the
            grouping function.
        start_sample: Start of the sample window; added to the
            'time_before_son' attribute to obtain the first column.
        end_sample: End of the sample window (exclusive column after the same
            offset is added).
        start_test: Start of the test window; added to the
            'time_before_t1on' attribute.
        end_test: End of the test window (exclusive column after the same
            offset is added).
        avgwin: Window of the moving average. Defaults to 100.

    Returns:
        The output of the grouping function selected by ``to_decode``: a dict
        of arrays, or None when a required sample has fewer than ``min_ntr``
        trials.

    Raises:
        ValueError: If ``to_decode`` is not one of the supported values.
    """
    grouping = {
        'color': color_data,
        'orient': orient_data,
        'sampleid': sampleid_data,
        'neutral': neutral_data,
    }
    # Fail early: previously an unknown value only surfaced after all the
    # processing, as an UnboundLocalError on `data`.
    if to_decode not in grouping:
        raise ValueError(
            f"to_decode must be one of {sorted(grouping)}, got {to_decode!r}"
        )

    # Column indices of the windows inside the aligned arrays.
    time_before_son = getattr(neu,params['time_before_son'])
    time_before_t1on = getattr(neu,params['time_before_t1on'])
    idx_start_sample = time_before_son + start_sample
    idx_end_sample = time_before_son + end_sample
    idx_start_test = time_before_t1on + start_test
    idx_end_test = time_before_t1on + end_test
    sampleon = getattr(neu,params['sampleon'])
    t1on = getattr(neu,params['t1on'])

    # Average fr across time
    fr_son = firing_rate.moving_average(
            sampleon, win=avgwin, step=1
        )[:, idx_start_sample:idx_end_sample]
    fr_t1on = firing_rate.moving_average(
            t1on, win=avgwin, step=1
        )[:, idx_start_test:idx_end_test]

    # Sample window followed by test window on the column axis.
    fr = np.concatenate([fr_son,fr_t1on],axis=1)
    fr_samples = select_trials.get_sp_by_sample(fr, neu.sample_id[getattr(neu,params['maskson'])])

    return grouping[to_decode](fr_samples,min_ntr)



In [None]:
def pick_train_test_trials(trials, train_ratio):
    """Shuffle ``trials`` along the first axis and split them in two parts.

    Args:
        trials: Array whose first axis is shuffled with
            ``np.random.permutation``.
        train_ratio: Fraction of ``trials.shape[0]`` assigned to the first
            part; the cut index is ``int(trials.shape[0] * train_ratio)``.

    Returns:
        tuple: ``(train, test)``, the shuffled rows before and from the cut
        index respectively.
    """
    shuffled = np.random.permutation(trials)
    cut = int((trials.shape[0] * train_ratio))
    return shuffled[:cut], shuffled[cut:]

In [14]:
def compute_decoding(model,list_data,trial_duration,ntr_train,ntr_test,topred):
    """Train and test a classifier on every pair of (train time, test time).

    For each neuron and each label the available trials are split into a
    training and a testing pool with ``pick_train_test_trials``; ``ntr_train``
    and ``ntr_test`` trials are then drawn with replacement from the respective
    pools. The draws of all neurons are assembled into
    ``[time, trials, neurons]`` arrays, the model is fitted at each time bin
    and evaluated at every time bin.

    Args:
        model: Estimator exposing ``fit(X, y)`` and ``predict(X)``.
        list_data: One mapping per neuron; ``list_data[i][label]`` must be a
            2-D array of shape ``(n_trials, trial_duration)``.
        trial_duration: Number of time bins (columns) in each array.
        ntr_train: Trials per label drawn for training.
        ntr_test: Trials per label drawn for testing.
        topred: Labels (keys of each mapping) to discriminate.

    Returns:
        np.ndarray: ``[trial_duration, trial_duration]`` array where entry
        ``[t_train, t_test]`` is the number of correctly predicted test trials
        (out of ``ntr_test * len(topred)``).
    """
    # Fraction of each neuron's trials reserved for the training pool, matching
    # the ntr_train:ntr_test proportion. The previous `1 - ntr_test/ntr_train`
    # is not a fraction of the total (it is 0 when both counts are equal).
    train_ratio = ntr_train / (ntr_train + ntr_test)
    ntopred = len(topred)
    num_cells = len(list_data)
    # Initialize arrays to store train and test data
    data_train = np.empty([trial_duration, ntr_train*ntopred, num_cells])
    data_test = np.empty([trial_duration, ntr_test*ntopred, num_cells])
    perf = np.empty([trial_duration, trial_duration])
    # 1-D label vectors: label i occupies the i-th block of trials, the same
    # order in which the per-label draws are concatenated below.
    y_train = np.repeat(np.arange(ntopred), ntr_train)
    y_test = np.repeat(np.arange(ntopred), ntr_test)

    # Iterate through neurons to randomly pick trials
    for icell, cell in enumerate(list_data):
        trials_train, trials_test = [], []
        for ipred in topred:
            train, test = pick_train_test_trials(cell[ipred], train_ratio)
            # np.random.choice only accepts 1-D input, so draw row indices.
            idx_train = np.random.choice(train.shape[0], ntr_train, replace=True)
            idx_test = np.random.choice(test.shape[0], ntr_test, replace=True)
            trials_train.append(train[idx_train])
            # Test trials come from the held-out pool, never from `train`.
            trials_test.append(test[idx_test])

        # build matrices of [timestamp, trials, neurons] dimensions to feed to classifiers:
        # stack labels on the trial axis, then transpose (trials, time) -> (time, trials).
        data_train[:, :, icell] = np.concatenate(trials_train, axis=0).T
        data_test[:, :, icell] = np.concatenate(trials_test, axis=0).T

    # train and test classifier
    for time_train in range(trial_duration):
        model.fit(data_train[time_train], y_train)
        for time_test in range(trial_duration):
            y_predict = model.predict(data_test[time_test])
            perf[time_train, time_test] = np.sum(y_predict == y_test)

    return perf

In [None]:

#! TODO: this has to be done in PopulationData, or in a script with different options to preprocess populations.
# NOTE(review): placeholder cell. `path` and `params` are only assigned in a later
# cell, and the value returned by the call below is discarded.
# create population
get_neu_align_sample_test(path, params, sp_sample=False)
# save population
# NOTE(review): the next line is unfinished and is not valid Python; presumably a
# `to_python_hdf5`-style save call was intended. Confirm before running.
popu.to_python........

# -- end of preprocessing 

### Prepare data for decoding

In [None]:
# Decoding configuration.
# to_decode selects the grouping applied inside preproc_for_decoding
# ('color', 'orient', 'sampleid' or 'neutral').
to_decode='color'
# Number of compute_decoding runs launched in the decode cell.
niterations = 10
# Trials per label drawn for training / testing in compute_decoding.
ntr_train = 30
ntr_test = 10
# Minimum number of trials per sample required by the grouping functions.
min_ntr = 25
trial_duration =17 #! calculate using start end info
# NOTE(review): the four window limits below have no value yet, so this cell
# does not parse until they are filled in.
start_sample=
end_sample=
start_test=
end_test=
# NOTE(review): empty path; must point to the population file before running.
path=""
# Maps the keys read by preproc_for_decoding to the names of the attributes it
# fetches from each neuron with getattr.
params= {
        'time_before_son': 'time_before_son_in',
        'time_before_t1on': 'time_before_t1on_in',
        'sampleon': 'sampleon_in',
        't1on': 't1on_in',
        'maskson': 'maskson_in'
}
# preprocessing: load the population
popu = PopulationData.from_python_hdf5(path)
# split by condition and check enough number of trials (>25)
# NOTE(review): preproc_for_decoding is called here directly, with `to_decode`
# in the position of its `neu` parameter and one positional argument short, and
# only its result is handed to execute_funtion. Presumably the function itself
# and its arguments should be passed so it runs once per neuron. Confirm against
# the PopulationData API.
list_data = popu.execute_funtion(preproc_for_decoding(to_decode,params,min_ntr,start_sample,end_sample,start_test,end_test,avgwin=100))
# Drop the neurons for which the grouping function returned None.
list_data = [idata for idata in list_data if idata is not None ]

### Decode

In [None]:
# Labels to discriminate; they are used as keys into each entry of list_data.
topred = ['c1', 'c5']
# pick random trials for training and testing and iterate with a double for to train and test in all t
model = SVC(
    kernel="linear",
    C=0.8,
    decision_function_shape="ovr",
    gamma="auto",
    degree=1,
)
# One compute_decoding run per iteration, dispatched over 20 joblib workers.
all_perf = Parallel(n_jobs=20)(
    delayed(compute_decoding)(
        model, list_data, trial_duration, ntr_train, ntr_test, topred
    )
    for _ in tqdm(range(niterations))
)