# Spikeinterface Data Model


In [None]:
import warnings

# Silence DeprecationWarning messages so they do not clutter the cell outputs.
warnings.filterwarnings(action="ignore", category=DeprecationWarning)

![Quote](./quote.png)

# The Object of Spikeinterface

## Why not JUST numpy?

![Numpy is great](./numpy.png)

## Because the data in electrophysiology is TOO MASSIVE for most users

In [None]:
# Dimensions of a hypothetical recording: channel count, sampling rate and duration.
num_channels = 384
sampling_frequency = 30_000.0  # Hz

total_time_hours = 24
total_time_seconds = total_time_hours * 3_600  # 60 min/h * 60 s/min

# Number of samples per channel over the whole duration.
num_samples = int(sampling_frequency * total_time_seconds)
(num_channels, num_samples)

In [None]:
import numpy as np

# Try to materialize the whole (num_channels, num_samples) array of random
# values in memory at once, using the sizes computed in the previous cell.
full_shape = (num_channels, num_samples)
np.random.rand(*full_shape)

## Recording objects in Spikeinterface
* Keep a reference to the data without loading it into memory (memmaps)
* Keep a set of human readable ids to refer to the channels
* Keep a sampling frequency to transform samples to times



![Recording](./recording.png)


#### Generate synthetic data

In [None]:
from spikeinterface.core import generate_recording

# Synthetic recording: 3 channels, durations=[10].
recording = generate_recording(
    num_channels=3,
    durations=[10],
)
recording

#### Setting channel names

In [None]:
# rename_channels is not in-place, so the result is bound back to `recording`.
renamed_channel_ids = ["a", "b", "c"]
recording = recording.rename_channels(new_channel_ids=renamed_channel_ids)
recording.get_channel_ids()

### Getting the data

In [None]:
# Request the traces between frames 0 and 3, restricted to channels "a" and "c".
recording.get_traces(
    start_frame=0,
    end_frame=3,
    channel_ids=["a", "c"],
)

### Selecting pieces of the recording (lazily!)

#### Channels

In [None]:
# Keep only channels "a" and "b"; the result is a separate recording object.
kept_channel_ids = ["a", "b"]
channel_sliced_recording = recording.select_channels(channel_ids=kept_channel_ids)
channel_sliced_recording

#### Frames / Time

In [None]:
# Restrict the recording to the frame range 0..1000.
sliced_recording = recording.frame_slice(
    start_frame=0,
    end_frame=1000,
)
sliced_recording

### Combining recordings

#### Concatenating recordings (across time)

In [None]:
from spikeinterface.core import concatenate_recordings

# Two independent synthetic recordings with the same number of channels.
recording1 = generate_recording(num_channels=3, durations=[10])
recording2 = generate_recording(num_channels=3, durations=[10])

# Join them one after the other in time.
concanted_recordings = concatenate_recordings([recording1, recording2])

# The duration of the result must equal the sum of the durations of its parts.
expected_duration = recording1.get_duration() + recording2.get_duration()
assert concanted_recordings.get_duration() == expected_duration


#### Aggregating channels as a single recording

In [None]:
from spikeinterface.core import aggregate_channels

# set_probe=False is used here to avoid the location check when aggregating.
recording1 = generate_recording(num_channels=3, durations=[10], set_probe=False)
recording1 = recording1.rename_channels(new_channel_ids=["a", "b", "c"])
recording2 = generate_recording(num_channels=2, durations=[10], set_probe=False)
recording2 = recording2.rename_channels(new_channel_ids=["d", "e"])

# Stack the channels of both recordings into a single recording.
aggregated_recording = aggregate_channels([recording1, recording2])

expected_channel_ids = ["a", "b", "c", "d", "e"]
assert aggregated_recording.get_num_channels() == len(expected_channel_ids)
# NOTE(review): this check was flagged as "Failing right now" -- confirm
# against the installed spikeinterface version.
assert list(aggregated_recording.get_channel_ids()) == expected_channel_ids

## Graphical Summary of Recording Operations

![Recording Operations](./recording_operations.png)



## Sorting objects in Spikeinterface

In their most common representation (as loaded by the user), sorting objects are a dictionary of spike trains (represented as frames) and a sampling frequency to situate those frames in time

![Sorting](./sorting.png)

#### Generate synthetic data

In [None]:
from spikeinterface.core import generate_sorting

# Synthetic sorting: 3 units, durations=[10].
sorting = generate_sorting(
    num_units=3,
    durations=[10],
)
sorting

#### Setting the unit ids

In [None]:
# rename_units is not in-place, so the result is bound back to `sorting`.
renamed_unit_ids = ["unit1", "unit2", "unit3"]
sorting = sorting.rename_units(new_unit_ids=renamed_unit_ids)
sorting.get_unit_ids()

#### Getting the data

In [None]:
# Spike train of a single unit, looked up by its id.
sorting.get_unit_spike_train(
    unit_id="unit2",
)

### Selecting pieces of the sorting (lazily!)

#### Units

In [None]:
# Keep only "unit1" and "unit2"; the result is a separate sorting object.
kept_unit_ids = ["unit1", "unit2"]
unit_selected_sorting = sorting.select_units(unit_ids=kept_unit_ids)
unit_selected_sorting

#### Frames / Time

In [None]:
# Restrict the sorting to the frame range 0..1000.
sliced_sorting = sorting.frame_slice(
    start_frame=0,
    end_frame=1000,
)
sliced_sorting

### Combining sortings

#### Concatenating sorting objects (across time)

In [None]:
from spikeinterface.core import generate_sorting, concatenate_sortings


duration = 1.0  # Seconds

# Two sortings with different seeds but the same unit ids, so that each unit
# can be followed across the concatenation.
sorting1 = generate_sorting(num_units=3, durations=[duration], seed=0)
sorting1 = sorting1.rename_units(new_unit_ids=["unit1", "unit2", "unit3"])
sorting2 = generate_sorting(num_units=3, durations=[duration], seed=1)
sorting2 = sorting2.rename_units(new_unit_ids=["unit1", "unit2", "unit3"])

# Sample counts are whole numbers: sampling_frequency * duration is a float,
# so convert explicitly to int before handing the counts over.
num_samples_sorting1 = int(sorting1.sampling_frequency * duration)
num_samples_sorting2 = int(sorting2.sampling_frequency * duration)

# total_samples_list gives the length (in samples) of each sorting being joined.
concatenated_sorting = concatenate_sortings(
    sorting_list=[sorting1, sorting2],
    total_samples_list=[num_samples_sorting1, num_samples_sorting2],
)

In [None]:
# Spike train of "unit1" in the first sorting, requested as times
# (return_times=True) -- presumably in seconds.
sorting1.get_unit_spike_train(unit_id="unit1", return_times=True)

In [None]:
# Spike train of "unit1" in the second sorting, requested as times
# (return_times=True).
sorting2.get_unit_spike_train(unit_id="unit1", return_times=True)

In [None]:
# Same values shifted by `duration`: where the spikes of the second sorting are
# expected to fall once it is placed after the first one.
sorting2.get_unit_spike_train(unit_id="unit1", return_times=True) + duration

In [None]:
# Spike train of "unit1" in the concatenated sorting, for comparison with the
# per-sorting outputs.
concatenated_sorting.get_unit_spike_train(unit_id="unit1", return_times=True)

#### Aggregating units as a single sorting object

In [None]:
from spikeinterface.core import aggregate_units, generate_sorting

duration = 10.0  # Seconds

# Two sortings of equal duration with a different number of units each.
sorting1 = generate_sorting(num_units=3, durations=[duration])
sorting2 = generate_sorting(num_units=2, durations=[duration])

# Gather the units of both sortings into a single sorting object.
sortings_to_aggregate = [sorting1, sorting2]
aggregated_sorting = aggregate_units(sortings_to_aggregate)


## Graphical Summary of Sorting Operations


![Sorting Operations](./sorting_operations.png)

## Parallel Processing

### Quick reminder about recording

![Recording](./recording.png)

### What do we parallelize over

![Chunking Description](./parallel_processing.png)

### Parameters to control parallel execution

* length of the chunk
    * chunk_duration: Length of the chunk in seconds
    * chunk_size: Number of samples per chunk
    * chunk_memory: Memory usage for each job
    * total_memory: Total memory usage
* n_jobs: Number of jobs to use. With -1 the number of jobs is the same as number of cores
* progress_bar: Whether to show a progress bar
* mp_context: fork, spawn or forkserver

In [None]:
import tempfile

from spikeinterface.core import generate_recording, write_binary_recording

recording = generate_recording(num_channels=3, durations=[10])

# Job kwargs for the parallel write; each key appears exactly once
# (a repeated key in a dict literal silently keeps only the last value).
job_kwargs = {"n_jobs": 2, "chunk_duration": 5.0, "progress_bar": True}

# delete=False keeps the file on disk after the `with` block closes it; only
# its name is used here, as the destination path of the binary write.
with tempfile.NamedTemporaryFile(suffix='.raw', delete=False) as temp_file:
    temporary_file_path = temp_file.name
    write_binary_recording(recording=recording, file_paths=[temporary_file_path], **job_kwargs)



![job_kwargs](./job_kwargs.png)

### Setting global job_kwargs

In [None]:
import tempfile

from spikeinterface import get_global_job_kwargs, set_global_job_kwargs
from spikeinterface.core import generate_recording, write_binary_recording

recording = generate_recording(num_channels=3, durations=[10])

# Show the global job kwargs before anything is changed.
print(get_global_job_kwargs())

# First write: no job kwargs are passed explicitly.
with tempfile.NamedTemporaryFile(suffix=".raw", delete=False) as temp_file:
    temporary_file_path = temp_file.name
    write_binary_recording(
        recording=recording,
        file_paths=[temporary_file_path],
    )

# Register the job kwargs globally instead of passing them on every call.
set_global_job_kwargs(n_jobs=2, chunk_duration=5.0, progress_bar=True)

# Second write: same call as before, issued after the global kwargs were set.
with tempfile.NamedTemporaryFile(suffix=".raw", delete=False) as temp_file:
    temporary_file_path = temp_file.name
    write_binary_recording(
        recording=recording,
        file_paths=[temporary_file_path],
    )




Stop to mention some cautionary tales:
* Performance is highly dependent on the data, operations and the hardware.
* The best way to optimize is to try different configurations and see what works best for your data and hardware.
* Threading vs multiprocessing: Threading is generally faster for I/O bound tasks, while multiprocessing is better for CPU bound tasks.
* Anything to add?

### Spike Vector

![Spike Vector](./spike_vector.png)

In [None]:
from spikeinterface.core import generate_sorting

sorting = generate_sorting(num_units=3, durations=[10])

# Convert to the spike-vector representation without concatenating, then
# display the first element of the result.
spike_vectors = sorting.to_spike_vector(concatenated=False)
spike_vectors[0]

![Common Chunk](./common_chunk.png)

## Saving Recording and Sorting Objects

### Binary format

#### Recording

In [None]:
from pathlib import Path

from spikeinterface.core import generate_recording

recording = generate_recording(num_channels=3, durations=[10], set_probe=True)
# rename_channels is not in-place, hence the re-assignment.
recording = recording.rename_channels(new_channel_ids=["a", "b", "c"])
# set_property, in contrast, modifies `recording` in place.
recording.set_property("a_property", ["value1", "value2", "value3"])

folder_path = Path("./test_recording")
job_kwargs = dict(progress_bar=True, verbose=True, n_jobs=2)

# Write the recording to `folder_path`, replacing any previous content.
binary_recording = recording.save_to_folder(
    folder=folder_path,
    overwrite=True,
    **job_kwargs,
)
binary_recording

#### Sorting

In [None]:
from pathlib import Path

from spikeinterface.core import generate_sorting

sorting = generate_sorting(num_units=3, durations=[10])
# rename_units is not in-place, hence the re-assignment.
sorting = sorting.rename_units(new_unit_ids=["unit1", "unit2", "unit3"])
# set_property, in contrast, modifies `sorting` in place.
sorting.set_property("a_property", ["value1", "value2", "value3"])

folder_path = Path("./test_sorting")
job_kwargs = dict(progress_bar=True, verbose=True, n_jobs=2)

# Write the sorting to `folder_path`, replacing any previous content.
binary_sorting = sorting.save_to_folder(
    folder=folder_path,
    overwrite=True,
    **job_kwargs,
)
binary_sorting


In [None]:
# Display the `sorting` object that was saved, as opposed to the
# `binary_sorting` object returned by save_to_folder.
sorting

### Zarr format

#### Recording

In [None]:
from pathlib import Path

from spikeinterface.core import generate_recording

recording = generate_recording(num_channels=3, durations=[10], set_probe=True)
# rename_channels is not in-place, hence the re-assignment.
recording = recording.rename_channels(new_channel_ids=["a", "b", "c"])
# set_property, in contrast, modifies `recording` in place.
recording.set_property("a_property", ["value1", "value2", "value3"])

folder_path = Path("./test_recording.zarr")
job_kwargs = dict(progress_bar=True, verbose=True, n_jobs=2)

# Write the recording to a zarr folder, replacing any previous content.
zarr_recording = recording.save_to_zarr(
    folder=folder_path,
    overwrite=True,
    **job_kwargs,
)
zarr_recording

In [None]:
# Show the tree of the saved recording through the private `_root`
# attribute -- presumably the root group of the zarr store; verify.
zarr_recording._root.tree()

#### Sorting

In [None]:
from pathlib import Path

from spikeinterface.core import generate_sorting

sorting = generate_sorting(num_units=3, durations=[10])
# rename_units is not in-place, hence the re-assignment.
sorting = sorting.rename_units(new_unit_ids=["unit1", "unit2", "unit3"])
# set_property, in contrast, modifies `sorting` in place.
sorting.set_property("a_property", ["value1", "value2", "value3"])

folder_path = Path("./test_sorting.zarr")
job_kwargs = dict(progress_bar=True, verbose=True, n_jobs=2)

# Write the sorting to a zarr folder, replacing any previous content.
zarr_sorting = sorting.save_to_zarr(
    folder=folder_path,
    overwrite=True,
    **job_kwargs,
)
zarr_sorting


In [None]:
# Show the tree of the saved sorting through the private `_root`
# attribute -- presumably the root group of the zarr store; verify.
zarr_sorting._root.tree()

### Saving is portable

In [None]:
import shutil
from pathlib import Path

from spikeinterface.core import generate_recording
from spikeinterface.core import load_extractor

recording = generate_recording(num_channels=3, durations=[10], set_probe=True)


# Save a recording within a nested folder
base_folder = Path.cwd() / "saving_recording_and_moving"
original_recording_folder = base_folder / "folderA" / "recording_folder"

recording = recording.save_to_folder(folder=original_recording_folder, overwrite=True)


# Move the saved folder from folderA to a sibling folder (folderB)
another_folder = base_folder / "folderB"
another_folder.mkdir(parents=True, exist_ok=True)
destination_folder = another_folder / "recording_folder"

# A previous run of this cell leaves a populated destination behind, and
# renaming a directory onto a non-empty one fails, so clear it first.
if destination_folder.exists():
    shutil.rmtree(destination_folder)
original_recording_folder.rename(destination_folder)


# Load the recording from its new location
load_extractor(file_or_folder_or_dict=destination_folder)

## Saving preprocessing provenance

In [None]:
import shutil
from pathlib import Path

from spikeinterface.core import generate_recording, load_extractor
from spikeinterface.preprocessing import bandpass_filter, common_reference

# First we simulate having raw data (can be large!) in a folder
simulated_recording = generate_recording(num_channels=3, durations=[10])
base_folder = Path.cwd() / "working_with_preprocessing_provenance"

raw_data_location = base_folder / "raw_data_location_folder"
raw_data_location.mkdir(parents=True, exist_ok=True)
recording_saved = simulated_recording.save(folder=raw_data_location, overwrite=True)


# Now our common analysis pipeline starts by loading the raw data
raw_data_recording = load_extractor(file_or_folder_or_dict=raw_data_location)
# And then we apply some preprocessing steps
recording_preprocessed = bandpass_filter(common_reference(raw_data_recording))

# Note that we can save our preprocessed data for faster access afterwards:
# recording_preprocessed.save(folder=Path.cwd() / "preprocessed_data_folder")
# But maybe we only want to save our provenance data, that is, our
# preprocessing pipeline and parameters.
# For this we can save our preprocessing pipeline as a json file.

# dump_to_json without relative_to
json_file_path = base_folder / "analysis_folder" / "preprocessed_pipeline.json"
recording_preprocessed.dump_to_json(file_path=json_file_path)

# dump_to_json with relative_to
json_file_path_relative = base_folder / "analysis_folder" / "preprocessed_pipeline_relative.json"
recording_preprocessed.dump_to_json(file_path=json_file_path_relative, relative_to=base_folder)


# This then can be loaded again to recover the pipeline
recovered_pipeline = load_extractor(file_or_folder_or_dict=json_file_path)

# We move the whole base folder (raw data and json files) to a new location
new_base_folder = Path.cwd() / "new_working_with_preprocessing_provenance"

# Renaming onto an existing directory fails when it is non-empty (left over
# from a previous run of this cell) and, on Windows, whenever it exists at
# all. So remove any stale target and rename onto a free path.
if new_base_folder.exists():
    shutil.rmtree(new_base_folder)
base_folder.rename(new_base_folder)

new_json_file_path = new_base_folder / "analysis_folder" / "preprocessed_pipeline.json"
assert new_json_file_path.is_file(), "The json file was not moved correctly"

# Try to load it
try:
    recovered_pipeline = load_extractor(file_or_folder_or_dict=new_json_file_path)
except Exception:  # keep KeyboardInterrupt / SystemExit propagating
    print("This generated an error because the references to the raw data were absolute")

# We can solve this by saving the json file with relative paths

new_json_file_path_with_relative = new_base_folder / "analysis_folder" / "preprocessed_pipeline_relative.json"
assert new_json_file_path_with_relative.is_file(), "The json file with relative was not moved correctly"
recovered_pipeline = load_extractor(file_or_folder_or_dict=new_json_file_path_with_relative, base_folder=new_base_folder)
recovered_pipeline