# Recurrent Neural Networks (in TensorFlow)
- Also see tensorflow time series analysis tutorial: https://www.tensorflow.org/tutorials/structured_data/time_series
- Simplified version of the RNNs used for the manuscript https://www.biorxiv.org/content/10.1101/2025.05.13.653873v1.full.pdf
- Uses a cusomtized variant of SimpleRNN from tensorflow to include private noise term as well as a decay factor $\gamma$

$\texttt{LeakyRNN}$ Equation:
<p>
$\mathbf{h}_{t+1} = (1 - {\gamma})\mathbf{h}_t +{\gamma}\sigma\left( \mathbf{W}_{IN}\mathbf{x}_t +\mathbf{W}_{REC}\mathbf{h}_t + \mathbf{b} + \mathbf{\xi_t}\right)$
</p>

<p>
$y_t = \mathbf{W}_{OUT}\mathbf{h}_t$
</p>

<p style="text-align:left;float:left;margin-right:50px">
$$ \begin{matrix*}[l]
 \gamma &-& \text{activity decay bias}\\
 \mathbf{x}_t &-& \text{input values} \\
 \mathbf{W}_{IN} &-& \text{input weights} \\
 \mathbf{y}_t &-& \text{output values} \\
 \mathbf{b} &-& \text{unit biases} \\
\end{matrix*} $$
</p>
<p style="text-align:left;float:left">
$$ \begin{matrix*}[l]
 \mathbf{\xi}_t &-& \text{private unit noise} \\
 \mathbf{W}_{OUT} &-& \text{output weights} \\
 \mathbf{h}_t &-& \text{unit activations} \\
 \mathbf{W}_{REC} &-& \text{connection weights} \\
 \sigma &-& \text{activation function (} \mathtt{tanh}\text{)} \\
\end{matrix*} $$
</p>

In [None]:
import tensorflow as tf
from pathlib import Path
import matplotlib.pyplot as plt
import os
import numpy as np
import pandas as pd
import random
import sys

# clone supporting files if running on colab, otherwise assume present
if 'google.colab' in sys.modules:
    os.chdir('/content')
    if not Path('rnn_demo_repo').exists():
      !git clone https://github.com/heyslab/rnn_demo_repo.git 
    os.chdir('/content/rnn_demo_repo')

from classes.models import LeakyRNNCell, LeakyRNN

### Define parameters for model

In [None]:
params = {
    'units': 128,                       # number of units in recurrent layer
    'activation': 'tanh',               # activation function for recurrent layer
    'weights_regularizer_coef': 1e-3,   # L2 penalty on all weights
    'activity_regularizer_coef': 1e-3,  # L2 penalty on activity
    'learning_rate': 1e-5,
    'noise_level': 0.3,                 # private noise on each unit
    'input_noise': 0.15,                # noise on the inputs
    'gamma': 0.2,                       # activity decay parameter
}

### Setup and build the model

Leaky RNN describes the inputs as well as the recurrent layer. [code here](classes/models.py)

tf.keras.layers.Dense is the feed forward connections to the single output node

In [None]:
activity_regularizer = tf.keras.regularizers.L2(params['activity_regularizer_coef'])
recurrent_regularizer = tf.keras.regularizers.L2(params['activity_regularizer_coef'])
kernel_regularizer = tf.keras.regularizers.L2(params['weights_regularizer_coef'])

optimizer = tf.keras.optimizers.Adam(params['learning_rate'])

model = tf.keras.models.Sequential([
    LeakyRNN(params['units'], activation=params['activation'], gamma=params['gamma'],
             return_sequences=True, activity_regularizer=activity_regularizer,
             recurrent_regularizer=recurrent_regularizer, kernel_regularizer=kernel_regularizer),
    tf.keras.layers.Dense(
        units=1, activity_regularizer=activity_regularizer,
        kernel_regularizer=kernel_regularizer)
])
model.compile(loss=tf.keras.losses.MeanSquaredError(),
             optimizer=optimizer, metrics=[tf.keras.metrics.MeanSquaredError()])  

### Need a way to generate data for training the model
- Here we generate data for tDNMS trials

In [None]:
def generate_trial_data(n_trials, input_noise):
    
    # generate np arrays with one-hot encoding of cue durations
    trial_ls = np.zeros((200))
    trial_ls[30:80] = 1
    trial_ls[110:130] = 1

    trial_sl = np.zeros((200))
    trial_sl[30:50] = 1
    trial_sl[80:130] = 1

    trial_ss = np.zeros((200))
    trial_ss[30:50] = 1
    trial_ss[80:100] = 1

    target_go = np.zeros(200)
    target_go[131:160] = 1
    target_nogo = np.zeros(200)

    trial_key = {
        'LS': trial_ls,
        'SL': trial_sl,
        'SS': trial_ss
    }

    target_key = {
        'LS': target_go,
        'SL': target_go,
        'SS': target_nogo
    }

    # Also include a start cue as in standard tDNMS training
    start_cue = np.zeros((200))
    start_cue[0:3] = 1

    # Here we omit the SS trial type to simplify trianing
    block_trials = ['SL', 'LS']
    
    blocks = pd.DataFrame(np.tile(block_trials, (n_trials, 1)))
    blocks.apply(random.shuffle, axis=1)
    blocks = blocks.stack().reset_index(drop=True).reset_index()
    blocks.columns = ['trial', 'trial_type']
    blocks.index = pd.MultiIndex.from_frame(blocks)

    trials = blocks['trial_type'].apply(lambda x, trial_key=trial_key: trial_key[x])\
                                .apply(pd.Series)  
    trials.columns.name = 'time_bin'

    start_cues = blocks['trial_type'].apply(lambda _, start_cue=start_cue: start_cue)\
                                    .apply(pd.Series)
    start_cues.columns.name = 'time_bin'

    targets =  blocks['trial_type'].apply(lambda x, target_key=target_key: target_key[x])\
                                  .apply(pd.Series)
    targets.columns.name = 'time_bin'
    trials = trials.stack()
    targets = targets.stack()
    start_cues = start_cues.stack()

    trials = trials + np.random.normal(0, input_noise, trials.shape)
    start_cues = start_cues + np.random.normal(0, input_noise, start_cues.shape)

    return pd.concat((start_cues, trials), axis=1, keys=['light', 'odor']), targets

### Tensor flow requres data to be propery fomatted
- Dimensions need to be batch_size x observations x features
- here we use batch_size = 1, batches are trained simultenously and can speed up learning substantially

In [None]:
def format_for_training(X):
    if len(X.shape) == 1:
        return np.expand_dims(np.expand_dims(np.array(X.values), 0), -1)
    elif len(X.shape) == 2:
        return np.expand_dims(np.array(X.values), 0)

    raise Exception('Can\'t format')

### Generate data for validation
- validation data won't be trained on, but will be tested on each epoc and give a score to help us track model performance

In [None]:
validation_trials, validation_targets = generate_trial_data(4, params['input_noise'])
val_trials_fmt = format_for_training(validation_trials)
val_target_fmt = format_for_training(validation_targets)
history = []

In [None]:
for i in range(100):
    trials, targets = generate_trial_data(1, params['input_noise'])
    targets_fmt = format_for_training(targets)
    trials_fmt = format_for_training(trials)
    history.append(model.fit(trials_fmt, targets_fmt, validation_data=(val_trials_fmt, val_target_fmt), epochs=1, batch_size=1))

In [None]:
mse = list(map(lambda x: x.history['val_mean_squared_error'], history))
plt.plot(mse)

In [None]:
h = model.layers[0](val_trials_fmt)
o = model.layers[1](h)

output = pd.Series(o[0, :, 0], index=validation_trials.index)
trial_cues_series = (validation_trials['odor'] > 0.6).reset_index(drop=True)
plt.plot(output.values)
plt.plot(validation_targets.values)
cue_times = pd.concat(
    (trial_cues_series.where(trial_cues_series.astype(int).diff() > 0).dropna().reset_index()['index'],
     trial_cues_series.where(trial_cues_series.astype(int).diff() < 0).dropna().reset_index()['index']),
    keys=('starts',  'stops'), axis=1)
cue_times.apply(lambda x: plt.gca().axvspan(*x, color='k', alpha=0.15), axis=1)
plt.gca().spines['right'].set_visible(False)
plt.gca().spines['top'].set_visible(False)
plt.gca().set_xlim(0, 400)

In [None]:
from sklearn.decomposition import PCA
import matplotlib.gridspec as gridspec

trials, _ = generate_trial_data(15, params['input_noise'])
h = model.layers[0](format_for_training(trials))
o = model.layers[1](h)

output = pd.Series(o[0, :, 0], index=trials.index)

h_df = pd.DataFrame(h[0], index=trials.index)
pca = pd.DataFrame(PCA(n_components=3).fit_transform(h_df), h_df.index)

plt.figure(figsize=(9, 3))
gs = gridspec.GridSpec(1, 2, wspace=0.4)
axs = list(map(plt.subplot, gs))
pca[[0, 1]].groupby('trial_type').apply(lambda x, ax=axs[0]: ax.plot(*x.values.T))
axs[0].set_ylabel('PC2')
axs[0].set_xlabel('PC1')
axs[0].set_aspect('equal')

pca[[0, 2]].groupby('trial_type').apply(lambda x, ax=axs[1]: ax.plot(*x.values.T))
axs[1].set_ylabel('PC3')
axs[1].set_xlabel('PC1')
axs[1].set_aspect('equal')

## Next Steps
1. Use a Dataset Generator for the training data in order to generate unique data for each batch
2. Identify Fixed and slow points:
   - https://github.com/google-research/computation-thru-dynamics/blob/master/notebooks/Fixed%20Point%20Finder%20Tutorial.ipynb
   - https://www.theoj.org/joss-papers/joss.01003/10.21105.joss.01003.pdf