# Snakemake Introduction to Elephant
## Workflow management based on a electrophysiology example

<table  bgcolor="#000000"><tr>
<td><img src=logos/snakemake_logo.svg alt="Drawing" style="width: 300px;"/>
<td><img src=http://neo.readthedocs.org/en/latest/_images/neologo.png alt="Drawing" style="width: 400px;"/>
<td><img src=http://elephant.readthedocs.org/en/latest/_static/elephant_logo_sidebar.png alt="Drawing" style="width: 400px;"/>
</tr></table>

In this tutorial I will first introduce the workflow management system [snakemake](https://snakemake.readthedocs.io) and show how this can be used for analysis of electrophysiology data using the electrophysiology analysis toolkit [Elephant](http://neuralensemble.org/elephant). For snakemake as well as Elephant tutorials exist already as part of the documentation and were partially reused in the tutorial presented here.

*NOTE: When running this tutorial locally, please first install the [requirements](environment.yml) to ensure the examples are working.*

## Workflow management - Do I need this?

There is a number of reasons for managing your managing workflows besides the classical 'I always run first script a and then script b':
* **Growing Complexity** When starting a seemingly small and simple project everything is still pretty easy, but typically projects tend to grow beyond the initially expected size and complexity. This affects two different aspects 1) the dependency between different steps in workflow as well as 2) the dependency of your analysis results on version of intermediate steps in the pipeline (data as well as code).
* **Collaboration & Sharing** Some day a collegue of yours wants to use your workflow or you are required to publish the analysis together with the manuscript presenting the results of your work to the scientific community. Instead of writing a book about which versions of what programm you installed in which order, wouldn't it be nice to have a structured and self explanatory of your analysis steps already at hand?
* **Software Evolution** Luckily, software develops and bugs are fixed from time to time. Unfortunately this also implies that your workflow might break unexpectedly upon updating your system. Having a well defined environment to run your analysis in is essential for reproducibility of your results.

## Snakemake  <img src=logos/snakemake_logo.svg alt="Drawing" style="width: 100px;"/>

Snakemake helps you to structure your workflow by providing a framework to specify the dependencies between individual steps in your analysis workflow. By doing so it enforces a modular structure in the project and allows to specify the (python) software versions used in each step of the process.

The workflow definition according to snakemake is file based, i.e. each step in the workflow is specified via the required input files and the generated output files, as you might now from common [`Makefiles`](https://en.wikipedia.org/wiki/Make_(software)). Each step is defined in a `rule`, specifying input and output files as well as instructions on how to get from the input to the output files. Here the instructions to convert `fileA.txt` into `fileB.txt` are a simple copy performed in a shell:

In [None]:
%%writefile Snakefile
rule:
    input: 'fileA.txt'
    output: 'fileB.txt'
    shell: 'cp fileA.txt fileB.txt'

For snakemake the workflow definition needs to be specified in a `Snakefile` and can be executed by calling `snakemake` in a terminal in the same location as the `Snakefile`. Here the example rule above has been exported into a [`Snakefile`](Snakefile) using the `%%writefile` jupyter magic command.

Now we can ask snakemake to generate `fileB.txt` for us:

In [None]:
%%sh
snakemake

This fails with snakemake complaining about
```
Missing input files for rule 1:
fileA.txt
```
Which is correct, since there is no `fileA.txt` present to generate `fileB.txt` from. So let's add second rule which is capable of generating `fileA.txt` without required inputfiles.


In [None]:
%%writefile -a Snakefile
rule:
    output: 'fileA.txt'
    shell: 'touch fileA.txt'

And ask snakemake again to generate the `fileB.txt` for us

In [None]:
%%sh
snakemake

Internally snakemake is first resolving the set of rules into a directed acyclic graph (dag) to determine in which order the rules neet do be executed. We can generate a visualization of the workflow using the `--dag` flag in combination with `dot` and `display` (for local notebook instances) or save the graph as svg (e.g. for remote instances).

In [None]:
%%sh
snakemake --dag | dot | display
snakemake --dag | dot -Tsvg > dag0.svg

The resulting graph shows the dependencies between the two rules, which were automatically enumerated. The line style (continuous/dashed) indicated whether the rules were already executed or not.

![DAG](dag0.svg)

We can also provide explicit names for rules to make the graph better human readable:

In [None]:
%%writefile Snakefile
rule copy_A_to_B:
    input: 'fileA.txt'
    output: 'fileB.txt'
    shell: 'cp {input} {output}'
rule create_A:
    output: 'fileA.txt'
    shell: 'touch fileA.txt'

In [None]:
%%sh
snakemake --dag | dot | display
snakemake --dag | dot -Tsvg > dag1.svg

![DAG](dag1.svg)

Here we already used a different notation to specify in the shell comand `cp {input} {output}` instead of explicitely repeating the input and output filenames. These placeholders will be substituted by snakemake during the execution by the filenames defined as `input` / `output`. We can use the same notation to generalize the required input of the rule depending on the output, e.g. we permit the copy rule to work for arbitrary files having a certain naming scheme. Here a new folder `new_folder` is automatically generated for the copied files.

In [None]:
%%writefile Snakefile
rule copy_to_new_folder:
    input: 'original_data/file{id}.txt'
    output: 'new_data/file{id}.txt'
    shell: 'cp {input} {output}'
rule create_file:
    output: 'original_data/file{id}.txt'
    shell: 'touch {output}'

For running the workflow now, we need to specify, which file we actually need as a final result and snakemake takes care of the individual steps to generate that file. We specify the desired output file as a snakemake argument:

In [None]:
%%sh
snakemake new_data/fileZ.txt --dag | dot | display
snakemake new_data/fileZ.txt --dag | dot -Tsvg > dag2.svg
snakemake new_data/fileZ.txt

To generate a set of output files, we can either request these individually when running snakemake, e.g. using `snakemake -np new_folder/file{0,1,2,3,4,5,6,7,8,9}.txt`. In case the workflow output is not being changed frequently, it is also possible to add a final rule (conventionally named 'all'), which requests all desired output files of the workflow:

In [None]:
%%writefile Snakefile
rule all:
    input: expand('new_data/file{id}.txt', id=range(10))
rule copy_to_new_folder:
    input: 'original_data/file{id}.txt'
    output: 'new_data/file{id}.txt'
    shell: 'cp {input} {output}'
rule create_file:
    output: 'original_data/file{id}.txt'
    shell: 'touch {output}'

In [None]:
%%sh
snakemake --dag | dot | display
snakemake --dag | dot -Tsvg > dag3.svg

Here I used the snakemake function `expand`, which extends a given statement (here `new_folder_file{id}.txt`) for all combinations of parameters provided (here `id` values from 0 to 10). This permits to easily applied a set of rules to a number of different files.

![DAG](dag3.svg)

Typically, analysis is a bit more complicated than creating empty files and copying them from A to B using shell commands. Snakemake also support a number of different execution methods
* in a shell (as used above)
* in python (using run:)
* run python/R/Markdown scripts directly (using script:)
As an example we can use a small python script to generate our initial data files and store a (randomly generated) value. The Python script would look like this:

In [None]:
%%writefile generate_data.py
import sys
import numpy as np

def generate_random_data(output_filename):
    # write a random number in an output file
    f = open(output_filename, "w")
    f.write(np.random.random())
    f.close()

# extracting the output filename from the command line parameters provided
output_filename = sys.argv[1]
generate_random_data(output_filename)

The corresponding snakemake rule now needs to provide the argument to the `generate_data.py` script:

In [None]:
%%writefile Snakefile
rule all:
    input: expand('new_data/file{id}.txt', id=range(10))
rule copy_to_new_folder:
    input: 'original_data/file{id}.txt'
    output: 'new_data/file{id}.txt'
    shell: 'cp {input} {output}'
rule generate_data:
    output: 'original_data/file{id}.txt'
    run: 'generate_data.py {output}'

In [None]:
%%sh
snakemake --dag | dot | display
snakemake --dag | dot -Tsvg > dag4.svg

Additional features worth having a look at
* conda integration (--use-conda flag)
* test runs (--dryrun)
* print shell commands (-p)
* ...
* [snakemake documentation](https://snakemake.readthedocs.io) and [FAQ](https://snakemake.readthedocs.io/en/stable/project_info/faq.html)

## Utilizing Snakemake for Data Analysis

We will present a simple data analysis workflow based on a published dataset of complex electrophysiology data using the electrophysiology analysis toolkit [Elephant](http://neuralensemble.org/elephant). Elephant is based on the [Neo](https://neo.readthedocs.io) data framework, which will first introduce based on artificially generatey spiking data with a known correlation.

## Neo - the data framework <img src=http://neo.readthedocs.org/en/latest/_images/neologo.png alt="NeoLogo" style="width: 400px;"/>    

![NeoDataModel](https://neo.readthedocs.io/en/0.7.0/_images/base_schematic.png)
*Neo* provides a set of classes for representing time series data (`SpikeTrain`, `AnalogSignalArray`, etc.), for representing the hierarchical arrangement of data in an experiment (`Segment`, `Block`) and for representing the relationship between spike trains and recording channels (`Unit`). Neo is used by a number of data visualization tools ([OpenElectrophy](http://neuralensemble.org/OpenElectrophy), [SpykeViewer](https://spyke-viewer.readthedocs.org/)) and by the [PyNN](http://neuralensemble.org/PyNN) metasimulator. Neo offers reading capabilities for a large number of proprietary electrophysiology data formats and conversion capability to generic open source data formats/models.

The Neo data model consists of container (Block, Segment, ChannelIndex, Unit) and data objects (AnalogSignals, Spiketrains, Epoch, Events). Container objects provide the relation between the data stored in the Neo structure. Here, we consider only Block and Segment objects as containers. Blocks are designed to contain everything related to a whole recording session and link to Segments, which hold data belonging to a common time frame. For data objects we will use SpikeTrains, designed to capture the time series describing the occurrence of spikes together with the corresponding waveforms. All Neo data objects are derivatives of numpy arrays enhanced with the handling of physical quantities, minimal metadata as well as a generic mechanism to add custom metadata.

Performing the analysis requires minimum 3 steps:
* getting / generating the data
* running the main analysis
* generating result plots

Before running the analysis on experimental data, let's generate some data with known ground truth. Here Elephant provides a number of methods to generate spiketrain activity with defined statistical properties. The simplest would be to have independent spike time (Poisson process). Since we later want to detect higher order correlations, we will use a Poisson process as background activity and add correlated spikes to it.

In [None]:
# imports for data handling and visualization
from quantities import Hz, ms
from elephant.spike_train_generation import homogeneous_poisson_process, compound_poisson_process
import neo
import numpy as np
import matplotlib.pyplot as plt

Neo and Elephant are handling physical units consistently during the analysis by using the python module [quantities](https://python-quantities.readthedocs.io). This also requires parameters to be supplied in the correct dimension, such that the physical units can be matched during analysis. In general Neo objects capture all minimal information relevant for the interpretation of the data. In case of the spiketrain, this encompasses the start and stop times of recording/spike extraction as well the sampling_rate and potential custom annotations in form of a dictionary.

In [None]:
spiketrain = homogeneous_poisson_process(20*Hz, 0*ms, 1000*ms)
print('The spiketrain', spiketrain)
print('Spiketrain attributes and physical units')
print(['{}: {}'.format(att, getattr(spiketrain,att)) for att in ['t_start', 't_stop', 'sampling_rate', 'annotations']])
print(['{}: {}'.format(att, getattr(spiketrain,att).units.dimensionality) for att in ['t_start', 't_stop', 'sampling_rate']])

An a first rule in or new workflow, let's generate multiple datasets with spiking activity and save them for future analysis steps. From the variety of file formats supported by Neo NIX has an hdf5 backend. Let's implement a virtual expiment, generating 100 poisson spiketrains stored in the NIX framework:

In [None]:
def generate_poisson_data(output_file, n=10):
    with neo.io.NixIO(output_file, 'ow') as io:
        block = neo.Block(experiment='poisson')
        block.segments.append(neo.Segment(name='trial 1'))
        block.segments[0].spiketrains = [homogeneous_poisson_process(20*Hz, 0*ms, 1000*ms).rescale('s') for i in range(n)]
        block.create_relationship()
        io.write_block(block)
    
generate_poisson_data('original_data/example_dataset.nix')

We export this piece of code into a standalone script so we can use it in the snakemake workflow:

In [None]:
%%writefile generate_poisson.py
import sys
import neo
import quantities as pq
from quantities import Hz, ms
from elephant.spike_train_generation import homogeneous_poisson_process, compound_poisson_process
def generate_data(output_file, n=10):
    with neo.io.NixIO(output_file, 'ow') as io:
        # generate neo structure
        block = neo.Block(experiment='poisson')
        block.segments.append(neo.Segment(name='trial 1'))
        # generate correlated spike trains
        sts = compound_poisson_process(rate=5*pq.Hz, A=[0]+[0.90]+[0]*9+[0.1], t_stop=10*pq.s)
        # add background poisson spike trains
        for i in range(89):
            sts.append(homogeneous_poisson_process(rate=5*pq.Hz, t_stop=10*pq.s))
        # block.segments[0].spiketrains = [homogeneous_poisson_process(20*Hz, 0*ms, 1000*ms).rescale('s') for i in range(n)]
        block.segments[0].spiketrains.extend(sts)
        block.create_relationship()
        io.write_block(block)
    
if __name__=='__main__':
    generate_data(*sys.argv[1:])

In [None]:
import os
from generate_poisson import generate_data
if not os.path.exists('original_data'):
    os.mkdir('original_data')
generate_data('original_data/dataset6.nix')

The first rule in the workflow know how to utilize the script to generate new datasets:

In [17]:
%%writefile Snakefile
rule all:
    input: expand('original_data/dataset{id}.nix', id=range(10))
        
rule generate_data:
    output: 'original_data/dataset{id,[0-9]}.nix'
    shell: 'python generate_poisson.py {output}'

Overwriting Snakefile


And running the workflow. The output files should appear [here](original_data).

In [2]:
%%sh
snakemake -pr

Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
	count	jobs
	1	all
	10	generate_data
	11

[Thu Jun 13 06:37:16 2019]
rule generate_data:
    output: original_data/dataset4.nix
    jobid: 5
    reason: Missing output files: original_data/dataset4.nix
    wildcards: id=4

python generate_poisson.py original_data/dataset4.nix
  return np.true_divide(other, self)
[Thu Jun 13 06:37:19 2019]
Finished job 5.
1 of 11 steps (9%) done

[Thu Jun 13 06:37:19 2019]
rule generate_data:
    output: original_data/dataset0.nix
    jobid: 1
    reason: Missing output files: original_data/dataset0.nix
    wildcards: id=0

python generate_poisson.py original_data/dataset0.nix
  return np.true_divide(other, self)
[Thu Jun 13 06:37:23 2019]
Finished job 1.
2 of 11 steps (18%) done

[Thu Jun 13 06:37:23 2019]
rule generate_data:
    output: original_data/dataset8.nix
    jobid: 9
    reason: Missing output files: original_data/data

In [None]:
%%sh
snakemake --dag | dot | display
snakemake --dag | dot -Tsvg > dag5.svg

To visualize the generated data, we implement a utility script to load the data and a second script plot the data using matplotlib.

In [3]:
%%writefile plot_data.py
import sys
import numpy as np
import neo
import matplotlib.pyplot as plt
def plot_data(data_filename, plot_filename):
    with neo.io.NixIO(data_filename, 'ro') as io:
        block = io.read_block()
    # plot spiketrains
    for i, spiketrain in enumerate(block.segments[0].spiketrains):
        plt.plot(spiketrain, [i]*len(spiketrain), 'b.')
        # plot patterns if present
        if 'pattern' in spiketrain.annotations:
            pattern_spikes = spiketrain[np.where(spiketrain.annotations['pattern'])]
            plt.plot(pattern_spikes, [i]*len(pattern_spikes), 'r.')
    plt.xlabel('Time [{}]'.format(spiketrain[0].units.dimensionality.latex))
    plt.ylabel('Spiketrains')
    plt.savefig(plot_filename)
    
if __name__=='__main__':
    plot_data(*sys.argv[1:])

Overwriting plot_data.py


In [1]:
from plot_data import plot_data
plot_data('original_data/dataset2.nix', 'original_data/dataset2.png')

In [18]:
%%writefile -a Snakefile

rule plot_data:
    input: '{folder}/dataset{id}.nix'
    output: '{folder}/dataset{id}.png'
    run: 'plot_original_data.py {input} {output}'

Appending to Snakefile


In [None]:
%%sh
snakemake original_data/dataset9.png --dag | dot | display

In the next step we want to run an analysis on the data, which is detecting the injected higher order correlation ins the data. One of the methods suited for the detection of spatio-temporal patterns is `SPADE` [(Quaglio et al, 2017)](https://doi.org/10.3389/fncom.2017.00041)
.

In [3]:
%%writefile run_spade.py
import sys
import numpy as np
import neo
import quantities as pq
import elephant.spade


def clean_nix_annotations(block):
    # removing unnessesary annotations to permit block to be saved again via nix
    objs = [block] + block.filter(data=True, container=True)
    print(len(objs))
    for obj in objs:
        if 'nix_name' in obj.annotations:
            obj.annotations.pop('nix_name')
        if 'neo_name' in obj.annotations:
            obj.annotations.pop('neo_name')
    

def run_spade(data_filename, output_filename):
    with neo.io.get_io(data_filename, 'rw') as io:
        block = io.read_block()
    spiketrains = block.segments[0].spiketrains
    
    # run spade analysis
    patterns = elephant.spade.spade(
        data=spiketrains, binsize=1*pq.ms, winlen=1, dither=5*pq.ms,
        min_spikes=3, n_surr=10, psr_param=[0,0,3],
        output_format='patterns')['patterns']
    
    # convert first pattern into array annotations of spiketrains
    # default: all spikes don't belong to pattern
    for spiketrain in spiketrains:
        spiketrain.annotate(pattern=np.array([False]*len(spiketrain)))
        
    # convert first detected pattern to array annotation
    for pattern_id, pattern in enumerate(patterns):
        neurons = pattern['neurons']
        times = pattern['times']
        lags = pattern['lags']
        for n, neuron in enumerate(neurons):
            if n == 0:
                pattern_times = times
            else:
                pattern_times = times + lags[n-1]
            for pattern_time in pattern_times:
                spike_idx = np.abs(spiketrains[neuron]-pattern_time).argmin()
                spiketrains[neuron].annotations['pattern'][spike_idx] = True

    # overwrite original neo file
    clean_nix_annotations(block)
    with neo.io.NixIO(output_filename, 'ow') as io2: 
        io2.write_block(block)
        
if __name__=='__main__':
    run_spade(*sys.argv[1:])

Overwriting run_spade.py


We add a rule to the snakemake workflow for runnig spade:

In [19]:
%%writefile -a Snakefile

rule run_spade:
    input: 'original_data/dataset{id}.nix'
    output: 'results/dataset{id}.nix'
    run: 'run_spade.py {input} {output}'

Appending to Snakefile


## Apply analysis to experimental data <img src=https://web.gin.g-node.org/img/favicon.png alt="GIN" style="width: 100px;"/>
In the next step we want to apply the same workflow to a set of experimental data. The data are publicly available at the data hosting platform [GIN](https://web.gin.g-node.org/). We are going to download two specific files of the dataset, which are save in BlackRock nev/ns2 format. Since Neo also supports BlackRock formats, we use it to extract the relevant data and save them in the nix format to be consistent with the simulated data.

In [2]:
%%writefile convert_to_nix.py
import neo
import quantities as pq
def convert_data_to_nix(input_filename, nix_filename):
    '''extracting first seconds of first spiketrains into separate nix file''' 
    io = neo.io.BlackrockIO(input_filename)
    original_block = io.read_block()
    
    # Extracting spiketrain and append to new block
    block = neo.Block(experiment='monkey')
    block.segments.append(neo.Segment(name='first seconds of recording'))
    new_spiketrains = [st.time_slice(0*pq.s,10*pq.s) for st in original_block.segments[-1].spiketrains[:100]]
    block.segments[0].spiketrains = new_spiketrains
    
    # save new block
    with neo.io.NixIO(nix_filename, 'ow') as io2:
        io2.write_block(block)

Overwriting convert_to_nix.py


In [3]:
#import neo
#with neo.io.NixIO('original_data/dataset10.nix') as io:
#    bl = io.read_block()
#    print(bl.segments[0].annotations)
#    print(len(bl.segments[0].spiketrains))

from convert_to_nix import convert_data_to_nix
convert_data_to_nix('original_data/dataset10', 'original_data/dataset10.nix')

  if len(data[mask_outside]) > 0:
  if len(data[mask_after_seg]) > 0:


writing


In [20]:
%%writefile -a Snakefile

rule download_data:
    output:
        nev = 'original_data/dataset10.nev',
        ns2 = 'original_data/dataset10.ns2'
    shell: '''
    wget -O {output.nev} https://web.gin.g-node.org/INT/multielectrode_grasp/raw/24cd5caee3ae79066ca37844cab931d04dcad977/datasets/i140703-001.nev
    wget -O {output.ns2} https://web.gin.g-node.org/INT/multielectrode_grasp/raw/24cd5caee3ae79066ca37844cab931d04dcad977/datasets/i140703-001.ns2
    '''
        
rule convert_to_nix:
    input: '{filename}.nev',
           '{filename}.ns2'
    output: '{filename}.nix'
    run: 'convert_to_nix.py {wildcards.filename} {output}'

Appending to Snakefile


In [48]:
%%sh
snakemake original_data/dataset10.nix

Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
	count	jobs
	1	convert_to_nix
	1

[Thu Jun 13 08:35:54 2019]
rule convert_to_nix:
    input: original_data/dataset10.nev, original_data/dataset10.ns2
    output: original_data/dataset10.nix
    jobid: 0
    wildcards: filename=original_data/dataset10

[33mJob counts:
	count	jobs
	1	convert_to_nix
	1[0m
[31mMissingOutputException in line 11 of /home/julia/presentations/2019-06-20_Toronto/snakemake_elephant_demo/Snakefile:
Missing files after 5 seconds:
original_data/dataset10.nix
This might be due to filesystem latency. If that is the case, consider to increase the wait time with --latency-wait.[0m
[31mExiting because a job execution failed. Look above for error message[0m
Shutting down, this might take some time.
Exiting because a job execution failed. Look above for error message
Complete log: /home/julia/presentations/2019-06-20_Toronto/snakemake_elephan

CalledProcessError: Command 'b'snakemake original_data/dataset10.nix\n'' returned non-zero exit status 1.

In [23]:
%%sh
snakemake results/dataset10.nix results/dataset0.nix --dag | dot | display

Building DAG of jobs...


In [27]:
%%sh
snakemake original_data/dataset10.nev

Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
	count	jobs
	1	download_data
	1

[Thu Jun 13 08:14:04 2019]
rule download_data:
    output: original_data/dataset10.nev, original_data/dataset10.ns2
    jobid: 0

--2019-06-13 08:14:04--  https://web.gin.g-node.org/INT/multielectrode_grasp/raw/24cd5caee3ae79066ca37844cab931d04dcad977/datasets/i140703-001.nev
Resolving web.gin.g-node.org (web.gin.g-node.org)... 141.84.41.216
Connecting to web.gin.g-node.org (web.gin.g-node.org)|141.84.41.216|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/octet-stream]
Saving to: ‘original_data/dataset10.nev’

     0K .......... .......... .......... .......... ..........  121K
    50K .......... .......... .......... .......... ..........  247K
   100K .......... .......... .......... .......... .......... 6,16M
   150K .......... .......... .......... .......... .......... 12,5

In [None]:
# TODOs:
* get data from gin
* run spade on nev files

In [29]:
import neo
io = neo.io.get_io('/home/julia/presentations/2019-06-20_Toronto/snakemake_elephant_demo/original_data/dataset10.nev')
block = io.read_block(lazy=True)


*Elephant* is a data analysis library built on Neo. It aims to provide a standard library for electrophysiology data analysis, merging functions from OpenElectrophy, SpykeViewer and [NeuroTools](http://neuralensemble.org/NeuroTools).

In [None]:
TOADD
SNAKEMAKE
* wildcards (done)
* different rule execution statements (done)
* conda environments
* expand (done)
* config
* rulegraph
NEO
* class structure
* io overview
* intro to main data objects
ELEPHANT
* spade elephant demo