# Implementation of 
## *ENDNET: SPARSE AUTOENCODER NETWORK FOR ENDMEMBER EXTRACTION AND HYPERSPECTRAL UNMIXING*
S. Ozkan, B. Kaya and G. B. Akar, "EndNet: Sparse AutoEncoder Network for Endmember Extraction and Hyperspectral Unmixing," in IEEE Transactions on Geoscience and Remote Sensing, vol. 57, no. 1, pp. 482-496, Jan. 2019, doi: 10.1109/TGRS.2018.2856929.

### Loss is given by 
$$\mathcal{L} = \frac{\lambda_0}{2}\|\mathbf{x}-\hat{\mathbf{x}}\|_2^2-\lambda_1 D_\text{KL}(1.0\,\|\,C(\mathbf{x},\hat{\mathbf{x}}))+\lambda_2\|\mathbf{z}\|_1+\lambda_3\|\mathbf{W}^{(e)}\|_2+\lambda_4\|\mathbf{W}^{(d)}\|_2+\lambda_5\|\boldsymbol{\rho}\|_2$$
where
$$C(x^{(i)},x^{(j)})=1.0-\frac{SAD(x^{(i)},x^{(j)})}{\pi}$$

### Imports

In [None]:
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
from tensorflow.keras import initializers, constraints, layers, activations, regularizers
from tensorflow.python.ops import math_ops
from tensorflow.python.keras import backend as K
from tensorflow.python.framework import tensor_shape
from unmixing import HSI, plotEndmembers,vca
from unmixing import plotEndmembersAndGT, plotAbundancesSimple, load_HSI, PlotWhileTraining
from scipy import io as sio
import os
import numpy as np
from numpy.linalg import inv
import warnings
import matplotlib
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")
%matplotlib inline

### Use CPU

In [None]:
# Set CUDA_VISIBLE_DEVICES to "-1" so that no GPU is exposed to this process
# (the notebook is meant to run on the CPU).
# NOTE(review): this runs after `import tensorflow`; presumably it still takes
# effect because no device has been initialised yet -- confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

## Method SAD

In [None]:
def SAD(y_true, y_pred):
    """Return the spectral angle distance (radians) between two spectra.

    The angle is the arc cosine of the cosine similarity computed along the
    last axis of the inputs.

    Args:
        y_true: Tensor of reference spectra.
        y_pred: Tensor of spectra to compare against ``y_true``.

    Returns:
        Tensor of angles with the last axis of the inputs reduced away.
    """
    # Keras returns the *negated* cosine similarity, hence the sign flip.
    cos_sim = -tf.keras.losses.cosine_similarity(y_true, y_pred)
    # Floating-point round-off can push the cosine marginally outside
    # [-1, 1], where acos returns NaN, and the derivative of acos is
    # unbounded at exactly +/-1. Clipping slightly inside the interval keeps
    # both the value and its gradient finite.
    eps = tf.keras.backend.epsilon()
    cos_sim = tf.clip_by_value(cos_sim, -1.0 + eps, 1.0 - eps)
    return tf.math.acos(cos_sim)

## Method C
Method that implements the normalized spectral-angle similarity
$$C(x^{(i)},x^{(j)})=1.0-\frac{SAD(x^{(i)},x^{(j)})}{\pi}$$

In [None]:
def C(x,y):
    """Return the normalized spectral-angle similarity ``1 - SAD(x, y) / pi``.

    Args:
        x: First tensor of spectra.
        y: Second tensor of spectra.

    Returns:
        ``1.0`` minus the spectral angle between ``x`` and ``y`` divided by pi.
    """
    angle_fraction = SAD(x, y) / np.pi
    return 1.0 - angle_fraction

In [None]:
class Cdot(object):
    """Binds one vector so the similarity ``C`` can be used as a unary function.

    Intended for use with ``tf.map_fn``, which expects a one-argument callable.
    """

    def __init__(self, vec):
        # Fixed left-hand operand of every subsequent ``dot`` call.
        self.vec = vec

    def dot(self, x):
        """Return ``1 - SAD(self.vec, x) / pi`` for the stored vector and ``x``."""
        angle_fraction = SAD(self.vec, x) / np.pi
        return 1.0 - angle_fraction

## Fidelity terms
The fidelity terms of the loss
$$\mathcal{L} = \frac{\lambda_0}{2}\|\mathbf{x}-\hat{\mathbf{x}}\|_2^2-\lambda_1 D_\text{KL}(1.0||C(\mathbf{x},\hat{\mathbf{x}}))$$

In [None]:
class Endnet_loss(object):
    """Fidelity part of the EndNet loss: weighted MSE plus a spectral-angle term.

    Args:
        batch_size: Stored as ``self.b``; not read by ``loss``.
        lambda0: Weight of the mean-squared-error term (applied as lambda0 / 2).
        lambda1: Weight of the ``-log(C(y_true, y_pred))`` term.
    """

    def __init__(self, batch_size,lambda0,lambda1):
        self.b = batch_size
        self.lambda0 = lambda0
        self.lambda1 = lambda1

    def loss(self, y_true, y_pred):
        """Return ``lambda0 / 2 * MSE + lambda1 * (-log C(y_true, y_pred))``.

        Args:
            y_true: Target spectra.
            y_pred: Reconstructed spectra.

        Returns:
            Loss tensor with the last axis of the inputs reduced away.
        """
        reconstruction_error = tf.keras.losses.mse(y_true, y_pred)
        # -log(C) is what the original code calls the KL divergence term.
        angular_penalty = -tf.math.log(C(y_true, y_pred))
        return self.lambda0/2.0*reconstruction_error + self.lambda1*angular_penalty

## Class SumToOne
Custom layer that enforces the abundance sum-to-one constraint (ASC). It also adds the l1 regularization term
$$\lambda_2\|\mathbf{z}\|_1$$

In [None]:
class SumToOne(layers.Layer):
    """Abundance layer: keeps the top-k activations and rescales them to sum to one.

    The layer also registers an l1 penalty on its input, weighted by
    ``params['lambda2']``.

    Args:
        params: Hyperparameter dictionary; ``params['lambda2']`` is read.
        top_k: Number of largest activations kept per sample. Defaults to 2,
            the value that was previously hard-coded.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, params, top_k=2, **kwargs):
        super(SumToOne, self).__init__(**kwargs)
        self.params = params
        self.top_k = top_k

    def mask_all_but_top_k(self,X, k):
        """Zero every entry of each row of ``X`` except its ``k`` largest ones."""
        n = X.shape[1]
        top_k_indices = tf.math.top_k(X, k).indices
        # One-hot encode the k winning positions and collapse them into a
        # single 0/1 mask per row.
        mask = tf.reduce_sum(tf.one_hot(top_k_indices, n), axis=1)
        return mask * X

    def l1_regularization(self,x):
        """Return ``lambda2`` times the sum of absolute values of ``x``."""
        l1 = regularizers.l1(1.0)(x)
        return self.params['lambda2'] * l1

    def call(self, x):
        self.add_loss(self.l1_regularization(x))
        x = self.mask_all_but_top_k(x, self.top_k)
        # Take the magnitude once and use it in both numerator and denominator.
        # Previously the denominator summed the signed values, so rows with
        # negative entries did not sum to one.
        magnitude = tf.abs(x)
        total = tf.reduce_sum(magnitude, axis=-1, keepdims=True)
        return magnitude / (total + K.epsilon())

## Class MaskedNoise
Adds randomly masked Gaussian noise to the layer input during training


In [None]:
class MaskedNoise(layers.Layer):
    """Adds zero-mean Gaussian noise to a random subset of the input entries.

    The layer is the identity when ``training`` is falsy.

    Args:
        params: Hyperparameter dictionary; ``params['noise']`` is the standard
            deviation of the Gaussian noise.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, params, **kwargs):
        super(MaskedNoise, self).__init__(**kwargs)
        self.std = params['noise']

    def call(self, x, training=None):
        if training:
            # tf.nn.dropout zeroes entries with probability `rate` and scales
            # the survivors by 1 / (1 - rate), so the mask holds 0 or 1 / 0.6.
            mask = tf.nn.dropout(tf.ones_like(x), rate=0.4)
            # Sample the noise directly instead of building a new GaussianNoise
            # layer on every call and depending on the training flag being
            # propagated to it implicitly.
            noise = tf.random.normal(tf.shape(x), mean=0.0, stddev=self.std,
                                     dtype=x.dtype)
            return x+mask*noise
        else:
            return x
        
        



## Class Spectral_BN
Performs batch normalization with no scaling
$$BN(\bf{h})=\frac{\bf{h}-\bf{\mu}}{\sqrt{\bf{\sigma^2}+\epsilon}}+\bf{\rho}$$

In [None]:
class Spectral_BN(layers.Layer):
    """Batch normalization without a scale factor, with a learned offset ``p``.

    The offset is penalized with ``params['lambda5'] * sum(p ** 2)``.

    Args:
        params: Hyperparameter dictionary; ``'num_endmembers'`` and
            ``'lambda5'`` are read.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, params, **kwargs):
        super(Spectral_BN, self).__init__(**kwargs)
        self.num_outputs = params['num_endmembers']
        self.params = params

    def l2_regularization(self,x):
        """Return ``lambda5`` times the sum of squares of ``x``."""
        return self.params['lambda5'] * tf.reduce_sum(tf.square(x))

    def build(self, input_shape):
        assert len(input_shape) >= 2
        # One offset per feature of the last input axis, initialised to zero.
        self.p = self.add_weight(shape=(input_shape[-1],),
                                 initializer="zeros",
                                 trainable=True)

    def call(self, x, training=None):
        # NOTE(review): the test is `is not None`, so training=False is also
        # normalized with the statistics of the current batch; only
        # training=None returns the input unchanged -- confirm this is intended.
        if training is None:
            return x
        batch_mean = tf.reduce_mean(x,axis=0)
        batch_std = tf.sqrt(tf.math.reduce_variance(x,axis=0)+K.epsilon())
        normalized = (x-batch_mean)/batch_std+self.p
        self.add_loss(self.l2_regularization(self.p))
        return normalized

In [None]:
class SparseReLU(tf.keras.layers.Layer):
    """ReLU with a learned, non-negative, per-feature threshold ``alpha``.

    Computes ``relu(x - alpha)``. ``alpha`` has the shape of the input without
    its first (batch) axis and starts at zero.

    Args:
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``), in
            line with the other custom layers in this file. Calling the
            constructor with no arguments behaves as before.
    """

    def __init__(self, **kwargs):
        super(SparseReLU, self).__init__(**kwargs)

    def build(self, input_shape):
        self.alpha = self.add_weight(
            shape=input_shape[1:],
            initializer=tf.keras.initializers.Zeros(),
            trainable=True,
            # Keep the threshold from becoming negative during training.
            constraint=tf.keras.constraints.non_neg(),
        )
        super(SparseReLU, self).build(input_shape)

    def call(self, x):
        return tf.keras.backend.relu(x - self.alpha)

## Class Custom_layer_transform
Performs matrix-vector multiplication using a custom inner product

In [None]:
class Custom_layer_transform(object):
    """Matrix product in which the inner product is replaced by the similarity ``C``.

    Args:
        W: Kernel tensor of shape (input_dim, units). It is stored transposed,
            so each row of ``self.W`` is the weight vector of one unit.
    """

    def __init__(self,W:tf.Tensor):
        self.W = tf.transpose(W)

    def custom_matvec_prod(self, a:tf.Tensor):
        """Return the similarity ``C`` between vector ``a`` and every row of ``W``.

        Args:
            a: Tensor whose last axis has length input_dim.

        Returns:
            For a 1-D ``a``, a tensor of shape (units,).
        """
        # Broadcasting `a` against the rows of W gives all similarities in one
        # call; the reduction in C runs over the last (feature) axis.
        return C(a, self.W)

    def transform(self,Batch:tf.Tensor):
        """Apply the custom product to every sample of ``Batch``.

        Replaces two nested ``tf.map_fn`` loops, which evaluated batch x units
        similarities one at a time, with a single broadcast computation that
        yields the same values for rank-2 input.

        Args:
            Batch: Tensor of shape (batch, input_dim).

        Returns:
            Tensor of shape (batch, units).
        """
        samples = tf.expand_dims(Batch, axis=-2)   # (batch, 1, input_dim)
        weights = tf.expand_dims(self.W, axis=0)   # (1, units, input_dim)
        return C(samples, weights)
    

## Class SAD_Layer
This is a dense layer that transforms its inputs using a custom matrix vector product that uses normalized SAD as the inner product.

In [None]:
class SAD_Layer(tf.keras.layers.Layer):
    """Dense-style layer whose inner product is the normalized SAD similarity.

    The constructor arguments mirror those of ``tf.keras.layers.Dense``.

    Args:
        units: Number of output units.
        activation: Activation identifier or callable; resolved through
            ``tf.keras.activations.get``.
        use_bias: Whether a bias vector is added to the output.
        kernel_initializer: Initializer for the kernel.
        bias_initializer: Initializer for the bias.
        kernel_regularizer: Regularizer applied to the kernel.
        bias_regularizer: Regularizer applied to the bias.
        activity_regularizer: Regularizer stored on the layer.
        kernel_constraint: Constraint applied to the kernel.
        bias_constraint: Constraint applied to the bias.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``; ``input_dim`` is
            translated to ``input_shape``.
    """

    def __init__(
        self,
        units,
        activation=None,
        use_bias=False,
        kernel_initializer="glorot_uniform",
        bias_initializer="zeros",
        kernel_regularizer=None,
        bias_regularizer=None,
        activity_regularizer=None,
        kernel_constraint=None,
        bias_constraint=None,
        **kwargs
    ):
        if "input_shape" not in kwargs and "input_dim" in kwargs:
            kwargs["input_shape"] = (kwargs.pop("input_dim"),)
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)
        self.use_bias = use_bias
        self.kernel_initializer = tf.keras.initializers.get(kernel_initializer)
        self.bias_initializer = tf.keras.initializers.get(bias_initializer)
        self.kernel_regularizer = tf.keras.regularizers.get(kernel_regularizer)
        self.bias_regularizer = tf.keras.regularizers.get(bias_regularizer)
        self.activity_regularizer = tf.keras.regularizers.get(activity_regularizer)
        self.kernel_constraint = tf.keras.constraints.get(kernel_constraint)
        self.bias_constraint = tf.keras.constraints.get(bias_constraint)
        self.input_spec = tf.keras.layers.InputSpec(min_ndim=2)
        self.supports_masking = True

    def build(self, input_shape):
        """Create the (input_dim, units) kernel and, if requested, the bias."""
        assert len(input_shape) >= 2
        input_dim = input_shape[-1]

        self.kernel = self.add_weight(
            shape=(input_dim, self.units),
            initializer=self.kernel_initializer,
            name="kernel",
            regularizer=self.kernel_regularizer,
            constraint=self.kernel_constraint,
        )
        if self.use_bias:
            self.bias = self.add_weight(
                shape=(self.units,),
                initializer=self.bias_initializer,
                name="bias",
                regularizer=self.bias_regularizer,
                constraint=self.bias_constraint,
            )
        else:
            self.bias = None
        # Pin the size of the last input axis now that the kernel exists.
        self.input_spec = tf.keras.layers.InputSpec(min_ndim=2, axes={-1: input_dim})
        self.built = True

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` with its last entry replaced by ``units``."""
        assert input_shape and len(input_shape) >= 2
        output_shape = list(input_shape)
        output_shape[-1] = self.units
        return tuple(output_shape)

    def call(self, inputs):
        """Apply the SAD-based product, then the optional bias and activation."""
        # The debug prints of the kernel and input shapes were removed; they
        # wrote to stdout every time the layer was traced.
        custom_transform = Custom_layer_transform(self.kernel)
        output = custom_transform.transform(inputs)
        if self.use_bias:
            output = K.bias_add(output, self.bias, data_format="channels_last")
        if self.activation is not None:
            output = self.activation(output)
        return output

## Class Autoencoder
Wrapper class for the autoencoder model and associated utility functions

In [None]:
class Autoencoder(object):
    """Builds, compiles and trains the EndNet autoencoder.

    Args:
        params: Hyperparameter dictionary. Keys read by this class: ``"data"``
            (an object providing ``array()``, ``cols``, ``rows`` and ``gt``),
            ``"n_bands"``, ``"num_endmembers"``, ``"lambda3"``, ``"lambda4"``,
            ``"p"``, ``"optimizer"``, ``"loss"``, ``"batch_size"`` and
            ``"epochs"``. The custom layers read further keys themselves.
        W: Optional initial weights. ``W`` is loaded into the ``hidden`` layer
            and ``W.T`` into the ``output`` layer; ``None`` keeps the default
            initialization.
    """

    def __init__(self, params,W=None):
        self.data = params["data"].array()
        self.params = params
        self.masked_noise = MaskedNoise(params)
        # Linear decoder without bias and with non-negative weights.
        self.decoder = layers.Dense(
            units=self.params["n_bands"],
            kernel_regularizer=regularizers.l2(self.params['lambda4']),
            activation='linear',
            name="output",
            kernel_constraint=constraints.non_neg(),
            use_bias=False)

        # Encoder layer using the SAD-based product.
        self.hidden = SAD_Layer(
            units=self.params["num_endmembers"],
            activation='linear',
            kernel_regularizer=regularizers.l2(self.params['lambda3']),
            name='hidden',
            use_bias=False
        )
        # These two layers are instantiated but not wired into create_model().
        self.spectral_bn = Spectral_BN(params)
        self.sparse_relu = SparseReLU()
        self.asc_layer = SumToOne(self.params, name='abundances')
        self.model = self.create_model()
        self.initalize_encoder_and_decoder(W)
        self._compile()

    def _compile(self):
        """Compile the model with the optimizer and loss stored in ``params``."""
        self.model.compile(optimizer=self.params["optimizer"], loss=self.params["loss"])

    def _unfreeze_all_layers(self):
        """Mark every layer of the model as trainable."""
        for layer in self.model.layers:
            layer.trainable = True

    def initalize_encoder_and_decoder(self,W):
        """Load ``W`` into the encoder and ``W.T`` into the decoder, if ``W`` is given."""
        if W is not None:
            self.model.get_layer('output').set_weights([W.T])
            self.model.get_layer('hidden').set_weights([W])

    def create_model(self):
        """Assemble the network.

        Order: input, masked noise, SAD layer, batch normalization without
        scale, dropout, ReLU, sum-to-one layer, decoder.
        """
        spectra_in = layers.Input(shape=(self.params["n_bands"],))
        hidden_out = self.hidden(self.masked_noise(spectra_in))
        hidden_out = layers.BatchNormalization(scale=False)(hidden_out)
        hidden_out = layers.Dropout(self.params['p'])(hidden_out)
        hidden_out = tf.keras.activations.relu(hidden_out)
        reconstruction = self.decoder(self.asc_layer(hidden_out))

        return tf.keras.Model(inputs=spectra_in, outputs=reconstruction)

    def fix_decoder(self):
        """Freeze the last layer (the decoder), unfreeze the rest, and recompile."""
        self._unfreeze_all_layers()
        self.model.layers[-1].trainable = False
        self.decoder.trainable = False
        self._compile()

    def fix_encoder(self):
        """Freeze the ``hidden`` layer, unfreeze the rest, and recompile."""
        self._unfreeze_all_layers()
        self.model.get_layer('hidden').trainable = False
        self.hidden.trainable = False
        self._compile()

    def fit(self,data,n):
        """Train the model to reconstruct ``data``.

        Args:
            data: Training spectra, used as both input and target.
            n: First argument passed to the ``PlotWhileTraining`` callback.

        Returns:
            The value returned by ``Model.fit``.
        """
        callbacks = [PlotWhileTraining(n,self.params['data'])]
        return self.model.fit(
            x=data,
            y=data,
            batch_size=self.params["batch_size"],
            epochs=self.params["epochs"],
            callbacks=callbacks
        )

    def train_alternating(self,data,epochs):
        """Alternate between training with a frozen decoder and a frozen encoder.

        Each outer iteration trains two epochs with the decoder frozen, then
        one epoch with the encoder frozen. Endmembers and abundances are
        plotted on every third iteration, starting with the first.
        """
        batch = self.params["batch_size"]
        for epoch in range(epochs):
            self.fix_decoder()
            self.model.fit(x=data, y=data, batch_size=batch, epochs=2)
            self.fix_encoder()
            self.model.fit(x=data, y=data, batch_size=batch, epochs=1)
            if epoch % 3 != 0:
                continue
            endmembers = self.get_endmembers()
            abundances = self.get_abundances()
            plotEndmembersAndGT(endmembers,self.params['data'].gt)
            plotAbundancesSimple(abundances,'abunds')

    def get_endmembers(self):
        """Return the first weight array of the model's last layer."""
        return self.model.layers[-1].get_weights()[0]

    def get_abundances(self):
        """Return the ``abundances`` layer output for ``self.data``.

        The prediction is reshaped to ``[cols, rows, num_endmembers]``.
        """
        abundance_model = tf.keras.Model(
            inputs=self.model.input, outputs=self.model.get_layer("abundances").output
        )
        flat_abundances = abundance_model.predict(self.data)
        hsi = self.params['data']
        map_shape = [hsi.cols, hsi.rows, self.params['num_endmembers']]
        return np.reshape(flat_abundances, map_shape)

## Set Hyperparameters

In [None]:
# Dictionary of dataset aliases: the key is the alias, the value is the name
# of the .mat file without its suffix. Useful when looping over datasets.
datasetnames = {"Urban": "Urban4", "Samson": "Samson"}
dataset = "Urban"

hsi = load_HSI(f"./Datasets/{datasetnames[dataset]}.mat")

# Hyperparameters
num_endmembers = 4
num_spectra = 4000      # number of spectra drawn for training
batch_size = 64
learning_rate = 0.001
epochs = 40
# Weights of the individual loss terms
lambda0 = 0.01
lambda1 = 50.0
lambda2 = 0.0
lambda3 = 1e-5
lambda4 = 1e-5
lambda5 = 1e-3
p = 0.1                 # dropout rate
noise_std = 0.3         # standard deviation used by MaskedNoise
opt = tf.optimizers.Adam(learning_rate=learning_rate, beta_1=0.7)

# hsi.gt=None
data = hsi.array()

# Hyperparameter dictionary handed to the layers and to the Autoencoder
params = {
    "lambda0": lambda0,
    "lambda1": lambda1,
    "lambda2": lambda2,
    "lambda3": lambda3,
    "lambda4": lambda4,
    "lambda5": lambda5,
    "p": p,
    "num_endmembers": num_endmembers,
    "batch_size": batch_size,
    "num_spectra": num_spectra,
    "data": hsi,
    "epochs": epochs,
    "n_bands": hsi.bands,
    "GT": hsi.gt,
    "lr": learning_rate,
    "optimizer": opt,
    "noise": noise_std,
    "loss": Endnet_loss(batch_size, lambda0, lambda1).loss,
}

# Plot endmembers and abundance maps every x epochs. Set to 0 when running experiments.
plot_every = 5

# Draw num_spectra row indices uniformly at random (with replacement).
training_data = data[np.random.randint(0, data.shape[0], size=num_spectra), :]


## Train Autoencoder

In [None]:
# Initial weights: the first element of what vca returns for the transposed data.
# NOTE(review): presumably an (n_bands, num_endmembers) endmember matrix, since
# it is loaded into the 'hidden' kernel as-is -- confirm against unmixing.vca.
init = vca(data.T,params['num_endmembers'])[0]
# Build and compile the model, seeding encoder and decoder with `init`.
autoencoder = Autoencoder(params,init)
# Train on the random subset; plot_every is passed on to PlotWhileTraining.
autoencoder.fit(training_data,plot_every)
# Read the results back from the trained model and plot them.
endmembers = autoencoder.get_endmembers()
abundances = autoencoder.get_abundances()
plotEndmembersAndGT(endmembers, hsi.gt)
plotAbundancesSimple(abundances,'abund.png')


## Run experiment 

In [None]:
# Repeat training num_runs times per dataset and save each run's endmembers
# ('M') and abundances ('A') to <results_folder>/<method_name>/<dataset>/.
num_runs = 25
# The earlier user-specific absolute path was overwritten before it was ever
# read; only the effective value is kept.
results_folder = './Results'
method_name = 'Endnet'

for ds in ['Urban']:
    hsi = load_HSI(
        "./Datasets/" + datasetnames[ds] + ".mat"
    )
    data = hsi.array()

    save_folder = os.path.join(results_folder, method_name, ds)
    os.makedirs(save_folder, exist_ok=True)

    for run in range(1, num_runs + 1):
        # Create a new optimizer for every run. Sharing one instance across
        # models carries the Adam moment estimates and step counter over from
        # the previous model, so the runs would not be independent.
        opt = tf.optimizers.Adam(learning_rate=learning_rate, beta_1=0.7)
        params = {
            "lambda0": lambda0,
            "lambda1": lambda1,
            "lambda2": lambda2,
            "lambda3": lambda3,
            "lambda4": lambda4,
            "lambda5": lambda5,
            "p": p,
            "num_endmembers": num_endmembers,
            "batch_size": batch_size,
            "num_spectra": num_spectra,
            "data": hsi,
            "epochs": epochs,
            "n_bands": hsi.bands,
            "GT": hsi.gt,
            "lr": learning_rate,
            "optimizer": opt,
            "noise": noise_std,
            "loss": Endnet_loss(batch_size, lambda0, lambda1).loss,
        }

        # Fresh random subset of the spectra (drawn with replacement) per run.
        training_data = data[np.random.randint(0, data.shape[0], num_spectra), :]
        save_name = datasetnames[ds] + '_run' + str(run) + '.mat'
        save_path = os.path.join(save_folder, save_name)

        init = vca(data.T, params['num_endmembers'])[0]
        autoencoder = Autoencoder(params, init)
        autoencoder.fit(training_data, epochs)
        endmembers = autoencoder.get_endmembers()
        abundances = autoencoder.get_abundances()
        plotEndmembersAndGT(endmembers, hsi.gt)
        plotAbundancesSimple(abundances, 'abund.png')
        sio.savemat(save_path, {'M': endmembers, 'A': abundances})
    