Do not expand wildcards for datasets of derived variables where not all input variables are available #2374

Merged: 9 commits, Apr 3, 2024
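The change targets recipes that request a derived variable while using wildcard facets: the wildcard must only expand to datasets for which every input variable of the derivation is available. Below is a minimal sketch of the scenario using the public `Dataset` API; the facet values are hypothetical and incomplete, and the assumption that `swcre` is derived from `rsut` and `rsutcs` reflects ESMValCore's derivation scripts.

```python
# Hypothetical sketch; facets are incomplete and for illustration only.
from esmvalcore.dataset import Dataset

dataset = Dataset(
    short_name='swcre',   # derived variable, inputs: rsut and rsutcs
    mip='Amon',
    project='CMIP6',
    dataset='*',          # wildcard to be expanded from available files
    derive=True,
    force_derivation=True,
)

# With this change, wildcard expansion only yields datasets for models that
# provide *all* input variables; a model with rsut but no rsutcs is skipped
# instead of failing later during derivation.
datasets = list(dataset.from_files())
```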
8 changes: 4 additions & 4 deletions esmvalcore/_recipe/recipe.py
@@ -50,7 +50,7 @@
from .to_datasets import (
_derive_needed,
_get_input_datasets,
_representative_dataset,
_representative_datasets,
)

logger = logging.getLogger(__name__)
@@ -116,7 +116,7 @@ def _update_target_levels(dataset, datasets, settings):
del settings['extract_levels']
else:
target_ds = _select_dataset(dataset_name, datasets)
representative_ds = _representative_dataset(target_ds)
representative_ds = _representative_datasets(target_ds)[0]
check.data_availability(representative_ds)
settings['extract_levels']['levels'] = get_reference_levels(
representative_ds)
@@ -133,8 +133,8 @@ def _update_target_grid(dataset, datasets, settings):
if dataset.facets['dataset'] == grid:
del settings['regrid']
elif any(grid == d.facets['dataset'] for d in datasets):
representative_ds = _representative_dataset(
_select_dataset(grid, datasets))
representative_ds = _representative_datasets(
_select_dataset(grid, datasets))[0]
check.data_availability(representative_ds)
settings['regrid']['target_grid'] = representative_ds
else:
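Both call sites above need a single dataset whose files define the target levels or grid, and `_representative_datasets` returns one dataset per input variable, so the first entry is taken. A toy sketch of that selection (the names and classes are invented, not ESMValCore code):

```python
# Toy illustration: when several input datasets exist (e.g. for a derived
# variable), the first one serves as the representative dataset.
from dataclasses import dataclass, field

@dataclass
class ToyDataset:
    short_name: str
    files: list[str] = field(default_factory=list)

def representative(input_datasets: list[ToyDataset]) -> ToyDataset:
    """Return the first input dataset, mirroring the [0] indexing above."""
    return input_datasets[0]

inputs = [ToyDataset('rsut', ['rsut_2000.nc']), ToyDataset('rsutcs', [])]
assert representative(inputs).short_name == 'rsut'
```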
139 changes: 92 additions & 47 deletions esmvalcore/_recipe/to_datasets.py
@@ -404,60 +404,105 @@
def _dataset_from_files(dataset: Dataset) -> list[Dataset]:
"""Replace facet values of '*' based on available files."""
result: list[Dataset] = []
errors = []
errors: list[str] = []

if any(_isglob(f) for f in dataset.facets.values()):
logger.debug(
"Expanding dataset globs for dataset %s, "
"this may take a while..", dataset.summary(shorten=True))

repr_dataset = _representative_dataset(dataset)
for repr_ds in repr_dataset.from_files():
updated_facets = {}
failed = {}
for key, value in dataset.facets.items():
if _isglob(value):
if key in repr_ds.facets and not _isglob(repr_ds[key]):
updated_facets[key] = repr_ds.facets[key]
else:
failed[key] = value

if failed:
msg = ("Unable to replace " +
", ".join(f"{k}={v}" for k, v in failed.items()) +
f" by a value for\n{dataset}")
# Set supplementaries to [] to avoid searching for supplementary
# files.
repr_ds.supplementaries = []
if repr_ds.files:
paths_msg = "paths to " if any(
isinstance(f, LocalFile) for f in repr_ds.files) else ""
msg = (f"{msg}\nDo the {paths_msg}the files:\n" +
"\n".join(f"{f} with facets: {f.facets}"
for f in repr_ds.files) +
"\nprovide the missing facet values?")
else:
timerange = repr_ds.facets.get('timerange')
patterns = repr_ds._file_globs
msg = (
f"{msg}\nNo files found matching:\n" +
"\n".join(str(p) for p in patterns) + # type:ignore
(f"\nwithin the requested timerange {timerange}."
if timerange else ""))
errors.append(msg)
continue

new_ds = dataset.copy()
new_ds.facets.update(updated_facets)
new_ds.supplementaries = repr_ds.supplementaries
result.append(new_ds)
representative_datasets = _representative_datasets(dataset)

# For derived variables, representative_datasets might contain more than
# one element
all_datasets: list[list[tuple[dict, Dataset]]] = []
for representative_dataset in representative_datasets:
all_datasets.append([])
for expanded_ds in representative_dataset.from_files():
updated_facets = {}
unexpanded_globs = {}
for key, value in dataset.facets.items():
if _isglob(value):
if (key in expanded_ds.facets and
not _isglob(expanded_ds[key])):
updated_facets[key] = expanded_ds.facets[key]
else:
unexpanded_globs[key] = value

if unexpanded_globs:
msg = _report_unexpanded_globs(
dataset, expanded_ds, unexpanded_globs
)
errors.append(msg)
continue

new_ds = dataset.copy()
new_ds.facets.update(updated_facets)
new_ds.supplementaries = expanded_ds.supplementaries

all_datasets[-1].append((updated_facets, new_ds))

# If globs have been expanded, only consider those datasets that contain
# all necessary input variables if derivation is necessary
for (updated_facets, new_ds) in all_datasets[0]:
other_facets = [[d[0] for d in ds] for ds in all_datasets[1:]]
if all(updated_facets in facets for facets in other_facets):
result.append(new_ds)
else:
logger.debug(
"Not all necessary input variables to derive '%s' are "
"available for dataset %s",
dataset['short_name'],
updated_facets,
)

if errors:
raise RecipeError("\n".join(errors))

return result

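The final loop above keeps an expanded dataset only when the facet values found for the first input variable were also found for every other input variable. A self-contained sketch of that membership test, with made-up facet values:

```python
# Facet combinations found per input variable of a derived variable,
# e.g. rsut (first input) and rsutcs (second); values are made up.
rsut_facets = [
    {'dataset': 'AAA', 'institute': 'A'},
    {'dataset': 'BBB', 'institute': 'B'},
]
rsutcs_facets = [
    {'dataset': 'BBB', 'institute': 'B'},  # model AAA provides no rsutcs
]
all_facets = [rsut_facets, rsutcs_facets]

# Keep only the combinations available for *all* input variables:
kept = [
    facets
    for facets in all_facets[0]
    if all(facets in other for other in all_facets[1:])
]
assert kept == [{'dataset': 'BBB', 'institute': 'B'}]
```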

def _report_unexpanded_globs(
unexpanded_ds: Dataset,
expanded_ds: Dataset,
unexpanded_globs: dict,
) -> str:
"""Get error message for unexpanded globs."""
msg = (
"Unable to replace " +
", ".join(f"{k}={v}" for k, v in unexpanded_globs.items()) +
f" by a value for\n{unexpanded_ds}"
)

# Set supplementaries to [] to avoid searching for supplementary files
expanded_ds.supplementaries = []

if expanded_ds.files:
if any(isinstance(f, LocalFile) for f in expanded_ds.files):
paths_msg = "paths to the "
else:
paths_msg = ""
msg = (
f"{msg}\nDo the {paths_msg}files:\n" +
"\n".join(
f"{f} with facets: {f.facets}" for f in expanded_ds.files
) +
"\nprovide the missing facet values?"
)
else:
timerange = expanded_ds.facets.get('timerange')
patterns = expanded_ds._file_globs

msg = (
f"{msg}\nNo files found matching:\n" +
"\n".join(str(p) for p in patterns) + ( # type:ignore
f"\nwithin the requested timerange {timerange}."
if timerange else ""
)
)

return msg

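For illustration, the message assembled by this function has roughly the following shape when no files match; the dataset summary and glob patterns are placeholders, not real output:

```text
Unable to replace dataset=* by a value for
<summary of the dataset containing the unexpanded globs>
No files found matching:
<glob pattern(s) that were searched>
within the requested timerange 2000/2010.
```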

def _derive_needed(dataset: Dataset) -> bool:
"""Check if dataset needs to be derived from other datasets."""
if not dataset.facets.get('derive'):
@@ -512,11 +512,11 @@
return datasets


def _representative_dataset(dataset: Dataset) -> Dataset:
"""Find a representative dataset that has files available."""
def _representative_datasets(dataset: Dataset) -> list[Dataset]:
"""Find representative datasets for all input variables."""
copy = dataset.copy()
copy.supplementaries = []
datasets = _get_input_datasets(copy)
representative_dataset = datasets[0]
representative_dataset.supplementaries = dataset.supplementaries
return representative_dataset
representative_datasets = _get_input_datasets(copy)
for representative_dataset in representative_datasets:
representative_dataset.supplementaries = dataset.supplementaries
return representative_datasets
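For an ordinary variable the returned list therefore has a single element, while for a derived variable it has one element per input variable. A toy stand-in showing the shape of the result (the swcre → rsut/rsutcs mapping is an assumption about the derivation table):

```python
# Toy stand-in: real code returns Dataset objects, shown here as short names.
DERIVATION_INPUTS = {'swcre': ['rsut', 'rsutcs']}  # assumed mapping

def representative_short_names(short_name: str, derive: bool) -> list[str]:
    """Return one entry per input variable needed to produce short_name."""
    if derive and short_name in DERIVATION_INPUTS:
        return DERIVATION_INPUTS[short_name]
    return [short_name]

assert representative_short_names('tas', derive=False) == ['tas']
assert representative_short_names('swcre', derive=True) == ['rsut', 'rsutcs']
```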
106 changes: 74 additions & 32 deletions tests/integration/conftest.py
@@ -40,43 +40,72 @@ def create_test_file(filename, tracking_id=None):


def _get_files(root_path, facets, tracking_id):
"""Return dummy files.

Wildcards are only supported for `dataset` and `institute`; in this case
return files for the two "models" AAA and BBB.

"""
if facets['dataset'] == '*':
all_facets = [
{**facets, 'dataset': 'AAA', 'institute': 'A'},
{**facets, 'dataset': 'BBB', 'institute': 'B'},
]
else:
all_facets = [facets]

# Globs without expanded facets
dir_template = _select_drs('input_dir', facets['project'])
file_template = _select_drs('input_file', facets['project'])
dir_globs = _replace_tags(dir_template, facets)
file_globs = _replace_tags(file_template, facets)
filename = Path(file_globs[0]).name
filename = str(root_path / 'input' / filename)
filenames = []
if filename.endswith('[_.]*nc'):
# Restore when we support filenames with no dates
# filenames.append(filename.replace('[_.]*nc', '.nc'))
filename = filename.replace('[_.]*nc', '_*.nc')
if filename.endswith('*.nc'):
filename = filename[:-len('*.nc')] + '_'
if facets['frequency'] == 'fx':
intervals = ['']
globs = sorted(
root_path / 'input' / d / f for d in dir_globs for f in file_globs
)

files = []
for expanded_facets in all_facets:
filenames = []
dir_template = _select_drs('input_dir', expanded_facets['project'])
file_template = _select_drs('input_file', expanded_facets['project'])
dir_globs = _replace_tags(dir_template, expanded_facets)
file_globs = _replace_tags(file_template, expanded_facets)
filename = (
str(root_path / 'input' / dir_globs[0] / Path(file_globs[0]).name)
)

if filename.endswith('[_.]*nc'):
# Restore when we support filenames with no dates
# filenames.append(filename.replace('[_.]*nc', '.nc'))
filename = filename.replace('[_.]*nc', '_*.nc')

if filename.endswith('*.nc'):
filename = filename[:-len('*.nc')] + '_'
if facets['frequency'] == 'fx':
intervals = ['']
else:
intervals = [
'1990_1999',
'2000_2009',
'2010_2019',
]
for interval in intervals:
filenames.append(filename + interval + '.nc')
else:
intervals = [
'1990_1999',
'2000_2009',
'2010_2019',
]
for interval in intervals:
filenames.append(filename + interval + '.nc')
else:
filenames.append(filename)
filenames.append(filename)

if 'timerange' in facets:
filenames = _select_files(filenames, facets['timerange'])
if 'timerange' in facets:
filenames = _select_files(filenames, facets['timerange'])

for filename in filenames:
create_test_file(filename, next(tracking_id))
for filename in filenames:
create_test_file(filename, next(tracking_id))

files = []
for filename in filenames:
file = LocalFile(filename)
file.facets = facets
files.append(file)
for filename in filenames:
file = LocalFile(filename)
file.facets = expanded_facets
files.append(file)

return files, file_globs
return files, globs
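The wildcard branch at the top of this helper builds one facet dictionary per dummy model with dict unpacking, where later keys override earlier ones. A standalone example of the pattern:

```python
facets = {'project': 'CMIP6', 'short_name': 'tas', 'dataset': '*'}

if facets['dataset'] == '*':
    # Each entry copies the original facets with dataset/institute replaced;
    # keys given after **facets take precedence.
    all_facets = [
        {**facets, 'dataset': 'AAA', 'institute': 'A'},
        {**facets, 'dataset': 'BBB', 'institute': 'B'},
    ]
else:
    all_facets = [facets]

assert all_facets[0]['dataset'] == 'AAA'
assert all_facets[1]['project'] == 'CMIP6'
```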


@pytest.fixture
@@ -100,6 +100,15 @@ def find_files(*, debug: bool = False, **facets):

@pytest.fixture
def patched_failing_datafinder(tmp_path, monkeypatch):
"""Failing data finder.

Do not return files for:
- fx files
- Variable rsutcs for model AAA

Otherwise, return files just like `patched_datafinder`.

"""

def tracking_ids(i=0):
while True:
@@ -112,8 +112,12 @@ def find_files(*, debug: bool = False, **facets):
files, file_globs = _get_files(tmp_path, facets, tracking_id)
if 'fx' == facets['frequency']:
files = []
returned_files = []
for file in files:
if not ('AAA' in file.name and 'rsutcs' in file.name):
returned_files.append(file)
if debug:
return files, file_globs
return files
return returned_files, file_globs
return returned_files

monkeypatch.setattr(esmvalcore.local, 'find_files', find_files)
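To spell out the filter used by this fixture: any file whose name contains both 'AAA' and 'rsutcs' is withheld, so only model BBB can supply the complete set of inputs for a derivation that needs rsutcs. A standalone sketch with invented file names:

```python
# Standalone illustration of the fixture's filtering rule.
from pathlib import Path

def keep(path: Path) -> bool:
    """Mimic the filter above: drop rsutcs files of model AAA."""
    return not ('AAA' in path.name and 'rsutcs' in path.name)

files = [
    Path('rsut_AAA_2000-2009.nc'),
    Path('rsutcs_AAA_2000-2009.nc'),
    Path('rsut_BBB_2000-2009.nc'),
    Path('rsutcs_BBB_2000-2009.nc'),
]
assert [f.name for f in files if keep(f)] == [
    'rsut_AAA_2000-2009.nc',
    'rsut_BBB_2000-2009.nc',
    'rsutcs_BBB_2000-2009.nc',
]
```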