In [1]:
# imports
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.collections as mcoll
import matplotlib.path as mpath
import random

# This import registers the 3D projection, but is otherwise unused.
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 unused import
import random
import numba
from numba import jit
import seaborn as sns
from sklearn import svm
import sklearn.metrics as metrics
import pandas as pd



%matplotlib qt
sns.set_style('whitegrid')

In [7]:

# Simple function for euler approximation of dyns
def run_simulation(ics, ode, dt=0.01, num_steps=2000, ode_ops={}):

    # initialize output vector
    num_svs = len(ics)
    ys = np.zeros((int(num_steps/dt), num_svs))
    ys[:] = np.NaN

    # Set initial values
    ys[0, :] = ics

    # Step through "time", calculating the partial derivatives at the current point
    # and using them to estimate the next point
    for i in range(ys.shape[0]-1):

        # calculate next step
        y_dot = ode(ys[i, :], ode_ops=ode_ops)
        ys[i + 1, :] = ys[i, :] + (y_dot * dt)

        # printout on 10% progress
        #if i % (ys.shape[0]//10) == 0:
        #    print('Simulating step {}'.format(i))
    
    return ys


# helper to plot results from simulation
def plot_simulation(ys, dt=0.01, num_steps=2000, beh=None):

    # set some local vars
    num_svs = ys.shape[1]
    xvals = np.arange(num_steps, step=dt)
    plot_offset = num_svs-1

    # plot overlapping line plot of each state variable
    plt.figure()
    ax1 = plt.subplot(211)
    ax2 = plt.subplot(212)

    # iterate state variables and plot
    for sv in range(num_svs):

        # plot first plot
        to_plot = ys[:,sv]
        ax1.plot(xvals, to_plot, label='x{}'.format(sv))

        # plot second
        to_plot2 = (to_plot - np.min(to_plot)) / (np.max(to_plot) - np.min(to_plot))
        ax2.plot(xvals, to_plot2 + plot_offset, label='x{}'.format(sv))

        plot_offset -= 1
    
    # decorate ax1
    ax1.legend(loc='upper left')
    ax1.get_xaxis().set_visible(False)
    ax1.set_ylabel('y')

    # decorate ax2
    # ax2.legend(loc='upper left')
    ax2.set_xlabel('t')
    # ax2.get_yaxis().set_visible(False)
    ax2.set_yticks([x for x in range(num_svs)])
    ax2.set_yticklabels(['x{}'.format(x) for x in range(num_svs)])
    plt.tight_layout()

    # plot qualitative behavior if necessary
    if beh is not None:        

        # plot line plot of behaviour, normalized
        to_plot = beh.astype(float)
        to_plot2 = (to_plot - np.min(to_plot)) / (np.max(to_plot) - np.min(to_plot))
        ax2.plot(xvals, beh + num_svs, label='beh')

        # fix y axis
        # if behavior is a single vector
        if beh.ndim == 1:
            ax2.set_yticks([x for x in range(num_svs+1)])
            yticklabels = ['x{}'.format(x) for x in range(num_svs)]
            yticklabels.append('behavior')
            ax2.set_yticklabels(yticklabels)


# helper to ascribe qualitative behavior to output from run_simulation
def add_behavior_to_sim(ys, beh_fxn, beh_ops={}):

    # get return vector
    beh = beh_fxn(ys, beh_ops=beh_ops)

    # for now just return that
    return beh


# helper function to report regression results
def regression_results(y_true, y_pred):
    
    # Regression metrics
    explained_variance=metrics.explained_variance_score(y_true, y_pred)
    mean_absolute_error=metrics.mean_absolute_error(y_true, y_pred) 
    mse=metrics.mean_squared_error(y_true, y_pred) 
    mean_squared_log_error=metrics.mean_squared_log_error(y_true, y_pred)
    median_absolute_error=metrics.median_absolute_error(y_true, y_pred)
    r2=metrics.r2_score(y_true, y_pred)
    print('explained_variance: ', round(explained_variance,4))    
    print('mean_squared_log_error: ', round(mean_squared_log_error,4))
    print('r2: ', round(r2,4))
    print('MAE: ', round(mean_absolute_error,4))
    print('MSE: ', round(mse,4))
    print('RMSE: ', round(np.sqrt(mse),4))


def convert_sim_matrices_to_dataframe(ys, beh):
    
    # add timepoints as dictionaries to convert to dataframe
    state_var_sample_list = []
    for t in range(len(beh)):
        
        # at timepoint and behavior output
        mysample = {
            't': t,
            'beh': beh[t],
        }

        # add state vars
        for x in range(ys.shape[1]):
            mysample['x{}'.format(x)] = ys[t, x]

        # append sample
        state_var_sample_list.append(mysample)

    # convert to dataframe
    df = pd.DataFrame(state_var_sample_list)

    return df

def train_predictive_model(ys, beh, model_ops={}):

    model_type = model_ops.get('model_type', 'SVM')
    plot_flag = model_ops.get('plot_flag', True)

    if model_type == 'SVM':

        # build linear model and fit
        clf = svm.LinearSVC()
        y_actual = beh.astype(float)
        m = clf.fit(ys, y_actual)

        if plot_flag:
            
            # plot all samples of each state var as their own 
            plt.figure()
            df = convert_sim_matrices_to_dataframe(ys, beh)
            sns.scatterplot(data=df, x='x0', y='x1', hue='beh')
            plt.tight_layout()

            # plot shaded contour of xy samples
            X = ys
            h = model_ops.get('contour_mesh_step_size', .02)  # step size in the mesh
            x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
            y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
            xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                                np.arange(y_min, y_max, h))
            Z = m.predict(np.c_[xx.ravel(), yy.ravel()])
            
            # Put the result into a color plot
            Z = Z.reshape(xx.shape)
            plt.figure()
            plt.contourf(xx, yy, Z, cmap=plt.cm.coolwarm, alpha=0.6)
            plt.xlabel('x0')
            plt.ylabel('x1')
            plt.tight_layout()

        return m

# simple behavior model where if, given two oscillators, whenever 1 is greater than the other
# referencing these functions in clear terms will be weird so this will just be a lookup table
def simple_behavior(ys, beh_ops={}):
    
    # select model type
    model_type = beh_ops.get('model_type', 'greater than')

    # carry out
    if model_type == 'greater than':

        # in case of two units
        if ys.shape[1] == 2:
            ret = ys[:,0] > ys[:,1]
    
    # return behavior structure
    return ret

In [8]:
# modle for single harmonic oscillator
def harmonic_oscillator_ode(ys_i, ode_ops={}):

    vt_old = ys_i[0]
    xt_old = ys_i[1]
    k = ode_ops['k']
    m = ode_ops['m']

    # simple harmonic oscillator as two coupled odes
    dvdt = (-k/m)*xt_old
    dxdt = vt_old

    return np.array([dvdt, dxdt])


ics = [0, 1]
ode_ops = {'k': 1, 'm': 1}
num_steps = 100
dt = 0.001
ys = run_simulation(ics, ode=harmonic_oscillator_ode, dt=dt, num_steps=num_steps, ode_ops=ode_ops)
plot_simulation(ys, dt=dt, num_steps=num_steps)

In [11]:
#########################################################
####### modle for coupled harmonic oscillators ##########
#########################################################
def coupled_oscillator_ode(ys_i, ode_ops={}):

    # unpack local vars
    v1_old, x1_old, v2_old, x2_old = ys_i
    k = ode_ops['k']
    kprime = ode_ops['kprime']
    m = ode_ops['m']

    # two identical harmonic oscillators, coupled by an additional spring
    dv1 = (-(k + kprime) * x1_old + kprime*x2_old)/m
    dx1 = v1_old
    dv2 = (-(k + kprime) * x2_old + kprime*x1_old)/m
    dx2 = v2_old

    return np.array([dv1, dx1, dv2, dx2])


ics = [0, 3, 0, 7]
ode_ops = {'k': 1, 'm': 3, 'kprime': 3}
num_steps = 100
dt = 0.001
ys = run_simulation(ics, ode=coupled_oscillator_ode, dt=dt, num_steps=num_steps, ode_ops=ode_ops)
plot_simulation(ys[:, [1,3]], dt=dt, num_steps=num_steps)


In [50]:
###########################
### add behavior ##########
###########################

ics = [0, 3, 0, 7]
ode_ops = {'k': 1, 'm': 3, 'kprime': 3}
num_steps = 100
dt = 0.001
ys = run_simulation(ics, ode=coupled_oscillator_ode, dt=dt, num_steps=num_steps, ode_ops=ode_ops)
ys = ys[:, [1,3]]
beh = add_behavior_to_sim(ys, beh_fxn=simple_behavior, beh_ops={'model_type':'greater than'})
plot_simulation(ys, dt=dt, num_steps=num_steps, beh=beh)

# state space
plt.figure()
plt.plot(ys[:,0], ys[:,1])
plt.xlabel('x0')
plt.ylabel('x1')

Text(0, 0.5, 'x1')

In [49]:
#####################################
### train predictive model ##########
#####################################

ics = [0, 3, 0, 7]
ode_ops = {'k': 1, 'm': 3, 'kprime': 3}
num_steps = 100
dt = 0.001
ys = run_simulation(ics, ode=coupled_oscillator_ode, dt=dt, num_steps=num_steps, ode_ops=ode_ops)
ys = ys[:, [1,3]]
beh = add_behavior_to_sim(ys, beh_fxn=simple_behavior, beh_ops={'model_type':'greater than'})
model_ops = {'model_type': "SVM", "plot_flag": True}
mdl = train_predictive_model(ys, beh, model_ops=model_ops)





Text(48.847222222222214, 0.5, 'x1')

In [48]:
#######################################
######### van der pols ################
#######################################
# via https://www.johndcook.com/blog/2019/12/22/van-der-pol/ and https://galileo-unbound.blog/2019/08/26/the-fast-and-the-slow-of-grandfather-clocks/

def van_der_pols_ode(ys_i, ode_ops={}):
    mu = ode_ops.get('mu')
    x, y = ys_i
    dx = y
    dy = mu*(1 - x**2)*y - x
    return np.array([dx, dy])

ics = [0, 1]
ode_ops = {'mu': 1.2}
num_steps = 20
dt = 0.001
ys = run_simulation(ics, ode=van_der_pols_ode, dt=dt, num_steps=num_steps, ode_ops=ode_ops)
plot_simulation(ys, dt=dt, num_steps=num_steps)

# state space
plt.figure()
plt.plot(ys[:,0], ys[:,1])
plt.xlabel('x0')
plt.ylabel('x1')

Text(0, 0.5, 'x1')

In [107]:
# load dataset of OLQV, OLQD, AVA, and SMD around turns
df = pd.read_csv('./20220427_0ATR_OLQ_AVA_SMD_trace_df.csv')

# select based on timepoints
df = df[df['t'].isin([0, 1, 2, 3, 4, 5])]

# select based on neurons
smdv = df[df['neuron ID'] == 'SMDV']
ava = df[df['neuron ID'] == 'AVA']

# grab state variablesz
x0 = smdv['yvals'].to_numpy()
x1 = smdv['deriv'].to_numpy()
x2 = ava['yvals'].to_numpy()
x3 = ava['deriv'].to_numpy()
ys = np.array([x0, x1, x2, x3]).T

# grab behavior (it's the same across neurons)
beh = smdv['turn label'].to_numpy()
beh = beh == 'dorsal'

# train model
model_ops = {'model_type': 'SVM', 'plot_flag':False}
model = train_predictive_model(ys, beh, model_ops=model_ops)

# print regression results
actual_behavior = beh
predicted_behavior = model.predict(ys)
# plt.figure()
# plt.scatter(actual_behavior, predicted_behavior)
regression_results(y_true=actual_behavior, y_pred=predicted_behavior)


df = convert_sim_matrices_to_dataframe(ys, beh)

plt.figure()
sns.scatterplot(data=df, x='x1', y='x3', hue='beh')
plt.xlabel('dAVA')
plt.ylabel('dSMDV')

plt.figure()
sns.scatterplot(data=df, x='x0', y='x2', hue='beh')
plt.xlabel('AVA')
plt.ylabel('SMDV')

plt.figure()
sns.scatterplot(data=df, x='x0', y='x3', hue='beh')
plt.xlabel('AVA')
plt.ylabel('dSMDV')

plt.figure()
sns.scatterplot(data=df, x='x1', y='x2', hue='beh')
plt.xlabel('dAVA')
plt.ylabel('SMDV')



# plot average plot of each neuron over this window
#plt.figure()
#to_plot = df
#to_plot = to_plot[to_plot['neuron ID'] == 'AVA']
#sns.lineplot(data=to_plot, x='yvals', y='deriv', hue='turn label')
#plt.tight_layout()


# plot state plot
#plt.figure()
#sns.pairplot(data=df, hue='neuron ID')




explained_variance:  0.0827
mean_squared_log_error:  0.104
r2:  0.0352
MAE:  0.2163
MSE:  0.2163
RMSE:  0.465


Text(0, 0.5, 'SMDV')

0.7837301587301587

666

342