# Difference batch run vs interactive run at frDF

### Introduction

This notebook highlight performances (CPU and memory comsuption) discrepancies of the `forcedPhotCoadd` task, as function of the way a task is run.
The processing are inspired from [this notebook](./rubin-resource-usage-analysis-pipetasks.ipynb) where more information about whole processing can be found.

### Source data

Source data for this analysis is obtained from the records about task execution collected by the pipelines themselves and recorded in the butler repository. Those records are stored in `parquet` format.

The processing was run with butler `/sps/lsst/dataproducts/hsc/hsc_pdr2/butler.yaml`, and its collections can be accessed with `butler query-collections /sps/lsst/dataproducts/hsc/hsc_pdr2/butler.yaml`.

The base comparaison data are located under `/sps/lsst/dataproducts/hsc/hsc_pdr2/u/lsstprod/gather-resource-usage/`. Inside `step_*/` folders (for each step of the pipeline), there are :
 + `*_resource_usage/` for each task, containing metric for the task execution,
 +  `ResourceUsageSummary/` for each step, containing runtime and memory distribution for each task,
We will only use the former.

The new data are located under `/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/` where the next folder represent `{machine/}?{version}/{option}` (e.g. `ccahm002/v27.0.0/stack` means that this run was made on `ccahm002`, with version `v27.0.0` of the stack and its using `stack` only without modification.

When code was changed, it was made locally and loaded into the stack [check this guide](https://pipelines.lsst.io/install/package-development.html). We modified [meas_base](https://github.com/lsst/meas_base) package.

# Import, variables initialization and utils function definition

In [38]:
import glob
import math
import os
import pathlib
import shutil
import sys
from typing import Tuple, List

In [39]:
import polars as pl
import polars.selectors as cs

# Set the maximum length to display for string columns
_ = pl.Config.set_fmt_str_lengths(50)

In [40]:
import pandas as pd
import numpy as np

In [41]:
import bokeh
import bokeh.plotting as bkh
import bokeh.models as bkhmodels
from bokeh.io import curdoc, output_notebook
curdoc().clear()
bkh.output_notebook()

In [42]:
import IPython.display
print_md = IPython.display.Markdown

In [43]:
def build_dataframe_from_list(paths: list[str]) -> pl.DataFrame:
    """Read all .parquet files given in 'paths' and build a single dataframe.

    Parameters
    ----------
    paths : `list[str]`
       .parqu input's filenames. All the files are read to build the dataframe.

    Returns
    -------
    build_dataframe_from_dir: polars.DataFrame
       a dataframe where each row contains the information of one task.
    """
    print(f"Loading data files ...")
    df = None
    for file in paths:
        this_df = pl.read_parquet(file)
        # Pre-pend with step ID, and task ID if needed
        if "task" in this_df.columns:
            this_df = this_df.select(
                pl.lit(file.parents[1].name).alias("step"), pl.all()
            )
        else:
            this_df = this_df.select(
                pl.lit('/'.join(str(file).split('/')[10:-3])).alias("kind"),
                pl.lit(file.parents[1].name.split("_")[0]).alias("step"),
                pl.lit(file.parents[0].name.split("_")[0]).alias("task"),
                pl.all(),
            )
        df = this_df if df is None else pl.concat([df, this_df], how="diagonal_relaxed")

    return df

def get_column(df:pl.DataFrame, column:str) -> List:
    """Return a list with the values of column `column` from data frame `df`
    """
    return df.get_column(column).to_list()

def get_columns(df:pl.DataFrame, columns:List[str]) -> List:
    """Return a list with the values of column `column` from data frame `df`
    """
    ans = []
    for column in columns:
        ans.append(df.get_column(column).to_list())
    return ans

def save_figure(fig, output_dir, filename, title):
    """Save figure in formats HTML and PNG
    """
    # Ensure output directories exists
    os.makedirs(output_dir, exist_ok=True)
    image_dir = os.path.join(output_dir, 'images')
    os.makedirs(image_dir, exist_ok=True)
    html_dir = os.path.join(output_dir, 'html')
    os.makedirs(html_dir, exist_ok=True)
    
    # Save PNG
    png_filename = os.path.join(image_dir, f'{filename}.png')
    _ = bokeh.io.export_png(fig, filename=png_filename)
    
    # Save HTML
    html_filename = os.path.join(html_dir, f'{filename}.html')
    bkh.output_file(filename=html_filename, title=title)
    bkh.save(fig)
    
    # Reset the output file so the html file is not overwritten by
    # future calls to save()
    bokeh.io.reset_output()
    bokeh.io.output_notebook()

def clean_output_dir(directory):
    """Remove all .html and .png files from `directory`
    """
    if not os.path.exists(directory):
        return
    
    to_remove = glob.glob(os.path.join(directory, 'images', '*.png')) + glob.glob(os.path.join(directory, 'html', '*.html'))
    for name in to_remove:
        os.remove(name)

In [44]:
# Create an output directory for plots created by this notebook
output_dir = os.path.join('.', 'comparaison_interactive-vs-batch-run')
os.makedirs(output_dir, exist_ok=True)

clean_output_dir(output_dir)

In [45]:
!find /sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/ -name 'forcedPhotCoadd*parq'
!find /sps/lsst/dataproducts/hsc/hsc_pdr2/u/lsstprod/gather-resource-usage/ -name 'forcedPhotCoadd*parq'

/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/w_2024_47/stack/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_w_2024_47_stack_gather-resource-usage.parq
/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/w_2024_47/meas_base_modified_noopt/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_w_2024_47_meas_base_modified_noopt_gather-resource-usage.parq
/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/w_2024_47/meas_base/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_w_2024_47_meas_base_gather-resource-usage.parq
/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/v27.0.0/stack/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_v27_0_0_stack_gather-resour

In [46]:
# Load the data files for each step of interest and aggregate it in a single dataframe
paths_ = [
"/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/w_2024_47/stack/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_w_2024_47_stack_gather-resource-usage.parq",
"/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/w_2024_47/meas_base_modified_noopt/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_w_2024_47_meas_base_modified_noopt_gather-resource-usage.parq", 
"/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/w_2024_47/meas_base/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_w_2024_47_meas_base_gather-resource-usage.parq",
"/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/v27.0.0/stack/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_v27_0_0_stack_gather-resource-usage.parq",
"/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/ccahm002/w_2024_47/stack/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_ccahm002_w_2024_47_stack_gather-resource-usage.parq",
"/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/ccahm002/w_2024_47/meas_base_modified_noopt/gather-resource-usage/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_abernard_runs_forcedPhotCoadd_ccahm002_w_2024_47_meas_base_modified_noopt_gather-resource-usage.parq",
"/sps/lsst/dataproducts/hsc/hsc_pdr2/u/lsstprod/gather-resource-usage/step3_20240904T145006Z/forcedPhotCoadd_resource_usage/forcedPhotCoadd_resource_usage_u_lsstprod_gather-resource-usage_step3_20240904T145006Z.parq",
]

kind = ["w_2024_47/meas_base_modified_noopt", "w_2024_47/meas_base", "w_2024_47/stack", "v27.0.0/stack", "ccahm002/w_2024_47/meas_base_modified_noopt", "ccahm002/w_2024_47/stack", "quentin_full_run_v27?"]

assert(len(kind) == len(paths_))

explanation_kind = """
 + w_2024_47/meas_base_modified_noopt: using w_2024_47 of the stack, with package meas_base modified with base compilation option, run in with sbatch.
 + w_2024_47/meas_base: using w_2024_47 of the stack, with package meas_base with base compilation option, run in with sbatch.
 + w_2024_47/stack: using w_2024_47 version of the stack,, run in with sbatch.
 + v27.0.0/stack: using v27.0.0 version of the stack (dated of approx. june 2024), run in with sbatch.
 + ccahm002/w_2024_47/meas_base_modified_noopt: using w_2024_47 of the stack, with package meas_base modified with base compilation option, run on ccahm002 interactive server.
 + ccahm002/w_2024_47/stack: using the w_2024_47 version of the stack, run on ccahm002 interactive server.
 + quentin_full_run_v27?: results obtain from HSC-PDR2 run made by Quentin early september 2024, using version V27.0.0. No info on the run's specifics ?
"""
print_md(explanation_kind)

# Read
df_test = build_dataframe_from_list(map(pathlib.Path, paths_))

df_test = df_test.with_columns([
    (pl.col("memory") / 1e9),    
])

# Replace empty string value in quentin full run
df_test = df_test.with_columns(
    pl.when(pl.col("kind") == "")
    .then(pl.lit("quentin_full_run_v27?"))  # Replace with your desired string
    .otherwise(pl.col("kind"))
    .alias("kind")  # Update the column
)

# Select only forcedPhotCoadd task
df_test = df_test.filter((pl.col('task') == 'forcedPhotCoadd'))
# Select only patch 60
df_test = df_test.filter((pl.col('patch') == 60))

#Let's do a dodges plot (https://docs.bokeh.org/en/latest/docs/examples/basic/bars/dodged.html)
from bokeh.transform import dodge
# Set default plot dimensions
plot_width = 800
plot_height = 800
hist_height = 200
hist_width = 200

Loading data files ...


In [47]:
from bokeh.palettes import Category10
palette = Category10[10]

#print(df_test.select(pl.col('band')).unique().get_column('band'))
#for e in kind:
#    print(get_column(df_test.filter(pl.col('kind') == e), 'memory'))
#    print(get_column(df_test.filter(pl.col('kind') == e), 'run_time'))

#print({ e : get_column(df_test.filter(pl.col('kind') == e), 'run_time') for e in kind })

for g in ['memory', 'run_time']:
    s = {'band' : get_column(df_test.select(pl.col('band')).unique(), 'band')}
    #s.update({e : get_column(df_test.filter(pl.col('kind') == e), 'run_time') for e in kind })
    s.update({e : get_column(df_test.filter(pl.col('kind') == e), g) for e in kind })
    
    source = bkhmodels.ColumnDataSource(data=s)

    p = bkh.figure(
        #x_axis_type='log',
        x_axis_label='Band',
        x_range= get_column(df_test.select(pl.col('band')).unique(), 'band'),
        #y_range=y_range,
        #y_axis_type='log',
        y_axis_label=g,
        width=plot_width,
        height=plot_height,
        background_fill_color="#f4f3f3",
        #toolbar_location=None
    )
    # Add title and subtitle
    p.add_layout(bkhmodels.Title(text="for differents version of the stack", text_font_style="italic", text_font_size="11pt"), 'above')
    p.add_layout(bkhmodels.Title(text=f"{g} for forcedPhotCoadd task, on patch 60,", text_font_size="18pt"), 'above')

    for i, k in enumerate(kind):
        rend = p.vbar(x=dodge('band', -0.25 + i*0.10, range=p.x_range), top=k, source=source,
               width=0.1, legend_label=k, color=palette[i])
        p.add_tools(bkhmodels.HoverTool(tooltips=[
            (g, '@{'+k+'}'),
        ], renderers=[rend], mode='mouse'))

    # Configure axes
    for x_axis in p.xaxis:
        x_axis.axis_label_text_font_size = "11pt"
        x_axis.axis_label_text_font_style = "bold"
        x_axis.major_label_text_font_size = "11pt"
    
    for y_axis in p.yaxis:
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"

    p.legend.location = 'bottom'
    
    bkh.show(p)
    save_figure(p, output_dir=output_dir, filename=f'{g}-forcedPhotCoadd-tract9461-patch60_vs_stack-version', title=f'forcedPhotCoadd {g} difference as function of stack version')

In [48]:
# Load the data files for each step of interest and aggregate it in a single dataframe
#data_dir = '../../data/pipetasks'
data_dir = '/sps/lsst/dataproducts/hsc/hsc_pdr2/u/lsstprod/gather-resource-usage'

# Processing identifier
processing_label = "Rubin Observatory French Data Facility – HSC PDR2 Reprocessing and Operations Rehearsal for DRP (v27)"

# Processing ID
processing_ID = None

#paths = sorted(pathlib.Path(data_dir).glob('step*'))
# We use recursive globbing to match the files '**/' or .rglob
paths_RessourceUsageSummary = sorted(pathlib.Path(data_dir).glob('**/*ResourceUsageSummary*/*.parq'))
paths_individualTask = sorted(list(set(pathlib.Path(data_dir).glob('**/*.parq')) - set(paths_RessourceUsageSummary)))

data_dirToCompare = "/sps/lsst/dataproducts/hsc/hsc_pdr2/u/abernard/runs/forcedPhotCoadd/w_2024_47/meas_base_modified_noopt/gather-resource-usage"
taskToCompare = "forcedPhotCoadd"

paths_RessourceUsageSummaryToCompare = sorted(pathlib.Path(data_dirToCompare).glob('**/*ResourceUsageSummary*/*.parq'))
paths_individualTaskToCompare = sorted(list(set(pathlib.Path(data_dirToCompare).glob('**/*.parq')) - set(paths_RessourceUsageSummaryToCompare)))
                                       
print(f"There is {len(paths_RessourceUsageSummary)} step and {len(paths_individualTask)} individual tasks")
                                                        
df_all, df_summary, df_compare = None, None, None
df_all = build_dataframe_from_list(paths_individualTask)
df_summary = build_dataframe_from_list(paths_RessourceUsageSummary)
df_compare = build_dataframe_from_list(paths_individualTaskToCompare)

# Edit compare info to fit into df_all
df_compare = df_compare.with_columns(
    step=pl.col("step").replace("gather-resource-usage", "step3"),
    #task=pl.col("task").str.replace(taskToCompare, taskToCompare + "_opti")
)

# A dataframe where we replace base with all optimized
df_all_opti = pl.concat([df_all.filter(pl.col("task") != taskToCompare), df_compare], how='diagonal_relaxed')

#df_all = pl.concat([df_all, df_compare], how="diagonal_relaxed")

df_compare = df_compare.with_columns([
    (pl.col("memory") / 1e9),    
])

# Add a column 'cpu_efficiency' and compute 'memory' column in gigabytes instead of kilobytes as reported
# by Python's resource.getrusage
df_all = df_all.with_columns([
    (pl.col("memory") / 1e9),
    (pl.col('run_time') / pl.col('run_time').sum()).alias('elapsed_time_pct'),
    (pl.col("memory")/1e9*pl.col("run_time")).alias("memory_hours"),

    
])
df_all_opti = df_all_opti.with_columns([
    (pl.col("memory") / 1e9),
    (pl.col('run_time') / pl.col('run_time').sum()).alias('elapsed_time_pct'),
    (pl.col("memory")/1e9*pl.col("run_time")).alias("memory_hours"),

    
])

There is 9 step and 41 individual tasks
Loading data files ...
Loading data files ...
Loading data files ...


In [49]:
df_orig = df_all.filter(pl.col("task") == taskToCompare) # Extract task to compare
df_orig = df_orig[[s.name for s in df_orig if not (s.null_count() == df_orig.height)]] # Remove null columns
df_orig = df_orig.drop(['step', 'task', 'skymap', 'elapsed_time_pct']) # 
df_compare = df_compare.drop(['step', 'task', 'skymap'])

In [50]:
dfn = df_orig.join(df_compare, on=['band', 'tract', 'patch'])

# Add some metrics columns
dfn = dfn.with_columns(
    (pl.col("memory_right")*pl.col("run_time_right")).alias("memory_hours_right"),
    (pl.col("memory")*pl.col("run_time")).alias("memory_hours"),
)

# Add difference columns
dfn = dfn.with_columns(
    (pl.col("memory_right") - pl.col("memory")).alias("memory_diff"),
    (pl.col("init_time_right") - pl.col("init_time")).alias("init_time_diff"),
    (pl.col("run_time_right") - pl.col("run_time")).alias("run_time_diff"),
)

# Add relative columns
dfn = dfn.with_columns(
    (pl.col("memory_diff") / pl.col("memory")).alias("memory_diff_pct"),
    (pl.col("init_time_diff") / pl.col("init_time")).alias("init_time_diff_pct"),
    (pl.col("run_time_diff") / pl.col("run_time")).alias("run_time_diff_pct"),
)


numeric_cols = [dfn[col].cast(pl.Float64) for col in dfn.columns if dfn[col].dtype in (pl.Int64, pl.Float64)]

stats = (
    pl.concat(
        [
            dfn.select(numeric_cols).max().with_columns(pl.lit("Max").alias("Statistic")),
            dfn.select(numeric_cols).min().with_columns(pl.lit("Min").alias("Statistic")),
            dfn.select(numeric_cols).mean().with_columns(pl.lit("Mean").alias("Statistic")),
            dfn.select(numeric_cols).sum().with_columns(pl.lit("Sum").alias("Statistic")),
        ]
    )
    #.select(["Statistic", numeric_cols])
)

display(stats)
#display(pl.concat([dfn.max(), dfn.min(), dfn.mean(), dfn.sum()]), how="diagonal_relaxed")


display(dfn.filter(pl.col("patch") == 60))

memory,init_time,run_time,tract,patch,memory_hours,memory_right,init_time_right,run_time_right,memory_hours_right,memory_diff,init_time_diff,run_time_diff,memory_diff_pct,init_time_diff_pct,run_time_diff_pct,Statistic
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str
4.041474,0.083963,9063.883401,9461.0,80.0,36631.44954,4.263543,0.064922,4790.549582,18389.549024,2.771964,0.007754,-13.262361,1.933817,0.183673,-0.004045,"""Max"""
1.271026,0.042217,311.917842,9461.0,0.0,486.105861,1.313985,0.029293,131.222313,258.009045,-1.210192,-0.044201,-4568.075898,-0.299443,-0.573045,-0.607373,"""Min"""
3.166687,0.060031,4004.626891,9461.0,40.0,13560.039111,3.5799,0.039748,2168.205,7812.688978,0.413212,-0.020284,-1836.421891,0.176599,-0.323072,-0.47,"""Mean"""
1282.508431,24.312627,1621900.0,3831705.0,16200.0,5491800.0,1449.859412,16.097761,878123.025193,3164100.0,167.350981,-8.214866,-743750.865847,71.522668,-130.844312,-190.350014,"""Sum"""


kind,band,memory,init_time,run_time,tract,patch,memory_hours,kind_right,memory_right,init_time_right,run_time_right,memory_hours_right,memory_diff,init_time_diff,run_time_diff,memory_diff_pct,init_time_diff_pct,run_time_diff_pct
str,str,f64,f64,f64,i64,i64,f64,str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""""","""g""",4.012585,0.064882,5448.748275,9461,60,21863.565377,"""w_2024_47/meas_base_modified_noopt""",3.424895,0.041135,3458.065332,11843.510581,-0.58769,-0.023748,-1990.682943,-0.146462,-0.366014,-0.365347
"""""","""i""",3.589685,0.063852,9061.449116,9461,60,32527.750215,"""w_2024_47/meas_base_modified_noopt""",3.629023,0.033322,4493.373217,16306.555795,0.039338,-0.03053,-4568.075898,0.010959,-0.478135,-0.504122
"""""","""r""",3.828605,0.04381,4602.185047,9461,60,17619.948349,"""w_2024_47/meas_base_modified_noopt""",3.182981,0.046193,3336.097422,10618.735109,-0.645624,0.002383,-1266.087625,-0.168632,0.054397,-0.275106
"""""","""y""",3.348222,0.043323,5960.281819,9461,60,19956.346426,"""w_2024_47/meas_base_modified_noopt""",4.263543,0.038767,4313.208511,18389.549024,0.915321,-0.004555,-1647.073308,0.273375,-0.105148,-0.276342
"""""","""z""",4.041474,0.064401,9063.883401,9461,60,36631.44954,"""w_2024_47/meas_base_modified_noopt""",2.831282,0.029914,4790.549582,13563.397645,-1.210192,-0.034487,-4273.333819,-0.299443,-0.535503,-0.471468


## Overview

In [51]:
summary = rf"""
 + sum (t_i): sum of runtime of every task
 + sum (m_i): sum of max memory used of every task
 + sum (t_i * m_i): sum of, runtime multiplied by memory, of everytask

Impact on whole pipeline:

              sum (t_i) |   sum (m_i)   | sum (t_i * m_i)

Base took: {df_all.get_column("run_time").sum()/3600: >10.1f} h | {df_all.get_column("memory").sum(): >10.1f} Gb | {df_all.get_column("memory_hours").sum()/3600: >10.1f} Gb.h\n
Opti took: {df_all_opti.get_column("run_time").sum()/3600:>10.1f} h | {df_all_opti.get_column("memory").sum(): >10.1f} Gb | {df_all_opti.get_column("memory_hours").sum()/3600: >10.1f} Gb.h
Diff     : {df_all.get_column("run_time").sum()/3600 - df_all_opti.get_column("run_time").sum()/3600:>10.1f} h | {df_all.get_column("memory").sum() - df_all_opti.get_column("memory").sum(): >10.1f} Gb | {df_all.get_column("memory_hours").sum()/3600 - df_all_opti.get_column("memory_hours").sum()/3600: >10.1f} Gb.h
Relative : {df_all_opti.get_column("run_time").sum()/df_all.get_column("run_time").sum()-1:>12.1%} | {df_all_opti.get_column("memory").sum()/df_all.get_column("memory").sum()-1: >13.1%} | {df_all_opti.get_column("memory_hours").sum()/df_all.get_column("memory_hours").sum()-1: >13.1%}

Impact on {taskToCompare} task:

              sum (t_i) |   sum (m_i)   | sum (t_i * m_i)

Base took: {dfn.get_column("run_time").sum()/3600:>10.1f} h | {dfn.get_column("memory").sum():>10.1f} Gb | {dfn.get_column("memory_hours").sum()/3600:>10.1f} Gb.h
Opti took: {dfn.get_column("run_time_right").sum()/3600:>10.1f} h | {dfn.get_column("memory_right").sum():>10.1f} Gb | {dfn.get_column("memory_hours_right").sum()/3600:>10.1f} Gb.h
Diff     : {dfn.get_column("run_time").sum()/3600 - dfn.get_column("run_time_right").sum()/3600:>10.1f} h | {dfn.get_column("memory").sum() - dfn.get_column("memory_right").sum():>10.1f} Gb | {dfn.get_column("memory_hours").sum()/3600 - dfn.get_column("memory_hours_right").sum()/3600:>10.1f} Gb.h
Relative : {dfn.get_column("run_time_right").sum()/dfn.get_column("run_time").sum()-1: >12.1%} | {dfn.get_column("memory_right").sum()/dfn.get_column("memory").sum()-1: >13.1%} | {dfn.get_column("memory_hours_right").sum()/dfn.get_column("memory_hours").sum()-1: >13.1%}
"""

print(summary)


 + sum (t_i): sum of runtime of every task
 + sum (m_i): sum of max memory used of every task
 + sum (t_i * m_i): sum of, runtime multiplied by memory, of everytask

Impact on whole pipeline:

              sum (t_i) |   sum (m_i)   | sum (t_i * m_i)

Base took:     1617.1 h |   157045.7 Gb |     4013.7 Gb.h\n
Opti took:     1410.5 h |   157213.1 Gb |     3367.1 Gb.h
Diff     :      206.6 h |     -167.4 Gb |      646.6 Gb.h
Relative :       -12.8% |          0.1% |        -16.1%

Impact on forcedPhotCoadd task:

              sum (t_i) |   sum (m_i)   | sum (t_i * m_i)

Base took:      450.5 h |     1282.5 Gb |     1525.5 Gb.h
Opti took:      243.9 h |     1449.9 Gb |      878.9 Gb.h
Diff     :      206.6 h |     -167.4 Gb |      646.6 Gb.h
Relative :       -45.9% |         13.0% |        -42.4%



In [52]:
from bokeh.models import Span, Label
from bokeh.palettes import Category10, Category20

def make_figure_correlation_memory_time_diff(
    run_time_diff: List[float], rss_max_diff: List[float],
    band: List[str], patch: List[int],
    label: str = None
) -> bkh.figure:
    """Return a scatter plot figure with max RSS and compute time differences between base and optimized version."""
    
    # Set default plot dimensions
    plot_width = 800
    plot_height = 800
    hist_height = 200
    hist_width = 200

    # Create a data source
    source = bkhmodels.ColumnDataSource(data={
        'run_time_diff': run_time_diff,
        'rss_max_diff': rss_max_diff,
        'band' : band,
        'patch' : patch,
        
    })

    # Use counter to sort tasks by quantity
    from collections import Counter
    counter = dict(Counter(band).most_common())
    # Generate a color palette using Viridis256
    unique_tasks = list(counter.keys())  # Ensure unique tasks
    palette = Category10[max(3, min(20, len(unique_tasks)))]  # Use at most 20 colors, and minimum 3

    # use whatever palette you want...
    color_map = bkhmodels.CategoricalColorMapper(factors=unique_tasks,
                                   palette=palette)

    
    # Define ranges for the scatter plot and histograms
    x_min, x_max = min(run_time_diff), max(run_time_diff)
    y_min, y_max = min(rss_max_diff), max(rss_max_diff)

    x_range_margin = max(0.1 * (x_max - x_min), abs(x_min * 0.1))
    y_range_margin = max(0.1 * (y_max - y_min), abs(y_min * 0.1))

    x_range = bkhmodels.Range1d(x_min - x_range_margin, x_max + x_range_margin)
    y_range = bkhmodels.Range1d(y_min - y_range_margin, y_max + y_range_margin)

    scatter = bkh.figure(
        #x_axis_type='log',
        x_axis_label='$$\Delta$$ run time  (s)',
        x_range=x_range,
        y_range=y_range,
        #y_axis_type='log',
        y_axis_label='$$\Delta$$ max memory (Gb)',
        width=plot_width,
        height=plot_height,
        background_fill_color="#f4f3f3",
        #toolbar_location=None
    )

    # Scatter
    scatter.scatter(
        marker='circle', x='run_time_diff', y='rss_max_diff', size=5,
        line_width=3,
        source=source,
        muted_alpha=0.05,
        color={'field': 'band', 'transform': color_map},
        )
        
    scatter.add_tools(bkhmodels.HoverTool(tooltips=[
        ('RSS max diff', '@rss_max_diff{0.2f} GB'),
        ('Elapsed time diff', '@run_time_diff{0.1f} s'),
        ('Band', '@band'), ('Patch', '@patch')
    ], mode='mouse'))
    
    # Histograms
    hist_x, edges_x = np.histogram(run_time_diff, bins=30)
    hist_y, edges_y = np.histogram(rss_max_diff, bins=30)

    hist_top = bkh.figure(
        x_range=x_range, width=plot_width, height=hist_height,
        tools='', #toolbar_location=None
    )
    hist_top.quad(
        top=hist_x, bottom=0, left=edges_x[:-1], right=edges_x[1:],
        fill_color="blue", line_color="white", alpha=0.7
    )

    hist_top.add_tools(bkhmodels.HoverTool(tooltips=[
        ('Count', '@top'),
        ('Range', '[@left{0.2f};@right{0.2f}]')
    ]))
    
    hist_right = bkh.figure(
        y_range=y_range, width=hist_width, height=plot_height,
        tools='', #toolbar_location=None
    )
    hist_right.quad(
        top=edges_y[1:], bottom=edges_y[:-1], left=0, right=hist_y,
        fill_color="blue", line_color="white", alpha=0.7,   
    )

    hist_right.add_tools(bkhmodels.HoverTool(tooltips=[
        ('Count', '@right'),
        ('Range', '[@bottom{0.2f};@top{0.2f}]')
    ]))
    
    hist_right.xaxis.axis_label="#Quantas"
    hist_top.yaxis.axis_label="#Quantas"

    # Configure axes
    for x_axis in [scatter.xaxis, hist_top.xaxis, hist_right.xaxis]:
        x_axis.axis_label_text_font_size = "11pt"
        x_axis.axis_label_text_font_style = "bold"
        x_axis.major_label_text_font_size = "11pt"

    for y_axis in [scatter.yaxis, hist_top.yaxis, hist_right.yaxis]:
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"

    layout = bokeh.layouts.gridplot([
        [hist_top, None],
        [scatter, hist_right]
    ], merge_tools=True)

     # Add title and subtitle
    hist_top.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    hist_top.add_layout(bkhmodels.Title(text="Delta of memory versus delta of elapsed time with optimization", text_font_size="18pt"), 'above')

     # Add quadrant dividers to scatter and histograms
    divider_line_width = 4
    divider_line_color = 'red'

    vline_scatter = Span(location=0, dimension='height', line_color=divider_line_color, line_dash='dashed', line_width=divider_line_width)
    hline_scatter = Span(location=0, dimension='width', line_color=divider_line_color, line_dash='dashed', line_width=divider_line_width)
    scatter.renderers.extend([vline_scatter, hline_scatter])

    vline_top = Span(location=0, dimension='height', line_color=divider_line_color, line_dash='dashed', line_width=divider_line_width)
    hist_top.renderers.append(vline_top)

    hline_right = Span(location=0, dimension='width', line_color=divider_line_color, line_dash='dashed', line_width=divider_line_width)
    hist_right.renderers.append(hline_right)


    # Add labels to the quadrants
    labels = [
        Label(x=0.5 * x_range.end, y=0.5 * y_range.end, text="Increased Memory & Runtime", text_align="center", angle=math.pi/2),
        Label(x=0.5 * x_range.start, y=0.7 * y_range.end, text="Increased Memory, Decreased Runtime", text_align="center"),
        Label(x=0.5 * x_range.start, y=0.9 * y_range.start, text="Decreased Memory & Runtime", text_align="center"),
        Label(x=0.85 * x_range.end, y=0.5 * y_range.start, text="Decreased Memory,\n Increased Runtime", text_align="center", angle=math.pi/2)
    ]
    for label in labels:
        scatter.add_layout(label)

    
    return layout

In [53]:
# Create and display the figure
task_correlation_memory_time_diff_fig = make_figure_correlation_memory_time_diff(
    run_time_diff=get_column(dfn, 'run_time_diff'),
    rss_max_diff=get_column(dfn, 'memory_diff'),
    band=get_column(dfn, 'band'),
    patch=get_column(dfn, 'patch'),
)

bkh.show(task_correlation_memory_time_diff_fig)
save_figure(task_correlation_memory_time_diff_fig, output_dir=output_dir, filename='memory-vs-runtime-diff'+taskToCompare, title='DP0.2 Memory vs Execution time diff')

In [54]:
# Build a new dataframe per step and for each pipetask within a step 
# compute its number of quanta, its elapsed time and its memory
steps = sorted(df_all.select(pl.col('step')).unique().get_column('step').to_list())

step_dfs = {}
for step in steps:
    df_step = df_all.filter(pl.col('step') == step).group_by('task').agg(
        [
            pl.col("task").count().alias("quanta"),
            (pl.col("run_time")/3_600).sum().alias("elapsed_time_hours"),
            pl.col('memory').max().alias('RSS_max'),
            (pl.col('memory').quantile(0.75)-pl.col('memory').quantile(0.25)).alias("RSS_iqr"),
            (pl.col('memory').std()).alias('RSS_std'),
        ]
    )
    step_dfs[step] = df_step

In [55]:
# Generate a table with pipetask details for each step and write
# the same information into a CSV files for export
table = f"""
| step | task | quanta | elapsed time (hours) | max RSS (GB) | inter-quartile RSS (GB) | std RSS (GB) |
| ---- | ---- | -----: | -------------------: | -----------: | ----------------------: | -----------: |
"""
csv_separator = ','
csv_output = csv_separator.join(('step', 'pipetask', 'quanta', 'elapsed_time_hours', 'memory_gb', 'RSS_IQR', 'RSS_std'))
for step in sorted(step_dfs.keys()):
    step_df = step_dfs[step].sort('task')
    for row_index in range(step_df.height):
        row = step_df.row(row_index)
        csv_output += f'\n{step}{csv_separator}' + csv_separator.join((str(v) for v in row))
        step_out = f'**{step}**' if row_index == 0 else ''
        task, quanta, elapsed, memory, mem_iqr, mem_std = row
        if mem_std is None: mem_std = -1
        table += f'| {step_out} | {task} | {quanta:,} | {elapsed:,.1f} | {memory:,.1f} | {mem_iqr:,.3f} | {mem_std:,.3f}\n'
        
# Write to CSV file
with open(os.path.join(output_dir, 'pipetasks.csv'), 'w') as f:
    f.write(csv_output)

print_md(table)


| step | task | quanta | elapsed time (hours) | max RSS (GB) | inter-quartile RSS (GB) | std RSS (GB) |
| ---- | ---- | -----: | -------------------: | -----------: | ----------------------: | -----------: |
| **step1** | calibrate | 13,529 | 167.8 | 2.3 | 0.052 | 0.076
|  | characterizeImage | 13,766 | 272.8 | 2.3 | 0.087 | 0.078
|  | isr | 14,208 | 133.4 | 2.0 | 0.006 | 0.043
|  | transformPreSourceTable | 13,529 | 0.6 | 2.3 | 0.052 | 0.076
|  | writePreSourceTable | 13,529 | 1.1 | 2.3 | 0.052 | 0.076
| **step2b** | gbdesAstrometricFit | 60 | 1.3 | 1.6 | 0.353 | 0.242
|  | isolatedStarAssociation | 16 | 0.1 | 0.7 | 0.050 | 0.031
| **step2c** | fgcmBuildFromIsolatedStars | 1 | 0.0 | 7.7 | 0.000 | -1.000
|  | fgcmFitCycle | 1 | 0.9 | 28.0 | 0.000 | -1.000
|  | fgcmOutputProducts | 1 | 0.0 | 7.6 | 0.000 | -1.000
| **step2d** | consolidateSourceTable | 126 | 0.2 | 1.0 | 0.223 | 0.134
|  | finalizeCharacterization | 128 | 70.8 | 1.4 | 0.112 | 0.068
|  | transformSourceTable | 12,978 | 0.5 | 0.4 | 0.010 | 0.006
|  | updateVisitSummary | 126 | 10.7 | 0.8 | 0.013 | 0.009
|  | writeRecalibratedSourceTable | 12,978 | 31.9 | 0.6 | 0.015 | 0.010
| **step2e** | makeCcdVisitTable | 1 | 0.0 | 2.4 | 0.000 | -1.000
|  | makeVisitTable | 1 | 0.0 | 2.4 | 0.000 | -1.000
| **step3** | assembleCoadd | 405 | 21.8 | 4.3 | 0.180 | 0.303
|  | consolidateObjectTable | 1 | 0.0 | 18.1 | 0.000 | -1.000
|  | deblend | 81 | 31.4 | 4.8 | 0.672 | 0.578
|  | detection | 405 | 16.4 | 1.4 | 0.024 | 0.017
|  | forcedPhotCoadd | 405 | 450.5 | 4.0 | 0.707 | 0.580
|  | healSparsePropertyMaps | 5 | 1.2 | 3.8 | 0.817 | 0.427
|  | makeWarp | 2,815 | 59.8 | 2.1 | 0.413 | 0.288
|  | measure | 405 | 252.9 | 3.7 | 0.536 | 0.518
|  | mergeDetections | 81 | 0.6 | 0.5 | 0.013 | 0.011
|  | mergeMeasurements | 81 | 0.4 | 1.8 | 0.350 | 0.264
|  | selectDeepCoaddVisits | 405 | 0.5 | 0.6 | 0.092 | 0.061
|  | transformObjectTable | 81 | 0.6 | 1.6 | 0.189 | 0.146
|  | writeObjectTable | 81 | 0.8 | 9.2 | 1.794 | 1.349
| **step4** | forcedPhotCcd | 4,602 | 87.1 | 1.4 | 0.162 | 0.120
| **step7** | consolidateHealSparsePropertyMaps | 5 | 0.0 | 0.5 | 0.002 | 0.002
| **step8** | analyzeMatchedVisitCore | 16 | 0.5 | 20.5 | 10.072 | 7.222
|  | analyzeObjectTableCore | 1 | 0.0 | 6.2 | 0.000 | -1.000
|  | analyzeObjectTableSurveyCore | 1 | 0.0 | 1.1 | 0.000 | -1.000
|  | catalogMatchTract | 1 | 0.0 | 1.9 | 0.000 | -1.000
|  | photometricCatalogMatch | 1 | 0.0 | 1.9 | 0.000 | -1.000
|  | photometricRefCatObjectTract | 1 | 0.0 | 0.8 | 0.000 | -1.000
|  | plotPropertyMapTract | 5 | 0.1 | 4.0 | 0.007 | 0.005
|  | refCatObjectTract | 1 | 0.0 | 0.8 | 0.000 | -1.000
|  | validateObjectTableCore | 1 | 0.0 | 1.0 | 0.000 | -1.000


In [56]:
# Create a dataframe with details about each pipetask
df_per_task = df_all.group_by("task", maintain_order=True).agg(
    [
        pl.col("task").count().alias("task_count"),
        (pl.col("run_time")/3_600).sum().alias("elapsed_time_hours"),
        pl.col('memory').min().alias('RSS_min'),
        pl.col('memory').max().alias('RSS_max'),
        pl.col('memory').mean().alias('RSS_mean'),
        pl.col('memory').std().alias('RSS_std'),
        pl.col('memory').quantile(0.05).alias('RSS_p05'),
        pl.col('memory').quantile(0.50).alias('RSS_p50'),
        pl.col('memory').quantile(0.95).alias('RSS_p95'),
        (pl.col('memory').quantile(0.75)-pl.col('memory').quantile(0.25)).alias("RSS_iqr"),
    ]
)

# hack
df_per_task_opti = df_all_opti.group_by("task", maintain_order=True).agg(
    [
        pl.col("task").count().alias("task_count"),
        (pl.col("run_time")/3_600).sum().alias("elapsed_time_hours"),
        pl.col('memory').min().alias('RSS_min'),
        pl.col('memory').max().alias('RSS_max'),
        pl.col('memory').mean().alias('RSS_mean'),
        pl.col('memory').std().alias('RSS_std'),
        pl.col('memory').quantile(0.05).alias('RSS_p05'),
        pl.col('memory').quantile(0.50).alias('RSS_p50'),
        pl.col('memory').quantile(0.95).alias('RSS_p95'),
        (pl.col('memory').quantile(0.75)-pl.col('memory').quantile(0.25)).alias("RSS_iqr"),
    ]
)

In [57]:
task_types = df_per_task.height
total_elapsed_hours = df_per_task.select('elapsed_time_hours').sum().row(0)[0]
#total_cpu_hours = df_per_task.select('cpu_time_hours').sum().row(0)[0]
total_cpu_hours = -1

overview = f"""
There were **{task_types:,} kinds of pipetasks** which consumed in aggregate **{total_elapsed_hours:,.0f} elapsed hours ({total_cpu_hours:,.0f} CPU hours**)
"""
print_md(overview)


There were **41 kinds of pipetasks** which consumed in aggregate **1,617 elapsed hours (-1 CPU hours**)


In [58]:
df_per_task.sort(['elapsed_time_hours', 'task_count'], descending=True)[:10]

task,task_count,elapsed_time_hours,RSS_min,RSS_max,RSS_mean,RSS_std,RSS_p05,RSS_p50,RSS_p95,RSS_iqr
str,u32,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""forcedPhotCoadd""",405,450.520525,1.271026,4.041474,3.166687,0.579597,1.880367,3.286925,3.896869,0.706621
"""characterizeImage""",13766,272.800257,1.732588,2.335285,1.897912,0.077734,1.853223,1.864249,1.970995,0.086766
"""measure""",405,252.883013,1.275191,3.693711,3.01413,0.51801,1.774531,3.169948,3.589526,0.535769
"""calibrate""",13529,167.770902,1.732588,2.335285,1.896656,0.075569,1.853219,1.864229,1.970242,0.052445
"""isr""",14208,133.382186,1.732588,1.975103,1.883975,0.043117,1.853121,1.863995,1.967686,0.005689
"""forcedPhotCcd""",4602,87.111164,0.641524,1.406783,1.070343,0.120346,0.883073,1.062605,1.262907,0.161698
"""finalizeCharacterization""",128,70.806853,1.161986,1.436373,1.317475,0.068107,1.209164,1.303474,1.4262272,0.112452
"""makeWarp""",2815,59.838098,0.789139,2.107859,1.669589,0.287761,1.116328,1.806442,1.95652,0.412787
"""writeRecalibratedSourceTable""",12978,31.88944,0.497066,0.561394,0.517276,0.010427,0.502383,0.516116,0.535921,0.014959
"""deblend""",81,31.401998,2.578502,4.837626,4.209757,0.578263,2.890662,4.414087,4.779254,0.672244


In [59]:
def make_figure_big_task_consumers(tasks: List[str], elapsed_time_pct: List[float], rss_min: List[float], rss_max: List[float], rss_mean: List[float],
                                   rss_pct_low: List[float], rss_pct_high: List[float], label: str = None, note: str = None) -> bkh.figure:
    """Return a figure representing the RSS distribution for each pipetask and its consumption of compute time.
    """
    # Build the data source: sort tasks by rss_max
    sorted_by_elapsed = sorted(zip(elapsed_time_pct, rss_min, rss_max, rss_mean, rss_pct_low, rss_pct_high, tasks), reverse=True)
    elapsed_time_pct, rss_min, rss_max, rss_mean, rss_pct_low, rss_pct_high, tasks = zip(*sorted_by_elapsed)
    source = bkhmodels.ColumnDataSource(data={
        'tasks': tasks,
        'elapsed_time_pct': elapsed_time_pct,
        'elapsed_time_cumulated': np.cumsum(elapsed_time_pct),
        'rss_min': rss_min,
        'rss_max': rss_max,
        'rss_mean': rss_mean,
        'rss_pct_low': rss_pct_low,
        'rss_pct_high': rss_pct_high,
    })
    
    # Build and configure the figure
    left_y_range = bokeh.models.Range1d(0.1, max(rss_max)*2)
    fig = bkh.figure(
        x_range = tasks,
        y_range = left_y_range,
        y_axis_type = 'log',
        y_axis_label = 'gigabyte',
        width = 1_600,
        height = 1_200,
        background_fill_color="#f4f3f3", 
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="Memory used before and after optimization", text_font_size="18pt"), 'above')

    # Axis
    fig.xaxis.axis_label_text_font_size = "11pt"
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "11pt"
    fig.xaxis.major_label_orientation = math.pi/2.5
    
    # Add secondary Y axis
    bottom_right_y_axis = min(elapsed_time_pct) / 2
    top_right_axis = max(elapsed_time_pct) * 1.2
    fig.extra_y_ranges = {"y_axis_right": bokeh.models.Range1d(bottom_right_y_axis, top_right_axis)}
    fig.add_layout(bokeh.models.LinearAxis(y_range_name='y_axis_right', axis_label='total elapsed time'), 'right')
    fig.yaxis[1].formatter = bokeh.models.NumeralTickFormatter(format='0%')

    # Format Y axis
    # fig.yaxis[1].ticker = bokeh.models.tickers.LogTicker()
    for y_axis in fig.yaxis:
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"
        
    # Annotation
    if label is not None:
        annotation_label = bkhmodels.Label(x=len(tasks)//2.2, y=max(rss_max)/1.2, x_units='data', y_units='data',
                     text=label, text_font_size='12pt',
                     text_color='dimgray', text_alpha=0.8,
                     background_fill_color='white', background_fill_alpha=1.0,
                     border_line_color='dimgray', border_line_alpha=0.5)
        fig.add_layout(annotation_label)

    
    # Add glyphs
    dashes_max = fig.scatter(marker='dash', x='tasks', y='rss_max', color='indianred', size=15, line_width=3, source=source, legend_label='max RSS')
    mean = fig.scatter(x='tasks', y='rss_mean', size=8, color='mediumseagreen', source=source, legend_label='mean RSS ')
    dashes_min = fig.scatter(marker='dash', x='tasks', y='rss_min', color='steelblue', size=15, line_width=3, source=source, legend_label='min RSS')
    bars = fig.vbar(x='tasks', top='elapsed_time_pct', bottom=bottom_right_y_axis, width=0.8, color='tan', alpha=0.3, source=source, legend_label='elapsed time', y_range_name='y_axis_right')
    
    whisker = bkhmodels.Whisker(base='tasks', upper='rss_pct_high', lower='rss_pct_low', source=source)
    fig.add_layout(whisker)
    
    # Hide toolbar
    fig.toolbar.autohide = True
    
    # Hide legend on click
    fig.legend.click_policy = 'mute'

    # Add tooltips
    fig.add_tools(bkhmodels.HoverTool(tooltips=[
        ('Task', '@tasks'),
        ('RSS max', '@rss_max{0.2f} GB'),
        ('RSS percentile high', '@rss_pct_high{0.2f} GB'),
        ('RSS mean', '@rss_mean{0.2f} GB'),
        ('RSS percentile low', '@rss_pct_low{0.2f} GB'),
        ('RSS min', '@rss_min{0.2f} GB'),
        ('elapsed time', '@elapsed_time_pct{0.0%}'),
        ('cumulated elapsed time', '@elapsed_time_cumulated{0.0%}'),
    ], renderers=[bars, mean, dashes_max, dashes_min], mode='mouse'))
    
    # Add note below the figure
    if note is not None:
        fig.add_layout(bkhmodels.Title(text=note, text_font_style='italic'), 'below')
    
    return fig

In [60]:
# Select the tasks which consume in aggregate a threshold of the elapsed time
task_time_consumers_df = df_per_task#.filter(pl.col('cpu_time_hours') > 0.0)
tasks = get_column(task_time_consumers_df, 'task')
elapsed_time = get_column(task_time_consumers_df, 'elapsed_time_hours')

# Sort by elapsed time in decreasing order
sorted_by_elapsed_time = sorted(zip(elapsed_time, tasks), reverse=True)
elapsed_time, tasks = zip(*sorted_by_elapsed_time)

# Compute the percentage of total time each kind of pipetask spent in execution
elapsed_percentage = [100.0 * v/sum(elapsed_time) for v in elapsed_time]
elapsed_cumulated = np.cumsum(elapsed_percentage)

cumulated_elapsed_threshold = 98 # percent
time_consumers_tasks = np.array(tasks)[elapsed_cumulated <= cumulated_elapsed_threshold].tolist()

# Build a dataframe with big consumer tasks and their memory usage
# with columns:
#    task, elapsed_time_hours, min_RSS, mean_RSS, max_RSS, p05_RSS, p95_RSS
tasks_to_plot = [taskToCompare, taskToCompare + '_opti']

#HACK
time_consumers_df = pl.concat([
    df_per_task.with_columns(
        # Add column 'elapsed_time_pct' with the percentage of total time consumed by each kind of pipetask
        (pl.col('elapsed_time_hours') / pl.col('elapsed_time_hours').sum()).alias('elapsed_time_pct')
    )
    .filter(
        # Select only the tasks consuming the most
        pl.col('task').is_in(tasks_to_plot)
    )
    .sort(by='elapsed_time_hours', descending=True)
,   df_per_task_opti.with_columns(
        # Add column 'elapsed_time_pct' with the percentage of total time consumed by each kind of pipetask
        (pl.col('elapsed_time_hours') / pl.col('elapsed_time_hours').sum()).alias('elapsed_time_pct'),
            task=pl.col("task").str.replace(taskToCompare, taskToCompare + "_opti")

    )
    .filter(
        # Select only the tasks consuming the most
        pl.col('task').is_in(tasks_to_plot)
    )
    .sort(by='elapsed_time_hours', descending=True)
]
                             )

display(time_consumers_df)

# Build the plot
task_rss_consumers_fig = make_figure_big_task_consumers(
    tasks = get_column(time_consumers_df, 'task'),
    elapsed_time_pct = get_column(time_consumers_df, 'elapsed_time_pct'),
    rss_max = get_column(time_consumers_df, 'RSS_max'),
    rss_min = get_column(time_consumers_df, 'RSS_min'),
    rss_mean = get_column(time_consumers_df, 'RSS_mean'),
    rss_pct_low = get_column(time_consumers_df, 'RSS_p05'),
    rss_pct_high = get_column(time_consumers_df, 'RSS_p95'),
    # label=f'\n Pipetasks which consume in aggregate {cumulated_elapsed_threshold}% of elapsed time.\n',
    note = f'NOTE: Whiskers show 5th to 95th RSS percentiles.',
    # note = f'NOTE: the pipetasks shown consume in aggregate {cumulated_elapsed_threshold}% of the total elapsed time of the DP0.2 campaign. Whiskers show 5th to 95th RSS percentiles.',
)
bkh.show(task_rss_consumers_fig)

# Export this figure
save_figure(task_rss_consumers_fig, output_dir=output_dir, filename='memory-stats', title='DP0.2 Memory by most compute-intensive pipetasks')

task,task_count,elapsed_time_hours,RSS_min,RSS_max,RSS_mean,RSS_std,RSS_p05,RSS_p50,RSS_p95,RSS_iqr,elapsed_time_pct
str,u32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""forcedPhotCoadd""",405,450.520525,1.271026,4.041474,3.166687,0.579597,1.880367,3.286925,3.896869,0.706621,0.278605
"""forcedPhotCoadd_opti""",405,243.923063,1.313985,4.263543,3.5799,0.408522,2.897031,3.578556,4.127621,0.550023,0.172939


In [61]:
from bokeh.palettes import Category10, Category20
from bokeh.transform import factor_cmap
from scipy.spatial import ConvexHull

def make_figure_correlation_memory_time(
    tasks: List[str], run_time: List[float], elapsed_time_pct: List[float], rss_max: List[float],
    band: List[str], patch: List[int],
    label: str = None
) -> bkh.figure:
    """Return a scatter plot figure with max RSS and compute time. Color by task name."""
    
    # Sort the data
    #sorted_by_elapsed = sorted(zip(elapsed_time_pct, run_time, rss_max, tasks), reverse=True)
    #elapsed_time_pct, run_time, rss_max, tasks = zip(*sorted_by_elapsed)
    
    # Create a data source
    source = bkhmodels.ColumnDataSource(data={
        'tasks': tasks,
        'run_time': run_time,
        'elapsed_time_pct': elapsed_time_pct,
        'rss_max': rss_max,
        'band' : band,
        'patch' : patch,
    })
    
    # Use counter to sort tasks by quantity
    from collections import Counter
    task_counter = dict(Counter(tasks).most_common())
    # Generate a color palette using Viridis256
    unique_tasks = list(task_counter.keys())  # Ensure unique tasks
    palette = Category10[max(3, min(20, len(unique_tasks)))]  # Use at most 20 colors, and minimum 3
    #palette = [Viridis256[i * 256 // len(unique_tasks)] for i in range(len(unique_tasks))] # Use infinite colors, but less visually striking
    color_map = factor_cmap('tasks', palette=palette, factors=unique_tasks)

    # Build and configure the figure
    left_y_range = bokeh.models.Range1d(min(rss_max)-0.1, max(rss_max) * 1.2)
    x_range = bokeh.models.Range1d(0.1, 10000)

    fig = bkh.figure(
        #x_axis_type='log',
        x_axis_label='Elapsed time (s)',
        #x_range=x_range,
        y_range=left_y_range,
        #y_axis_type='log',
        y_axis_label='gigabyte',
        width=1_400,
        height=800,
        background_fill_color="#f4f3f3",
    )
    
    # Add title and subtitle
    fig.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="Max memory used versus elapsed time, grouped by task,", text_font_size="18pt"), 'above')

    # Configure axes
    for x_axis in fig.xaxis:
        x_axis.axis_label_text_font_size = "11pt"
        x_axis.axis_label_text_font_style = "bold"
        x_axis.major_label_text_font_size = "11pt"

    for y_axis in fig.yaxis:
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"

    # Add scatter points with dynamic color by task
    renderers_pt, renderers_ch = [], []
    for task, count in task_counter.items():
        # Extract data points for the current task
        task_x = [rt for t, rt, pctt, mem in zip(tasks, run_time, elapsed_time_pct, rss_max) if t == task]
        task_y = [mem for t, rt, pctt, mem in zip(tasks, run_time, elapsed_time_pct, rss_max) if t == task]
        task_pctt = [pctt for t, rt, pctt, mem in zip(tasks, run_time, elapsed_time_pct, rss_max) if t == task]
        task_band = [b for t, rt, pctt, mem, b, p in zip(tasks, run_time, elapsed_time_pct, rss_max, band, patch) if t == task]
        task_patch = [p for t, rt, pctt, mem, b, p in zip(tasks, run_time, elapsed_time_pct, rss_max, band, patch) if t == task]

        # Create a data source for the task points
        task_source = bkhmodels.ColumnDataSource(data={
            'run_time': task_x,
            'rss_max': task_y,
            'band' : task_band,
            'patch' : task_patch,
        })

        # Plot task points
        renderers_pt.append(
            fig.scatter(
                marker='circle', x='run_time', y='rss_max', size=5,
                line_width=3, color=palette[unique_tasks.index(task)],
                source=task_source, legend_label=f'{task} / {count} calls / {sum(task_pctt):.2%}',
                muted_alpha=0.05,
                
            )
        )

        # Compute convex hull for the task's points if there are enough points (min 3)
        if len(task_x) >= 3:
            points = np.array(list(zip(task_x, task_y)))
            hull = ConvexHull(points)

            # Get the vertices of the hull and close the polygon by appending the first point
            hull_vertices = points[hull.vertices]
            hull_vertices = np.append(hull_vertices, [hull_vertices[0]], axis=0)

            # Create a ColumnDataSource for the hull vertices
            hull_source = bkhmodels.ColumnDataSource(data={
                'task_name' : [task for _ in hull_vertices],
                'x': hull_vertices[:, 0],
                'y': hull_vertices[:, 1]
            })
            # Plot the convex hull as a patch
            r = fig.patch(
                x='x', y='y', source=hull_source,
                color=palette[unique_tasks.index(task)], 
                fill_alpha=0.0, line_alpha=0.5, line_width=2
                )
            renderers_ch.append(r)

    # Configure toolbar and legend
    fig.toolbar.autohide = True
    fig.legend.click_policy = 'hide'
    fig.legend.label_text_font_size = '8pt'

    # Add tooltips
    # TODO : tooltips for the convex hull (patch) does not work ?!
    #fig.add_tools(bkhmodels.HoverTool(
    #    tooltips=[('Task', '@{task_name}')],
    #    renderers=renderers_ch,
    #    mode='mouse')
    #)
    
    fig.add_tools(bkhmodels.HoverTool(tooltips=[
        ('RSS max', '@rss_max{0.2f} GB'),
        ('Elapsed time', '@run_time{0.1f} s'),
        ('Band', '@band'), ('Patch', '@patch')
    ], renderers=renderers_pt, mode='mouse'))
    
    # Legend location
    #fig.add_layout(fig.legend[0],'right') # outside the plot
    fig.legend[0].location = 'top_left' # inside the plot

    # Add legend title
    if label is not None: fig.legend[0].title = label
    
    return fig

In [62]:
# Select the tasks to include
tasks_to_plot = [taskToCompare + '_opti', taskToCompare]
df_all_non_zero = pl.concat([
    df_all.filter((pl.col('memory') >= 0.1) & (pl.col('task').is_in(tasks_to_plot))),
    df_all_opti.filter((pl.col('memory') >= 0.1) & (pl.col('task').is_in(tasks_to_plot))).with_columns(task=pl.col("task").str.replace(taskToCompare, taskToCompare + "_opti")),
]
)

# Create and display the figure
task_correlation_memory_time_fig = make_figure_correlation_memory_time(
    tasks=get_column(df_all_non_zero, 'task'),
    run_time=get_column(df_all_non_zero, 'run_time'),
    elapsed_time_pct=get_column(df_all_non_zero, 'elapsed_time_pct'),
    rss_max=get_column(df_all_non_zero, 'memory'),
    band=get_column(df_all_non_zero, 'band'),
    patch=get_column(df_all_non_zero, 'patch'),
    label=f'Comparaison between opti and stack\nname / nb calls / % total time',
)

bkh.show(task_correlation_memory_time_fig)
save_figure(task_correlation_memory_time_fig, output_dir=output_dir, filename='memory-vs-runtime-'+taskToCompare, title='DP0.2 Memory vs Execution time, top10')

In [63]:
df_all

kind,step,task,band,instrument,day_obs,detector,physical_filter,visit,memory,init_time,run_time,group,exposure,skymap,tract,patch,elapsed_time_pct,memory_hours
str,str,str,str,str,i64,i64,str,i64,f64,f64,f64,str,i64,str,i64,i64,f64,f64
"""""","""step1""","""calibrate""","""g""","""HSC""",20150813,0,"""HSC-G""",38440,1.860194,0.043232,17.485948,,,,,,0.000003,32.52726
"""""","""step1""","""calibrate""","""g""","""HSC""",20151014,0,"""HSC-G""",42158,1.864344,0.03966,42.24704,,,,,,0.000007,78.762997
"""""","""step1""","""calibrate""","""g""","""HSC""",20151014,0,"""HSC-G""",42170,1.867305,0.040183,24.076729,,,,,,0.000004,44.958595
"""""","""step1""","""calibrate""","""g""","""HSC""",20151014,0,"""HSC-G""",42172,1.864229,0.03583,36.990868,,,,,,0.000006,68.959444
"""""","""step1""","""calibrate""","""g""","""HSC""",20151014,0,"""HSC-G""",42174,1.851531,0.046396,23.545905,,,,,,0.000004,43.59598
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""""","""step8""","""plotPropertyMapTract""","""r""",,,,,,3.972592,0.000389,64.289125,,,"""hsc_rings_v1""",9461,,0.000011,255.394439
"""""","""step8""","""plotPropertyMapTract""","""y""",,,,,,3.975291,0.000495,94.022988,,,"""hsc_rings_v1""",9461,,0.000016,373.768727
"""""","""step8""","""plotPropertyMapTract""","""z""",,,,,,3.98524,0.000558,92.639373,,,"""hsc_rings_v1""",9461,,0.000016,369.19014
"""""","""step8""","""refCatObjectTract""",,,,,,,0.770441,0.000635,14.813182,,,"""hsc_rings_v1""",9461,,0.000003,11.412686


In [64]:
def make_figure_rss_histogram_per_task(distribution: List[float], task: str, annotation: str=None) -> bkh.figure:
    """Return a figure with a histogram of the RSS for a given task
    """
    # Compute histogram
    hist, edges = np.histogram(distribution, density=False, bins=bins)
    count = len(distribution)
    source = bokeh.models.ColumnDataSource(data=dict(base=[max(hist)],
                                                     upper=[distribution.quantile(0.95)], lower=[distribution.quantile(0.05)],
                                                     q1=[distribution.quantile(0.25)], q2=[distribution.quantile(0.50)], q3=[distribution.quantile(0.75)]
                                                    ))
    
    # Build and configure the figure
    width, height = 1_000, 600
    y_range = bokeh.models.Range1d(1, max(hist)*15)
    fig = bkh.figure(
        x_range = (edges[0], edges[-1]),
        x_axis_label = 'gigabyte',
        y_axis_label = 'frequency',
        y_range = y_range,
        y_axis_type = 'log',
        width = width,
        height = height,
        background_fill_color="#f4f3f3", 
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    fig.add_layout(bkhmodels.Title(text=f"Distribution of memory usage by {task} pipetask", text_font_size="16pt"), 'above')

    # Format X axis
    fig.xaxis.axis_label_text_font_size = "12pt"
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "12pt"
    fig.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='0,.f')

    # Add and format secondary Y axis
    fig.extra_y_ranges = {"y_axis_right": y_range}
    fig.add_layout(bokeh.models.LogAxis(y_range_name="y_axis_right"), 'right')
    for y_axis in fig.yaxis:
        y_axis.ticker = bokeh.models.tickers.LogTicker()
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"


    
    # Add a histogram
    quad = fig.quad(top=hist, bottom=1, left=edges[:-1], right=edges[1:],
           fill_color="orange", line_color="black", alpha=0.3)
    ## Add descriptive statistics
    #whisker = bokeh.models.Whisker(base="base", upper="upper", lower="lower", dimension="width", source=source,
    #                               level="annotation", upper_units='screen', base_units='screen', lower_units='screen')
    #whisker.upper_head.size = whisker.lower_head.size = max(hist)
    #fig.add_layout(whisker)
    
    
    ## quantile boxes
    #fig.hbar("base", max(hist), "q2", "q3", source=source, color="red", line_color="black",)
    #fig.hbar("base", max(hist), "q1", "q2", source=source, color="red", line_color="black",)
    
    # Add annotation label
    if annotation is not None:
        # TODO: fix the issue with the location of the annotation in the plot
        label = bkhmodels.Label(x=int(width*0.75), y=int(height*0.60),
                                x_units='screen', y_units='screen',
                                x_offset=15, y_offset=-5,
                                text=f'{annotation}', text_font_size='10pt',
                                text_color='dimgray', text_alpha=0.8,
                                background_fill_color='white', background_fill_alpha=1.0,
                                border_line_color='dimgray', border_line_alpha=0.5)
        fig.add_layout(label)
        
    # Add tooltips
    fig.add_tools(bkhmodels.HoverTool(tooltips=[
        ('Interval', 'from @left{0.1f} to @right{0.1f} GB'),
        ('Frequency', '@top{0,} quanta'),
    ], renderers=[quad, ], mode='mouse'))
       
    # Hide toolbar
    fig.toolbar.autohide = True

    return fig

In [65]:
def describe_column(df, column) -> Tuple[float, float, float, float]:
    df_description = df.with_columns(
        [
            pl.col(column).min().alias('min'),
            pl.col(column).max().alias('max'),
            pl.col(column).mean().alias('mean'),
            pl.col(column).std().alias('std'),
        ]
    ).select(['min', 'max', 'mean', 'std'])
    return df_description.row(0)


memory_distribution_figures = []
bins = 40
for task in sorted([taskToCompare, taskToCompare + '_opti']):
    # Collect the histogram data for this task
    task_df = df_all.filter(
        (pl.col('task') == task) & pl.col('memory').is_not_null()
    ).select(pl.col('memory'))

    # hack till we rework how everything is handled
    if len(task_df) == 0:
        task = task[:-5] # remove _opti
        task_df = df_all_opti.filter(
        (pl.col('task') == task) & pl.col('memory').is_not_null()
    ).select(pl.col('memory'))
        task = task + "_opti"
            
    # Compute descriptive statistics
    min_value, max_value, mean_value, std_value = describe_column(task_df, 'memory')
    
    annotation = f' N: {len(task_df):,} \n min: {min_value:.2f} \n mean: {mean_value:.2f} \n std: {std_value:.2f} \n max: {max_value:.2f} '
    
    # Plot the histogram
    fig = make_figure_rss_histogram_per_task(task_df.get_column('memory'), task, annotation=annotation)
    bkh.show(fig)
    
    # Export this figure
    filename = f'memory-distribution-{task}'
    memory_distribution_figures.append(filename)
    save_figure(fig, output_dir=output_dir, filename=filename, title=f'DP0.2 Memory distribution by pipetask {task}')
    

In [30]:
def publish(results_dir, publication_top_dir):
    """Publish this notebook results under top_dir
    """
    # Create the publication directories and remove .png and .html files
    # that may exist in them
    publication_dir = os.path.join(publication_top_dir, 'pipetasks')    
    os.makedirs(publication_dir, exist_ok=True)
    
    dst_images_dir = os.path.join(publication_dir, 'images')
    os.makedirs(dst_images_dir, exist_ok=True)
    for f in glob.glob(os.path.join(dst_images_dir, '*.png')):
        os.remove(f)
    
    dst_html_dir = os.path.join(publication_dir, 'html')
    os.makedirs(dst_html_dir, exist_ok=True)
    for f in glob.glob(os.path.join(dst_html_dir, '*.html')):
        os.remove(f)
    
    # Publish PNG files
    src_image_dir = os.path.join(results_dir, 'images')
    for src_file in glob.glob(os.path.join(src_image_dir, '*.png')):
        dst_file = os.path.join(publication_dir, 'images', os.path.basename(src_file))
        shutil.copy(src_file, dst_file)
    
    # Publish HTML files
    src_html_dir = os.path.join(results_dir, 'html')
    for src_file in glob.glob(os.path.join(src_html_dir, '*.html')):
        dst_file = os.path.join(publication_dir, 'html', os.path.basename(src_file))
        shutil.copy(src_file, dst_file)
        
    # Publish main HTML file
    src_index_html = os.path.join(results_dir, f'optimization-{taskToCompare}.html')
    dst_index_html = os.path.join(publication_dir,  os.path.basename(src_index_html))
    shutil.copy(src_index_html, dst_index_html)

In [31]:
# Render the HTML template
import os
import jinja2

environment = jinja2.Environment(loader=jinja2.FileSystemLoader('./templates'))
template = environment.get_template('optimization-task-template.html')

pipetasks_html = os.path.join(output_dir, f'optimization-{taskToCompare}.html')
with open(pipetasks_html, mode="w", encoding="utf-8") as f:
    context = {
        'taskToCompare': taskToCompare,
        'summary': summary,
        'figures': memory_distribution_figures,
    }
    f.write(template.render(context))

In [32]:
publication_dir='/sps/lsst/users/fabio/web/rubin-dp0.2-at-frdf'
publication_dir='/sps/lsst/users/abernard/web/optimization-forcedPhotCoadd'

publish(results_dir=output_dir, publication_top_dir=publication_dir)

In [33]:
def categorize_tasks_per_memory(df: pl.DataFrame, categories: dict = {'small': 5, 'medium': 20, 'high': None}) -> dict:
    """Return a dictionnary with details about categories of tasks according to ``categories`` parameter.

    The returned dict contains for each category in ``categories`` the elapsed time spent on each category.
    """    
    result = {}
    key = f"0 GB ≤ max RSS ≤ {categories['small']} GB"
    elapsed = df.filter(
        (pl.col('RSS_max') >= 0) & (pl.col('RSS_max') <= categories['small'])
    ).select('elapsed_time_hours').sum().row(0)[0]
    result[key] = elapsed
    
    key = f"{categories['small']} GB < max RSS ≤ {categories['medium']} GB"
    elapsed = df.filter(
        (pl.col('RSS_max') > categories['small']) & (pl.col('RSS_max') <= categories['medium'])
    ).select('elapsed_time_hours').sum().row(0)[0]
    result[key] = elapsed

    key = f"max RSS > {categories['medium']} GB"
    elapsed = df.filter(
        pl.col('RSS_max') > categories['medium']
    ).select('elapsed_time_hours').sum().row(0)[0]
    result[key] = elapsed

    return result

In [34]:
def make_figure_task_category_by_memory(categories: List[str], elapsed_time: List[float]) -> bkh.figure:
    """Return a figure representing the elapsed time spent by each of the categories of tasks
    """
    # Build the data source
    total_elapsed = sum(elapsed_time)
    source = bkhmodels.ColumnDataSource(data={
        'category': categories, 
        'elapsed_time': [v/total_elapsed for v in elapsed_time], 
    })
    
    # Build and configure the figure
    fig = bkh.figure(
        x_range = bokeh.models.Range1d(0, 1.0),
        y_range = categories,
        width = 800,
        height = 600,
        background_fill_color="#f4f3f3", 
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="Elapsed time spent per category of pipetask", text_font_size="18pt"), 'above')

    # Axis
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "12pt"
    fig.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='0%')

    # Format Y axis
    fig.yaxis.major_label_text_font_size = "12pt"
    fig.yaxis.axis_label_text_font_style = "bold"
     
    # Hide toolbar
    fig.toolbar.autohide = True
    
    # Add horizontal bars
    bars = fig.hbar(y='category', right='elapsed_time', left=0, height=0.8, source=source, fill_color='wheat', alpha=0.7, line_color='black')
    
    # Add tooltips
    fig.add_tools(bkhmodels.HoverTool(tooltips=[('Elapsed time', '@elapsed_time{%0}')], renderers=[bars], mode='mouse'))

    return fig

In [35]:
df_per_task = df_all.group_by("task", maintain_order=True).agg(
    [
        pl.col("task").count().alias("task_count"),
        pl.col("task").n_unique().alias("task_kinds"),
        #(pl.col("cpu_time")/3_600).sum().alias("cpu_time_hours"),
        (0*pl.col("run_time")).sum().alias("cpu_time"),
        (0*pl.col("run_time")/3_600).sum().alias("cpu_time_hours"),

        (pl.col("run_time")/3_600).sum().alias("elapsed_time_hours"),
        #(pl.col("cpu_time").sum()/pl.col("elapsed_time").sum()).alias("cpu_efficiency"),
        (0*pl.col("run_time").sum()/pl.col("run_time").sum()).alias("cpu_efficiency"),
        pl.col("memory").max().alias("RSS_max"),
    ]
)

In [36]:
# Generate a table with a summary of the characteristics of each task kind
summary = f"""
| Pipetask     | Number of tasks | Cumulated elapsed time (h) | Cumulated CPU time (h) | Overall CPU efficiency      | Max RSS (GB) |
| -----------: | --------------: | -------------------------: | ---------------------: | --------------------------: | -----------: |
"""

for task in sorted(df_per_task.get_column("task")):
    task_info = df_per_task.filter(pl.col("task") == task)
    task_count = task_info["task_count"][0]
    elapsed_time = task_info['elapsed_time_hours'][0]
    cpu_time = task_info["cpu_time_hours"][0]
    cpu_efficiency = task_info['cpu_efficiency'][0]
    max_rss = task_info['RSS_max'][0]
    summary += f'| `{task}` | {task_count:,} | {math.ceil(elapsed_time):,.0f} | {math.ceil(cpu_time):,.0f} | {cpu_efficiency:.2f} | {max_rss:.0f} |\n'    

# Summarize the dataframe
total_task_count = df_per_task.select("task_count").sum()[0,0]
total_elapsed_time =df_per_task.select("elapsed_time_hours").sum()[0,0]
total_cpu_time = df_per_task.select("cpu_time_hours").sum()[0,0]
total_cpu_efficiency = (df_per_task.select("cpu_time_hours").sum() / df_per_task.select("elapsed_time_hours").sum())[0,0]
total_max_rss = "n/a"

summary += f'| **Total** | **{total_task_count:,}** | **{math.ceil(total_elapsed_time):,.0f}** | **{math.ceil(total_cpu_time):,.0f}** | **{total_cpu_efficiency:.2f}** | **{total_max_rss}** |\n'
summary += f"Ignore the CPU time columns. These informations are currently not available in the rassource usage."
print_md(summary)


| Pipetask     | Number of tasks | Cumulated elapsed time (h) | Cumulated CPU time (h) | Overall CPU efficiency      | Max RSS (GB) |
| -----------: | --------------: | -------------------------: | ---------------------: | --------------------------: | -----------: |
| `analyzeMatchedVisitCore` | 16 | 1 | 0 | 0.00 | 20 |
| `analyzeObjectTableCore` | 1 | 1 | 0 | 0.00 | 6 |
| `analyzeObjectTableSurveyCore` | 1 | 1 | 0 | 0.00 | 1 |
| `assembleCoadd` | 405 | 22 | 0 | 0.00 | 4 |
| `calibrate` | 13,529 | 168 | 0 | 0.00 | 2 |
| `catalogMatchTract` | 1 | 1 | 0 | 0.00 | 2 |
| `characterizeImage` | 13,766 | 273 | 0 | 0.00 | 2 |
| `consolidateHealSparsePropertyMaps` | 5 | 1 | 0 | 0.00 | 1 |
| `consolidateObjectTable` | 1 | 1 | 0 | 0.00 | 18 |
| `consolidateSourceTable` | 126 | 1 | 0 | 0.00 | 1 |
| `deblend` | 81 | 32 | 0 | 0.00 | 5 |
| `detection` | 405 | 17 | 0 | 0.00 | 1 |
| `fgcmBuildFromIsolatedStars` | 1 | 1 | 0 | 0.00 | 8 |
| `fgcmFitCycle` | 1 | 1 | 0 | 0.00 | 28 |
| `fgcmOutputProducts` | 1 | 1 | 0 | 0.00 | 8 |
| `finalizeCharacterization` | 128 | 71 | 0 | 0.00 | 1 |
| `forcedPhotCcd` | 4,602 | 88 | 0 | 0.00 | 1 |
| `forcedPhotCoadd` | 405 | 451 | 0 | 0.00 | 4 |
| `gbdesAstrometricFit` | 60 | 2 | 0 | 0.00 | 2 |
| `healSparsePropertyMaps` | 5 | 2 | 0 | 0.00 | 4 |
| `isolatedStarAssociation` | 16 | 1 | 0 | 0.00 | 1 |
| `isr` | 14,208 | 134 | 0 | 0.00 | 2 |
| `makeCcdVisitTable` | 1 | 1 | 0 | 0.00 | 2 |
| `makeVisitTable` | 1 | 1 | 0 | 0.00 | 2 |
| `makeWarp` | 2,815 | 60 | 0 | 0.00 | 2 |
| `measure` | 405 | 253 | 0 | 0.00 | 4 |
| `mergeDetections` | 81 | 1 | 0 | 0.00 | 0 |
| `mergeMeasurements` | 81 | 1 | 0 | 0.00 | 2 |
| `photometricCatalogMatch` | 1 | 1 | 0 | 0.00 | 2 |
| `photometricRefCatObjectTract` | 1 | 1 | 0 | 0.00 | 1 |
| `plotPropertyMapTract` | 5 | 1 | 0 | 0.00 | 4 |
| `refCatObjectTract` | 1 | 1 | 0 | 0.00 | 1 |
| `selectDeepCoaddVisits` | 405 | 1 | 0 | 0.00 | 1 |
| `transformObjectTable` | 81 | 1 | 0 | 0.00 | 2 |
| `transformPreSourceTable` | 13,529 | 1 | 0 | 0.00 | 2 |
| `transformSourceTable` | 12,978 | 1 | 0 | 0.00 | 0 |
| `updateVisitSummary` | 126 | 11 | 0 | 0.00 | 1 |
| `validateObjectTableCore` | 1 | 1 | 0 | 0.00 | 1 |
| `writeObjectTable` | 81 | 1 | 0 | 0.00 | 9 |
| `writePreSourceTable` | 13,529 | 2 | 0 | 0.00 | 2 |
| `writeRecalibratedSourceTable` | 12,978 | 32 | 0 | 0.00 | 1 |
| **Total** | **104,864** | **1,618** | **0** | **0.00** | **n/a** |
Ignore the CPU time columns. These informations are currently not available in the rassource usage.

In [432]:
def categorize_tasks_per_memory(df: pd.DataFrame, categories: dict = {'small': 5, 'medium': 20, 'high': None}) -> pd.DataFrame:
    """Return a data frame with details about categories of tasks according to ``categries``.

    The returned data frame is indexed by category and contains the CPU time spent in each category (in seconds)
    as well as the percentage of time spent by each category of task.
    """
    # Determine the CPU time consumed by three classes of tasks: small-memory, medium-memory and high-memory
    total_cpu_time = df['cpu_time'].sum()

    # Compute the CPU time spent running each category of tasks tasks
    is_small_memory_task = df['RSS_max'] <= categories['small']
    is_medium_memory_task = (df['RSS_max'] > categories['small']) & (df['RSS_max'] <= categories['medium'])
    is_high_memory_task = ~is_small_memory_task & ~is_medium_memory_task
    cpu_time = [
        df[is_small_memory_task]['cpu_time'].sum(),
        df[is_medium_memory_task]['cpu_time'].sum(),
        df[is_high_memory_task]['cpu_time'].sum(),
    ]
    
    # Compute the percentage CPU time spent by each category of tasks
    cpu_time_percent = [v/total_cpu_time for v in cpu_time]
    
    # Build the resulting dataframe
    data = {
        'category': categories.keys(),
        'cpu_time': cpu_time,
        'cpu_time_percent': cpu_time_percent,
    }
    
    out_df = pd.DataFrame.from_records(data=data)
    out_df.set_index('category', inplace=True)
    return out_df

In [433]:
def generate_task_categories_summary(df: pd.DataFrame, categories: dict = {'small': 5, 'medium': 20, 'high': None}) -> str:
    """Return a Markdown-formatted table with a summary of the percentage of CPU time spent in each category of task.
    """
    # Summarize
    summary = f"""
| category of task    |    CPU time                                         | comments                                                                      |
| ------------------- | --------------------------------------------------: | ----------------------------------------------------------------------------- |
| **small-memory**    | {100. * df.loc['small', 'cpu_time_percent']:.2f}%   | 0 < max RSS ≤ {categories['small']:.0f} GB                                    |
| **medium-memory**   | {100. * df.loc['medium', 'cpu_time_percent']:.2f}%  | {categories['small']:.0f} < max RSS ≤ {categories['medium']:.0f} GB           |
| **high-memory**     | {100. * df.loc['high', 'cpu_time_percent']:.2f}%    | max RSS > {categories['medium']:.0f} GB                                       |
    """
    return summary

In [434]:
# Categorize tasks by their max RSS and generate a summary
task_categories = {
    'small': 5,
    'medium': 20,
    'high': None
}
categories_df = categorize_tasks_per_memory(df_all, categories=task_categories)
summary = generate_task_categories_summary(categories_df, categories=task_categories)
print_md(summary)

ColumnNotFoundError: "cpu_time" not found

In [419]:
def make_task_category_figure(df: pd.DataFrame) -> bkh.figure:
    """Return a figure representing the CPU time spent on each task category
    """
    # Build the data source
    categories = [f"{v}-memory tasks" for v in df.index.values]
    source = bkhmodels.ColumnDataSource(data={
        'category': categories,
        'percentage': df['cpu_time_percent'].values,
        'percentage_tooltips': [100.0 * v for v in df['cpu_time_percent'].values],
        'color': ['mediumseagreen', 'gold', 'crimson'],
    })
    
    # Build and configure the figure
    fig = bkh.figure(
        x_axis_label = 'CPU time',
        x_range = bokeh.models.Range1d(0, 1.0),
        y_range=categories,
        plot_width=600,
        plot_height=400,
        background_fill_color="#f4f3f3", 
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text="Rubin processing at FrDF for Data Preview 0.2 (v23.0.1)", text_font_style="italic", text_font_size="12pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="CPU time spent per task category", text_font_size="18pt"), 'above')

    # Axes
    fig.xaxis.axis_label_text_font_size = "14pt"
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "12pt"
    fig.yaxis.axis_label_text_font_size = "14pt"
    fig.yaxis.major_label_text_font_size = "12pt"
    fig.yaxis.axis_label_text_font_style = "bold"
    
    # Add bars
    bars = fig.hbar(y='category', left=0, right='percentage', height=0.5, color='color', source=source)
    
    # Add tooltips
    fig.add_tools(bkhmodels.HoverTool(tooltips=[('task category', '@category'), ('CPU time', '@percentage_tooltips{0.2f} %')], renderers=[bars], mode='mouse'))
    
    # Add formatters
    fig.xaxis[0].formatter = bokeh.models.NumeralTickFormatter(format="0%")

    return fig

In [420]:
category_fig = make_task_category_figure(categories_df)
bkh.show(category_fig)

print_md(f"""
The figure above presents the fraction of CPU time spent for executing each category of tasks. Hover over the bars to get more details about each category. Tasks are categorized as small-, medium- and high-memory.
Small-memory tasks are those requiring up to {task_categories['small']} GB. Medium-memory tasks are those
requiring more than {task_categories['small']} GB and up {task_categories['medium']} GB. High-memory tasks are those requiring more than {task_categories['medium']} GB.
""")

NameError: name 'categories_df' is not defined

In [421]:
def summarize_cpu_time_per_task_category(df: pd.DataFrame, categories: dict = {'small': 5, 'medium': 20, 'high': None}) -> pd.DataFrame:
    """Return a data frame with details about each task type. The columns of the dataframe are the percentage of CPU time spent for each task category.
    The returned data frame is indexed by task type.
    """
    
    out_df = pd.DataFrame()
    for name, group in df.groupby('task'):
        # Compute the CPU spent per task category, for this group
        is_small_memory_task = group['RSS_max'] <= categories['small']
        is_medium_memory_task = (group['RSS_max'] > categories['small']) & (group['RSS_max'] <= categories['medium'])
        is_high_memory_task = ~is_small_memory_task & ~is_medium_memory_task
        
        group_cpu_time = group['cpu_time'].sum()
        row = { 'task': name }
        row['small'] = group[is_small_memory_task]['cpu_time'].sum() / group_cpu_time
        row['medium'] = group[is_medium_memory_task]['cpu_time'].sum() / group_cpu_time
        row['high'] = group[is_high_memory_task]['cpu_time'].sum() / group_cpu_time
        out_df = pd.concat([out_df, pd.DataFrame.from_records(data=[row,], columns=row.keys())])

    out_df.set_index('task', inplace=True)
    out_df.sort_values(by='task', ascending=True, inplace=True)
    return out_df

In [422]:
def make_figure_category_within_task_type(df: pd.DataFrame) -> bkh.figure:
    """Return a figure representing the CPU time spent on each task category, for each kind of task
    """
    # Sort by task name in reverse alphabetical order so that veertical reading makes sense
    df = df.sort_values(by=['small', 'medium', 'high'], ascending=True)

    # Build the data source
    source = bkhmodels.ColumnDataSource(data={
        'task': df.index.values,
        'small': df['small'].values,
        'medium': df['medium'].values,
        'high': df['high'].values,
    })
    
    # Build and configure the figure
    fig = bkh.figure(
        x_axis_label = 'CPU time',
        y_axis_label = 'task',
        x_range = bokeh.models.Range1d(0, 1.0),
        y_range = df.index.values,
        width = 1200,
        height = 1200,
        background_fill_color="#f4f3f3", 
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text="Rubin processing at FrDF for Data Preview 0.2 (v23.0.1)", text_font_style="italic", text_font_size="12pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="CPU time spent per task category", text_font_size="18pt"), 'above')

    # Axes
    fig.xaxis.axis_label_text_font_size = "14pt"
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "12pt"
    fig.yaxis.axis_label_text_font_size = "14pt"
    fig.yaxis.major_label_text_font_size = "12pt"
    fig.yaxis.axis_label_text_font_style = "bold"
    fig.xaxis[0].formatter = bokeh.models.NumeralTickFormatter(format="0%")
    
    # Legend
    fig.add_layout(bokeh.models.Legend(), 'right')
    
    # Add bars
    bars = fig.hbar_stack(['small', 'medium', 'high'], y='task', color=['mediumseagreen', 'gold', 'crimson'], height=0.7, source=source, legend_label=['small-memory', 'medium-memory', 'high-memory'])
    
    # Add tooltips
    # fig.add_tools(bkhmodels.HoverTool(tooltips=[('task', '@task'), ('small memory', '@small{0.2f} %')], renderers=[bars], mode='mouse'))
    # fig.add_tools(bkhmodels.HoverTool(tooltips=[('task', '@task'),], renderers=[bars], mode='mouse'))
    return fig

In [423]:
taks_df = summarize_cpu_time_per_task_category(df_all, task_categories)
all_tasks_fig = make_figure_category_within_task_type(taks_df)
bkh.show(all_tasks_fig)
print_md(f"""
The figure above presents a more detailed view of the share of CPU time spent by each task category for every kind of pipeline task. Tasks are categorized as small-, medium- and high-memory.
Small-memory tasks are those requiring up to {task_categories['small']} GB. Medium-memory tasks are those
requiring more than {task_categories['small']} GB and up {task_categories['medium']} GB. High-memory tasks are those requiring more than {task_categories['medium']} GB.
""")

AttributeError: 'DataFrame' object has no attribute 'groupby'

In [424]:
def build_boxplot_dataframe(df: pd.DataFrame, column: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build a dataframe with data to populate a figure composed of box plots for each
    kind of task based on the values of the column ``column``.
    
    Parameters
    ----------
    df : pd.DataFrame
       dataframe which contains one row for each executed task.
       
    column: str
       name of the column in ``df`` to be used as criterion for building the boxplot
       data.
       
    Returns
    -------
    first: pd.DataFrame
       a dataframe with columns 'task', 'q0', 'q1', 'q2', 'q3', 'q4', 'lower' and 'upper'.
       The columns 'qN' contain the value of the corresponding quartile. The columns
       'lower' and 'upper' contain the values for the lower and upper whiskers of the
       box plot.
       This dataframe is indexed by task name.
       
    second: pd.DataFrame
        outliers dataframe
       
    Notes
    -----
    See also: https://en.wikipedia.org/wiki/Box_plot
    """
    # Build a new dataframe where each row contains values computed for each kind of task.
    # Those values are used later for building a figure with a boxplot per task type.
    # The values are the first, second and third quartiles for 'RSS_max' column, the
    # inter-quartile range and the lower and upper boxplot limits.
    out_df = pd.DataFrame()
    
    for name, group in df.group_by('task'):
        # Compute the 25th, 50th and 75th percentile for this group
        q0, q1, q2, q3, q4 = np.percentile(group[column].dropna(), (0.0, 25.0, 50.0, 75.0, 100.0))
                
        # Compute inter-quartile range and values for lower and upper whiskers
        iqr = q3 - q1
        lower = max(q1 - 1.5*iqr, q0)
        upper = min(q3 + 1.5*iqr, q4)

        # Append a new row to the resulting dataframe for this kind of task
        row = {
            'task': name, 
            'q0': q0, 
            'q1': q1, 
            'q2': q2,
            'q3': q3,
            'q4': q4,
            'lower': lower,
            'upper': upper,
        }
        out_df = pd.concat([out_df, pd.DataFrame.from_records(data=[row,], columns=row.keys())])

    # Set the dataframe index to the task name
    out_df.set_index('task', inplace=True)
    
    # Select outliers
    def select_outliers(group):
        """Return outliers for the given DataFrame group. It expects a single group per task.

        Outliers are those data points below or higher than the task's lower and upper limits.
        """
        task = group.name
        lower = out_df.loc[task]['lower']
        upper = out_df.loc[task]['upper']
        is_outlier = (group[column] < lower) | (group[column] > upper)
        return group[is_outlier][column]

    outliers = df.group_by('task').apply(select_outliers).dropna()
    return out_df, outliers

In [425]:
def make_figure_boxplot_rss(df: pd.DataFrame, unit: str, outliers: pd.Series) -> bkh.figure:
    """Builds a figure with a boxplot per task
    """
    # Sort the dataframe by median value
    df = df.sort_values(by='q2', ascending=False)
    
    # Retrieve the names of the tasks
    tasks = df.index.values
    
    # Build an configure the figure
    fig = bkh.figure(
        x_axis_label = 'max RSS (gigabyte)',
        y_axis_label = 'pipeline task',
        background_fill_color = "#f4f3f3",
        y_range = tasks,
        width = 1200,
        height = 1400
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text="Rubin processing at FrDF for Data Preview 0.2 (v23.0.1)", text_font_style="italic", text_font_size="12pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="Memory Consumption by Pipeline Tasks", text_font_size="18pt"), 'above')

    # Axes
    fig.xaxis.axis_label_text_font_size = "14pt"
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "12pt"
    fig.yaxis.axis_label_text_font_size = "14pt"
    fig.yaxis.major_label_text_font_size = "12pt"
    fig.yaxis.axis_label_text_font_style = "bold"
    
    # Data source
    data_source = bkhmodels.ColumnDataSource({
        'task': df.index.values,
        'q0': df['q0'].values,
        'q1': df['q1'].values,
        'q2': df['q2'].values,
        'q3': df['q3'].values,
        'q4': df['q4'].values,
        'lower': df['lower'].values,
        'upper': df['upper'].values,
    })

    # Stems
    line_color = 'black'
    fig.segment(x0='q3', y0='task', x1='upper', y1='task', source=data_source, line_color=line_color)
    fig.segment(x0='lower', y0='task', x1='q1', y1='task', source=data_source, line_color=line_color)

    # Boxes
    box_fill_color = 'tan'
    box_height = 0.8
    boxes = fig.hbar(y='task', height=box_height, right='q3', left='q1', source=data_source, fill_color=box_fill_color, line_color=box_fill_color)

    # Add tooltips for boxes
    fig.add_tools(
        bkhmodels.HoverTool(
            tooltips=[('task', '@task'), ('min', f'@q0{{0.2}} {unit}'), ('median', f'@q2{{0.}} {unit}'),  ('max', f'@q4{{0.}} {unit}')],
            renderers=[boxes],
            mode='mouse'
        )
    )

    # Whiskers
    whisker_height = box_height * 0.50
    fig.rect(x='lower', y='task', width=0.001, height=whisker_height, source=data_source, line_color=line_color, fill_color=line_color)
    fig.rect(x='upper', y='task', width=0.001, height=whisker_height, source=data_source, line_color=line_color, fill_color=line_color)

    # Median
    median_color = 'red'
    fig.rect(x='q2', y='task', width=0.001, height=box_height, source=data_source, line_color=median_color, fill_color=median_color)
    
    # Outliers
    if not outliers.empty:
        outliers_data = bkhmodels.ColumnDataSource({
            'x': list(outliers.values),
            'task': list(outliers.index.get_level_values(0)),
        })
        outliers_color = '#F38630' # 'darksalmon'
        circles = fig.circle(x='x', y='task', source=outliers_data, size=6, color=outliers_color, fill_alpha=0.6)
        fig.add_tools(bkhmodels.HoverTool(tooltips=[('task', '@task'), ('max RSS', f'$x{{0.1}} {unit}')], renderers=[circles], mode='mouse'))

    return fig

In [426]:
rss_boxplot_df, rss_outliers = build_boxplot_dataframe(df_all, column='RSS_max')
box_plot_rss_fig = make_figure_boxplot_rss(rss_boxplot_df, 'GB', rss_outliers)
bkh.show(box_plot_rss_fig)

ColumnNotFoundError: "RSS_max" not found

# Per task

In [316]:
# Create a dataframe with details about each pipetask
df_per_task = df_all.group_by("task", maintain_order=True).agg(
    [
        pl.col("task").count().alias("task_count"),
        (pl.col("run_time")/3_600).sum().alias("elapsed_time_hours"),
        pl.col('memory').min().alias('RSS_min'),
        pl.col('memory').max().alias('RSS_max'),
        pl.col('memory').mean().alias('RSS_mean'),
        pl.col('memory').std().alias('RSS_std'),
        pl.col('memory').quantile(0.05).alias('RSS_p05'),
        pl.col('memory').quantile(0.50).alias('RSS_p50'),
        pl.col('memory').quantile(0.95).alias('RSS_p95'),
        (pl.col('memory').quantile(0.75)-pl.col('memory').quantile(0.25)).alias("RSS_iqr"),
    ]
)

# hack
df_per_task_opti = df_all_opti.group_by("task", maintain_order=True).agg(
    [
        pl.col("task").count().alias("task_count"),
        (pl.col("run_time")/3_600).sum().alias("elapsed_time_hours"),
        pl.col('memory').min().alias('RSS_min'),
        pl.col('memory').max().alias('RSS_max'),
        pl.col('memory').mean().alias('RSS_mean'),
        pl.col('memory').std().alias('RSS_std'),
        pl.col('memory').quantile(0.05).alias('RSS_p05'),
        pl.col('memory').quantile(0.50).alias('RSS_p50'),
        pl.col('memory').quantile(0.95).alias('RSS_p95'),
        (pl.col('memory').quantile(0.75)-pl.col('memory').quantile(0.25)).alias("RSS_iqr"),
    ]
)

In [317]:
def make_figure_big_task_consumers(tasks: List[str], elapsed_time_pct: List[float], rss_min: List[float], rss_max: List[float], rss_mean: List[float],
                                   rss_pct_low: List[float], rss_pct_high: List[float], label: str = None, note: str = None) -> bkh.figure:
    """Return a figure representing the RSS distribution for each pipetask and its consumption of compute time.
    """
    # Build the data source: sort tasks by rss_max
    sorted_by_elapsed = sorted(zip(elapsed_time_pct, rss_min, rss_max, rss_mean, rss_pct_low, rss_pct_high, tasks), reverse=True)
    elapsed_time_pct, rss_min, rss_max, rss_mean, rss_pct_low, rss_pct_high, tasks = zip(*sorted_by_elapsed)
    source = bkhmodels.ColumnDataSource(data={
        'tasks': tasks,
        'elapsed_time_pct': elapsed_time_pct,
        'elapsed_time_cumulated': np.cumsum(elapsed_time_pct),
        'rss_min': rss_min,
        'rss_max': rss_max,
        'rss_mean': rss_mean,
        'rss_pct_low': rss_pct_low,
        'rss_pct_high': rss_pct_high,
    })
    
    # Build and configure the figure
    left_y_range = bokeh.models.Range1d(0.1, max(rss_max)*2)
    fig = bkh.figure(
        x_range = tasks,
        y_range = left_y_range,
        y_axis_type = 'log',
        y_axis_label = 'gigabyte',
        width = 1_600,
        height = 1_200,
        background_fill_color="#f4f3f3", 
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="Memory used before and after optimization", text_font_size="18pt"), 'above')

    # Axis
    fig.xaxis.axis_label_text_font_size = "11pt"
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "11pt"
    fig.xaxis.major_label_orientation = math.pi/2.5
    
    # Add secondary Y axis
    bottom_right_y_axis = min(elapsed_time_pct) / 2
    top_right_axis = max(elapsed_time_pct) * 1.2
    fig.extra_y_ranges = {"y_axis_right": bokeh.models.Range1d(bottom_right_y_axis, top_right_axis)}
    fig.add_layout(bokeh.models.LinearAxis(y_range_name='y_axis_right', axis_label='total elapsed time'), 'right')
    fig.yaxis[1].formatter = bokeh.models.NumeralTickFormatter(format='0%')

    # Format Y axis
    # fig.yaxis[1].ticker = bokeh.models.tickers.LogTicker()
    for y_axis in fig.yaxis:
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"
        
    # Annotation
    if label is not None:
        annotation_label = bkhmodels.Label(x=len(tasks)//2.2, y=max(rss_max)/1.2, x_units='data', y_units='data',
                     text=label, text_font_size='12pt',
                     text_color='dimgray', text_alpha=0.8,
                     background_fill_color='white', background_fill_alpha=1.0,
                     border_line_color='dimgray', border_line_alpha=0.5)
        fig.add_layout(annotation_label)

    
    # Add glyphs
    dashes_max = fig.scatter(marker='dash', x='tasks', y='rss_max', color='indianred', size=15, line_width=3, source=source, legend_label='max RSS')
    mean = fig.scatter(x='tasks', y='rss_mean', size=8, color='mediumseagreen', source=source, legend_label='mean RSS ')
    dashes_min = fig.scatter(marker='dash', x='tasks', y='rss_min', color='steelblue', size=15, line_width=3, source=source, legend_label='min RSS')
    bars = fig.vbar(x='tasks', top='elapsed_time_pct', bottom=bottom_right_y_axis, width=0.8, color='tan', alpha=0.3, source=source, legend_label='elapsed time', y_range_name='y_axis_right')
    
    whisker = bkhmodels.Whisker(base='tasks', upper='rss_pct_high', lower='rss_pct_low', source=source)
    fig.add_layout(whisker)
    
    # Hide toolbar
    fig.toolbar.autohide = True
    
    # Hide legend on click
    fig.legend.click_policy = 'mute'

    # Add tooltips
    fig.add_tools(bkhmodels.HoverTool(tooltips=[
        ('Task', '@tasks'),
        ('RSS max', '@rss_max{0.2f} GB'),
        ('RSS percentile high', '@rss_pct_high{0.2f} GB'),
        ('RSS mean', '@rss_mean{0.2f} GB'),
        ('RSS percentile low', '@rss_pct_low{0.2f} GB'),
        ('RSS min', '@rss_min{0.2f} GB'),
        ('elapsed time', '@elapsed_time_pct{0.0%}'),
        ('cumulated elapsed time', '@elapsed_time_cumulated{0.0%}'),
    ], renderers=[bars, mean, dashes_max, dashes_min], mode='mouse'))
    
    # Add note below the figure
    if note is not None:
        fig.add_layout(bkhmodels.Title(text=note, text_font_style='italic'), 'below')
    
    return fig

In [318]:
# Select the tasks which consume in aggregate a threshold of the elapsed time
task_time_consumers_df = df_per_task#.filter(pl.col('cpu_time_hours') > 0.0)
tasks = get_column(task_time_consumers_df, 'task')
elapsed_time = get_column(task_time_consumers_df, 'elapsed_time_hours')

# Sort by elapsed time in decreasing order
sorted_by_elapsed_time = sorted(zip(elapsed_time, tasks), reverse=True)
elapsed_time, tasks = zip(*sorted_by_elapsed_time)

# Compute the percentage of total time each kind of pipetask spent in execution
elapsed_percentage = [100.0 * v/sum(elapsed_time) for v in elapsed_time]
elapsed_cumulated = np.cumsum(elapsed_percentage)

cumulated_elapsed_threshold = 98 # percent
time_consumers_tasks = np.array(tasks)[elapsed_cumulated <= cumulated_elapsed_threshold].tolist()

# Build a dataframe with big consumer tasks and their memory usage
# with columns:
#    task, elapsed_time_hours, min_RSS, mean_RSS, max_RSS, p05_RSS, p95_RSS
tasks_to_plot = [taskToCompare, taskToCompare + '_opti']

#HACK
time_consumers_df = pl.concat([
    df_per_task.with_columns(
        # Add column 'elapsed_time_pct' with the percentage of total time consumed by each kind of pipetask
        (pl.col('elapsed_time_hours') / pl.col('elapsed_time_hours').sum()).alias('elapsed_time_pct')
    )
    .filter(
        # Select only the tasks consuming the most
        pl.col('task').is_in(tasks_to_plot)
    )
    .sort(by='elapsed_time_hours', descending=True)
,   df_per_task_opti.with_columns(
        # Add column 'elapsed_time_pct' with the percentage of total time consumed by each kind of pipetask
        (pl.col('elapsed_time_hours') / pl.col('elapsed_time_hours').sum()).alias('elapsed_time_pct'),
            task=pl.col("task").str.replace(taskToCompare, taskToCompare + "_opti")

    )
    .filter(
        # Select only the tasks consuming the most
        pl.col('task').is_in(tasks_to_plot)
    )
    .sort(by='elapsed_time_hours', descending=True)
]
                             )

display(time_consumers_df)

# Build the plot
task_rss_consumers_fig = make_figure_big_task_consumers(
    tasks = get_column(time_consumers_df, 'task'),
    elapsed_time_pct = get_column(time_consumers_df, 'elapsed_time_pct'),
    rss_max = get_column(time_consumers_df, 'RSS_max'),
    rss_min = get_column(time_consumers_df, 'RSS_min'),
    rss_mean = get_column(time_consumers_df, 'RSS_mean'),
    rss_pct_low = get_column(time_consumers_df, 'RSS_p05'),
    rss_pct_high = get_column(time_consumers_df, 'RSS_p95'),
    # label=f'\n Pipetasks which consume in aggregate {cumulated_elapsed_threshold}% of elapsed time.\n',
    note = f'NOTE: Whiskers show 5th to 95th RSS percentiles.',
    # note = f'NOTE: the pipetasks shown consume in aggregate {cumulated_elapsed_threshold}% of the total elapsed time of the DP0.2 campaign. Whiskers show 5th to 95th RSS percentiles.',
)
bkh.show(task_rss_consumers_fig)

# Export this figure
save_figure(task_rss_consumers_fig, output_dir=output_dir, filename='memory-stats', title='DP0.2 Memory by most compute-intensive pipetasks')

task,task_count,elapsed_time_hours,RSS_min,RSS_max,RSS_mean,RSS_std,RSS_p05,RSS_p50,RSS_p95,RSS_iqr,elapsed_time_pct
str,u32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""forcedPhotCoadd""",405,450.520525,1.271026,4.041474,3.166687,0.579597,1.880367,3.286925,3.896869,0.706621,0.278605
"""forcedPhotCoadd_opti""",405,243.923063,1.313985,4.263543,3.5799,0.408522,2.897031,3.578556,4.127621,0.550023,0.172939


# Memory vs runtime scatter plot

In [319]:
from bokeh.palettes import Category10, Category20
from bokeh.transform import factor_cmap
from scipy.spatial import ConvexHull

def make_figure_correlation_memory_time(
    tasks: List[str], run_time: List[float], elapsed_time_pct: List[float], rss_max: List[float],
    band: List[str], patch: List[int],
    label: str = None
) -> bkh.figure:
    """Return a scatter plot figure with max RSS and compute time. Color by task name."""
    
    # Sort the data
    #sorted_by_elapsed = sorted(zip(elapsed_time_pct, run_time, rss_max, tasks), reverse=True)
    #elapsed_time_pct, run_time, rss_max, tasks = zip(*sorted_by_elapsed)
    
    # Create a data source
    source = bkhmodels.ColumnDataSource(data={
        'tasks': tasks,
        'run_time': run_time,
        'elapsed_time_pct': elapsed_time_pct,
        'rss_max': rss_max,
        'band' : band,
        'patch' : patch,
    })
    
    # Use counter to sort tasks by quantity
    from collections import Counter
    task_counter = dict(Counter(tasks).most_common())
    # Generate a color palette using Viridis256
    unique_tasks = list(task_counter.keys())  # Ensure unique tasks
    palette = Category10[max(3, min(20, len(unique_tasks)))]  # Use at most 20 colors, and minimum 3
    #palette = [Viridis256[i * 256 // len(unique_tasks)] for i in range(len(unique_tasks))] # Use infinite colors, but less visually striking
    color_map = factor_cmap('tasks', palette=palette, factors=unique_tasks)

    # Build and configure the figure
    left_y_range = bokeh.models.Range1d(min(rss_max)-0.1, max(rss_max) * 1.2)
    x_range = bokeh.models.Range1d(0.1, 10000)

    fig = bkh.figure(
        #x_axis_type='log',
        x_axis_label='Elapsed time (s)',
        #x_range=x_range,
        y_range=left_y_range,
        #y_axis_type='log',
        y_axis_label='gigabyte',
        width=1_400,
        height=800,
        background_fill_color="#f4f3f3",
    )
    
    # Add title and subtitle
    fig.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    fig.add_layout(bkhmodels.Title(text="Max memory used versus elapsed time, grouped by task,", text_font_size="18pt"), 'above')

    # Configure axes
    for x_axis in fig.xaxis:
        x_axis.axis_label_text_font_size = "11pt"
        x_axis.axis_label_text_font_style = "bold"
        x_axis.major_label_text_font_size = "11pt"

    for y_axis in fig.yaxis:
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"

    # Add scatter points with dynamic color by task
    renderers_pt, renderers_ch = [], []
    for task, count in task_counter.items():
        # Extract data points for the current task
        task_x = [rt for t, rt, pctt, mem in zip(tasks, run_time, elapsed_time_pct, rss_max) if t == task]
        task_y = [mem for t, rt, pctt, mem in zip(tasks, run_time, elapsed_time_pct, rss_max) if t == task]
        task_pctt = [pctt for t, rt, pctt, mem in zip(tasks, run_time, elapsed_time_pct, rss_max) if t == task]
        task_band = [b for t, rt, pctt, mem, b, p in zip(tasks, run_time, elapsed_time_pct, rss_max, band, patch) if t == task]
        task_patch = [p for t, rt, pctt, mem, b, p in zip(tasks, run_time, elapsed_time_pct, rss_max, band, patch) if t == task]

        # Create a data source for the task points
        task_source = bkhmodels.ColumnDataSource(data={
            'run_time': task_x,
            'rss_max': task_y,
            'band' : task_band,
            'patch' : task_patch,
        })

        # Plot task points
        renderers_pt.append(
            fig.scatter(
                marker='circle', x='run_time', y='rss_max', size=5,
                line_width=3, color=palette[unique_tasks.index(task)],
                source=task_source, legend_label=f'{task} / {count} calls / {sum(task_pctt):.2%}',
                muted_alpha=0.05,
                
            )
        )

        # Compute convex hull for the task's points if there are enough points (min 3)
        if len(task_x) >= 3:
            points = np.array(list(zip(task_x, task_y)))
            hull = ConvexHull(points)

            # Get the vertices of the hull and close the polygon by appending the first point
            hull_vertices = points[hull.vertices]
            hull_vertices = np.append(hull_vertices, [hull_vertices[0]], axis=0)

            # Create a ColumnDataSource for the hull vertices
            hull_source = bkhmodels.ColumnDataSource(data={
                'task_name' : [task for _ in hull_vertices],
                'x': hull_vertices[:, 0],
                'y': hull_vertices[:, 1]
            })
            # Plot the convex hull as a patch
            r = fig.patch(
                x='x', y='y', source=hull_source,
                color=palette[unique_tasks.index(task)], 
                fill_alpha=0.0, line_alpha=0.5, line_width=2
                )
            renderers_ch.append(r)

    # Configure toolbar and legend
    fig.toolbar.autohide = True
    fig.legend.click_policy = 'hide'
    fig.legend.label_text_font_size = '8pt'

    # Add tooltips
    # TODO : tooltips for the convex hull (patch) does not work ?!
    #fig.add_tools(bkhmodels.HoverTool(
    #    tooltips=[('Task', '@{task_name}')],
    #    renderers=renderers_ch,
    #    mode='mouse')
    #)
    
    fig.add_tools(bkhmodels.HoverTool(tooltips=[
        ('RSS max', '@rss_max{0.2f} GB'),
        ('Elapsed time', '@run_time{0.1f} s'),
        ('Band', '@band'), ('Patch', '@patch')
    ], renderers=renderers_pt, mode='mouse'))
    
    # Legend location
    #fig.add_layout(fig.legend[0],'right') # outside the plot
    fig.legend[0].location = 'top_left' # inside the plot

    # Add legend title
    if label is not None: fig.legend[0].title = label
    
    return fig

In [320]:
# Select the tasks to include
tasks_to_plot = [taskToCompare + '_opti', taskToCompare]
df_all_non_zero = pl.concat([
    df_all.filter((pl.col('memory') >= 0.1) & (pl.col('task').is_in(tasks_to_plot))),
    df_all_opti.filter((pl.col('memory') >= 0.1) & (pl.col('task').is_in(tasks_to_plot))).with_columns(task=pl.col("task").str.replace(taskToCompare, taskToCompare + "_opti")),
]
)

# Create and display the figure
task_correlation_memory_time_fig = make_figure_correlation_memory_time(
    tasks=get_column(df_all_non_zero, 'task'),
    run_time=get_column(df_all_non_zero, 'run_time'),
    elapsed_time_pct=get_column(df_all_non_zero, 'elapsed_time_pct'),
    rss_max=get_column(df_all_non_zero, 'memory'),
    band=get_column(df_all_non_zero, 'band'),
    patch=get_column(df_all_non_zero, 'patch'),
    label=f'Comparaison between opti and stack\nname / nb calls / % total time',
)

bkh.show(task_correlation_memory_time_fig)
save_figure(task_correlation_memory_time_fig, output_dir=output_dir, filename='memory-vs-runtime-'+taskToCompare, title='DP0.2 Memory vs Execution time, top10')

# Memory distribution

In [321]:
def make_figure_rss_histogram_per_task(distribution: List[float], task: str, annotation: str=None) -> bkh.figure:
    """Return a figure with a histogram of the RSS for a given task
    """
    # Compute histogram
    hist, edges = np.histogram(distribution, density=False, bins=bins)
    count = len(distribution)
    source = bokeh.models.ColumnDataSource(data=dict(base=[max(hist)],
                                                     upper=[distribution.quantile(0.95)], lower=[distribution.quantile(0.05)],
                                                     q1=[distribution.quantile(0.25)], q2=[distribution.quantile(0.50)], q3=[distribution.quantile(0.75)]
                                                    ))
    
    # Build and configure the figure
    width, height = 1_000, 600
    y_range = bokeh.models.Range1d(1, max(hist)*15)
    fig = bkh.figure(
        x_range = (edges[0], edges[-1]),
        x_axis_label = 'gigabyte',
        y_axis_label = 'frequency',
        y_range = y_range,
        y_axis_type = 'log',
        width = width,
        height = height,
        background_fill_color="#f4f3f3", 
    )

    # Title and subtitle
    fig.add_layout(bkhmodels.Title(text=processing_label, text_font_style="italic", text_font_size="11pt"), 'above')
    fig.add_layout(bkhmodels.Title(text=f"Distribution of memory usage by {task} pipetask", text_font_size="16pt"), 'above')

    # Format X axis
    fig.xaxis.axis_label_text_font_size = "12pt"
    fig.xaxis.axis_label_text_font_style = "bold"
    fig.xaxis.major_label_text_font_size = "12pt"
    fig.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='0,.f')

    # Add and format secondary Y axis
    fig.extra_y_ranges = {"y_axis_right": y_range}
    fig.add_layout(bokeh.models.LogAxis(y_range_name="y_axis_right"), 'right')
    for y_axis in fig.yaxis:
        y_axis.ticker = bokeh.models.tickers.LogTicker()
        y_axis.axis_label_text_font_size = "14pt"
        y_axis.major_label_text_font_size = "12pt"
        y_axis.axis_label_text_font_style = "bold"


    
    # Add a histogram
    quad = fig.quad(top=hist, bottom=1, left=edges[:-1], right=edges[1:],
           fill_color="orange", line_color="black", alpha=0.3)
    ## Add descriptive statistics
    #whisker = bokeh.models.Whisker(base="base", upper="upper", lower="lower", dimension="width", source=source,
    #                               level="annotation", upper_units='screen', base_units='screen', lower_units='screen')
    #whisker.upper_head.size = whisker.lower_head.size = max(hist)
    #fig.add_layout(whisker)
    
    
    ## quantile boxes
    #fig.hbar("base", max(hist), "q2", "q3", source=source, color="red", line_color="black",)
    #fig.hbar("base", max(hist), "q1", "q2", source=source, color="red", line_color="black",)
    
    # Add annotation label
    if annotation is not None:
        # TODO: fix the issue with the location of the annotation in the plot
        label = bkhmodels.Label(x=int(width*0.75), y=int(height*0.60),
                                x_units='screen', y_units='screen',
                                x_offset=15, y_offset=-5,
                                text=f'{annotation}', text_font_size='10pt',
                                text_color='dimgray', text_alpha=0.8,
                                background_fill_color='white', background_fill_alpha=1.0,
                                border_line_color='dimgray', border_line_alpha=0.5)
        fig.add_layout(label)
        
    # Add tooltips
    fig.add_tools(bkhmodels.HoverTool(tooltips=[
        ('Interval', 'from @left{0.1f} to @right{0.1f} GB'),
        ('Frequency', '@top{0,} quanta'),
    ], renderers=[quad, ], mode='mouse'))
       
    # Hide toolbar
    fig.toolbar.autohide = True

    return fig

In [322]:
def describe_column(df, column) -> Tuple[float, float, float, float]:
    df_description = df.with_columns(
        [
            pl.col(column).min().alias('min'),
            pl.col(column).max().alias('max'),
            pl.col(column).mean().alias('mean'),
            pl.col(column).std().alias('std'),
        ]
    ).select(['min', 'max', 'mean', 'std'])
    return df_description.row(0)


memory_distribution_figures = []
bins = 40
for task in sorted([taskToCompare, taskToCompare + '_opti']):
    # Collect the histogram data for this task
    task_df = df_all.filter(
        (pl.col('task') == task) & pl.col('memory').is_not_null()
    ).select(pl.col('memory'))

    # hack till we rework how everything is handled
    if len(task_df) == 0:
        task = task[:-5] # remove _opti
        task_df = df_all_opti.filter(
        (pl.col('task') == task) & pl.col('memory').is_not_null()
    ).select(pl.col('memory'))
        task = task + "_opti"
            
    # Compute descriptive statistics
    min_value, max_value, mean_value, std_value = describe_column(task_df, 'memory')
    
    annotation = f' N: {len(task_df):,} \n min: {min_value:.2f} \n mean: {mean_value:.2f} \n std: {std_value:.2f} \n max: {max_value:.2f} '
    
    # Plot the histogram
    fig = make_figure_rss_histogram_per_task(task_df.get_column('memory'), task, annotation=annotation)
    bkh.show(fig)
    
    # Export this figure
    filename = f'memory-distribution-{task}'
    memory_distribution_figures.append(filename)
    save_figure(fig, output_dir=output_dir, filename=filename, title=f'DP0.2 Memory distribution by pipetask {task}')
    

# Publish the report

In [69]:
def publish(results_dir, publication_top_dir):
    """Publish this notebook results under top_dir
    """
    # Create the publication directories and remove .png and .html files
    # that may exist in them
    publication_dir = os.path.join(publication_top_dir, 'pipetasks')    
    os.makedirs(publication_dir, exist_ok=True)
    
    dst_images_dir = os.path.join(publication_dir, 'images')
    os.makedirs(dst_images_dir, exist_ok=True)
    for f in glob.glob(os.path.join(dst_images_dir, '*.png')):
        os.remove(f)
    
    dst_html_dir = os.path.join(publication_dir, 'html')
    os.makedirs(dst_html_dir, exist_ok=True)
    for f in glob.glob(os.path.join(dst_html_dir, '*.html')):
        os.remove(f)
    
    # Publish PNG files
    src_image_dir = os.path.join(results_dir, 'images')
    for src_file in glob.glob(os.path.join(src_image_dir, '*.png')):
        dst_file = os.path.join(publication_dir, 'images', os.path.basename(src_file))
        shutil.copy(src_file, dst_file)
    
    # Publish HTML files
    src_html_dir = os.path.join(results_dir, 'html')
    for src_file in glob.glob(os.path.join(src_html_dir, '*.html')):
        dst_file = os.path.join(publication_dir, 'html', os.path.basename(src_file))
        shutil.copy(src_file, dst_file)
        
    # Publish main HTML file
    src_index_html = os.path.join(results_dir, f'optimization-{taskToCompare}.html')
    dst_index_html = os.path.join(publication_dir,  os.path.basename(src_index_html))
    shutil.copy(src_index_html, dst_index_html)

# Render the HTML template
import os
import jinja2

environment = jinja2.Environment(loader=jinja2.FileSystemLoader('./templates'))
template = environment.get_template('comparing_interactive-vs-batch_run-template.html')

pipetasks_html = os.path.join(output_dir, f'optimization-{taskToCompare}.html')
with open(pipetasks_html, mode="w", encoding="utf-8") as f:
    context = {
        'taskToCompare': taskToCompare,
        'summary': summary,
        'explanation_kind': explanation_kind,
        'figures': memory_distribution_figures,
    }
    f.write(template.render(context))

publication_dir='/sps/lsst/users/abernard/web/comparaison_interactive-vs-batch-run'

publish(results_dir=output_dir, publication_top_dir=publication_dir)