# **Example Usage of Pangeo-Fish Software**


**Overview:**
This Jupyter notebook demonstrates the usage of the Pangeo-Fish software, a tool designed for analyzing biologging data in reference to Earth Observation (EO) data. Specifically, it utilizes data employed in the study conducted by M. Gonze et al. titled "Combining acoustic telemetry with archival tagging to investigate the spatial dynamics of the understudied pollack *Pollachius pollachius*," accepted for publication in the Journal of Fish Biology.

We showcase the application using the biologging tag 'A19124' attached to a pollack fish, along with reference EO data from the European Union Copernicus Marine Service Information (CMEMS) product 'NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013'. The biologging data consist of Data Storage Tag (DST) and teledetection by acoustic signals, along with release and recapture time and location of the species in question.  Both biologging data and the reference EO data are accessible with https and the access methods are incropolated in this notebook.   



**Purpose:**
By executing this notebook, users will learn how to set up a workflow for utilizing the Pangeo-Fish software. The workflow consists of 9 steps which are described below:

1. **Configure the Notebook:** Prepare the notebook environment for analysis.
2. **Compare Reference Model with DST Information:** Analyze and compare data from the reference model with information from the biologging data of the species in question. 
3. **Regrid the Grid from Reference Model Grid to Healpix Grid:** Transform the grid from the reference model to the Healpix grid for further analysis.
4. **Construct Emission Matrix:** Create an emission matrix based on the transformed grid.
5. **Compute Additional Emission Probability Matrix:** Calculate an additional emission probability matrix, particularly focusing on teledetection from acoustic signals.
6. **Combine and Normalize Emission Matrix:** Merge the emission matrix and normalize it for further processing.
7. **Estimate Model Parameters:** Determine the parameters of the model based on the normalized emission matrix.
8. **Compute State Probabilities and Tracks:** Calculate the probability distribution of the species in question and compute the tracks.
9. **Visualization:** Visualize the results of the analysis for interpretation and insight.

Throughout this notebook, users will gain practical experience in setting up and executing a workflow using Pangeo-Fish, enabling them to apply similar methodologies to their own biologging data analysis tasks.



## 1. **Configure the Notebook:** Prepare the notebook environment for analysis.

In this step, we sets up the notebook environment for analysis. It includes installing necessary packages, importing required libraries, setting up parameters, and configuring the cluster for distributed computing. It also retrieves the tag data needed for analysis.

    

In [None]:
# Import necessary libraries and modules.
import xarray as xr
from pint_xarray import unit_registry as ureg
from pangeo_fish.io import open_tag
import hvplot.xarray

In [None]:
import warnings

# Define a filter function to catch specific warnings
def filter_warning(message, category, filename, lineno, file=None, line=None):
    # Filter out the warning you want to hide
    if "Could not set timeout on TCP stream" in str(message):
        return True
    return False
    
warnings.showwarning = filter_warning

In [None]:
#
# Set up execution parameters for the analysis.
#
# Note: This cell is tagged as parameters, allowing automatic updates when configuring with papermil.

# tag_name corresponds to the name of the biologging tag name (DST identification number), 
# which is also a path for storing all the information for the specific fish tagged with tag_name.
# tag_name = "AD_A11849"
tag_name = "SV_A11957"

# tag_list = ['NO_A12710','LT_A11205','CB_A11036','LT_A11385','SQ_A10684','AD_A11177','PB_A12063','NO_A12742','DK_A10642','CB_A11071']
# tag_name = tag_list[9]

# tag_root specifies the root URL for tag data used for this computation.
tag_root = "cleaned"

# catalog_url specifies the URL for the catalog for reference data used.
catalog_url = "https://data-taos.ifremer.fr/kerchunk/ref-copernicus.yaml"

# scratch_root specifies the root directory for storing output files.
scratch_root = "s3://destine-gfts-data-lake/demo"

# storage_options specifies options for the filesystem storing output files.
storage_options = {
    'anon': False, 
    'profile' : "gfts",
    'client_kwargs': {
        "endpoint_url": "https://s3.gra.perf.cloud.ovh.net",
        "region_name": "gra",
    }
}

# if you are using local file system, activate following two lines
scratch_root = "./track_generation/"
storage_options = None

# Default chunk value for time dimension.  This values depends on the configuration of your dask cluster.
chunk_time=24

#
# Parameters for step 2. **Compare Reference Model with DST Information:**
#
# bbox, bounding box, defines the latitude and longitude range for the analysis area.
bbox = {"latitude": [42, 53], "longitude": [-8, 4]} 

# relative_depth_threshold defines the acceptable fish depth relative to the maximum tag depth.
# It determines whether the fish can be considered to be in a certain location based on depth.
relative_depth_threshold = 0.8

#
# Parameters for step 3. **Regrid the Grid from Reference Model Grid to Healpix Grid:** 
#
# nside defines the resolution of the healpix grid used for regridding.
nside = 4096 #*2

# rot defines the rotation angles for the healpix grid.
rot = {"lat": 0, "lon": 0}

# min_vertices sets the minimum number of vertices for a valid transcription for regridding.
min_vertices = 1

#
# Parameters for step 4. **Construct Emission Matrix:**
#
# differences_std sets the standard deviation for scipy.stats.norm.pdf.
# It expresses the estimated certainty of the field of difference.
differences_std = 0.75

# recapture_std sets the covariance for recapture event.
# It shows the certainty of the final recapture area if it is known.
recapture_std = 1e-2

# earth_radius defines the radius of the Earth used for distance calculations.
earth_radius = ureg.Quantity(6371, "km")

# maximum_speed sets the maximum allowable speed for the tagged fish.
maximum_speed = ureg.Quantity(60, "km / day")

# adjustment_factor adjusts parameters for a more fuzzy search.
# It will factor the allowed maximum displacement of the fish.
adjustment_factor = 5  

# truncate sets the truncating factor for computed maximum allowed sigma for convolution process.
truncate = 4

#
# Parameters for step 5. **Compute Additional Emission Probability Matrix:**
#
# receiver_buffer sets the maximum allowed detection distance for acoustic receivers.
receiver_buffer = ureg.Quantity(1000, "m")

#
# Parameters for step 7. **Estimate Model Parameters:** 
#
# tolerance sets the tolerance level for optimised parameter serarch computation.
tolerance = 1e-3

#
# Parameters for step 8. **Compute State Probabilities and Tracks:** 
#
# track_modes defines the modes for track calculation.
track_modes = ["mean", "mode"]

# additional_track_quantities sets quantities to compute for tracks using moving pandas.
additional_track_quantities = ["speed", "distance"]


#
# Parameters for step 9. **Visualization:** 
#
# time_step defines for each time_step value we visualize state and emission matrix. 
time_step = 3


# Define target root directories for storing analysis results.
target_root = f"{scratch_root}/{tag_name}"

# Defines default chunk size for optimisation.  
default_chunk = {"time": chunk_time, "lat": -1, "lon": -1}
default_chunk_xy = {"time": chunk_time, "x": -1, "y": -1}


In [None]:
# Set up a local cluster for distributed computing.
from distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()
client

In [None]:
#Open and retrieve the tag data required for the analysis
tag = open_tag(tag_root, tag_name)
tag

## 3. **Regrid the Grid from Reference Model Grid to Healpix Grid:** Transform the grid from the reference model to the Healpix grid for further analysis.

In this step, we regrid the data from the reference model grid to a Healpix grid. This process involves defining the Healpix grid, creating the target grid, computing interpolation weights, performing the regridding, and saving the regridded data.


In [None]:
# Import necessary libraries
import numpy as np
from xarray_healpy import HealpyGridInfo, HealpyRegridder
from pangeo_fish.grid import center_longitude

In [None]:
%%time

# Open the diff data and performs cleaning operations to prepare it for regridding.

ds = (
    xr.open_dataset(f"{target_root}/diff.zarr", engine="zarr", chunks={},
                    storage_options=storage_options)
    .pipe(lambda ds: ds.merge(ds[["latitude", "longitude"]].compute()))
    .swap_dims({"lat": "yi", "lon": "xi"})
    .drop_vars(["lat", "lon"])
)
ds

In [None]:
%%time
# Define the target Healpix grid information
grid = HealpyGridInfo(level=int(np.log2(nside)), rot=rot)
target_grid = grid.target_grid(ds).pipe(center_longitude, 0)
target_grid

In [None]:
%%time
# Compute the interpolation weights for regridding the diff data
regridder = HealpyRegridder(
    ds[["longitude", "latitude", "ocean_mask"]],
    target_grid,
    method="bilinear",
    interpolation_kwargs={"mask": "ocean_mask", "min_vertices": min_vertices},
)
regridder

In [None]:
%%time
# Perform the regridding operation using the computed interpolation weights.
regridded = regridder.regrid_ds(ds)
regridded

In [None]:
reshaped = grid.to_2d_bis(regridded,dim="cells").pipe(center_longitude,0)

In [None]:
reshaped["diff"].isel(time=0).hvplot.quadmesh(x="longitude",y="latitude",coastline="10m",cmap="cool",width=1000,height=600)

In [None]:
def base_pixel(level, indices):
    return indices >> (level * 2)

In [None]:
# Define the filtering condition
cond = base_pixel(grid.level, regridded["cell_ids"]) == 0

# using the function
regridded_filtered = regridded.where(cond, other=0, drop=True)

# Result
regridded_filtered

In [None]:
%%time
# Reshape the regridded data to 2D
reshaped_1 = grid.to_2d(regridded_filtered).pipe(center_longitude, 0)
reshaped_1 = reshaped_1.persist()
reshaped_1

In [None]:
# Define the filtering condition
cond = base_pixel(grid.level, regridded["cell_ids"]) == 3

# using the function
regridded_filtered = regridded.where(cond, other=0, drop=True)

# Result
regridded_filtered

In [None]:
%%time
# Reshape the regridded data to 2D
reshaped_2 = grid.to_2d(regridded_filtered).pipe(center_longitude, 0)
reshaped_2 = reshaped_2.persist()
reshaped_2

In [None]:
display(reshaped_1,reshaped_2)

In [None]:
reshaped_1["diff"].isel(time=0).hvplot.quadmesh(x="longitude",y="latitude",coastline="10m",cmap="cool",width=1000,height=600,grid=True) * reshaped_2["diff"].isel(time=0).hvplot.quadmesh(x="longitude",y="latitude",coastline="10m",cmap="fire",width=1000,height=600,grid=True)

In [None]:
# This cell verifies the regridded data by plotting the count of non-NaN values.
reshaped["diff"].count(["x", "y"]).plot()

In [None]:
%%time
# This cell saves the regridded data to Zarr format, then cleans up unnecessary variables to free up memory after the regridding process.  
reshaped.chunk(default_chunk_xy).to_zarr(
    f"{target_root}/diff-regridded.zarr",
    mode="w",
    consolidated=True,
    compute=True,
    storage_options=storage_options,
)
# Cleanup unnecessary variables to free up memory
del ds, grid, target_grid, regridder, regridded, reshaped

## 4. **Construct Emission Matrix:** Create an emission matrix based on the transformed grid.

In this step, we construct the emission probability matrix based on the differences between the observed tag temperature and the reference sea temperature computed in Workflow 2 and regridded in Workflow 3. The emission probability matrix represents the likelihood of observing a specific temperature difference given the model parameters and configurations.


In [None]:
# Import necessary libraries
from toolz.dicttoolz import valfilter
from pangeo_fish.distributions import create_covariances, normal_at
from pangeo_fish.pdf import normal


In [None]:
%%time
# Open the regridded diff data 
differences = xr.open_dataset(
    f"{target_root}/diff-regridded.zarr",
    engine="zarr",
    chunks={},
    storage_options=storage_options,
).pipe(lambda ds: ds.merge(ds[["latitude", "longitude"]].compute()))
differences

In [None]:
%%time
# Compute initial and final position
grid = differences[["latitude", "longitude"]].compute()

initial_position = tag["tagging_events"].ds.sel(event_name="release")
cov = create_covariances(1e-6, coord_names=["latitude", "longitude"])
initial_probability = normal_at(
    grid, pos=initial_position, cov=cov, normalize=True, axes=["latitude", "longitude"]
)

final_position = tag["tagging_events"].ds.sel(event_name="fish_death")
if final_position[["longitude", "latitude"]].to_dataarray().isnull().all():
    final_probability = None
else:
    cov = create_covariances(recapture_std**2, coord_names=["latitude", "longitude"])
    final_probability = normal_at(
        grid,
        pos=final_position,
        cov=cov,
        normalize=True,
        axes=["latitude", "longitude"],
    )


In [None]:
%%time
#compute emission probability matrix

emission_pdf = (
    normal(differences["diff"], mean=0, std=differences_std, dims=["y", "x"])
    .to_dataset(name="pdf")
    .assign(
        valfilter(
            lambda x: x is not None,
            {
                "initial": initial_probability,
                "final": final_probability,
                "mask": differences["ocean_mask"],
            },
        )
    )
    .assign_attrs(differences.attrs )#| {"max_sigma": max_sigma})
)

emission_pdf=emission_pdf.chunk(default_chunk_xy).persist()
emission_pdf

In [None]:
#Verify the data
emission_pdf["pdf"].count(["x", "y"]).plot()

In [None]:
emission_pdf["pdf"].isel(time=0).hvplot.quadmesh(x='longitude',y='latitude',cmap="cool",clim=(-2,2))

In [None]:
# This cell saves the emission data to Zarr format, then cleans up unnecessary variables to free up memory.

emission_pdf.to_zarr(
    f"{target_root}/emission.zarr", mode="w", consolidated=True,
    storage_options=storage_options,
)


del differences, grid, initial_probability, final_probability, emission_pdf

## 6. **Combine and Normalize Emission Matrix:** Merge the emission matrix and normalize it for further processing.

In this step, we combine the emission probability matrix constructed in Workflow 4 and 5 then normalize it to ensure that the probabilities sum up to one. This step prepares the combined emission matrix for further analysis and interpretation.


In [None]:
# Import necessary libraries 
from pangeo_fish.pdf import combine_emission_pdf
import hvplot.xarray

In [None]:
# Open and combine the emission probability matrix

combined = (
    xr.open_dataset(
        f"{target_root}/emission.zarr",
        engine="zarr",
        chunks=default_chunk_xy,
        inline_array=True,
        storage_options=storage_options          
    )
    .pipe(combine_emission_pdf)
    .chunk(default_chunk_xy)
    .persist()  # convert to comment if the emission matrix does *not* fit in memory
)
combined

In [None]:
# Verify the data and visualize the sum of probabilities
combined["pdf"].sum(["x", "y"]).hvplot(width=400)

In [None]:
# Save the combined and normalized emission matrix
combined.to_zarr(
    f"{target_root}/combined.zarr", mode="w", consolidated=True,
    storage_options=storage_options    
)
del combined

## 7. **Estimate Model Parameters:** Determine the parameters of the model based on the normalized emission matrix.

This step first estimates maxixmum allowed value of  model parameter 'sigma' max_sigma.  Then we
create an optimizer with an expected parameter range, fitting the model to the normalized emission matrix.  
The resulting optimized parameters is saved to a json file.  

In [None]:
# Import necessary libraries and modules for data analysis.
import xarray as xr
import pandas as pd
from pangeo_fish.hmm.estimator import EagerScoreEstimator
from pangeo_fish.hmm.optimize import EagerBoundsSearch
from pangeo_fish.utils import temporal_resolution
# Open the data
emission = (
    xr.open_dataset(
        f"{target_root}/combined.zarr",
        engine="zarr",
        chunks={},
        inline_array=True,
        storage_options=storage_options            
    )
)
emission

In [None]:
# Compute maximum displacement for each reference model time step
# and estimate maximum sigma value for limiting the optimisation step

earth_radius_ = xr.DataArray(earth_radius, dims=None)

timedelta = temporal_resolution(emission["time"]).pint.quantify().pint.to("h")
grid_resolution = earth_radius_ * emission["resolution"].pint.quantify()

maximum_speed_ = xr.DataArray(maximum_speed, dims=None).pint.to("km / h")
max_grid_displacement = maximum_speed_ * timedelta * adjustment_factor / grid_resolution
max_sigma = max_grid_displacement.pint.to("dimensionless").pint.magnitude / truncate
emission.attrs["max_sigma"]=max_sigma
max_sigma

In [None]:
# Create and configure estimator and optimizer
emission = emission.compute()  # Convert to comment if the emission matrix does *not* fit in memory
estimator = EagerScoreEstimator()
optimizer = EagerBoundsSearch(
    estimator,
    (1e-4, emission.attrs["max_sigma"]),
    optimizer_kwargs={"disp": 3, "xtol": tolerance},
)

In [None]:
%%time
# Fit the model parameter to the data
optimized = optimizer.fit(emission)

In [None]:
# Save the optimized parameters
params=optimized.to_dict()
pd.DataFrame.from_dict(params, orient='index').to_json(f"{target_root}/parameters.json", 
                                                               storage_options=storage_options    )       

In [None]:
# Cleanup
del optimized,  emission 

## 8. **Compute State Probabilities and Tracks:** Calculate the probability distribution of the species in question and compute the tracks.

This step involves predicting state probabilities using the optimised parameter sigma computed in the last step together with normalized emission matrix.  

In [None]:
# Import necessary libraries and modules for data analysis.
import xarray as xr
import pandas as pd
import hvplot.xarray
from pangeo_fish.hmm.estimator import EagerScoreEstimator
from pangeo_fish.io import save_trajectories

# Recreate the Estimator
params=pd.read_json(f"{target_root}/parameters.json", storage_options=storage_options    ).to_dict()[0]
optimized = EagerScoreEstimator(**params)
optimized

In [None]:
%%time
# Load the Data
emission = (
    xr.open_dataset(
        f"{target_root}/combined.zarr",
        engine="zarr",
        chunks=default_chunk_xy,
        inline_array=True,
        storage_options=storage_options            
    ).compute()
)

# Predict the State Probabilities

states = optimized.predict_proba(emission)
states = states.to_dataset().chunk(default_chunk_xy).persist()
states

In [None]:
# Verify the data and visualize the sum of probabilities
states.sum(["x", "y"]).hvplot() +states.count(["x", "y"]).hvplot()

In [None]:
%%time
# Save probability distirbution, state matrix.  
states.chunk(default_chunk_xy).to_zarr(
    f"{target_root}/states.zarr", mode="w", consolidated=True,  
        storage_options=storage_options                
)

In [None]:
emission

In [None]:
emission["pdf"][0]

In [None]:
%%time 
#decode tracks

trajectories = optimized.decode(
    emission,
    states.fillna(0),
    mode=track_modes,
    progress=False,
    additional_quantities=additional_track_quantities,
)
trajectories

In [None]:
# Save trajectories. 
# Here we can chose format parquet for loading files from 'R'
# or chose to  format 'geoparquet' for further analysis of tracks using 
# geopands. 

save_trajectories(trajectories, target_root,storage_options, format="parquet")

In [None]:
# Cleanup
del optimized,  emission, states, trajectories

## 9. **Visualization:** Visualize the results of the analysis for interpretation and insight.


In this step, we visualize various aspects of the analysis results to gain insights and interpret the model outcomes. We plot the emission matrix, which represents the likelihood of observing a specific temperature difference given the model parameters and configurations. Additionally, we visualize the state probabilities, showing the likelihood of the system being in different states at each time step. We also plot each of the tracks of the tagged fish, displaying their movement patterns over time. Finally, we create a movie that combines the emission matrix and state probabilities to provide a comprehensive visualization of the analysis results.


In [None]:
# Import necessary libraries
import holoviews as hv
import hvplot.xarray
import cmocean
import xmovie
from pangeo_fish import visualization
from pangeo_fish.io import read_trajectories, save_html_hvplot

In [None]:
# load trajectories 
trajectories = read_trajectories( track_modes,target_root,storage_options, format="parquet")

In [None]:
# Plot trajectoriesand plot.  

plots = [
    traj.hvplot(c="speed", tiles="CartoLight", title=traj.id, cmap="cmo.speed"
#                ,xlim=bbox['longitude'],        ylim=bbox['latitude']
                ,width=300,height=300)
    for traj in trajectories.trajectories
]
plot=hv.Layout(plots).cols(2)

filepath = f"{target_root}/trajectories.html"
save_html_hvplot(plot, filepath, storage_options)

plot

In [None]:
# load files for plotting

emission = (
    xr.open_dataset(
        f"{target_root}/combined.zarr",
        engine="zarr",
        chunks={},
        inline_array=True,
        storage_options=storage_options,
    )
    .rename_vars({"pdf": "emission"})
    .drop_vars(["final", "initial"])
)#.where(emission["mask"])
states = (
    xr.open_dataset(
        f"{target_root}/states.zarr", 
        engine="zarr", 
        chunks={}, 
        inline_array=True,
        storage_options=storage_options,
    ).where(emission["mask"])
)
data = xr.merge([states, emission.drop_vars(["mask"])])


In [None]:
%%time
# visualize states and emission matrix.  Save the visualisation in an html file.  
# 
plot1 = visualization.plot_map(data["states"],bbox)
plot2 = visualization.plot_map(data["emission"],bbox)
plot=hv.Layout([plot1, plot2]).cols(2)
filepath = f"{target_root}/states_emission.html"

save_html_hvplot(plot, filepath, storage_options)

plot

In [None]:
%%time
## Create Movies 
#
mov = xmovie.Movie(
    (data
     .isel(time=slice(0,data.time.size-1,time_step))
     .chunk({"time": 1, "x": -1, "y": -1})
     .pipe(lambda ds: ds.merge(ds[["longitude", "latitude"]].compute())))
    .pipe(
        visualization.filter_by_states
    ),
    plotfunc=visualization.create_frame,
    input_check=False,
    pixelwidth=15 * 400,
    pixelheight=12 * 400,
    dpi=400,
)

In [None]:
## with s3fs, this does not work due to https://github.com/jbusecke/xmovie/issues/162
# use local file system
#!mkdir movie
#mov.save(f"./movie/states.mp4", parallel=True, overwrite_existing=True, )
mov.save(f"{target_root}/states.mp4", parallel=True, overwrite_existing=True, )