Added support for consolidate subpackage functions to accept either in-memory or stored datasets (#1216)

* added support for consolidate subpackage functions to accept either in-memory or stored datasets

* fixed circular import error

* fixed failing tests when echodata is None

* refactored open_ds_da_ed_path method

* minor comment changes

* minor changes to method names

* removed source_sv_path from add_splitbeam_angle method

* removed the usage of add_angle_to_ds method

* added a new argument to_disk

* fixed failing tests

* added check for source_sv not a path and to_disk is true

* fixed bug and failing test case
praneethratna committed Dec 18, 2023
1 parent 63547fc commit 6c83378
Showing 7 changed files with 131 additions and 156 deletions.
104 changes: 63 additions & 41 deletions echopype/consolidate/api.py
@@ -1,5 +1,6 @@
 import datetime
 import pathlib
+from pathlib import Path
 from typing import Optional, Union
 
 import numpy as np
@@ -8,14 +9,14 @@
 from ..calibrate.ek80_complex import get_filter_coeff
 from ..echodata import EchoData
 from ..echodata.simrad import retrieve_correct_beam_group
-from ..utils.io import validate_source_ds_da
+from ..utils.io import get_file_format, open_source
 from ..utils.prov import add_processing_level
-from .split_beam_angle import add_angle_to_ds, get_angle_complex_samples, get_angle_power_samples
+from .split_beam_angle import get_angle_complex_samples, get_angle_power_samples
 
 POSITION_VARIABLES = ["latitude", "longitude"]
 
 
-def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:
+def swap_dims_channel_frequency(ds: Union[xr.Dataset, str, pathlib.Path]) -> xr.Dataset:
     """
     Use frequency_nominal in place of channel to be dataset dimension and coordinate.
@@ -24,8 +25,9 @@ def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:
     Parameters
     ----------
-    ds : xr.Dataset
-        Dataset for which the dimension will be swapped
+    ds : xr.Dataset or str or pathlib.Path
+        Dataset or path to a file containing the Dataset
+        for which the dimension will be swapped
 
     Returns
     -------
@@ -35,6 +37,7 @@ def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:
     -----
     This operation is only possible when there are no duplicated frequencies present in the file.
     """
+    ds = open_source(ds, "dataset", {})
     # Only possible if no duplicated frequencies
     if np.unique(ds["frequency_nominal"]).size == ds["frequency_nominal"].size:
         return (
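
With the open_source call added above, the function accepts either an in-memory Dataset or a path. A brief usage sketch, not part of this diff; the dataset variable and Zarr path below are hypothetical:

    from echopype.consolidate import swap_dims_channel_frequency

    # In-memory: pass an existing Sv Dataset (ds_Sv assumed to be an xr.Dataset)
    ds_swapped = swap_dims_channel_frequency(ds_Sv)

    # Stored: pass a path; the file is lazily opened before the dims are swapped
    ds_swapped = swap_dims_channel_frequency("Sv.zarr")
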
@@ -50,7 +53,7 @@ def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:
 
 
 def add_depth(
-    ds: xr.Dataset,
+    ds: Union[xr.Dataset, str, pathlib.Path],
     depth_offset: float = 0,
     tilt: float = 0,
     downward: bool = True,
@@ -64,8 +67,9 @@ def add_depth(
     Parameters
     ----------
-    ds : xr.Dataset
-        Source Sv dataset to which a depth variable will be added.
+    ds : xr.Dataset or str or pathlib.Path
+        Source Sv dataset or path to a file containing the Source Sv dataset
+        to which a depth variable will be added.
         Must contain `echo_range`.
     depth_offset : float
         Offset along the vertical (depth) dimension to account for actual transducer
@@ -114,6 +118,7 @@ def add_depth(
     # else:
     #     tilt = 0
 
+    ds = open_source(ds, "dataset", {})
     # Multiplication factor depending on if transducers are pointing downward
     mult = 1 if downward else -1
@@ -132,7 +137,11 @@ def add_depth(
 
 
 @add_processing_level("L2A")
-def add_location(ds: xr.Dataset, echodata: EchoData = None, nmea_sentence: Optional[str] = None):
+def add_location(
+    ds: Union[xr.Dataset, str, pathlib.Path],
+    echodata: Optional[Union[EchoData, str, pathlib.Path]],
+    nmea_sentence: Optional[str] = None,
+):
     """
     Add geographical location (latitude/longitude) to the Sv dataset.
@@ -142,10 +151,12 @@ def add_location(ds: xr.Dataset, echodata: EchoData = None, nmea_sentence: Optio
     Parameters
     ----------
-    ds : xr.Dataset
-        An Sv or MVBS dataset for which the geographical locations will be added to
-    echodata
-        An `EchoData` object holding the raw data
+    ds : xr.Dataset or str or pathlib.Path
+        An Sv or MVBS dataset or path to a file containing the Sv or MVBS
+        dataset to which the geographical locations will be added
+    echodata : EchoData or str or pathlib.Path
+        An ``EchoData`` object or path to a file containing the ``EchoData``
+        object holding the raw data
     nmea_sentence
         NMEA sentence to select a subset of location data (optional)
@@ -174,6 +185,9 @@ def sel_interp(var, time_dim_name):
         # Values may be nan if there are ping_time values outside the time_dim_name range
         return position_var.interp(**{time_dim_name: ds["ping_time"]})
 
+    ds = open_source(ds, "dataset", {})
+    echodata = open_source(echodata, "echodata", {})
+
     if "longitude" not in echodata["Platform"] or echodata["Platform"]["longitude"].isnull().all():
         raise ValueError("Coordinate variables not present or all nan")
@@ -198,12 +212,12 @@ def sel_interp(var, time_dim_name):
 
 def add_splitbeam_angle(
     source_Sv: Union[xr.Dataset, str, pathlib.Path],
-    echodata: EchoData,
+    echodata: Union[EchoData, str, pathlib.Path],
     waveform_mode: str,
     encode_mode: str,
     pulse_compression: bool = False,
     storage_options: dict = {},
-    return_dataset: bool = True,
+    to_disk: bool = True,
 ) -> xr.Dataset:
     """
     Add split-beam (alongship/athwartship) angles into the Sv dataset.
@@ -218,8 +232,9 @@
     source_Sv: xr.Dataset or str or pathlib.Path
         The Sv Dataset or path to a file containing the Sv Dataset,
         to which the split-beam angles will be added
-    echodata: EchoData
-        An ``EchoData`` object holding the raw data
+    echodata: EchoData or str or pathlib.Path
+        An ``EchoData`` object or path to a file containing the ``EchoData``
+        object holding the raw data
     waveform_mode : {"CW", "BB"}
         Type of transmit waveform
@@ -240,19 +255,20 @@
     storage_options: dict, default={}
         Any additional parameters for the storage backend, corresponding to the
         path provided for ``source_Sv``
-    return_dataset: bool, default=True
-        If ``True``, ``source_Sv`` with split-beam angles added will be returned.
-        ``return_dataset=False`` is useful when ``source_Sv`` is a path and
+    to_disk: bool, default=True
+        If ``False``, ``source_Sv`` with split-beam angles added will be returned.
+        ``to_disk=True`` is useful when ``source_Sv`` is a path and
         users only want to write the split-beam angle data to this path.
 
     Returns
     -------
     xr.Dataset or None
-        If ``return_dataset=False``, nothing will be returned.
-        If ``return_dataset=True``, either the input dataset ``source_Sv``
-        or a lazy-loaded Dataset (from the path ``source_Sv``)
-        with split-beam angles added will be returned.
+        If ``to_disk=False``, the input dataset ``source_Sv`` with
+        split-beam angles added will be returned.
+        If ``to_disk=True``, the angles are written to the path ``source_Sv``
+        and a lazy-loaded Dataset (from that path) will be returned.
 
     Raises
     ------
     ValueError
@@ -279,6 +295,19 @@
     ``echodata`` will be identical. If this is not the case, only angle data corresponding
     to channels existing in ``source_Sv`` will be added.
     """
+    # ensure that to_disk is False when source_Sv is an in-memory Dataset
+    if not isinstance(source_Sv, (str, Path)) and to_disk:
+        raise ValueError(
+            "The input source_Sv must be a path when to_disk=True, "
+            "so that the split-beam angles can be written to disk!"
+        )
+
+    # obtain the file format of source_Sv if it is a path
+    if isinstance(source_Sv, (str, Path)):
+        source_Sv_type = get_file_format(source_Sv)
+
+    source_Sv = open_source(source_Sv, "dataset", storage_options)
+    echodata = open_source(echodata, "echodata", storage_options)
 
     # ensure that echodata was produced by EK60 or EK80-like sensors
     if echodata.sonar_model not in ["EK60", "ES70", "EK80", "ES80", "EA640"]:
@@ -287,22 +316,6 @@
             "transducers, split-beam angles cannot be added to source_Sv!"
         )
 
-    # validate the source_Sv type or path (if it is provided)
-    source_Sv, file_type = validate_source_ds_da(source_Sv, storage_options)
-
-    # initialize source_Sv_path
-    source_Sv_path = None
-
-    if isinstance(source_Sv, str):
-        # store source_Sv path so we can use it to write to later
-        source_Sv_path = source_Sv
-
-        # TODO: In the future we can improve this by obtaining the variable names, channels,
-        #       and dimension lengths directly from source_Sv using zarr or netcdf4. This would
-        #       prevent the unnecessary loading in of the coordinates, which the below statement does.
-        # open up Dataset using source_Sv path
-        source_Sv = xr.open_dataset(source_Sv, engine=file_type, chunks={}, **storage_options)
-
     # raise not implemented error if source_Sv corresponds to MVBS
     if source_Sv.attrs["processing_function"] == "commongrid.compute_MVBS":
         raise NotImplementedError("Adding split-beam data to MVBS has not been implemented!")
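
The block deleted above is the validate/open boilerplate that open_source now absorbs. Its implementation is not shown in this diff, so the following is only a sketch of what such a dispatcher could look like, inferred from the call sites; the name open_source_sketch and the engine selection are assumptions, not echopype's actual code:

    import pathlib

    import xarray as xr

    def open_source_sketch(source, source_type: str, storage_options: dict):
        """Hypothetical stand-in for echopype's open_source (assumed behavior)."""
        if not isinstance(source, (str, pathlib.Path)):
            return source  # already an in-memory xr.Dataset / EchoData
        if source_type == "dataset":
            # mirror the deleted block: lazy open, with an engine guess standing
            # in for whatever get_file_format actually does
            engine = "zarr" if str(source).endswith(".zarr") else "netcdf4"
            return xr.open_dataset(source, engine=engine, chunks={}, **storage_options)
        # source_type == "echodata": reopen a converted raw file as an EchoData object
        import echopype as ep

        return ep.open_converted(source, storage_options=storage_options)
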
@@ -364,9 +377,18 @@ def add_splitbeam_angle(
             theta, phi = get_angle_complex_samples(ds_beam, angle_params)
 
     # add theta and phi to source_Sv input
-    source_Sv = add_angle_to_ds(
-        theta, phi, source_Sv, return_dataset, source_Sv_path, file_type, storage_options
-    )
+    theta.attrs["long_name"] = "split-beam alongship angle"
+    phi.attrs["long_name"] = "split-beam athwartship angle"
+
+    # add the split-beam angles to the provided Dataset
+    source_Sv["angle_alongship"] = theta
+    source_Sv["angle_athwartship"] = phi
+    if to_disk:
+        if source_Sv_type == "netcdf4":
+            source_Sv.to_netcdf(mode="a", **storage_options)
+        else:
+            source_Sv.to_zarr(mode="a", **storage_options)
+        source_Sv = open_source(source_Sv, "dataset", storage_options)
 
     # Add history attribute
     history_attr = (
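Putting the api.py changes together, a usage sketch of the renamed flag (not part of the diff; the paths are hypothetical, and the echodata argument may equally be an in-memory EchoData object):

    from echopype.consolidate import add_splitbeam_angle

    # In-memory input: to_disk must be False, and the result stays in memory
    ds_Sv = add_splitbeam_angle(
        ds_Sv, ed, waveform_mode="CW", encode_mode="power", to_disk=False
    )

    # Stored input: angles are appended to "Sv.zarr" and a lazy Dataset is returned
    ds_Sv = add_splitbeam_angle(
        "Sv.zarr", "converted_raw.zarr", waveform_mode="CW", encode_mode="power",
        to_disk=True,
    )
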
78 changes: 1 addition & 77 deletions echopype/consolidate/split_beam_angle.py
@@ -2,7 +2,7 @@
 Contains functions necessary to compute the split-beam (alongship/athwartship)
 angles and add them to a Dataset.
 """
-from typing import List, Optional, Tuple
+from typing import List, Tuple
 
 import numpy as np
 import xarray as xr
@@ -245,79 +245,3 @@ def get_angle_complex_samples(
     )
 
     return theta, phi
-
-
-def add_angle_to_ds(
-    theta: xr.Dataset,
-    phi: xr.Dataset,
-    ds: xr.Dataset,
-    return_dataset: bool,
-    source_ds_path: Optional[str] = None,
-    file_type: Optional[str] = None,
-    storage_options: dict = {},
-) -> Optional[xr.Dataset]:
-    """
-    Adds the split-beam angle data to the provided input ``ds``.
-
-    Parameters
-    ----------
-    theta: xr.Dataset
-        The calculated split-beam alongship angle
-    phi: xr.Dataset
-        The calculated split-beam athwartship angle
-    ds: xr.Dataset
-        The Dataset that ``theta`` and ``phi`` will be added to
-    return_dataset: bool
-        Whether a dataset will be returned or not
-    source_ds_path: str, optional
-        The path to the file corresponding to ``ds``, if it exists
-    file_type: {"netcdf4", "zarr"}, optional
-        The file type corresponding to ``source_ds_path``
-    storage_options: dict, default={}
-        Any additional parameters for the storage backend, corresponding to the
-        path ``source_ds_path``
-
-    Returns
-    -------
-    xr.Dataset or None
-        If ``return_dataset=False``, nothing will be returned. If ``return_dataset=True``
-        either the input dataset ``ds`` or a lazy-loaded Dataset (obtained from
-        the path provided by ``source_ds_path``) with the split-beam angle data added
-        will be returned.
-    """
-
-    # TODO: do we want to add anymore attributes to these variables?
-    # add appropriate attributes to theta and phi
-    theta.attrs["long_name"] = "split-beam alongship angle"
-    phi.attrs["long_name"] = "split-beam athwartship angle"
-
-    if source_ds_path is not None:
-        # put the variables into a Dataset, so they can be written at the same time
-        # add ds attributes to splitb_ds since they will be overwritten by to_netcdf/zarr
-        splitb_ds = xr.Dataset(
-            data_vars={"angle_alongship": theta, "angle_athwartship": phi},
-            coords=theta.coords,
-            attrs=ds.attrs,
-        )
-
-        # release any resources linked to ds (necessary for to_netcdf)
-        ds.close()
-
-        # write the split-beam angle data to the provided path
-        if file_type == "netcdf4":
-            splitb_ds.to_netcdf(path=source_ds_path, mode="a", **storage_options)
-        else:
-            splitb_ds.to_zarr(store=source_ds_path, mode="a", **storage_options)
-
-        if return_dataset:
-            # open up and return Dataset in source_ds_path
-            return xr.open_dataset(source_ds_path, engine=file_type, chunks={}, **storage_options)
-
-    else:
-        # add the split-beam angles to the provided Dataset
-        ds["angle_alongship"] = theta
-        ds["angle_athwartship"] = phi
-
-        if return_dataset:
-            # return input dataset with split-beam angle data
-            return ds
7 changes: 6 additions & 1 deletion echopype/echodata/echodata.py
@@ -16,7 +16,6 @@
 
 from ..echodata.utils_platform import _clip_by_time_dim, get_mappings_expanded
 from ..utils.coding import sanitize_dtypes, set_time_encodings
-from ..utils.io import check_file_existence, delete_zarr_store, sanitize_file_path
 from ..utils.log import _init_logger
 from ..utils.prov import add_processing_level
 from .convention import sonarnetcdf_1
@@ -93,6 +92,8 @@ def cleanup_swap_files(self):
                 v.store for k, v in dask_graph.items() if "original-from-zarr" in k
             ]
             fs = zarr_stores[0].fs
+            from ..utils.io import delete_zarr_store
+
             for store in zarr_stores:
                 delete_zarr_store(store, fs)
@@ -486,11 +487,15 @@ def _load_file(self, raw_path: "PathHint"):
 
     def _check_path(self, filepath: "PathHint"):
         """Check if converted_raw_path exists"""
+        from ..utils.io import check_file_existence
+
         file_exists = check_file_existence(filepath, self.storage_options)
         if not file_exists:
             raise FileNotFoundError(f"There is no file named {filepath}")
 
     def _sanitize_path(self, filepath: "PathHint") -> "PathHint":
+        from ..utils.io import sanitize_file_path
+
         filepath = sanitize_file_path(filepath, self.storage_options)
         return filepath

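Moving these imports from module scope into the methods is the standard fix for the circular import named in the commit message: echopype.utils.io is only imported when the method runs, after both modules have finished initializing. A generic sketch of the pattern, with hypothetical module names:

    # a.py -- b.py imports this module, so a module-level "from b import ..."
    # here would fail with a circular-import error
    def check(path: str) -> bool:
        # Deferred import: resolved at call time, when b is fully initialized
        from b import file_exists

        return file_exists(path)
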