# Pipelines

TOAST pipelines are standalone Python scripts that apply one or more TOAST operators to existing or simulated data. They typically divide into two major parts:
1. Creating the TOAST `data` object
2. Applying TOAST operators to the `data`

Experiments and simulations typically require specialized methods for creating the `data` object. This part is hard to generalize. For the subsequent data processing TOAST provides a number of convenience functions are kept in [toast.pipeline_tools](https://github.com/hpc4cmb/toast/tree/master/src/toast/pipeline_tools).

In [None]:
# Load common tools for all lessons
import sys
sys.path.insert(0, "..")
from lesson_tools import (
    fake_focalplane
)

# Capture C++ output in the jupyter cells
%reload_ext wurlitzer

## Brief example

We demonstrate these concepts by writing a very simple pipeline that creates its own observations, fills them with noise and applies a polynomial filter.

In [None]:
%%writefile my_first_pipeline.py

import sys
sys.path.insert(0, "..")

import argparse

import numpy as np

import toast
from toast.mpi import MPI
import toast.pipeline_tools

from lesson_tools import fake_focalplane

def parse_arguments(comm):
    """ Declare and parse command line arguments
    
    """
    parser = argparse.ArgumentParser(fromfile_prefix_chars="@")
    
    # pipeline_tools provides arguments to supported operators
    toast.pipeline_tools.add_noise_args(parser)
    toast.pipeline_tools.add_polyfilter_args(parser)
    
    # The pipeline may add its own arguments
    parser.add_argument(
        "--nsample", 
        required=False, 
        default=10000,
        type=np.int64,
        help="Length of the simulation",
    )
    
    args = parser.parse_args()
    return args


def create_observations(comm, args):
    """ Create the TOAST data object using `args`
    
    This method will look very different for different pipelines.
    """
    focalplane = fake_focalplane(samplerate=args.sample_rate, fknee=1.0)
    detnames = list(sorted(focalplane.keys()))
    detquats = {x: focalplane[x]["quat"] for x in detnames}
    tod = toast.tod.TODCache(None, detnames, args.nsample, detquats=detquats)
    
    # Write some auxiliary data
    tod.write_times(stamps=np.arange(args.nsample) / args.sample_rate)
    tod.write_common_flags(flags=np.zeros(args.nsample, dtype=np.uint8))
    for d in detnames:
        tod.write_flags(detector=d, flags=np.zeros(args.nsample, dtype=np.uint8))
    
    noise = toast.pipeline_tools.get_analytic_noise(args, comm, focalplane)
    
    observation = {"tod" : tod, "noise" : noise, "name" : "obs-000", "id" : 0}
    
    data = toast.Data(comm)
    data.obs.append(observation)
    
    return data


def main():
    """ The `main` will instantiate and process the data.
    
    """
    mpiworld, procs, rank, comm = toast.pipeline_tools.get_comm()
    args = parse_arguments(comm)
    data = create_observations(comm, args)
    
    mc = 0
    name = "signal"
    tod = data.obs[0]["tod"]
    det = tod.local_dets[0]
    
    # Simulate noise, write out one detector
    toast.pipeline_tools.simulate_noise(args, comm, data, mc, cache_prefix=name)
    tod.local_signal(det, name).tofile("noise.npy")
    
    # Apply polyfilter, write the filtered data
    toast.pipeline_tools.apply_polyfilter(args, comm, data, cache_name=name)
    tod.local_signal(det, name).tofile("filtered.npy")
    

if __name__ == "__main__":
    main()

In [None]:
! python3 my_first_pipeline.py --help

In [None]:
! python3 my_first_pipeline.py --simulate-noise --polyfilter --poly-order 6 --nsample 10000

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

noise = np.fromfile("noise.npy")
filtered = np.fromfile("filtered.npy")
plt.plot(noise, '.', label="noise")
plt.plot(noise - filtered, '-', label="polynomial")
plt.legend()

## Satellite simulation pipeline

[pipelines/toast_satellite_sim.py](https://github.com/hpc4cmb/toast/blob/master/pipelines/toast_satellite_sim.py)

In [None]:
! toast_satellite_sim.py --help

`main` without the profiling commands:

```
def main():
    env = Environment.get()

    mpiworld, procs, rank, comm = get_comm()
    args, comm, groupsize = parse_arguments(comm, procs)

    # Parse options                                                                                          

    if comm.world_rank == 0:
        os.makedirs(args.outdir, exist_ok=True)

    focalplane, gain, detweights = load_focalplane(args, comm)

    data = create_observations(args, comm, focalplane, groupsize)

    expand_pointing(args, comm, data)

    signalname = None
    skyname = simulate_sky_signal(
        args, comm, data, [focalplane], "signal"
    )
    if skyname is not None:
        signalname = skyname

    diponame = simulate_dipole(args, comm, data, "signal")
    if diponame is not None:
        signalname = diponame

    # Mapmaking                                                                                         

    if not args.use_madam:
        # THIS BRANCH WILL SOON BE REPLACED WITH THE NATIVE MAPMAKER
    else:
        # Initialize madam parameters                                                                                          

        madampars = setup_madam(args)

        # Loop over Monte Carlos                                                                                         

        firstmc = args.MC_start
        nmc = args.MC_count

        for mc in range(firstmc, firstmc + nmc):
            # create output directory for this realization
            outpath = os.path.join(args.outdir, "mc_{:03d}".format(mc))

            simulate_noise(args, comm, data, mc, "tot_signal", overwrite=True)

            # add sky signal
            add_signal(args, comm, data, "tot_signal", signalname)

            if gain is not None:
                op_apply_gain = OpApplyGain(gain, name="tot_signal")
                op_apply_gain.exec(data)

            apply_madam(args, comm, data, madampars, outpath, detweights, "tot_signal")

```

Check TOAST examples for all available pipelines at <https://github.com/hpc4cmb/toast/tree/master/examples>

In [None]:
%mkdir out

In [None]:
! toast_fake_focalplane.py --bandcenter_ghz 25 --bandwidth_ghz 6.3 --minpix 1 --out "out/fp"

In [None]:
%%writefile toast_satellite_sim.par
--dipole
--noise
--sample-rate
40.0
--spin-period-min
10.0
--spin-angle-deg
30.0
--prec-period-min
50.0
--prec-angle-deg
65.0
--hwp-rpm
20.0
--obs-time-h
24.0
--gap
4.0
--nside
512
--pysm-model
s1,d1,f1,a1
--pysm-apply-beam
--flush
--dipole-mode
total
--dipole-solar-speed-kms
369
--dipole-solar-gal-lat-deg
48.26
--dipole-solar-gal-lon-deg
263.99
--group-size
1
--focalplane
out/fp_1.pkl
--obs-num
8
--outdir
out

In [None]:
import subprocess as sp

runcom = "toast_satellite_sim.py @toast_satellite_sim.par"
print(runcom, flush=True)
sp.check_call(runcom, stderr=sp.STDOUT, shell=True)

In [None]:
import healpy as hp
import numpy as np
binned = hp.read_map("out/mc_000/binned.fits.gz")
hits = hp.read_map("out/hits.fits.gz")
unseen = hits==0
hits[unseen] = hp.UNSEEN
binned[unseen] = hp.UNSEEN

cmap = "coolwarm"
amp = 0.02
unit = "K"
plt.figure(figsize=[12, 8])
hp.mollview(np.log10(hits), sub=[1, 2, 1], title="hitcount", unit="log10(hits)")
hp.mollview(binned, cmap=cmap, min=-amp, max=amp, sub=[1, 2, 2], title="binned", unit=unit)

## Ground simulation pipeline

[pipelines/toast_ground_sim.py](https://github.com/hpc4cmb/toast/blob/master/pipelines/toast_ground_sim.py)

In [None]:
! toast_ground_sim.py --help

`main` without profiling
```
def main():
    mpiworld, procs, rank, comm = get_comm()

    args, comm = parse_arguments(comm)

    # Initialize madam parameters                                                                                          

    madampars = setup_madam(args)

    # Load and broadcast the schedule file                                                                                         

    schedules = load_schedule(args, comm)

    # Load the weather and append to schedules                                                                                         

    load_weather(args, comm, schedules)

    # load or simulate the focalplane                                                                                          

    detweights = load_focalplanes(args, comm, schedules)

    # Create the TOAST data object to match the schedule.  This will                                                                                         
    # include simulating the boresight pointing                                                                                         

    data, telescope_data = create_observations(args, comm, schedules)

    # Split the communicator for day and season mapmaking                                                                                          

    time_comms = get_time_communicators(args, comm, data)

    # Expand boresight quaternions into detector pointing weights and                                                                                          
    # pixel numbers                                                                                          

    expand_pointing(args, comm, data)

    # Purge the pointing if we are NOT going to export the                                                                                         
    # data to a TIDAS volume                                                                                         
    if (args.tidas is None) and (args.spt3g is None):
        for ob in data.obs:
            tod = ob["tod"]
            tod.free_radec_quats()

    if args.pysm_model:
        focalplanes = [s.telescope.focalplane.detector_data for s in schedules]
        signalname = simulate_sky_signal(
            args, comm, data, focalplanes, "signal"
        )
    else:
        signalname = scan_sky_signal(args, comm, data, "signal")

    # Set up objects to take copies of the TOD at appropriate times                                                                                          

    totalname, totalname_freq = setup_sigcopy(args)

    # Loop over Monte Carlos

    firstmc = args.MC_start
    nsimu = args.MC_count

    freqs = [float(freq) for freq in args.freq.split(",")]
    nfreq = len(freqs)

    for mc in range(firstmc, firstmc + nsimu):

        simulate_atmosphere(args, comm, data, mc, totalname)

        # Loop over frequencies with identical focal planes and identical                                                                                          
        # atmospheric noise                                                                                         

        for ifreq, freq in enumerate(freqs):
            # Make a copy of the atmosphere so we can scramble the gains and apply                                                                                         
            # frequency-dependent scaling
            
            copy_signal(args, comm, data, totalname, totalname_freq)

            scale_atmosphere_by_frequency(
                args, comm, data, freq=freq, mc=mc, cache_name=totalname_freq
            )

            update_atmospheric_noise_weights(args, comm, data, freq, mc)

            # Add previously simulated sky signal to the atmospheric noise                                                                                          

            add_signal(args, comm, data, totalname_freq, signalname, purge=(nsimu == 1))

            mcoffset = ifreq * 1000000

            simulate_noise(args, comm, data, mc + mcoffset, totalname_freq)

            simulate_sss(args, comm, data, mc + mcoffset, totalname_freq)

            scramble_gains(args, comm, data, mc + mcoffset, totalname_freq)

            if (mc == firstmc) and (ifreq == 0):
                # For the first realization and frequency, optionally                                                                                                                                                                                                                 
                # export the timestream data
                
                output_tidas(args, comm, data, totalname)
                output_spt3g(args, comm, data, totalname)

            outpath = setup_output(args, comm, mc + mcoffset, freq)

            # Bin and destripe maps                                                                                          

            apply_madam(
                args,
                comm,
                data,
                madampars,
                outpath,
                detweights,
                totalname_freq,
                freq=freq,
                time_comms=time_comms,
                telescope_data=telescope_data,
                first_call=(mc == firstmc),
            )

            if args.apply_polyfilter or args.apply_groundfilter:

                # Filter signal                                                                                          

                apply_polyfilter(args, comm, data, totalname_freq)

                apply_groundfilter(args, comm, data, totalname_freq)

                # Bin filtered maps                                                                                          

                apply_madam(
                    args,
                    comm,
                    data,
                    madampars,
                    outpath,
                    detweights,
                    totalname_freq,
                    freq=freq,
                    time_comms=time_comms,
                    telescope_data=telescope_data,
                    first_call=False,
                    extra_prefix="filtered",
                    bin_only=True,
                )

```

Here is a full working example of the ground simulation pipeline

First we need an observing schedule.  This one is for one patch and covers 24 hours:

In [None]:
! toast_ground_schedule.py \
    --site-lat "-22.958064" \
    --site-lon "-67.786222" \
    --site-alt 5200 \
    --site-name Atacama \
    --telescope LAT \
    --start "2020-01-01 00:00:00" \
    --stop "2020-01-02 00:00:00" \
    --patch-coord C \
    --patch small_patch,1,40,-40,10 \
    --el-min 45 \
    --el-max 60 \
    --out schedule.txt

! cat schedule.txt

Then we need a focalplane

In [None]:
! toast_fake_focalplane.py \
    --minpix 100 \
    --out focalplane \
    --fwhm 30 \
    --fwhm_sigma 0.05 \
    --fov 10 \
    --psd_fknee 5e-2 \
    --psd_NET 1e-3 \
    --psd_alpha 1 \
    --psd_fmin 1e-5 \
    --bandcenter_ghz 100 \
    --bandcenter_sigma 0.01 \
    --bandwidth_ghz 10 \
    --bandwidth_sigma 0.1

And of course we need the weather file

In [None]:
! if [ ! -e weather_Atacama.fits ]; then wget http://portal.nersc.gov/project/cmb/toast_data/example_data/weather_Atacama.fits; fi

Now write a parameter file that uses the above inputs and simulates sky signal (using PySM), atmosphere and instrument noise

In [None]:
%%writefile toast_ground_sim.par
--weather
weather_Atacama.fits
--scan-rate
1
--scan-accel
3
--schedule
schedule.txt
--sample-rate
30
--coord
C
--hwp-rpm
2
--nside
512
--common-flag-mask
1
--polyfilter
--poly-order
5
--groundfilter
--ground-order
3
--atmosphere
--atm-lmin-center
0.1
--atm-lmax-center
30
--atm-gain
3e-5
--atm-zmax
1000
--atm-xstep
30
--atm-ystep
30
--atm-zstep
30
--atm-nelem-sim-max
10000
--atm-wind-dist
5000
--atm-cache
atm_cache
--noise
--gainscrambler
--gain-sigma
0.05
--madam-prefix
groundsim000
--madam-iter-max
200
--madam-precond-width
100
--madam-baseline-length
1
--madam-noisefilter
--madam-allreduce
--destripe
--binmap
--hits
--wcov
--ground-nside
512
--ground-fwhm-deg
3
--ground-lmax
512
--ground-scale
1e-3
--simulate-ground
--focalplane
focalplane_127.pkl
--freq
100

In [None]:
# --pysm-model
# s1,d1,f1,a1
# --pysm-apply-beam

With the PySM part (last two lines) the simulation will take about 20 minutes on a single Cori Haswell node at NERSC.  If you remove them, it will run in about 4 minutes.  Here we just run this job locally with MPI.

In [None]:
runcom = "toast_ground_sim.py @toast_ground_sim.par"
print(runcom, flush=True)
sp.check_call(runcom, stderr=sp.STDOUT, shell=True)

Here we examine the intensity part of the simulated maps.

In [None]:
import healpy as hp
try:
    binned = hp.read_map("out/00000000/100/groundsim000_100_telescope_all_time_all_bmap.fits")
    destriped = hp.read_map("out/00000000/100/groundsim000_100_telescope_all_time_all_map.fits")
    filtered = hp.read_map("out/00000000/100/groundsim000_filtered_100_telescope_all_time_all_bmap.fits")

    rot = [40, -40]
    reso = 10
    cmap = "coolwarm"
    amp = 0.02
    unit = "K"
    plt.figure(figsize=[12, 8])
    hp.gnomview(binned, rot=rot, reso=reso, cmap=cmap, min=-amp, max=amp, sub=[1, 3, 1], title="binned", unit=unit)
    hp.gnomview(destriped, rot=rot, reso=reso, cmap=cmap, min=-amp, max=amp, sub=[1, 3, 2], title="destriped", unit=unit)
    hp.gnomview(filtered, rot=rot, reso=reso, cmap=cmap, min=-amp, max=amp, sub=[1, 3, 3], title="filtered", unit=unit)
except:
    print("Maps are not available.  `Either toast_ground_sim.slrm` failed or you did not submit it yet.")