In [2]:
# This notebook shows how to run small!! colore boxes
# within the jupyter shared node.

# It also shows how to run corrfunc correlation function
# measurements on these boxes.

# For this notebook to work, picca_bookkeeper
# needs to be installed so the runs can be sent and logged
# properly with ease.
# It can be downloaded from https://github.com/cramirezpe/picca_bookkeeper

In [14]:
from pathlib import Path
from subprocess import run # To send runs 
from multiprocessing import Pool # To run multiple seeds in parallel

from picca_bookkeeper.tasker import get_Tasker # To organize runs outs and logs
from picca_bookkeeper.dict_utils import DictUtils # To check if two dicts are the same 

import libconf # To read CoLoRe config file.

# Run CoLoRe 

In [9]:
# We need the path to the CoLoRe executable to run it.
colore_executable = "/global/common/software/desi/users/cramirez/colore_snap_threshold/CoLoRe_snap/CoLoRe"

In [6]:
n_grid = 512 # Grid size
z_snap = 2.0 # Redshift 
overwrite_colore = False # Overwrite if already exists.
overwrite_corrf = False 

In [7]:
colore_runs = Path(
    "/pscratch/sd/c/cramirez/oxford_visit/test_boxes"
).resolve() # The resolve here is important because we don't want relative paths, only absolute

In [37]:
def get_colore_box_name(seed):
    return colore_runs / f"colore_box_{n_grid}_{seed}_v0" # Use anything here

In [46]:
def compute_colore(seed): # We define function to then run multiple seeds using multiprocessing
    colore_box = get_colore_box_name(seed)
    print(colore_box, seed)
    (colore_box / "results").mkdir(exist_ok=True, parents=True) # Create folder structure
    
    # Now we create the param.cfg for CoLoRe in a dict, so it can be customize
    param_cfg = {
        "global": {
            "prefix_out": f"{colore_box}/results/out",
            "output_format": "FITS",
            "output_density": True,
            "pk_filename": "/global/cfs/cdirs/desicollab/users/cramirez/LyaCoLoRe_mocks_inputs/CoLoRe/PlanckDR12_kmax_matterpower_z0.dat",
            "l_box": 1000.0,
            "z_snap": z_snap,
            "seed": seed,
        },
        "field_par": {
            "r_smooth": 0.001,
            "smooth_potential": True,
            "n_grid": n_grid,
            "dens_type": 2,
            "lpt_buffer_fraction": 0.6,
            "lpt_interp_type": 1,
            "output_lpt": 0,
        },
        "cosmo_par": {
            "omega_M": 0.3147,
            "omega_L": 0.6853,
            "omega_B": 0.04904,
            "h": 0.6731,
            "w": -1.0,
            "ns": 0.9655,
            "sigma_8": 0.83,
        },
        "srcs1": { # here we are adding multiple sources with different choices of bias and threshold.
            "ndens" : 0.0005,
            "bias" : 1,
            "threshold": 0.5
        },
        "srcs2": {
            "ndens" : 0.0005,
            "bias" : 2,
            "threshold": -1,
        },
        "srcs3": {
            "ndens" : 0.0005,
            "bias" : 3,
            "threshold": -1,
        },
        "srcs4": {
            "ndens" : 0.0005,
            "bias" : 4,
            "threshold": -1,
        },
        # Now the same with threshold 1
        "srcs5": {
            "ndens" : 0.0005,
            "bias" : 1,
            "threshold": 1.5,
        },
        "srcs6": {
            "ndens" : 0.0005,
            "bias" : 2,
            "threshold": 1.5,
        },
        "srcs7": {
            "ndens" : 0.0005,
            "bias" : 3,
            "threshold": 1.5,
        },
        "srcs8": {
            
            "ndens" : 0.0005,
            "bias" : 4,
            "threshold": 1.5,   
        },        
    }
    
    # If paramcfg already exists, we  need to check that the content is the same
    if (colore_box / "param.cfg").is_file():
        with open(colore_box / "param.cfg") as f:
            existing_config = libconf.load(f) # We use libconf to read the file
        
        diff = DictUtils.remove_empty(
            DictUtils.diff_dicts(existing_config, param_cfg)
        )
        if diff != dict():
            raise ValueError("Different param provided", diff)        
            
    with open(colore_box / "param.cfg", "w") as f:
        libconf.dump(param_cfg, f) # Write configuration to file.
        
    args = {
        "": str(colore_box / "param.cfg"), # This is the only terminal arg needed to run CoLoRe
    }
    
    # Create the logs directory
    colore_logs_dir = colore_box / "logs"
    colore_logs_dir.mkdir(exist_ok=True)
    
    # This is to appropiate set the output and error files
    # j will be subtituted by the time of execution
    # all of this is handled by picca_bookkeeper
    slurm_header_args = dict(
        output=str(colore_logs_dir / "CoLoRe-%j.out"),
        error=str(colore_logs_dir / "CoLoRe-%j.err"),
    )
    
    # Create the scripts directory
    colore_scripts_dir = colore_box / "scripts"
    colore_scripts_dir.mkdir(exist_ok=True)
    
    # Create the tasker instance that will be responsible of sending the job.
    tasker = get_Tasker("bash")( # bash means: do not run it in a computing node.
        command = colore_executable,
        command_args = args,
        environment = "picca", # Name or path to the conda environment to be activated through ``source/conda activate``
        slurm_header_args = slurm_header_args,
        jobid_log_file = None, # This is only used for chaining slurm jobs, not needed here.
        run_file = colore_scripts_dir.resolve() / "run_colore.sh", # This is the file that will be executed
    )
    
    if len(list((colore_box/"results").glob("out_srcs*fits"))) == 0 or overwrite_colore:
        # If there are no results, we just run the job
        tasker.write_job()
        tasker.send_job()

        if tasker.retcode.returncode != 0:
            # If the task got a non-zero return code, we raise an error
            # If the return code is 0, it doesnt' mean that the task was successful
            raise ValueError(tasker.retcode)
    else:
        print("Skipping CoLoRe")
           
    return colore_box    

In [48]:
# Opening the multiprocessing tool,
# the use of imap allows us to see
# which runs are finished

nproc_colore = 4 # This should be keep to a small value to avoid running out of memory
pool = Pool(nproc_colore)
for out in pool.imap(
    compute_colore,
    [seed for seed in range(5)]
):
    print(out)


/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_0_v0/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_3_v0/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_2_v0/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_1_v0    0213



/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_4_v0 4
/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_0_v0
/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_1_v0
/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_2_v0
/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_3_v0
/pscratch/sd/c/cramirez/oxford_visit/test_boxes/colore_box_512_4_v0


# Run Corrf

In [49]:
# In order to make this work, make sure that you can run corrf. 
# The README in https://github.com/cramirezpe/modules-corrf-analysis
# details which packages are required for it.

In [50]:
rsd = True # If we want to compute correlations to RSD quasars.
source = 3 # Source number to compute correlations from.

In [53]:
def compute_corrf(seed):
    print("Corrf", seed)
    
    # Define paths where to store the results.
    colore_box = get_colore_box_name(seed)
    rsd_string = "rsd" if rsd else "no_rsd"
    corrf_directory = colore_box / "Corrf" / f"source_{source}{rsd_string}" 
    (corrf_directory / "results").mkdir(exist_ok=True, parents=True) 
    
    # The actual script that we will be running is: 
    # https://github.com/cramirezpe/modules-corrf-analysis/blob/master/CoLoRe_corrf_analysis/compute_correlations.py
    # The options available can also be found by running in a terminal
    # CoLoRe_corrf_run_correlations --help
    # Having the previous packaged installed.
    args = {
        "data": str(colore_box / "results" / f"out_srcs_s{source}_*"),
        "log-level": "DEBUG", # We can select how fine we want the logs to be
        "data-format": "CoLoRe", # Correlations can also be computed from catalogs (see script)
        "out-dir": str(corrf_directory / "results"),
        "grid-format": "snapshot", # Need this to run Corrf on CoLoRe snapshot. It can also be run in lightcones.
        "min-bin": 0.1, # min-bin and max-bin defines the range of r (in Mpc/h)
        "max-bin": 200,
        "n-bins": 41,
        "compute-npoles": "0 2 4", # Corrf compute counts as a 2D matrix (rper rpar counts). We can pre-compute multipoles afterwards using this option.
        "velocity-boost": 1, # RSD velocities can be boosted, and it might be needed to build the final mocks. 
        "box-size": 1000, # Should of course match the one from CoLoRe
        "store-generated-rands": "", # Randoms are not really needed in snapshot, but I haven't implemented the code to not using them yet.
    }
        
    if not rsd:
        # Extra args in case we don't want to compute RSD
        args["data-norsd"] = ""
        
    logs_dir = corrf_directory / "logs"
    logs_dir.mkdir(exist_ok=True)
    
    slurm_header_args = dict(
        output=str(logs_dir / "corrf-%j.out"),
        error=str(logs_dir / "corrf-%j.err"),
    )
    
    scripts_dir = corrf_directory / "scripts"
    scripts_dir.mkdir(exist_ok=True)
    
    tasker = get_Tasker("bash")( # bash means: do not run it in a computing node.
        command = "CoLoRe_corrf_run_correlations",
        command_args = args,
        environment = "picca",
        slurm_header_args = slurm_header_args,
        jobid_log_file = None,
        run_file = scripts_dir.resolve() / "run_corrf.sh",
    )
    
    if len(list((corrf_directory / "results").glob("*.dat"))) == 0 or overwrite_config:
        tasker.write_job()
        tasker.send_job()
        
        if tasker.retcode.returncode != 0:
            raise ValueError(tasker.retcode)
            
    else:
        print("Skipping Corrf")

In [54]:
# Opening the multiprocessing tool,
# the use of imap allows us to see
# which runs are finished

nproc_corrf = 5 # Not sure about the best value here.
pool = Pool(nproc_colore)
for out in pool.imap(
    compute_corrf,
    [seed for seed in range(5)]
):
    print(out)


CorrfCorrfCorrfCorrf    2130



Corrf 4
None
None
None
None
None
