Skip to content

Commit

Permalink
Merge 6c9608d into 80df0c0
Browse files Browse the repository at this point in the history
  • Loading branch information
iprafols committed May 16, 2022
2 parents 80df0c0 + 6c9608d commit 5d798ce
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 63 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ py/picca\.egg-info/
*.log

py/picca/tests/delta_extraction/results/

*.drawio.bkp
Binary file modified docs/delta_extraction/data_model/data_model-Data.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/delta_extraction/data_model/data_model-FullDataModel.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/delta_extraction/data_model/data_model.drawio

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion docs/delta_extraction/data_model/~$data_model.drawio.bkp

This file was deleted.

18 changes: 15 additions & 3 deletions py/picca/delta_extraction/data_catalogues/desi_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module defines the class DesiData to load DESI data
"""
import logging
import multiprocessing
import numpy as np

from picca.delta_extraction.astronomical_objects.desi_forest import DesiForest
Expand All @@ -18,8 +19,10 @@

accepted_options = sorted(
list(
set(accepted_options + accepted_options_quasar_catalogue +
["blinding", "use non-coadded spectra", "wave solution"])))
set(accepted_options + accepted_options_quasar_catalogue + [
"blinding", "num processors", "use non-coadded spectra",
"wave solution"
])))

defaults = defaults.copy()
defaults.update({
Expand All @@ -46,7 +49,8 @@ def merge_new_forest(forests_by_targetid, new_forests_by_targetid):
forests_by_targetid
"""
parent_targetids = set(forests_by_targetid.keys())
existing_targetids = parent_targetids.intersection(new_forests_by_targetid.keys())
existing_targetids = parent_targetids.intersection(
new_forests_by_targetid.keys())
new_targetids = new_forests_by_targetid.keys() - existing_targetids

# Does not fail if existing_targetids is empty
Expand Down Expand Up @@ -102,6 +106,7 @@ def __init__(self, config):

# load variables from config
self.blinding = None
self.num_processors = None
self.use_non_coadded_spectra = None
self.__parse_config(config)

Expand Down Expand Up @@ -136,6 +141,13 @@ def __parse_config(self, config):
f"are {ACCEPTED_BLINDING_STRATEGIES}. "
f"Found '{self.blinding}'")

self.num_processors = config.getint("num processors")
if self.num_processors is None:
raise DataError(
"Missing argument 'num processors' required by DesiData")
if self.num_processors == 0:
self.num_processors = (multiprocessing.cpu_count() // 2)

self.use_non_coadded_spectra = config.getboolean(
"use non-coadded spectra")
if self.use_non_coadded_spectra is None:
Expand Down
26 changes: 0 additions & 26 deletions py/picca/delta_extraction/data_catalogues/desi_healpix.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
defaults, accepted_options)
from picca.delta_extraction.errors import DataError

# add "num processors" to the imported accepted_options; the set removes
# duplicates and sorted() returns the entries as a sorted list
accepted_options = sorted(set(accepted_options + ["num processors"]))


class DesiHealpix(DesiData):
"""Reads the spectra from DESI using healpix mode and formats its data as a
Expand Down Expand Up @@ -51,35 +49,11 @@ def __init__(self, config):
"""
self.logger = logging.getLogger(__name__)

self.num_processors = None
self.__parse_config(config)

# init of DesiData needs to come last, as it contains the actual data
# reading and thus needs all config
super().__init__(config)

if self.analysis_type == "PK 1D":
DesiPk1dForest.update_class_variables()

def __parse_config(self, config):
    """Parse the configuration options

    Arguments
    ---------
    config: configparser.SectionProxy
    Parsed options to initialize class

    Raise
    -----
    DataError upon missing required variables
    """
    self.num_processors = config.getint("num processors")
    if self.num_processors is None:
        raise DataError(
            "Missing argument 'num processors' required by DesiHealpix")
    if self.num_processors == 0:
        # 0 means "choose automatically": use half of the available CPUs,
        # but never fewer than one, since cpu_count() // 2 evaluates to 0
        # on a single-core machine
        self.num_processors = max(1, multiprocessing.cpu_count() // 2)

def get_filename(self, survey, healpix):
"""Get the name of the file to read
Expand Down
48 changes: 32 additions & 16 deletions py/picca/delta_extraction/data_catalogues/desi_tile.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import logging
import glob
import multiprocessing

import fitsio
import numpy as np
Expand All @@ -13,6 +14,8 @@
defaults, accepted_options)
from picca.delta_extraction.errors import DataError

# add "num processors" to the imported accepted_options; the set removes
# duplicates and sorted() returns the entries as a sorted list
accepted_options = sorted(set(accepted_options + ["num processors"]))


class DesiTile(DesiData):
"""Reads the spectra from DESI using tile mode and formats its data as a
Expand Down Expand Up @@ -95,22 +98,35 @@ def read_data(self):
filenames.append(file_in)
filenames = np.unique(filenames)

# TODO: add parallelisation here
num_data = 0
reader = DesiTileFileHandler(self.analysis_type,
self.use_non_coadded_spectra, self.logger,
self.input_directory)
for index, filename in enumerate(filenames):
forests_by_targetid_aux, num_data_aux = reader(
(filename, self.catalogue))
merge_new_forest(forests_by_targetid, forests_by_targetid_aux)
num_data += num_data_aux
self.logger.progress(
f"read tile {index} of {len(filename)}. ndata: {num_data}")

self.logger.progress(f"Found {num_data} quasars in input files")

if num_data == 0:
if self.num_processors > 1:
arguments = [(filename, self.catalogue) for filename in filenames]
context = multiprocessing.get_context('fork')
with context.Pool(processes=self.num_processors) as pool:
imap_it = pool.imap(
DesiTileFileHandler(self.analysis_type,
self.use_non_coadded_spectra,
self.logger, self.input_directory),
arguments)
for forests_by_targetid_aux, _ in imap_it:
# Merge each dict to master forests_by_targetid
merge_new_forest(forests_by_targetid,
forests_by_targetid_aux)
else:
num_data = 0
reader = DesiTileFileHandler(self.analysis_type,
self.use_non_coadded_spectra,
self.logger, self.input_directory)
for index, filename in enumerate(filenames):
forests_by_targetid_aux, num_data_aux = reader(
(filename, self.catalogue))
merge_new_forest(forests_by_targetid, forests_by_targetid_aux)
num_data += num_data_aux
self.logger.progress(
f"read tile {index} of {len(filename)}. ndata: {num_data}")

self.logger.progress(f"Found {num_data} quasars in input files")

if len(forests_by_targetid) == 0:
raise DataError("No Quasars found, stopping here")

self.forests = list(forests_by_targetid.values())
Expand Down
138 changes: 122 additions & 16 deletions py/picca/tests/delta_extraction/data_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
from picca.delta_extraction.astronomical_objects.desi_forest import DesiForest
from picca.delta_extraction.astronomical_objects.desi_pk1d_forest import DesiPk1dForest
from picca.delta_extraction.astronomical_objects.sdss_forest import SdssForest
from picca.delta_extraction.config import default_config
from picca.delta_extraction.data import Data
from picca.delta_extraction.data import defaults as defaults_data
from picca.delta_extraction.data import accepted_analysis_type
from picca.delta_extraction.data_catalogues.desi_data import DesiData
from picca.delta_extraction.data_catalogues.desi_data import defaults as defaults_desi_data
from picca.delta_extraction.data_catalogues.desi_data import accepted_options as accepted_options_desi_data
from picca.delta_extraction.data_catalogues.desi_healpix import DesiHealpix, DesiHealpixFileHandler
from picca.delta_extraction.data_catalogues.desi_healpix import defaults as defaults_desi_healpix
from picca.delta_extraction.data_catalogues.desi_tile import DesiTile
Expand Down Expand Up @@ -591,6 +593,7 @@ def test_desi_data(self):
"keep surveys": "all special",
"input directory": f"{THIS_DIR}/data/",
"out dir": f"{THIS_DIR}/results/",
"num processors": 1,
}})
for key, value in defaults_desi_data.items():
if key not in config["data"]:
Expand Down Expand Up @@ -632,10 +635,23 @@ def test_desi_data_parse_config(self):
data._DesiData__parse_config(config["data"])
self.compare_error_message(context_manager, expected_message)

# run __parse_config with missing num_processors
config = ConfigParser()
config.read_dict({"data": {
"blinding": "none",
}})
expected_message = (
"Missing argument 'num processors' required by DesiData"
)
with self.assertRaises(DataError) as context_manager:
data._DesiData__parse_config(config["data"])
self.compare_error_message(context_manager, expected_message)

# run __parse_config with missing 'use_non_coadded_spectra'
config = ConfigParser()
config.read_dict({"data": {
"blinding": "none",
"num processors": 1,
}})
expected_message = (
"Missing argument 'use non-coadded spectra' required by DesiData"
Expand All @@ -651,6 +667,9 @@ def test_desi_data_parse_config(self):
for key, value in defaults_desi_data.items():
if key not in config["data"]:
config["data"][key] = str(value)
for key, value in default_config.get("general").items():
if key in accepted_options_desi_data and key not in config["data"]:
config["data"][key] = str(value)
data._DesiData__parse_config(config["data"])

# check loading with the wrong blinding
Expand Down Expand Up @@ -816,21 +835,6 @@ def test_desi_healpix(self):

def test_desi_healpix_parse_config(self):
"""Test method __parse_config from DesiHealpix"""
# create a DesiHealpix with missing num_processors
config = ConfigParser()
config.read_dict({"data": {
"catalogue": f"{THIS_DIR}/data/QSO_cat_fuji_dark_healpix.fits.gz",
"keep surveys": "all",
"input directory": f"{THIS_DIR}/data/",
"out dir": f"{THIS_DIR}/results/",
"use non-coadded spectra": False,
}})
expected_message = (
"Missing argument 'num processors' required by DesiHealpix"
)
with self.assertRaises(DataError) as context_manager:
DesiHealpix(config["data"])
self.compare_error_message(context_manager, expected_message)

# create a DesiHealpix with missing Data options
config = ConfigParser()
Expand Down Expand Up @@ -1116,7 +1120,8 @@ def test_desi_tile(self):
"catalogue": f"{THIS_DIR}/data/QSO_cat_fuji_dark_tile.fits.gz",
"input directory": f"{THIS_DIR}/data/tile/cumulative",
"out dir": f"{THIS_DIR}/results/",
"use non-coadded spectra": "False",
"use non-coadded spectra": False,
"num processors": 1,
}})
for key, value in defaults_desi_tile.items():
if key not in config["data"]:
Expand Down Expand Up @@ -1145,6 +1150,107 @@ def test_desi_tile_parse_config(self):
DesiTile(config["data"])
self.compare_error_message(context_manager, expected_message)

def test_desi_tile_read_data(self):
    """Test method read_data from DesiTile

    Every case builds a fresh configuration, fills the missing options
    from defaults_desi_tile, instantiates DesiTile and checks that 10
    forests are loaded. The values 1, 0 and 2 for 'num processors' are
    run both without and with an explicit 'use non-coadded spectra'
    option. Each case runs inside its own subTest so that one failing
    combination does not prevent the remaining ones from being checked.
    """
    # (case label, explicit 'use non-coadded spectra' value; None means
    # the option is left out and taken from defaults_desi_tile)
    # NOTE(review): the "individual spectra" cases pass False, which looks
    # like it would select coadds as well - confirm whether True was
    # intended here
    spectra_cases = [
        ("using coadds", None),
        ("using individual spectra", False),
    ]
    for label, use_non_coadded_spectra in spectra_cases:
        for num_processors in (1, 0, 2):
            with self.subTest(case=label, num_processors=num_processors):
                options = {
                    "catalogue": f"{THIS_DIR}/data/QSO_cat_fuji_dark_tile.fits.gz",
                    "input directory": f"{THIS_DIR}/data/tile/cumulative",
                    "out dir": f"{THIS_DIR}/results/",
                    "num processors": num_processors,
                }
                if use_non_coadded_spectra is not None:
                    options["use non-coadded spectra"] = use_non_coadded_spectra

                config = ConfigParser()
                config.read_dict({"data": options})
                for key, value in defaults_desi_tile.items():
                    if key not in config["data"]:
                        config["data"][key] = str(value)

                data = DesiTile(config["data"])

                # assertEqual reports the actual number of forests on failure
                self.assertEqual(len(data.forests), 10)

def test_desisim_mocks(self):
"""Test DesisimMocks"""
# case: BAO 3D
Expand Down

0 comments on commit 5d798ce

Please sign in to comment.