Attempt at building the unmixing pipeline for easier testing.

In [10]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import loadmat
from abc import ABC, abstractmethod
from sklearn.decomposition import PCA
from math import factorial

In [136]:
HYDICE_PATH = 'C:/Users/Dhruv/OneDrive/Documents/Repositories/AM101/datasets/hydice/HYDICE-urban.mat'
PINES_PATH = 'C:/Users/Dhruv/OneDrive/Documents/Repositories/AM101/datasets/indian_pines/Indian_pines.mat'
PINES_GT_PATH = 'C:/Users/Dhruv/OneDrive/Documents/Repositories/AM101/datasets/indian_pines/Indian_pines_gt.mat'
SALINAS_PATH = 'C:/Users/Dhruv/OneDrive/Documents/Repositories/AM101/datasets/salinas/Salinas.mat'
SALINAS_GT_PATH = 'C:/Users/Dhruv/OneDrive/Documents/Repositories/AM101/datasets/salinas/Salinas_gt.mat'

In [12]:
class HydiceDataLoader:
    def __init__(self, filepath):
        self.filepath = filepath
        self.data_cube = None
        self.ground_truth = None

    def load(self):
        '''
        Returns: 
            - Data Cube: 80 x 100 x 175
            - Ground Truth Map: 80 x 100
        '''
        mat = loadmat(self.filepath)
        self.data_cube = mat['data']
        self.ground_truth = mat['map']
        return self.data_cube, self.ground_truth
    
    def normalize(self):
        if self.data_cube is not None:
            self.data_cube = self.data_cube / np.max(self.data_cube)

In [None]:
class DataLoader:
    def __init__(self, filepath, gt_path):
        self.filepath = filepath
        self.gt_path = gt_path
        self.data_cube = None
        self.ground_truth = None

    def load(self, name):
        '''
        Returns: 
            - Data Cube: H x W x B
            - Ground Truth Map: H x W
        '''
        mat = loadmat(self.filepath)
        self.data_cube = mat[name + '_corrected']
        mat_gt = loadmat(self.gt_path)
        self.ground_truth = mat_gt[name + '_gt']
        return self.data_cube, self.ground_truth
    
    def normalize(self):
        if self.data_cube is not None:
            self.data_cube = self.data_cube / np.max(self.data_cube)

In [113]:
class PinesDataLoader(DataLoader):
    def __init__(self):
        super().__init__(PINES_PATH, PINES_GT_PATH)
    def load(self):
        return super().load('indian_pines')

In [121]:
class SalinasDataLoader(DataLoader):
    def __init__(self):
        super().__init__(SALINAS_PATH, SALINAS_GT_PATH)
    def load(self):
        return super().load('salinas')

In [114]:
class BaseClass:
    def __init__(self):
        self.endmembers = None
    
    def visualize(self):
        for i in range(self.endmembers.shape[1]):
            plt.plot(self.endmembers[:, i], label=f'Endmember {i+1}')
        plt.xlabel('Band')
        plt.ylabel('Reflectance')
        plt.title('Extracted Endmembers')
        plt.legend()
        plt.show()
        
    def gt_endmembers(self, data_cube, ground_truth, num_endmembers):
        H, W, B = data_cube.shape
        data2D = data_cube.reshape(-1, B)
        gt_flat = ground_truth.flatten()

        class_means = np.zeros((B, num_endmembers))
        for i in range(num_endmembers):
            mask = gt_flat == i
            if np.any(mask):
                class_means[:, i] = np.mean(data2D[mask, :], axis=0)
        return class_means
    
    def compute_metrics(self, num_endmembers, class_means):
        SAM = np.zeros((num_endmembers, num_endmembers))
        RMSE = np.zeros((num_endmembers, num_endmembers))

        for i in range(num_endmembers):
            v1 = self.endmembers[:, i]
            for j in range(num_endmembers):
                v2 = class_means[:, j]
                norm1 = np.linalg.norm(v1)
                norm2 = np.linalg.norm(v2)
                if norm1 == 0 or norm2 == 0:
                    SAM[i, j] = np.nan
                else:
                    cosine = np.clip(np.dot(v1, v2) / (norm1 * norm2), -1.0, 1.0)
                    SAM[i, j] = np.arccos(cosine) * (180 / np.pi)

                RMSE[i, j] = np.sqrt(np.mean((v1 - v2) ** 2))

        return SAM, RMSE
    
    def visualize_closest_class_mean_SAM(self, class_means, SAM, wavelengths):
        best_sam = np.argmin(SAM, axis=0)
        plt.figure(figsize=(16, 12))
        for i in range(SAM.shape[0]):
            plt.subplot(4, 4, i+1)
            plt.plot(wavelengths, self.endmembers[:, i], 'r', label='N-FINDR')
            best_class_idx = best_sam[i]
            plt.plot(wavelengths, class_means[:, best_class_idx], 'b--', label='Class Mean')

            plt.title(f'Endmembers: {i+1}')
            plt.xlabel('Wavelength (nm)')
            plt.ylabel('Reflectance')
            plt.legend()
        plt.tight_layout()
        plt.show()

### Volume of a Simplex

If we have n vectors $v_1, v_2, ..., v_n$ in $R^n$, then the volume of the parellelopiped they span is:

$$V = |det([v_1, v_2, ..., v_n])|$$

But the simplex is not the same as this volume. A simplex is a convex hull of $n+1$ points. There is a base vector $v_0$ which is subtracted from all other vectors to give $n$ new vectors. $\frac{1}{n!}$ of the volume of the parallelopiped spanned by these new vectors is the volume of the simplex.

### Number of Endmembers - 1 Dimensional Simplex

Suppose there are $p$ endmembers to be extracted. Then the simplex will be in $p-1$ dimensions. This is why we run the PCA for $p-1$ principal components. 

In [115]:
class NFINDR(BaseClass):
    '''
    Details about the algorithm can be found here [https://in.mathworks.com/help/images/ref/nfindr.html]
    Reference: [1] Winter, Michael E. “N-FINDR: An Algorithm for Fast Autonomous Spectral End-Member Determination in Hyperspectral Data.” 
               Proc. SPIE Imaging Spectrometry V 3753, (October 1999): 
               266–75. https://doi.org/10.1117/12.366289.
    
    '''
    def __init__(self, data_cube, ground_truth, num_endmembers):
        self.max_iter = 5
        self.random_state = 42
        self.n_restarts = 1
        self.num_endmembers = num_endmembers
        self.data_cube = data_cube.reshape(-1, data_cube.shape[2]).T
        self.ground_truth = ground_truth

    def calculate_simplex_volume(self, A):
        mat = A[:, 1:] - A[:, [0]]
        return abs(np.linalg.det(mat)) / factorial(A.shape[0])

    def fit(self):
        rng = np.random.default_rng(self.random_state)
        bands, num_pixels = self.data_cube.shape
        pca = PCA(n_components=self.num_endmembers-1)
        data_cube_reduced = pca.fit_transform(self.data_cube.T).T

        best_volume = -np.inf
        best_indices = None

        for _ in range(self.n_restarts):
            idx = rng.choice(num_pixels, self.num_endmembers, replace=False)
            E = data_cube_reduced[:, idx]
            V = self.calculate_simplex_volume(E)

            for _ in range(self.max_iter):
                for i in range(self.num_endmembers):
                    best_idx_i = idx[i]
                    best_vol_i = V

                    Ei = E.copy()
                    for j in range(num_pixels):
                        if j in idx:
                            continue

                        Ei[:, i] = data_cube_reduced[:, j]
                        V_new = self.calculate_simplex_volume(Ei)
                        if V_new > best_vol_i:
                            best_vol_i = V_new
                            best_idx_i = j

                    if best_idx_i != idx[i]:
                        idx[i] = best_idx_i
                        E[:, i] = data_cube_reduced[:, best_idx_i]
                        V = best_vol_i

            if V > best_volume:
                best_volume = V
                best_indices = idx.copy()
        self.endmembers = self.data_cube[:, best_indices]
        return self.data_cube[:, best_indices]