<font size=7 face="courier">Final Analysis Source Code

This is the code used to create the diagrams in the notebook, `final_analysis.ipynb`. I recommend you at least go through some of these functions to understand how to implement your own analysis functions. Also it is worth looking at the bonus section as it has code for eigendecomposition, which could be a useful lens of analysis if you wanted to use it.

In [None]:
print("Loading: final_analysis_source_code.ipynb...")

Import packages

In [None]:
import numpy as np                                                    # Packages for data analysis
import pandas as pd
import pytz
import sys
import zipfile
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.patches import Patch, Circle
from scipy.ndimage import gaussian_filter1d
import braingeneers                                                   # Braingeneers code
from braingeneers.analysis.analysis import SpikeData
import braingeneers.data.datasets_electrophysiology as ephys
from scipy.ndimage import gaussian_filter1d
from ipywidgets import interact, interactive, fixed, interact_manual  # package for interactive widgets 
#from food_land import FoodLandEnv
#import gymnasium as gym
#import pygame

Here is some minor stuff to help set up the notebook :

In [None]:
Timezone = pytz.timezone("America/Los_Angeles")

In [None]:
groups = ["Group 1", "Group 2", "Group 3", "Group 4", "Group 5"]
groupID = {"Group 1":22097, "Group 2":21985, "Group 3":22194, "Group 4":20264, "Group 5":22717,}

# <font color="blue"> Loading Data

Here are the functions used to load the data, it's pretty messy and lazy so don't use this as an example of "good" coding, hopefully you never find yourself in a situation where you _need_ to declare this many globals, but sometimes complete code is better than neat code :)

In [None]:
def load_curation(qm_path):
    with zipfile.ZipFile(qm_path, 'r') as f_zip:
        qm = f_zip.open("qm.npz")
        data = np.load(qm, allow_pickle=True)
        spike_times = data["train"].item()
        fs = data["fs"]
        train = [times / fs for _, times in spike_times.items()]
        if "config" in data:
            config = data["config"].item()
        else:
            config = None
        neuron_data = data["neuron_data"].item()
    return train, neuron_data, config, fs

def Load_Data(ChipID): 
    path = "Data/{}/".format(ChipID)
    global game_log, train_log, generic_log, sd_pre, sd_post, spikes_pre, spikes_post, info, mapping, before_first, before_multi, after_first, after_multi; # never do this kids :)

    game_log = pd.read_csv(path +"exp1_food_land_game_log.csv")
    train_log = pd.read_csv(path +"exp1_food_land_train_log.csv")
    generic_log = pd.read_csv(path +"exp1_food_land_log.csv")

    train_pre, neuron_data_pre, _, fs_pre = load_curation("Data/recordings/"+str(ChipID)+"_exp1_acqm.zip")
    train_post, neuron_data_post, _, fs_post = load_curation("Data/recordings/"+str(ChipID)+"_exp1_1_acqm.zip")

    sd_pre = SpikeData(train_pre, neuron_data={0: neuron_data_pre})
    sd_post = SpikeData(train_post, neuron_data={0: neuron_data_post})

    spikes_pre = np.load(path + 'before/spikes.npy',allow_pickle=True)
    spikes_post = np.load(path + 'after/spikes.npy',allow_pickle=True)
    info = np.load(path + 'before/info.npy',allow_pickle=True).item()
    mapping = pd.read_csv(path + 'exp1_mapping.csv')

    before_first = np.load(path + 'before/first_order_connectivity.npy',allow_pickle=True)
    before_multi = np.load(path + 'before/multi_order_connectivity.npy',allow_pickle=True)
    after_first = np.load(path + 'after/first_order_connectivity.npy',allow_pickle=True)
    after_multi = np.load(path + 'after/multi_order_connectivity.npy',allow_pickle=True)


#Load_Data(20264)

def Select_Group(group):
    global group_selected;
    group_selected = group
    ChipID = groupID[group]
    print(f"{group} chip ID : {ChipID}\nLoading Data...")
    Load_Data(ChipID)
    print("Data Loaded!")

#interact_manual( Select_Group, group=groups)


# <font color="Green">Analysis Plots

Here is the source code for all the figures used in the main notebook. Feel free to use these as reference/inspiration when working on your own analysis methods!

## <font color="green"> Game Log Vis

In [2]:
def game_plots(glog):
    # Plot the agent pos, a different color for each episode
    fig, ax = plt.subplots(2, 1, figsize=(10,9))
    for episode in glog['episode'].unique():
        ep_log = glog[glog['episode'] == episode]
        ax[0].plot(ep_log['agent_pos_x'], ep_log['agent_pos_y'], label=f'Episode {episode}')
    ax[0].set_xlabel('x')
    ax[0].set_ylabel('y')
    ax[0].set_title('Agent Position')
    ax[0].legend()

    # Plot the reward per episode

    reward_for_plots = np.array(glog['reward'], dtype=object)
    #ax[1].plot(game_log['reward'])
    ax[1].plot(reward_for_plots)
    ax[1].set_xlabel('Episode')
    ax[1].set_ylabel('Reward')
    ax[1].set_title('Reward per Episode')

    plt.show()

def speed_plots(glog):
    # Plot turning and forward speed
    fig, ax = plt.subplots(2, 1, figsize=(10,9))

    turning_speed_for_plots = np.array(glog['turning_speed'], dtype=object)
    moving_speed_for_plots = np.array(glog['moving_speed'], dtype=object)

    #ax[0].plot(game_log['turning_speed'], label='Turn Speed')
    ax[0].plot(turning_speed_for_plots, label='Turn Speed')
    ax[0].set_xlabel('Step')
    ax[0].set_ylabel('Turn Speed')
    ax[0].set_title('Turn Speed')
    ax[0].legend()

    #ax[1].plot(game_log['moving_speed'], label='Forward Speed')
    ax[1].plot(moving_speed_for_plots, label='Forward Speed')
    ax[1].set_xlabel('Step')
    ax[1].set_ylabel('Forward Speed')
    ax[1].set_title('Forward Speed')
    ax[1].legend()

    plt.show()

#game_plots(game_log)
#speed_plots(game_log)

## <font color="Green"> Graph Raster Plots

In [None]:
def plotRaster( sd, title, ax):
    idces, times = sd.idces_times()
    ax.scatter(times,idces,marker='|',s=1)   # Creates spike raster
    ax.set_title( title )
    ax.set_xlabel("Time(s)")
    ax.set_ylabel('Unit #')
    
def Plot_Rasters():
    print(f"Displaying Spike Rasters for Group {group_selected}")
    fig, axs = plt.subplots( nrows=2, figsize=(30, 15) )
    plotRaster(sd_pre, "Before Stimulation", axs[0] )
    plotRaster(sd_post, "After Stimulation", axs[1] )
    plt.show()

#Plot_Rasters()

## <font color="green">Causal Connectivity Plots

In [None]:
def Plot_Causal(): #plot 4 subplots instead
    cc_bef_first = before_first
    cc_bef_multi = before_multi
    cc_aft_first = after_first
    cc_aft_multi = after_multi
    figlayout = """
                AB
                CD
                """
    
    fig, plot = plt.subplot_mosaic(figlayout, figsize=(12,10)) 
    fig.suptitle("Causal Connectivity Matrices")

    #plot before first order
    pltA = plot["A"].imshow(cc_bef_first, cmap='Greens') # plot before CC
    plot["A"].set_title("Before\nFirst Order (10-15ms)")
    plot["A"].set_xlabel("Reactivity Index")
    plot["A"].set_ylabel("Stimulus Index" )
    fig.colorbar(pltA, ax=plot["A"], shrink=0.7)

    # Plot after first order
    pltB = plot["B"].imshow(cc_aft_first, cmap='Greens') # plot after cc
    plot["B"].set_title("After\nFirst Order (10-15ms)")
    plot["B"].set_xlabel("Reactivity Index")
    plot["B"].set_ylabel("Stimulus Index" )
    fig.colorbar(pltB, ax=plot["B"], shrink=0.7)

    # Plot after first order
    pltB = plot["C"].imshow(cc_bef_multi, cmap='Greens') # plot after cc
    plot["C"].set_title("Nth Order (200ms)")
    plot["C"].set_xlabel("Reactivity Index")
    plot["C"].set_ylabel("Stimulus Index" )
    fig.colorbar(pltB, ax=plot["C"], shrink=0.7)

    # Plot after first order
    pltB = plot["D"].imshow(cc_aft_multi, cmap='Greens') # plot after cc
    plot["D"].set_title("Nth Order (200ms)")
    plot["D"].set_xlabel("Reactivity Index")
    plot["D"].set_ylabel("Stimulus Index" )
    fig.colorbar(pltB, ax=plot["D"], shrink=0.7)

#Plot_Causal()

## <font color="green">MEA Layout

In [None]:
def Get_Mapping():
    fig, ax = plt.subplots(figsize=(10,8))
    ax.set_title("Organoid Mapping")

    neurons = pd.DataFrame(columns=mapping.columns)
    for i in range(len(info["stim_patterns"])):
        item = mapping.loc[mapping["electrode"] == info["stim_patterns"][i][0]]
        neurons = pd.concat([neurons, item], ignore_index=True)

    ax.scatter(mapping['x'], mapping['y'], s=2) #plot electrodes, then a subset of 20 of them will be the neuronsin the expr
    ax.scatter(neurons['x'], neurons['y'], s=30, c='r') #plot neurons

    ax.set_xlabel('um')
    ax.set_ylabel('um')
    ax.legend(["Electrodes", "Neurons"])
    # get causal info for stim pattern electrons, those tuples there are the electrodes that are stimulated
    # print(mapping)

#Get_Mapping()

## <font color="green">Spikes Per Rep

In [None]:
def Inter_Reactions(set):
    global spikes;
    if set == "before":
        spikes = spikes_pre
    else:
        spikes = spikes_post
    interact_manual(Plot_Spikes, Count_1=(1, spikes.shape[1]), Count_2=(1, spikes.shape[2]))  

def Plot_Spikes(Count_1, Count_2):
    colors = cm.rainbow(np.linspace(0, 1, spikes.shape[1]))


    for st_i in range(Count_1):
        for r_i in range(Count_2):    
            fig, ax = plt.subplots(1, 1, figsize=(10, 4))    
            for rep in range(spikes.shape[0]):
                x_bias = 0#r_i * 210
                y_bias = 0#st_i * 55

                rep_i = np.ones_like(spikes[rep,st_i,r_i])*rep
                ax.scatter(x_bias + spikes[rep,st_i,r_i]/20, y_bias + rep_i, color=colors[r_i],s=1)#, marker='x')
            ax.set_xlabel('Time (ms)')
            ax.set_ylabel('Stim Rep')
            plt.show()

# Bonus Content!!! :)

Some functions that we do not use in the notebook but might be helpful as a boost when working on your own data analysis. Hope you make the best of them! 

In [None]:
def Pretty_Plot_Raster(sd, title="Spike Raster", l1=-10, l2=False, xsize=10, ysize=6, analize=False):
    """
    Plots a configuable raster plot of the spike data.
        sd : spike data object from braingeneers
        title : Title of the plot
        l1 : start time in seconds
        l2 : end time in seconds
        xsize : width of the plot
        ysize : height of the plot
        analize : If True, will plot the population rate as well
    """

    if l2==False:
        l2 = sd.length / 1000 + 10
    
    idces, times = sd.idces_times()
    
    if analize == True:
        # Get population rate for everything
        pop_rate = sd.binned(bin_size=1)  # in ms
        # Lets smooth this to make it neater
        sigma = 5
        pop_rate_smooth = gaussian_filter1d(pop_rate.astype(float), sigma=sigma)
        t = np.linspace(0, sd.length, pop_rate.shape[0]) / 1000

        # Determine the stop_time if it's not provided
        if l2 is None:
            l2 = t[-1]

        # Filter times and idces within the specified start and stop times
        mask = (times >= l1 * 1000) & (times <= l2 * 1000)
        times = times[mask]
        idces = idces[mask]

    fig, ax = plt.subplots(figsize=(xsize, ysize))
    fig.suptitle(title)
    ax.scatter(times/1000,idces,marker='|',s=1)
    
    if analize == True:
        ax2 = ax.twinx()
        ax2.plot(t, pop_rate_smooth, c='r')
        ax2.set_ylabel('Firing Rate')
        
    ax.set_xlabel("Time(s)")
    ax.set_ylabel('Unit #')
    plt.xlim(l1, l2)
    plt.show()

Matrix functions

In [None]:
def correlation(sd):
    """
    Returns the correlation matrix of a spike data object
        sd : spike data object from braingeneers
        returns : N x N matrix of correlation values
    """
    
    corr = np.zeros((sd.N,sd.N)) #inds by inds
    
    dense_raster = sd.raster(bin_size=1) # in ms
    sigma = 5                            # Blur it
    dense_raster = gaussian_filter1d(dense_raster.astype(float),sigma=sigma)
    corr=np.corrcoef( dense_raster )
        
    return corr;

def get_sttc(sd):
    """
    Returns the spike time tiling coefficient of a spike data object
        sd : spike data object from braingeneers
        returns : N x N matrix of STTC values
    """

    sttc = sd.spike_time_tilings()
    return sttc

def eigenvalues_eigenvectors(sd): # gets the eigenvalues and eigenvectors of a matrix
    """
    returns the eigenvalues and eigenvectors of a matrix
        sd : spike data object from braingeneers
        returns : eigenvalues, eigenvectors
    """

    W, U = np.linalg.eigh(sd)
    # The rank of A can be no greater than the smaller of its
    # dimensions, so cut off the returned values there.
    rank = min(*sd.shape)
    U = U[:,-rank:]
    sgn = (-1)**(U[0,:] < 0)
    # Also reverse the order of the eigenvalues because eigh()
    # returns them in ascending order but descending makes more sense.
    return W[-rank:][::-1], (U*sgn[np.newaxis,:])[:, ::-1]

Eigendecomposition 

In [None]:
def plot_basis(sd, method):
    """
    Plots the first 5 eigenvectors of the correlation or STTC matrix.
        sd : spike data object from braingeneers
        method : "Correlation" or "STTC"
    
    """
    
    if method == "Correlation":
        Corr = correlation(sd)
        Wcorr, Ucorr = eigenvalues_eigenvectors(Corr)
        A = Ucorr
    else:
        STTC = get_sttc(sd)
        Wsttc, Usttc = eigenvalues_eigenvectors(STTC)
        A = Usttc
        
    plt.figure(figsize=(6,8))
    for i in range(5):
        if i: plt.xticks([])
        plt.subplot(5, 1, i+1)
        plt.stem(A[:,i])
    plt.xlabel('Observation Dimension')
    plt.suptitle('First 5 Eigenvectors', y=0.92)    

#plot_basis(sd_pre, "STTC")

def plot_evectmatrix(sd):
    """
    Plots the eigenvectors of the correlation and STTC matrices for a given spike data object.
        sd : spike data object from braingeneers
    """
    
    fig, plot = plt.subplot_mosaic("AB", figsize=(14,7))
    
    corr = correlation(sd)
    sttc = get_sttc(sd)

    Wcorr, Ucorr = eigenvalues_eigenvectors(corr)
    Wsttc, Usttc = eigenvalues_eigenvectors(sttc)

    # Plot Correlation Matrix
    pltA = plot["A"].imshow(Ucorr[:,:2*len(Ucorr)].T, interpolation='none', cmap="magma")
    #plot["A"].gca().set_aspect('auto')
    plot["A"].set_ylabel('Eigenvector Number')
    plot["A"].set_xlabel('Observation Dimension')
    plot["A"].set_title('Eigenvectors of Correlation')
    fig.colorbar(pltA, ax=plot["A"], shrink=0.7)
    
    # Plot STTC matrix
    pltA = plot["B"].imshow(Usttc[:,:2*len(Usttc)].T, interpolation='none', cmap="magma")
    #plot["B"].gca().set_aspect('auto')
    plot["B"].set_ylabel('Eigenvector Number')
    plot["B"].set_xlabel('Observation Dimension')
    plot["B"].set_title('Eigenvectors of STTC')
    fig.colorbar(pltA, ax=plot["B"], shrink=0.7)

#plot_evectmatrix(sd_pre)
#plot_evectmatrix(sd_post)

# <font color="red">Bookend

In [None]:
now = datetime.now(Timezone)
printNow = now.strftime("%Y/%m/%d %H:%M:%S")

print(f"Done at: {printNow}")