# Workflows

Usually, we want to run more complex pipelines.

The [workflow object](http://nipy.org/nipype/api/generated/nipype.pipeline.engine.html#workflow
)
>Controls the setup and execution of a pipeline of processes.


In [1]:
import os
try:
    os.chdir(notebook_path)
except:
    notebook_path = os.path.abspath('.')

output_path = os.path.abspath('outputs')
if not os.path.exists(output_path):
    os.mkdir(output_path)
    
wd_path = os.path.join(output_path, '04_workflow')
if not os.path.exists(wd_path):
    os.mkdir(wd_path)
os.chdir(wd_path)
print(wd_path)

/Users/franzliem/Dropbox/Workspace/nipype_notebooks/notebooks/outputs/04_workflow


In [2]:
data_path = os.path.join(notebook_path, 'data')
funct_file = os.path.join(data_path, 'ds107/sub001/BOLD/task001_run001/bold.nii.gz')
print(funct_file)

/Users/franzliem/Dropbox/Workspace/nipype_notebooks/notebooks/data/ds107/sub001/BOLD/task001_run001/bold.nii.gz


# A very simple workflow

Let's start with basic workflow definition

In [3]:
from nipype.pipeline.engine import Node, Workflow
from nipype.interfaces.nipy.preprocess import Trim

workflow_path = os.path.join(wd_path, 'wf_1')
wf = Workflow(name='my_first_wf')
wf.base_dir = workflow_path
wf.config['execution']['crashdump_dir'] = os.path.join(workflow_path, 'crash')
wf.config['execution']['remove_unnecessary_outputs'] = False

Let's define the first node of our wf

In [4]:
trim = Node(Trim(), 'trim')
trim.inputs.in_file = funct_file
trim.inputs.end_index = 10

  from ..utils import mahalanobis
  import nipy.labs.glm.glm as GLM


If we have only one node in our wf we can add the node to the workflow with __wf.add_nodes()__ and run the entire workflow with __wf.run()__

In [5]:
wf.add_nodes([trim])
wf.run()

INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node trim in dir: /Users/franzliem/Dropbox/Workspace/nipype_notebooks/notebooks/outputs/04_workflow/wf_1/my_first_wf/trim


<networkx.classes.digraph.DiGraph at 0x10e5a73d0>

## Now inspect the created directories

In [24]:
!ls wf_1/my_first_wf/

d3.js       graph.json  graph1.json index.html  [1m[34mtrim[m[m


In [25]:
!ls wf_1/my_first_wf/trim

_0x199322f066e7989f8a8b2bbc929721a4.json
_inputs.pklz
_node.pklz
[1m[34m_report[m[m
bold_trim.nii.gz
result_trim.pklz


# A more complex workflow

Let's start with wf definition

In [8]:
from nipype.pipeline.engine import Node, Workflow
from nipype.interfaces.nipy.preprocess import Trim, FmriRealign4d

workflow_path = os.path.join(wd_path, 'wf_2')
wf = Workflow(name='my_second_wf')
wf.base_dir = workflow_path
wf.config['execution']['crashdump_dir'] = os.path.join(workflow_path, 'crash')
wf.config['execution']['remove_unnecessary_outputs'] = False

Now we define 2 nodes

In [9]:
trim = Node(Trim(), 'trim')
trim.inputs.in_file = funct_file
trim.inputs.end_index = 10

realign = Node(FmriRealign4d(), 'realign')
realign.inputs.tr = 3.
realign.inputs.time_interp = True
realign.inputs.slice_order = range(0,35)

  warn(msg)


We now need to add nodes to our workflow and connect node inputs and outputs.

Instead of using `add_nodes()` we now use __wf.connect()__

Using the following command we can connect the 'out_file' field of trim with the 'in_file' field of realign.

In [10]:
wf.connect(trim, 'out_file', realign, 'in_file')

and run the entire workflow

In [11]:
wf.run()

INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node trim in dir: /Users/franzliem/Dropbox/Workspace/nipype_notebooks/notebooks/outputs/04_workflow/wf_2/my_second_wf/trim
INFO:workflow:Executing node realign in dir: /Users/franzliem/Dropbox/Workspace/nipype_notebooks/notebooks/outputs/04_workflow/wf_2/my_second_wf/realign
  start=self.inputs.start)
  ni_img = nipy2nifti(img, data_dtype = io_dtype)


<networkx.classes.digraph.DiGraph at 0x10e5a79d0>

## Now inspect the created directories

In [19]:
!ls wf_2/my_second_wf

d3.js                  graph1.json            [1m[34mrealign[m[m
graph.dot              graph_detailed.dot     [1m[34mtrim[m[m
graph.dot.svg          graph_detailed.dot.svg
graph.json             index.html


In [20]:
!ls wf_2/my_second_wf/trim

_0x199322f066e7989f8a8b2bbc929721a4.json
_inputs.pklz
_node.pklz
[1m[34m_report[m[m
bold_trim.nii.gz
result_trim.pklz


In [21]:
!ls wf_2/my_second_wf/trim

_0x199322f066e7989f8a8b2bbc929721a4.json
_inputs.pklz
_node.pklz
[1m[34m_report[m[m
bold_trim.nii.gz
result_trim.pklz


# Run workflows

* `wf.run()` runs the workflow on your machine with in one process (core) (equal to `wf.run(plugin='Linear')`)


* `wf.run(plugin='MultiProc', plugin_args={'n_procs' : 2})` runs it on your machine using 2 process in parallel


* `wf.run(plugin='CondorDAGMan')` submits jobs to condor


see http://www.mit.edu/~satra/nipype-nightly/users/plugins.html