In [None]:
import os
import re
from io import BytesIO

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
from IPython.display import clear_output, display
from scipy.stats import linregress
from sklearn.utils import resample

%matplotlib widget

## Limit-of-Detection
This notebook calculates area based limits-of-detection (LOD) for laser ablation inductively coupled mass spectrometry data.  
In the process it clips negative data to 0 for better estimation of normality.  
  
Two CSV files are required:  
1. A file containing pixels of exported ROIs (from the image_reconstruction notebook). ROIs represent the measured standards ranging from blank to varying concentrations.
2. A 'amounts file' containing the absolut amounts of the individual standard element per standard measurement.

**Amounts file explanation**  
Standard elements must be given in the canonical abbreviated nomenclature i.e. **Pt** (=Platinum), **Fe** (=iron), **Co** (=Cobalt) etc.  
The file should contain 2 columns. The first holds the abbreviated element title, the second is a comma-separated list of the amounts (fg) of the element per standard measurement.  
A '**general**' row can be added which will apply to any standard elements that could not be matched to a measured isotope.
  
Below is an example of the format:    
<table>
  <tr>
    <td>Pt</td>
    <td>0.0, 2.47, 4.87, 9.41, 23.21, 46.36, 92.69</td>
  </tr>
  <tr>
    <td>Fe</td>
    <td>0.0, 3.68, 6.49, 15.63, 27.38, 36.53, 45.36</td>
  </tr>
  <tr>
    <td>Co</td>
    <td>0.0, 3.68, 6.49, 15.63, 27.38, 36.53, 45.36</td>
  </tr>
  <tr>
    <td>general</td>
    <td>0.0, 3.55, 6.41, 15.58, 27.23, 36.33, 45.24</td>
  </tr>
</table>
  

Set your output directory in the input field of the next cell.

In [None]:
output_dir = widgets.Text(
    placeholder="Path...",
    description="Output directory:",
    style={"description_width": "initial"},
    layout=widgets.Layout(width="40%"),
)
display(output_dir)

In [None]:
standards_dict = {}

# widgets
output = widgets.Output()

standard_upload = widgets.FileUpload(
    accept='.csv',
    multiple=False,
    description='Upload standard CSV',
    layout=widgets.Layout(width='auto'),
)
display(standard_upload, output)

def on_upload(change):
    global standards_dict
    standards_dict.clear()

    # check
    if standard_upload.value:
        uploaded_file = standard_upload.value[0]
        standard_df = pd.read_csv(BytesIO(uploaded_file.content))
        standard_name = standard_upload.value[0].name[0:-4]  # remove .csv extension
        
        # split by ROI
        for roi in standard_df['ROI'].unique():
            sub = standard_df[standard_df['ROI'] == roi].drop(columns=['ROI']).reset_index(drop=True)
            key = f"{standard_name}_ROI_{roi}"
            standards_dict[key] = sub

        # Feedback
        with output:
            display(standard_df)
            print("standards_dict keys:")
            for k in standards_dict:
                print(f" {k}: {standards_dict[k].shape[0]}x{standards_dict[k].shape[1]}")

# Wire the callback
standard_upload.observe(on_upload, names='value')

## Element selection

**Note:**  
The list below shows all the measured **isotopes**. However, most standards are not isotopically pure, one is effectively selecting **elements** for LOD calculation.  
The same calibration will be applied to all isotopes of any selected element.  
Additionally, negative values will be clipped to 0 in the cell below the selection.

In [None]:
blk_select = widgets.RadioButtons(
    options=standards_dict.keys(),
    description="Blank ROI",
    layout={"width": "max-content"},
)

columns_select = widgets.SelectMultiple(
    options=list(standards_dict.values())[0],
    description="Isotopes",
    rows=20,
    layout={"width": "auto"},
)

display(widgets.HBox([columns_select, blk_select]))

In [None]:
reduced_standards = {}

cleaned_columns = [col.strip() for col in columns_select.value]

for key, df in standards_dict.items():
    reduced = df[cleaned_columns].copy()
    reduced = reduced.clip(lower=0)
    reduced_standards[f"{key}_reduced"] = reduced

## Plot a raw histogram for the selected isotope

In [None]:
# widgets
standard_select = widgets.Dropdown(
    options=list(reduced_standards.keys()),
    value=list(reduced_standards.keys())[0],
    description="Select Standard",
    layout={"width": "max-content"},
    style={"description_width": "initial"},
)

histogram_select = widgets.RadioButtons(
    options=[],
    description="Histogram channel",
    style={"description_width": "initial"},
)

plot_output = widgets.Output()


# callbacks
def initialise_plot(df):
    cols = list(df.columns)
    histogram_select.options = cols
    histogram_select.value = cols[0]
    draw_histogram(df, cols[0])


def draw_histogram(df, col):
    with plot_output:
        clear_output(wait=True)
        plt.figure()
        plt.hist(df[col].dropna(), bins=50)
        plt.title(f"{col} distribution")
        plt.xlabel(col)
        plt.ylabel("Count")
        plt.show()


def on_standard_change(change):
    df = reduced_standards[change["new"]]
    initialise_plot(df)


def on_channel_change(change):
    df = reduced_standards[standard_select.value]
    draw_histogram(df, change["new"])


# observers
standard_select.observe(on_standard_change, names="value")
histogram_select.observe(on_channel_change, names="value")

display(standard_select, histogram_select, plot_output)

initialise_plot(reduced_standards[standard_select.value])

## Outlier filtering

The following section performs outlier filtering either based on set percentiles or on Z-scores (standard scores).

Percentiles method: Outliers are determined by the percentile of the data. Everything below the 'Lower percentile' and above the 'Upper percentile' is considered an outlier.

Z-score method: A z-score represents how many standard deviations a data point is from the mean. This function uses z-scores indirectly by calculating the threshold as mean + n_std * standard deviation. Data points above this threshold are considered outliers. By setting the 'Number of std' you can specify how strict or lenient you want the outlier determination to be. A smaller value of n_std would be stricter, catching more outliers, whereas a larger value would be more lenient. A value of 3 would retain to 99.7% of the data.

**NOTE**: Z-scores are especially useful if your data is normally distributed. Percentiles are more data-driven and can be applied without assumptions about the data distribution.

In [None]:
def remove_outliers_percentiles(column, upper_quant, lower_quant):
    q_low = column.quantile(lower_quant)
    q_hi = column.quantile(upper_quant)
    non_outliers = column[(column >= q_low) & (column <= q_hi)].reset_index(drop=True)
    outliers = column[(column < q_low) | (column > q_hi)].reset_index(drop=True)
    print(f"Column: {column.name}, Lower Quantile: {q_low}, Upper Quantile: {q_hi}")
    print(f"Non-Outliers: {len(non_outliers)}, Outliers: {len(outliers)}")
    return non_outliers, outliers


def remove_outliers_zscore(column, n_std):
    mean = column.mean()
    sd = column.std()
    non_outliers = column[(column <= mean + (n_std * sd))].reset_index(drop=True)
    outliers = column[(column > mean + (n_std * sd))].reset_index(drop=True)
    print(f"Non-Outliers: {len(non_outliers)}, Outliers: {len(outliers)}")
    return non_outliers, outliers

In [None]:
blk_clean = blk_select.value + "_reduced"
blk_columns = reduced_standards[blk_clean]
    
    
isotope_select = widgets.SelectMultiple(
    options=list(blk_columns.columns),
    description="Isotopes:",
    rows=10,
    layout={"width": "auto"},
)
outlier_method = widgets.RadioButtons(
    options=["Percentiles", "Z-Score"],
    description="Outlier Method:",
    style={"description_width": "initial"},
)
n_std_select = widgets.FloatText(
    value=3.0,
    min=0.0,
    step=0.1,
    description="Number of standard deviations:",
    style={"description_width": "initial"},
)
upper_quant_select = widgets.FloatText(
    value=0.997,
    min=0.000,
    step=0.001,
    description="Upper percentile limit:",
    style={"description_width": "initial"},
)
lower_quant_select = widgets.FloatText(
    value=0.000,
    min=0.000,
    step=0.001,
    description="Lower percentile limit:",
    style={"description_width": "initial"},
)

row1 = widgets.HBox([isotope_select, outlier_method])
row2 = widgets.HBox([n_std_select, upper_quant_select, lower_quant_select])

layout = widgets.VBox([row1, row2])
display(layout)

In [None]:
# Prepare empty DataFrames
outliers_df = pd.DataFrame()
filtered_df = pd.DataFrame()

# Choose removal function
if outlier_method.value == "Percentiles":
    remover = remove_outliers_percentiles
    params = (float(upper_quant_select.value), float(lower_quant_select.value))
    method_name = "percentiles"
else:
    remover = remove_outliers_zscore
    params = (int(n_std_select.value),)
    method_name = "z-score"

# Process each column
for column in isotope_select.value:
    print(f"Processing '{column}' using {method_name}:")
    series = blk_columns[column]
    
    print("Original data summary:")
    print(series.describe(), "\n")
    
    filtered_col, outliers_col = remover(series, *params)
    filtered_df[column] = filtered_col
    outliers_df[column] = outliers_col
    
    print(f"Removed {len(outliers_col)} outlier(s).")
    print("Filtered data summary:")
    print(filtered_col.describe(), "\n")
    print("Outliers data summary:")
    print(outliers_col.describe(), "\n")

# Merge filtered results
merged_df = blk_columns.copy()
merged_df[filtered_df.columns] = filtered_df

# Store result
reduced_standards[blk_clean] = merged_df

### Display histrogram of filtered data
Requires outlier filtering.

In [None]:
plot_output = widgets.Output()

# Display
display(widgets.VBox([histogram_select, plot_output]))
initialise_plot(filtered_df)

### Amounts file upload
This cell requires a CSV file upload. The file should contain the ground truth of the amounts of the individual standard element per standard measurement.  
Standard elements must be given in the canonical abbreviated nomenclature i.e. **Pt** (=Platinum), **Fe** (=iron), **Co** (=Cobalt) etc.  
The file should contain 2 columns. The first holds the abbreviated element title, the second is a comma-separated list of the amount of the element per standard measurement.  
A '**general**' row can be added which will apply to any standard elements that could not be matched to a measured isotope.
  
Below is an example of the format:    
<table>
  <tr>
    <td>Pt</td>
    <td>0.0, 2.47, 4.87, 9.41, 23.21, 46.36, 92.69</td>
  </tr>
  <tr>
    <td>Fe</td>
    <td>0.0, 3.68, 6.49, 15.63, 27.38, 36.53, 45.36</td>
  </tr>
  <tr>
    <td>Co</td>
    <td>0.0, 3.68, 6.49, 15.63, 27.38, 36.53, 45.36</td>
  </tr>
  <tr>
    <td>general</td>
    <td>0.0, 3.55, 6.41, 15.58, 27.23, 36.33, 45.24</td>
  </tr>
</table>


In [None]:
output_widget = widgets.Output()
upload_button = widgets.FileUpload(
    accept=".csv",
    multiple=False,
    description="Upload amounts CSV",
    layout=widgets.Layout(width="auto"),
)


def on_csv_upload(change):
    with output_widget:
        clear_output(wait=True)
        if not upload_button.value:
            return
        # read CSV
        uploaded_file = upload_button.value[0]
        content = uploaded_file["content"]
        df = pd.read_csv(BytesIO(content), header=None)
        print("CSV head:")
        print(df.head(), "\n")

        # parse x_values
        global element_x_values
        element_x_values = {}
        element_x_values = {
            element: np.fromstring(vals, sep=",") for element, vals in df.values
        }
        if "general" not in element_x_values:
            print(
                "Warning: 'general' element not found. Unmatched elements will cause errors."
            )

        print("Parsed element_x_values successfully!")


upload_button.observe(on_csv_upload, names="value")
display(upload_button, output_widget)

###  LOD calculations
The following cell computes limits of detection (LOD) based on the underlying distrubution of sumed area (**pixels**) of the blank.  
Depending on whether the data is gaussian or compound poisson distributed a different formula will be applied for LOD calculation.
- Gaussian: 3*sigma
- Compound poisson: approximated by poisson 3.29*sigma + 2.71

The decission is made based on a heuristic threshold (**normality_threshold**).  
For a more in-depth explanation please refere to Föls et al. 2025 (in preperation).  
Bootstrapping parameters can be adjusted in the cell below.
- **"iterations"** should be between 1000 - 10000
- **"pixels"** should reflect your cell size in pixels


In [None]:
iterations = 1000
pixels = 200
normality_threshold = 10

In [None]:
os.makedirs(output_dir.value, exist_ok=True)
log_path = os.path.join(output_dir.value, "bootstrap_log.log")

with open(log_path, "w") as log:

    def get_x_vals(col, x_vals_dict, log):
        # Extract element symbol from strings and return corresponding x_values
        m = re.search(r"(\d{1,3})([A-Z][a-z]?)", col)
        if m:
            symbol = m.group(2)
            if symbol in x_vals_dict:
                return x_vals_dict[symbol]
            else:
                log.write(f"No element amounts entry for '{symbol}'; ")
        else:
            log.write(f"No isotope+element match in '{col}'; ")

        # fallback to general if present
        general = x_vals_dict.get("general")
        if general is not None:
            log.write("using 'general'.\n")
            return general

        # no match at all
        log.write("and no 'general' defined; skipping.\n")
        return None

    # create output subfolder
    folder = f"{iterations}_iter_{pixels}px_sums"
    fp = os.path.join(output_dir.value, folder)
    os.makedirs(fp, exist_ok=True)
    log.write(f"\n=== {folder} ===\n")

    # Determine blank
    try:
        blank_key = blk_select.value + "_reduced"
        if blank_key not in reduced_standards:
            raise KeyError
    except Exception:
        log.write("blk_select undefined or invalid; using first ROI as blank.\n")
        blank_key = next(iter(reduced_standards))

    for col in cleaned_columns:
        log.write(f"\nColumn '{col}' — {iterations}x{pixels}:\n")

        # X‐values transform
        x_vals = get_x_vals(col, element_x_values, log)
        if x_vals is None:
            log.write(f"  Skipping calculation for '{col}' (no amounts).\n")
            continue

        n_standards = len(reduced_standards)
        n_xvals = len(x_vals)
        if n_xvals != n_standards:
            log.write(
                f"  Warning for '{col}': "
                f"{n_xvals} amounts vs {n_standards} standards; "
                "truncating to the shorter length.\n"
            )

        transformed = [
            (x / len(df)) * pixels for x, df in zip(x_vals, reduced_standards.values())
        ]
        log.write(f"  Original amounts: {x_vals}\n")
        log.write(f"  Transformed: {transformed}\n")

        # bootstrap sums
        sums_dict = {}
        for name, df in reduced_standards.items():
            series = df[col].dropna()
            m = re.match(r"(.+)_ROI_(.+)_reduced$", name)
            if not m:
                raise ValueError(f"Key format unexpected: '{name}'")
            roi_label = m.group(2)

            if name == blank_key:
                label = f"{col}_blk_sums"
            else:
                label = f"{col}_ROI_{roi_label}_sums"

            boot_sums = [
                resample(series, replace=True, n_samples=pixels).sum()
                for _ in range(iterations)
            ]
            sums_dict[label] = boot_sums
            log.write(f"  {label}: {len(boot_sums)} samples\n")

        combined = pd.DataFrame(sums_dict).dropna()

        # statistics & normality
        means = combined.mean()
        stds = combined.std()
        blk_label = f"{col}_blk_sums"
        if blk_label not in means:
            raise KeyError(f"Blank column '{blk_label}' missing")

        normality = (
            "normal"
            if means[blk_label] >= normality_threshold
            else "not normal/poisson"
        )
        log.write(f"  Normality ({blk_label}): {normality}\n")

        # regression
        slope, intercept, r_val, _, _ = linregress(transformed, means)
        r2 = r_val**2
        log.write(
            f"  Regression: slope={slope:.3f}, "
            f"intercept={intercept:.3f}, R²={r2:.3f}\n"
        )

        # LOD calculation
        if normality == "normal":  # gaussian
            lod_cts = stds[blk_label] * 3
        else:  # poisson
            lod_cts = stds[blk_label] * 3.29 + 2.71
        lod_fg = lod_cts / slope
        log.write(f"  LOD cts: {lod_cts:.3f}, LOD fg: {lod_fg:.3f}\n")

        # save results
        result = pd.concat(
            [
                combined,
                pd.DataFrame({f"{col}_normality": [normality]}),
                means.rename(lambda c: f"{c}_mean").to_frame().T,
                stds.rename(lambda c: f"{c}_std").to_frame().T,
                pd.DataFrame(
                    {"Slope": [slope], "Intercept": [intercept], "R_squared": [r2]}
                ),
                pd.DataFrame({"LOD_cts": [lod_cts], "LOD_fg": [lod_fg]}),
            ],
            axis=1,
        )

        fname = f"{col}_{iterations}iter_{pixels}px_results.csv"
        fname = re.sub(r"[\\/]", "_", fname)
        result.to_csv(os.path.join(fp, fname), index=False)
        log.write(f"  Saved CSV: {fname}\n")

print(f"Bootstrapping complete;\n log and results at: {log_path} and {fp}")

### Histogram and QQ-plot visualisation

In [None]:
# Base directory for results
aggregated_base = output_dir.value

# Widgets
folder_select = widgets.Dropdown(
    options=[f for f in os.listdir(aggregated_base)
             if os.path.isdir(os.path.join(aggregated_base, f))],
    description="Results Folder:",
    style={"description_width": "initial"},
)
file_select = widgets.Dropdown(description="Files:")
column_select = widgets.Dropdown(description="Columns:")
plot_type_select = widgets.ToggleButtons(
    options=["Histogram", "QQ-Plot"],
    description="Plot Type:",
)
plot_output = widgets.Output()
save_name = widgets.Text(value="plot", description="Save as:")
save_button = widgets.Button(description="Save Plot", icon="save")

current_fig = None

def refresh_files(change):
    folder = folder_select.value
    path = os.path.join(aggregated_base, folder)
    files = [f for f in os.listdir(path) if f.endswith("_results.csv")]
    file_select.options = files
    file_select.value = files[0] if files else None

def refresh_columns(change):
    folder = folder_select.value
    fname = file_select.value
    if not fname:
        column_select.options = []
        return
    df = pd.read_csv(os.path.join(aggregated_base, folder, fname))
    cols = list(df.columns)
    column_select.options = cols
    column_select.value = cols[0] if cols else None

def update_plot(change):
    global current_fig
    with plot_output:
        clear_output(wait=True)
        folder = folder_select.value
        fname = file_select.value
        col = column_select.value
        if not (folder and fname and col):
            return
        df = pd.read_csv(os.path.join(aggregated_base, folder, fname))
        
        # plot histogram or QQ-Plot
        if plot_type_select.value == "Histogram":
            fig, ax = plt.subplots(figsize=(8,5), dpi=100)
            sns.histplot(df[col], kde=True, ax=ax)
            ax.set_title(f"Histogram of {col}")
            ax.set_xlabel(col)
            ax.set_ylabel("Frequency")
        else:
            fig, ax = plt.subplots(figsize=(8,5), dpi=100)
            clean = df[col].replace([np.inf, -np.inf], np.nan).dropna()
            sm.qqplot(clean, line="45", ax=ax, fit=True)
            ax.set_title(f"QQ-Plot of {col}")
            ax.set_xlabel("Theoretical Quantiles")
            ax.set_ylabel("Sample Quantiles")
        
        plt.tight_layout()
        plt.show()
        current_fig = fig

def save_plot(btn):
    if current_fig:
        fname = save_name.value.strip() or "plot"
        outp = os.path.join(aggregated_base, folder_select.value, f"{fname}.png")
        current_fig.savefig(outp, dpi=300)
        with plot_output:
            print(f"Saved plot to {outp}")

# Observers
folder_select.observe(refresh_files, names="value")
file_select.observe(refresh_columns, names="value")
for w in (file_select, column_select, plot_type_select):
    w.observe(update_plot, names="value")
save_button.on_click(save_plot)

# Display
controls = widgets.VBox([folder_select, file_select, column_select, plot_type_select])
saver = widgets.HBox([save_name, save_button])
display(controls, plot_output, saver)

# Initialize
if folder_select.options:
    folder_select.value = folder_select.options[0]
    refresh_files(None)
    refresh_columns(None)
    update_plot(None)