# PM2.5 estimation from sky images (PPPC)
This notebook script:
 - Extracts two “naturalness” features from sky images:
   - **HS**: spatial entropy of saturation
   - **ES**: wavelet-based (transform) entropy of saturation
 - Merges features with ground PM2.5 measurements
 - Fits reference distributions on “clean air” samples
 - Builds a combined quality score **Q**
 - Fits a logistic mapping to predict PM2.5 and evaluates performance

In [None]:
import os
import re
import glob
import math

import numpy as np
import pandas as pd
import imageio.v3 as iio
import matplotlib.pyplot as plt

import pywt
from scipy.stats import gumbel_r, norm
from scipy.optimize import curve_fit
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

## Helper functions
 Functions for:
 - Parsing datetime from filename
 - Converting RGB → saturation
 - Computing spatial entropy (HS)
 - Computing wavelet-based transform entropy (ES)

In [None]:
def parse_dt_from_name(name):
    m = re.search(r'(\d{8})-(\d{4})', name)
    if not m:
        return None
    ymd, hm = m.group(1), m.group(2)
    return f"{ymd[:4]}-{ymd[4:6]}-{ymd[6:8]} {hm[:2]}:{hm[2:4]}"

In [None]:
def rgb_to_saturation(rgb_uint8):
    rgb = rgb_uint8.astype(np.float32) / 255.0
    R, G, B = rgb[..., 0], rgb[..., 1], rgb[..., 2]

    U = np.maximum.reduce([R, G, B])
    V = np.minimum.reduce([R, G, B])

    S = np.zeros_like(U, dtype=np.float32)
    mask = U > 0
    S[mask] = (U[mask] - V[mask]) / U[mask]

    return S

In [None]:
def spatial_entropy(S, bins=256):
    hist, _ = np.histogram(S, bins=bins, range=(0, 1), density=True)
    p = hist / (hist.sum() + 1e-12)
    p = p[p > 0]
    return float(-np.sum(p * np.log(p)))

In [None]:
def shannon_entropy_coeffs(coeffs, bins=256):
    data = coeffs.ravel()
    std = np.std(data)
    rng = 6 * std if std > 0 else 1.0

    hist, _ = np.histogram(data, bins=bins, range=(-rng, rng), density=True)
    p = hist / (hist.sum() + 1e-12)
    p = p[p > 0]

    return float(-np.sum(p * np.log(p)))


def transform_entropy(S, K=5, psi=4.0):
    coeffs = pywt.wavedec2(S, wavelet='haar', level=K, mode='symmetric')
    ES_scales = []

    for k in range(1, K + 1):
        cH, cV, cD = coeffs[k]

        e_LH = shannon_entropy_coeffs(cH)
        e_HL = shannon_entropy_coeffs(cV)
        e_HH = shannon_entropy_coeffs(cD)

        e_scale = (e_LH + e_HL + psi * e_HH) / (1 + psi)
        ES_scales.append(e_scale)

    return float(np.mean(ES_scales))

## Load images and extract features (HS, ES)
 - Reads all `.jpg`, `.png`, `.jpeg` images from `input/passed`
 - Crops a fixed sky region
 - Computes saturation, HS, and ES per image
 - Builds a dataframe indexed by Datetime

In [None]:
image_folder = "Images"
image_paths = glob.glob(os.path.join(image_folder, "*.jpg")) + \
              glob.glob(os.path.join(image_folder, "*.png")) + \
              glob.glob(os.path.join(image_folder, "*.jpeg"))

records = []

for path in image_paths:
    try:
        fname = os.path.basename(path)
        dt = parse_dt_from_name(fname)
        if dt is None:
            continue

        img = iio.imread(path)
        img = img[30:430, 30:1950, :3]  # sky crop

        S = rgb_to_saturation(img)
        HS = spatial_entropy(S)
        ES = transform_entropy(S)

        records.append({"Datetime": dt, "HS": HS, "ES": ES})

    except Exception as e:
        print(f"Error processing {fname}: {e}")

df_img = pd.DataFrame(records)
df_img["Datetime"] = pd.to_datetime(df_img["Datetime"])

## Load PM2.5 measurements and merge

In [None]:
df_pm = pd.read_csv("Model Input/PM25_MI Marche_ARPA.csv")
df_pm["Datetime"] = pd.to_datetime(df_pm["Datetime"], format="%d/%m/%Y %H:%M")

merged = df_img.merge(df_pm, on="Datetime", how="inner")

## Clean-air subset for “naturalness” fitting
 Uses low-PM samples to estimate reference distributions (the “clean air” baseline).

In [None]:
clean = merged[merged["PM25"] < 12]
HS_clean = clean["HS"].values
ES_clean = clean["ES"].values

## Fit reference distributions (baseline statistics)
 The code fits:
 - One feature with a **Gumbel** distribution (extreme value)
 - The other feature with a **Gaussian** distribution

 Important:
 The mapping (HS→Gumbel vs ES→Gumbel) depends on your assumption.

In [None]:
# Spatial entropy → Extreme-value (Gumbel)
#u, d = gumbel_r.fit(HS_clean)
u, d = gumbel_r.fit(ES_clean)
# Transform entropy → Gaussian
#mu, sigma = norm.fit(ES_clean)
mu, sigma = norm.fit(HS_clean)
print("Gaussian parameters (ES): μ =", mu, ", σ =", sigma)
print("Gumbel parameters (HS): u =", u, ", d =", d)

## Visual sanity check: histogram + fitted PDFs
 Plots feature histograms for the clean subset and overlays the fitted probability density functions.

In [None]:
# Create x-ranges for smooth PDF curves
hs_range = np.linspace(HS_clean.min(), HS_clean.max(), 500)
es_range = np.linspace(ES_clean.min(), ES_clean.max(), 500)

# Evaluate fitted PDFs
#hs_pdf = gumbel_r.pdf(hs_range, loc=u, scale=d)
#es_pdf = norm.pdf(es_range, loc=mu, scale=sigma)

hs_pdf = norm.pdf(hs_range, loc=u, scale=d)
es_pdf = gumbel_r.pdf(es_range, loc=mu, scale=sigma)

# Plot HS histogram + Gumbel fit
plt.figure(figsize=(7, 4))
plt.hist(HS_clean, bins=30, density=True, alpha=0.6, edgecolor="k", label="HS histogram (clean)")
plt.plot(hs_range, hs_pdf, "r-", lw=2, label="Gumbel fit")
plt.xlabel("HS (Spatial Entropy)")
plt.ylabel("Probability Density")
plt.title("Naturalness Statistics of HS (Clean Air)")
plt.legend()
plt.tight_layout()
plt.show()

# Plot ES histogram + Gaussian fit
plt.figure(figsize=(7, 4))
plt.hist(ES_clean, bins=30, density=True, alpha=0.6, edgecolor="k", label="ES histogram (clean)")
plt.plot(es_range, es_pdf, "b-", lw=2, label="Gaussian fit")
plt.xlabel("ES (Transform Entropy)")
plt.ylabel("Probability Density")
plt.title("Naturalness Statistics of ES (Clean Air)")
plt.legend()
plt.tight_layout()
plt.show()

## Build combined quality score Q
 Computes two likelihood-like terms:
 - QV from the Gaussian pdf
 - QG from the Gumbel pdf

 Then combines them as a weighted geometric mean:
 \[
 Q = QV^w \cdot QG^{(1-w)}
 \]
 where `w=0.5` gives equal weighting.

In [None]:
w = 0.5  # equal weighting

merged["QV"] = norm.pdf(merged["HS"], loc=u, scale=d)
merged["QG"] = gumbel_r.pdf(merged["ES"], loc=mu, scale=sigma)

merged["Q"] = (merged["QV"] ** w) * (merged["QG"] ** (1 - w))

## Logistic calibration: Q → PM2.5
 Fits a logistic curve that maps the quality score **Q** to PM2.5.
 This is the regression step producing `PM25_pred`.

In [None]:
def logistic(Q, a1, a2, a3):
    return a1 / (1 + np.exp(a2 - Q / a3))


mask = np.isfinite(merged["Q"]) & np.isfinite(merged["PM25"])
Q_vals = merged.loc[mask, "Q"].values
PM_vals = merged.loc[mask, "PM25"].values

popt, _ = curve_fit(
    logistic,
    Q_vals,
    PM_vals,
    p0=[PM_vals.max(), 1.0, np.median(Q_vals)]
)

merged.loc[mask, "PM25_pred"] = logistic(Q_vals, *popt)

## Evaluation metrics
 Reports:
 - MAE
 - MSE
 - R²

## Diagnostic plot: measured vs predicted
 A 1:1 line is shown for reference.

In [None]:
mae = mean_absolute_error(PM_vals, merged.loc[mask, "PM25_pred"])
mse = mean_squared_error(PM_vals, merged.loc[mask, "PM25_pred"])
r2  = r2_score(PM_vals, merged.loc[mask, "PM25_pred"])

print("MAE:", mae)
print("MSE:", mse)
print("R²:", r2)

plt.figure(figsize=(6, 5))
plt.scatter(PM_vals, merged.loc[mask, "PM25_pred"], alpha=0.6)
plt.plot([0, PM_vals.max()], [0, PM_vals.max()], "k--")
plt.xlabel("Measured PM2.5")
plt.ylabel("Predicted PM2.5")
plt.title("PPPC: Measured vs Predicted PM2.5")
plt.tight_layout()
plt.show()