# Executing multiple parameterized notebooks with papermill
___
**[Papermill](https://papermill.readthedocs.io/en/latest/)** is a Python library that allows you to parameterize, execute, and analyze Jupyter Notebooks, making it useful for automating and scaling data analysis workflows.

In this notebook, we set up a routine that generates trajectories for multiple fish, each with its own parameters.
First, we define the parameters used by the loop that executes the notebooks.
The second part submits one job per tag. Each job executes a template notebook with papermill, overriding the parameters defined in its first cells, and saves the executed notebook.
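Papermill looks for the cell tagged `parameters` in the template notebook and injects a new cell, tagged `injected-parameters`, right after it, so the injected values override the defaults. The sketch below only illustrates that idea and is not papermill's own code: the hypothetical `render_parameters_cell` helper turns a parameter dict into Python assignments with `repr`, whereas papermill uses its own per-language translators.

```python
def render_parameters_cell(parameters: dict) -> str:
    """Build the source of a cell assigning each parameter (illustration only)."""
    lines = ["# Parameters"]
    for name, value in parameters.items():
        # repr() gives a valid Python literal for str, bool, None, numbers, lists and dicts
        lines.append(f"{name} = {value!r}")
    return "\n".join(lines)


params = {
    "tag_name": "AD_A11170",
    "scratch_root": "s3://gfts-ifremer/bargip/run/minrk/test",
    "storage_options": None,
}
print(render_parameters_cell(params))
```

Because the injected cell runs after the template's `parameters` cell, any variable left out of `param` keeps its default value from the template.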

### Parameters set up

In [1]:
# Necessary imports
import json
import os
import re
from datetime import datetime
from pathlib import Path

import kbatch
from kbatch_papermill import kbatch_papermill
import numpy as np
import pandas as pd
import pytz
import s3fs
from tqdm.notebook import tqdm

In [2]:
# Connecting to the bucket
s3 = s3fs.S3FileSystem(anon=False)

In [3]:
### Parameters for the execution of the notebook

# Local pangeo-fish repository, passed to kbatch_papermill as code_dir
pangeo_fish_dir = Path.home() / "pangeo-fish"

# tags_repo is the S3 path (without the "s3://" prefix) to the tag data stored on the bucket
tags_repo = "gfts-ifremer/bargip/tag/formatted/"

# local_output is the local directory where the job record (jobs.json) and copies of the executed notebooks are stored
local_output = "papermill_output"

# Path of the template notebook in pangeo-fish (change it if needed)
input_notebook = "notebooks/papermill/pangeo-fish_papermill.ipynb"

# cloud_root is the remote root path used to access the files on the bucket (scratch_root is built from it below)
cloud_root = "s3://gfts-ifremer/bargip"

# folder_name is the name of the folder where the results will be stored
folder_name = "test"

In [4]:
# Retrieve the username in a JupyterHub environment
user_name = os.getenv("JUPYTERHUB_USER")

# Setting up parameters for the computation
remote = True

if remote:
    storage_options = {
        "anon": False,
        "client_kwargs": {
            "endpoint_url": "https://s3.gra.perf.cloud.ovh.net",
            "region_name": "gra",
        },
    }
    scratch_root = f"{cloud_root}/run/{user_name}/{folder_name}"
    s3_code_dir = f"gfts-ifremer/kbatch/{user_name}"
else:
    storage_options = None
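    scratch_root = f"/home/jovyan/notebooks/papermill/{folder_name}"  # Update this with the local path where you want the results to be stored
    # Note: s3_code_dir, which the submission loop below passes to kbatch_papermill, is only defined when remote is True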

In [5]:
# List the tags stored in tags_repo, keeping only the tag names
tag_list = [tag.replace(tags_repo, "") for tag in s3.ls(tags_repo)]
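`s3.ls` returns full keys without the `s3://` scheme (for example `gfts-ifremer/bargip/tag/formatted/AD_A11170`), so the comprehension above keeps only the last part. A slightly stricter variant, sketched below with a made-up listing and a hypothetical `tag_names` helper, strips the prefix only at the start of each key (`str.replace` would also remove it anywhere else in the string) and ignores a trailing slash, if any.

```python
def tag_names(keys: list[str], prefix: str) -> list[str]:
    """Turn bucket keys such as 'bucket/tag/formatted/AD_A11170' into tag names."""
    names = []
    for key in keys:
        name = key.removeprefix(prefix).strip("/")
        if name:  # skip an empty name (e.g. the directory entry itself)
            names.append(name)
    return names


listing = [
    "gfts-ifremer/bargip/tag/formatted/AD_A11170",
    "gfts-ifremer/bargip/tag/formatted/SV_A11963/",
]
print(tag_names(listing, "gfts-ifremer/bargip/tag/formatted/"))
```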

In [6]:
# Limit the number of tags to process for this test run
tag_list = tag_list[1:15]
tag_list.append("SV_A11963")

In [7]:
# param is the dict passed as an argument to papermill
param = {"storage_options": storage_options, "scratch_root": scratch_root}
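In the submission loop, `param["tag_name"]` is overwritten in place before each submission, which works as long as `kbatch_papermill` reads the parameters when the job is submitted. If you ever keep the dicts around (for instance to resubmit a single tag later), it can be safer to build one independent copy per tag. A minimal sketch with a hypothetical `params_for` helper and a made-up bucket path:

```python
import copy


def params_for(base: dict, tag_name: str) -> dict:
    """Return a deep copy of the shared parameters with the tag name set."""
    params = copy.deepcopy(base)  # nested dicts such as storage_options are copied too
    params["tag_name"] = tag_name
    return params


base = {"storage_options": {"anon": False}, "scratch_root": "s3://bucket/run/user/test"}
per_tag = {tag: params_for(base, tag) for tag in ["AD_A11170", "SV_A11963"]}
print(per_tag["SV_A11963"]["tag_name"])
```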

In [8]:
# Verifying the params for the location
param

{'storage_options': {'anon': False,
  'client_kwargs': {'endpoint_url': 'https://s3.gra.perf.cloud.ovh.net',
   'region_name': 'gra'}},
 'scratch_root': 's3://gfts-ifremer/bargip/run/minrk/test'}

![warning](warning.png)

### BE CAREFUL WITH THE PATH CHOSEN FOR **SCRATCH ROOT**: THIS IS THE DIRECTORY WHERE THE RESULTS OF THE COMPUTATION WILL BE STORED. MAKE SURE NOT TO OVERWRITE EXISTING RESULTS!

___
### Explanation of the code below
- The code loops over the tag IDs in `tag_list` and computes the time between the two tagging events (release and recapture).
- If the fish was observed for more than 2 days and has not been processed yet, a parameterized notebook is submitted as a Kubernetes job with `kbatch_papermill`.
- `job_dict` maps the path of each output notebook to the ID of the job that produces it, whether that job succeeds or fails. It is saved to `papermill_output/jobs.json` and used to keep track of the tags that have already been submitted.

These conditions are combined as follows:
```
observation_length = (recapture_date - release_date) / np.timedelta64(1, "D")

if observation_length > 2:
    ...
    if not force:
        if job_id:  # already submitted
            continue
        if s3.exists(output_path):  # output notebook already exists
            continue
```
First, if the tagging events show an observation period of 2 days or less, the fish's trajectory is not computed.
Second, tags that were already submitted, or whose output notebook already exists on the bucket, are skipped. If the generation was interrupted but the results obtained so far are valid, you can therefore restart the computation where it stopped.
If you noticed an issue in the results and want to regenerate all the tags instead, set `force = True` in the next cell: every tag is then resubmitted without checking `jobs.json` or the existing outputs.

Each executed notebook is written to `{scratch_root}/nbs/<tag_name>.ipynb` on the bucket.
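The selection rule can be reproduced without any cloud access. The sketch below parses two made-up tagging-event timestamps in the `%Y-%m-%dT%H:%M:%SZ` format used by the loop, computes the observation length in days with the standard library instead of NumPy, and applies the same skip logic. `observation_days`, `should_submit` and their arguments are hypothetical names; `already_exists` stands in for `s3.exists(output_path)`.

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def observation_days(release: str, recapture: str) -> float:
    """Length of the observation period in (fractional) days."""
    start = datetime.strptime(release, TIME_FORMAT)
    end = datetime.strptime(recapture, TIME_FORMAT)
    return (end - start).total_seconds() / 86400


def should_submit(days: float, output_path: str, job_dict: dict,
                  already_exists: bool, force: bool = False) -> bool:
    """Mirror the loop: skip short tracks, then skip known jobs/outputs unless forced."""
    if days <= 2:
        return False  # observed for 2 days or less: no trajectory computed
    if force:
        return True  # resubmit regardless of previous runs
    if job_dict.get(output_path):
        return False  # already submitted (recorded in jobs.json)
    if already_exists:
        return False  # output notebook already on the bucket
    return True


days = observation_days("2022-06-01T10:00:00Z", "2022-06-04T22:00:00Z")
print(days)
```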
 

In [11]:
timezone = pytz.timezone("Europe/Paris")
# force resubmits the whole job instead of checking what's been computed already
force = False

job_dict = {}
local_output = Path(local_output)
local_output.mkdir(exist_ok=True)

nb_output = f"{scratch_root}/nbs"
s3.mkdir(nb_output, exist_ok=True)

job_file = local_output / "jobs.json"
if job_file.exists() and not force:
    with job_file.open() as f:
        job_dict = json.load(f)
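`jobs.json` is what lets a rerun skip tags that were already submitted: it is read here unless `force` is set, and rewritten only once, at the end of the next cell. If an exception interrupts the loop, the jobs submitted during that run are therefore not recorded. One possible variant is to save the record right after each submission; the sketch below uses hypothetical `load_jobs`/`save_jobs` helpers and writes to a temporary file that is then renamed with `os.replace`, so the file on disk is always a complete JSON document.

```python
import json
import os
import tempfile
from pathlib import Path


def load_jobs(job_file: Path, force: bool = False) -> dict:
    """Return the saved {output_path: job_id} mapping, or {} if forced or absent."""
    if force or not job_file.exists():
        return {}
    with job_file.open() as f:
        return json.load(f)


def save_jobs(job_file: Path, job_dict: dict) -> None:
    """Write the mapping via a temp file in the same directory, then rename it."""
    fd, tmp = tempfile.mkstemp(dir=job_file.parent, suffix=".json")
    with os.fdopen(fd, "w") as f:
        json.dump(job_dict, f, indent=1)
    os.replace(tmp, job_file)  # atomic on POSIX when both paths are on one filesystem


with tempfile.TemporaryDirectory() as d:
    jf = Path(d) / "jobs.json"
    save_jobs(jf, {"s3://bucket/nbs/AD_A11170.ipynb": "papermill-ada11170-xxxxx"})
    print(load_jobs(jf))
```

In the loop, `save_jobs(job_file, job_dict)` would then go right after `job_dict[output_path] = job_id`.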

In [12]:
%%time
for tag_name in tqdm(tag_list, desc="Processing tags"):
    try:
        te = pd.read_csv(s3.open(f"s3://{tags_repo}{tag_name}/tagging_events.csv"))
        release_date = np.datetime64(
            datetime.strptime(te["time"][0], "%Y-%m-%dT%H:%M:%SZ")
        )
        recapture_date = np.datetime64(
            datetime.strptime(te["time"][1], "%Y-%m-%dT%H:%M:%SZ")
        )
        observation_length = (recapture_date - release_date) / np.timedelta64(1, "D")

        if observation_length > 2:
            param["tag_name"] = tag_name
            output_path = f"{nb_output}/{tag_name}.ipynb"
            job_id = job_dict.get(output_path)
            if not force:
                if job_id:
                    # already submitted
                    print(f"Already submitted {tag_name} as {job_id}")
                    continue
                if s3.exists(output_path):
                    print(f"Already have {output_path}")
                    continue
            # kubernetes-safe tag name
            safe_tag_name = re.sub(r"[^a-z0-9-]", "", tag_name.lower())
            job_id = kbatch_papermill(
                input_notebook,
                output_path,
                code_dir=pangeo_fish_dir,
                s3_code_dir=s3_code_dir,
                job_name=f"papermill-{safe_tag_name}",
                parameters=param,
                profile_name="big160",
            )
            print(f"Submitted {tag_name} as {job_id}")
            job_dict[output_path] = job_id
    except Exception as e:
        print(f"Error for {tag_name}: {e.__class__.__name__}: {e}")
        raise

print(json.dumps(job_dict, indent=1))

with job_file.open("w") as f:
    json.dump(job_dict, f)

Processing tags:   0%|          | 0/15 [00:00<?, ?it/s]

Submitted AD_A11170 as papermill-ada11170-8jdkp
Submitted AD_A11177 as papermill-ada11177-b269k
Submitted AD_A11382 as papermill-ada11382-wqkgq
Submitted AD_A11384 as papermill-ada11384-f7xfw
Submitted AD_A11389 as papermill-ada11389-gq9l9
Submitted AD_A11774 as papermill-ada11774-hv8j8
Submitted AD_A11786 as papermill-ada11786-lqqpg
Submitted AD_A11788 as papermill-ada11788-48fmd
Submitted AD_A11791 as papermill-ada11791-q8lbv
Submitted AD_A11797 as papermill-ada11797-28ckm
Submitted AD_A11799 as papermill-ada11799-kxmrw
Submitted AD_A11800 as papermill-ada11800-zddlf
Submitted AD_A11801 as papermill-ada11801-6t54t
Submitted SV_A11963 as papermill-sva11963-kqs5q
{
 "s3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11170.ipynb": "papermill-ada11170-8jdkp",
 "s3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11177.ipynb": "papermill-ada11177-b269k",
 "s3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11382.ipynb": "papermill-ada11382-wqkgq",
 "s3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A
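Kubernetes resource names only allow a restricted character set (lowercase letters, digits and `-` are always safe), which is why the loop strips every other character from the tag name before building `job_name`: `AD_A11170` becomes `ada11170`, as seen in the job IDs above (`papermill-ada11170-` followed by a generated suffix). The hypothetical `k8s_safe_name` helper below reproduces that rule and additionally refuses a tag name that would be left empty.

```python
import re


def k8s_safe_name(tag_name: str) -> str:
    """Lowercase the tag and drop every character that is not a-z, 0-9 or '-'."""
    safe = re.sub(r"[^a-z0-9-]", "", tag_name.lower())
    if not safe:
        raise ValueError(f"tag name {tag_name!r} has no usable characters")
    return safe


for tag in ["AD_A11170", "SV_A11963"]:
    print(f"papermill-{k8s_safe_name(tag)}")
```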

In [15]:
from kbatch_papermill import wait_for_jobs

wait_for_jobs(*job_dict.values())

jobs:   0%|          | 0/14 [00:00<?, ?it/s]


KeyboardInterrupt: 

In [22]:
job_dict

{'s3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11170.ipynb': 'papermill-ada11170-ns8jb',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11177.ipynb': 'papermill-ada11177-pf95f',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11382.ipynb': 'papermill-ada11382-nlvzv',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11384.ipynb': 'papermill-ada11384-769s8',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11389.ipynb': 'papermill-ada11389-9vxpm',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11774.ipynb': 'papermill-ada11774-tkqld',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11786.ipynb': 'papermill-ada11786-68mwr',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11788.ipynb': 'papermill-ada11788-5lsbh',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11791.ipynb': 'papermill-ada11791-qmxkk',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11797.ipynb': 'papermill-ada11797-jqrr5',
 's3://gfts-ifremer/bargip/run/minrk/test/nbs/AD_A11799.ipynb': 'papermill-ada11

In [16]:
local_nbs = local_output / "nbs"
s3.get(f"{nb_output}/*", local_nbs, recursive=True)
!tree {local_nbs}

papermill_output/nbs
├── AD_A11170.ipynb
├── AD_A11177.ipynb
├── AD_A11382.ipynb
├── AD_A11384.ipynb
├── AD_A11389.ipynb
├── AD_A11774.ipynb
├── AD_A11786.ipynb
├── AD_A11788.ipynb
├── AD_A11791.ipynb
├── AD_A11797.ipynb
├── AD_A11799.ipynb
├── AD_A11800.ipynb
├── AD_A11801.ipynb
└── SV_A11963.ipynb

0 directories, 14 files
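The listing should contain one notebook per submitted job. To spot tags whose notebook is missing in a larger run (for example because a job never started), one option is to compare the keys of `job_dict` with the downloaded file names. The `missing_outputs` helper below is a hypothetical sketch of that check, shown on made-up paths and job IDs; in this notebook it could be called as `missing_outputs(job_dict, [p.name for p in local_nbs.glob("*.ipynb")])`.

```python
from pathlib import PurePosixPath


def missing_outputs(job_dict: dict, downloaded: list[str]) -> list[str]:
    """Return the notebook names recorded in job_dict but absent from `downloaded`."""
    expected = {PurePosixPath(path).name for path in job_dict}
    return sorted(expected - set(downloaded))


jobs = {
    "s3://bucket/run/user/test/nbs/AD_A11170.ipynb": "job-1",
    "s3://bucket/run/user/test/nbs/SV_A11963.ipynb": "job-2",
}
print(missing_outputs(jobs, ["AD_A11170.ipynb"]))
```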


In [17]:
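# Import print_job_status directly: `import kbatch_papermill` would rebind the name
# kbatch_papermill (the function imported at the top) to the module
from kbatch_papermill import print_job_status

print_job_status()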

In [31]:
# Show the last 32 lines of the logs of the most recently submitted job
last_job = list(job_dict.values())[-1]
print("".join(kbatch.job_logs(last_job).splitlines(True)[-32:]))


Ending Cell 75-----------------------------------------
Executing Cell 76--------------------------------------
Resources: Memory increased 84GiB -> 86GiB (4s)

Resources: Memory increased 86GiB -> 88GiB (4s)

Resources: Memory increased 88GiB -> 90GiB (5s)

Resources: Memory increased 90GiB -> 92GiB (5s)

Resources: Memory increased 92GiB -> 95GiB (11s)

<xarray.backends.zarr.ZarrStore at 0x7f136baf8040>
Resources: cpu=2.9, mem=95GiB, duration=14s

Ending Cell 76-----------------------------------------
Executing Cell 77--------------------------------------
Resources: Memory increased 99GiB -> 102GiB (2s)

TrajectoryCollection with 2 trajectories
Resources: cpu=12.0, mem=102GiB, duration=3s

Ending Cell 77-----------------------------------------
Executing Cell 78--------------------------------------
Resources: cpu=0.0, mem=93GiB, duration=0s

Ending Cell 78-----------------------------------------
Executing Cell 79--------------------------------------
Resources: cpu=0.0, mem=83Gi

In [30]:
# Print the traceback of a failed job (in this run, the job at index 6 failed)
failed_job = list(job_dict.values())[6]
logs = kbatch.job_logs(failed_job)
# The error report starts with "Exception encountered"; str.index raises ValueError if it is absent
idx = logs.index("Exception encountered")
print("".join(logs[idx:].splitlines(True)[:32]))
# print(''.join(kbatch.job_logs(list(job_dict.values())[6]).splitlines(True)[-250:-90]))

Exception encountered at "In [23]":
---------------------------------------------------------------------------
ServerDisconnectedError                   Traceback (most recent call last)
Cell In[23], line 2
      1 # Verify the data
----> 2 diff["diff"].count(["lat","lon"]).plot()

File /srv/conda/envs/notebook/lib/python3.12/site-packages/xarray/plot/accessor.py:48, in DataArrayPlotAccessor.__call__(self, **kwargs)
     46 @functools.wraps(dataarray_plot.plot, assigned=("__doc__", "__annotations__"))
     47 def __call__(self, **kwargs) -> Any:
---> 48     return dataarray_plot.plot(self._da, **kwargs)

File /srv/conda/envs/notebook/lib/python3.12/site-packages/xarray/plot/dataarray_plot.py:270, in plot(darray, row, col, col_wrap, ax, hue, subplot_kws, **kwargs)
    219 def plot(
    220     darray: DataArray,
    221     *,
   (...)
    228     **kwargs: Any,
    229 ) -> Any:
    230     """
    231     Default plot of DataArray using :py:mod:`matplotlib:matplotlib.pyplot`.
    232
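`str.index` raises `ValueError` when the marker is missing, for instance for a job that is still running or that failed before any cell was executed. A more forgiving variant, sketched below under the hypothetical name `error_excerpt`, uses `str.find` and returns `None` in that case. It takes the log text as a plain string, so it can be fed the output of `kbatch.job_logs(job_id)` for every job in `job_dict` to find all failures rather than only the one at index 6.

```python
def error_excerpt(logs: str, marker: str = "Exception encountered", n_lines: int = 32):
    """Return up to n_lines of log starting at the first marker, or None if absent."""
    idx = logs.find(marker)
    if idx == -1:
        return None
    return "".join(logs[idx:].splitlines(True)[:n_lines])


sample = 'Executing Cell 23\nException encountered at "In [23]":\nServerDisconnectedError\n'
print(error_excerpt(sample, n_lines=2))
print(error_excerpt("Ending Cell 76\n"))
```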