In [1]:
from pathlib import Path
from subprocess import run # To send runs 
from multiprocessing import Pool # To run multiple seeds in parallel
import numpy as np

from picca_bookkeeper.tasker import get_Tasker # To organize runs outs and logs
from picca_bookkeeper.dict_utils import DictUtils # To check if two dicts are the same 

import libconf # To read CoLoRe config file.

## Generate inputs

# Run CoLoRe 

In [3]:
# We need the path to the CoLoRe executable to run it.
colore_executables = {
    "exp_bias": "/pscratch/sd/l/lauracdp/CoLoRe_threshold/CoLoRe",
}

for item in colore_executables.values():
    assert Path(item).is_file()

In [4]:
n_grid = 4096 # Grid size
overwrite_colore = False # Overwrite if already exists.
overwrite_corrf = False 
overwrite_config=True

In [5]:
here = Path('.').resolve()


In [6]:
colore_runs = here / "v1/error_boxes"
colore_runs.mkdir(exist_ok=True, parents=True)

In [23]:
def get_colore_box_name(seed, version):
    box_number = seed-2024
    return colore_runs / f"box-{box_number}" # Use anything here

In [24]:
def compute_colore(seed, version): # We define function to then run multiple seeds using multiprocessing
    colore_box = get_colore_box_name(seed, version)
    print(colore_box, seed)
    (colore_box / "results").mkdir(exist_ok=True, parents=True) # Create folder structure
    
    # Now we create the param.cfg for CoLoRe in a dict, so it can be customize
    param_cfg = {
        "global": {
            "prefix_out": f"{colore_box}/results/out",
            "output_format": "FITS",
            "output_density": False,
            "pk_filename": "/path/to/smoothed_mu0_NL_matter_powerspectrum_DR12.dat",
            "z_min": 1.6,
            "z_max": 3.79,
            "write_pred": False,
            "just_write_pred": False,
            "seed": seed,
            "pred_dz": 0.1,
        },
        "field_par": {
            "r_smooth": 2.0,
            "smooth_potential": True,
            "n_grid": n_grid,
            "dens_type": 0,
            "lpt_buffer_fraction": 0.6,
            "lpt_interp_type": 1,
            "output_lpt": 0,
        },
        "cosmo_par": {
            "omega_M": 0.3147,
            "omega_L": 0.6853,
            "omega_B": 0.04904,
            "h": 0.6731,
            "w": -1.0,
            "ns": 0.9655,
            "sigma_8": 0.83,
        },
        "srcs1": { # here we are adding multiple sources with different choices of bias and threshold.
            "nz_filename" : str(here / "Nz.txt"),
            "bias_filename": str(here / f"/path/to/bias_colore-ql.txt"),
            "threshold_filename": str(here / f"/path/to/threshold_colore-ql.txt"),
            "include_shear": False,
            "include_lensing": False,
            "store_skewers": True,
            "gaussian_skewers": True,
        },

            
    }
    
    # If paramcfg already exists, we  need to check that the content is the same
    if (colore_box / "param.cfg").is_file():
        with open(colore_box / "param.cfg") as f:
            existing_config = libconf.load(f) # We use libconf to read the file
        
        diff = DictUtils.remove_empty(
            DictUtils.diff_dicts(existing_config, param_cfg)
        )
        if diff != dict():
            raise ValueError("Different param provided", diff)        
            
    with open(colore_box / "param.cfg", "w") as f:
        libconf.dump(param_cfg, f) # Write configuration to file.
        
    args = {
        "": str(colore_box / "param.cfg"), # This is the only terminal arg needed to run CoLoRe
    }
    
    # Create the logs directory
    colore_logs_dir = colore_box / "logs"
    colore_logs_dir.mkdir(exist_ok=True)
    
    # This is to appropiate set the output and error files
    # j will be subtituted by the time of execution
    # all of this is handled by picca_bookkeeper
    slurm_header_args = dict(
        qos="regular",
        nodes=8,
        time = "00:30:00",
        output=str(colore_logs_dir / "CoLoRe-%j.out"),
        error=str(colore_logs_dir / "CoLoRe-%j.err"),
        constraint = "cpu",
        account = "desi",
    )
    slurm_header_args["job-name"] = "CoLoRe"
    slurm_header_args["ntasks-per-node"] = 8
    
    # Create the scripts directory
    colore_scripts_dir = colore_box / "scripts"
    colore_scripts_dir.mkdir(exist_ok=True)
    
    # Create the tasker instance that will be responsible of sending the job.
    tasker = get_Tasker("slurm_perlmutter")( # bash means: do not run it in a computing node.
        command = colore_executables[version],
        command_args = args,
        environment = "colore_env", # Name or path to the conda environment to be activated through ``source/conda activate``
        slurm_header_args = slurm_header_args,
        jobid_log_file = colore_logs_dir / "jobids.log", # This is only used for chaining slurm jobs, not needed here.
        run_file = colore_scripts_dir.resolve() / "run_colore.sh", # This is the file that will be executed
        force_OMP_threads = 128
    )
    
    if len(list((colore_box/"results").glob("out_srcs*fits"))) == 0 or overwrite_colore:
        # If there are no results, we just run the job
        tasker.write_job()
        tasker.send_job()

    else:
        print("Skipping CoLoRe")
           
    return seed, tasker.jobid

In [20]:
jobids = {
    0 : None
}

In [21]:
#seed_list = np.round(np.linspace(2050, 2074, 23)).astype(int)  # Ensuring seed_list is integers
#print(seed_list)

In [13]:
# Opening the multiprocessing tool,
# the use of imap allows us to see
# which runs are finished
version = "base"
seed_list = np.linspace(2237, 2252, 15, dtype=int)
jobids = dict()
print(seed_list)

nproc_colore = 1# This should be keep to a small value to avoid running out of memory
with Pool(nproc_colore) as pool:
    for result in [pool.apply_async(
        compute_colore,
        args=(int(seed), version)
    ) for seed in seed_list for version in ["exp_bias",]]:
        seed, jobid = result.get()
        print(seed, jobid)
        jobids[int(seed)] = jobid


[2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250
 2252]
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-213 2237
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-214 2238
2237 31842120
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-215 2239
2238 31842121
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-216 2240
2239 31842122
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-217 2241
2240 31842125
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-218 2242
2241 31842127
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-219 2243
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-220 2244
2242 31842129
2243 31842130
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-221 2245
2244 31842131
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-222 2246
2245 31842132
/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-223 2247
2246 31842135
/pscratch/sd/l/lauracdp

In [25]:
# Opening the multiprocessing tool,
# the use of imap allows us to see
# which runs are finished
version = "base"
seed_list=1
jobids = dict()
nproc_colore = 1# This should be keep to a small value to avoid running out of memory
with Pool(nproc_colore) as pool:
    for result in [pool.apply_async(
        compute_colore,
        args=(seed, version)
    ) for seed in range(seed_list) for version in ["exp_bias",]]:
        seed, jobid = result.get()
        print(seed, jobid)
        jobids[seed] = jobid


/pscratch/sd/l/lauracdp/New_boxes_CoLoRe/v1/error_boxes/box-275 0
0 31846431


In [94]:
jobids
#

{0: 27720822}