# Gaap pipeline 

Use this notebook to run `gaap` photometry on Merian-reduced data.

Make sure that you are in the right environment! When activating the jupyter notebook:

        module load anaconda3/2022.5
        . /scratch/gpfs/am2907/Merian/gaap/lambo/scripts/setup_env_w40.sh
        jupyter notebook

In [1]:
import lsst.daf.butler as dafButler
import numpy as np
import glob
import os, sys
sys.path.append(os.path.join(os.getenv('LAMBO_HOME'), 'lambo/scripts/'))
from hsc_gaap.deploy_gaap_array import deploy_training_job
from hsc_gaap.check_gaap_run import checkRun
from hsc_gaap.find_patches_to_reduce import * 
from hsc_gaap.compile_catalogs import compileCatalogs
from tqdm import tqdm

%load_ext autoreload
%autoreload 2

Overriding default configuration file with /scratch/gpfs/HSC/LSST/stack_20230302/conda/envs/lsst-scipipe-4.0.1/share/eups/Linux64/dustmaps_cachedata/g41a3ec361e+ac198e9f13/config/.dustmapsrc


---
# Step 1: What patches do we need to reduce?

We want to identify patches that have the necessary merian data products for `gaap` processing and have not already been processed. Patches need to have:

- deepCoadd_ref
- deepCoadd_meas
- deepCoadd_scarletModelData
- deepCoadd_calexp


Get a list of all Merian tracts with reduced data, and we will search through them to see which patches fit our criteria:

In [2]:
repo = '/scratch/gpfs/am2907/Merian/gaap'

In [3]:
output_collection = "DECam/runs/merian/dr1_wide"
data_type = "deepCoadd_calexp"
skymap = "hsc_rings_v1"
butler = dafButler.Butler('/projects/MERIAN/repo/', collections=output_collection, skymap=skymap)

In [4]:

patches = np.array([[data_id['tract'], data_id["patch"]] for data_id in butler.registry.queryDataIds (['tract','patch'], datasets=data_type, 
                                                 collections=output_collection, skymap=skymap)])
patches = patches[patches[:, 0].argsort()]
tracts, idx = np.unique(patches[:,0], return_index=True) 
patches_by_tract = np.split(patches[:,1] ,idx[1:])

Now find patches with necessary data products in N708:

In [5]:
tracts_n708 = []
patches_n708 = []
for tract in tracts:
    patches = findReducedPatches(tract)
    if len(patches) > 0:
        tracts_n708.append(tract)
        patches_n708.append(patches)

tracts_n708 = np.array(tracts_n708)
print(f"{sum([len(p) for p in patches_n708])} patches in {len(tracts_n708)} tracts with necessary data products in N708")

285 patches in 285 tracts with necessary data products in N708


In [10]:
print(f"{sum([len(p) for p in patches_n708])} patches in {len(tracts_n708)} tracts with necessary data products in N708")

14319 patches in 285 tracts with necessary data products in N708


And in N540:

In [6]:
tracts_n540 = []
patches_n540 = []
for tract in tracts:
    patches = findReducedPatches(tract, band="N540")
    if len(patches) > 0:
        tracts_n540.append(tract)
        patches_n540.append(patches)

tracts_n540 = np.array(tracts_n540)
print(f"{sum([len(p) for p in patches_n540])} patches in {len(tracts_n540)} tracts with necessary data products in N540")

318 patches in 318 tracts with necessary data products in N540


In [11]:
print(f"{sum([len(p) for p in patches_n540])} patches in {len(tracts_n540)} tracts with necessary data products in N540")

15259 patches in 318 tracts with necessary data products in N540


We might be interested in seeing which patches have only one but not both bands:

In [25]:
n708_non540_tracts = []
n708_non540_patches = []
n540_non708_tracts = []
n540_non708_patches = []
for tract in tqdm(tracts):
    if tract in tracts_n540:
        patches_n540_i = patches_n540[np.where(tracts_n540 == tract)[0][0]]
    else:
        patches_n540_i=[]
    if tract in tracts_n708:
        patches_n708_i = patches_n708[np.where(tracts_n708 == tract)[0][0]]
    else:
        patches_n708_i=[]
        
    n708_non540 = list(set(patches_n708_i) - set(patches_n540_i))
    n540_non708 = list(set(patches_n540_i) - set(patches_n708_i))

    if len(n708_non540)>0:
        n708_non540_tracts.append(tract)
        n708_non540_patches.append(n708_non540)

    if len(n540_non708)>0:
        n540_non708_tracts.append(tract)
        n540_non708_patches.append(n540_non708)

numpatches_n540_only = sum([len(p) for p in n540_non708_patches])
numpatches_n708_only = sum([len(p) for p in n708_non540_patches])
numpatches_total = sum([len(p) for p in patches_by_tract])
    

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 355/355 [00:00<00:00, 19958.15it/s]


In [26]:
print(f"There are {numpatches_n540_only} patches in {len(n540_non708_tracts)} tracts that have N540 and no N708 ")
print(f"There are {numpatches_n708_only} patches in {len(n708_non540_tracts)} tracts that have N708 and no N540 ")
print(f'There are {numpatches_total - numpatches_n708_only - numpatches_n540_only} with both N540 and N708')

There are 3831 patches in 115 tracts that have N540 and no N708 
There are 2891 patches in 89 tracts that have N708 and no N540 
There are 26690 with both N540 and N708


Save a csv with the info if you want:

In [44]:
# saveMerianReducedPatchList(tracts, os.path.join(repo, "reducedPatches_N708.csv"))
# saveMerianReducedPatchList(tracts, os.path.join(repo, "reducedPatches_N540.csv"), band="N540")

Now find patches that haven't yet been `gaap` processed:

- N708 here is all patches that have N708 (whether they have N540 also or not)
- N540 is patches with only N540

In [39]:
tracts_n708_nogaap = []
patches_n708_nogaap = []
for i, tract in enumerate(tqdm(tracts_n708)):
    patches_mer  = patches_n708[i]
    patches_gaap = findGaapReducedPatches(tract, repo=repo)
    if len(set(patches_mer) - set(patches_gaap)) > 0:
        tracts_n708_nogaap.append(tract)
        patches_n708_nogaap.append(list(set(patches_mer) - set(patches_gaap)))


numpatches_n708_nogaap = sum([len(p) for p in patches_n708_nogaap])
print(f"{numpatches_n708_nogaap} patches in {len(tracts_n708_nogaap)} tracts to be reduced with N708")

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 285/285 [00:28<00:00, 10.10it/s]

3464 patches in 79 tracts to be reduced with N708





In [40]:
tracts_n540_nogaap = []
patches_n540_nogaap = []
for i, tract in enumerate(tqdm(n540_non708_tracts)):
    patches_mer  = n540_non708_patches[i]
    patches_gaap = findGaapReducedPatches(tract, repo=repo)
    if len(set(patches_mer) - set(patches_gaap)) > 0:
        tracts_n540_nogaap.append(tract)
        patches_n540_nogaap.append(list(set(patches_mer) - set(patches_gaap)))

numpatches_n540_nogaap = sum([len(p) for p in patches_n540_nogaap])
print(f"{numpatches_n540_nogaap} patches in {len(tracts_n540_nogaap)} tracts to be reduced with only N540")

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 115/115 [00:03<00:00, 30.29it/s]

3831 patches in 115 tracts to be reduced with only N540





In [38]:
tracts_notcompiled_N708 = [tract for tract in tracts_n708 if not hasCompiledCatalog(tract)]
tracts_notcompiled_N540 = [tract for tract in tracts_n540 if not hasCompiledCatalog(tract)]

In [47]:
# saveGaapReducedPatchList(tracts, os.path.join(repo, "GaapReduced.csv"))
# saveGaapNotReducedPatchList(tracts, os.path.join(repo, "notGaapReduced.csv"), notionformat=False)
saveGaapNotReducedPatchList(tracts_n540_nogaap, os.path.join(repo, "notGaapReduced_N540.csv"), patches = patches_n540_nogaap,
                            notionformat=True, band="N540")

Saved file to /scratch/gpfs/am2907/Merian/gaap/notGaapReduced_N540.csv.


In [44]:
npatches = numpatches_n540_nogaap + numpatches_n708_nogaap
print(f"It will take ~ {npatches/60:.1f} hours to download HSC images for {npatches} patches")
print(f"It will take ~ {npatches*.6/1000:.1f} TBs to download HSC images for {npatches} patches")
print(f"Once the data has been downloaded, it will take ~ {npatches/20/2:.1f} hours to run gaap on {npatches} patches")
print(f"It will take ~ {npatches*.212/1000:.1f} TBs to save the gaap catalogs for {npatches} patches")


It will take ~ 121.6 hours to download HSC images for 7295 patches
It will take ~ 4.4 TBs to download HSC images for 7295 patches
Once the data has been downloaded, it will take ~ 182.4 hours to run gaap on 7295 patches
It will take ~ 1.5 TBs to save the gaap catalogs for 7295 patches


In [48]:
for tract, npatch in zip(tracts_n708_nogaap, npatches):
    if npatch > 0:
        print (f'TRACT:{tract}, {npatch}')

TRACT:8769, 2
TRACT:9085, 3
TRACT:9698, 9
TRACT:9709, 77
TRACT:9710, 76
TRACT:9711, 76
TRACT:9712, 42
TRACT:9713, 42
TRACT:9714, 14
TRACT:9798, 13
TRACT:9799, 74
TRACT:9800, 81
TRACT:9801, 81
TRACT:9802, 81
TRACT:9803, 81
TRACT:9804, 81
TRACT:9805, 81
TRACT:9806, 81
TRACT:9807, 81
TRACT:9808, 80
TRACT:9809, 79
TRACT:9810, 79
TRACT:9811, 75
TRACT:9812, 4
TRACT:9814, 2
TRACT:9815, 81
TRACT:9816, 80
TRACT:9817, 80
TRACT:9818, 80
TRACT:9819, 81
TRACT:9820, 54
TRACT:9821, 2
TRACT:9828, 37
TRACT:9833, 81
TRACT:9837, 69
TRACT:9838, 64
TRACT:9839, 5
TRACT:9862, 21
TRACT:9863, 9
TRACT:9939, 4
TRACT:9940, 50
TRACT:9941, 81
TRACT:9942, 81
TRACT:9943, 81
TRACT:9944, 64
TRACT:9945, 2
TRACT:9949, 9
TRACT:9950, 41
TRACT:9951, 41
TRACT:9952, 41
TRACT:9953, 41
TRACT:10040, 5
TRACT:10041, 65
TRACT:10042, 81
TRACT:10043, 81
TRACT:10044, 79
TRACT:10045, 79
TRACT:10046, 81
TRACT:10047, 81
TRACT:10048, 81
TRACT:10049, 69
TRACT:10050, 62
TRACT:10051, 79
TRACT:10052, 80
TRACT:10053, 78
TRACT:10055, 4
TRACT:10

---
# Step 2: Download the data

We need to download the HSC data for all of the tracts we need to reduce. *Be warned, this takes a while and uses a lot of storage.*

It is recommended to run the following in a bash screen because depending on how much data you need to download, it can take many hours.

The following will download images for tract 9813 to `/scratch/gpfs/am2907/Merian/gaap/S20A/deepCoadd_calexp/9813` and the blendedness catalogs to `/scratch/gpfs/am2907/Merian/gaap/S20A/gaapTable/9813`:
- Unless `--only_merian=False`, this will only download the patches that have been reduced by Merian.
- You can download all of the Merian-reduced data in one go if you set `--alltracts=True`. Be careful with this, because it is ****lots**** of data!

    screen -L -S downloadtract    
    
    cd /scratch/gpfs/am2907/Merian/gaap
    . lambo/scripts/setup_env_w40.sh
    python3 lambo/scripts/hsc_gaap/download_S20A.py --tract=9813 --outdir="/scratch/gpfs/am2907/Merian/gaap/"


To exit screen do `ctrl a d` and to reattach do `screen -r downloaddata`

---
# Step 3: Make slurm scripts and submit

Write one slurm script for each tract – each of which is a job array with one job for each patch. 
You can submit the scripts as you write them if you want, but beware that there is an upper limit for the number of jobs you can submit at once to the queue.

In [63]:
tracts_n708_nogaap[-1]

10428

In [61]:
for tract in tracts_n708_nogaap[78:]:
    deploy_training_job(tract, band = "N708", filter_jobs=5,
                        python_file='lambo/scripts/hsc_gaap/run_gaap.py',
                        name='gaap', email="am2907@princeton.edu", outname = None, 
                        repo='/scratch/gpfs/am2907/Merian/gaap', scriptdir="/scratch/gpfs/am2907/Merian/gaap/", 
                        submit=True, fixpatches=False)

Submitted batch job 9986322


In [76]:
for tract in tracts_n540_nogaap[:5]:
    deploy_training_job(tract, band = "N540", filter_jobs=5,
                        python_file='lambo/scripts/hsc_gaap/run_gaap.py',
                        name='gaap', email="am2907@princeton.edu", outname = None, 
                        repo='/scratch/gpfs/am2907/Merian/gaap', scriptdir="/scratch/gpfs/am2907/Merian/gaap/", 
                        submit=True, fixpatches=False)

sbatch: error: QOSMaxSubmitJobPerUserLimit
sbatch: error: Batch job submission failed: Job violates accounting/QOS policy (job submit limit, user's size and/or time limits)
sbatch: error: QOSMaxSubmitJobPerUserLimit
sbatch: error: Batch job submission failed: Job violates accounting/QOS policy (job submit limit, user's size and/or time limits)


Submitted batch job 9986326


KeyboardInterrupt: 

The gaap reduction will save one catalog for each patch to (for example):

        /scratch/gpfs/am2907/Merian/gaap/S20A/gaapTable/9813/0,0/objectTable_9813_0,0_S20A.fits

---
# Step 4: Check on it!

You can check on the logs while the jobs are running to check for any glaring problems:
- `logs/gaapPhot_array_9813_0.o` 
- `logs/gaap_9813_0.log`

One the jobs are done running (for a given tract), you can check how things went. 

In [71]:
print(tracts_notcompiled_N708[40:])

[9828, 9833, 9837, 9838, 9839, 9862, 9863, 9939, 9940, 9941, 9942, 9943, 9944, 9945, 9949, 9950, 9951, 9952, 9953, 10040, 10041, 10042, 10043, 10044, 10045, 10046, 10047, 10048, 10049, 10050, 10051, 10052, 10053, 10057, 10058, 10060, 10061, 10062, 10070, 10078, 10182, 10183, 10184, 10185, 10186, 10283, 10284, 10285, 10286, 10287, 10288, 10289, 10290, 10291, 10292, 10293, 10294, 10295, 10296, 10297, 10298, 10299, 10300, 10301, 10302, 10303, 10304, 10426, 10427, 10428]


In [69]:
for tract in tracts_notcompiled_N708[0:50]:
    problems = checkRun(tract, band="N708")

TRACT: 9618
NO PROBLEMS

TRACT: 9619
NO PROBLEMS

TRACT: 9620
NO PROBLEMS

TRACT: 9621
NO PROBLEMS

TRACT: 9697
NO PROBLEMS

TRACT: 9698
NO PROBLEMS

TRACT: 9699
NO PROBLEMS

TRACT: 9700
NO PROBLEMS

TRACT: 9701
NO PROBLEMS

TRACT: 9702
NO PROBLEMS

TRACT: 9703
NO PROBLEMS

TRACT: 9707
NO PROBLEMS

TRACT: 9708
NO PROBLEMS

TRACT: 9709
NO PROBLEMS

TRACT: 9710
NO PROBLEMS

TRACT: 9711
NO PROBLEMS

TRACT: 9712
NO PROBLEMS

TRACT: 9713
NO PROBLEMS

TRACT: 9714
NO PROBLEMS

TRACT: 9798
NO PROBLEMS

TRACT: 9799
NO PROBLEMS

TRACT: 9800
NO PROBLEMS

TRACT: 9801
NO PROBLEMS

TRACT: 9802
NO PROBLEMS

TRACT: 9803
NO PROBLEMS

TRACT: 9804
NO PROBLEMS

TRACT: 9805
NO PROBLEMS

TRACT: 9806
NO PROBLEMS

TRACT: 9807
NO PROBLEMS

TRACT: 9808
NO PROBLEMS

TRACT: 9809
NO PROBLEMS

TRACT: 9810
NO PROBLEMS

TRACT: 9811
NO PROBLEMS

TRACT: 9815
MISSING PATCHES

TRACT: 9816
MISSING PATCHES

TRACT: 9817
MISSING PATCHES

TRACT: 9818
MISSING PATCHES

TRACT: 9819
MISSING PATCHES

TRACT: 9820
MISSING PATCHES

T

In [75]:
checkRun(9812)

TRACT: 9812
NO PROBLEMS



array([], dtype=float64)

You might get issues like "Failed for 3 bands" - this could be because HSC images don't exist for all bands. So it might not be an issue you can fix!

---
# Step 4: Merge catalogs

If everything is looking good, you can merge the patch catalogs into a tract-level catalog. 

It's recommended to run this step in a screen in terminal, because it takes some time!

But here is an example:

In [28]:
compileCatalogs([9617], repo, alltracts=False, rewrite=False)

COMPILING CATALOG FOR TRACT 9617 WITH 13 PATCHES
COMPILED TABLE OF 279484 ROWS and 69 COLUMNS
WROTE TABLE TO /scratch/gpfs/am2907/Merian/gaap/S20A/gaapTable/9617/objectTable_9617_S20A.fits


        python3 lambo/scripts/hsc_gaap/compile_catalogs.py --tracts=="[9327,9328,9329,9813,9812]"

This will save a catalog to (for example):
        
        /scratch/gpfs/am2907/Merian/gaap/S20A/gaapTable/9813/objectTable_9813_S20A.fits

If you want to change the columns that are used for the compiled catalog, edit these files:

        lambo/scripts/hsc_gaap/keep_table_columns_gaap.txt
        lambo/scripts/hsc_gaap/keep_table_columns_merian.txt

And you're all done!