# Score multi-session events using the replay score from Davidson et al.

In [1]:
import numpy as np
import os
import pandas as pd
import warnings

import nelpy as nel

warnings.filterwarnings("ignore")

### Load experimental data from Google Cloud bucket

In [2]:
import gcsfs
import pandas as pd

fs = gcsfs.GCSFileSystem(project='polar-program-784', token='cloud')
print(fs.ls('kemerelab-data/diba'))

with fs.open('kemerelab-data/diba/gor01vvp01pin01-metadata.h5', 'rb') as fid:
    with pd.HDFStore('gor01vvp01pin01-metadata.h5', mode="r", driver="H5FD_CORE",
            driver_core_backing_store=0,
            driver_core_image=fid.read()
            ) as store:
        df = store['Session_Metadata']
        df2 = store['Subset_Metadata']


['kemerelab-data/diba/', 'kemerelab-data/diba/gor01vvp01-metadata.h5', 'kemerelab-data/diba/gor01vvp01_processed_speed.nel', 'kemerelab-data/diba/gor01vvp01pin01-metadata.h5', 'kemerelab-data/diba/gor01vvp01pin01_processed_speed.nel']


In [3]:
# %%timeit # beware - this will run it 7 times to get a time! 36 s for 1.4 GB file

with fs.open('kemerelab-data/diba/gor01vvp01pin01_processed_speed.nel', 'rb') as fid:
    jar = nel.load_pkl('',fileobj=fid) # currently requires a specific nelpy branch

exp_data = jar.exp_data
aux_data = jar.aux_data
#del jar


### Define subset of sessions to score

In [4]:
# restrict sessions to explore to a smaller subset
min_n_placecells = 20
min_n_PBEs = 30 # 27 total events ==> minimum 21 events in training set

df2_subset = df2[(df2.n_PBEs >= min_n_PBEs) & (df2.n_placecells >= min_n_placecells)]

sessions = df2_subset['time'].values.tolist()
segments = df2_subset['segment'].values.tolist()

print('Evaluating subset of {} sessions'.format(len(sessions)))

df2_subset.sort_values(by=['n_PBEs', 'n_placecells'], ascending=[0,0])

Evaluating subset of 18 sessions


Unnamed: 0,animal,month,day,time,track,segment,duration,n_cells,n_placecells,n_PBEs,Notes,prescreen_z
54,gor01,6,9,22-24-40,two,short,1620.0,203,61,301,,
1,gor01,6,7,16-40-19,two,short,1330.0,117,46,277,,
28,pin01,11,1,12-58-54,unknown,long,1670.0,49,21,238,,
29,pin01,11,1,12-58-54,unknown,short,925.0,49,26,222,,
0,gor01,6,7,16-40-19,two,long,1180.0,117,43,150,,
42,gor01,6,9,1-22-43,one,long,1012.0,203,62,117,,
53,gor01,6,9,22-24-40,two,long,912.0,203,66,103,,
43,gor01,6,9,1-22-43,one,short,617.0,203,71,91,,
18,gor01,6,8,21-16-25,two,long,720.0,171,82,57,,
60,vvp01,4,9,17-29-30,one,short,490.0,68,32,46,,


### Parallel scoring

**NOTE:** it is relatively easy (syntax-wise) to score each session as a parallel task, but since the Bayesian scoring takes such a long time to compute, we can be more efficient (higher % utilization) by further parallelizing over events, and not just over sessions. This further level of parallelization makes the bookkeeping a little ugly, so I provide the code for both approaches here.

In [5]:
parallelize_by_session = False
parallelize_by_event = True

n_jobs = 8 # set this equal to number of cores
n_shuffles = 10 # 5000
n_samples = 35000
w=3 # single sided bandwidth (0 means only include bin who's center is under line, 3 means a total of 7 bins)

In [6]:
from dask.distributed import Client
client = Client('dask-scheduler:8786') # fill in value from dask-kubernetes startup if connecting remotely

In [7]:
if parallelize_by_session:
    #from joblib import Parallel, delayed 
    import distributed.joblib
    from joblib import Parallel, parallel_backend, delayed


    # A function that can be called to do work:
    def work_sessions(arg):    

        # Split the list to individual variables:
        ii, bst, tc = arg    

        scores, shuffled_scores, percentiles = nel.analysis.replay.score_Davidson_final_bst_fast(bst=bst,
                                                                                            tuningcurve=tc,
                                                                                            w=w,
                                                                                            n_shuffles=n_shuffles,
                                                                                            n_samples=n_samples)

        return (ii, scores, shuffled_scores, percentiles)

    # List of instances to pass to work():
    parallel_sessions = [(ii, aux_data[session][segment]['PBEs'], aux_data[session][segment]['tc']) for (ii, (session, segment)) in enumerate(zip(sessions, segments))]

    with parallel_backend('dask.distributed', scheduler_host='dask-scheduler:8786'):
        # Anything returned by work() can be stored:
        parallel_results = Parallel(n_jobs=n_jobs, verbose=51)(map(delayed(work_sessions), parallel_sessions))

    # standardize parallel results
    idx = [result[0] for result in parallel_results]

    # check that parallel results came back IN ORDER:
    if nel.utils.is_sorted(idx):
        print('parallel results are ordered...')
    else:
        raise ValueError('results are not ordered! handle it here before proceeding...')

    scores_bayes = [result[1] for result in parallel_results]
    scores_bayes_shuffled = [result[2] for result in parallel_results]
    scores_bayes_percentile = [result[3] for result in parallel_results]

    results = dict()
    for ii, (session, segment) in enumerate(zip(sessions, segments)):
        try:
            results[session][segment] = dict()
        except KeyError:
            results[session] = dict()    
            results[session][segment] = dict()
        results[session][segment]['scores_bayes'] = scores_bayes[ii]
        results[session][segment]['scores_bayes_shuffled'] = scores_bayes_shuffled[ii]
        
        results[session][segment]['scores_bayes_percentile'] = scores_bayes_percentile[ii]

    print('done packing results')

In [8]:

if parallelize_by_event:
    import time
    t0 = time.time()
    
    #from joblib import Parallel, delayed 
    #import distributed.joblib
    #from joblib import Parallel, parallel_backend, delayed

    # A function that can be called to do work:
    def work_events(arg):    
        # Split the list to individual variables:
        session, segment, ii, bst, tc = arg
        scores, shuffled_scores, percentiles = nel.analysis.replay.score_Davidson_final_bst_fast(bst=bst,
                                                                                            tuningcurve=tc,
                                                                                            w=w,
                                                                                            n_shuffles=n_shuffles,
                                                                                            n_samples=n_samples)

        t01 = time.time()
        return (session, segment, ii, scores, shuffled_scores, percentiles)

    # List of instances to pass to work():
    # unroll all events:
    parallel_events = []
    for session, segment in zip(sessions, segments):
        for nn in range(aux_data[session][segment]['PBEs'].n_epochs):
            parallel_events.append((session, segment, nn, aux_data[session][segment]['PBEs'][nn], aux_data[session][segment]['tc']))

    #with parallel_backend('dask.distributed', scheduler_host='dask-scheduler:8786'):
        # Anything returned by work() can be stored:
        #parallel_results = Parallel(n_jobs=n_jobs, verbose=51)(map(delayed(work_events), parallel_events))
        
    parallel_results_future = client.map(work_events, parallel_events)
    parallel_results = client.gather(parallel_results_future)
    
    # standardize parallel results
    bdries_ = [aux_data[session][segment]['PBEs'].n_epochs for session, segment in zip(sessions, segments) ]
    bdries = np.cumsum(np.insert(bdries_,0,0))
    bdries

    sessions_ = np.array([result[0] for result in parallel_results])
    segments_ = np.array([result[1] for result in parallel_results])
    idx = [result[2] for result in parallel_results]

    scores_bayes_evt = np.array([float(result[3]) for result in parallel_results])
    scores_bayes_shuffled_evt = np.array([result[4].squeeze() for result in parallel_results])
    scores_bayes_percentile_evt = np.array([float(result[5]) for result in parallel_results])

    results = {}
    for nn in range(len(bdries)-1):
        session = np.unique(sessions_[bdries[nn]:bdries[nn+1]])
        if len(session) > 1:
            raise ValueError("parallel results in different format / order than expected!")
        session = session[0]
        segment = np.unique(segments_[bdries[nn]:bdries[nn+1]])
        if len(segment) > 1:
            raise ValueError("parallel results in different format / order than expected!")
        segment = segment[0]
        try:
            results[session][segment]['scores_bayes'] = scores_bayes_evt[bdries[nn]:bdries[nn+1]]
        except KeyError:
            try:
                results[session][segment] = dict()
                results[session][segment]['scores_bayes'] = scores_bayes_evt[bdries[nn]:bdries[nn+1]]
            except KeyError:
                results[session] = dict()
                results[session][segment] = dict()
                results[session][segment]['scores_bayes'] = scores_bayes_evt[bdries[nn]:bdries[nn+1]]

        results[session][segment]['scores_bayes_shuffled'] = scores_bayes_shuffled_evt[bdries[nn]:bdries[nn+1]]
        results[session][segment]['scores_bayes_percentile'] = scores_bayes_percentile_evt[bdries[nn]:bdries[nn+1]]

    print('done packing results')

t1 = time.time()    
print('Elapsed time ', t1 - t0)

KeyboardInterrupt: 

### Save results to disk

In [None]:
jar = nel.ResultsContainer(results=results, description='gor01 and vvp01 speed restricted results for best 20 candidate sessions')
jar.save_pkl('score_bayes_all_sessions.nel')