<a href="https://colab.research.google.com/github/MouseLand/course-materials/blob/main/behavior_encoding/tutorial_cajal_solutions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Behavioral encoding models of neural population activity

In this notebook, we will build several encoding models of neural activity based on the orofacial behaviors of mice.
The encoding models are increasingly more complicated:
1) linear regression from spatial keypoints
2) linear regression from spatiotemporal keypoints
3) nonlinear regression (i.e. neural networks) from spatiotemporal keypoints

We will use rastermap to visualize the neural activity, and also to visualize the prediction.

To keep the notebook interactive, there are exercises throughout

* QUESTION MARKS: where ????? need to be replaced by a short equation, such as a variable or a function name.
* Q: (discussion questions and quiz questions) have a short discussion with your colleague about this. At the end of each section we will discuss them as a group.

**Recording info**: We will use a spontaneous activity recording from [Syeda et al, 2023](https://www.biorxiv.org/content/10.1101/2022.11.03.515121v1.abstract). We recorded 34,086 neurons from mouse sensorimotor cortex for 2+ hours using two-photon calcium imaging at a rate of 3.2Hz. FYI to make the download of the dataset faster, we are analyzing only the first half of the recording. During the recording, the mouse was free to run on an air floating ball, and we recorded the mouse face with a camera at a rate of 50Hz and tracked keypoints on the mouse face.

If you are on google colab, select the GPU runtime:
**Runtime > Change runtime type > Hardware accelerator = GPU**

# 0. Setup

Install libraries

In [None]:
!pip install rastermap
# SUGGESTION: you can hide the ouput of a code cell after running it, by double-clicking on left of the output
# SUGGESTION #2: you can instead run the pip install commands in a different anaconda prompt

Helper functions

In [None]:
import numpy as np
from scipy import stats
from matplotlib import pyplot as plt
import time

# colors for the behaviors
kp_colors = np.array([[0.55,0.55,0.55], [0.,0.,1],
                      [0.8,0,0], [1.,0.4,0.2],
                      [0,0.6,0.4], [0.2,1,0.5],
                      ])
pc_colors = plt.get_cmap('viridis')(np.linspace(0,0.9,8))

def plot_pcs_run(Vsv, run, xmin, xmax):
    fig = plt.figure(figsize=(10,6))
    grid = plt.GridSpec(9, 1, figure=fig, hspace = 0.4)

    # plot running speed
    ax = plt.subplot(grid[0, 0])
    ax.plot(run[xmin:xmax], color=kp_colors[0])
    ax.set_xlim([0, xmax-xmin])
    ax.axis('off')
    ax.set_title('running speed', color=kp_colors[0])

    for j in range(8):
        ax = plt.subplot(grid[j+1])
        ax.plot(Vsv[xmin:xmax, j], color=pc_colors[j])
        ax.set_xlim([0, xmax-xmin])
        ax.axis('off')
        ax.set_title(f'PC {j+1}', color=pc_colors[j])


Download data and assign variables from dictionary

In [None]:
from rastermap import utils

# download and load the spontaneous activity recording
filename = utils.download_data(data_type='spont2')
dat = np.load(filename)

# unpack the dictionary into individual variables: spks = deconvolved fluorescence;
spks = dat['spks']

# we will z-score each neuron so that the activity is standard deviation 1 and mean 0 for each neuron
spks = stats.zscore(spks, axis=1)

# xpos and ypos and position of the neuron in the brain tissue
xpos, ypos = dat['xpos'], dat['ypos']

# tcam and tneural are times when the video and neural data were sampled, respectively
tcam, tneural = dat['tcam'], dat['tneural']
# convert to seconds
tneural -= tcam[0]
tcam -= tcam[0]
tcam *= (24.0 * 60.0 * 60.0)
tneural *= (24.0 * 60.0 * 60.0)

# run is the running trace
run = dat['run'][:spks.shape[1]]

# beh are orofacial behavior, beh_names are the names of these orofacial behaviors
beh, beh_names = dat['beh'], dat['beh_names']
# we are using only first half of recording
beh = beh[: np.nonzero(dat['tcam'] > dat['tneural'][-1])[0][0] + 50]
tcam = tcam[: np.nonzero(dat['tcam'] > dat['tneural'][-1])[0][0] + 50]
n_beh = len(beh_names)

n_neurons, n_time = spks.shape

print(f'{n_neurons} neurons by {n_time} timepoints')


# 1. Visualizing neural activity with Rastermap

Let's first look at the positions of the neurons.

In [None]:
# POSITIONS OF ALL NEURONS
plt.figure(figsize=(4, 4))
plt.scatter(ypos, xpos, s = 1)
plt.xlabel('X position (um)')
plt.ylabel('Y position (um)')
plt.axis('square')

Now let's run Rastermap. Rastermap re-arranges neurons in the raster plot based on similarity of activity.

In [None]:
from rastermap import Rastermap

### run rastermap
nbin = 200 # number of neurons per superneuron
rm_model = Rastermap(n_clusters=100, n_PCs=128, locality=0.6,
                  time_lag_window=5, bin_size=nbin).fit(spks)
cc_nodes = rm_model.cc.copy()

# embedding is the embedding position for each neuron from rastermap
embedding = rm_model.embedding[:,0]
isort = embedding.argsort() # sorting of neurons by embedding

# bin the sorted activity across neurons
# (we average neurons instead of across trials to reduce variability)
ndiv = (n_neurons // nbin) * nbin
sn = spks[isort][:ndiv].reshape(-1, nbin, n_time).mean(axis=1)
print('"superneuron" activity matrix', sn.shape)

Let's visualize the activity

In [None]:
# timepoints to visualize
xmin = 0  # YOU can change this number
xmax = xmin + 1000

fig = plt.figure(figsize=(10,4), dpi=200)
grid = plt.GridSpec(9, 10, figure=fig, wspace = 0.05, hspace = 0.3)

# plot running speed
ax = plt.subplot(grid[0, :-1])

# REPLACE ????? with the running speed in the interval xmin to xmax
ax.plot(run[xmin:xmax], color=kp_colors[0])
ax.set_xlim([0, xmax-xmin])
ax.axis('off')
ax.set_title('running speed', color=kp_colors[0])

# plot superneuron activity
ax = plt.subplot(grid[1:, :-1])
# REPLACE ????? with the sorted and binned neural activity in the interval xmin to xmax
ax.imshow(sn[:, xmin:xmax], cmap='gray_r', vmin=0, vmax=0.8, aspect='auto')
ax.set_xlabel('time')
ax.set_ylabel('superneurons')

ax = plt.subplot(grid[1:, -1])
ax.imshow(np.arange(0, len(sn))[:,np.newaxis], cmap='gist_ncar', aspect='auto')
ax.axis('off')

**Q: Approximately how many dimensions of neural activity do you think are in the data based on this plot?**

color the neurons by their position in the rastermap

In [None]:
# color the neurons by their position in the rastermap

# POSITIONS OF ALL NEURONS
plt.figure(figsize=(4, 4))

# color the neurons in this plot with c = ?????, where ???? is the embedding position of each neuron from the Rastermap model
plt.scatter(ypos, xpos, s=1, c=embedding, cmap='gist_ncar')
plt.xlabel('X position (um)')
plt.ylabel('Y position (um)')
plt.axis('square');

# 2. Dimensionality reduction: what are the dominant patterns of activity?

Let's compute the top PCs

In [None]:
from sklearn.decomposition import TruncatedSVD

# this function returns the left singular vectors scaled by the singular values
Vsv = TruncatedSVD(n_components = 128).fit_transform(spks.T)
print('Vsv shape: ', Vsv.shape)

# REPLACE ?????? to normalize the singular vectors to unit norm (across time)
V = Vsv.copy() / (Vsv**2).sum(axis=0)**0.5

# project the spiking data onto the singular vectors
U = spks @ V

# renormalize the neural projections
U /= (U**2).sum(axis=0)**0.5

**Q:, answer A, B or C:**

How to obtain the singular values of the data from the U matrix:
- A) take the column norms of U
- B) take the row norms of U
- C) it cannot be obtained from U

plot the PCs in time

In [None]:
plot_pcs_run(Vsv, run, xmin, xmax)

We will predict the PCs from the behavior instead of the single neurons.

**Q: Why might we use the PCs for predictions rather than the single neurons?**

# 3. Baseline model: linear regression

We will first predict the neural PCs from the behavior using linear regression.

The behavioral video is at 50Hz while the neural data is at 3.2 Hz, a difference in sampling rate of 17x.

Here are the behaviors we tracked:

In [None]:
fig, axs = plt.subplots(n_beh, 1, figsize=(10,5))
for j in range(n_beh):
    axs[j].plot(beh[17*xmin : 17*xmax, j], color=kp_colors[j])
    axs[j].set_xlim([0, 17*(xmax-xmin)])
    axs[j].axis('off')
    axs[j].set_title(beh_names[j], color=kp_colors[j])


Since the sampling rate and timing of neural activity and the behavior are different, we will downsample the behavior data to the timestamps of the neural activity.

The easiest way to do this is with interpolation: we know when in time each behavior frame happened (`tcam`), and then we sample it at each time the neural activity happened (`tneural`). There are a lot of fast things going on in the behavior, so to get the average over timepoints at the neural activity time, we smooth the behavioral data first.

In [None]:
from scipy.ndimage import gaussian_filter1d # here we import a smoothing function
from scipy.interpolate import interp1d # importing an interpolation function

# initialize empty matrix
beh_ds = np.zeros((len(tneural), n_beh), 'float32')

for j in range(n_beh):
    # filter the data
    # (smoothing scale proportional to difference in sampling rate)
    bsmooth = gaussian_filter1d(beh[:,j], 50/3.2)

    # interpolate
    f = interp1d(tcam, bsmooth)

    # REPLACE ?????
    # look up the usage of interp1d to find out how to apply the function f at new times 'tneural'
    beh_ds[:,j] = f(tneural)

print(beh_ds.shape)

Visualize the downsampled traces:

In [None]:
# plot the traces again
fig, axs = plt.subplots(n_beh, 1, figsize=(10,5))
for j in range(n_beh):
    axs[j].plot(beh_ds[xmin:xmax, j], color=kp_colors[j])
    axs[j].set_xlim([0, (xmax-xmin)])
    axs[j].axis('off')
    axs[j].set_title(beh_names[j], color=kp_colors[j])


Now, to do prediction, we have to do a train-test split. You always want to train your model on a subset of data and test its performance on another set

In [None]:
# split behavioral times into train-test batches
# * use interleaved segments *
n_time_beh = beh.shape[0]
n_segs = 20
n_len  = n_time_beh / n_segs
sinds = np.linspace(0, n_time_beh - n_len, n_segs).astype(int)
itest = sinds[:,np.newaxis] + np.arange(0, n_len*0.25, 1, int)
tpad = 50 # space in between segments
itrain = sinds[:,np.newaxis] + np.arange(n_len*0.25 + tpad, n_len - tpad, 1, int)

print(itrain.shape, itest.shape)

# we will downsample into the neural timestamps for the linear regression
f = interp1d(tcam, np.arange(len(tcam)))
icam_best = np.round(f(tneural)).astype('int')

# find indices of elements of itrain / itest in icam_best
ineural_train = [np.nonzero(np.isin(icam_best, itrain[i]))[0] for i in range(n_segs)]
ineural_test = [np.nonzero(np.isin(icam_best, itest[i]))[0] for i in range(n_segs)]

itrain_ds = np.hstack(ineural_train)
itest_ds = np.hstack(ineural_test)

print(itrain_ds.shape, itest_ds.shape)

**Q: Why did we split the train/test into blocked segments rather than randomly interleaving time-points?**

Use linear regression to perform the prediction, predict PCs $Y$ using behaviors $X$:

$$ A = (X_\text{train}^\top X_\text{train})^{-1} (X_\text{train}^\top Y_\text{train})$$

$X$ is time by behavioral components, $Y$ is time by neural components. If you want to regularize the linear regression:

$$ A = (X_\text{train}^\top X_\text{train} + \lambda I)^{-1} (X_\text{train}^\top Y_\text{train})$$

Then the prediction on time points is:

$$ \hat Y_\text{test} = X_\text{test} A $$

In [None]:
## predict using behavior traces
# regularized linear regression from behavior to neural PCs

XtX = beh_ds[itrain_ds].T @ beh_ds[itrain_ds]
XtY = beh_ds[itrain_ds].T @ Vsv[itrain_ds]
lam = 1e1 # regularizer

# REPLACE ?????
# add a ridge regularizer to the linear regression with parameter 'lam'
XtX = XtX + lam * np.eye(n_beh)

# regression matrix
A = np.linalg.solve(XtX, XtY)

# prediction on test data, REPLACE ????? with behaviors on test points and the regression matrix
Vpred_linear = beh_ds[itest_ds] @ A


We quantify performance with fraction of variance explained (unnormalized). The fraction of variance explained (FVE) is the ratio of the variance explained and the total variance:

$$ \text{FVE} = \frac{\text{var}[Y] - \text{var}[Y - \hat{Y}]}{\text{var}[Y]} = 1 - \frac{\text{var}[Y - \hat{Y}]}{\text{var}[Y]} $$

We can compute this per PC $j$ as well:

$$ \text{FVE}_j = \frac{\text{var}[\mathbf{y}_j] - \text{var}[\mathbf{y}_j - \hat{\mathbf{y}}_j]}{\text{var}[\mathbf{y}_j]} = 1 - \frac{\text{var}[\mathbf{y}_j - \hat{\mathbf{y}}_j]}{\text{var}[\mathbf{y}_j]} $$


In [None]:
# overall varexp
varexp_linear = 1 - (Vpred_linear - Vsv[itest_ds]).var() / (Vsv[itest_ds]).var()

# variance explained per PC
residual = (Vpred_linear - Vsv[itest_ds]).var(axis=0)
varexp_PC = 1 - residual / Vsv[itest_ds].var(axis=0)

print(f'overall fraction of variance explained = {varexp_linear : 0.3f}')

Plot PCs and prediction

In [None]:
fig, axs = plt.subplots(8, 1, figsize=(10, 10))
for j in range(8):
    ax = axs[j]
    ax.plot(Vsv[itest_ds][xmin:xmax, j], color=pc_colors[j])
    ax.plot(Vpred_linear[xmin:xmax, j], color='k', linestyle='--')
    ax.set_xlim([0, xmax-xmin])
    ax.axis('off')
    ax.set_title(f'PC {j+1}, FVE = {varexp_PC[j]:.2f}', color=pc_colors[j])
    if j==0:
        ax.legend(['PC', 'prediction'], loc='upper right')


**Q: Why is the top PC predicted best? Does that mean something?**

# 4. Convolutional neural network prediction

There are finer temportal features in the behavioral features that we aren't capturing by smoothing and using the smoothed and downsampled traces.

Instead we can learn the temporal features by using a 1D convolution layer with various filters -- called kernels.

See below a nice illustration of a convolution from this [webpage](https://e2eml.school/convolution_one_d.html). This kernel is a gaussian, you can see how it smooths the data. But a neural network can learn whatever kernels help with prediction.

![conv_gif](https://e2eml.school/images/conv1d/stride_1.gif)

Let's create a model to predict neural activity using convolutions.

We will use `nn.Conv1d()` for the convolutional layer, which requires the following arguments for initialization (see full documentation [here](https://pytorch.org/docs/master/generated/torch.nn.Conv1d.html)):
  * $C^{in}$: the number of input channels
  * $C^{out}$: the number of output channels (number of different convolutional filters)
  * $K$: the size of the $C^{out}$ different convolutional filters

When you run the network, you can input a time series of arbitrary length $(L^{in})$, but it needs to be shaped as a 3D input $(N, C^{in}, L^{in})$, where $N$ is the number of batches.

Note: as in the 2D convolutions we will want to use non-zero padding so that the output is the same size as the input (`padding = K//2`).

We can initialize the convolutional filters with sines and cosines:

In [None]:
K = 201 # length of kernels
theta = np.linspace(-np.pi, np.pi, K)
freq = np.arange(1, 6)
filters = np.vstack((np.sin(theta * freq[:, np.newaxis]),
                     np.cos(theta * freq[:, np.newaxis])))
plt.plot(filters.T);

**Q: Why initialize with sines and cosines for the temporal convolution layer?**

## Linear prediction w/ 1D convolutions

We will create a model with a linear layer, a 1D convolutional layer, and a linear output layer.

To reduce the number of parameters, we will use a simplified version of a 1D convolutional layer. Each input to the convolutional layer will be filtered by the same filters. This enables us to compute many temporal functions of the behavioral inputs while minimizing the number of parameters.

<img src="https://lh3.googleusercontent.com/d/11UDSc6uw51GYnmLhD75QLeHd6nwH_5E7" alt="behavior to neural PC prediction net with one linear layer then a convolutional layer then another linear layer" width=500></img>

In the code, we have 20 inputs to the convolutional layer, and 10 filters - initialized as above - and so there are 120 outputs from the convolutional layer. The filters are each $K$ timepoints, and so it is $10\times K$ parameters in this layer.

**Q: If we were to implement this as a standard 1D convolutional layer with 20 inputs and 120 outputs, how many parameters would there be in the layer?**




In [None]:
import torch
from torch import nn

class PredModel(nn.Module):
    """ Model to predict neural activity from behaviors w/ one conv layer
      Attributes:
          conv1x1 (nn.linear): linear layer
          conv (nn.Conv1d): convolutional layer
          activation (nn.ReLU): non-linearity
          readout (nn.Sequential): linear layers + ReLUs (n_readout determines #)
    """
    def __init__(self, n_in=6, n_hidden=20, n_out=128, n_filters=10, K=201,
                n_readout=1, nonlinear=False, filters=None):
        """ Initialize network

        Args:
            n_in (int, optional): number of input behavior variables. Defaults to 6.
            n_hidden (int, optional): number of combinations of behavior variables to be filtered by conv. Defaults to 20.
            n_out (int, optional): number of output neurons / PCs. Defaults to 128.
            n_filters (int, optional): number of different convolutional filters. Defaults to 10.
            K: size of each convolutional filter
            n_readout (int, optional): number of readout layers
            nonlinear (bool, optional): whether to use nonlinearities. Defaults to False.
            filters: (optional) initialize the convolutional weights

        """
        super().__init__()
        # implement the first linear layer with a 1x1 conv layer (equivalent to a linear layer)
        self.conv1x1 = nn.Conv1d(n_in, n_hidden, kernel_size=1)

        # temporal convolution layer
        # REPLACE ????? with conv1d parameters
        self.conv = nn.Conv1d(1, n_filters, kernel_size=K, padding=K//2, stride=1)

        # initialize filters
        if filters is not None:
            self.conv.weight = nn.Parameter(filters)
            self.conv.bias = nn.Parameter(torch.zeros((n_filters,), dtype=torch.float32))

        # optional activation
        self.activation = nn.ReLU() if nonlinear else nn.Identity()

        # readout from conv layers to neural activity
        self.readout = nn.Sequential()
        n_h = n_hidden * n_filters
        # REPLACE ????? with linear layer parameters
        self.readout.add_module('linear0', nn.Linear(n_h, n_out))

        # add additional layers if n_readout > 1
        for i in range(1, n_readout):
            if nonlinear:
                self.readout.add_module(f'relu{i-1}', nn.ReLU())
            self.readout.add_module(f'linear{i}', nn.Linear(n_out, n_out))

    def forward(self, x, verbose=False):
        """Run stimulus through convolutional layer

        Args:
            x (torch.tensor): n_batches x n_in x n_time
            verbose (bool, optional): whether to print intermediate shapes. Defaults to False.

        """
        h = self.conv1x1(x)
        if verbose:
            print('conv1x1 output: ', h.shape)

        # apply conv layer to each output of the conv1x1
        h = [self.conv(h[:, i:i+1]) for i in range(h.shape[1])]
        h = torch.cat(h, axis=1)

        # apply nonlinearity if nonlinear
        h = self.activation(h)
        if verbose:
            print('conv output: ', h.shape)

        # transpose convolutional output for the linear layer(s) in readout
        h = h.transpose(2, 1)
        if verbose:
            print('input to readout: ', h.shape)

        # readout
        y = self.readout(h)
        if verbose:
            print('output: ', y.shape)

        return y

model = PredModel(filters=torch.from_numpy(filters).unsqueeze(1).float())
print(model)

beh_ex = torch.from_numpy(beh[:1000]).float().unsqueeze(0)
print(beh_ex.shape)

# reshape the behavior for the conv1x1 layer
# REPLACE ????? with transpose
beh_ex = beh_ex.transpose(2, 1)

y = model(beh_ex, verbose=True)


Since we are running the network on the full, nondownsampled behavioral traces, the output of the network is at the timestamps of the behaviors `tcam`. We need to obtain the output at the `tneural` times, which is a subset of these timepoints. We will sample the output of the network at these timepoints to compute the loss function between the prediction and the true neural PC activity.

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

print('itrain = train times, shape: ', itrain.shape)
print('itest = test times, shape: ', itest.shape)

# put behavioral data on GPU
X_train = torch.from_numpy(beh[itrain]).to(device)
X_test = torch.from_numpy(beh[itest]).to(device)
# transpose for conv1x1 layer
X_train = X_train.transpose(2, 1)
X_test = X_test.transpose(2, 1)

## sample output of network at timepoints with neural activity

# get indices of behavior with neural activity
f = interp1d(tcam, np.arange(len(tcam)))
icam_best = np.round(f(tneural)).astype('int')

# obtain neural activity indices in each train batch
in_train = np.isin(itrain, icam_best)
isample_train = np.nonzero(in_train)
isample_train = [isample_train[1][isample_train[0] == i] for i in range(n_segs)]

# obtain neural activity indices in each test batch
in_test = np.isin(itest, icam_best)
isample_test = np.nonzero(in_test)
isample_test = [isample_test[1][isample_test[0] == i] for i in range(n_segs)]

# put on the GPU
isample_train = [torch.from_numpy(isample_train[i]).to(device) for i in range(n_segs)]
isample_test = [torch.from_numpy(isample_test[i]).to(device) for i in range(n_segs)]

**Q: Why might we subsample the output of the network for predicting the neural activity rather than using another strategy? Are there any advantages/disadvantages?**

We have the timepoints of the neural activity in the training and test set from above `ineural_train` and `ineural_test`. We will use these timepoints and put the neural PCs on the GPU.

In [None]:
Y_train = [torch.from_numpy(Vsv[ineural_train[i]]).to(device) for i in range(n_segs)]
Y_test = [torch.from_numpy(Vsv[ineural_test[i]]).to(device) for i in range(n_segs)]
Y_test_cpu = np.vstack([Vsv[ineural_test[i]] for i in range(n_segs)])

The loss function computes the MSE (mean squared error) between the prediction and the true neural PC activity, and during optimization of the parameters we minimize this. We will add an another optional term to the loss function to impose smoothness on our convolutional filters. We compute smoothness as the squared difference between consecutive timepoints in the filter.

In [None]:
def loss_fn(y_pred, y_true, sm_penalty=0.1, cweight=None):
    # mse loss
    mse_loss = nn.MSELoss()
    # REPLACE ?????
    loss = mse_loss(y_pred, y_true)

    # temporal conv smoothness penalty
    if sm_penalty > 0 and cweight is not None:
        smloss = sm_penalty * (torch.diff(cweight)**2).sum()
        loss += smloss
    return loss

We will now define the training function:

In [None]:
def train_model(model, X_train, Y_train, isample_train,
                X_test, Y_test, isample_test,
                learning_rate = 5e-3, weight_decay = 1e-4,
                sm_penalty = 0.1, n_epochs=250, batch_size=4,
                n_period_init=100, n_period_next=50,
                verbose=True):
    """ train PredModel to predict neural activity

        Args:
            model (PredModel): network to train
            X_train (torch.tensor): n_batches x n_in x n_time
            Y_train (torch.tensor): n_batches x n_out x n_time
            isample_train (list of torch.tensor): indices for neural activity samples in train set
            X_test (torch.tensor): n_batches x n_in x n_time_test
            Y_test (torch.tensor): n_batches x n_out x n_time_test
            isample_test (list of torch.tensor): indices for neural activity samples in test set
            learning_rate (float, optional): learning rate. Defaults to 5e-3.
            weight_decay (float, optional): weight decay. Defaults to 1e-4.
            sm_penalty (float, optional): temporal conv smoothness penalty. Defaults to 0.1.
            n_epochs (int, optional): number of epochs. Defaults to 250.
            batch_size (int, optional): batch size. Defaults to 4.
            n_period_init (int, optional): number of epochs until first learning rate reduction. Defaults to 100.
            n_period_next (int, optional): number of epochs until next learning rate reductions. Defaults to 50.
            verbose (bool, optional): whether to print train loss / test loss. Defaults to True.

        Returns:
            train_loss, test_loss, Y_pred_test

    """

    # initialize PyTorch AdamW optimizer
    # REPLACE ????? with optimizer inputs
    optimizer = torch.optim.AdamW(model.parameters(),
                                  weight_decay=weight_decay,
                                  lr=learning_rate)

    # epochs to reduce learning rate
    epochs_per_period = [n_period_init]
    while epochs_per_period[-1] < n_epochs:
        epochs_per_period.append(epochs_per_period[-1] + n_period_next)

    train_loss, test_loss = np.zeros(n_epochs), np.nan * np.zeros(n_epochs)
    n_batches, n_batches_test = X_train.shape[0], X_test.shape[0]

    # Loop over epochs
    tic = time.time()
    for iepoch in range(n_epochs):
        model.train() # set model to training mode
        iperm = np.random.permutation(n_batches)  # random permutation of batches
        if iepoch in epochs_per_period and iepoch > 0:
            learning_rate /= 3 # reduce learning rate by factor of 3 in each period
            for param_group in optimizer.param_groups:
                param_group['lr'] = learning_rate
        for istart in range(0, n_batches, batch_size):
            inds = iperm[istart : istart + batch_size]
            X_batch = X_train[inds]
            Y_batch = torch.cat([Y_train[i] for i in inds], axis=0)

            # compute network output from inputs in train_data
            # REPLACE ????? with call to forward method of model
            Y_pred = model(X_batch)

            # sample at neural activity times
            Y_pred = torch.cat([Y_pred[j, isample_train[i]] for j, i in enumerate(inds)], axis=0)

            # evaluate loss function
            # REPLACE ????? with inputs to loss function
            loss = loss_fn(Y_pred, Y_batch, sm_penalty=sm_penalty,
                          cweight=model.conv.weight)

            optimizer.zero_grad() # clear previous gradients
            loss.backward() # compute gradients
            optimizer.step() # update weights

            train_loss[iepoch] += loss.item() * len(inds) # average loss over epoch

        train_loss[iepoch] /= n_batches

        # evaluate test loss
        if iepoch % 5 == 0:
            model.eval()
            test_loss[iepoch] = 0.0
            Y_pred_test = []
            with torch.no_grad():
                for i in range(n_batches_test):
                    X_batch = X_test[i : i+1]
                    Y_batch = Y_test[i : i+1]
                    Y_pred = model(X_batch)

                    # sample at neural activity times
                    Y_pred = Y_pred[0, isample_test[i]]
                    Y_pred_test.append(Y_pred.cpu().numpy())

                    # evaluate loss function
                    test_loss[iepoch] += loss_fn(Y_pred, Y_batch[0], sm_penalty=sm_penalty,
                                                  cweight=model.conv.weight)
                test_loss[iepoch] /= n_batches_test
                Y_pred_test = np.vstack(Y_pred_test)
                varexp_nl = 1 - (Y_pred_test - Y_test_cpu).var() / (Y_test_cpu).var()

            if verbose:
                print(f'epoch {iepoch}, train_loss = {train_loss[iepoch]:.3f}, test_loss = {test_loss[iepoch]:.3f}, varexp = {varexp_nl:.3f}, LR = {learning_rate:.1e}, time {time.time()-tic:.2f}s')

    return train_loss, test_loss, Y_pred_test


We will first train the model without any nonlinearities (`nonlinear=False`). This means that we are fitting a form of 'reduced rank' regression, that is factorized across time and input variables.

In [None]:
model = PredModel(n_readout=1, nonlinear=False,
                  filters=torch.from_numpy(filters).unsqueeze(1).float())
device = torch.device('cuda')
model.to(device)

train_loss, test_loss, Y_pred_test = train_model(model, X_train, Y_train, isample_train,
                                                 X_test, Y_test, isample_test,
                                                 sm_penalty=0.1)


Let's visualize the filters that the network learned, what timescales do you notice?



In [None]:
learned_filters = model.conv.weight.data.detach().cpu().numpy().squeeze()

plt.plot(np.arange(-K//2, K//2) / 50, learned_filters.T);
plt.xlabel('time (sec.)')


**Q: What happens when we remove the smoothing penalty? Test this empirically.**

We can compute the variance explained overall, and the per-PC variance explained:

In [None]:
Vpred_conv = Y_pred_test.copy()

# overall variance explained
residual = ((Vpred_conv - Vsv[itest_ds])**2).sum()
varexp_conv = 1 - residual / (Vsv[itest_ds]**2).sum()

print(f'overall fraction of variance explained = {varexp_conv : 0.3f}')

# variance explained per PC
residual = ((Vpred_conv - Vsv[itest_ds])**2).sum(axis=0)
varexp_PC_conv = 1 - residual / (Vsv[itest_ds]**2).sum(axis=0)


It looks like this fit better than the linear regression model which did not use temporal information. Let's see what the prediction looks like, it seems like the higher PCs are better captured especially.

In [None]:
fig = plt.figure(figsize=(10, 10))
grid = plt.GridSpec(8, 1, figure=fig, hspace = 0.4)
varexp_PC_conv = 1 - (Y_pred_test - Y_test_cpu).var(axis=0) / (Y_test_cpu).var(axis=0)
for j in range(8):
    ax = plt.subplot(grid[j])
    ax.plot(Y_test_cpu[xmin:xmax, j], color=pc_colors[j])
    ax.plot(Y_pred_test[xmin:xmax, j], color='k', linestyle='--')
    ax.set_xlim([0, xmax-xmin])
    ax.axis('off')
    ax.set_title(f'PC {j+1}, varexp = {varexp_PC_conv[j]:.2f}', color=pc_colors[j])
    if j==0:
        ax.legend(['PC', 'prediction'], loc='upper right')

## Receptive fields of superneurons

We can use this linear model to estimate the receptive fields of the superneurons from the Rastermap.

First we need to get the superneuron PC weights.

In [None]:
# sort and bin PCs for maxstim estimation
ndiv = (U.shape[0] // nbin) * nbin
U_sn = U[isort][:ndiv].reshape(ndiv//nbin, nbin, -1).mean(axis=1)
U_sn = torch.from_numpy(U_sn).to(device)

print(U_sn.shape)

Now let's find the behavioral "stimuli" that maximize the activity of the superneurons. We will set up the stimuli as one per batch with size of `K`. We will maximize the activity of the superneuron at the middle time point `K//2`. As in maximizing stimuli in visual models, we will constrain the norm of the stimulus, resetting it each iteration.

In [None]:
ns = U_sn.shape[0]

# each batch is a max stim
RFs = 0.1 * torch.randn((ns, n_beh, K), device=device)
RFs.requires_grad = True

optimizer = torch.optim.Adam([RFs], lr=1e-1, weight_decay=0)
n_iter = 200
model.eval()
for iter in range(n_iter):
    losses = 0
    # normalize stimuli
    RFs_norm = RFs / (1e-3 + (RFs**2).mean(axis=(1,2), keepdims=True)**0.5)
    y = model(RFs_norm)
    y = y[:, K//2] # use middle time point

    y = (y * U_sn).sum(axis=-1)

    # REPLACE ????? with loss for max_stim ... what might we want to minimize?
    loss = (-y).mean()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses+=loss.item()
    if iter % 50 == 0 or iter == n_iter-1:
        print(f'iter {iter}, loss = {losses : .3f}')

# detach and convert to CPU
RFs = RFs.detach().cpu().numpy()
RFs /= (RFs**2).sum(axis=(1,2), keepdims=True)**0.5


Visualize a subset of receptive fields

In [None]:
fig = plt.figure(figsize=(12,6), dpi=200)
grid = plt.GridSpec(12, 21, figure=fig, wspace = 0.05, hspace = 0.0)

vmax = 0.2
ks = np.linspace(5, len(U_sn)-5, 12*3).astype('int')
for i, k in enumerate(ks):
    for j in range(n_beh):
        ax = plt.subplot(grid[i%12, j + 6*(i//12) + (i//12)])
        ax.plot(RFs[k, j], color=kp_colors[j])
        ax.set_ylim([-vmax, vmax])
        ax.axis('off')
        if i==0:
            ax.set_title(beh_names[j], color=kp_colors[j], rotation=45)



Visualize the receptive fields with the data

In [None]:
# timepoints to visualize
xmin = 0
xmax = xmin + 1000

fig = plt.figure(figsize=(12,6), dpi=200)
grid = plt.GridSpec(9, 18, figure=fig, wspace = 0.35, hspace = 0.3)

# plot running speed
ax = plt.subplot(grid[0, :12])
ax.plot(run[xmin:xmax], color=kp_colors[0])
ax.set_xlim([0, xmax-xmin])
ax.axis('off')
ax.set_title('running speed', color=kp_colors[0])

# plot superneuron activity
ax = plt.subplot(grid[1:, :12])
ax.imshow(sn[:, xmin:xmax], cmap='gray_r', vmin=0, vmax=0.8, aspect='auto')
ax.set_xlabel('time')
ax.set_ylabel('superneurons')

for j in range(n_beh):
    ax = plt.subplot(grid[1:, j+12])
    ax.imshow(RFs[:,j], aspect='auto', vmin=-vmax, vmax=vmax, cmap='RdBu_r')
    ax.axis('off')
    ax.set_title(beh_names[j], color=kp_colors[j], rotation=45)


**Q: What does a 'receptive field' from behavior mean? Is it the same as the receptive field of a sensory neuron, like a Gabor model? How is it similar / different?**

## Nonlinear prediction w/ 1D convolutions

We can put non-linearities in our neural network models to better model non-linear aspects of the data. We also add another layer to make it more complex. The network below is the default network from the Facemap paper.

<img src="https://lh3.googleusercontent.com/d/1NluhnseWjZS7euHZhsfXil3rEkhVBRGx" alt="behavior to neural PC prediction net with one linear layer then a convolutional layer then another linear layer" width=500></img>

In [None]:
model = PredModel(n_readout=2, nonlinear=True,
                  filters=torch.from_numpy(filters).unsqueeze(1).float())
device = torch.device('cuda')
model.to(device)
print(model)

**Q: how many layers are in this network?**
* A: <=3
* B: >3 but <=6
* C: >6

Train the model:

In [None]:
train_loss, test_loss, Y_pred_test = train_model(model, X_train, Y_train, isample_train,
                                                 X_test, Y_test, isample_test,
                                                 sm_penalty=0.1)


Quantify the performance of the model:

In [None]:
Vpred_nl = Y_pred_test.copy()

# overall variance explained
residual = ((Vpred_nl - Vsv[itest_ds])**2).sum()
varexp_nl = 1 - residual / (Vsv[itest_ds]**2).sum()

print(f'overall fraction of variance explained = {varexp_nl : 0.3f}')

# variance explained per PC
residual = ((Vpred_nl - Vsv[itest_ds])**2).sum(axis=0)
varexp_PC_nl = 1 - residual / (Vsv[itest_ds]**2).sum(axis=0)


This model did even better at fitting the neural activity! Visualize the prediction. Remember we need to project using the PCs into the neuron space.

In [None]:
sn_pred_nl = U_sn.cpu().numpy() @ Vpred_nl.T
sn_pred_linear = U_sn.cpu().numpy() @ Vpred_linear.T

fig = plt.figure(figsize=(12,14))
grid = plt.GridSpec(20, 1, figure=fig, wspace = 0.35, hspace = 0.7)

# plot running speed
ax = plt.subplot(grid[0, 0])
ax.plot(run[itest_ds][xmin:xmax], color=kp_colors[0])
ax.set_xlim([0, xmax-xmin])
ax.axis('off')
ax.set_title('running speed', color=kp_colors[0])

# plot superneuron activity
ax = plt.subplot(grid[1:7, 0])
ax.imshow(sn[:, itest_ds][:, xmin:xmax], cmap='gray_r', vmin=0, vmax=0.85, aspect='auto')
ax.set_ylabel('superneurons')
ax.set_xticks([])
ax.set_title('neural activity')

# plot superneuron prediction
ax = plt.subplot(grid[7:14, 0])
ax.imshow(sn_pred_nl[:, xmin:xmax], cmap='gray_r', vmin=0, vmax=0.85, aspect='auto')
ax.set_ylabel('superneurons')
ax.set_xticks([])
ax.set_title('behavior prediction - nonlinear')


ax = plt.subplot(grid[14:21, 0])
ax.imshow(sn_pred_linear[:, xmin:xmax], cmap='gray_r', vmin=0, vmax=0.85, aspect='auto')
ax.set_xlabel('time')
ax.set_ylabel('superneurons')
ax.set_title('behavior prediction - linear regression')



We can see that the nonlinear prediction captured finer temporal features in the data.

**Q: Can I use Rastermap in a scientific paper which requires rigorous quantification? How could I use it to quantify responses? Is it useful?**