In [None]:
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import ultraplot as uplt
from scipy import stats
from scipy import constants as c
from scipy.interpolate import make_splrep
import warnings

This notebook is to perform experiments with synthetic data for linear mixture model fitting.

# Model Definition

In [None]:
## Define the linear mixing spectral model
## Nonlinear optimization is performed with respect to a simple L^2 loss

class SpectralMixtureModel():
    # Goal of the model is to infer
    # 1) n_fire of T_i (temperatures)
    # 2) n_fire of p_i (fire area fractions)
    # 3) n_bkg of p_j (fire area fractions)

    # There is a need to remove the reflected component of the land surface
    # Either from direct sunlight or indirect illumination by the sky

    def __init__(
        self,
        n_fire,
        n_bkg,
        bkg_spectra_lis, # List of spectra of len n_bkg
        # All spectra should be spectral radiances following lambd
        SI=True, # Toggle bkg_spectra units from SI to AVIRIS standard
    ):
        # Design this class to be able to hold arbitrary number of endmembers
        # But for actual purposes, have only one to two end members
        self.n_fire = n_fire
        self.n_bkg = n_bkg
        
        if len(bkg_spectra_lis) != n_bkg:
            raise ValueError("n_bkg must be the same as the length of bkg_spectra_lis!")

        if SI:
            self.bkg_spectra_lis = bkg_spectra_lis
            bkg_spectra_lis_mod = bkg_spectra_lis
        else:
            print('Converting bkg_spectra to SI...')
            bkg_spectra_lis_mod = [(wv, spectra*1e7) for (wv, spectra) in bkg_spectra_lis]
            self.bkg_spectra_lis = bkg_spectra_lis_mod

        # Create spline functions for the spectra
        bkg_spectra_splines = []
        for lambd, spectra in bkg_spectra_lis_mod:
            spline = make_splrep(
                lambd,
                spectra,
                k=1,
                s=0
            )
            bkg_spectra_splines.append(spline)

        self.bkg_spectra_splines = bkg_spectra_splines
    
    def get_fire_spectra(self, lambd, T_tup):
        result_list = list()
        for T in T_tup:
            spectra = _planck(T, lambd)
            result_list.append(spectra)
        return result_list
    
    def get_bkg_spectra(self, lambd):
        result_list = list()
        for spline in self.bkg_spectra_splines:
            result_list.append(spline(lambd))
        return result_list

    def total_radiance(self, lambd, T_tup, T_fracs, bkg_fracs):
        # noise is given as an absolute radiance value for 1 std
        fire_spectra = self.get_fire_spectra(lambd, T_tup)
        bkg_spectra = self.get_bkg_spectra(lambd)

        result = np.zeros_like(lambd)
        for frac, spectra in zip(T_fracs, fire_spectra):
            result += frac * spectra

        for frac, spectra in zip(bkg_fracs, bkg_spectra):
            result += frac * spectra
        
        # Unit conversion
        # Output is in SI (W per m per m^2 per sr)
        # Want to convert to uW per nm per cm^2 per sr for consistency with AVIRIS
        # Multiply by 1e6 / (1e9 * 1e4) -> divide by 1e7
        return result*1e-7

def _planck(T, lambd):
    # Convert lambd from nanometres to metres
    lambd_ = lambd * 1e-9
    top = 2 * c.h * c.c**2
    bottom = lambd_**5 * (np.exp((c.h * c.c)/(lambd_ * c.k * T)) - 1)
    return top/bottom

In [None]:
# Synthesize a test spectrum 

lambd = np.linspace(400, 2500, 200)
# Create a thermal background at300K
bkg = _planck(300, lambd)

test_spectra = SpectralMixtureModel(
    n_fire=2,
    n_bkg=1,
    bkg_spectra_lis=[(lambd, bkg)]
)

In [None]:
T_tup = (600, 1200) # Smoldering areas + active burning
T_fracs = (0.2, 0.1) # Fire fraction
bkg_fracs = (0.7,) # background fraction

# Synthesize the spectra
analytic = test_spectra.total_radiance(lambd, T_tup, T_fracs, bkg_fracs)
sim_obs = analytic + 0.05 * analytic * np.random.randn(len(lambd))

In [None]:
fig, ax = uplt.subplots()
ax.plot(lambd, bkg * 1e-7)
ax.plot(lambd, 0.2*_planck(600, lambd)*1e-7)
ax.plot(lambd, 0.1*_planck(1200, lambd)*1e-7)
ax.plot(lambd, analytic)
#ax.plot(lambd, sim_obs)

In [None]:
# Model inversion
# Define the loss function

def return_loss(model, lambd, target):
    n_fire = model.n_fire
    n_bkg = model.n_bkg

    # The parameter vector is an ndarray of shape (2*n_fire + n_bkg-1,)
    # Arguments are organized in the following order
    # [T_i, p_i_fire, p*_j_bkg]
    # Note that in order to satisfy the p_i_fire + p_j_bkg = 1 constraint,
    # The last p_j_bkg parameter is omitted and calculated from 1 - all

    def loss(params):
        # Unpact the parameters and normalize them appropriately
        T_tup = tuple(params[:n_fire]*1000) # 1000 K scale
        T_fracs = tuple(params[n_fire:2*n_fire])
        # Enforcing land fraction constraint
        if len(params) >= 2*n_fire:
            bkg_fracs = tuple(params[2*n_fire:]) + (1 - np.sum(params[n_fire:]),)
        else:
            bkg_fracs = (1 - np.sum(params[n_fire:]),)

        prediction = model.total_radiance(lambd, T_tup, T_fracs, bkg_fracs)
        diff = target - prediction
        return np.sum(diff**2) # L2 norm

    return loss

In [None]:
loss_func = return_loss(test_spectra, lambd, analytic)

In [None]:
from scipy.optimize import minimize

In [None]:
result = minimize(
    fun = loss_func,
    x0 = np.array([500, 1000, 0.3, 0.3]),
    bounds = [(0,None)]*2 + [(0, 1)]*2, # Bounds from land fractions
    method = 'L-BFGS-B'
)

In [None]:
result

In [None]:
# Retrieve the parameters from the result
def retrieve_params(result, model):
    n_fire = model.n_fire
    n_bkg = model.n_bkg
    params = result.x
    T_tup = params[:n_fire] * 1000
    T_frac = params[n_fire:2*n_fire]
    if len(params) >= 2*n_fire:
        bkg_fracs = tuple(params[2*n_fire:]) + (1 - np.sum(params[n_fire:]),)
    else:
        bkg_fracs = (1 - np.sum(params[n_fire:]),)
    print("The fire temperatures in K are: ", T_tup)
    print("The fire fractions are: ", T_frac)
    print("The background fractions are: ", bkg_fracs)
    return T_tup, T_frac, bkg_fracs


In [None]:
T_tup_r, T_frac_r, bkg_frac_r = retrieve_params(result, test_spectra)

In [None]:
# parameter retrieval with the noisy data
loss_func = return_loss(test_spectra, lambd, sim_obs)
result = minimize(
    fun = loss_func,
    x0 = np.array([500, 1000, 0.3, 0.3]),
    bounds = [(0,None)]*2 + [(0, 1)]*2, # Bounds from land fractions
    method = 'L-BFGS-B'
)
retrieve_params(result, test_spectra)

# Tests with empirical background radiance

In [None]:
from mixture_model import estimate_params

We extract empirical background radiances from the AVIRIS dataset

In [None]:
pal_rad_path = 'datasets\palisades_fire\AV320250111t210400_005_L1B_RDN_3f4aef90_RDN.nc'
pal_mask_path = 'datasets\palisades_fire\AV320250111t210400_005_L1B_RDN_3f4aef90_BANDMASK.nc'
pal_ds = xr.open_datatree(pal_rad_path)
pal_ds

In [None]:
samples_coords = np.arange(1234)
lines_coords = np.arange(1280)
# Assign dummy coordinates
pal_radiance = pal_ds.radiance.radiance.assign_coords({'samples':samples_coords, 'lines':lines_coords})
pal_radiance

In [None]:
# Generate an RGB image
def normalize(band):
    band_min = band.min()
    band_max = band.max()
    return (band - band_min) / (band_max - band_min)

red_ = pal_radiance.sel(wavelength=700, method='nearest')
green_ = pal_radiance.sel(wavelength=550, method='nearest')
blue_ = pal_radiance.sel(wavelength=400, method='nearest')

red = normalize(red_)
green = normalize(green_) 
blue = normalize(blue_)
rgb_image = np.dstack((red.values, green.values, blue.values))
fig, ax = uplt.subplots(refwidth=5)
ax.imshow(rgb_image)
ax.plot(800, 800, marker='o', c='r', s=5)
ax.plot(1090, 550, marker='o', c='b', s=5)

In [None]:
# read the output from the 6s simulation for the wavelength masks
sixs_ds = xr.open_dataset('datasets/sixs_output.nc')
sixs_ds

In [None]:
wv_mask = sixs_ds.mask

In [None]:
# Sample two non-burning pixels
bkg_1_da = pal_radiance.sel(samples=800, lines=800) # Non-snowy pixel
bkg_2_da = pal_radiance.sel(samples=1090, lines=550) # Snowy pixel

In [None]:
# Plot the test spectra
fig, ax = uplt.subplots(refwidth=5, refaspect=(2,1))
ax.plot(bkg_1_da.where(~wv_mask), label='bkg_1')
ax.plot(bkg_2_da.where(~wv_mask), label='bkg_2')
ax.legend()

In [None]:
masked_lambds = bkg_1_da.where(~wv_mask).dropna(dim='wavelength').wavelength
bkg_1 = bkg_1_da.where(~wv_mask).dropna(dim='wavelength').values
bkg_2 = bkg_2_da.where(~wv_mask).dropna(dim='wavelength').values

In [None]:
test_model = SpectralMixtureModel(
    n_fire=2,
    n_bkg=2,
    bkg_spectra_lis=[(masked_lambds, bkg_1), (masked_lambds, bkg_2)],
    SI=False,
)

In [None]:
# Simulate pixel with small burning area
T_i = (400, 800) # Burn scar and 
fire_fracs = (0.1, 0.01)
bkg_fracs = (0.15, 0.7)
sim_radiance = test_model.total_radiance(masked_lambds, T_i, fire_fracs, bkg_fracs)
noisy_radiance = sim_radiance + 0.05 * sim_radiance * np.random.randn(len(masked_lambds))

In [None]:
# Plot the simulated radiance
fig, ax = uplt.subplots()
ax.plot(masked_lambds, sim_radiance)
ax.plot(masked_lambds, noisy_radiance)
ax.plot(masked_lambds, 0.15 * bkg_1)
ax.plot(masked_lambds, 0.7 * bkg_2)