added support for cloud storage
beatfactor committed Jun 12, 2024 (1 parent: 2e3ef6f; commit: 689bb8d)
Showing 10 changed files with 181 additions and 119 deletions.
environment.yml (1 addition, 1 deletion)
@@ -3,7 +3,7 @@ channels:
   - defaults
   - conda-forge
 dependencies:
-  - python=3.11
+  - python=3.10
   - jinja2
   - netcdf4
   - numpy
oceanstream/cli/main.py (12 additions, 6 deletions)
@@ -11,7 +11,7 @@
 from rich import print
 from rich.traceback import install, Traceback
 from oceanstream.settings import load_config
-from dask.distributed import LocalCluster, Client
+from dask.distributed import LocalCluster, Client, Variable
 from rich.console import Console


@@ -195,6 +195,7 @@ def compute_sv(
         sonar_model: str = typer.Option(DEFAULT_SONAR_MODEL, help="Sonar model used to collect the data",
                                         show_choices=["AZFP", "EK60", "ES70", "EK80", "ES80", "EA640", "AD2CP"]),
         plot_echogram: bool = typer.Option(False, help="Plot the echogram after processing"),
+        use_dask: bool = typer.Option(False, help="Start a Local Dask cluster for parallel processing (always enabled for multiple files)"),
         depth_offset: float = typer.Option(0.0, help="Depth offset for the echogram plot"),
         waveform_mode: str = typer.Option("CW", help="Waveform mode, can be either CW or BB",
                                           show_choices=["CW", "BB"]),
@@ -213,11 +214,14 @@ def compute_sv(
     file_path = Path(source)
     config_data = initialize(settings_dict, file_path, log_level=log_level)
 
+    client = None
     console = Console()
+    single_file = file_path.is_dir() and source.endswith(".zarr")
     with console.status("Processing...", spinner="dots") as status:
         status.start()
-        cluster = LocalCluster(n_workers=workers_count, threads_per_worker=1)
-        client = Client(cluster)
+        if use_dask or not single_file:
+            cluster = LocalCluster(n_workers=workers_count, threads_per_worker=1)
+            client = Client(cluster)
 
         try:
             if file_path.is_dir() and source.endswith(".zarr"):
@@ -235,13 +239,14 @@ def compute_sv(
                     f"[blue] Processing zarr files in {file_path}...[/blue] – navigate to "
                     f"http://localhost:8787/status for progress")
                 from oceanstream.process import process_zarr_files
-
+                processed_count_var = Variable('processed_count', client)
                 process_zarr_files(config_data,
                                    workers_count=workers_count,
                                    status=status,
                                    chunks=config_data.get('base_chunk_sizes'),
                                    plot_echogram=plot_echogram,
                                    waveform_mode=waveform_mode,
+                                   processed_count_var=processed_count_var,
                                    depth_offset=depth_offset)
             else:
                 print(f"[red]❌ The provided path '{source}' is not a valid Zarr root.[/red]")
@@ -252,8 +257,9 @@ def compute_sv(
             logging.exception("Error while processing %s", config_data['raw_path'])
             print(Traceback())
         finally:
-            client.close()
-            cluster.close()
+            if use_dask:
+                client.close()
+                cluster.close()
             status.stop()


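A note on the new Variable import and the processed_count_var handed to process_zarr_files: dask.distributed.Variable is a named, mutable value that lives on the scheduler and can be read or set from any worker, which is what allows a cross-worker progress count here. The sketch below shows the pattern under stated assumptions: process_one and the file list are hypothetical stand-ins (process_zarr_files itself is not shown in this commit), while Variable, Lock, and Client are real dask.distributed APIs.

    from dask.distributed import Client, Lock, Variable

    def process_one(path, processed_count_var):
        # ... per-file processing would happen here (hypothetical) ...
        with Lock('processed_count'):            # serialize the read-modify-write
            count = processed_count_var.get()    # read the shared counter
            processed_count_var.set(count + 1)   # publish the incremented value

    if __name__ == '__main__':
        client = Client()                        # local cluster, as the CLI starts
        processed_count_var = Variable('processed_count', client)
        processed_count_var.set(0)               # initialize before submitting work
        futures = [client.submit(process_one, p, processed_count_var)
                   for p in ['a.zarr', 'b.zarr']]
        client.gather(futures)
        print(processed_count_var.get())         # -> 2

Dask Variables have no atomic increment, hence the explicit Lock around the get/set pair.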
oceanstream/core.py (18 additions, 6 deletions)
@@ -14,6 +14,9 @@
 
 def initialize(settings, file_path, log_level=None):
     logging.debug(f"Initializing with settings: {settings}, file path: {file_path}, log level: {log_level}")
+    if "config" not in settings:
+        settings["config"] = ""
+
     config_data = load_config(settings["config"])
     config_data["raw_path"] = file_path

@@ -27,6 +30,9 @@ def initialize(settings, file_path, log_level=None):
     if settings["output_folder"] is not None:
         config_data["output_folder"] = settings["output_folder"]
 
+    if settings['cloud_storage'] is not None:
+        config_data['cloud_storage'] = settings['cloud_storage']
+
     return config_data

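A note for anyone wiring up the new settings['cloud_storage'] key: the commit never documents its shape, but its only consumer (upload_to_cloud_storage in oceanstream/plot/echogram.py, further down) reads storage_type, container_name, and storage_options, and the azure branch additionally expects storage_options['connection_string']. A minimal sketch with placeholder values:

    from pathlib import Path

    settings = {
        "config": "",
        "sonar_model": "EK60",
        "output_folder": "./output",
        "cloud_storage": {
            "storage_type": "azure",          # the only backend handled in this commit
            "container_name": "echograms",
            "storage_options": {
                "connection_string": "<azure-storage-connection-string>",
            },
        },
    }
    config_data = initialize(settings, Path("survey.zarr"))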

Expand Down Expand Up @@ -103,8 +109,7 @@ def combine(source, output=None, config=None, log_level="WARNING", chunks=None):

file_name = f"{Path(dir_path).stem}-combined.zarr"
zarr_output_file = os.path.join(config_data['output_folder'], file_name)
logging.info(
f"Combining Zarr files to {zarr_output_file}; navigate to http://localhost:8787/status for progress")
logging.info(f"Combining Zarr files to {zarr_output_file}")

combine_zarr_files(dir_path, zarr_output_file=zarr_output_file, chunks=chunks)
logging.info("Zarr files have been combined successfully.")
@@ -113,18 +118,24 @@ def combine(source, output=None, config=None, log_level="WARNING", chunks=None):
 
 
 def compute_sv(source, output=None, workers_count=None, sonar_model=DEFAULT_SONAR_MODEL, plot_echogram=False,
-               depth_offset=0.0, waveform_mode="CW", config=None, log_level="WARNING", chunks=None):
-    logging.debug("Starting compute_sv function")
+               depth_offset=0.0, waveform_mode="CW", log_level="WARNING", chunks=None, config=None,
+               processed_count_var=None):
     settings_dict = {
         "config": config,
         "sonar_model": sonar_model,
         "output_folder": output or DEFAULT_OUTPUT_FOLDER
     }
 
+    if config is not None:
+        settings_dict.update(config)
+        # settings_dict["config"] = ''
+
     file_path = Path(source)
     config_data = initialize(settings_dict, file_path, log_level=log_level)
 
+    if chunks:
+        config_data['chunks'] = chunks
+    else:
+        config_data['chunks'] = config_data.get('base_chunk_sizes', None)
 
     if file_path.is_dir() and source.endswith(".zarr"):
         logging.debug(f"Computing Sv for Zarr root file: {file_path}")
@@ -136,7 +147,8 @@ def compute_sv(source, output=None, workers_count=None, sonar_model=DEFAULT_SONAR_MODEL, plot_echogram=False,
         logging.debug(f"Processing Zarr files in directory: {file_path}")
         from oceanstream.process import process_zarr_files
 
-        process_zarr_files(config_data, workers_count=workers_count, chunks=chunks, plot_echogram=plot_echogram,
+        process_zarr_files(config_data, workers_count=workers_count, chunks=chunks,
+                           processed_count_var=processed_count_var, plot_echogram=plot_echogram,
                            waveform_mode=waveform_mode, depth_offset=depth_offset)
     else:
         logging.error(f"The provided path '{source}' is not a valid Zarr root.")
oceanstream/plot/echogram.py (45 additions, 14 deletions)
@@ -1,7 +1,10 @@
 import asyncio
 import os
+import tempfile
 import xarray as xr
 import numpy as np
+import logging
+
 from matplotlib.colors import LinearSegmentedColormap, Colormap
 import matplotlib.pyplot as plt
 import matplotlib.dates as mdates
@@ -287,18 +290,13 @@ def plot_individual_channel_image_only(ds_Sv, channel, output_path, file_base_name
     plt.close()
 
 
-def plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_name,
-                                       cmap='viridis',
-                                       regions2d=None,
-                                       region_ids=None, region_class=None):
+def plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_name, echogram_path=None,
+                                       config_data=None, cmap='ocean_r'):
     """Plot and save echogram for a single channel with optional regions and enhancements."""
     full_channel_name = ds_Sv.channel.values[channel]
     channel_name = "_".join(full_channel_name.split()[:3])
 
     plt.figure(figsize=(30, 18))
-    echogram_output_path = os.path.join(output_path, f"{file_base_name}_{channel_name}.png")
-
-    # Apply the same preprocessing steps from _plot_echogram
     ds = ds_Sv
 
     filtered_ds = ds['Sv']
@@ -332,16 +330,28 @@ def plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_name, echogram_path=None,
         yincrease=False,
         vmin=-80,
         vmax=-50,
-        cmap='ocean_r',
+        cmap=cmap,
         cbar_kwargs={'label': 'Volume backscattering strength (Sv re 1 m-1)'}
     )
 
     plt.grid(True, linestyle='--', alpha=0.5)
     plt.xlabel('Ping time', fontsize=14)
     plt.ylabel('Depth', fontsize=14)
     plt.title(f'Echogram for Channel {channel_name}', fontsize=16, fontweight='bold')
-    plt.savefig(echogram_output_path, dpi=300, bbox_inches='tight')
-    plt.close()
+
+    echogram_file_name = f"{file_base_name}_{channel_name}.png"
+
+    if config_data and 'cloud_storage' in config_data:
+        with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
+            plt.savefig(temp_file.name, dpi=300, bbox_inches='tight')
+            plt.close()
+            echogram_output_path = os.path.join(echogram_path, echogram_file_name)
+            upload_to_cloud_storage(temp_file.name, echogram_output_path, config_data['cloud_storage'])
+            os.remove(temp_file.name)
+    else:
+        echogram_output_path = os.path.join(output_path, echogram_file_name)
+        plt.savefig(echogram_output_path, dpi=300, bbox_inches='tight')
+        plt.close()
 
 
 def plot_sv_data_parallel(ds_Sv, file_base_name=None, output_path=None, cmap=None, client=None):
@@ -360,15 +370,15 @@ def plot_sv_data_parallel(ds_Sv, file_base_name=None, output_path=None, cmap=None, client=None):
     wait(futures)
 
 
-def plot_sv_data(ds_Sv, file_base_name=None, output_path=None, cmap=None, regions2d=None, region_ids=None,
-                 region_class=None):
+def plot_sv_data(ds_Sv, file_base_name=None, output_path=None, echogram_path=None, config_data=None, cmap=None):
     """Plot the echogram data and the regions."""
     if not plt.isinteractive():
         plt.switch_backend('Agg')
 
     for channel in range(ds_Sv.dims['channel']):
-        plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_name, cmap, regions2d, region_ids,
-                                           region_class)
+        plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_name, echogram_path=echogram_path,
+                                           config_data=config_data,
+                                           cmap='ocean_r')
         # plot_individual_channel_image_only(ds_Sv, channel, output_path, file_base_name, cmap)
         # plot_individual_channel_shaders(ds_Sv=ds_Sv, channel=channel, output_path=output_path,
         #                                 file_base_name=file_base_name, cmap='ocean_r')
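Taken together, plot_sv_data now routes each channel's PNG either to output_path on the local filesystem or, when config_data carries a cloud_storage block, to echogram_path inside the configured container. A hedged call-site sketch; the dataset, the paths, and the module path oceanstream.plot.echogram are assumptions based on the file layout in this diff:

    import xarray as xr
    from oceanstream.plot.echogram import plot_sv_data

    ds_Sv = xr.open_dataset("survey-sv.zarr", engine="zarr")  # placeholder Sv dataset
    plot_sv_data(
        ds_Sv,
        file_base_name="survey",
        output_path="./echograms",           # local destination (used when no cloud_storage key)
        echogram_path="echograms/survey",    # blob prefix used when uploading
        config_data={"cloud_storage": {
            "storage_type": "azure",
            "container_name": "echograms",
            "storage_options": {"connection_string": "<connection-string>"},
        }},
    )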
@@ -540,3 +550,24 @@ def _plot_region_ids(colors, ds_Sv, idx, labels_added, region_ids, regions2d):
             labels_added.add(label)
         idx += 1
     return idx
+
+
+def upload_to_cloud_storage(local_path, remote_path, cloud_storage_config):
+    storage_type = cloud_storage_config['storage_type']
+    container_name = cloud_storage_config['container_name']
+    storage_options = cloud_storage_config['storage_options']
+
+    if storage_type == 'azure':
+        upload_to_azure_blob(local_path, remote_path, container_name, storage_options)
+    else:
+        raise ValueError(f"Unsupported storage type: {storage_type}")
+
+
+def upload_to_azure_blob(local_path, remote_path, container_name, storage_options):
+    from azure.storage.blob import BlobServiceClient
+    blob_service_client = BlobServiceClient.from_connection_string(storage_options['connection_string'])
+    blob_client = blob_service_client.get_blob_client(container=container_name, blob=remote_path)
+
+    with open(local_path, "rb") as data:
+        blob_client.upload_blob(data, overwrite=True)
+    logging.info(f"Uploaded {local_path} to Azure Blob Storage as {remote_path}")
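The import of BlobServiceClient is deliberately local to upload_to_azure_blob, so azure-storage-blob is only required when a cloud_storage config is actually in play; BlobServiceClient.from_connection_string and upload_blob(..., overwrite=True) are standard azure-storage-blob calls. Invoking the helper directly, with placeholder values:

    from oceanstream.plot.echogram import upload_to_cloud_storage

    upload_to_cloud_storage(
        local_path="/tmp/echogram.png",
        remote_path="cruise-01/echogram.png",   # blob name within the container
        cloud_storage_config={
            "storage_type": "azure",
            "container_name": "echograms",
            "storage_options": {"connection_string": "<connection-string>"},
        },
    )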
oceanstream/process/combine_zarr.py (2 additions, 3 deletions)
@@ -5,7 +5,6 @@
 from datetime import datetime
 from rich import print
 from pathlib import Path
-import psutil
 from oceanstream.echodata import check_reversed_time, fix_time_reversions


@@ -21,7 +20,7 @@ def read_zarr_files(input_folder):
         logging.error("Input folder does not exist: %s", input_folder)
         return
 
-    zarr_files = list(input_path.glob("*.zarr"))
+    zarr_files = list(input_path.rglob("*.zarr"))
     if not zarr_files:
         logging.error("No .zarr files found in directory: %s", input_folder)
         return
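The glob-to-rglob change makes discovery recursive: Path.glob("*.zarr") matches only direct children of the input folder, while Path.rglob("*.zarr") also finds stores nested in subdirectories (per-day or per-leg folders, say). For a hypothetical layout:

    from pathlib import Path

    root = Path("surveys")
    list(root.glob("*.zarr"))    # e.g. [surveys/a.zarr]
    list(root.rglob("*.zarr"))   # e.g. [surveys/a.zarr, surveys/leg1/b.zarr]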
@@ -47,7 +46,7 @@ def fix_time(ed):
     return ed
 
 
-def combine_zarr_files(input_folder, zarr_output_file=None, chunks=None, log_level=logging.DEBUG):
+def combine_zarr_files(input_folder, zarr_output_file=None, chunks=None):
     start_time = time.time()
 
     logging.debug("Starting to combine Zarr files from folder: %s", input_folder)