Skip to content

Commit

Permalink
Netcdf splits (#147)
Browse files Browse the repository at this point in the history
* weather-sp: Improve NetCDF splitting

Extend netCdf file splitting to support all dimensions. Explicitly exclude
latitude and longitude though to avoid unreasonable splitting.
  • Loading branch information
uhager committed Apr 18, 2022
1 parent 333fc21 commit c574e54
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
3 changes: 2 additions & 1 deletion weather_sp/README.md
Expand Up @@ -98,7 +98,8 @@ Any available dimensions can be combined when splitting.
NetCDF files are already in a hypercube format and can only be split by one of the dimensions and by data variable.
Since splitting by latitude or longitude would lead to a large number of small files, this is not supported,
and it is recommended to use the `weather-mv` tool instead. \
Supported splits for NetCDF files are thus 'variable' to split by data variable, and any dimension
other than latitude and longitude.


### Output directory
Expand Down
43 changes: 21 additions & 22 deletions weather_sp/splitter_pipeline/file_splitters.py
Expand Up @@ -120,6 +120,8 @@ def _open_grib_locally(self) -> t.Iterator[t.Iterator[pygrib.gribmessage]]:

class NetCdfSplitter(FileSplitter):

_UNSUPPORTED_DIMENSIONS = ('latitude', 'longitude', 'lat', 'lon')

def __init__(self, input_path: str, output_info: OutFileInfo,
force_split: bool = False, logging_level: int = logging.INFO):
super().__init__(input_path, output_info,
Expand All @@ -128,14 +130,13 @@ def __init__(self, input_path: str, output_info: OutFileInfo,
def split_data(self) -> None:
if not self.output_info.split_dims():
raise ValueError('No splitting specified in template.')
if any(dim in self._UNSUPPORTED_DIMENSIONS for dim in self.output_info.split_dims()):
raise ValueError('Unsupported split dimension (lat, lng).')
if self.should_skip():
metrics.Metrics.counter('file_splitters', 'skipped').inc()
self.logger.info('Skipping %s, file already split.',
repr(self.input_path))
return
if any(split not in ('time', 'level', 'variable') for split in self.output_info.split_dims()):
raise ValueError(
'netcdf split: unknown split dimension, supported are time, level, variable')

with self._open_dataset_locally() as dataset:
if any(split not in dataset.dims and split not in ('variable') for split in self.output_info.split_dims()):
Expand All @@ -147,37 +148,35 @@ def split_data(self) -> None:
for var in dataset.data_vars])
else:
iterlists.append([dataset])
for dim in ('time', 'level'):
if dim in self.output_info.split_dims():
iterlists.append(dataset[dim])
filtered_split_dims = [
x for x in self.output_info.split_dims() if x not in ('variable', self._UNSUPPORTED_DIMENSIONS)]
for dim in filtered_split_dims:
iterlists.append(dataset[dim])
combinations = itertools.product(*iterlists)
for comb in combinations:
selected = comb[0]
for da in comb[1:]:
if 'time' in da.coords:
selected = selected.sel(time=da.time)
if 'level' in da.coords:
selected = selected.sel(level=da.level)
self._write_dataset(selected)
self.logger.info('split %s into %d files',
self.input_path, len(list(combinations)))
for dim in da.coords:
selected = selected.sel({dim: getattr(da, dim)})
self._write_dataset(selected, filtered_split_dims)
self.logger.info('Finished splitting %s', self.input_path)

@contextmanager
def _open_dataset_locally(self) -> t.Iterator[xr.Dataset]:
    """Copy the (possibly remote) input file to local disk and open it.

    Yields:
        An xarray Dataset opened with the 'netcdf4' engine; the local
        temporary copy stays alive for the duration of the `with` block.
    """
    with self._copy_to_local_file() as local_file:
        # Pin the engine explicitly so xarray does not fall back to a
        # different backend depending on what is installed.
        yield xr.open_dataset(local_file.name, engine='netcdf4')

def _write_dataset(self, dataset: xr.Dataset, split_dims: t.List[str]) -> None:
    """Serialize one split-out dataset to its computed output path.

    Args:
        dataset: The already-selected sub-dataset to write.
        split_dims: Dimension names used to format the output path
            (passed through to _get_output_for_dataset).
    """
    # FileSystems().create handles both local and cloud destinations;
    # to_netcdf() with no path argument returns the serialized bytes.
    with FileSystems().create(self._get_output_for_dataset(dataset, split_dims)) as dest_file:
        dest_file.write(dataset.to_netcdf())

def _get_output_for_dataset(self, dataset: xr.Dataset, split_dims: t.List[str]) -> str:
    """Build the formatted output path for one split-out dataset.

    Args:
        dataset: The selected sub-dataset; its first data variable names
            the 'variable' split (assumes the dataset was reduced to a
            single data variable upstream -- TODO confirm).
        split_dims: Dimension names to encode into the path; the caller
            filters out 'variable' and lat/lon before passing them here.

    Returns:
        The output path produced by self.output_info.formatted_output_path.
    """
    splits = {'variable': list(dataset.data_vars.keys())[0]}
    for dim in split_dims:
        value = dataset[dim].values
        if dim == 'time':
            # Render timestamps at minute resolution so path segments are
            # short and filesystem-safe.
            value = np.datetime_as_string(value, unit='m')
        splits[dim] = value
    return self.output_info.formatted_output_path(splits)


Expand Down
4 changes: 2 additions & 2 deletions weather_sp/splitter_pipeline/file_splitters_test.py
Expand Up @@ -215,12 +215,12 @@ def test_split_data__not_in_dims_raises(self):
with self.assertRaises(ValueError):
splitter.split_data()

def test_split_data__unknown_dim_raises(self):
def test_split_data__unsupported_dim_raises(self):
input_path = f'{self._data_dir}/era5_sample.nc'
output_base = f'{self._data_dir}/split_files/era5_sample'
splitter = NetCdfSplitter(input_path,
OutFileInfo(output_base,
formatting='_{shortName}',
formatting='_{longitude}',
ending='.nc',
template_folders=[]))
with self.assertRaises(ValueError):
Expand Down

0 comments on commit c574e54

Please sign in to comment.