Code for determining how to adjust VonMises dispersion parameter in cursor probabilistic model based on proximity to target.

We'll use a logistic function (as seen below) with parametrized midpoint and steepness.

In [6]:
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt

import sys
sys.path.append('../utils/plotting/')
sys.path.append('../utils/recalibration/')
sys.path.append('../utils/simulation/')
from plotting_utils import figSize
from hmm import HMMRecalibration
from simulation import simulateBCIFitts
import simulation_utils

# for a reproducible result
np.random.seed(1)


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


#### **Define an initial decoder and initial neural tuning properties (mean firing rates and preferred directions)**


In [7]:
from sklearn.linear_model import LinearRegression

nUnits         = 192
SNR            = 0.5
nTrainingSteps = 10000

initialTuning          = simulation_utils.generateUnits(n_units = nUnits, SNR = SNR)
calNeural, calVelocity = simulation_utils.simulateUnitActivity(initialTuning, noise = 0.3, nSteps = nTrainingSteps)
lr                     = LinearRegression(fit_intercept = True).fit(calNeural, calVelocity)
D                      = np.hstack([lr.intercept_[:, np.newaxis], lr.coef_]).T

# Normalize the gain of this decoder so that it will output vectors with a magnitude of 1 when the encoded velocity has a magnitude of 1. 
D[:, 0] = D[:,0] / np.linalg.norm(D[1:, :][:, 0]) / np.linalg.norm(initialTuning[:, 1])
D[:, 1] = D[:,1] / np.linalg.norm(D[1:, :][:, 1]) / np.linalg.norm(initialTuning[:, 2])


Here we define the amount of exponential smoothing used in the decoder (alpha). Values between 0.9 and 0.96 are pretty reasonable. We then do a sweep for beta.

In [3]:
cfg = dict()
cfg['neuralTuning'] = initialTuning
cfg['D']            = D
cfg['alpha']        = 0.94 # amount of exponential smoothing (0.9 to 0.96 are reasonable)
cfg['delT']         = 0.02 # define the time step (20 ms)
cfg['nDelaySteps']  = 10   # define the simulated user's visual feedback delay (200 ms)
cfg['nSimSteps']    = 10000

possibleGain = np.linspace(0.1,2.5,10)
cfg['beta']  = simulation_utils.gainSweep(cfg, possibleGain, verbose = True)

print('Using gain value beta = ', cfg['beta'])

0 / 10


  simAct  = getNeuralTuning(currControl, decode_params)
  rawDecVec = getDecodedControl(simAct, decode_params)


1 / 10
2 / 10
3 / 10
4 / 10
5 / 10
6 / 10
7 / 10
8 / 10
9 / 10
Using gain value beta =  0.6333333333333333


Now simulate nonstationarity and identify optimal HMM parameters for recalibration:

In [21]:
import hmm_utils
import copy

def simulate_HMM(args, cfg, n_sessions, days_between, shrinkage):
    
    adjustKappa             = lambda x: 1 / (1 + np.exp(-1 * (x - args['inflection']) * args['exp']))
    targLocs                = hmm_utils.generateTargetGrid(gridSize = args['gridSize'])
    stateTrans, pStateStart = hmm_utils.generateTransitionMatrix(gridSize = args['gridSize'], stayProb = args['stayProb'])

    hmm = HMMRecalibration(stateTrans, targLocs, pStateStart, args['kappa'], adjustKappa = adjustKappa)

    session_scores = np.zeros((n_sessions + 1))
    
    # Day 0 performance:
    D_HMM     = np.copy(cfg['D'])
    tuning    = np.copy(cfg['neuralTuning'])
    
    norecal_cfg = copy.deepcopy(cfg)
    hmminit_cfg = copy.deepcopy(cfg)
    hmmrep_cfg = copy.deepcopy(cfg)
    
    ttt     = simulateBCIFitts(cfg)['ttt']
    session_scores[0] = np.mean(ttt)
    
    for i in range(n_sessions):
        for j in range(days_between + 1):
            # introduce daily nonstationarities between recorded sessions
            tuning = simulation_utils.simulateTuningShift(tuning, PD_shrinkage = shrinkage, 
                                                          mean_shift = 0, renormalize = None)  
        
        norecal_cfg['neuralTuning'] = tuning
        hmminit_cfg['neuralTuning'] = tuning
        hmmrep_cfg['neuralTuning'] = tuning
    

        # HMM recalibration (repeated):
        results_hmmrep          = simulateBCIFitts(hmmrep_cfg) 
        targStates, pTargState  = hmm.viterbi_search(results_hmmrep['rawDecTraj'], results_hmmrep['posTraj'])

        inferredTargLoc  = targLocs[targStates.astype('int').flatten(),:]
        inferredPosErr   = inferredTargLoc - results_hmmrep['posTraj']
        inferredTargDist = np.linalg.norm(inferredPosErr, axis = 1)
        neural           = results_hmmrep['neuralTraj']

        D_HMM            = np.linalg.lstsq(np.hstack([np.ones((neural.shape[0], 1)), neural]), inferredPosErr, rcond = -1)[0]  # update previous decoder
        decVec_new       = np.hstack([np.ones((neural.shape[0], 1)), neural]).dot(D_HMM)

        inferredTargDir  = inferredPosErr / inferredTargDist[:, np.newaxis]
        farIdx           = np.where(inferredTargDist > 0.4)[0]
        projVec          = np.sum(np.multiply(decVec_new[farIdx, :], inferredTargDir[farIdx, :]), axis = 1)
        D_HMM           /= np.mean(projVec)
        
        
        hmmrep_cfg['D']          = D_HMM
        ttt_RepeatedRecal        = simulateBCIFitts(hmmrep_cfg)['ttt'] #Simulate BCI performance with the HMM-recalibrated decoder
        session_scores[i+1]  = np.mean(ttt_RepeatedRecal)
        
    return session_scores

In [22]:
sys.path.append('../utils/preprocessing/')
import sweep_utils
from joblib import Parallel, delayed


# general settings:
nSimSteps  = 10000

# HMM settings:
base_opts               = dict()
base_opts['gridSize']   = 20
base_opts['probThresh'] = 'probWeighted'
base_opts['stayProb']   = 0.9999


# grid search settings:
sweep_opts = dict()
sweep_opts['inflection'] = np.linspace(0, 0.5, 21)
sweep_opts['exp']        = np.linspace(1, 40, 11)
sweep_opts['kappa']      = [0.5, 1, 2, 4, 8]
           
args = sweep_utils.generateArgs(sweep_opts, base_opts)

           

#--------------------------------------------------
session_scores = Parallel(n_jobs= -1, verbose = 11)(delayed(simulate_HMM)(*[x,cfg, 10, 1, 0.89 ]) for x in args)

    

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   2 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   3 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   4 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   5 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   6 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   7 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   8 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done   9 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done  10 tasks      | elapsed:  1.0min
[Parallel(n_jobs=-1)]: Done  11 tasks      | elapsed:  1.8min
[Parallel(n_jobs=-1)]: Done  12 tasks      | elapsed:  1.8min
[Parallel(n_jobs=-1)]: Done  13 tasks      | elapsed:  1.8min
[Parallel(n_jobs=-1)]: Done  14 tasks      | elapsed:  1.8min
[Parallel(n_jobs=-1)]: Done  15 tasks      | elapsed:  

KeyboardInterrupt: 

In [None]:
figSize(10, 24)

argx, argy = np.unravel_index(np.argmin(sweep_scores), sweep_scores.shape)
print('Best inflection: ', inflection_sweep[argx])
print('Best exponent: ', exp_sweep[argy])

plt.imshow(sweep_scores.T, aspect = 'auto')
plt.xticks(np.arange(len(inflection_sweep)), np.round(inflection_sweep, 2))
plt.yticks(np.arange(len(exp_sweep)), np.round(exp_sweep, 2))
plt.colorbar(label = 'Time to target (sec)')
#plt.clim([0, 2])
plt.xlabel('Inflection offset')
plt.ylabel('Exponent')
plt.title('Average time-to-target for different kappa adjustment parameters', fontsize = 12)