# **Example Usage of Pangeo-Fish Software**


**Overview:**
This Jupyter notebook demonstrates the usage of the Pangeo-Fish software, a tool designed for analyzing biologging data in reference to Earth Observation (EO) data.

The biologging data consist of Data Storage Tag (DST), along with release and recapture time and location of the species in question. Both biologging data and the reference EO data are accessible with https and the access methods are incorporated in this notebook.   

**Purpose:**
By executing this notebook, users will learn how to set up a workflow for utilizing the Pangeo-Fish software. The workflow consists of 9 steps which are described below:

1. **Configure the Notebook:** Prepare the notebook environment for analysis.
2. **Compare Reference Model with DST Information:** Analyze and compare data from the reference model with information from the biologging data of the species in question. 
3. **Regrid the Grid from Reference Model Grid to Healpix Grid:** Transform the grid from the reference model to the Healpix grid for further analysis.
4. **Construct Emission Matrix:** Create an emission matrix based on the transformed grid.
5. **Replace emission for flagged tags:** If the tags are flagged for warm water, then it use the detection file associated and change the flagged timestamps.
6. **Combine and Normalize Emission Matrix:** Merge the emission matrix and normalize it for further processing.
7. **Estimate Model Parameters:** Determine the parameters of the model based on the normalized emission matrix.
8. **Compute State Probabilities and Tracks:** Calculate the probability distribution of the species in question and compute the tracks.
9. **Visualization:** Visualize the results of the analysis for interpretation and insight.

Throughout this notebook, users will gain practical experience in setting up and executing a workflow using Pangeo-Fish, enabling them to apply similar methodologies to their own biologging data analysis tasks.



## 1. **Configure the Notebook:** Prepare the notebook environment for analysis.

In this step, we sets up the notebook environment for analysis. It includes installing necessary packages, importing required libraries, setting up parameters, and configuring the cluster for distributed computing. It also retrieves the tag data needed for analysis.

    

In [1]:
# Import necessary libraries and modules.
import xarray as xr
from pint_xarray import unit_registry as ureg
from pangeo_fish.io import open_tag
import hvplot.xarray
import intake
import pandas as pd



In [68]:
#
# Set up execution parameters for the analysis.
#
# Note: This cell is tagged as parameters, allowing automatic updates when configuring with papermil.

# tag_name corresponds to the name of the biologging tag name (DST identification number), 
# which is also a path for storing all the information for the specific fish tagged with tag_name.
# tag_name = "AD_A11849"
# tag_name = "SV_A11957"


tag_list = ['NO_A12710','CB_A11036','LT_A11385','SQ_A10684','AD_A11177','PB_A12063','NO_A12742','DK_A10642','CB_A11071']
tag_name = tag_list[8]
tag_name = "DK_A10531"

cloud_root = "s3://gfts-ifremer/tags/bargip"

# tag_root specifies the root URL for tag data used for this computation.
tag_root = f"{cloud_root}/cleaned"

# catalog_url specifies the URL for the catalog for reference data used.
catalog_url = "s3://gfts-ifremer/copernicus_catalogs/master.yml"

# scratch_root specifies the root directory for storing output files.
scratch_root = f"{cloud_root}/tracks"


# storage_options specifies options for the filesystem storing output files.
storage_options = {
    'anon': False, 
    # 'profile' : "gfts",
    'client_kwargs': {
        "endpoint_url": "https://s3.gra.perf.cloud.ovh.net",
        "region_name": "gra",
    }
}

# if you are using local file system, activate following two lines
folder_name = "../toto"
storage_options =None
scratch_root = f"/home/jovyan/notebooks/papermill/{folder_name}"

# Default chunk value for time dimension.  This values depends on the configuration of your dask cluster.
chunk_time=24

#
# Parameters for step 2. **Compare Reference Model with DST Information:**
#
# bbox, bounding box, defines the latitude and longitude range for the analysis area.
bbox = {"latitude": [40, 56], "longitude": [-13, 5]} 

# relative_depth_threshold defines the acceptable fish depth relative to the maximum tag depth.
# It determines whether the fish can be considered to be in a certain location based on depth.
relative_depth_threshold = 0.8

#
# Parameters for step 3. **Regrid the Grid from Reference Model Grid to Healpix Grid:** 
#
# Distance filepath is the path to the coastal distance file.
distance_filepath = "s3://gfts-ifremer/tags/distance2coast.zarr"

# distance_scale_factor scales the squared distance in the exponential decay function.
distance_scale_factor = 0.01

# nside defines the resolution of the healpix grid used for regridding.
nside = 4096 #*2

# rot defines the rotation angles for the healpix grid.
rot = {"lat": 0, "lon": 30}

# min_vertices sets the minimum number of vertices for a valid transcription for regridding.
min_vertices = 1

#
# Parameters for step 4. **Construct Emission Matrix:**
#
# differences_std sets the standard deviation for scipy.stats.norm.pdf.
# It expresses the estimated certainty of the field of difference.
differences_std = 0.75

# recapture_std sets the covariance for recapture event.
# It shows the certainty of the final recapture area if it is known.
recapture_std = 1e-2

# earth_radius defines the radius of the Earth used for distance calculations.
earth_radius = ureg.Quantity(6371, "km")

# maximum_speed sets the maximum allowable speed for the tagged fish.
maximum_speed = ureg.Quantity(20, "km / day")

# adjustment_factor adjusts parameters for a more fuzzy search.
# It will factor the allowed maximum displacement of the fish.
adjustment_factor = 5  

# truncate sets the truncating factor for computed maximum allowed sigma for convolution process.
truncate = 4

#
# Parameters for step 5. **Compute Additional Emission Probability Matrix:**
#


# buffer_size sets the size of the powerplant warm plume.
buffer_size = ureg.Quantity(1000, "m")
# powerplant_flag is a boolean that states if the fish has swam in warm plume


#
# Parameters for step 7. **Estimate Model Parameters:** 
#
# tolerance sets the tolerance level for optimised parameter serarch computation.
tolerance = 1e-3

#
# Parameters for step 8. **Compute State Probabilities and Tracks:** 
#
# track_modes defines the modes for track calculation.
track_modes = ["mean", "mode"]

# additional_track_quantities sets quantities to compute for tracks using moving pandas.
additional_track_quantities = ["speed", "distance"]


#
# Parameters for step 9. **Visualization:** 
#
# time_step defines for each time_step value we visualize state and emission matrix. 
time_step = 3


# Define target root directories for storing analysis results.
target_root = f"{scratch_root}/{tag_name}"

# Defines default chunk size for optimisation.  
default_chunk = {"time": chunk_time, "lat": -1, "lon": -1}
default_chunk_xy = {"time": chunk_time, "x": -1, "y": -1}


In [5]:
# Define target root directories for storing analysis results.
target_root = f"{scratch_root}/{tag_name}"

In [6]:
target_root

'/home/jovyan/notebooks/papermill/../toto/DK_A10531'

In [7]:
tag_root

's3://gfts-ifremer/tags/bargip/cleaned'

In [8]:
warm_plume = pd.read_csv("s3://gfts-ifremer/tags/bargip/bar_flag_warm_plume.txt",sep = "\t")
warm_list = list(warm_plume[warm_plume["warm_plume"]==True]["tag_name"])

if tag_name in  warm_list : 
    powerplant_flag = True
else : 
    powerplant_flag = False

In [9]:
if powerplant_flag:
    detection_file = f"{tag_root}/{tag_name}/detection.csv"
    powerplant_file = f"{cloud_root}/nuclear_plant_loc.csv"

In [10]:
# Set up a local cluster for distributed computing.
from distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 10
Total threads: 60,Total memory: 447.03 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:37545,Workers: 10
Dashboard: http://127.0.0.1:8787/status,Total threads: 60
Started: Just now,Total memory: 447.03 GiB

0,1
Comm: tcp://127.0.0.1:40941,Total threads: 6
Dashboard: http://127.0.0.1:32819/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:44149,
Local directory: /tmp/dask-scratch-space/worker-c504ocsj,Local directory: /tmp/dask-scratch-space/worker-c504ocsj

0,1
Comm: tcp://127.0.0.1:42291,Total threads: 6
Dashboard: http://127.0.0.1:46747/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:32787,
Local directory: /tmp/dask-scratch-space/worker-dvkfaops,Local directory: /tmp/dask-scratch-space/worker-dvkfaops

0,1
Comm: tcp://127.0.0.1:37429,Total threads: 6
Dashboard: http://127.0.0.1:35883/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:36925,
Local directory: /tmp/dask-scratch-space/worker-icspvkji,Local directory: /tmp/dask-scratch-space/worker-icspvkji

0,1
Comm: tcp://127.0.0.1:42085,Total threads: 6
Dashboard: http://127.0.0.1:37197/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:45157,
Local directory: /tmp/dask-scratch-space/worker-v5htkwzs,Local directory: /tmp/dask-scratch-space/worker-v5htkwzs

0,1
Comm: tcp://127.0.0.1:44485,Total threads: 6
Dashboard: http://127.0.0.1:41247/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:36323,
Local directory: /tmp/dask-scratch-space/worker-lfrrzcse,Local directory: /tmp/dask-scratch-space/worker-lfrrzcse

0,1
Comm: tcp://127.0.0.1:39077,Total threads: 6
Dashboard: http://127.0.0.1:38785/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:41867,
Local directory: /tmp/dask-scratch-space/worker-rouibnc_,Local directory: /tmp/dask-scratch-space/worker-rouibnc_

0,1
Comm: tcp://127.0.0.1:39913,Total threads: 6
Dashboard: http://127.0.0.1:38817/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:40617,
Local directory: /tmp/dask-scratch-space/worker-wiqe8iu7,Local directory: /tmp/dask-scratch-space/worker-wiqe8iu7

0,1
Comm: tcp://127.0.0.1:42037,Total threads: 6
Dashboard: http://127.0.0.1:43939/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:45735,
Local directory: /tmp/dask-scratch-space/worker-1rjjet2u,Local directory: /tmp/dask-scratch-space/worker-1rjjet2u

0,1
Comm: tcp://127.0.0.1:42547,Total threads: 6
Dashboard: http://127.0.0.1:40799/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:46383,
Local directory: /tmp/dask-scratch-space/worker-dtx8x96p,Local directory: /tmp/dask-scratch-space/worker-dtx8x96p

0,1
Comm: tcp://127.0.0.1:33441,Total threads: 6
Dashboard: http://127.0.0.1:34917/status,Memory: 44.70 GiB
Nanny: tcp://127.0.0.1:44531,
Local directory: /tmp/dask-scratch-space/worker-ifqfmlgw,Local directory: /tmp/dask-scratch-space/worker-ifqfmlgw




In [11]:
#Open and retrieve the tag data required for the analysis
tag = open_tag(tag_root, tag_name)
tag

## 2. **Compare Reference Model with DST Tag Information:** Analyze and compare data from the reference model with information from the biologging data of the species in question. 

In this step, we compare the reference model data with Data Storage Tag information.
The process involves reading and cleaning the reference model, aligning time, converting depth units, subtracting tag data from the model, and saving the results.

In [12]:
# Import necessary libraries
import intake
from pangeo_fish.cf import bounds_to_bins
from pangeo_fish.diff import diff_z
from pangeo_fish.io import open_copernicus_catalog
from pangeo_fish.tags import adapt_model_time, reshape_by_bins, to_time_slice

# Drop data outside the reference interval
time_slice = to_time_slice(tag["tagging_events/time"])
time = tag["dst"].ds.time
cond = (time <= time_slice.stop) & (time >= time_slice.start)

tag_log = tag["dst"].ds.where(cond,drop=True)

min_ = tag_log.time[0]
max_ = tag_log.time[-1]

time_slice = slice(min_.data, max_.data)

In [13]:
def get_copernicus_zarr(product_id="IBI_MULTIYEAR_PHY_005_002"):
    master_cat = intake.open_catalog(catalog_url)
    if product_id == "IBI_MULTIYEAR_PHY_005_002":
        
        # Open necessary datasets
        sub_cat = master_cat[product_id]
        thetao = sub_cat["cmems_mod_ibi_phy_my_0.083deg-3D_P1D-m"](chunk="time").to_dask()[["thetao"]]
        zos = sub_cat["cmems_mod_ibi_phy_my_0.083deg-3D_P1D-m"](chunk="time").to_dask().zos
        deptho = sub_cat["cmems_mod_ibi_phy_my_0.083deg-3D_static"].to_dask().deptho
    
    # Assign latitude and longitude from thetao to deptho to shift in positions
    deptho["latitude"] = thetao["latitude"]
    deptho["longitude"] = thetao["longitude"]
    
    # Create mask for deptho
    mask = deptho.isnull()

    # Merge datasets and assign relevant variables
    ds = (
        thetao.rename({"thetao": "TEMP"})
        .assign(
            {
                "XE": zos,
                "H0": deptho,
                "mask": mask,
            }
        )
    ).rename({"latitude": "lat", "longitude": "lon", "elevation": "depth"})

    # Ensure depth is positive
    ds["depth"] = abs(ds["depth"])

    # Rearrange depth coordinates and assign dynamic depth and bathymetry
    ds = ds.isel(depth=slice(None, None, -1)).assign(
        {
            "dynamic_depth": lambda ds: (ds["depth"] + ds["XE"]).assign_attrs(
                {"units": "m", "positive": "down"}
            ),
            "dynamic_bathymetry": lambda ds: (ds["H0"] + ds["XE"]).assign_attrs(
                {"units": "m", "positive": "down"}
            ),
        }
    ).pipe(broadcast_variables, {"lat": "latitude", "lon": "longitude"})
    # print(uris_by_key)
    return ds

In [14]:
# Verify the data
import hvplot.xarray
import cmocean
from pangeo_fish.io import save_html_hvplot
plot=((-tag['dst'].pressure).hvplot(width=1000,height=500,color='blue')*(-tag_log).hvplot.scatter(x='time',y='pressure',color='red',size=5,width=1000,height=500)
      *((tag['dst'].temperature).hvplot(width=1000,height=500,color='blue')*(tag_log).hvplot.scatter(x='time',y='temperature',color='red',size=5,width=1000,height=500)))
filepath = f"{target_root}/tags.html"

save_html_hvplot(plot, filepath, storage_options)

# plot

(True, 'Plot saved successfully.')

In [15]:
from pangeo_fish.io import broadcast_variables

In [16]:
model = get_copernicus_zarr()

  'dims': dict(self._ds.dims),
  'dims': dict(self._ds.dims),


In [17]:
# Subset the reference_model by 
# - align model time with the time of tag_log, also
# - drop data for depth later that are unlikely due to the observed pressure from tag_log
# - defined latitude and longitude of bbox.  
#
reference_model = (
    model.sel(time=adapt_model_time(time_slice))
    .sel(lat=slice(*bbox["latitude"]), lon=slice(*bbox["longitude"]))
    .pipe(
        lambda ds: ds.sel(
            depth=slice(None, (tag_log["pressure"].max() - ds["XE"].min()).compute())
        )
    )
)

In [18]:
%%time
# Reshape the tag log, so that it bins to the time step of reference_model
reshaped_tag = reshape_by_bins(
    tag_log,
    dim="time",
    bins=(
        reference_model.cf.add_bounds(["time"], output_dim="bounds")
        .pipe(bounds_to_bins, bounds_dim="bounds")
        .get("time_bins")
    ),
    bin_dim="bincount",
    other_dim="obs",
).chunk({"time": chunk_time})

CPU times: user 220 ms, sys: 26.7 ms, total: 247 ms
Wall time: 228 ms


In [19]:
# Subtract the time_bined tag_log from the reference_model. 
# Here, for each time_bin, each observed value are compared with the correspoindng depth of reference_model using diff_z function.  
#

diff = (
    diff_z(reference_model.chunk(dict(depth=-1)), reshaped_tag, depth_threshold=relative_depth_threshold)
    .assign_attrs({"tag_id": tag_name})
    .assign(
        {
            "H0": reference_model["H0"],
            "ocean_mask": reference_model["H0"].notnull(),
        }
    )
)

# Persist the diff data
diff = diff.chunk(default_chunk).persist()
# diff

In [20]:
%%time
# Verify the data
# diff["diff"].count(["lat","lon"]).plot()

CPU times: user 5 µs, sys: 2 µs, total: 7 µs
Wall time: 15.3 µs


In [21]:
diff

Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 163.60 kiB 163.60 kiB Shape (193, 217) (193, 217) Dask graph 1 chunks in 1 graph layer Data type float32 numpy.ndarray",217  193,

Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 163.60 kiB 163.60 kiB Shape (193, 217) (193, 217) Dask graph 1 chunks in 1 graph layer Data type float32 numpy.ndarray",217  193,

Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.69 MiB,7.67 MiB
Shape,"(71, 193, 217)","(24, 193, 217)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 22.69 MiB 7.67 MiB Shape (71, 193, 217) (24, 193, 217) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",217  193  71,

Unnamed: 0,Array,Chunk
Bytes,22.69 MiB,7.67 MiB
Shape,"(71, 193, 217)","(24, 193, 217)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 163.60 kiB 163.60 kiB Shape (193, 217) (193, 217) Dask graph 1 chunks in 1 graph layer Data type float32 numpy.ndarray",217  193,

Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,40.90 kiB,40.90 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,bool numpy.ndarray,bool numpy.ndarray
"Array Chunk Bytes 40.90 kiB 40.90 kiB Shape (193, 217) (193, 217) Dask graph 1 chunks in 1 graph layer Data type bool numpy.ndarray",217  193,

Unnamed: 0,Array,Chunk
Bytes,40.90 kiB,40.90 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,bool numpy.ndarray,bool numpy.ndarray


In [22]:
target_lat = diff["lat"]
target_lon = diff["lon"]

In [23]:
%%time
# Save snapshot to disk
diff.to_zarr(
    f"{target_root}/diff.zarr", mode="w", storage_options=storage_options
)

# Cleanup
del tag_log, model, reference_model, reshaped_tag, diff

CPU times: user 727 ms, sys: 117 ms, total: 844 ms
Wall time: 9.51 s


## 3. **Regrid the Grid from Reference Model Grid to Healpix Grid:** Transform the grid from the reference model to the Healpix grid for further analysis.

In this step, we regrid the data from the reference model grid to a Healpix grid. This process involves defining the Healpix grid, creating the target grid, computing interpolation weights, performing the regridding, and saving the regridded data.


In [24]:
# Import necessary libraries
import s3fs
import numpy as np
from xarray_healpy import HealpyGridInfo, HealpyRegridder
from pangeo_fish.grid import center_longitude

In [25]:
%%time

# Open the diff data and performs cleaning operations to prepare it for regridding.

ds = (
    xr.open_dataset(f"{target_root}/diff.zarr", engine="zarr", chunks={},
                    storage_options=storage_options)
    .pipe(lambda ds: ds.merge(ds[["latitude", "longitude"]].compute()))
    .swap_dims({"lat": "yi", "lon": "xi"})
)
ds

CPU times: user 34.1 ms, sys: 1.63 ms, total: 35.7 ms
Wall time: 52.8 ms


Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 163.60 kiB 163.60 kiB Shape (193, 217) (193, 217) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",217  193,

Unnamed: 0,Array,Chunk
Bytes,163.60 kiB,163.60 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.69 MiB,7.67 MiB
Shape,"(71, 193, 217)","(24, 193, 217)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 22.69 MiB 7.67 MiB Shape (71, 193, 217) (24, 193, 217) Dask graph 3 chunks in 2 graph layers Data type float64 numpy.ndarray",217  193  71,

Unnamed: 0,Array,Chunk
Bytes,22.69 MiB,7.67 MiB
Shape,"(71, 193, 217)","(24, 193, 217)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,40.90 kiB,40.90 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,bool numpy.ndarray,bool numpy.ndarray
"Array Chunk Bytes 40.90 kiB 40.90 kiB Shape (193, 217) (193, 217) Dask graph 1 chunks in 2 graph layers Data type bool numpy.ndarray",217  193,

Unnamed: 0,Array,Chunk
Bytes,40.90 kiB,40.90 kiB
Shape,"(193, 217)","(193, 217)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,bool numpy.ndarray,bool numpy.ndarray


In [26]:
s3 = s3fs.S3FileSystem(
    anon=False,
    client_kwargs={
        "endpoint_url": "https://s3.gra.perf.cloud.ovh.net",
    },
)

In [27]:
coastal_distance = xr.open_zarr(distance_filepath).sel(lat = slice(56,40),lon=slice(-13,5))

In [28]:
coastal_distance = coastal_distance.sortby("lat")

In [29]:
coastal_distance = coastal_distance.interp(lat=target_lat, lon=target_lon, method='linear')

In [30]:
coastal_distance["dist"] = (1+np.exp(-(coastal_distance.dist*coastal_distance.dist)*distance_scale_factor))

In [31]:
coastal_distance = (
    coastal_distance
    .swap_dims({"lat": "yi", "lon": "xi"})
    .drop_vars(["lat", "lon"])
)

In [32]:
%%time
# Define the target Healpix grid information
grid = HealpyGridInfo(level=int(np.log2(nside)), rot=rot)
target_grid = grid.target_grid(ds).pipe(center_longitude, 0)
target_grid

CPU times: user 837 ms, sys: 85.2 ms, total: 922 ms
Wall time: 639 ms


In [33]:
%%time
# Compute the interpolation weights for regridding the diff data
regridder = HealpyRegridder(
    ds[["longitude", "latitude", "ocean_mask"]],
    target_grid,
    method="bilinear",
    interpolation_kwargs={"mask": "ocean_mask", "min_vertices": min_vertices},
)
regridder

CPU times: user 6.05 s, sys: 285 ms, total: 6.34 s
Wall time: 5.97 s


  return masked_weights / np.sum(masked_weights, axis=-1)[:, None]


HealpyRegridder(method='bilinear', interpolation_kwargs={'mask': 'ocean_mask', 'min_vertices': 1})

In [34]:
%%time
# Perform the regridding operation using the computed interpolation weights.
regridded = regridder.regrid_ds(ds)
regridded

CPU times: user 1.12 s, sys: 276 ms, total: 1.4 s
Wall time: 1.3 s


Unnamed: 0,Array,Chunk
Bytes,7.13 MiB,7.13 MiB
Shape,"(934413,)","(934413,)"
Dask graph,1 chunks in 7 graph layers,1 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.13 MiB 7.13 MiB Shape (934413,) (934413,) Dask graph 1 chunks in 7 graph layers Data type float64 numpy.ndarray",934413  1,

Unnamed: 0,Array,Chunk
Bytes,7.13 MiB,7.13 MiB
Shape,"(934413,)","(934413,)"
Dask graph,1 chunks in 7 graph layers,1 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,506.16 MiB,171.10 MiB
Shape,"(71, 934413)","(24, 934413)"
Dask graph,3 chunks in 8 graph layers,3 chunks in 8 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 506.16 MiB 171.10 MiB Shape (71, 934413) (24, 934413) Dask graph 3 chunks in 8 graph layers Data type float64 numpy.ndarray",934413  71,

Unnamed: 0,Array,Chunk
Bytes,506.16 MiB,171.10 MiB
Shape,"(71, 934413)","(24, 934413)"
Dask graph,3 chunks in 8 graph layers,3 chunks in 8 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.13 MiB,7.13 MiB
Shape,"(934413,)","(934413,)"
Dask graph,1 chunks in 7 graph layers,1 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.13 MiB 7.13 MiB Shape (934413,) (934413,) Dask graph 1 chunks in 7 graph layers Data type float64 numpy.ndarray",934413  1,

Unnamed: 0,Array,Chunk
Bytes,7.13 MiB,7.13 MiB
Shape,"(934413,)","(934413,)"
Dask graph,1 chunks in 7 graph layers,1 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [35]:
regridded_coastal = regridder.regrid_ds(coastal_distance)

In [36]:
%%time
# Reshape the regridded data to 2D
reshaped = grid.to_2d(regridded).pipe(center_longitude, 0)
reshaped = reshaped.persist()
reshaped

CPU times: user 4.32 s, sys: 8.48 s, total: 12.8 s
Wall time: 1.78 s


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 0.96 GiB 332.49 MiB Shape (71, 1213, 1497) (24, 1213, 1497) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213  71,

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [37]:
reshaped_coastal = grid.to_2d(regridded_coastal).pipe(center_longitude,0)

In [38]:
# This cell verifies the regridded data by plotting the count of non-NaN values.
# reshaped["diff"].count(["x", "y"]).plot()

In [39]:
coastal_chunk = {"x" : default_chunk_xy["x"], "y":default_chunk_xy["y"]}

In [40]:
reshaped["diff"] = reshaped["diff"] / reshaped_coastal["dist"]

In [41]:
%%time
# This cell saves the regridded data to Zarr format, then cleans up unnecessary variables to free up memory after the regridding process.  
reshaped.chunk(default_chunk_xy).to_zarr(
    f"{target_root}/diff-regridded.zarr",
    mode="w",
    consolidated=True,
    compute=True,
    storage_options=storage_options,
)

reshaped_coastal.chunk(coastal_chunk).to_zarr(
    f"{target_root}/coastal.zarr",
    mode="w",
    consolidated=True,
    compute=True,
    storage_options=storage_options,
)
# Cleanup unnecessary variables to free up memory
del ds, grid, target_grid, regridder, regridded, reshaped,reshaped_coastal

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: user 683 ms, sys: 638 ms, total: 1.32 s
Wall time: 4.25 s


## 4. **Construct Emission Matrix:** Create an emission matrix based on the transformed grid.

In this step, we construct the emission probability matrix based on the differences between the observed tag temperature and the reference sea temperature computed in Workflow 2 and regridded in Workflow 3. The emission probability matrix represents the likelihood of observing a specific temperature difference given the model parameters and configurations.


In [42]:
# Import necessary libraries
from toolz.dicttoolz import valfilter
from pangeo_fish.distributions import create_covariances, normal_at
from pangeo_fish.pdf import normal


In [43]:
%%time
# Open the regridded diff data 
differences = xr.open_dataset(
    f"{target_root}/diff-regridded.zarr",
    engine="zarr",
    chunks={},
    storage_options=storage_options,
).pipe(lambda ds: ds.merge(ds[["latitude", "longitude"]].compute()))
differences

CPU times: user 122 ms, sys: 65.7 ms, total: 187 ms
Wall time: 260 ms


Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 0.96 GiB 332.49 MiB Shape (71, 1213, 1497) (24, 1213, 1497) Dask graph 3 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213  71,

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [44]:
%%time
# Compute initial and final position
grid = differences[["latitude", "longitude"]].compute()

initial_position = tag["tagging_events"].ds.sel(event_name="release")
cov = create_covariances(1e-6, coord_names=["latitude", "longitude"])
initial_probability = normal_at(
    grid, pos=initial_position, cov=cov, normalize=True, axes=["latitude", "longitude"]
)

final_position = tag["tagging_events"].ds.sel(event_name="fish_death")
if final_position[["longitude", "latitude"]].to_dataarray().isnull().all():
    final_probability = None
else:
    cov = create_covariances(recapture_std**2, coord_names=["latitude", "longitude"])
    final_probability = normal_at(
        grid,
        pos=final_position,
        cov=cov,
        normalize=True,
        axes=["latitude", "longitude"],
    )


CPU times: user 159 ms, sys: 12.6 ms, total: 172 ms
Wall time: 162 ms


In [45]:
%%time
#compute emission probability matrix

emission_pdf = (
    normal(differences["diff"], mean=0, std=differences_std, dims=["y", "x"])
    .to_dataset(name="pdf")
    .assign(
        valfilter(
            lambda x: x is not None,
            {
                "initial": initial_probability,
                "final": final_probability,
                "mask": differences["ocean_mask"],
            },
        )
    )
    .assign_attrs(differences.attrs )#| {"max_sigma": max_sigma})
)

emission_pdf=emission_pdf.chunk(default_chunk_xy).persist()
emission_pdf

CPU times: user 18.3 ms, sys: 2.06 ms, total: 20.4 ms
Wall time: 17.5 ms


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type int64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 0.96 GiB 332.49 MiB Shape (71, 1213, 1497) (24, 1213, 1497) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213  71,

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [46]:
#Verify the data
# emission_pdf["pdf"].count(["x", "y"]).plot()

In [47]:
# This cell saves the emission data to Zarr format, then cleans up unnecessary variables to free up memory.

emission_pdf.to_zarr(
    f"{target_root}/emission.zarr", mode="w", consolidated=True,
    storage_options=storage_options,
)

del differences, grid, initial_probability, final_probability, emission_pdf

## 5. **Replace emission for the tags with warm spikes detected**

In [48]:
%%time 
# Import necessary libraries and open data and perform initial setup
from pangeo_fish import acoustic, utils
import hvplot.xarray
import pandas as pd
from pangeo_fish.heat import heat_regulation, powerpalnt_emission_map
emission = xr.open_dataset(
    f"{target_root}/emission.zarr", engine="zarr", chunks={},#"x": -1, "y": -1},
    storage_options=storage_options,
)
emission

CPU times: user 70.6 ms, sys: 6.38 ms, total: 77 ms
Wall time: 73.1 ms


Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type int64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 0.96 GiB 332.49 MiB Shape (71, 1213, 1497) (24, 1213, 1497) Dask graph 3 chunks in 2 graph layers Data type float64 numpy.ndarray",1497  1213  71,

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [49]:
if powerplant_flag :
    # Loading detections, formatting and reducing observation window
    detections = pd.read_csv(detection_file).set_index("time").to_xarray()
    detections["time"]=detections["time"].astype("datetime64")
    detections = detections.sel(time=emission["time"]) # Narrowing the data to the observed days only
    
    pp_map = pd.read_csv(powerplant_file,sep=";").drop("Country",axis=1).to_xarray() # Loading powerplant locations data
    
    # Combining and replacing the emission map at the given timestamps for the days where warm plume are detected
    combined_masks = powerpalnt_emission_map(pp_map,emission,buffer_size,rot)
    emission = heat_regulation(emission,detections,combined_masks)

  detections["time"]=detections["time"].astype("datetime64")


## 6. **Combine and Normalize Emission Matrix:** Merge the emission matrix and normalize it for further processing.

In this step, we combine the emission probability matrix constructed in Workflow 4 and 5 then normalize it to ensure that the probabilities sum up to one. This step prepares the combined emission matrix for further analysis and interpretation.


In [50]:
# Import necessary libraries 
from pangeo_fish.pdf import combine_emission_pdf
import hvplot.xarray

In [51]:
# Open and combine the emission probability matrix

combined = (
    emission
    .pipe(combine_emission_pdf)
    .chunk(default_chunk_xy)
    .persist()  # convert to comment if the emission matrix does *not* fit in memory
)
combined

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type int64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 0.96 GiB 332.49 MiB Shape (71, 1213, 1497) (24, 1213, 1497) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213  71,

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [52]:
# Verify the data and visualize the sum of probabilities
# combined["pdf"].sum(["x", "y"]).hvplot(width=400)

In [53]:
# Save the combined and normalized emission matrix
combined.to_zarr(
    f"{target_root}/combined.zarr", mode="w", consolidated=True,
    storage_options=storage_options    
)
del combined

## 7. **Estimate Model Parameters:** Determine the parameters of the model based on the normalized emission matrix.

This step first estimates maxixmum allowed value of  model parameter 'sigma' max_sigma.  Then we
create an optimizer with an expected parameter range, fitting the model to the normalized emission matrix.  
The resulting optimized parameters is saved to a json file.  

In [54]:
# Import necessary libraries and modules for data analysis.
import xarray as xr
import pandas as pd
from pangeo_fish.hmm.estimator import EagerScoreEstimator
from pangeo_fish.hmm.optimize import EagerBoundsSearch
from pangeo_fish.utils import temporal_resolution
# Open the data
emission = (
    xr.open_dataset(
        f"{target_root}/combined.zarr",
        engine="zarr",
        chunks={},
        inline_array=True,
        storage_options=storage_options            
    )
)
emission

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type int64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 0.96 GiB 332.49 MiB Shape (71, 1213, 1497) (24, 1213, 1497) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213  71,

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [55]:
# Compute maximum displacement for each reference model time step
# and estimate maximum sigma value for limiting the optimisation step

earth_radius_ = xr.DataArray(earth_radius, dims=None)

timedelta = temporal_resolution(emission["time"]).pint.quantify().pint.to("h")
grid_resolution = earth_radius_ * emission["resolution"].pint.quantify()

maximum_speed_ = xr.DataArray(maximum_speed, dims=None).pint.to("km / h")
max_grid_displacement = maximum_speed_ * timedelta * adjustment_factor / grid_resolution
max_sigma = max_grid_displacement.pint.to("dimensionless").pint.magnitude / truncate
emission.attrs["max_sigma"]=max_sigma
max_sigma

  timedelta = to_offset(freq).delta.to_numpy()


47.11935070054214

In [56]:
# Create and configure estimator and optimizer
emission = emission.compute()  # Convert to comment if the emission matrix does *not* fit in memory
estimator = EagerScoreEstimator()
optimizer = EagerBoundsSearch(
    estimator,
    (1e-4, emission.attrs["max_sigma"]),
    optimizer_kwargs={"disp": 3, "xtol": tolerance},
)

In [57]:
%%time
# Fit the model parameter to the data
optimized = optimizer.fit(emission)

 
 Func-count     x          f(x)          Procedure
    1        17.9981      1244.04        initial
    2        29.1214      1282.02        golden
    3        11.1234      1222.28        golden
    4        6.87471      1172.73        golden
    5        4.24884      1127.84        golden
    6        2.62597      1091.76        golden


  normalized = updated / normalizations[index]
  return -np.sum(np.log(normalizations_))


    7        1.62297          inf        golden
    8        3.24585      1106.53        golden
    9        2.24286      1082.03        golden
   10        2.00608      1076.49        golden
   11        1.85975      1073.82        golden
   12        1.76931      1072.26        golden
   13        1.71341      1071.46        golden
   14        1.67887      1071.04        golden
   15        1.65752      1070.82        golden
   16        1.64432      1070.69        golden
   17        1.63617      1070.62        golden
   18        1.63113      1070.57        golden
   19        1.62801      1070.55        golden
   20        1.62609      1070.53        golden
   21         1.6249          inf        golden
   22        1.62682      1070.54        golden
   23        1.62563      1070.53        golden
   24         1.6253      1070.53        golden

Optimization terminated successfully;
The returned value satisfies the termination criteria
(using xtol =  0.001 )
CPU times: user 1min

In [58]:
# Save the optimized parameters
params=optimized.to_dict()
pd.DataFrame.from_dict(params, orient='index').to_json(f"{target_root}/parameters.json", 
                                                               storage_options=storage_options    )       

In [59]:
# Cleanup
del optimized,  emission 

## 8. **Compute State Probabilities and Tracks:** Calculate the probability distribution of the species in question and compute the tracks.

This step involves predicting state probabilities using the optimised parameter sigma computed in the last step together with normalized emission matrix.  

In [60]:
# Import necessary libraries and modules for data analysis.
import xarray as xr
import pandas as pd
import hvplot.xarray
from pangeo_fish.hmm.estimator import EagerScoreEstimator
from pangeo_fish.io import save_trajectories

# Recreate the Estimator
params=pd.read_json(f"{target_root}/parameters.json", storage_options=storage_options    ).to_dict()[0]
optimized = EagerScoreEstimator(**params)
optimized

EagerScoreEstimator(sigma=1.6253014351, truncate=4.0)

In [61]:
%%time
# Load the Data
emission = (
    xr.open_dataset(
        f"{target_root}/combined.zarr",
        engine="zarr",
        chunks=default_chunk_xy,
        inline_array=True,
        storage_options=storage_options            
    ).compute()
)

# Predict the State Probabilities

states = optimized.predict_proba(emission)
states = states.to_dataset().chunk(default_chunk_xy).persist()
states

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: user 5.22 s, sys: 1.5 s, total: 6.72 s
Wall time: 6.38 s


Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type int64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.85 MiB 13.85 MiB Shape (1213, 1497) (1213, 1497) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213,

Unnamed: 0,Array,Chunk
Bytes,13.85 MiB,13.85 MiB
Shape,"(1213, 1497)","(1213, 1497)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 0.96 GiB 332.49 MiB Shape (71, 1213, 1497) (24, 1213, 1497) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",1497  1213  71,

Unnamed: 0,Array,Chunk
Bytes,0.96 GiB,332.49 MiB
Shape,"(71, 1213, 1497)","(24, 1213, 1497)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [62]:
# Verify the data and visualize the sum of probabilities
# states.sum(["x", "y"]).hvplot() +states.count(["x", "y"]).hvplot()

In [63]:
%%time
# Save probability distirbution, state matrix.  
states.chunk(default_chunk_xy).to_zarr(
    f"{target_root}/states.zarr", mode="w", consolidated=True,  
        storage_options=storage_options                
)

CPU times: user 320 ms, sys: 1.09 s, total: 1.41 s
Wall time: 1.5 s


<xarray.backends.zarr.ZarrStore at 0x7fe52fbbacc0>

In [64]:
%%time 
#decode tracks

trajectories = optimized.decode(
    emission,
    states.fillna(0),
    mode=track_modes,
    progress=False,
    additional_quantities=additional_track_quantities,
)
trajectories

    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array.reshape(shape)

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    >>> array.reshape(shape, limit='128 MiB')
  stacked = self.stack({newdimname: dim})


CPU times: user 299 ms, sys: 43.3 ms, total: 342 ms
Wall time: 748 ms


TrajectoryCollection with 2 trajectories

In [65]:
# Save trajectories. 
# Here we can chose format parquet for loading files from 'R'
# or chose to  format 'geoparquet' for further analysis of tracks using 
# geopands. 

save_trajectories(trajectories, target_root,storage_options, format="parquet")

In [66]:
# Cleanup
del optimized,  emission, states, trajectories

In [67]:
cluster.close()
client.close()

## 9. **Visualization:** Visualize the results of the analysis for interpretation and insight.


In this step, we visualize various aspects of the analysis results to gain insights and interpret the model outcomes. We plot the emission matrix, which represents the likelihood of observing a specific temperature difference given the model parameters and configurations. Additionally, we visualize the state probabilities, showing the likelihood of the system being in different states at each time step. We also plot each of the tracks of the tagged fish, displaying their movement patterns over time. Finally, we create a movie that combines the emission matrix and state probabilities to provide a comprehensive visualization of the analysis results.
