<a href="https://colab.research.google.com/github/cfoli/Kinematic-Decoding-4-BCI-Control/blob/main/utils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mar 30.2025.Su (by Crispin Foli)

# Utility Functions

---



In [None]:

def bin_spikes(spikes, bin_size):

    """
    Bin spikes in time.

    Consecutive, non-overlapping groups of `bin_size` time samples are summed
    for every neuron. Trailing samples that do not fill a complete bin are
    discarded.

    Inputs
    ------
    spikes: numpy array of spikes (neurons x time)

    bin_size: number of time points to pool into a time bin (positive integer)

    Outputs
    -------
    S: float numpy array of spike counts (neurons x bins), where
       bins = time // bin_size

    Raises
    ------
    ValueError: if bin_size is not positive.

    """

    if bin_size <= 0:
        raise ValueError("bin_size must be a positive integer")

    # Get some useful constants.
    n_neurons, n_time_samples = spikes.shape
    n_bins = n_time_samples // bin_size  # integer division: a partial last bin is dropped

    # Count spikes in bins: cut off the partial bin, fold the time axis into
    # (bins, samples-per-bin) and sum over the samples of each bin.
    trimmed = spikes[:, :n_bins * bin_size]
    counts = trimmed.reshape(n_neurons, n_bins, bin_size).sum(axis=2)

    # The result has always been a float array; keep that contract for callers.
    return counts.astype(np.float64)


In [None]:
def append_history(neural_data, targ_data=None, num_bins_before=1, mode='flatten'):
    """
    Constructs a history matrix from neural data for causal decoding,
    supporting both feedforward and LSTM-style output shapes.

    Output row i holds the window of time bins
    [i, i + num_bins_before] of `neural_data`, i.e. the "current" bin
    i + num_bins_before preceded by its `num_bins_before` predecessors,
    ordered oldest first. The first `num_bins_before` time bins have no
    complete history and are therefore dropped.

    Parameters
    ----------
    neural_data : np.ndarray of shape (T, N)
        Neural features over time.

    targ_data : np.ndarray of shape (T, m) or (T,), optional
        Corresponding target data. Its first `num_bins_before` entries are
        dropped so that it stays aligned with the rows of `X`.

    num_bins_before : int, optional (default=1)
        Number of past time bins to include before the current bin.

    mode : str, either 'flatten' or 'expand'
        'flatten' returns (T-k, (k+1)*N) for feedforward networks.
        'expand' returns (T-k, k+1, N) for LSTM or RNN input.

    Returns
    -------
    X : np.ndarray (float64)
        Time-lagged neural features of shape:
            - (T - num_bins_before, (num_bins_before + 1) * N) if mode='flatten'
            - (T - num_bins_before, num_bins_before + 1, N) if mode='expand'

    y : np.ndarray or None
        `targ_data` without its first `num_bins_before` entries (trailing
        dimensions unchanged), or None if `targ_data` was not provided.

    Raises
    ------
    ValueError
        If `num_bins_before` is negative, `mode` is unknown, `neural_data`
        is not 2D, or there are not more than `num_bins_before` time bins.
    """
    if num_bins_before < 0:
        raise ValueError("num_bins_before must be non-negative")
    if mode not in ['flatten', 'expand']:
        raise ValueError("mode must be 'flatten' or 'expand'")
    if neural_data.ndim != 2:
        raise ValueError("neural_data must be 2D (time, features)")

    T, N = neural_data.shape
    window_size = num_bins_before + 1

    if T <= num_bins_before:
        raise ValueError("Not enough time bins to apply specified history length.")

    # Row i of window_idx lists the time indices i, i+1, ..., i+num_bins_before,
    # so a single gather builds every window without a Python loop and without
    # allocating rows that would be thrown away afterwards.
    n_valid = T - num_bins_before
    window_idx = np.arange(n_valid)[:, None] + np.arange(window_size)[None, :]

    # Output is float64 regardless of the input dtype (historical contract).
    X_valid = neural_data[window_idx].astype(np.float64, copy=False)

    # Return in desired mode
    if mode == 'flatten':
        X_out = X_valid.reshape(n_valid, window_size * N)
    else:  # mode == 'expand'
        X_out = X_valid

    # Slice only the time axis so that 1-D targets work as well as 2-D ones.
    y_out = targ_data[num_bins_before:] if targ_data is not None else None

    return X_out, y_out


In [None]:

import numpy as np

def append_history_cnn(neural_data, targ_data=None, num_bins_before=1):
    """
    Appends time-lagged history to the neural input.

    Output sample i is the window of time bins [i, i + num_bins_before] of
    `neural_data` (oldest bin first). The first `num_bins_before` time bins
    have no complete history and are dropped from both outputs.

    Parameters
    ----------
    neural_data : ndarray, shape (T, N)
        Raw neural activity over time (time x channels).
    targ_data : ndarray, shape (T, D), optional
        Target values (e.g., kinematics) over time.
    num_bins_before : int
        Number of previous bins to include in addition to the current bin.
        Must be non-negative.

    Returns
    -------
    X : ndarray (float64), shape (T - num_bins_before, num_bins_before + 1, N)
        History-augmented neural input. If T <= num_bins_before the first
        dimension is 0 (no complete window exists).
    y : ndarray or None
        Trimmed target array, or None if targ_data not provided.

    Raises
    ------
    ValueError
        If `num_bins_before` is negative or `neural_data` is not 2D.
    """
    if num_bins_before < 0:
        raise ValueError("num_bins_before must be non-negative")
    if neural_data.ndim != 2:
        raise ValueError("neural_data must be 2D (time, features)")

    T, N = neural_data.shape
    window = num_bins_before + 1

    # Row i of window_idx lists time indices i .. i + num_bins_before.
    # np.arange of a non-positive count is empty, so too-short inputs
    # naturally yield an empty (0, window, N) result.
    window_idx = np.arange(T - num_bins_before)[:, None] + np.arange(window)[None, :]

    # Output is float64 regardless of the input dtype (historical contract).
    X = neural_data[window_idx].astype(np.float64, copy=False)

    y = targ_data[num_bins_before:] if targ_data is not None else None
    return X, y


In [None]:

def plot_targets(targ, fing_color, targ_radius=0.075, x_lim=None, y_pred=None, plot_targ_TF=False):
    """
    Plot a target sequence as shaded rectangles, optionally with a prediction.

    Every run of consecutive, identical values in `targ` is drawn as one
    rectangle spanning the run's duration on the x-axis and
    `targ +/- targ_radius` on the y-axis. The figure is displayed with
    `plt.show()`; nothing is returned.

    Parameters
    ----------
    targ : np.ndarray
        Target value per time sample. It is indexed with an integer array,
        so a 1-D numpy array is expected.
    fing_color : mapping
        Must provide 'target' (rectangle face color) and 'line' (passed to
        `ax.plot` as the format/color argument of the prediction trace).
    targ_radius : float, optional (default=0.075)
        Half-height of each target rectangle.
    x_lim : sequence, optional
        x-axis limits; the first and last elements are used. Defaults to
        [0, len(targ)].
    y_pred : sequence, optional
        Predicted trajectory drawn on top of the targets.
    plot_targ_TF : bool, optional (default=False)
        If True, also draw the raw target trajectory as a thin black line.
    """
    import numpy as np
    import matplotlib.pyplot as plt

    if x_lim is None:
        x_lim = [0, len(targ)]

    # Find transitions in target display: index of the first sample of each run.
    targ_starts = np.concatenate(([0], np.where(np.diff(targ) != 0)[0] + 1))

    # Duration of each target; appending len(targ) ensures the last one is included.
    targ_durations = np.diff(np.append(targ_starts, len(targ)))

    # Rectangles are anchored at their lower edge and are one diameter tall.
    targ_height = 2 * targ_radius
    targ_locs = targ[targ_starts] - targ_radius

    _, ax = plt.subplots(figsize=(22, 3), dpi=300)
    if plot_targ_TF:
        # Plot the target trajectory
        ax.plot(range(len(targ)), targ, 'k', linewidth=0.5, linestyle='-', zorder=1)

    # Draw one rectangle per target.
    for start, bottom, duration in zip(targ_starts, targ_locs, targ_durations):
        rect = plt.Rectangle((start, bottom),
                             duration, targ_height,
                             edgecolor='k', facecolor=fing_color['target'],
                             alpha=0.5, linestyle='--', linewidth=0.5, zorder=2)
        ax.add_patch(rect)

    if y_pred is not None:
        # Plot the predicted trajectory on top of the targets
        ax.plot(range(len(y_pred)), y_pred, fing_color['line'], linewidth=2, linestyle='-', zorder=3)

    ax.spines[:].set_visible(False)

    ax.set_xlim(x_lim[0], x_lim[-1])

    # Configure plot
    ax.set_xlabel('Time (samples)', fontsize=18, labelpad=10)
    ax.set_ylabel('Extension', fontsize=18, labelpad=10)
    ax.set_title('Kinematic Targets', fontsize=20)
    ax.tick_params(axis='both', which='major', labelsize=14)
    ax.grid(False)
    plt.show()


In [None]:
import numpy as np

def extend_kinematics_from_position(targ_pos_binned, bin_width, compute='velocity'):
    """
    Extends target position data to include velocity and/or acceleration.

    Velocity is the forward difference of position divided by `bin_width`;
    acceleration is the forward difference of that velocity divided by
    `bin_width`. After each difference the last row is repeated so that the
    derivative keeps the same number of time samples as the input. As a
    consequence, the final two acceleration rows are always 0 for finite
    input.

    Parameters:
    -----------
    targ_pos_binned : np.ndarray (or array-like)
        Array of shape [t, d], where t is time and d is the number of spatial
        dimensions. A 1-D input of length t is treated as shape [t, 1].
        At least two time samples are required.
    bin_width : float
        Time interval between consecutive position samples.
    compute : str
        One of {'velocity', 'acceleration', 'both'} indicating which kinematics to compute.

    Returns:
    --------
    y_kf : np.ndarray
        Extended array of shape [t, d*k], where k = 2 (position + one of
        velocity/acceleration) or k = 3 (position + velocity + acceleration),
        with column groups ordered as [position, velocity, acceleration].

    Raises:
    -------
    ValueError
        If `compute` is not a supported option, if the input is not 1-D or
        2-D, or if it has fewer than two time samples.
    """
    if compute not in ['velocity', 'acceleration', 'both']:
        raise ValueError("compute must be one of {'velocity', 'acceleration', 'both'}")

    targ_pos_binned = np.asarray(targ_pos_binned)

    # Check dimensionality
    if targ_pos_binned.ndim == 1:
        # If 1D, reshape to 2D with single spatial dimension
        targ_pos_binned = targ_pos_binned.reshape(-1, 1)
    if targ_pos_binned.ndim != 2:
        raise ValueError("targ_pos_binned must be 1-D or 2-D (time, dimensions)")
    if targ_pos_binned.shape[0] < 2:
        # With a single sample there is no difference to take and no row to repeat.
        raise ValueError("targ_pos_binned must contain at least two time samples")

    def _forward_rate(values):
        # Forward difference along time, padded by repeating the last row
        # so the result has as many rows as `values`.
        rate = np.diff(values, axis=0) / bin_width
        return np.vstack((rate, rate[-1]))

    # Velocity is needed for every option: either as output or as the
    # intermediate step towards acceleration.
    vel = _forward_rate(targ_pos_binned)

    # Stack based on compute flag
    components = [targ_pos_binned]
    if compute in ('velocity', 'both'):
        components.append(vel)
    if compute in ('acceleration', 'both'):
        components.append(_forward_rate(vel))

    y_kf = np.hstack(components)
    return y_kf
