Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality that directly writes variables to a temporary zarr store #774

Merged
merged 42 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1e37ecc
add initial code to grab the appropriate parsed data
b-reyes Jul 22, 2022
23d845b
establish initial structure to go from parsed to zarr
b-reyes Jul 22, 2022
11245d5
modify open_raw routines for EK60/80 so that we can load in zarr arra…
b-reyes Jul 25, 2022
105667a
change distribution of times to a round-robin like distribution
b-reyes Jul 26, 2022
583206d
take first step towards generalizing the parsed_to_zarr module
b-reyes Jul 27, 2022
9af7944
generalize write_df_to_zarr so it can handle columns without arrays, …
b-reyes Jul 27, 2022
b32a7ea
finish cleaning up the code in parsed_to_zarr
b-reyes Jul 28, 2022
97a36cb
improve chunking in parsed_to_zarr and change num_mb to max_mb
b-reyes Jul 28, 2022
5519f6a
make a preliminary attempt at writing complex data
b-reyes Jul 29, 2022
5a82799
start the restructuring of parsed_to_zarr into a class
b-reyes Aug 2, 2022
aa637e1
finish parsed to zarr reorganization for EK60 and EK80
b-reyes Aug 3, 2022
4e71f31
document and clean up the code associated with set_groups_ek60
b-reyes Aug 3, 2022
13d45e6
clean up parse_base and make it so that we do not store zarr variable…
b-reyes Aug 3, 2022
c4a79c2
move get_power_dataarray and get_angle_dataarrays to set_groups_base,…
b-reyes Aug 4, 2022
44ccd5e
obtain partially working version of Beam_group2 for EK80
b-reyes Aug 4, 2022
6fecd9d
finish constructing ds_beam_power when zarr variables are present
b-reyes Aug 5, 2022
9d0d186
add method to get complex data arrays from zarr in set_groups_base
b-reyes Aug 5, 2022
947941a
generalize parsed_to_zarr so we can have column elements with multi-d…
b-reyes Aug 6, 2022
e0aafa4
finish get_ds_complex_zarr in set_groups_ek80
b-reyes Aug 6, 2022
3dbf1c9
add open_raw zarr variables to api and create a routine that automati…
b-reyes Aug 8, 2022
b13a2b9
modify the condition for when we should write directly to a temporary…
b-reyes Aug 9, 2022
200d3ee
only store zarr variables when we do not have receive data, add str…
b-reyes Aug 10, 2022
c486be4
merged dev into branch and resolved conflict in set_groups_ek80.py
b-reyes Aug 10, 2022
1cc687d
change all occurrences of parser2zarr to parsed2zarr
b-reyes Aug 10, 2022
b5e33ff
merge dev into branch and resolve conflict within convert/api.py
b-reyes Aug 10, 2022
d172172
correct zarr typo in _append_channel_ping_data
b-reyes Aug 10, 2022
8d07a67
correct kwarg in rectangularize_data
b-reyes Aug 10, 2022
f4f1ab1
replace hardwired time dtype in parsed to zarr with times.dtype.str
b-reyes Aug 10, 2022
7e2a4d8
add return docstrings and types to a couple of functions in parsed_to…
b-reyes Aug 10, 2022
d3c9688
add back the EK60 file description in the test_data readme
b-reyes Aug 11, 2022
e38343a
add pytest.mark.skip to unit test
b-reyes Aug 11, 2022
3fc0dfb
remove xfail and add pass to unit test
b-reyes Aug 11, 2022
86a2ff9
remove pandas from requirements file
b-reyes Aug 11, 2022
4df500c
Merge branch 'dev' into file-to-zarr
b-reyes Aug 11, 2022
473a26d
Add simple test for noaa file
lsetiawan Aug 11, 2022
f5188e3
Merge pull request #1 from lsetiawan/add_test
b-reyes Aug 11, 2022
c8a77ec
remove the auto option in open_raw
b-reyes Aug 11, 2022
7c9b89f
remove Union typing import
b-reyes Aug 11, 2022
926d591
add test_data/README.md lines back in
b-reyes Aug 11, 2022
33c11f1
add spaces in test_data/README.md
b-reyes Aug 11, 2022
77b36ee
remove optional typing for `offload_to_zarr`
b-reyes Aug 11, 2022
d9d06e2
remove auto description in notes and add beta statement in open_raw
b-reyes Aug 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 86 additions & 25 deletions echopype/convert/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# fmt: off
# black and isort have conflicting ideas about how this should be formatted
from ..core import SONAR_MODELS
from .parsed_to_zarr import Parsed2Zarr

if TYPE_CHECKING:
from ..core import EngineHint, PathHint, SonarModelsHint
Expand Down Expand Up @@ -110,9 +111,10 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
# Environment group
if "time1" in echodata["Environment"]:
io.save_file(
echodata["Environment"].chunk(
{"time1": DEFAULT_CHUNK_SIZE["ping_time"]}
), # TODO: chunking necessary?
# echodata["Environment"].chunk(
# {"time1": DEFAULT_CHUNK_SIZE["ping_time"]}
# ), # TODO: chunking necessary?
echodata["Environment"],
path=output_path,
mode="a",
engine=engine,
Expand Down Expand Up @@ -170,11 +172,12 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
if echodata.sonar_model == "AD2CP":
for i in range(1, len(echodata["Sonar"]["beam_group"]) + 1):
io.save_file(
echodata[f"Sonar/Beam_group{i}"].chunk(
{
"ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
}
),
# echodata[f"Sonar/Beam_group{i}"].chunk(
# {
# "ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
# }
# ),
echodata[f"Sonar/Beam_group{i}"],
path=output_path,
mode="a",
engine=engine,
Expand All @@ -183,12 +186,13 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
)
else:
io.save_file(
echodata[f"Sonar/{BEAM_SUBGROUP_DEFAULT}"].chunk(
{
"range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
"ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
}
),
# echodata[f"Sonar/{BEAM_SUBGROUP_DEFAULT}"].chunk(
# {
# "range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
# "ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
# }
# ),
echodata[f"Sonar/{BEAM_SUBGROUP_DEFAULT}"],
path=output_path,
mode="a",
engine=engine,
Expand All @@ -198,12 +202,13 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
if echodata["Sonar/Beam_group2"] is not None:
# some sonar model does not produce Sonar/Beam_group2
io.save_file(
echodata["Sonar/Beam_group2"].chunk(
{
"range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
"ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
}
),
# echodata["Sonar/Beam_group2"].chunk(
# {
# "range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
# "ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
# }
# ),
echodata["Sonar/Beam_group2"],
path=output_path,
mode="a",
engine=engine,
Expand All @@ -214,9 +219,10 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
# Vendor_specific group
if "ping_time" in echodata["Vendor_specific"]:
io.save_file(
echodata["Vendor_specific"].chunk(
{"ping_time": DEFAULT_CHUNK_SIZE["ping_time"]}
), # TODO: chunking necessary?
# echodata["Vendor_specific"].chunk(
# {"ping_time": DEFAULT_CHUNK_SIZE["ping_time"]}
# ), # TODO: chunking necessary?
echodata["Vendor_specific"],
path=output_path,
mode="a",
engine=engine,
Expand Down Expand Up @@ -334,6 +340,8 @@ def open_raw(
xml_path: Optional["PathHint"] = None,
convert_params: Optional[Dict[str, str]] = None,
storage_options: Optional[Dict[str, str]] = None,
offload_to_zarr: bool = False,
max_zarr_mb: int = 100,
) -> Optional[EchoData]:
"""Create an EchoData object containing parsed data from a single raw data file.

Expand Down Expand Up @@ -362,10 +370,22 @@ def open_raw(
and need to be added to the converted file
storage_options : dict
options for cloud storage
offload_to_zarr: bool
If True, variables with a large memory footprint will be
written to a temporary zarr store.
max_zarr_mb : int
maximum MB that each zarr chunk should hold, when offloading
variables with a large memory footprint to a temporary zarr store

Returns
-------
EchoData object

Notes
-----
``offload_to_zarr=True`` is only available for the following
echosounders: EK60, ES70, EK80, ES80, EA640. Additionally, this feature
is currently in beta.
"""
if (sonar_model is None) and (raw_file is None):
print("Please specify the path to the raw data file and the sonar model.")
Expand Down Expand Up @@ -421,24 +441,63 @@ def open_raw(
# Check file extension and existence
file_chk, xml_chk = _check_file(raw_file, sonar_model, xml_path, storage_options)

# TODO: remove once 'auto' option is added
if not isinstance(offload_to_zarr, bool):
raise ValueError("offload_to_zarr must be of type bool.")

# Ensure offload_to_zarr is 'auto', if it is a string
# TODO: use the following when we allow for 'auto' option
# if isinstance(offload_to_zarr, str) and offload_to_zarr != "auto":
# raise ValueError("offload_to_zarr must be a bool or equal to 'auto'.")

# TODO: the if-else below only works for the AZFP vs EK contrast,
# but is brittle since it is abusing params by using it implicitly
if SONAR_MODELS[sonar_model]["xml"]:
params = xml_chk
else:
params = "ALL" # reserved to control if only wants to parse a certain type of datagram

# obtain dict associated with directly writing to zarr
dgram_zarr_vars = SONAR_MODELS[sonar_model]["dgram_zarr_vars"]

# Parse raw file and organize data into groups
parser = SONAR_MODELS[sonar_model]["parser"](
file_chk, params=params, storage_options=storage_options
file_chk, params=params, storage_options=storage_options, dgram_zarr_vars=dgram_zarr_vars
)

parser.parse_raw()

# code block corresponding to directly writing parsed data to zarr
if offload_to_zarr and (sonar_model in ["EK60", "ES70", "EK80", "ES80", "EA640"]):

# Determines if writing to zarr is necessary and writes to zarr
p2z = SONAR_MODELS[sonar_model]["parsed2zarr"](parser)

# TODO: perform more robust tests for the 'auto' heuristic value
if offload_to_zarr == "auto" and p2z.write_to_zarr(mem_mult=0.4):
p2z.datagram_to_zarr(max_mb=max_zarr_mb)
elif offload_to_zarr is True:
p2z.datagram_to_zarr(max_mb=max_zarr_mb)
else:
del p2z
p2z = Parsed2Zarr(parser)
if "ALL" in parser.data_type:
parser.rectangularize_data()

else:
p2z = Parsed2Zarr(parser)
if (sonar_model in ["EK60", "ES70", "EK80", "ES80", "EA640"]) and (
"ALL" in parser.data_type
):
parser.rectangularize_data()

setgrouper = SONAR_MODELS[sonar_model]["set_groups"](
parser,
input_file=file_chk,
output_path=None,
sonar_model=sonar_model,
params=_set_convert_params(convert_params),
parsed2zarr_obj=p2z,
)

# Setup tree dictionary
Expand Down Expand Up @@ -485,7 +544,9 @@ def open_raw(
# Create tree and echodata
# TODO: make the creation of tree dynamically generated from yaml
tree = DataTree.from_dict(tree_dict, name="root")
echodata = EchoData(source_file=file_chk, xml_path=xml_chk, sonar_model=sonar_model)
echodata = EchoData(
source_file=file_chk, xml_path=xml_chk, sonar_model=sonar_model, parsed2zarr_obj=p2z
)
echodata._set_tree(tree)
echodata._load_tree()

Expand Down
9 changes: 2 additions & 7 deletions echopype/convert/parse_ad2cp.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,8 @@ class NoMorePackets(Exception):


class ParseAd2cp(ParseBase):
def __init__(
self,
*args,
params,
**kwargs,
):
super().__init__(*args, **kwargs)
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, storage_options)
self.config = None
self.packets: List[Ad2cpDataPacket] = []

Expand Down
2 changes: 1 addition & 1 deletion echopype/convert/parse_azfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class ParseAZFP(ParseBase):
"""Class for converting data from ASL Environmental Sciences AZFP echosounder."""

def __init__(self, file, params, storage_options={}):
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, storage_options)
# Parent class attributes
# regex pattern used to grab datetime embedded in filename
Expand Down