Commit e955995

Do not expand wildcards for datasets of derived variables where not all input variables are available (#2374)

Co-authored-by: Valeriu Predoi <valeriu.predoi@gmail.com>
schlunma and valeriupredoi committed Apr 3, 2024
1 parent 6429a41 commit e955995
Showing 5 changed files with 255 additions and 99 deletions.
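
The heart of the change is in _dataset_from_files (to_datasets.py below): when a recipe uses glob facets such as dataset: '*' for a derived variable, the globs are now expanded once per required input variable, and an expansion is kept only if it is found for every input variable. The following is a minimal, self-contained sketch of that selection rule; the facet and variable names (AAA, BBB, rsut, rsutcs) are illustrative, echoing the test fixtures below.

# Sketch of the selection rule introduced in _dataset_from_files(), using
# plain dicts instead of Dataset objects. All values are hypothetical.
expansions_per_input = {
    # Expansions of dataset='*' found on disk for each input variable:
    'rsut': [{'dataset': 'AAA'}, {'dataset': 'BBB'}],
    'rsutcs': [{'dataset': 'BBB'}],  # missing for model AAA
}

first, *rest = expansions_per_input.values()
# Keep an expansion only if every other input variable also provides it:
usable = [facets for facets in first if all(facets in other for other in rest)]
print(usable)  # -> [{'dataset': 'BBB'}]; AAA is dropped instead of erroring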
8 changes: 4 additions & 4 deletions esmvalcore/_recipe/recipe.py
@@ -50,7 +50,7 @@
 from .to_datasets import (
     _derive_needed,
     _get_input_datasets,
-    _representative_dataset,
+    _representative_datasets,
 )
 
 logger = logging.getLogger(__name__)
@@ -116,7 +116,7 @@ def _update_target_levels(dataset, datasets, settings):
             del settings['extract_levels']
         else:
             target_ds = _select_dataset(dataset_name, datasets)
-            representative_ds = _representative_dataset(target_ds)
+            representative_ds = _representative_datasets(target_ds)[0]
             check.data_availability(representative_ds)
             settings['extract_levels']['levels'] = get_reference_levels(
                 representative_ds)
@@ -133,8 +133,8 @@ def _update_target_grid(dataset, datasets, settings):
     if dataset.facets['dataset'] == grid:
         del settings['regrid']
     elif any(grid == d.facets['dataset'] for d in datasets):
-        representative_ds = _representative_dataset(
-            _select_dataset(grid, datasets))
+        representative_ds = _representative_datasets(
+            _select_dataset(grid, datasets))[0]
         check.data_availability(representative_ds)
         settings['regrid']['target_grid'] = representative_ds
     else:
139 changes: 92 additions & 47 deletions esmvalcore/_recipe/to_datasets.py
@@ -404,60 +404,105 @@ def datasets_from_recipe(
 def _dataset_from_files(dataset: Dataset) -> list[Dataset]:
     """Replace facet values of '*' based on available files."""
     result: list[Dataset] = []
-    errors = []
+    errors: list[str] = []
 
     if any(_isglob(f) for f in dataset.facets.values()):
         logger.debug(
             "Expanding dataset globs for dataset %s, "
             "this may take a while..", dataset.summary(shorten=True))
 
-    repr_dataset = _representative_dataset(dataset)
-    for repr_ds in repr_dataset.from_files():
-        updated_facets = {}
-        failed = {}
-        for key, value in dataset.facets.items():
-            if _isglob(value):
-                if key in repr_ds.facets and not _isglob(repr_ds[key]):
-                    updated_facets[key] = repr_ds.facets[key]
-                else:
-                    failed[key] = value
-
-        if failed:
-            msg = ("Unable to replace " +
-                   ", ".join(f"{k}={v}" for k, v in failed.items()) +
-                   f" by a value for\n{dataset}")
-            # Set supplementaries to [] to avoid searching for supplementary
-            # files.
-            repr_ds.supplementaries = []
-            if repr_ds.files:
-                paths_msg = "paths to " if any(
-                    isinstance(f, LocalFile) for f in repr_ds.files) else ""
-                msg = (f"{msg}\nDo the {paths_msg}the files:\n" +
-                       "\n".join(f"{f} with facets: {f.facets}"
-                                 for f in repr_ds.files) +
-                       "\nprovide the missing facet values?")
-            else:
-                timerange = repr_ds.facets.get('timerange')
-                patterns = repr_ds._file_globs
-                msg = (
-                    f"{msg}\nNo files found matching:\n" +
-                    "\n".join(str(p) for p in patterns) +  # type:ignore
-                    (f"\nwithin the requested timerange {timerange}."
-                     if timerange else ""))
-            errors.append(msg)
-            continue
-
-        new_ds = dataset.copy()
-        new_ds.facets.update(updated_facets)
-        new_ds.supplementaries = repr_ds.supplementaries
-        result.append(new_ds)
+    representative_datasets = _representative_datasets(dataset)
+
+    # For derived variables, representative_datasets might contain more than
+    # one element
+    all_datasets: list[list[tuple[dict, Dataset]]] = []
+    for representative_dataset in representative_datasets:
+        all_datasets.append([])
+        for expanded_ds in representative_dataset.from_files():
+            updated_facets = {}
+            unexpanded_globs = {}
+            for key, value in dataset.facets.items():
+                if _isglob(value):
+                    if (key in expanded_ds.facets and
+                            not _isglob(expanded_ds[key])):
+                        updated_facets[key] = expanded_ds.facets[key]
+                    else:
+                        unexpanded_globs[key] = value
+
+            if unexpanded_globs:
+                msg = _report_unexpanded_globs(
+                    dataset, expanded_ds, unexpanded_globs
+                )
+                errors.append(msg)
+                continue
+
+            new_ds = dataset.copy()
+            new_ds.facets.update(updated_facets)
+            new_ds.supplementaries = expanded_ds.supplementaries
+
+            all_datasets[-1].append((updated_facets, new_ds))
+
+    # If globs have been expanded, only consider those datasets that contain
+    # all necessary input variables if derivation is necessary
+    for (updated_facets, new_ds) in all_datasets[0]:
+        other_facets = [[d[0] for d in ds] for ds in all_datasets[1:]]
+        if all(updated_facets in facets for facets in other_facets):
+            result.append(new_ds)
+        else:
+            logger.debug(
+                "Not all necessary input variables to derive '%s' are "
+                "available for dataset %s",
+                dataset['short_name'],
+                updated_facets,
+            )
 
     if errors:
         raise RecipeError("\n".join(errors))
 
     return result
 
 
+def _report_unexpanded_globs(
+    unexpanded_ds: Dataset,
+    expanded_ds: Dataset,
+    unexpanded_globs: dict,
+) -> str:
+    """Get error message for unexpanded globs."""
+    msg = (
+        "Unable to replace " +
+        ", ".join(f"{k}={v}" for k, v in unexpanded_globs.items()) +
+        f" by a value for\n{unexpanded_ds}"
+    )
+
+    # Set supplementaries to [] to avoid searching for supplementary files
+    expanded_ds.supplementaries = []
+
+    if expanded_ds.files:
+        if any(isinstance(f, LocalFile) for f in expanded_ds.files):
+            paths_msg = "paths to the "
+        else:
+            paths_msg = ""
+        msg = (
+            f"{msg}\nDo the {paths_msg}files:\n" +
+            "\n".join(
+                f"{f} with facets: {f.facets}" for f in expanded_ds.files
+            ) +
+            "\nprovide the missing facet values?"
+        )
+    else:
+        timerange = expanded_ds.facets.get('timerange')
+        patterns = expanded_ds._file_globs
+        msg = (
+            f"{msg}\nNo files found matching:\n" +
+            "\n".join(str(p) for p in patterns) + (  # type:ignore
+                f"\nwithin the requested timerange {timerange}."
+                if timerange else ""
+            )
+        )
+
+    return msg
+
+
 def _derive_needed(dataset: Dataset) -> bool:
     """Check if dataset needs to be derived from other datasets."""
     if not dataset.facets.get('derive'):
@@ -512,11 +557,11 @@ def _get_input_datasets(dataset: Dataset) -> list[Dataset]:
     return datasets
 
 
-def _representative_dataset(dataset: Dataset) -> Dataset:
-    """Find a representative dataset that has files available."""
+def _representative_datasets(dataset: Dataset) -> list[Dataset]:
+    """Find representative datasets for all input variables."""
     copy = dataset.copy()
     copy.supplementaries = []
-    datasets = _get_input_datasets(copy)
-    representative_dataset = datasets[0]
-    representative_dataset.supplementaries = dataset.supplementaries
-    return representative_dataset
+    representative_datasets = _get_input_datasets(copy)
+    for representative_dataset in representative_datasets:
+        representative_dataset.supplementaries = dataset.supplementaries
+    return representative_datasets
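
For orientation, a hedged usage sketch of the renamed helper (not part of this commit): swcre, the shortwave cloud radiative effect, is derived in ESMValCore from rsut and rsutcs, so the helper returns one representative dataset per input variable. The facets shown are illustrative, and a real call may require additional facets or a configured session.

# Hedged sketch, not from the diff: facet values are illustrative.
from esmvalcore.dataset import Dataset
from esmvalcore._recipe.to_datasets import _representative_datasets

swcre = Dataset(
    short_name='swcre',   # derived from 'rsut' and 'rsutcs'
    project='CMIP5', mip='Amon', exp='historical',
    dataset='*',          # glob to be expanded from the files on disk
    derive=True, force_derivation=True,
)
# One representative Dataset per input variable; _dataset_from_files()
# expands each of them via .from_files() and keeps only the facet
# combinations common to all of them.
for representative in _representative_datasets(swcre):
    print(representative.summary(shorten=True))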
106 changes: 74 additions & 32 deletions tests/integration/conftest.py
@@ -40,43 +40,72 @@ def create_test_file(filename, tracking_id=None):
 
 
 def _get_files(root_path, facets, tracking_id):
+    """Return dummy files.
+
+    Wildcards are only supported for `dataset` and `institute`; in this case
+    return files for the two "models" AAA and BBB.
+
+    """
+    if facets['dataset'] == '*':
+        all_facets = [
+            {**facets, 'dataset': 'AAA', 'institute': 'A'},
+            {**facets, 'dataset': 'BBB', 'institute': 'B'},
+        ]
+    else:
+        all_facets = [facets]
+
+    # Globs without expanded facets
     dir_template = _select_drs('input_dir', facets['project'])
     file_template = _select_drs('input_file', facets['project'])
     dir_globs = _replace_tags(dir_template, facets)
     file_globs = _replace_tags(file_template, facets)
-    filename = Path(file_globs[0]).name
-    filename = str(root_path / 'input' / filename)
-    filenames = []
-    if filename.endswith('[_.]*nc'):
-        # Restore when we support filenames with no dates
-        # filenames.append(filename.replace('[_.]*nc', '.nc'))
-        filename = filename.replace('[_.]*nc', '_*.nc')
-    if filename.endswith('*.nc'):
-        filename = filename[:-len('*.nc')] + '_'
-        if facets['frequency'] == 'fx':
-            intervals = ['']
-        else:
-            intervals = [
-                '1990_1999',
-                '2000_2009',
-                '2010_2019',
-            ]
-        for interval in intervals:
-            filenames.append(filename + interval + '.nc')
-    else:
-        filenames.append(filename)
-
-    if 'timerange' in facets:
-        filenames = _select_files(filenames, facets['timerange'])
-
-    for filename in filenames:
-        create_test_file(filename, next(tracking_id))
+    globs = sorted(
+        root_path / 'input' / d / f for d in dir_globs for f in file_globs
+    )
 
     files = []
-    for filename in filenames:
-        file = LocalFile(filename)
-        file.facets = facets
-        files.append(file)
+    for expanded_facets in all_facets:
+        filenames = []
+        dir_template = _select_drs('input_dir', expanded_facets['project'])
+        file_template = _select_drs('input_file', expanded_facets['project'])
+        dir_globs = _replace_tags(dir_template, expanded_facets)
+        file_globs = _replace_tags(file_template, expanded_facets)
+        filename = (
+            str(root_path / 'input' / dir_globs[0] / Path(file_globs[0]).name)
+        )
+
+        if filename.endswith('[_.]*nc'):
+            # Restore when we support filenames with no dates
+            # filenames.append(filename.replace('[_.]*nc', '.nc'))
+            filename = filename.replace('[_.]*nc', '_*.nc')
+
+        if filename.endswith('*.nc'):
+            filename = filename[:-len('*.nc')] + '_'
+            if facets['frequency'] == 'fx':
+                intervals = ['']
+            else:
+                intervals = [
+                    '1990_1999',
+                    '2000_2009',
+                    '2010_2019',
+                ]
+            for interval in intervals:
+                filenames.append(filename + interval + '.nc')
+        else:
+            filenames.append(filename)
+
+        if 'timerange' in facets:
+            filenames = _select_files(filenames, facets['timerange'])
+
+        for filename in filenames:
+            create_test_file(filename, next(tracking_id))
+
+        for filename in filenames:
+            file = LocalFile(filename)
+            file.facets = expanded_facets
+            files.append(file)
 
-    return files, file_globs
+    return files, globs
 
 
 @pytest.fixture
@@ -100,6 +129,15 @@ def find_files(*, debug: bool = False, **facets):
 
 @pytest.fixture
 def patched_failing_datafinder(tmp_path, monkeypatch):
+    """Failing data finder.
+
+    Do not return files for:
+
+    - fx files
+    - Variable rsutcs for model AAA
+
+    Otherwise, return files just like `patched_datafinder`.
+    """
 
     def tracking_ids(i=0):
         while True:
@@ -112,8 +150,12 @@ def find_files(*, debug: bool = False, **facets):
         files, file_globs = _get_files(tmp_path, facets, tracking_id)
         if 'fx' == facets['frequency']:
             files = []
+        returned_files = []
+        for file in files:
+            if not ('AAA' in file.name and 'rsutcs' in file.name):
+                returned_files.append(file)
         if debug:
-            return files, file_globs
-        return files
+            return returned_files, file_globs
+        return returned_files
 
     monkeypatch.setattr(esmvalcore.local, 'find_files', find_files)
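
Together with the fixture docstring above, this filter makes model AAA lack the rsutcs input, so a wildcarded derived variable now expands to model BBB only instead of failing. A one-line sketch of the filter, with hypothetical file names standing in for LocalFile.name:

# Hypothetical file names, mimicking what the fixture's filter sees:
files = ['rsut_AAA_2000.nc', 'rsutcs_AAA_2000.nc', 'rsutcs_BBB_2000.nc']
returned = [f for f in files if not ('AAA' in f and 'rsutcs' in f)]
print(returned)  # -> ['rsut_AAA_2000.nc', 'rsutcs_BBB_2000.nc']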
