# Pipeline Tasks

<br>Owner: **Alex Drlica-Wagner** ([@kadrlica](https://github.com/LSSTScienceCollaborations/StackClub/issues/new?body=@kadrlica))
<br>Last Verified to Run: **2018-08-10**
<br>Verified Stack Release: **v16.0**

## Learning Objectives:

This notebook seeks to teach users how to unpack a pipeline task. As an example, we focus on `processCcd.py`, with the goal of diving into the configuration, interface, and structure of pipeline tasks. This notebook is a digression from Justin Myles' script that demonstrates how to run a series of pipeline tasks from the command line to rerun HSC data processing [link].

After working through this tutorial you should be able to:

* Find the source code for a pipeline task
* Configure pipeline tasks and investigate their configuration
* Investigate and run those tasks in python

## Logistics
This notebook is intended to be runnable on `lsst-lspdev.ncsa.illinois.edu` from a local git clone of https://github.com/LSSTScienceCollaborations/StackClub.

In [1]:
# What version of the Stack are we using?
! echo $HOSTNAME
! eups list -s | grep lsst_distrib

jld-lab-kadrlica-r160
lsst_distrib          16.0+1     	current v16_0 setup


In [2]:
# Filter some warnings printed by v16.0 of the stack
import warnings
warnings.simplefilter("ignore", category=FutureWarning)
warnings.simplefilter("ignore", category=UserWarning)

In [3]:
import os
import pydoc

## Diving into a Pipeline Task

Our goal is to dive into the inner workings of `processCcd.py`. We pick up from the command-line processing described in the getting started tutorials [here](https://pipelines.lsst.io/getting-started/data-setup.html#) and [here](https://pipelines.lsst.io/getting-started/processccd.html), as well as Justin Myles' HSC reprocessing notebook [here](). We are specifically interested in digging into the following line:

```
processCcd.py $DATADIR --rerun processCcdOutputs --id
```

We start by tracking down the location of the `processCcd.py` script:

In [4]:
!(which processCcd.py)

/opt/lsst/software/stack/stack/miniconda3-4.3.21-10a4fa6/Linux64/pipe_tasks/16.0+1/bin/processCcd.py


This is the proverbial "end of the thread". Our goal is to pull on this thread to unravel the Python/C++ functions that are being called under the hood. We start by taking a peek at this script:

In [5]:
!cat $(which processCcd.py)

#!/usr/bin/env python
#
# LSST Data Management System
# Copyright 2008, 2009, 2010 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program.  If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#
from lsst.pipe.tasks.processCcd import ProcessCcdTask

ProcessCcdTask.parseAndRun()


Ok, this hasn't gotten us very far, but after getting through the stock header, we now have the next link in our chain:
```
from lsst.pipe.tasks.processCcd import ProcessCcdTask
```
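
The same step can be scripted. The sketch below is only an illustration built on the standard library: `find_script_imports` is a hypothetical helper (not part of the Stack) that locates a command on the `PATH` and returns its import lines, skipping the license header. Without the Stack set up, the lookup simply returns `None`.

```python
import shutil


def find_script_imports(command, search_path=None):
    """Return (path, import_lines) for a script on PATH, or (None, []) if absent."""
    path = shutil.which(command, path=search_path)
    if path is None:
        return None, []
    with open(path, errors="replace") as script:
        lines = [line.strip() for line in script]
    # Keep only the lines that start an import statement
    imports = [line for line in lines if line.startswith(("import ", "from "))]
    return path, imports


# With the Stack set up, this should find the same file as `which processCcd.py`
path, imports = find_script_imports("processCcd.py")
print(path)
for line in imports:
    print(line)
```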

There are two ways we can proceed from here. One is to [Google](http://lmgtfy.com/?q=lsst.pipe.tasks.processCcd) `lsst.pipe.tasks.processCcd`, which will take us to this [doxygen page](http://doxygen.lsst.codes/stack/doxygen/x_masterDoxyDoc/classlsst_1_1pipe_1_1tasks_1_1process_ccd_1_1_process_ccd_task.html) and/or the source code on [GitHub](https://github.com/lsst/pipe_tasks/blob/master/python/lsst/pipe/tasks/processCcd.py).

The second approach is to import the class ourselves and investigate it interactively.


In [6]:
import lsst.pipe.tasks.processCcd
from lsst.pipe.tasks.processCcd import ProcessCcdTask, ProcessCcdConfig

We can get to the source code for these classes directly using the [`stackclub` toolkit module](https://stackclub.readthedocs.io/), as shown in the [FindingDocs.ipynb](https://github.com/LSSTScienceCollaborations/StackClub/blob/master/GettingStarted/FindingDocs.ipynb) notebook.

In [7]:
from stackclub import where_is
where_is(ProcessCcdConfig)

[lsst.pipe.tasks.processCcd](https://github.com/lsst/pipe_tasks/blob/master/python/lsst/pipe/tasks/processCcd.py)
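
If the `stackclub` module is unavailable, the standard library's `inspect` module gives a local alternative: the path of the installed source file rather than a GitHub link. This is a sketch and makes no claim about how `where_is` is implemented; `locate_source` is a hypothetical helper, and `json.JSONDecoder` is only a stand-in so that the example runs without the Stack.

```python
import inspect
import json


def locate_source(obj):
    """Return (module_name, source_path) for a class, function, or module."""
    module = inspect.getmodule(obj)
    module_name = module.__name__ if module is not None else None
    try:
        # Returns None when no Python source file can be found
        source_path = inspect.getsourcefile(obj)
    except TypeError:
        # Raised for built-in objects, which have no Python source
        source_path = None
    return module_name, source_path


# json.JSONDecoder stands in for ProcessCcdConfig
print(locate_source(json.JSONDecoder))
```

In the notebook, `locate_source(ProcessCcdConfig)` should point into the installed `pipe_tasks` package.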


## Diving into a Task Config

Pipeline tasks are controlled and tweaked through their associated `TaskConfig` objects. To investigate the configuration parameters of the `ProcessCcdTask`, we create an instance of `ProcessCcdConfig` and try calling the `help` function on it (commented out for brevity). What we are really interested in are the "Data descriptors", which we can print directly after capturing the documentation that `help` would display.

In [8]:
config = ProcessCcdConfig()
#help(config)
helplist = pydoc.render_doc(config).split('\n')
print('\n'.join(helplist[18:47]))

 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  isr
 |      Task to perform instrumental signature removal or load a post-ISR image; ISR consists of:
 |      - assemble raw amplifier images into an exposure with image, variance and mask planes
 |      - perform bias subtraction, flat fielding, etc.
 |      - mask known bad pixels
 |      - provide a preliminary WCS
 |       (`ConfigurableInstance`, default ``<class 'lsst.ip.isr.isrTask.IsrTaskConfig'>``)
 |  
 |  charImage
 |      Task to characterize a science exposure:
 |      - detect sources, usually at high S/N
 |      - estimate the background, which is subtracted from the image and returned as field "background"
 |      - estimate a PSF model, which is added to the exposure
 |      - interpolate over defects and cosmic rays, updating the image, variance and mask planes
 |       (`ConfigurableInstance`, default ``<class 'lsst.pipe.tasks.characterizeImage.Chara
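
Slicing the rendered help text by fixed indices such as `[18:47]` is fragile, because the offsets shift whenever the class documentation changes. The sketch below looks the section up by its heading instead. `doc_section` and `DemoConfig` are hypothetical names introduced for illustration; `DemoConfig` is a toy class, not an LSST config.

```python
import pydoc


def doc_section(obj, heading="Data descriptors defined here:"):
    """Return the lines of one pydoc section, found by heading instead of by index."""
    # pydoc.plaintext avoids the overstrike characters pydoc uses for bold text
    lines = pydoc.render_doc(obj, renderer=pydoc.plaintext).split("\n")
    starts = [i for i, line in enumerate(lines) if heading in line]
    if not starts:
        return []
    section = []
    for line in lines[starts[0]:]:
        # A row of dashes separates one pydoc section from the next
        if "-----" in line:
            break
        section.append(line)
    return section


class DemoConfig:
    """Toy stand-in for a config class (not an LSST class)."""

    def __init__(self):
        self._doBias = True

    @property
    def doBias(self):
        """Apply bias frame correction?"""
        return self._doBias

    @doBias.setter
    def doBias(self, value):
        self._doBias = value


print("\n".join(doc_section(DemoConfig)))
```

In the notebook, `doc_section(config)` would play the role of the hard-coded slice.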

The `ProcessCcdConfig` object contains both raw configurables, like `doCalibrate`, and other configs, like `isr`. To investigate one of these in more detail, we can import it and query its "Data descriptors".


In [9]:
from lsst.ip.isr.isrTask import IsrTask, IsrTaskConfig
print('\n'.join(pydoc.render_doc(IsrTaskConfig).split('\n')[16:40]) + '...')

 |  
 |  Data descriptors defined here:
 |  
 |  doBias
 |      Apply bias frame correction? (`bool`, default ``True``)
 |  
 |  doDark
 |      Apply dark frame correction? (`bool`, default ``True``)
 |  
 |  doFlat
 |      Apply flat field correction? (`bool`, default ``True``)
 |  
 |  doFringe
 |      Apply fringe correction? (`bool`, default ``True``)
 |  
 |  doDefect
 |      Apply correction for CCD defects, e.g. hot pixels? (`bool`, default ``True``)
 |  
 |  doAddDistortionModel
 |      Apply a distortion model based on camera geometry to the WCS? (`bool`, default ``True``)
 |  
 |  doWrite
 |      Persist postISRCCD? (`bool`, default ``True``)
 |  ...


These configurations are pretty self-explanatory, but say that we really want to understand what `doFringe` is doing. In order to get that information we need to go to the source code.

In [10]:
where_is(IsrTaskConfig)

[lsst.ip.isr.isrTask](https://github.com/lsst/ip_isr/blob/master/python/lsst/ip/isr/isrTask.py)


We can then search this file for `doFringe` and we find [several lines](https://github.com/lsst/ip_isr/blob/cc4efb7d763d3663c9e989339505df9654f23fd9/python/lsst/ip/isr/isrTask.py#L597-L598) that look like this:

        if self.config.doFringe and not self.config.fringeAfterFlat:
            self.fringe.run(ccdExposure, **fringes.getDict())
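
The search can also be done without leaving the notebook. The sketch below uses `inspect.getsourcelines` to list every line of a module (or class) that mentions a term. `find_in_source` is a hypothetical helper, and `json.decoder` with the term `raw_decode` is a stand-in so the example runs without the Stack.

```python
import inspect
import json.decoder


def find_in_source(obj, term):
    """Return (line_number, text) pairs for source lines of `obj` that contain `term`."""
    source_lines, first_line = inspect.getsourcelines(obj)
    # For a module, getsourcelines reports a starting line of 0
    offset = max(first_line, 1)
    return [
        (offset + i, line.rstrip())
        for i, line in enumerate(source_lines)
        if term in line
    ]


# json.decoder stands in for lsst.ip.isr.isrTask
for number, text in find_in_source(json.decoder, "raw_decode"):
    print(number, text)
```

In the notebook, the equivalent call would be `find_in_source(lsst.ip.isr.isrTask, "doFringe")` after `import lsst.ip.isr.isrTask`.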
            
If we want to go deeper to see what `fringe.run` does, we can repeat the above process:

In [11]:
isr_task = IsrTask()
print(isr_task.fringe)
import lsst.ip.isr.fringe
where_is(lsst.ip.isr.fringe.FringeTask)

<lsst.ip.isr.fringe.FringeTask object at 0x7f1e6b1808d0>


[lsst.ip.isr.fringe](https://github.com/lsst/ip_isr/blob/master/python/lsst/ip/isr/fringe.py)


We finally make our way to the source code for [FringeTask.run](https://github.com/lsst/ip_isr/blob/cc4efb7d763d3663c9e989339505df9654f23fd9/python/lsst/ip/isr/fringe.py#L104), which gives us details on how the fringe correction is performed (i.e. by creating a fringe image and subtracting it from the data image).

## Running a Task

Now that we've figured out how to investigate the config for a task, let's try to run the task itself.

In [23]:
# Define the input data repository and the output directory
import shutil
repodir = '/project/stack-club/validation_data_hsc'
datadir = os.path.join(repodir,'data')
outdir = '/home/kadrlica/tmpdir/'

First, we run the task as we would from the command line. All that we are doing here is parsing the `cmdline` string as if it were the arguments passed to `processCcd.py` on the command line. Unfortunately, the task does not print its output to a notebook cell, so we just need to wait ~1 minute for this to run. We turn off several optional subtask steps so that things run faster...

In [19]:
%%timeit
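# Note: %%timeit repeats the cell body, and names assigned here (e.g. `struct`)
# are not kept in the notebook namespace afterwards.
task = ProcessCcdTask()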
cmdline = '{0}/data --calib {0}/CALIB --output {1} --id ccd=25 visit=903982'.format(repodir,outdir)
cmdline += ' --config doCalibrate=False isr.doBias=False isr.doDark=False isr.doFlat=False'
struct = task.parseAndRun(cmdline.split())
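
To see exactly which arguments the task receives, it helps to print the split command line before running anything. The sketch below rebuilds the same string and splits it with `shlex.split`, which honors shell-style quoting (plain `str.split` would break a quoted argument containing spaces). No Stack call is made here.

```python
import shlex

repodir = '/project/stack-club/validation_data_hsc'
outdir = '/home/kadrlica/tmpdir/'

cmdline = '{0}/data --calib {0}/CALIB --output {1} --id ccd=25 visit=903982'.format(repodir, outdir)
cmdline += ' --config doCalibrate=False isr.doBias=False isr.doDark=False isr.doFlat=False'

# One list entry per command-line argument
args = shlex.split(cmdline)
for arg in args:
    print(arg)
```

For this particular string the result is identical to `cmdline.split()`, since none of the arguments contain spaces or quotes.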

The next level of complexity is to call `task.run`. To do this we need to provide a `dataRef`, which we get from a butler.

In [24]:
from lsst.daf.persistence import Butler
# The output directory must not already exist (wait for the Gen3 Butler...)
if os.path.exists(outdir):
    shutil.rmtree(outdir)
butler = Butler(inputs=datadir, outputs=outdir)
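
Because `shutil.rmtree` deletes irreversibly, an alternative is to hand the butler a path that is guaranteed not to exist yet. The sketch below builds such a path inside a freshly created temporary directory. It assumes, as the comment in the cell above states, that the butler only needs the output path to be absent; `fresh_output_path` is a hypothetical helper, and the default name is borrowed from the `--rerun` example earlier.

```python
import os
import tempfile


def fresh_output_path(name="processCcdOutputs"):
    """Return a path that does not exist yet, inside a newly created temporary directory."""
    parent = tempfile.mkdtemp(prefix="stackclub-")
    return os.path.join(parent, name)


outdir_candidate = fresh_output_path()
print(outdir_candidate, os.path.exists(outdir_candidate))
```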

Next, we create the config and task instances:

In [30]:
# Since we don't want to do calibration processing we turn off these configs
config = ProcessCcdConfig()
config.isr.doBias = False
config.isr.doDark = False
config.isr.doFlat = False
config.doCalibrate = False
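
These four assignments mirror the `--config` arguments passed on the command line earlier. The toy sketch below shows the correspondence between a dotted `name=value` string and a nested attribute assignment. It is not the Stack's argument parser: `apply_overrides` is a hypothetical helper, and `SimpleNamespace` merely stands in for `ProcessCcdConfig`.

```python
import ast
from types import SimpleNamespace


def apply_overrides(config, overrides):
    """Apply 'dotted.name=value' strings to a nested object of attributes."""
    for item in overrides:
        dotted_name, _, raw_value = item.partition("=")
        *parents, leaf = dotted_name.split(".")
        target = config
        for name in parents:
            target = getattr(target, name)
        if not hasattr(target, leaf):
            raise AttributeError("unknown config field: " + dotted_name)
        # literal_eval turns 'False' into False, '3' into 3, and so on
        setattr(target, leaf, ast.literal_eval(raw_value))
    return config


# Toy stand-in for ProcessCcdConfig; the real class comes from the Stack
toy_config = SimpleNamespace(doCalibrate=True,
                             isr=SimpleNamespace(doBias=True, doDark=True, doFlat=True))
apply_overrides(toy_config, ["doCalibrate=False", "isr.doBias=False",
                             "isr.doDark=False", "isr.doFlat=False"])
print(toy_config)
```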

In [33]:
task = ProcessCcdTask(butler=butler, config=config)

In [27]:
# Find the available data sets
ccds = butler.queryMetadata('raw',['visit','ccd'])
print(ccds[10])

(903332, 10)
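
The query returns a flat list of `(visit, ccd)` tuples, which is easier to browse once grouped by visit. The sketch below does this for a short sample list; the three pairs are the data IDs that appear in this notebook, not a full query result.

```python
from collections import defaultdict

# Sample (visit, ccd) pairs in the shape shown above
ccds_sample = [(903332, 10), (903332, 25), (903982, 25)]

ccds_by_visit = defaultdict(list)
for visit, ccd in ccds_sample:
    ccds_by_visit[visit].append(ccd)

for visit in sorted(ccds_by_visit):
    print(visit, sorted(ccds_by_visit[visit]))
```

In the notebook, the same loop can run over `ccds` directly.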


In [None]:
#subset = butler.subset('raw', dataId={'visit':903332, 'ccd':25})

In [28]:
dataRef = butler.dataRef('raw', dataId={'visit':903332,'ccd':25})

In [36]:
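# Set up the astrometry reference catalogs following:
# https://github.com/lsst/validation_data_hsc
# Use os.environ rather than `!export`: each `!` command runs in its own
# subshell, so variables exported there never reach this Python kernel.
os.environ['SETUP_ASTROMETRY_NET_DATA'] = 'astrometry_net_data sdss-dr9-fink-v5b'
os.environ['ASTROMETRY_NET_DATA_DIR'] = repodir + '/sdss-dr9-fink-v5b'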

# TODO: Create the astrometry reference object to pass to the task
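
Environment variables must be set in the kernel process itself (for example through `os.environ`) to be visible to code running in the notebook, because a variable set in a child process, such as the subshell behind a `!` command, is never passed back to its parent. The sketch below demonstrates both directions with a throwaway variable; `STACKCLUB_DEMO_VAR` is a hypothetical name used only here.

```python
import os
import subprocess
import sys

VAR = "STACKCLUB_DEMO_VAR"  # hypothetical variable name, used only for this demonstration
os.environ.pop(VAR, None)

# A child process can change its own environment, but the parent never sees it
child_sets = "import os; os.environ['{}'] = 'set-in-child'".format(VAR)
subprocess.run([sys.executable, "-c", child_sets], check=True)
visible_in_parent = VAR in os.environ
print(visible_in_parent)

# A variable set in the parent (the kernel) is inherited by later child processes
os.environ[VAR] = "set-in-parent"
child_reads = "import os; print(os.environ.get('{}'))".format(VAR)
seen_by_child = subprocess.run(
    [sys.executable, "-c", child_reads],
    stdout=subprocess.PIPE, universal_newlines=True, check=True,
).stdout.strip()
print(seen_by_child)
```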

In [35]:
%%timeit
struct = task.run(dataRef)

40.8 s ± 476 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
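
Names assigned inside a `%%timeit` cell are not kept in the notebook namespace, so `struct` is not available after the cell above, and the body is executed several times. When the result is needed, one option is to time a single call by hand. The sketch below is a generic pattern; `timed_call` is a hypothetical helper and `slow_square` is a placeholder for the expensive call.

```python
import time


def timed_call(func, *args, **kwargs):
    """Call func once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


def slow_square(x):
    """Placeholder for an expensive call such as task.run(dataRef)."""
    time.sleep(0.05)
    return x * x


result, elapsed = timed_call(slow_square, 7)
print(result, round(elapsed, 2))
```

In the notebook this would read `struct, elapsed = timed_call(task.run, dataRef)`.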


In [None]:
# Running ISR Task

In [21]:
# Try to get the task outputs into the notebook (and failing)
import sys
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Create STDERR handler
handler = logging.StreamHandler(sys.stderr)
# handler.setLevel(logging.DEBUG)

# Create formatter and add it to the handler
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# Set STDERR handler as the only handler 
logger.handlers = [handler]
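
One way to check that the handler setup itself works is to emit a test record through Python's `logging` module and inspect what the handler wrote. The sketch below does this with an in-memory buffer and a separate, hypothetical logger name (`stackclub.demo`) so that it does not disturb the root logger configured above. If this prints while the task stays silent, the task messages are presumably not routed through Python's `logging`; that is an assumption to verify against the logging documentation for the Stack release in use.

```python
import io
import logging

# Send records to an in-memory buffer so the result can be inspected
buffer = io.StringIO()
demo_handler = logging.StreamHandler(buffer)
demo_handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))

# A named logger keeps this check separate from the root logger
demo_logger = logging.getLogger("stackclub.demo")  # hypothetical logger name
demo_logger.setLevel(logging.DEBUG)
demo_logger.handlers = [demo_handler]
demo_logger.propagate = False

demo_logger.debug("handler is attached")
print(buffer.getvalue(), end="")
```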