In [None]:
%load_ext autoreload
%autoreload 2

from scipy.io import loadmat
import matplotlib.pylab as plt
from matplotlib.colors import CenteredNorm

import pandas as pd
import numpy as np
import seaborn as sns

# Load and visualize data

- 100 ms bins
- 160 ms after stimulus onset
- 1 s per trial
- subtracted appropriate PSTH (response to drifting grating)
- only firing rates > 0.5 Hz
- mean-matched selection of units based on firing rates

- varied bin sizes: 20 ms -> 1 s
- shuffle control

In [None]:
# load data
m = loadmat('./communication-subspace-master/mat_sample/sample_data.mat', squeeze_me=True, struct_as_record=False)
X, Y_V1, Y_V2 = m['X'], m['Y_V1'], m['Y_V2']
print(f'   X: {X.shape}')
print(f'Y_V1: {Y_V1.shape}')
print(f'Y_V2: {Y_V2.shape}')

In [None]:
fig, axarr = plt.subplots(ncols=3, figsize=(20, 4))
for ax, data, t in zip(axarr, [X, Y_V1, Y_V2], ['source V1', 'target V1', 'target V2']):
    nx, ny = data.shape
    x = np.arange(nx) * .1
    y = np.arange(ny) + 1

    im = ax.pcolormesh(x, y, data.T) #, norm=CenteredNorm(), cmap='coolwarm')
    
    label = 'firing rate [Hz]'

    fig.colorbar(im, ax=ax, location='right', orientation='vertical', label=label)
    ax.set_xlabel('time [s]')
    ax.set_ylabel('unit')
    ax.set_title(t)

fig.tight_layout()

# Regression models

In [None]:
from sklearn.model_selection import train_test_split, KFold, GridSearchCV, cross_val_score
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score, make_scorer
from custom_models import RRRegressor

X, Y = X, Y_V2

def nse(Y_true, Y_pred):
        
    dY_sq = (Y_pred - Y_true)**2
    nse = np.sum(dY_sq) / np.sum(Y_true**2)

    return nse


def get_nse(mod, X, Y, n_fold=10):
    
    mod.fit(X, Y)
    calc_nse = lambda model, X, Y: nse(Y, model.predict(X))
    scores = cross_val_score(mod, X, Y_V2, cv=n_fold, scoring=calc_nse)
    mean, std = np.mean(scores), np.std(scores)
    ser = std / np.sqrt(len(scores))
    
    return mean, ser

pred = dict()

In [None]:
# linear
pipe = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('mod', LinearRegression())
])

pred['lin'] = get_nse(pipe, X, Y)

# pipe.fit(X, Y)

scores = cross_val_score(pipe, X, Y_V2, cv=10)
lin_mean, lin_std = np.mean(scores), np.std(scores)

In [None]:
# ridge 
l = np.logspace(0, 4, 100)

pipe = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('mod', Ridge())
])

grd = GridSearchCV(
    pipe, 
    { 'mod__alpha': l, },
    cv=10,
    n_jobs=-1
)

out = dict()

mods = grd.fit(X, Y)
mod = mods.best_estimator_

# pred['ridge'] = cross_val_score(mod, X, Y_v2, cv=10)
pred['ridge'] = get_nse(mod, X, Y)

scores = cross_val_score(mod, X, Y_V2, cv=10)
ridge_mean, ridge_std = np.mean(scores), np.std(scores)


In [None]:
# plot ridge
df = pd.DataFrame(mods.cv_results_)
x, y, yerr = df.loc[:, 'param_mod__alpha'], df.loc[:, 'mean_test_score'], df.loc[:, 'std_test_score']

fig, ax = plt.subplots()
ax.errorbar(x, y, yerr, label='ridge')
ax.axhline(lin_mean, c='C1', label='linear')
ax.axhline(lin_mean + lin_std, c='C1', ls='--')
ax.axhline(lin_mean - lin_std, c='C1', ls='--')
ax.legend()
ax.set_xlabel('ridge parameter alpha')
ax.set_ylabel('R2 score')

ax.set_xscale('log')

In [None]:
# reduced rank
r = np.arange(Y.shape[1]) + 1

pipe = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('mod', RRRegressor())
])

grd = GridSearchCV(
    pipe, 
    { 'mod__r': r, },
    cv=10,
    n_jobs=1,
)

out = dict()

mods = grd.fit(X, Y)
mod = mods.best_estimator_

In [None]:
# plot RRR
df = pd.DataFrame(mods.cv_results_)
x, y, yerr = df.loc[:, 'param_mod__r'], df.loc[:, 'mean_test_score'], df.loc[:, 'std_test_score']

fig, axarr = plt.subplots(ncols=2, figsize=(16, 5))
ax = axarr[0]
ax.errorbar(x, y, yerr, label='RR')
ax.axhline(lin_mean, c='C1', label='linear')
ax.axhline(lin_mean + lin_std, c='C1', ls='--')
ax.axhline(lin_mean - lin_std, c='C1', ls='--')

ax = axarr[1]
ax.errorbar(x, y, yerr, label='RR')
ax.axhline(ridge_mean, c='C2', label='ridge')
ax.axhline(ridge_mean + ridge_std, c='C2', ls='--')
ax.axhline(ridge_mean - ridge_std, c='C2', ls='--')

for ax in axarr:
    ax.legend(loc='upper right')
    ax.set_xlabel('RRR parameter r')
    ax.set_ylabel('R2 score')

In [None]:
fig, ax = plt.subplots()

mask = df.loc[:, 'n_comp'] < 100
sns.lineplot(data=df.loc[ mask ], x='n_comp', y='NSE', errorbar='se', err_style='bars')

mean, std = pred['lin']
ax.axhline(mean, c='C0', label='linear')
ax.axhline(mean + std, c='C0', ls='--', lw=0.5)
ax.axhline(mean - std, c='C0', ls='--', lw=0.5)

mean, std = pred['ridge']
ax.axhline(mean, c='C1', label='ridge')
ax.axhline(mean + std, c='C1', ls='--', lw=0.5)
ax.axhline(mean - std, c='C1', ls='--', lw=0.5)

nse_mat = loadmat('./communication-subspace-master/nse.mat', squeeze_me=True)['cvLoss']
y = nse_mat[0]
yerr = nse_mat[1]
x = np.arange(len(y)) + 1
ax.errorbar(x, y, yerr)

ax.legend()

In [None]:
# reduced rank (ridge init)

In [None]:
# exclude neurons/folds with 0 variance

# OLD

In [None]:
def add_bias(X):
    'add bias (ones) to an N x p feature array'

    bias = np.ones_like(X[:, :1])
    Z = np.concatenate([bias, X], axis=1)

    return Z


def get_pca_components(X):
    'calulate PCA components (columns of matrix V) for X (rows: observations, cols: variables)'
    # PCA (alternatives: sklearn.decomposition.PCA or np.linalg.svd)
    C = np.cov(X, rowvar=False)        # covariance matrix
    val, vec = np.linalg.eig(C.T)         # diagonalize
    V = vec[:, np.argsort(val)[::-1]]     # ensure descending components

    # from sklearn.decomposition import PCA
    # pca = PCA()
    # pca.fit(Yf)
    # V = pca.components_

    # U, S, V = np.linalg.svd(Yf)

    # V = loadmat('./communication-subspace-master/V.mat', squeeze_me=True)['V']

    return V

def lstsq_prediction(X, Y):

   # least-squares solution
    res = np.linalg.lstsq(X, Y, rcond=None) # lstsq(a, b) solves a * x = b
    B = res[0]
    Y_p = X @ B # predict targets 

    return Y_p, B

def get_rr_weights(X, Y, n_components=None):

    Y_ls, B_ls = lstsq_prediction(X, Y)

    V = get_pca_components(Y_ls)

    n = n_components if n_components else V.shape[1] 

    l = []
    Y_m = Y.mean(axis=0)
    for i in range(n):

        V_i = V[:, :i+1]            # first i principle components
        B_i = B_ls @ V_i @ V_i.T    # enforcing low rank on B_ls

        # # add bias term TODO: check if this can be done earlier
        # bias = Y.mean(axis=0) - (X.mean(axis=0) @ B_i)
        # bias = np.expand_dims(bias, axis=0)
        # B_i = np.concatenate([bias, B_i], axis=0)

        l.append(B_i)

    B = np.array(l)

    return B

def calculate_score(X, B, Y, scorer):
    'expects specific strucutre for B TODO'

    # predict Y
    Y_p = np.tensordot(X, B, axes=(1, 1))
    Y_p = np.moveaxis(Y_p, 0, 1)

    if scorer == 'MSE':
        
        dY_sq = (Y_p - Y)**2
        scr = np.mean(dY_sq, axis=(1, 2))

        return scr

    elif scorer == 'NSE':
        dY_sq = (Y_p - Y)**2

        a = np.sum(dY_sq, axis=(1, 2))  # sum of squared errors
        b = np.sum(Y**2)                # sum of squared targets
        scr = a / b
        
        return scr
    
    else:
        raise ValueError(f'Scorer type {scorer} not implemented')

def pipe(X, Y, n_fold=10, n_components=None):
    
    X = add_bias(X)

    ds = []
    for i, (i_train, i_test) in enumerate(KFold(n_fold).split(X)):

        x_train, y_train = X[i_train], Y[i_train]
        x_test, y_test = X[i_test], Y[i_test]

        B = get_rr_weights(x_train, y_train, n_components=n_components)
        
        # x_test = add_bias(x_test)
        
        score = calculate_score(x_test, B, y_test, scorer='NSE')
        d = pd.DataFrame(data={
            'NSE' : score,
            'n_fold': i + 1,
            'n_comp': np.arange(len(score)) + 1
        })
        ds.append(d)
    
    df = pd.concat(ds, ignore_index=True)

    return df


df = pipe(X, Y, n_components=10)
df