<a href="https://colab.research.google.com/github/OpenXRF/lead-screening/blob/main/Figure3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

OpenXRF_logo_red.svg

# **Towards low-cost lead screening with transmission XRF**
---

This notebook reproduces the results in **Figure 3** of our paper:


>  [C. Gaßner, J. Reisewitz, J. E. Forsyth, K. Shaker, "Towards low-cost lead screening with transmission XRF" arXiv:2511.09110 (2025)](https://doi.org/10.48550/arXiv.2511.09110)


More information can be found here:

*   Project website: [openxrf.org](https://openxrf.org/)
*   GitHub repository: [github.com/OpenXRF/lead-screening](https://github.com/OpenXRF/lead-screening/)


---


In [2]:
#@title Install packages { display-mode: "form" }
# @markdown Python packages for data processing and visualization

import os
import io
import zipfile
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from time import time
from scipy.optimize import curve_fit

print("All packages installed successfully!")

All packages installed successfully!


# **Figure 3:** Showing LOD threasholds


---

* Callculating SNR for 10, 50, 100, 200, 500, 1000 ppm Pb in soil
* 2h max exposure for all ppm values
* Data is provided in Github

-> Script computes the visualized data and displays the plot itself in the last cell...


In [5]:
#@title 1) Load Data
# @markdown Load simulated data from Github

start_time = time()

!rm -rf sim_data
!mkdir -p sim_data
!wget -O sim_data/Figure3-4.zip "https://github.com/OpenXRF/lead-screening/raw/main/data/Figure3-4.zip"
!unzip -d sim_data sim_data/Figure3-4.zip

#zip_url = "https://github.com/OpenXRF/lead-screening/raw/main/data/Figure3-4.zip"

target_dir = "sim_data"

#os.makedirs(target_dir, exist_ok=True)

#print("Downloading ZIP with simulated data...")
#resp = requests.get(zip_url)
#resp.raise_for_status()

#with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
#    zf.extractall(target_dir)

print("ZIP downloaded and extracted into:", target_dir)

def read_dataset(base_conc, base_dir=target_dir):
    """
    Reads the 7 data csv files for one ppm concentration
    Output: {'S0': df, ..., 'S6': df}
    """
    ds = {}
    base_dir = os.path.join(base_dir, "Figure 3-4")
    for source_number in range(7):
        filename = f"data_{base_conc}ppm_S{source_number}.csv"
        file_path = os.path.join(base_dir, filename)

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        df = pd.read_csv(
            file_path,
            usecols=["EventID", "Particle", "Type", "Energy(MeV)", "x(mm)", "y(mm)"])
        ds[f"S{source_number}"] = df

    return ds



def add_energy_resolution_and_radius(ds_dict, FWHM_keV, detector_radius_mm, rng):
    """
    Adds 'E' (Resolution) and 'r_hit'.
    """
    for k, df in ds_dict.items():
        E_keV = df["Energy(MeV)"] * 1000.0
        sigma = FWHM_keV / 2.355
        E_keV_res = E_keV + rng.normal(0, sigma, size=len(df))
        df["E"] = E_keV_res

        r_hit = np.sqrt(df["x(mm)"]**2 + df["y(mm)"]**2)
        df["r_hit"] = r_hit

    return ds_dict


rng = np.random.default_rng(0)

print("Reading simulated data from extracted ZIP...")

c_vals = np.array([10, 50, 100, 200, 500, 1000])  # ppm

data10ppm   = read_dataset(10)
print('Done 10ppm')
data50ppm   = read_dataset(50)
print('Done 50ppm')
data100ppm  = read_dataset(100)
print('Done 100ppm')
data200ppm  = read_dataset(200)
print('Done 200ppm')
data500ppm  = read_dataset(500)
print('Done 500ppm')
data1000ppm = read_dataset(1000)
print('Done 1000ppm')

FWHM_keV = 0.15
print(f"Adding realistic energy resolution: {FWHM_keV:.2f} keV...")

detector_area   = 50.0  # mm^2
detector_radius = np.sqrt(detector_area / np.pi)  # mm

datasets = [
    data10ppm,
    data50ppm,
    data100ppm,
    data200ppm,
    data500ppm,
    data1000ppm,
]

for i in range(len(datasets)):
    datasets[i] = add_energy_resolution_and_radius(
        datasets[i],
        FWHM_keV=FWHM_keV,
        detector_radius_mm=detector_radius,
        rng=rng
    )

(data10ppm,
  data50ppm,
  data100ppm,
  data200ppm,
  data500ppm,
  data1000ppm) = datasets

print(f"Data loaded successfully (took {time() - start_time:.1f} s)")

--2025-11-16 16:55:46--  https://github.com/OpenXRF/lead-screening/raw/main/data/Figure3-4.zip
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://media.githubusercontent.com/media/OpenXRF/lead-screening/main/data/Figure3-4.zip [following]
--2025-11-16 16:55:46--  https://media.githubusercontent.com/media/OpenXRF/lead-screening/main/data/Figure3-4.zip
Resolving media.githubusercontent.com (media.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.108.133, ...
Connecting to media.githubusercontent.com (media.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1871461965 (1.7G) [application/zip]
Saving to: ‘sim_data/Figure3-4.zip’


2025-11-16 16:56:08 (81.3 MB/s) - ‘sim_data/Figure3-4.zip’ saved [1871461965/1871461965]

Archive:  sim_data/Figure3-4.zip
   creating: sim_data/Fig

In [6]:
#@title 2) Analyse experimental parameters and visualize results { display-mode: "form" }

def snr_model(t, A):
    # SNR = A * sqrt(t)
    return A * np.sqrt(t)


def lod_model(t, A1, A2):
    # c_LOD(t) = (A1 + sqrt(A1^2 + A2*t)) / t
    return (A1 + np.sqrt(A1**2 + A2 * t)) / t

# ------------------------------------------------------------
# Peak-Parameter
# ------------------------------------------------------------
peak = {
    "alpha": {"E": 10.55},  # keV
    "beta": {"E": 12.65},
    "window": FWHM_keV
}

# ------------------------------------------------------------
# Time values
# ------------------------------------------------------------
fluence = 995 * np.pi * 2 * (1 - np.cos(np.pi / 4))  # ph/s per source
step_size_min = 5  # min
max_event = 180 * 60 * fluence
step_size = step_size_min * 60 * fluence
num_steps = int(np.floor(max_event / step_size))
time_values = np.arange(1, num_steps + 1) * step_size_min  # in min

# ------------------------------------------------------------
# Histogramm-Setup
# ------------------------------------------------------------
T = np.array([10, 30, 60])  # min
T_steps = (T / step_size_min).astype(int)

binwidth = 0.03  # keV
E = np.arange(0, 75 + binwidth, binwidth)


# Container for SNR-Data
SNR = [dict() for _ in range(len(c_vals))]
saved_histograms = {}

print("\n=== Calculating SNR over time ===")

for conc_idx, conc in enumerate(c_vals):
    print(f"Processing {conc} ppm...")

    signal_over_time = np.zeros(num_steps)
    background_over_time = np.zeros(num_steps)
    signal_over_time_S0 = np.zeros(num_steps)
    background_over_time_S0 = np.zeros(num_steps)

    SNR[conc_idx]["alpha_individual"] = np.zeros((7, num_steps))
    SNR[conc_idx]["beta_individual"] = np.zeros((7, num_steps))
    SNR[conc_idx]["total_individual"] = np.zeros((7, num_steps))

    for step in range(1, num_steps + 1):
        end_event = step * step_size
        current_time = step * step_size_min
        print(f" - {current_time} min...")

        total_alpha_secondary = 0
        total_alpha_primary = 0
        total_beta_secondary = 0
        total_beta_primary = 0

        is_save_point = step in T_steps
        if is_save_point:
            combined_primary_hist = np.zeros_like(E)
            combined_secondary_hist = np.zeros_like(E)

        # all 7 Sources
        for s in range(7):
            field = f"S{s}"
            data = datasets[conc_idx][field]

            mask = (
                (data["EventID"] < end_event) &
                (data["Particle"] == "gamma") &
                (data["r_hit"] <= detector_radius)
            )
            df_f = data[mask]

            primary = df_f[df_f["Type"] == "Primary"]
            secondary = df_f[df_f["Type"] == "Secondary"]

            primary_hist, _ = np.histogram(primary["E"], bins=np.append(E, E[-1] + binwidth))
            secondary_hist, _ = np.histogram(secondary["E"], bins=np.append(E, E[-1] + binwidth))

            if is_save_point:
                combined_primary_hist += primary_hist
                combined_secondary_hist += secondary_hist

            # Windowing
            alpha_window = (E >= peak["alpha"]["E"] - peak["window"]) & (E <= peak["alpha"]["E"] + peak["window"])
            beta_window = (E >= peak["beta"]["E"] - peak["window"]) & (E <= peak["beta"]["E"] + peak["window"])

            alpha_primary_s = primary_hist[alpha_window].sum()
            alpha_secondary_s = secondary_hist[alpha_window].sum()
            beta_primary_s = primary_hist[beta_window].sum()
            beta_secondary_s = secondary_hist[beta_window].sum()

            total_secondary_s = alpha_secondary_s + beta_secondary_s
            total_primary_s = alpha_primary_s + beta_primary_s

            # SNR pro Source
            if (alpha_secondary_s + alpha_primary_s) > 0:
                SNR[conc_idx]["alpha_individual"][s, step - 1] = (
                    alpha_secondary_s / np.sqrt(alpha_secondary_s + alpha_primary_s)
                )
            if (beta_secondary_s + beta_primary_s) > 0:
                SNR[conc_idx]["beta_individual"][s, step - 1] = (
                    beta_secondary_s / np.sqrt(beta_secondary_s + beta_primary_s)
                )
            if (total_secondary_s + total_primary_s) > 0:
                SNR[conc_idx]["total_individual"][s, step - 1] = (
                    total_secondary_s / np.sqrt(total_secondary_s + total_primary_s)
                )

            # S0 individual
            if s == 0:
                signal_over_time_S0[step - 1] += total_secondary_s
                background_over_time_S0[step - 1] += total_primary_s

            # Summ over all Sources
            total_alpha_primary += alpha_primary_s
            total_alpha_secondary += alpha_secondary_s
            total_beta_primary += beta_primary_s
            total_beta_secondary += beta_secondary_s

        # Save Histo
        if is_save_point:
            T_idx = np.where(T_steps == step)[0][0] + 1
            field_name = f"c{conc}_T{T_idx}"
            saved_histograms[field_name] = {
                "primary": combined_primary_hist,
                "secondary": combined_secondary_hist,
                "E": E,
                "time_min": current_time,
            }

        total_secondary = total_alpha_secondary + total_beta_secondary
        total_primary = total_alpha_primary + total_beta_primary
        signal_over_time[step - 1] = total_secondary
        background_over_time[step - 1] = total_primary

        # combined SNR
        if (total_alpha_secondary + total_alpha_primary) > 0:
            alpha_combined = total_alpha_secondary / np.sqrt(total_alpha_secondary + total_alpha_primary)
        else:
            alpha_combined = 0

        if (total_beta_secondary + total_beta_primary) > 0:
            beta_combined = total_beta_secondary / np.sqrt(total_beta_secondary + total_beta_primary)
        else:
            beta_combined = 0

        if (total_secondary + total_primary) > 0:
            total_combined = total_secondary / np.sqrt(total_secondary + total_primary)
        else:
            total_combined = 0

        if step == 1:
            SNR[conc_idx]["alpha_combined"] = np.zeros(num_steps)
            SNR[conc_idx]["beta_combined"] = np.zeros(num_steps)
            SNR[conc_idx]["total_combined"] = np.zeros(num_steps)
        SNR[conc_idx]["alpha_combined"][step - 1] = alpha_combined
        SNR[conc_idx]["beta_combined"][step - 1] = beta_combined
        SNR[conc_idx]["total_combined"][step - 1] = total_combined

    SNR[conc_idx]["signal_over_time"] = signal_over_time
    SNR[conc_idx]["background_over_time"] = background_over_time
    SNR[conc_idx]["signal_over_time_S0"] = signal_over_time_S0
    SNR[conc_idx]["background_over_time_S0"] = background_over_time_S0


=== Calculating SNR over time ===
Processing 10 ppm...
 - 5 min...
 - 10 min...
 - 15 min...
 - 20 min...
 - 25 min...
 - 30 min...
 - 35 min...
 - 40 min...
 - 45 min...
 - 50 min...
 - 55 min...
 - 60 min...
 - 65 min...
 - 70 min...
 - 75 min...
 - 80 min...
 - 85 min...
 - 90 min...
 - 95 min...
 - 100 min...
 - 105 min...
 - 110 min...
 - 115 min...
 - 120 min...
 - 125 min...
 - 130 min...
 - 135 min...
 - 140 min...
 - 145 min...
 - 150 min...
 - 155 min...
 - 160 min...
 - 165 min...
 - 170 min...
 - 175 min...
 - 180 min...
Processing 50 ppm...
 - 5 min...
 - 10 min...
 - 15 min...
 - 20 min...
 - 25 min...
 - 30 min...
 - 35 min...
 - 40 min...
 - 45 min...
 - 50 min...
 - 55 min...
 - 60 min...
 - 65 min...
 - 70 min...
 - 75 min...
 - 80 min...
 - 85 min...
 - 90 min...
 - 95 min...
 - 100 min...
 - 105 min...
 - 110 min...
 - 115 min...
 - 120 min...
 - 125 min...
 - 130 min...
 - 135 min...
 - 140 min...
 - 145 min...
 - 150 min...
 - 155 min...
 - 160 min...
 - 165 min.

In [36]:
#@title 3) Plot LOD and SRN curve { display-mode: "form" }

colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
if len(colors) < len(c_vals):
    colors = colors * (len(c_vals) // len(colors) + 1)

fig_pl = make_subplots(
    rows=1,
    cols=2,
    subplot_titles=(
        "SNR as a function of time with 7× Am-241",
        "Limit of detection"
    ),
    horizontal_spacing=0.15
)

xLim = (0, 120)
yLim = (0, 35)

for idx, conc in enumerate(c_vals):
    t_data = time_values
    snr_data = SNR[idx]["total_combined"]

    fig_pl.add_trace(
        go.Scatter(
            x=t_data,
            y=snr_data,
            mode="markers",
            marker=dict(size=6, color=colors[idx]),
            name=f"{conc} ppm (data)",
            showlegend=True
        ),
        row=1, col=1
    )

    popt, _ = curve_fit(snr_model, t_data, snr_data, p0=[1.0], maxfev=10000)
    A_fitted = popt[0]

    t_fit = np.linspace(0, time_values.max(), 1000)
    snr_fit = snr_model(t_fit, A_fitted)

    fig_pl.add_trace(
        go.Scatter(
            x=t_fit,
            y=snr_fit,
            mode="lines",
            line=dict(color=colors[idx]),
            name=f"{conc} ppm fit: {A_fitted:.2f}·√t",
            showlegend=True
        ),
        row=1, col=1
    )

for tt in T:
    fig_pl.add_vline(
        x=tt,
        line=dict(color="black", dash="dash"),
        row=1, col=1
    )

fig_pl.update_xaxes(
    title_text="Time (min)",
    range=list(xLim),
    row=1, col=1
)
fig_pl.update_yaxes(
    title_text="SNR",
    range=list(yLim),
    row=1, col=1
)

xLim_lod = (0, 120)
yLim_lod = (0, 1200)

# ---------- 7× Am-241 ----------
A_values = np.zeros(len(c_vals))
t_LOD = np.zeros(len(c_vals))
sigma_t = np.zeros(len(c_vals))

def lin(x, A):
    return A * x

for idx, conc in enumerate(c_vals):
    t_data = time_values
    snr_data = SNR[idx]["total_combined"]
    signal_data = SNR[idx]["signal_over_time"]
    background_data = SNR[idx]["background_over_time"]

    # Fit SNR
    popt_snr, _ = curve_fit(snr_model, t_data, snr_data, p0=[1.0], maxfev=10000)
    A_values[idx] = popt_snr[0]
    t_LOD[idx] = 9.0 / (A_values[idx] ** 2)

    s_fit, _ = curve_fit(lin, t_data, signal_data, p0=[1.0])
    b_fit, _ = curve_fit(lin, t_data, background_data, p0=[1.0])

    s = s_fit[0]
    b = b_fit[0]
    t = t_LOD[idx]
    sigma_t[idx] = 9 * np.sqrt(((s + 2*b)**2 / s**5) * (1/t) + (b / s**4) * (1/t))

fig_pl.add_trace(
    go.Scatter(
        x=t_LOD,
        y=c_vals,
        mode="markers",
        marker=dict(color="red", size=8),
        error_x=dict(
            type="data",
            array=sigma_t,
            visible=True
        ),
        name="7× Am-241"
    ),
    row=1, col=2
)


popt_lod, _ = curve_fit(lod_model, t_LOD, c_vals, p0=[10, 10])
t_fit_lod = np.linspace(t_LOD.min() * 0.5, t_LOD.max() * 2, 200)
c_fit_lod = lod_model(t_fit_lod, *popt_lod)

fig_pl.add_trace(
    go.Scatter(
        x=t_fit_lod,
        y=c_fit_lod,
        mode="lines",
        line=dict(color="red"),
        name=f"Fit 7×: A1={popt_lod[0]:.0f}, A2={popt_lod[1]:.0f}"
    ),
    row=1, col=2
)

# ---------- 1× Am-241 (S0 only) ----------
A_values_S0 = np.zeros(len(c_vals))
t_LOD_S0 = np.zeros(len(c_vals))
sigma_t_S0 = np.zeros(len(c_vals))

for idx, conc in enumerate(c_vals):
    t_data = time_values
    snr_data_S0 = SNR[idx]["total_individual"][0, :]
    signal_data_S0 = SNR[idx]["signal_over_time_S0"]
    background_data_S0 = SNR[idx]["background_over_time_S0"]

    popt_s0, _ = curve_fit(snr_model, t_data, snr_data_S0, p0=[0.5], maxfev=10000)
    A_values_S0[idx] = popt_s0[0]
    t_LOD_S0[idx] = 9.0 / (A_values_S0[idx] ** 2)

    s_fit_S0, _ = curve_fit(lin, t_data, signal_data_S0, p0=[1.0])
    b_fit_S0, _ = curve_fit(lin, t_data, background_data_S0, p0=[1.0])

    s0 = s_fit_S0[0]
    b0 = b_fit_S0[0]
    t0 = t_LOD_S0[idx]
    sigma_t_S0[idx] = 9 * np.sqrt(((s0 + 2*b0)**2 / s0**5) * (1/t0) + (b0 / s0**4) * (1/t0))

fig_pl.add_trace(
    go.Scatter(
        x=t_LOD_S0,
        y=c_vals,
        mode="markers",
        marker=dict(color="blue", size=8),
        error_x=dict(
            type="data",
            array=sigma_t_S0,
            visible=True
        ),
        name="1× Am-241"
    ),
    row=1, col=2
)

popt_lod_S0, _ = curve_fit(lod_model, t_LOD_S0, c_vals, p0=[10, 10])
c_fit_lod_S0 = lod_model(t_fit_lod, *popt_lod_S0)

fig_pl.add_trace(
    go.Scatter(
        x=t_fit_lod,
        y=c_fit_lod_S0,
        mode="lines",
        line=dict(color="blue"),
        name=f"Fit 1×: A1={popt_lod_S0[0]:.0f}, A2={popt_lod_S0[1]:.0f}"
    ),
    row=1, col=2
)

fig_pl.update_xaxes(
    title_text="Time (min)",
    range=list(xLim_lod),
    row=1, col=2
)
fig_pl.update_yaxes(
    title_text="Concentration (ppm)",
    range=list(yLim_lod),
    tickvals=list(np.arange(0, 1201, 100)),
    row=1, col=2
)

fig_pl.update_layout(
    width=900,
    height=450,
    title="SNR and LOD using 1 or 7 Am-241 sources",
    title_x=0.5,
    font=dict(size=10),
    legend=dict(
        x=1.02,
        y=1,
        xanchor="left",
        yanchor="top",
        font=dict(size=10)
    ),
    margin=dict(t=80, l=60, r=160, b=60)
)

fig_pl.show()


In [45]:
#@title 3) Plot Histograms { display-mode: "form" }


plot_concs = [500, 200, 100]
xLim_h = (10, 13)
yLim_h = (0, 50)

subplot_titles = []
for conc in plot_concs:
    for T_idx in [1, 2, 3]:
        subplot_titles.append(f"{T[T_idx-1]} min - {conc} ppm")

fig_h = make_subplots(
    rows=3,
    cols=3,
    subplot_titles=subplot_titles,
    horizontal_spacing=0.08,
    vertical_spacing=0.08
)

red_line   = "rgb(201, 24, 31)"
red_fill   = "rgba(201, 24, 31, 0.5)"
blue_line  = "rgb(0, 114, 189)"
blue_fill  = "rgba(0, 114, 189, 0.7)"

for row_idx, conc in enumerate(plot_concs, start=1):
    conc_idx = np.where(c_vals == conc)[0][0]  # Index in c_vals

    for col_idx, T_idx in enumerate([1, 2, 3], start=1):
        key = f"c{conc}_T{T_idx}"
        hdata = saved_histograms[key]
        E_loc = hdata["E"]
        prim = hdata["primary"]
        sec = hdata["secondary"]
        total = prim + sec

        fig_h.add_trace(
            go.Scatter(
                x=E_loc,
                y=total,
                mode="lines",
                line=dict(color=red_line),
                fill="tozeroy",
                fillcolor=red_fill,
                name="Total (primary + secondary)" if (row_idx == 1 and col_idx == 1) else None,
                showlegend=(row_idx == 1 and col_idx == 1),
            ),
            row=row_idx, col=col_idx
        )

        fig_h.add_trace(
            go.Scatter(
                x=E_loc,
                y=prim,
                mode="lines",
                line=dict(color=blue_line),
                fill="tozeroy",
                fillcolor=blue_fill,
                name="Primary" if (row_idx == 1 and col_idx == 1) else None,
                showlegend=(row_idx == 1 and col_idx == 1),
            ),
            row=row_idx, col=col_idx
        )

        snr_val = SNR[conc_idx]["total_combined"][T_steps[T_idx - 1] - 1]

        x_alpha_minus = peak["alpha"]["E"] - peak["window"]
        x_alpha_plus  = peak["alpha"]["E"] + peak["window"]
        x_beta_minus  = peak["beta"]["E"] - peak["window"]
        x_beta_plus   = peak["beta"]["E"] + peak["window"]

        fig_h.add_trace(
            go.Scatter(
                x=[x_alpha_minus, x_alpha_minus],
                y=[0, yLim_h[1]],
                mode="lines",
                line=dict(color="black", dash="dash"),
                showlegend=False
            ),
            row=row_idx, col=col_idx
        )

        for x_line in [x_alpha_plus, x_beta_minus, x_beta_plus]:
            fig_h.add_trace(
                go.Scatter(
                    x=[x_line, x_line],
                    y=[0, yLim_h[1]],
                    mode="lines",
                    line=dict(color="black", dash="dash"),
                    showlegend=False,
                ),
                row=row_idx, col=col_idx
            )

        fig_h.update_xaxes(
            range=list(xLim_h),
            title_text="Energy (keV)" if row_idx == 3 else "",
            row=row_idx,
            col=col_idx
        )
        fig_h.update_yaxes(
            range=list(yLim_h),
            title_text="Detected counts" if col_idx == 1 else "",
            row=row_idx,
            col=col_idx
        )

fig_h.update_layout(
    height=800,
    width=800,
    title="Histograms for different exposure times and concentrations",
    title_x=0.5,
    font=dict(size=10),
    legend=dict(
        x=1.02,
        y=1,
        xanchor="left",
        yanchor="top",
        font=dict(size=8),
    ),
    margin=dict(t=80, l=80, r=160, b=80),
)

fig_h.show()