In [2]:
# Importing the libraries
import tensorflow as tf
import numpy as np
import os, sys
import scipy.io as sio
from math import ceil
import matplotlib.pyplot as plt

In [3]:
# Initializing constants
DATASET_NAME = 'IP'
K = 10
K_NUM = 1

# The ratio of the training dataset over the whole points.
TRAINING_RATIO = 0.25                        
pmask_slope = 5
sample_slope = 10
BS = 30

EPOCHS = 200
LR = 0.001
BATCH_SIZE = 16

In [4]:
# Utils
""" Data preprocessing functions """
def read_data(dataset_name):                        # Loads Dataset
    # path = os.getcwd()
    if dataset_name == 'IP':
        image = sio.loadmat("..\\source_code\\datasets\\IndianPines\\Indian_pines_corrected.mat")["indian_pines_corrected"]
        label = sio.loadmat("..\\source_code\\datasets\\IndianPines\\Indian_pines_gt.mat")["indian_pines_gt"]
    elif dataset_name == 'UP':
        image = sio.loadmat('..\\source_code\\datasets\\PaviaUniversity\\PaviaU.mat')['paviaU']
        label = sio.loadmat('..\\source_code\\datasets\\PaviaUniversity\\PaviaU_gt.mat')['paviaU_gt']
    image = np.float64(image)
    label = np.array(label).astype(float)
    return image, label

def normalize_dataset(data):                        # Dataset Min-Max Normalization
    max_val = np.amax(data, axis=(0,1,2))
    min_val = np.amin(data, axis=(0,1,2))
    data_norm = (data - min_val) / (max_val - min_val)
    return data_norm

# Separates the dataset into training and test sets
# ! Pretty much useless method
def separate_train_test(data, labels, p):   
    '''
    data: HSI image (x, y, z), x and y being pixel coordinates and z being the number of spectral bands
    labels: pixels labels matrix as provided in dataset
    p: proportion (ratio) of training set
    '''
    c = int(labels.max())
    x = np.array([], dtype=float).reshape(-1, data.shape[2])
    xb = []
    x_loc1 = []
    x_loc2 = []
    x_loc = []
    y = np.array([], dtype=float).reshape(-1, data.shape[2])
    yb = []
    y_loc1 = []
    y_loc2 = []
    y_loc = []
    for i in range(1, c+1):
        # label coordinates
        loc1, loc2 = np.where(labels == i)
        # label frequency
        num = len(loc1)                     
        order = np.random.permutation(range(num))
        loc1 = loc1[order]
        loc2 = loc2[order]
        # number of training samples to be selected for each label
        num1 = int(np.round(num*p))
        x = np.vstack([x, data[loc1[:num1], loc2[:num1], :]])
        y = np.vstack([y, data[loc1[num1:], loc2[num1:], :]])
        xb.extend([i]*num1)
        yb.extend([i]*(num-num1))
        x_loc1.extend(loc1[:num1])
        x_loc2.extend(loc2[:num1])
        y_loc1.extend(loc1[num1:])
        y_loc2.extend(loc2[num1:])
        x_loc = np.vstack([x_loc1, x_loc2])
        y_loc = np.vstack([y_loc1, y_loc2])
    return x, xb, x_loc, y, yb, y_loc

def one_hot(lable,class_number):            # One-hot converter
    one_hot_array = np.zeros([len(lable),class_number])
    for i in range(len(lable)):
        one_hot_array[i,int(lable[i]-1)] = 1
    return one_hot_array

def disorder(X,Y,loc):
    index_train = np.arange(X.shape[0])
    np.random.shuffle(index_train)
    X = X[index_train, :]
    Y = Y[index_train, :]
    loc = loc[:,index_train]
    return X,Y,loc

# when w=1, windowFeature performs a train/test split
def windowFeature(data, loc, w):
    size = np.shape(data)
    data_expand = np.zeros((int(size[0]+w-1),int(size[1]+w-1),size[2]))
    newdata = np.zeros((len(loc[0]), w, w,size[2]))
    # for each spectral band
    for j in range(size[2]):    
        data_expand[:,:,j] = np.lib.pad(data[:,:,j], ((int(w / 2), int(w / 2)), (int(w / 2),int(w / 2))), 'symmetric')
        newdata[:,:,:,j] = np.zeros((len(loc[0]), w, w))
        # for each training sample
        for i in range(len(loc[0])):
            loc1 = loc[0][i]
            loc2 = loc[1][i]
            f = data_expand[loc1:loc1 + w, loc2:loc2 + w,j]
            newdata[i, :, :,j] = f
    return newdata

In [94]:
"""
Functions of the probability mask as described in the paper.
Each weight is passed into a sigmoid to convert them to a probability representation.
"""
class ProbMask(tf.keras.layers.Layer):
    def __init__(self, slope, filter_size, **kwargs):
        self.slope = tf.Variable(slope, dtype=tf.float32)
        self.P = filter_size
        
        # ? Paper says V is intialised from a Normal Distribution ?
        w_init = tf.random_uniform_initializer(minval=0, maxval=1)
        # w_init = tf.random_normal_initializer(mean=0, stddev=1)
        self.w = tf.Variable(w_init(shape=(1, filter_size, 1)))
        print('w1', self.w)
        # Inverse Tranform Sampling (https://en.wikipedia.org/wiki/Inverse_transform_sampling)
        self.w = tf.Variable(- tf.math.log(1. / self.w - 1.) / self.slope)
        print('w2', self.w)
        super(ProbMask, self).__init__(**kwargs) 
        
    def build(self, input_shape):
        super(ProbMask, self).build(input_shape)
        
    def call(self, input_tensor):
        weights = self.w
        # return weights
        return tf.sigmoid(self.slope * weights)

    def compute_output_shape(self, input_shape):
        lst = list(input_shape)
        lst[-1] = 1
        return tuple(lst)
    
# (Normalisation layer) rescales the weights given a sparsity level in probability map.
class RescaleProbMask(tf.keras.layers.Layer):
    def __init__(self, sparsity, **kwargs):
        self.alpha = tf.constant(sparsity, dtype=tf.float32)
        super(RescaleProbMask, self).__init__(**kwargs)

    def build(self, input_shape):
        self.num = input_shape[2]
        super(RescaleProbMask, self).build(input_shape)

    def call(self, input_tensor):
        prob_map = self.force_sparsity(input_tensor, alpha = self.alpha)
        return prob_map

    # Paper: Sec3.1: Eq(5): making opt prob unconstrained
    def force_sparsity(self, pixel, alpha):
        p = tf.math.reduce_mean(pixel, axis=1)
        beta = (1 - alpha) / (1 - p)
        le = tf.cast(tf.greater_equal(p, alpha), tf.float32)
        return (le * pixel * alpha) / p + (1 - le) * (1 - beta * (1 - pixel))

class ThresholdRandomMask(tf.keras.layers.Layer):
    def __init__(self, slope = 12, **kwargs):
        self.slope = None
        if slope is not None:
            self.slope = tf.Variable(slope, dtype=tf.float32) 
        super(ThresholdRandomMask, self).__init__(**kwargs)

    def build(self, input_shape):
        filter_size = input_shape[1]
        t_init = tf.random_uniform_initializer(minval=0, maxval=1)
        threshold = tf.constant(t_init(shape=(1, filter_size, 1)))
        self.thresh = threshold
        super(ThresholdRandomMask, self).build(input_shape)

    def call(self, inputs):
        #thresh = tf.zeros_like(inputs)
        if self.slope is not None:
            return tf.sigmoid(self.slope * (inputs-self.thresh)) 
        else:  
            # INFERENCE 
            return inputs > self.thresh

    def compute_output_shape(self, input_shape):
        return input_shape[0]

# The layer that performs masking operation.
class UnderSample(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super(UnderSample, self).__init__(**kwargs)

    def build(self, input_shape):
        super(UnderSample, self).build(input_shape)

    def call(self, inputs):
        input_pix = inputs[0]
        mask = inputs[1]
        image_masked = tf.multiply(input_pix, mask)
        return image_masked

# Learning rate scheduler 
def scheduler(epoch, lr):
    if epoch == 100:
        lr = lr / 10
    if epoch == 150:
        lr = lr / 10
    return lr

In [6]:
# Importing and modifying data
data_ori, labels_ori = read_data(DATASET_NAME)

train_x, train_y, train_loc, test_x, test_y, test_loc = separate_train_test(data_ori, labels_ori, TRAINING_RATIO)

# number of classes
num_classification = int(np.max(labels_ori))
num_classification

16

In [7]:
X_train = windowFeature(data_ori, train_loc, 1)
X_test = windowFeature(data_ori, test_loc, 1)
X_train.shape, X_test.shape 

((2562, 1, 1, 200), (7687, 1, 1, 200))

In [None]:
# Data normalisation (after splitting)
X_train_norm = normalize_dataset(X_train)
X_test_norm = normalize_dataset(X_test)

In [8]:
X_train = np.squeeze(X_train_norm, axis=2)
X_test = np.squeeze(X_test_norm, axis=2)

In [9]:
Y_train = one_hot(train_y,num_classification)
Y_train_int = np.argmax(Y_train, axis=1)
Y_test = one_hot(test_y,num_classification)
Y_train.shape, Y_test.shape

((2562, 16), (7687, 16))

In [10]:
X_train, Y_train, train_loc = disorder(X_train, Y_train, train_loc)
X_test, Y_test, test_loc = disorder(X_test, Y_test, test_loc)

X_train = np.transpose(X_train, axes=(0,2,1))
X_test = np.transpose(X_test, axes=(0,2,1))

In [11]:
X_train.shape, X_test.shape

(2562, 200, 1)

In [12]:
# Number of training samples (training pixels)
num_data = X_train.shape[0]  
# Number of bands
num_band = X_train.shape[1]   
# Number of classes          
num_class = Y_train.shape[1]

number_class_label = []; class_weights = []
for i in range(num_class):
    # per class frequency in training set
    number_class_label.append(len(np.where(Y_train_int == i)[0]))
    # weight given to class i
    class_weights.append(num_data/number_class_label[i])

class_weights = np.array(class_weights)
class_weights_norm = class_weights / np.sum(class_weights)
class_weights_norm = np.sqrt(class_weights_norm)
# class_weights_norm = np.square(class_weights_norm)
class_weights_dic = {}
for i in range(num_class):
    class_weights_dic[i] = 100 * class_weights_norm[i]

print('Shape of the training images: ', X_train[0].shape)
print('Shape of the test images: ', X_test[0].shape)
print('Number of the train samples', X_train.shape[0])
print('Number of the test samples', X_test.shape[0])
print('Number of the classes', num_classification)
print('Class weights dict', class_weights_dic)

Shape of the training images:  (200, 1)
Shape of the test images:  (200, 1)
Number of the train samples 2562
Number of the test samples 7687
Number of the classes 16
Class weights dict {0: 38.506837354843995, 1: 7.059831985699054, 2: 9.2490431617559, 3: 17.366106809231667, 4: 12.12650886134199, 5: 9.887643047649991, 6: 50.41728484340734, 7: 12.17693115309604, 8: 59.65453591617137, 9: 8.557074967743109, 10: 5.383246764527845, 11: 10.964721769947355, 12: 18.678559732059004, 13: 7.503863620202448, 14: 13.614222907828822, 15: 27.81407078758715}


In [95]:
# Code for model creation, compilation and training.
input_image = tf.keras.Input(shape=(num_band, 1), name = 'input_image')

prob_mask_tensor = ProbMask(slope=pmask_slope, filter_size=num_band, name='prob_mask')(input_image)
thresh_tensor = RescaleProbMask(sparsity = BS/num_band, name='rescaled_mask')(prob_mask_tensor)
tensor_mask = ThresholdRandomMask(slope = sample_slope, name='sampled_mask')(thresh_tensor) 
last_tensor = UnderSample(name='proxy_data')([input_image, tensor_mask])

w1 <tf.Variable 'Variable:0' shape=(1, 200, 1) dtype=float32, numpy=
array([[[8.5914373e-02],
        [5.1298463e-01],
        [3.5321200e-01],
        [5.1235318e-01],
        [1.3836777e-01],
        [7.0760131e-02],
        [2.8034663e-01],
        [4.5623672e-01],
        [6.7400098e-01],
        [3.0862486e-01],
        [5.8917201e-01],
        [8.9967906e-01],
        [9.7280550e-01],
        [1.0548127e-01],
        [4.1237819e-01],
        [4.1309285e-01],
        [6.7453384e-01],
        [4.5877862e-01],
        [1.1039436e-01],
        [4.3892705e-01],
        [1.6875255e-01],
        [9.7054994e-01],
        [9.7398973e-01],
        [3.8829815e-01],
        [4.2403424e-01],
        [9.1588962e-01],
        [4.4726133e-01],
        [2.5295496e-02],
        [2.6717722e-01],
        [9.4124722e-01],
        [5.9784305e-01],
        [4.2479229e-01],
        [3.7464917e-01],
        [5.2407384e-01],
        [4.0169990e-01],
        [4.7934210e-01],
        [2.8452659e-01],
      

AttributeError: Exception encountered when calling layer "rescaled_mask" (type RescaleProbMask).

in user code:

    File "C:\Users\zbook\AppData\Local\Temp\ipykernel_20840\383605807.py", line 45, in call  *
        out = prob_map.numpy()

    AttributeError: 'Tensor' object has no attribute 'numpy'


Call arguments received by layer "rescaled_mask" (type RescaleProbMask):
  • input_tensor=tf.Tensor(shape=(1, 200, 1), dtype=float32)