First, we'll import the Parsl library and the various components we'll need:

In [1]:
import parsl
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
from parsl.app.app import bash_app, python_app
from parsl import File

Before we can use Parsl's functions, we'll need to define and then load its configuration as a Parsl `Config` object:

In [2]:
config = Config(
    executors=[ThreadPoolExecutor()],
    lazy_errors=True
)
parsl.load(config)

<parsl.dataflow.dflow.DataFlowKernel at 0x7fafea9cdfd0>

Now we define the Python functions we'll use for the workflow.  Decorating each function as an App lets Parsl parallelize them during execution.  We define these as `bash` Apps because each function returns a command line that invokes a Perl script, just as we would from the Bash shell:

In [3]:
## Define Apps ##
# Each App returns the command line to be run in a shell.

# Convert one Threshold file into one Wire Delay file.
@bash_app
def WireDelay(threshIn='', outputs=[], geoDir='', daqId='', fw='', stdout='stdout.txt', stderr='stderr.txt'):
    return 'perl ./perl/WireDelay.pl %s %s %s %s %s' % (threshIn, outputs[0], geoDir, daqId, fw)

# Merge all input files into a single output file.
@bash_app
def Combine(inputs=[], outputs=[], stdout='stdout.txt', stderr='stderr.txt'):
    return 'perl ./perl/Combine.pl ' + ' '.join(inputs) + ' ' + str(outputs[0])

# Sort the input file by a primary and a secondary column.
@bash_app
def Sort(inputs=[], outputs=[], key1='1', key2='1', stdout='stdout.txt', stderr='stderr.txt'):
    return 'perl ./perl/Sort.pl %s %s %s %s' % (inputs[0], outputs[0], key1, key2)

# Search the sorted file for coincident hits and write the event candidates.
@bash_app
def EventSearch(inputs=[], outputs=[], gate='', detCoinc='2', chanCoinc='2', eventCoinc='2', stdout='stdout.txt', stderr='stderr.txt'):
    return 'perl ./perl/EventSearch.pl %s %s %s %s %s %s' % (inputs[0], outputs[0], gate, detCoinc, chanCoinc, eventCoinc)
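
To see what these Apps hand to the shell, it can help to build the same command strings in plain Python. The sketch below uses undecorated copies of two of the functions (the `*_command` names are made up for this illustration) together with the sample values that appear later in this notebook. It only formats strings; it does not run Perl or involve Parsl.

```python
# Undecorated copies of two command builders, for inspection only.
# The *_command names are hypothetical and not part of Parsl or the e-Lab code.
def wire_delay_command(threshIn, output, geoDir, daqId, fw):
    return 'perl ./perl/WireDelay.pl %s %s %s %s %s' % (threshIn, output, geoDir, daqId, fw)

def combine_command(inputs, output):
    return 'perl ./perl/Combine.pl ' + ' '.join(inputs) + ' ' + str(output)

wd_cmd = wire_delay_command(
    'files/6119.2016.0104.1.thresh', '6119.2016.0104.1.wd', './geo', '6119', '1.12'
)
combine_cmd = combine_command(
    ['6119.2016.0104.1.wd', '6203.2016.0104.1.wd'], 'combineOut'
)

print(wd_cmd)
print(combine_cmd)
```

Printing the strings this way is a quick check that the argument order matches what each Perl script expects.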

The last step before the workflow itself is to define the parameters that the Apps will require as inputs.  When used in the Cosmic Ray e-Lab, these are selected by the user through the interface.

For this analysis, the necessary parameters are:
* **thresholdAll** -- the names and locations of the threshold files that the analysis uses as input data
* **wireDelayData** -- what we'd like the analysis to name the Wire Delay files that will be created during execution
* **geoDir** -- the location of the directory that contains the geography (`.geo`) files of the relevant detectors
* **detectors** -- the DAQ IDs of all detectors used in the analysis
* **firmwares** -- the versions of the firmware used on each detector's DAQ board. This can affect how the data from that detector is interpreted!
* **combineOut** -- what we'd like the analysis to name the Combined Data file that will be created during execution
* **sort_sortKey1**, **sort_sortKey2** -- which columns the Sort() function should sort by, in ascending order.  **sort_sortKey1** is the primary sort column, while **sort_sortKey2** is the secondary sort column.
* **sortOut** -- what we'd like the analysis to name the Sorted Data file that will be created during execution
* **gate** -- the size of the gate in nanoseconds.  The analysis will search for events that are coincident within this time interval
* **detectorCoincidence** -- how many different detectors must record hits within the gate interval for those hits to qualify as a candidate event
* **channelCoincidence** -- how many different channels on each detector must record hits within the gate interval for those hits to qualify as a candidate event
* **eventCoincidence** -- how many hits a channel must record within the gate interval for those hits to qualify as a candidate event
* **eventCandidates** -- what we'd like the analysis to name the Event Candidates file that will be created as the end result of its execution

Since these parameters will be used to construct command-line invocations of Perl scripts, we define them all as strings, even the numbers: Python itself won't be doing any math with them.

In [4]:
## Analysis Parameters ##
# Define what are typically the command-line arguments

# For WireDelay:
thresholdAll = ('files/6119.2016.0104.1.thresh', 'files/6203.2016.0104.1.thresh')
wireDelayData = ('6119.2016.0104.1.wd', '6203.2016.0104.1.wd')
geoDir = './geo'
detectors = ('6119', '6203')
firmwares = ('1.12', '1.12')

# For Combine:
combineOut = 'combineOut'

# For Sort:
sort_sortKey1 = '2'
sort_sortKey2 = '3'
sortOut = 'sortOut'

# For EventSearch:
gate = '1000'
detectorCoincidence = '1'
channelCoincidence = '2'
eventCoincidence = '2'
eventCandidates = 'eventCandidates'
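
The workflow that follows indexes the four per-detector tuples in parallel, so they need to stay the same length, and the numeric parameters need to be strings of digits. A minimal sanity check along these lines could catch a mismatch before any App runs. The helper name `check_parameters` is hypothetical and not part of the original analysis.

```python
def check_parameters(per_detector, numeric):
    """Raise ValueError if the per-detector tuples differ in length or if a
    numeric parameter is not a string of digits. Hypothetical helper."""
    lengths = {name: len(values) for name, values in per_detector.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError('per-detector tuples differ in length: %s' % lengths)
    for name, value in numeric.items():
        if not (isinstance(value, str) and value.isdigit()):
            raise ValueError('%s should be a string of digits, got %r' % (name, value))

# Same values as in the parameter cell above, repeated so the sketch runs alone.
check_parameters(
    per_detector={
        'thresholdAll': ('files/6119.2016.0104.1.thresh', 'files/6203.2016.0104.1.thresh'),
        'wireDelayData': ('6119.2016.0104.1.wd', '6203.2016.0104.1.wd'),
        'detectors': ('6119', '6203'),
        'firmwares': ('1.12', '1.12'),
    },
    numeric={
        'sort_sortKey1': '2',
        'sort_sortKey2': '3',
        'gate': '1000',
        'detectorCoincidence': '1',
        'channelCoincidence': '2',
        'eventCoincidence': '2',
    },
)
```

The call returns silently when everything is consistent and raises `ValueError` otherwise.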

Now we're ready to call on our Apps to do their data crunching. Note carefully the use of futures and of the `inputs[]` and `outputs[]` parameters, which are provided by Parsl.  These define the workflow by telling Parsl which things **must** happen before which other things, so that the DataFlowKernel doesn't try to execute Apps in the wrong order (running a function before its input data is ready, for example).

In [5]:
## Workflow ##
# 1) WireDelay() takes input Threshold (.thresh) files and converts
#    each to a Wire Delay (.wd) file:
WireDelay_futures = []
for i in range(len(thresholdAll)):
    WireDelay_futures.append(
        WireDelay(
            threshIn=thresholdAll[i],
            outputs=[wireDelayData[i]],
            geoDir=geoDir,
            daqId=detectors[i],
            fw=firmwares[i],
        )
    )

# WireDelay_futures is a list of futures.
# Each future has an outputs list with one output.
WireDelay_outputs = [future.outputs[0] for future in WireDelay_futures]

# 2) Combine() takes the WireDelay files output by WireDelay() and combines
#    them into a single file with name given by --combineOut
Combine_future = Combine(inputs=WireDelay_outputs, outputs=[combineOut])

# 3) Sort() sorts the --combineOut file, producing a new file with name given
#    by --sortOut
Sort_future = Sort(inputs=Combine_future.outputs, outputs=[sortOut], key1=sort_sortKey1, key2=sort_sortKey2)

# 4) EventSearch() processes the --sortOut file and identifies event
#    candidates in an output file with name given by --eventCandidates
# NB: This output file is interpreted by the e-Lab webapp, which expects it
#    to be named "eventCandidates"
EventSearch_future = EventSearch(inputs=Sort_future.outputs, outputs=[eventCandidates], gate=gate, detCoinc=detectorCoincidence, chanCoinc=channelCoincidence, eventCoinc=eventCoincidence)

# Wait for the final result before exiting.
exit_code = EventSearch_future.result()

print("Call to EventSearch completed with exit code:", exit_code)

Call to EventSearch completed with exit code: 0
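
The ordering idea can be illustrated without Parsl. The sketch below is only an analogy built on Python's standard `concurrent.futures` module, with toy stand-in functions (`to_wire_delay`, `combine`) invented for this example. Here the ordering is enforced by hand, by waiting on `result()` before submitting the next step; in the workflow above, passing one App's `outputs` as the next App's `inputs` is what tells Parsl about the same ordering.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy stand-ins for the real Apps; they only manipulate file names.
def to_wire_delay(thresh_name):
    return thresh_name.replace('.thresh', '.wd')

def combine(wd_names):
    return '+'.join(wd_names)

with ThreadPoolExecutor(max_workers=2) as pool:
    # Step 1: the two conversions are independent, so they may run in parallel.
    wd_futures = [pool.submit(to_wire_delay, name) for name in ('a.thresh', 'b.thresh')]

    # Step 2: combine() needs both results, so we wait for them explicitly
    # before submitting it.
    wd_names = [future.result() for future in wd_futures]
    combined_future = pool.submit(combine, wd_names)

    combined = combined_future.result()

print(combined)
```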


And we're done! The `eventCandidates` file now exists in the working directory and lists every event from the input threshold data that, according to our criteria, might have been part of a shower of cosmic rays.

This will typically be a large file -- too large to read here -- but we can check what it looks like using the Bash shell's `head` utility:

In [6]:
!head -5 ./eventCandidates

#[event number] [num events] [num hit detectors] [ID1.chan] [JD1] [Rising edge 1], [ID2.chan] [JD2] [Rising edge 2], ...
#gatewidth=1.15740740740741e-11 (1000 nanoseconds), detector coincidence=1, channel coincidence=2, event coincidence=2
1	3	1	6203.1	2457392	0.2452230125667072	6203.4	2457392	0.2452230125667216	6203.2	2457392	0.2452230125676070
2	3	1	6203.1	2457392	0.2452298337203386	6203.4	2457392	0.2452298337203386	6203.2	2457392	0.2452298337212240
3	3	1	6203.4	2457392	0.2452305862390307	6203.1	2457392	0.2452305862391320	6203.4	2457392	0.2452305862392767
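
The `gatewidth` value in the header and the `[Rising edge]` columns look like fractions of a day: 1000 nanoseconds divided by the number of nanoseconds in a day reproduces the figure printed above. The sketch below checks that arithmetic and splits one event line into its hits, following the column layout given in the header comment. The fraction-of-a-day interpretation and the helper names `gate_ns_to_days` and `parse_event` are assumptions made for this illustration, not something the e-Lab code states here.

```python
SECONDS_PER_DAY = 86400

def gate_ns_to_days(gate_ns):
    """Convert a gate width in nanoseconds to a fraction of a day."""
    return gate_ns * 1e-9 / SECONDS_PER_DAY

def parse_event(line):
    """Split one whitespace-separated event line into its leading counters
    and a list of (detector, channel, julian_day, rising_edge) hits."""
    fields = line.split()
    event_number, num_events, num_detectors = (int(x) for x in fields[:3])
    hits = []
    # After the three counters, each hit occupies three columns.
    for k in range(3, len(fields), 3):
        detector, channel = fields[k].split('.')
        hits.append((detector, channel, int(fields[k + 1]), float(fields[k + 2])))
    return event_number, num_events, num_detectors, hits

gate_days = gate_ns_to_days(1000)

# First data line from the output above, with tabs written explicitly.
sample = ('1\t3\t1\t6203.1\t2457392\t0.2452230125667072'
          '\t6203.4\t2457392\t0.2452230125667216'
          '\t6203.2\t2457392\t0.2452230125676070')
event_number, num_events, num_detectors, hits = parse_event(sample)

print(gate_days)
for hit in hits:
    print(hit)
```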
