# Implementation of
# *HYPERSPECTRAL IMAGE UNMIXING USING AUTOENCODER CASCADE*

Published in 

R. Guo, W. Wang and H. Qi, "Hyperspectral image unmixing using autoencoder cascade," 2015 7th Workshop on Hyperspectral Image and Signal Processing: Evolution in Remote Sensing (WHISPERS), 2015, pp. 1-4, doi: 10.1109/WHISPERS.2015.8075378.

In [None]:
import tensorflow as tf
from tensorflow.keras import initializers, constraints, layers, activations, regularizers
from tensorflow.python.ops import math_ops
from tensorflow.python.keras import backend as K
from tensorflow.python.framework import tensor_shape
from unmixing import HSI, plotEndmembers, PlotWhileTraining, vca
from unmixing import plotEndmembersAndGT, plotAbundancesSimple, load_HSI
from scipy import io as sio
import os
import numpy as np
import warnings
warnings.filterwarnings("ignore")

### Use CPU

In [None]:
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

## Class SumToOne

In [None]:
class SumToOne(tf.keras.layers.Layer):
    def __init__(self, params, **kwargs):
        super(SumToOne, self).__init__(**kwargs)
        self.params = params
    def call(self, x):
        x = K.abs(x) / (K.sum(K.abs(x), axis=-1, keepdims=True) + K.epsilon())
        return x

## Class mDA_initializer
Initializer that provides the weights for marginalized denoising autoencoder hidden layer

In [None]:
class mDA_initializer(tf.keras.initializers.Initializer):
    def __init__(self, data, p):
        self.W = self.mDA(data.T, p)

    def mDA(self, X, p):
        X = np.vstack((X, np.ones((1, X.shape[1]))))
        d = X.shape[0]
        q = np.vstack((np.ones((d - 1, 1)) * (1 - p), np.ones(1)))
        S = np.matmul(X, X.T)
        Q = S * np.matmul(q, q.T)
        row, col = np.diag_indices_from(Q)
        Q[row, col] = np.multiply(q.T, np.diag(S))
        P = np.multiply(S, np.repeat(q.T, d, 0))
        a = P[0:-1, :]
        b = Q + 1e-5 * np.eye(d)
        W = np.linalg.lstsq(b.T, a.T, rcond=None)[0].T
        return W.astype(np.float32)

    def __call__(self, shape, dtype=None):
        return tf.constant(value=self.W)

    def get_config(self):  # To support serialization
        return {"W": self.W}

## Class NonNeg
Kernel regularizer to keep weights nonegative.  


In [None]:
class NonNeg(regularizers.Regularizer):
    def __init__(self, strength):
        super(NonNeg,self).__init__()
        self.strength = strength

    def __call__(self, x):
        neg = tf.abs(tf.cast(x < 0, x.dtype) * x)
        reg = self.strength * tf.reduce_sum(neg)
        return reg

## Class DenseTied
This is a dense layer that is tied to another layer and its weights are the transpose of the tied to layer

In [None]:
class DenseTied(tf.keras.layers.Layer):
    def __init__(
        self,
        units,
        activation=None,
        use_bias=False,
        kernel_initializer="glorot_uniform",
        bias_initializer="zeros",
        kernel_regularizer=None,
        bias_regularizer=None,
        activity_regularizer=None,
        kernel_constraint=None,
        bias_constraint=None,
        tied_to=None,
        **kwargs
    ):
        self.tied_to = tied_to
        if "input_shape" not in kwargs and "input_dim" in kwargs:
            kwargs["input_shape"] = (kwargs.pop("input_dim"),)
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)
        self.use_bias = use_bias
        self.kernel_initializer = tf.keras.initializers.get(kernel_initializer)
        self.bias_initializer = tf.keras.initializers.get(bias_initializer)
        self.kernel_regularizer = tf.keras.regularizers.get(kernel_regularizer)
        self.bias_regularizer = tf.keras.regularizers.get(bias_regularizer)
        self.activity_regularizer = tf.keras.regularizers.get(activity_regularizer)
        self.kernel_constraint = tf.keras.constraints.get(kernel_constraint)
        self.bias_constraint = tf.keras.constraints.get(bias_constraint)
        self.input_spec = tf.keras.layers.InputSpec(min_ndim=2)
        self.supports_masking = True

    def build(self, input_shape):
        assert len(input_shape) >= 2
        input_dim = input_shape[-1]

        if self.tied_to is not None:
            self.kernel = K.transpose(self.tied_to.kernel)
            self.non_trainable_weights.append(self.kernel)
        else:
            self.kernel = self.add_weight(
                shape=(input_dim, self.units),
                initializer=self.kernel_initializer,
                name="kernel",
                regularizer=self.kernel_regularizer,
                constraint=self.kernel_constraint,
            )
        if self.use_bias:
            self.bias = self.add_weight(
                shape=(self.units,),
                initializer=self.bias_initializer,
                name="bias",
                regularizer=self.bias_regularizer,
                constraint=self.bias_constraint,
            )
        else:
            self.bias = None
        self.input_spec = tf.keras.layers.InputSpec(min_ndim=2, axes={-1: input_dim})
        self.built = True

    def compute_output_shape(self, input_shape):
        assert input_shape and len(input_shape) >= 2
        output_shape = list(input_shape)
        output_shape[-1] = self.units
        return tuple(output_shape)

    def call(self, inputs):
        output = K.dot(inputs, self.kernel)
        if self.use_bias:
            output = K.bias_add(output, self.bias, data_format="channels_last")
        if self.activation is not None:
            output = self.activation(output)
        return output

## Class AugmentedLogistic(Layer)
Custom layer that implements the augmented logistic activation given by
$$\frac{1}{1+e^{a_ix_i-b_i}}$$

In [None]:
class AugmentedLogistic(tf.keras.layers.Layer):
    def __init__(self):
        super(AugmentedLogistic, self).__init__()
        
    def build(self, input_shape):
        assert len(input_shape) >= 2
        input_dim = input_shape[-1]
        self.a = self.add_weight(shape=(input_dim,),
                                 initializer="ones",
                                 trainable=True)
        self.b = self.add_weight(shape=(input_dim,),
                                 initializer="zeros",
                                 trainable=True)
    def call(self, x):
        y = self.a*x + self.b
        return tf.nn.sigmoid(y)

## Class Autoencoder
Wrapper class for the autoencoder model and associcated utility functions

In [None]:
class Autoencoder(object):
    def __init__(self, params):
        self.data = params["data"].array()
        self.params = params
        self.mDA_layer = layers.Dense(
            units=self.params["n_bands"] + 1,
            activation="linear",
            use_bias=False,
            kernel_initializer=mDA_initializer(self.data, self.params["p"]),
        )
        self.unmix_layer = layers.Dense(
            units=self.params["num_endmembers"],
            activation="linear",
            kernel_regularizer=NonNeg(10),
            name='unmix',
            use_bias=False
        )
        self.output_layer = DenseTied(
            units=self.params["n_bands"],
            kernel_constraint=None,
            activation="linear",
            tied_to=self.unmix_layer,
        )
        self.asc_layer = SumToOne(self.params, name = 'abundances')
        self.model = self.create_model()
        init = vca(data.T,params['num_endmembers'])[0]
        self.model.get_layer(name='unmix').set_weights([init])
        self.model.compile(optimizer=self.params["optimizer"], loss=self.params["loss"])

    def create_model(self):
        
        input_features = layers.Input(shape=(self.params["n_bands"],))
        code = self.mDA_layer(input_features)
        code = layers.Lambda(lambda x: x[:, :-1])(code)
        code = self.unmix_layer(code)
        code = AugmentedLogistic()(code)
        
        abunds = self.asc_layer(code)
        output = self.output_layer(abunds)

        return tf.keras.Model(inputs=input_features, outputs=output)

    def fit(self,data,n):
        plot_callback = PlotWhileTraining(n,self.params['data'])
        return self.model.fit(
            x=data,
            y=data,
            batch_size=self.params["batch_size"],
            epochs=self.params["epochs"],
            callbacks=[plot_callback]
        )
    
    def train_alternating(self,data,epochs):
        for epoch in range(epochs):
            self.fix_decoder()
            self.model.fit(x=data, y=data,
                batch_size=self.params["batch_size"],
                epochs=2)
            self.fix_encoder()
            self.model.fit(x=data, y=data,
                batch_size=self.params["batch_size"],
                epochs=1)


    def get_endmembers(self):
        return self.model.layers[len(self.model.layers) - 1].get_weights()[0]

    def get_abundances(self):
        intermediate_layer_model = tf.keras.Model(
            inputs=self.model.input, outputs=self.model.get_layer("abundances").output
        )
        abundances = intermediate_layer_model.predict(self.data)
        abundances = np.reshape(abundances,[self.params['data'].cols,self.params['data'].rows,self.params['num_endmembers']])
        
        return abundances

## Set Hyperparameters and load data

In [None]:
#Dictonary of aliases for datasets. The first string is the key and second is value (name of matfile without .mat suffix)
#Useful when looping over datasets
datasetnames = {
        "Urban": "Urban4",
}
dataset = "Urban"

hsi = load_HSI(
    "./Datasets/" + datasetnames[dataset] + ".mat"
)
data = hsi.array()


# Hyperparameters
num_endmembers = 4
num_spectra = 90000
batch_size = 64
learning_rate = 0.001
epochs = 10
loss = "mse"
opt = tf.optimizers.RMSprop(learning_rate=learning_rate,momentum=0.9)


# Hyperparameter dictionary
params = {
    "num_endmembers": num_endmembers,
    "batch_size": batch_size,
    "num_spectra": num_spectra,
    "data": hsi,
    "epochs": epochs,
    "n_bands": hsi.bands,
    "GT": hsi.gt,
    "lr": learning_rate,
    "p": 0.01,
    "optimizer": opt,
    "loss": loss,
}

plot_every = 0 #Plot endmembers and abundance maps every x epochs. Set to 0 when running experiments. 

training_data = data[np.random.randint(0, data.shape[0], num_spectra), :]


## Train Autoencoder

In [None]:
autoencoder = Autoencoder(params)
autoencoder.fit(training_data,plot_every)
endmembers = autoencoder.get_endmembers()
print(endmembers.shape)
plotEndmembersAndGT(endmembers, hsi.gt)

## Run experiment 

In [None]:
num_runs = 25
results_folder = './Results'
method_name = 'mDAE'

for dataset in ['Urban']:
    save_folder = results_folder+'/'+method_name+'/'+dataset
    if not os.path.exists(save_folder):
        os.makedirs(save_folder)

    hsi = load_HSI(
        "./Datasets/" + datasetnames[dataset] + ".mat"
    )
    data=hsi.array()
    for run in range(1,num_runs+1):
        training_data = data[np.random.randint(0, data.shape[0], num_spectra), :]
        save_name = dataset+'_run'+str(run)+'.mat'
        save_path = save_folder+'/'+save_name
        autoencoder = Autoencoder(params)
        autoencoder.fit(training_data,plot_every)
        endmembers = autoencoder.get_endmembers()
        abundances = autoencoder.get_abundances()
        plotEndmembersAndGT(endmembers, hsi.gt)
        plotAbundancesSimple(abundances,'abund.png')
        sio.savemat(save_path,{'M':endmembers,'A':abundances})
    