<a href="https://colab.research.google.com/github/beatLaboratory/TIMBRE/blob/main/LFP_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Description

In this notebook we import the experimental data and use LFP and spikes to predict which maze arm the rat is occupying.

First, let's install the dependencies.

In [1]:
!git clone https://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@github.com/beatLaboratory/TIMBRE.git
!pip install -r TIMBRE/requirements.txt

Cloning into 'TIMBRE'...
remote: Enumerating objects: 140, done.[K
remote: Counting objects: 100% (140/140), done.[K
remote: Compressing objects: 100% (110/110), done.[K
remote: Total 140 (delta 80), reused 65 (delta 29), pack-reused 0[K
Receiving objects: 100% (140/140), 3.08 MiB | 6.53 MiB/s, done.
Resolving deltas: 100% (80/80), done.
Collecting keras-complex (from -r TIMBRE/requirements.txt (line 1))
  Downloading keras_complex-0.2.3-py3-none-any.whl (26 kB)
Installing collected packages: keras-complex
Successfully installed keras-complex-0.2.3


#Train and evaluate models

Now we will get the data for one session and train three classifiers:
1.   Linear classifier trained on the demodulated LFP.
2.   TIMBRE - a complex-valued neural network for identifying phase-amplitude patterns in the LFP.
3.   Linear classifier trained on the firing rates of the neurons that were simultaneously recorded.

Here we use session 4, which has less data so downloads and runs faster.

In [12]:
import requests
import os
from scipy import io
from sklearn.linear_model import LogisticRegression
import numpy as np
import TIMBRE.helpers as helpers

repository_id = "24757638" #Behavior_and_spiking_data_for_rats_running_a_3-arm_maze
url = f"https://api.figshare.com/v2/articles/{repository_id}"

# Make the API request
response = requests.get(url)
files = response.json()['files']

file_pattern = "data04.mat"

# Find the matching files
file = next((file for file in files if file['name'] == file_pattern), None)
# Download the files using wget
print(f"Downloading file: {file['name']}")
os.system(f"wget -O {'data.mat'} {file['download_url']}")
data = io.loadmat('data.mat')
LFPs = helpers.filter_data(data['lfps'],2,fs=25,use_hilbert=True)

Downloading file: data04.mat


In [35]:
n_folds = 5 #how many folds to split data into
run_folds = 1 #how many folds to train
all_scores = np.zeros((3,2,run_folds))
for i in range(run_folds):#n_folds):
  for j in range(1):#2):
    test_inds, train_inds = helpers.test_train(data['lapID'],j+1,n_folds,i) #only test 1 fold per session
    model_sp = LogisticRegression()
    model_sp.fit(data['spikes'][train_inds],data['lapID'][train_inds,1])
    all_scores[2,j,i] = np.mean(data['lapID'][test_inds,1] == model_sp.predict(data['spikes'][test_inds]))
    wLFPs,_,_ = helpers.whiten(LFPs,train_inds)
    #model_lfp, fm = helpers.TIMBRE(wLFPs,data['lapID'][:,1],test_inds,train_inds,3*2**3) #8 hidden nodes per arm
    #all_scores[1,j,i] = fm.history['val_accuracy'][-1] #the accuracy on test data after training
    model_lfp_demod, fm = carrier_based(wLFPs,data['lapID'][:,1],test_inds,train_inds)
    all_scores[0,j,i] = fm.history['val_accuracy'][-1]

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


Epoch 1/100
75/75 - 1s - loss: 1.0831 - accuracy: 0.5004 - val_loss: 1.0522 - val_accuracy: 0.7268 - 960ms/epoch - 13ms/step
Epoch 2/100
75/75 - 0s - loss: 1.0341 - accuracy: 0.8413 - val_loss: 1.0057 - val_accuracy: 0.8793 - 214ms/epoch - 3ms/step
Epoch 3/100
75/75 - 0s - loss: 0.9812 - accuracy: 0.9204 - val_loss: 0.9587 - val_accuracy: 0.9009 - 227ms/epoch - 3ms/step
Epoch 4/100
75/75 - 0s - loss: 0.9307 - accuracy: 0.9326 - val_loss: 0.9143 - val_accuracy: 0.9047 - 208ms/epoch - 3ms/step
Epoch 5/100
75/75 - 0s - loss: 0.8868 - accuracy: 0.9363 - val_loss: 0.8774 - val_accuracy: 0.9072 - 215ms/epoch - 3ms/step
Epoch 6/100
75/75 - 0s - loss: 0.8483 - accuracy: 0.9389 - val_loss: 0.8432 - val_accuracy: 0.9098 - 197ms/epoch - 3ms/step
Epoch 7/100
75/75 - 0s - loss: 0.8148 - accuracy: 0.9384 - val_loss: 0.8155 - val_accuracy: 0.9060 - 218ms/epoch - 3ms/step
Epoch 8/100
75/75 - 0s - loss: 0.7830 - accuracy: 0.9422 - val_loss: 0.7831 - val_accuracy: 0.9111 - 192ms/epoch - 3ms/step
Epoch 9

In [36]:
print(np.sum(all_scores,axis=2))

[[0.92630243 0.        ]
 [0.         0.        ]
 [0.96315121 0.        ]]


In [34]:
from random import sample
import matplotlib.pyplot as plt
from keras import utils as np_utils
from keras import backend, optimizers, models, constraints, layers, activations
from keras.callbacks import EarlyStopping
import complexnn

def carrier_based(X,Y,inds_test,inds_train,learn_rate=.001,is_categorical=True):
  """
  Predicts output using demodulated LFP and linear regression

  Parameters:
  - X = Multi-channel data (T samples x N channels, complex-valued)
  - Y = Category labels (T samples, integer-valued)
  - inds_test = test indices (Either T x 1 boolean, or U x 1 integers)
  - inds_train = train indices (Either T x 1 boolean, or U x 1 integers)
  - learn_rate = how quickly the network learns
  - is_categorical = whether the output consists of discrete classes

  Returns:
  - model: trained network
  - fittedModel: history of loss and accuracy for test and train data
  """

  #stack the real and imaginary components of the data
  X = X*np.exp(1j*-np.angle(X[:,0][:,np.newaxis]))
  X = np.concatenate((np.real(X), np.imag(X)), axis = 1)
  #use one-hot encoding for the class labels
  if is_categorical:
      Y = np_utils.to_categorical(Y)
      my_loss = 'categorical_crossentropy'
  else:
      my_loss = 'kde'

  backend.clear_session()
  # Early Stopping: stop training model when test loss stops decreasing
  es = EarlyStopping(monitor = 'val_loss', patience = 1)
  # Specify the algorithm and step size used by gradient descent
  adam = optimizers.Adam(learning_rate=learn_rate)
  num_chans = 24
  model = models.Sequential()
  # Layer 1: Takes a complex-valued projection of the input
  model.add(layers.Dense(num_chans, input_shape=(X.shape[1],), use_bias=True, kernel_constraint = constraints.unit_norm()))
  # Layer 2: Softmax of layer 2
  model.add(layers.Activation(activations.softmax))
  model.add(layers.Dense(Y.shape[1],activation='softmax'))
  model.compile(loss=my_loss, optimizer=adam,metrics = ['accuracy'])
  # Train the model
  fittedModel = model.fit(X[inds_train,:], Y[inds_train,:], epochs = 100,
    verbose = 2, validation_data=(X[inds_test,:], Y[inds_test,:]),
    shuffle=True, callbacks=[es])
  return model, fittedModel

def test_train(lapID,which_phase,n_folds = 5,which_fold = 0):
    """
    Returns test and train samples

    Parameters:
    - lapID: contains info about trial number and maze arm of each sample
    - which_phase: which phase of the session to use (see get_data\get_behav for info)
    - n_folds: how many folds to assign
    - which_fold: which fold to return values for

    Returns:
    - train_inds: which samples to use for training model
    - test_inds: which samples to use for testing model
    """
    ctr = np.zeros(3)
    use_sample = lapID[:,3] == which_phase
    if which_phase == 2: # period where rat is staying at port
        use_sample = use_sample & (lapID[:,2] == 1) #only use correct trials
    fold_assign = -np.ones(np.size(use_sample))
    for i in range(int(np.max(lapID[:,0]))):
        inds = (lapID[:,0] == i) & use_sample
        if np.sum(inds):
            which_arm = int(lapID[inds,1][0])
            fold_assign[inds] = ctr[which_arm]%n_folds
            ctr[which_arm] += 1
    test_inds = fold_assign == which_fold
    train_inds = np.isin(fold_assign, np.arange(n_folds)) & ~test_inds
    train_inds = balanced_indices(lapID[:,1],train_inds)

    return test_inds, train_inds

def balanced_indices(vector, bool_indices):

    """
    Returns indices that balance the number of samples for each label in vector

    Parameters:
    vector: The input vector from which to select indices.
    bool_indices: A boolean array indicating which indices in the vector to consider.

    Returns:
    list: A list of indices representing a balanced selection of the unique values in the subset of the vector.

    Generated using ChatGPT
    """
    # Convert boolean indices to actual indices
    actual_indices = np.where(bool_indices)[0]

    # Extract the elements and their corresponding indices
    selected_elements = [(vector[i], i) for i in actual_indices]

    # Find unique elements
    unique_elements = np.unique(vector[bool_indices])

    # Group elements by value and collect their indices
    elements_indices = {element: [] for element in unique_elements}
    for value, idx in selected_elements:
        if value in elements_indices:
            elements_indices[value].append(idx)

    # Find the minimum count among the unique elements
    min_count = min(len(elements_indices[element]) for element in unique_elements)

    # Create a balanced set of indices
    balanced_indices_set = []
    for element in unique_elements:
        if len(elements_indices[element]) >= min_count:
            balanced_indices_set.extend(sample(elements_indices[element], min_count))

    return np.array(balanced_indices_set)