In [26]:
import nest_asyncio

nest_asyncio.apply()

import pydra

In [None]:

@pydra.mark.task
def add_var(a, b) -> {'sum_a_b': int}:
    """Add two values.

    The dict return annotation names the task's single output field
    ``sum_a_b`` (typed as ``int``).
    """
    total = a + b
    return total

# Create the task without inputs, then bind both inputs through `.inputs`.
task1 = add_var()
task1.inputs.a, task1.inputs.b = 3, 6

# Calling the task runs it; as the last expression, its return value is displayed.
task1()


## Create workflows using Pydra

First, we define the functions used within the workflows.


In [None]:
@pydra.mark.task
def add_var(a, b):
    """Return ``a + b``.

    Note: this redefines the earlier annotated ``add_var``. It has no output
    annotation, so the workflows below read its result via ``lzout.out``.
    """
    total = a + b
    return total

@pydra.mark.task
def power(a, n=2):
    """Raise ``a`` to the power ``n`` (squares ``a`` by default)."""
    return pow(a, n)

@pydra.mark.task
def mult_var(a, b):
    """Return the product ``a * b``."""
    product = a * b
    return product

@pydra.mark.task
def add_two(a):
    """Return ``a`` increased by two."""
    increment = 2
    return a + increment


Now we define some workflows to get a feeling for how to connect the different tasks within a workflow.

In [None]:
# Workflow with two declared inputs, x and y, bound to concrete values.
wf1 = pydra.Workflow(
    name='wf1',
    input_spec=['x', 'y'],
    x=3,
    y=4,
)

# Single node: add the workflow's lazy inputs (lzin) together.
wf1.add(
    add_var(
        name='sum_x_y',
        a=wf1.lzin.x,
        b=wf1.lzin.y,
    )
)

# Expose the node's lazy output as the workflow-level output `out`.
wf1.set_output([('out', wf1.sum_x_y.lzout.out)])

# Execute the workflow with the 'cf' plugin.
with pydra.Submitter(plugin='cf') as submitter:
    submitter(wf1)


wf1.result()



Now that we have created a simple workflow, let's do something slightly more complex and interconnect different tasks within a workflow (the output of one task becomes the input of another).

In [None]:
wf2 = pydra.Workflow(name='wf2', input_spec=['x', 'y'], x=3, y=4)

# Two nodes consume the workflow inputs directly: one adds them, one multiplies them.
wf2.add(add_var(name='sum_x_y', a=wf2.lzin.x, b=wf2.lzin.y))
wf2.add(mult_var(name='mult_x_y', a=wf2.lzin.x, b=wf2.lzin.y))

# Downstream nodes: the sum goes through `power` (default n=2) and the
# product gets two added.
# TODO: feed both the sum and the product into `power` using a splitter,
#       e.g. power(name='power', a=[<sum output>, <product output>]).split('a')
wf2.add(power(name='power', a=wf2.sum_x_y.lzout.out))
wf2.add(add_two(name='add_two', a=wf2.mult_x_y.lzout.out))

# The workflow exposes one output per downstream node.
wf2.set_output(
    [
        ('out_p', wf2.power.lzout.out),
        ('out_other', wf2.add_two.lzout.out),
    ]
)


with pydra.Submitter(plugin='cf') as submitter:
    submitter(wf2)


wf2.result()



### Create workflows containing shell commands

In [None]:
# Wrap the `pwd` executable in a shell task and run it through a submitter.
cmd = 'pwd'
shellCommand = pydra.ShellCommandTask(
    name='printwd',
    executable=cmd,
)

with pydra.Submitter(plugin='cf') as submitter:
    submitter(shellCommand)


# NOTE(review): the reported working directory looks unexpected. Presumably the
# task executes inside its own pydra output directory; confirm, and open an
# issue on GitHub if that is not the explanation.
shellCommand.result()

In [None]:
# The executable is given as a list: the command followed by its arguments.
cmd = ['echo', 'hail', 'pydra']
# NOTE(review): the task name 'printwd' is carried over from the `pwd` cell.
shellCommand = pydra.ShellCommandTask(
    name='printwd',
    executable=cmd,
)
# Show the command line pydra assembled before running it.
print('cmdline =', shellCommand.cmdline)

with pydra.Submitter(plugin='cf') as submitter:
    submitter(shellCommand)


shellCommand.result()

### Create neuroimaging-specific tasks and/or workflows

In [30]:

# Imported in this cell so it runs on a fresh kernel; the only other
# `import mne` appears in a later cell.
import mne

# Path to one MEG run (sub-009, ses-1, task "deduction", run 1) of dataset ds003483.
# NOTE(review): machine-specific absolute path; point it at the local copy of the data.
data_path = '/data2/egapontseva/MEG_QC_stuff/data/from openneuro/ds003483/sub-009/ses-1/meg/sub-009_ses-1_task-deduction_run-1_meg.fif'

# preload=True loads the recording into memory while reading the file.
raw = mne.io.read_raw_fif(data_path, preload=True)


Opening raw data file /data2/egapontseva/MEG_QC_stuff/data/from openneuro/ds003483/sub-009/ses-1/meg/sub-009_ses-1_task-deduction_run-1_meg.fif...
    Range : 60000 ... 1255999 =     60.000 ...  1255.999 secs
Ready.
Reading 0 ... 1195999  =      0.000 ...  1195.999 secs...


In [31]:
import pydra
import mne
from pydra.engine.task import TaskBase

# Band-pass edges in Hz, passed to `Raw.filter` as (l_freq, h_freq).
l_freq = 0.5
h_freq = 100
#data_path = '/data2/egapontseva/MEG_QC_stuff/data/from openneuro/ds000117/sub-01/ses-meg/meg/*run-01_meg.fif'

# Define the Pydra workflow. The declared inputs are bound here and consumed
# by the nodes through `lzin`, so the workflow can be re-run with other values
# without editing the node definitions.
workflow_name = "mne_filter_workflow"
input_spec = ["data_path", "l_freq", "h_freq"]
mne_filter_wf = pydra.Workflow(
    name=workflow_name,
    input_spec=input_spec,
    data_path=data_path,
    l_freq=l_freq,
    h_freq=h_freq,
)


@pydra.mark.task
def load_meg_data(data_path):
    """Read a FIF recording from ``data_path`` with MNE, preloading the data.

    Returns the object produced by ``mne.io.read_raw_fif``.
    """
    raw = mne.io.read_raw_fif(data_path, preload=True)
    return raw


@pydra.mark.task
def apply_mne_filters(raw, l_freq, h_freq):
    """Return a filtered copy of ``raw``; the input object is not modified.

    ``l_freq`` and ``h_freq`` are forwarded to ``raw.filter`` together with
    ``fir_design='firwin'``.
    """
    # Filter a copy so the loaded recording stays untouched.
    filt_raw = raw.copy().filter(l_freq, h_freq, fir_design='firwin')
    return filt_raw


# Load data with MNE, reading the path from the workflow input.
mne_filter_wf.add(
    load_meg_data(
        name='load_meg_data',
        data_path=mne_filter_wf.lzin.data_path,
    )
)

# Apply MNE filtering to the loaded data, using the workflow's frequency inputs.
mne_filter_wf.add(
    apply_mne_filters(
        name='filtering',
        raw=mne_filter_wf.load_meg_data.lzout.out,
        l_freq=mne_filter_wf.lzin.l_freq,
        h_freq=mne_filter_wf.lzin.h_freq,
    )
)

# Specify the output of the workflow
mne_filter_wf.set_output([
    ("filtered_data", mne_filter_wf.filtering.lzout.out)
])

# Submit the workflow for execution using the 'cf' plugin
with pydra.Submitter(plugin='cf') as sub:
    sub(mne_filter_wf)

# View the output of the workflow
print(mne_filter_wf.result())


Opening raw data file /data2/egapontseva/MEG_QC_stuff/data/from openneuro/ds003483/sub-009/ses-1/meg/sub-009_ses-1_task-deduction_run-1_meg.fif...
    Range : 60000 ... 1255999 =     60.000 ...  1255.999 secs
Ready.
Reading 0 ... 1195999  =      0.000 ...  1195.999 secs...
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 1e+02 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 100.00 Hz
- Upper transition bandwidth: 25.00 Hz (-6 dB cutoff frequency: 112.50 Hz)
- Filter length: 6601 samples (6.601 sec)

Result(output=Output(filtered_data=<Raw | sub-009_ses-1_task-deduction_run-1_meg.fif, 320 x 1196000 (1196.0 s), ~2.86 GB, data loaded>), runtime=None, erro

The filtering workflow runs through. Let's now try some workflows with embedded MEGQC functions.

### MEGQC Workflows

In [7]:
import meg_qc  
from meg_qc.meg_qc_pipeline import make_derivative_meg_qc



Using matplotlib as 2D backend.
