# PyNIPT
### Version: 0.1.0

- The PyNIPT project aims to develop a comprehensive Python module for building pipelines in neuroscience data analysis projects. It comes with a dedicated project structure intended to improve productivity. The module is optimized for Linux and Unix systems, but it can also run on particular builds of Windows 10 through the Windows Subsystem for Linux (WSL).

- The project structure was originally designed based on BIDS, which is intuitive for organizing neuroimaging datasets with metadata as well as other neurological signal datasets such as EEG. However, BIDS does not provide clear guidelines on how to handle the preprocessed data and analysis results derived from the well-organized original data.

- Several software packages have integrated the BIDS structure, but researchers still need to put extra effort into organizing massive amounts of derived data during exploratory analysis, new pipeline development, and process optimization. These tasks often require repeating similar procedures with multiple sets of parameters, or testing different processing software packages to compare their performance.

- The most popular way to handle derived data is to add a prefix or suffix to the filename. However, BIDS filenames are already long because they encode important information, so adding further complexity to the filename severely reduces the readability of the dataset.

- In this study, we aim to develop a comprehensive extension of BIDS that integrates processed data, analysis results, source code or scripts, and the logs generated by each processing step. The PyNIPT module is designed to organize these derived data automatically while performing complex processing steps, running statistics, and generating reports in a Python environment. In addition, we developed a plugin API in Python that lets researchers implement their own pipelines, including debugging tools.

### Import PyNIPT module

In [1]:
import pynipt as pn

### Download default plugin
- PyNIPT is a pipeline framework and does not contain any hard-coded interface or wrapper to process data. Instead, these are implemented via plugins.
- A plugin consists of two Python files, Interface and Pipeline, and each is built as a single Python class using the API. The interface plugin contains methods that interface with a command-line tool or a Python function. In both cases, the command must take the input and output file paths as its arguments. The pipeline plugin executes interface methods serially to form a data processing workflow, and prints a description of each pipeline step during execution to indicate the required parameters and which process will be performed.
- The plugin is initially downloaded to '.pynipt' in the home folder during installation, but it can be re-downloaded or updated with the Python command below. A custom plugin can be imported instead of the default plugin. Details are provided in the later section introducing the plugin and API.
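
To illustrate the convention that a Python-function interface takes the input and output file paths as arguments, here is a minimal sketch that uses only the standard library. The name `copy_file` is hypothetical. The optional `stdout`/`stderr` stream arguments and the 0/1 return code are assumptions modeled on the sample functions in the API section of this document, not a statement of what the plugin API requires.

```python
import io
import os
import shutil
import sys
import tempfile


def copy_file(input, output, stdout=None, stderr=None):
    """Copy `input` to `output`; return 0 on success and 1 on failure."""
    # Fall back to the interpreter's streams when no log streams are given.
    stdout = sys.stdout if stdout is None else stdout
    stderr = sys.stderr if stderr is None else stderr
    try:
        shutil.copyfile(input, output)
    except OSError as e:
        stderr.write(str(e) + '\n')
        return 1
    stdout.write('Copied {} to {}\n'.format(input, output))
    return 0


# Demonstrate on a throwaway file, capturing the messages in memory.
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, 'source.txt')
dst = os.path.join(workdir, 'copy.txt')
with open(src, 'w') as f:
    f.write('example')

log_out, log_err = io.StringIO(), io.StringIO()
ok_code = copy_file(src, dst, stdout=log_out, stderr=log_err)
fail_code = copy_file(os.path.join(workdir, 'missing.txt'), dst,
                      stdout=log_out, stderr=log_err)
print(ok_code, fail_code)
```

Returning a status code instead of raising lets the caller record failures per file while the remaining files continue to be processed.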

In [2]:
pn.update_default_plugin()

Update completed..


### Initiate pipeline instance

- The proposed project structure consists of four major DataClasses: naive data, intermediate data, results, and system logs. The naive data must follow the BIDS standard for data structure. By default, it is located in the 'Data' folder under the project root. The intermediate data are located in the 'Processing' folder under the project root; most files created during point-to-point file processing are stored here, preserving the original data structure. The results are located in the 'Results' folder under the project root. This folder is designed to hold interpretable results, so statistical results, reports, and results from peer-to-point processing can be stored there.
- The Pipeline class in the PyNIPT module is the main user interface (UI) that helps the researcher execute pipelines so that all files are organized as designed. It takes only the project root path as an input argument; the project root must contain the naive data under the 'Data' folder.

In [2]:
pipe = pn.Pipeline('/Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses')

** Dataset summary

Path of Dataset: /Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses
Name of Dataset: 3Drat_fMRI_2ses
Selected DataClass: Data

Subject(s): ['sub-F01', 'sub-F02', 'sub-M01', 'sub-M02']
Session(s): ['ses-01', 'ses-02']
Datatype(s): ['anat', 'func']
Multi session dataset


List of installed pipeline packages:
	0 : UNCCH_CAMRI
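
The dataset summary above lists the subjects, sessions, and datatypes found under the 'Data' folder. As a rough illustration of how such a summary can be derived from a BIDS-style folder tree, the sketch below scans directory names with `pathlib`. The function `summarize_dataset` is hypothetical and is not part of PyNIPT; it assumes the usual `sub-*/ses-*/<datatype>` layout.

```python
import tempfile
from pathlib import Path


def summarize_dataset(data_dir):
    """Collect subject, session, and datatype folder names under a BIDS-style folder."""
    data_dir = Path(data_dir)
    subjects, sessions, datatypes = set(), set(), set()
    for sub in data_dir.glob('sub-*'):
        if not sub.is_dir():
            continue
        subjects.add(sub.name)
        ses_dirs = [p for p in sub.glob('ses-*') if p.is_dir()]
        sessions.update(p.name for p in ses_dirs)
        # Without session folders, datatype folders sit directly under the subject.
        for parent in (ses_dirs or [sub]):
            datatypes.update(p.name for p in parent.iterdir() if p.is_dir())
    return {'subjects': sorted(subjects),
            'sessions': sorted(sessions),
            'datatypes': sorted(datatypes)}


# Build a tiny throwaway dataset to demonstrate the scan.
root = Path(tempfile.mkdtemp())
for sub in ('sub-F01', 'sub-M01'):
    for ses in ('ses-01', 'ses-02'):
        for dtype in ('anat', 'func'):
            (root / 'Data' / sub / ses / dtype).mkdir(parents=True)

summary = summarize_dataset(root / 'Data')
print(summary)
```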


#### Structure of each DataClass.
1. 'Data': naive BIDS
2. 'Processing': PipelinePackage - StepPath - Subject - (Session) - files
3. 'Results': PipelinePackage - StepPath - ReportObj
4. 'Logs': DEBUG.log, STDOUT.log, STDERR.log
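
The 'Processing' layout listed above can be sketched as a small path-building helper. Both functions are hypothetical illustrations, not PyNIPT API. The step-folder naming rule (two-digit index, then subcode, then title, then an optional suffix) is an assumption inferred from the step listings and command paths printed in this document.

```python
from pathlib import PurePosixPath


def step_dirname(idx, title, subcode=0, suffix=None):
    """Compose a step folder name such as '02A_MotionCorrection-base'."""
    name = '{:02d}{}_{}'.format(idx, subcode, title)
    return '{}-{}'.format(name, suffix) if suffix else name


def processing_path(root, package, step, subject, filename, session=None):
    """Project root / Processing / package / step / subject / (session) / file."""
    parts = [root, 'Processing', package, step, subject]
    if session is not None:
        parts.append(session)
    return PurePosixPath(*parts, filename)


step = step_dirname(2, 'MotionCorrection', subcode='A', suffix='base')
path = processing_path('/project', 'UNCCH_CAMRI', step, 'sub-F01',
                       'sub-F01_ses-01_task-rs_bold.nii.gz', session='ses-01')
print(path)
```

Because the processing parameters are encoded in the step folder rather than in the filename, the file keeps its original BIDS name at every step.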

#### Select pipeline package

A 'pipeline package' is a collection of pipelines, each composed of interface commands. To use the Pipeline class, a pipeline package must be specified. Researchers can also create an empty pipeline package to develop a new pipeline from scratch.

In [3]:
pipe.set_package(0)

Description about this package:


        Standard fMRI pipeline package for the University of North Carolina at Chapel Hill,
        to use for the data analysis services in Center of Animal MRI (CAMRI)
        Author  : SungHo Lee(shlee@unc.edu)
        Revised :
            ver.1: Dec.11st.2017
            ver.2: Mar.7th.2019

        Keyword Args: - listed based on each steps
            - 01_EmptyMaskPreparation
            anat(str):          datatype for anatomical image (default='anat')
            func(str):          datatype for functional image (default='func')
            tr(int):            the repetition time of EPI data
            tpattern(str):      slice order of image
                alt+z = altplus   = alternating in the plus direction
                alt+z2            = alternating, starting at slice #1 instead of #0
                alt-z = altminus  = alternating in the minus direction
                alt-z2            = alternating, starting at slice #nz-2 instea

#### Set parameter and run pipeline

- After a pipeline package is selected, its help document is printed, followed by the list of available pipelines defined in the pipeline plugin. Processing jobs run through multithreading, so the researcher can still access the Python interpreter (or Jupyter notebook) while the pipeline is running.
- The 'check_progression()' method shows the current progress of pipeline execution. The progress bar is updated in real time, so no manual follow-up is required.

In [4]:
pipe.run(0, tr=2, tpattern='alt+z')
pipe.check_progression()


        The dataset will be first slice timing corrected, and average intensity map of functional image
        will be calculated on motion corrected data. In the end of this pipeline, empty image file will be
        generated in the masking path as a place holder of mask with '_mask' suffix.

        If the anatomical data were inputted, the empty mask files will be generated as well.
        



UNCCH_CAMRI:   0%|<bar/>| 0/5 [00:00<?, ?it/s]
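
The non-blocking behavior described in this section, where the interpreter stays available while jobs run and progress can be queried at any time, can be sketched with the standard `threading` module. The class `BackgroundRun` is a hypothetical stand-in for illustration only; it does not reproduce how PyNIPT or its backend is implemented.

```python
import threading
import time


class BackgroundRun:
    """Run a list of callables in a background thread and expose progress."""

    def __init__(self, jobs):
        self._jobs = list(jobs)
        self._done = 0
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._work, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def _work(self):
        for job in self._jobs:
            job()
            with self._lock:
                self._done += 1

    def progress(self):
        """Return (finished jobs, total jobs) without blocking."""
        with self._lock:
            return self._done, len(self._jobs)

    def wait(self):
        self._thread.join()


jobs = [lambda: time.sleep(0.01) for _ in range(5)]
run = BackgroundRun(jobs).start()
early = run.progress()   # returns immediately, while jobs may still be running
run.wait()
final = run.progress()
print(early, final)
```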

- Calling the pipeline instance shows the steps that have been processed.

In [6]:
pipe

** List of existing steps in selected package [UNCCH_CAMRI]:

- Processed steps:
	010: SliceTimingCorrection
- Quoue:
	010, 02A, 01A, 01B, 01C

### Access backend Paralexe module for debugging
- PyNIPT uses the Paralexe module as its backend processor. Paralexe, which stands for 'Parallel Execution', is a pure Python module whose major classes are Worker, Manager, and Scheduler. As the names indicate, these classes respectively execute a single job, manage job execution, and schedule jobs onto threads for parallel processing. Accessing this backend gives access to the background processor for debugging. Some usage examples are shown below.
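
The division of roles among a worker, a manager, and a scheduler can be pictured with the simplified sketch below. These three classes are hypothetical and written only for illustration with the standard library (`subprocess` and `concurrent.futures`); they share names with the Paralexe classes but do not reproduce the Paralexe API or its internals.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


class Worker:
    """Runs a single command and keeps its return code and output."""

    def __init__(self, cmd):
        self.cmd = cmd
        self.returncode = None
        self.stdout = None

    def run(self):
        proc = subprocess.run(self.cmd, capture_output=True, text=True)
        self.returncode = proc.returncode
        self.stdout = proc.stdout
        return self.returncode


class Manager:
    """Creates one worker per command in a step."""

    def __init__(self, cmds):
        self.workers = [Worker(cmd) for cmd in cmds]


class Scheduler:
    """Submits every worker of a manager to a thread pool."""

    def __init__(self, n_threads=4):
        self.n_threads = n_threads

    def submit(self, manager):
        with ThreadPoolExecutor(max_workers=self.n_threads) as pool:
            # list() waits for completion and keeps the worker order.
            return list(pool.map(lambda w: w.run(), manager.workers))


# Three trivial jobs that each print their index through the Python interpreter.
cmds = [[sys.executable, '-c', 'print({})'.format(i)] for i in range(3)]
manager = Manager(cmds)
codes = Scheduler(n_threads=2).submit(manager)
outputs = [w.stdout.strip() for w in manager.workers]
print(codes, outputs)
```

Keeping the command, return code, and captured output on each worker is what makes a per-worker audit possible after the jobs have run.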

In [7]:
pipe.schedulers

{'010': Scheduled Job:1::Completed,
 '02A': Scheduled Job:1::Completed,
 '01A': Scheduled Job:0::Incompleted,
 '01B': Scheduled Job:0::Incompleted,
 '01C': Scheduled Job:0::Incompleted}

In [8]:
pipe.schedulers['02A'].summary()

	** Summery
Total number of steps:		1
---------------------------------
Step::02A
	Number of workers: 	8
---------------------------------
Status:
	Active


In [9]:
pipe.managers

{'010': [Deployed Workers:[8]::Submitted],
 '02A': [Deployed Workers:[8]::Submitted],
 '01A': None,
 '01B': None,
 '01C': None}

In [10]:
pipe.managers['02A'][0].audit()

WorkerID-0
  Command: "3dvolreg -prefix /Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses/Processing/UNCCH_CAMRI/02A_MotionCorrection-base/sub-F01/ses-01/sub-F01_ses-01_task-rs_bold.nii.gz -Fourier -verbose -base 0 /Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses/Processing/UNCCH_CAMRI/010_SliceTimingCorrection/sub-F01/ses-01/sub-F01_ses-01_task-rs_bold.nii.gz"
  *[ Scheduled job is not executed yet. ]

WorkerID-1
  Command: "3dvolreg -prefix /Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses/Processing/UNCCH_CAMRI/02A_MotionCorrection-base/sub-F01/ses-02/sub-F01_ses-02_task-rs_bold.nii.gz -Fourier -verbose -base 0 /Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses/Processing/UNCCH_CAMRI/010_SliceTimingCorrection/sub-F01/ses-02/sub-F01_ses-02_task-rs_bold.nii.gz"
  *[ Scheduled job is not executed yet. ]

WorkerID-2
  Command: "3dvolreg -prefix /Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI

### Remove the step(s)
- If a processed step is no longer needed, it can be removed to free up storage. This can also be used to re-process the same step with a different set of parameters if required.

In [6]:
pipe.remove('01A')
pipe

List of existing steps in selected package [UNCCH_CAMRI]:

- Processed steps:
	010: SliceTimingCorrection
	02A: MotionCorrection-base
- Mask data:
	01B: MakeEmptyMask-func
	01C: MakeEmptyMask-anat

In [7]:
pipe.run(0)
pipe.check_progression()


        The dataset will be first slice timing corrected, and average intensity map of functional image
        will be calculated on motion corrected data. In the end of this pipeline, empty image file will be
        generated in the masking path as a place holder of mask with '_mask' suffix.

        If the anatomical data were inputted, the empty mask files will be generated as well.
        



UNCCH_CAMRI:   0%|<bar/>| 0/5 [00:00<?, ?it/s]




### API
- InterfaceBuilder: shows how to build a command-line interface and a Python function interface
- PipelineBuilder: shows how to build a pipeline
- Plugin: how to create a plugin, and instructions for importing a new plugin
- How to publish your pipeline with PyNIPT (using gist)

In [1]:
import pynipt as pn
path = '/Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses'
pipe = pn.Pipeline(path)
title = 'TEST_PIPELINE'
pipe.set_empty_package(title)

** Dataset summary

Path of Dataset: /Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses
Name of Dataset: 3Drat_fMRI_2ses
Selected DataClass: Data

Subject(s): ['sub-F01', 'sub-F02', 'sub-M01', 'sub-M02']
Session(s): ['ses-01', 'ses-02']
Datatype(s): ['anat', 'func']
Multi session dataset


List of installed pipeline packages:
	0 : UNCCH_CAMRI
temporary pipeline package [TEST_PIPELINE] is initiated.


- Example interface that runs a command-line tool on every file in a dataset at once (the example below copies files).

In [None]:
itb = pipe.get_builder()
itb.init_step('FirstStep', suffix='CMD', 
              idx=1, subcode='A', mode='processing')
itb.set_input(label='input', input_path='func', method=0)
itb.set_output(label='output')
itb.set_cmd('cp *[input] *[output]')
itb.set_output_checker()
itb.run()

pipe.check_progression()
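
The command string passed to `set_cmd` above uses `*[label]` placeholders that refer to the labels given to `set_input` and `set_output`. The sketch below shows one way such a template could be expanded into concrete commands, one per file. The helper `expand_cmd` and the file names are hypothetical; this illustrates the placeholder idea and is not the substitution code PyNIPT actually uses.

```python
import re


def expand_cmd(template, values):
    """Replace each *[label] placeholder with the value registered for that label."""
    def substitute(match):
        label = match.group(1)
        if label not in values:
            raise KeyError('no value set for label: {}'.format(label))
        return values[label]
    return re.sub(r'\*\[(\w+)\]', substitute, template)


# One (input, output) pair per file in the dataset; the paths are made up.
pairs = [
    ('Data/sub-F01/func/bold.nii.gz', 'Processing/sub-F01/bold.nii.gz'),
    ('Data/sub-F02/func/bold.nii.gz', 'Processing/sub-F02/bold.nii.gz'),
]
commands = [expand_cmd('cp *[input] *[output]', {'input': src, 'output': dst})
            for src, dst in pairs]
for cmd in commands:
    print(cmd)
```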

- Same as above, but using a Python function instead of a command-line tool.

In [2]:
def sample_func(input, output,               
                # the names of the arguments above must match the labels set on the interface builder
                stdout=None, stderr=None):
    import sys
    if stdout is None:
        stdout = sys.stdout
    if stderr is None:
        stderr = sys.stderr
    
    import nibabel as nib
    try:
        img = nib.load(input)
    except Exception as e:
        stderr.write(str(e))
        return 1
    img.to_filename(output)
    stdout.write('Copy file from {} to {}'.format(input, output))
    return 0

itb = pipe.get_builder()
itb.init_step('FirstStep', suffix='PYTHON', 
              idx=1, subcode=0, mode='processing')
itb.set_input(label='input', input_path='func', method=0)
itb.set_output(label='output')
itb.set_func(sample_func)
itb.set_output_checker()
itb.run(mode='python')

pipe.check_progression()


TEST_PIPELINE:   0%|<bar/>| 0/1 [00:00<?, ?it/s]




In [4]:
pipe

** List of existing steps in selected package [TEST_PIPELINE]:

- Processed steps:
	010: FirstStep-200221
- Mask data:
	01B: MakeEmptyMask-func
	01C: MakeEmptyMask-anat

In [5]:
pipe.schedulers['010'].summary()

	** Summery
Total number of steps:		1
- Succeeded steps:		1
---------------------------------
Step::010
	Number of workers: 	0
---------------------------------
Status:
	Finished


In [6]:
pipe.managers['010'][0].audit()

*[ No workers deployed. ]*



In [1]:
import pynipt as pn
path = '/Users/shlee419/Projects/Dataset/00_SampleDataset/3Drat_fMRI_2ses'
pipe = pn.Pipeline(path)
title = 'TEST_PIPELINE'
pipe.set_empty_package(title)

** Dataset summary

Path of Dataset: /Users/shlee419/Projects/Dataset/00_SampleDataset/3Drat_fMRI_2ses
Name of Dataset: 3Drat_fMRI_2ses
Selected DataClass: Data

Subject(s): ['sub-F01', 'sub-F02', 'sub-M01', 'sub-M02']
Session(s): ['ses-01', 'ses-02']
Datatype(s): ['anat', 'func']
Multi session dataset


List of installed pipeline packages:
	0 : UNCCH_CAMRI
temporary pipeline package [TEST_PIPELINE] is initiated.


In [2]:
pipe.remove('02A', mode='reporting')

In [3]:
def sample_func2(input, output, test_var,
                stdout=None, stderr=None):
    import sys
    if stdout is None:
        stdout = sys.stdout
    if stderr is None:
        stderr = sys.stderr
    
    import nibabel as nib
    import numpy as np
    affine = None
    try:
        imgobjs = []
        for i, img_path in enumerate(input):
            stdout.write('{} is loaded'.format(img_path))
            img = nib.load(img_path)
            if i == 0:
                affine = img.affine
            if len(img.shape) < 4:
                # add a trailing axis so every image can be concatenated along the last axis
                imgobjs.append(np.asarray(img.dataobj)[..., np.newaxis])
            else:
                imgobjs.append(np.asarray(img.dataobj))
    except Exception as e:
        stderr.write(str(e))
        return 1
    
    stdout.write('input_var: {}'.format(test_var))
    imgobj = np.concatenate(imgobjs, axis=-1)
    new_img = nib.Nifti1Image(imgobj, affine)
    new_img.to_filename(output)
    stdout.write('{} is created'.format(output))
    return 0

itb = pipe.get_builder()
itb.init_step('Reporting', suffix='200226', idx=2, 
              mode='reporting') # for reporting, the output is by default a directory without extension
itb.set_input(label='input', input_path='func', 
              method=1, # multiple inputs to one output
              join_modifier=False) # if False, the input is passed as a list object,
                                   # so the Python function can loop over it
itb.set_output(label='output', 
               modifier='test', ext='nii.gz') # for peers (multiple inputs) to one output
itb.set_var(label='test_var', value='Hello! World!')
itb.set_func(sample_func2)
itb.set_output_checker()
itb.run(mode='python')
pipe.check_progression()


TEST_PIPELINE:  50%|<bar/>| 1/2 [00:00<00:00, 36.24it/s]

In [None]:
cmd = 'copy *[input] *[output]'

In [4]:
pipe

** List of existing steps in selected package [TEST_PIPELINE]:

- Processed steps:
	010: FirstStep-200221
- Reported steps:
	020: Reporting-200226
- Quoue:
	020

In [5]:
pipe.schedulers

{'010': Scheduled Job:1::Completed, '020': Scheduled Job:1::Completed}

In [6]:
pipe.managers

{'010': [Deployed Workers:[8]::Submitted],
 '020': [Deployed Workers:[1]::Submitted]}

In [9]:
pipe.managers['020'][0].audit()

WorkerID-0
  Func: "sample_func2"
  *[ Scheduled job is not executed yet. ]



In [10]:
dset = pipe.get_dset('020')

In [11]:
dset.df

Unnamed: 0,Pipeline,Report,Output,Abspath
0,TEST_PIPELINE,020_Reporting-200226,test.nii.gz,/Users/shlee419/Projects/Dataset/SampleDataset/PyNIPT/3Drat_fMRI_2ses/Results/TEST_PIPELINE/020_...





In [24]:
for i, finfo in dset:
    print(finfo.Abspath)

/Users/shlee419/Projects/Dataset/00_SampleDataset/3Drat_fMRI_2ses/Results/TEST_PIPELINE/02A_Reporting-200226/test.nii.gz
