In [1]:
import matplotlib
matplotlib.use('TkAgg')
import numpy as np
import matplotlib.pyplot as plt
from spectral import *
import spectral.io.envi as envi
from scipy.stats import median_abs_deviation as MAD
from scipy.signal import savgol_filter, argrelextrema
from matplotlib.widgets import *
from matplotlib.path import Path
from matplotlib.backend_bases import MouseButton
import pandas as pd
from PIL import Image
import rasterio
from rasterio.control import GroundControlPoint
from rasterio.transform import from_gcps
from rasterio.crs import CRS
from scipy.signal import savgol_filter , argrelextrema , find_peaks
from kneed import KneeLocator
from matplotlib.ticker import (MultipleLocator, AutoMinorLocator)
from matplotlib.widgets import PolygonSelector
from itertools import permutations 
import pandas as pd
from scipy.spatial import ConvexHull
from scipy.interpolate import interp1d
from numpy.random import randint , uniform , choice
import torch
import torch.nn.functional as F
from torch import nn
import torchvision
from torch.utils.data import DataLoader ,SubsetRandomSampler, random_split
from torch import optim
import itertools
from sklearn.metrics import silhouette_score
import random
from sklearn import metrics
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split, KFold, StratifiedShuffleSplit

In [7]:
def row_wise_integral_norm_data(wav , spectra):

    SPECTRA = np.zeros_like(spectra)

    for i in range(len(spectra[:,0])):
        SPECTRA[i] = spectra[i]/np.trapz(spectra[i] , wav)
        
    return SPECTRA , wav

def column_wise_norm(spectra):
    spectra_norm = np.zeros_like(spectra)
    for i in range(len(spectra[0])):
        spectra_norm[:,i] = (spectra[:,i]-np.mean(spectra[:,i]))/np.std(spectra[:,i])
        
    return spectra_norm

def find_nearest(array, value):
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return idx

def compute_removal_single(w , spectra , interp_type = 'linear'):
    points = np.c_[w,spectra]
    return continuum_removal(points , interp_type = interp_type)

def dimension_reduction( img , w1 , w2 , wavelength , cr = False ):

    X = img.shape[0]
    Y = img.shape[1]
    
    a = find_nearest(wavelength , w1)
    b = find_nearest(wavelength , w2)
    w = wavelength[a:b]
    lenw = len(w)
    
    A = np.count_nonzero(img[:,:,0] == 65535.)
    B = np.count_nonzero(img[:,:,0] != 65535.)
    
    spectra = np.zeros( (B , lenw) )
    indexes = np.zeros( (B , 2) )
    
    frame_indexes = np.zeros( (A , 2) )
 
    img = img[:,:,a:b]
    
    k , i , j = 0 , 0 , 0
    while k < B and i < X and j < Y:
        
        if img[i,j,0] != 65535. and img[i,j,:].all() != np.zeros(lenw).all():
            if cr == False:
                spectra[k,:] = img[i,j,:]
            else:
                spectra[k,:] = compute_removal_single(w , img[i,j,:])
            indexes[k,0] = i
            indexes[k,1] = j   
            k += 1
        if img[i,j,:].all() == np.zeros(lenw).all():
            print(i,j)
        i += 1
        if i == X:
            j += 1
            i = 0
            if j == Y:
                i = X
                j = Y
                k = B
                  
    return spectra , indexes , w

def dimension_reduction_spectral_parameters( img , norm = None , zeros = None ):

    X = img.shape[0]
    Y = img.shape[1]
    lenw = img.shape[2]
    print(X,Y,lenw)
    B = np.count_nonzero(img[:,:,0] != 65535.)
    print(B)
    
    spectra = np.zeros( (B , lenw) )
    indexes = np.zeros( (B , 2) )
    
    k , i , j = 0 , 0 , 0
    while k < B and i < X and j < Y:
        
        if img[i,j,0] != 65535.:
            spectra[k,:] = img[i,j,:]#[0][0]
            
            if zeros != None:
                for l in range(len(spectra[k,:])):
                    if spectra[k,l] < 0:
                        spectra[k,l] = 0
            
            indexes[k,0] = i
            indexes[k,1] = j   
            k += 1
        i += 1
        if i == X:
            j += 1
            i = 0
            if j == Y:
                i = X
                j = Y
                k = B

    if norm == 'none':
        spectra = np.nan_to_num(spectra)
        return spectra, indexes
        
    elif len(norm) > 0:
        for p in range(len(norm)):
    
            if p != 0:
                spectra = SPECTRA
            SPECTRA = np.zeros_like(spectra)
            
            if norm[p] == 'row':
                for i in range(len(spectra[:,0])):
                    SPECTRA[i] = spectra[i]/np.sum(abs(spectra[i]))
    
            elif norm[p] == 'column':
                for i in range(len(spectra[0])):
                    if zeros != None:
                        SPECTRA[:,i] = (spectra[:,i]-np.mean(spectra[:,i]))/np.std(spectra[:,i])
                    else:
                        m , s = np.mean(spectra[ np.argwhere(spectra[:,i] != 0) , i ]) , np.std(spectra[ np.argwhere(spectra[:,i] != 0) , i ])
                        SPECTRA[:,i] = (spectra[:,i]-m)/s
    
            elif norm[p] == 'minmax':
                for i in range(len(spectra[0])):
                    if zeros != None:
                        SPECTRA[:,i] = (spectra[:,i]-np.min(spectra[:,i]))/(np.max(spectra[:,i]) - np.min(spectra[:,i]))
    
            elif norm[p] == 'L1':
                for i in range(len(spectra[0])):
                    if zeros != None:
                        SPECTRA[:,i] = spectra[:,i]/np.linalg.norm(spectra[:,i])
                        
            SPECTRA = np.nan_to_num(SPECTRA)

        return SPECTRA , indexes

    else:
        print('Normalization must be either row, column, rowcolumn or none!')
        return

def continuum_removal(points , interp_type = 'linear'):#points, interp_type = 'linear'):
    interp_types = ['linear','nearest','nearest-up','zero','slinear','quadratic','cubic','previous','next']
    
    if interp_type not in interp_types:
        print('Type of interpolation must be one of:' + interp_types + '.')
        return
    
    x, y = points.T
    augmented = np.concatenate([points, [(x[0], np.min(y)-1), (x[-1], np.min(y)-1)]], axis=0)
    hull = ConvexHull(augmented , incremental = True)
    continuum_points = points[np.sort([v for v in hull.vertices if v < len(points)])]
    continuum_indexs = np.array(range(0,len(continuum_points) , 1) , dtype = int)
    continuum_function = interp1d(*continuum_points.T)
    n = continuum_function(x)
    
    yprime = y / n

    return yprime

def compute_removal(w , spectra , interp_type = 'linear'):
    SPECTRA = np.zeros(spectra.shape)
    for i in range(spectra.shape[0]):
        points = np.c_[w,spectra[i]]
        SPECTRA[i] = continuum_removal(points , interp_type = interp_type)#points)
    return SPECTRA

def compute_removal_single(w , spectra , interp_type = 'linear'):
    points = np.c_[w,spectra]
    return continuum_removal(points , interp_type = interp_type)

def auto_stretch_rgb(img_sr, product_names, n_bins=1000, plot=True):
    local_only_products = { 'R530' , 'R440' , 'R600' , 'R770' , 'R1080' , 'R1506' , 'R2529' , 'R3920' , 'SH600_2' , 'IRA' , 'ISLOPE1' , 'IRR2' }

    H, W, num_bands = img_sr.shape
    stretched_cube = np.zeros((H, W, num_bands), dtype=np.float32)

    for band_idx in range(num_bands):
        band = img_sr[:, :, band_idx]
        name = product_names[band_idx]
        valid = np.isfinite(band) & (band != 65535)
        vals = band[valid]

        if vals.size == 0:
            continue

        if name in local_only_products:
            vmin, vmax = np.percentile(vals, [0.1, 99.9])
        else:
            hist, bins = np.histogram(vals, bins=n_bins)
            mode = (bins[np.argmax(hist)] + bins[np.argmax(hist)+1]) / 2
            vmin = 0 if mode < 0 else mode
            vmax = np.percentile(vals, 99.9)

        stretched = np.zeros_like(band, dtype=np.float32)
        stretched[valid] = np.clip((band[valid] - vmin) / (vmax - vmin), 0, 1)
        stretched_cube[:, :, band_idx] = stretched[:,:,0]

        if plot:
            fig, ax = plt.subplots(1, 2, figsize=(10, 3))
            ax[0].hist(vals, bins=n_bins, color='gray', log=True)
            ax[0].axvline(vmin, color='blue', linestyle='--', label='vmin')
            ax[0].axvline(vmax, color='red', linestyle='--', label='vmax')
            ax[0].set_title(f"{name} Histogram")
            ax[0].legend()
            ax[0].set_xlim(np.percentile(vals, 0), np.percentile(vals, 100))

            ax[1].imshow(stretched, cmap='gray', vmin=0, vmax=1)
            ax[1].set_title(f"{name} Stretched")
            ax[1].axis('off')
            plt.tight_layout()
            plt.show()

    return stretched_cube

import numpy as np
import matplotlib.pyplot as plt

def auto_stretch_rgb(img_sr, product_names, n_bins=1000, plot=True, linearize=False, norm=None, zeros=False):
    local_only_products = {
        'R530', 'R440', 'R600', 'R770', 'R1080', 'R1506',
        'R2529', 'R3920', 'SH600_2', 'IRA', 'ISLOPE1', 'IRR2'
    }

    H, W, num_bands = img_sr.shape
    stretched_cube = np.zeros((H, W, num_bands), dtype=np.float32)

    for band_idx in range(num_bands):
        band = img_sr[:, :, band_idx]
        name = product_names[band_idx]
        valid = np.isfinite(band) & (band != 65535)
        vals = band[valid]

        if vals.size == 0:
            continue

        if name in local_only_products:
            vmin, vmax = np.percentile(vals, [0.1, 99.9])
        else:
            hist, bins = np.histogram(vals, bins=n_bins)
            mode = (bins[np.argmax(hist)] + bins[np.argmax(hist)+1]) / 2
            vmin = 0 if mode < 0 else mode
            vmax = np.percentile(vals, 99.9)

        stretched = np.zeros_like(band, dtype=np.float32)
        stretched[valid] = np.clip((band[valid] - vmin) / (vmax - vmin), 0, 1)
        stretched_cube[:, :, band_idx] = stretched#[:,:,0]

        if plot:
            fig, ax = plt.subplots(1, 2, figsize=(10, 3))
            ax[0].hist(vals, bins=n_bins, color='gray', log=True)
            ax[0].axvline(vmin, color='blue', linestyle='--', label='vmin')
            ax[0].axvline(vmax, color='red', linestyle='--', label='vmax')
            ax[0].set_title(f"{name} Histogram")
            ax[0].legend()
            ax[0].set_xlim(np.percentile(vals, 0.5), np.percentile(vals, 99.9))

            ax[1].imshow(stretched, cmap='gray', vmin=0, vmax=1)
            ax[1].set_title(f"{name} Stretched")
            ax[1].axis('off')
            plt.tight_layout()
            plt.show()

    if not linearize:
        return stretched_cube

    # LINEARIZATION
    valid_mask = img_sr[:, :, 0] != 65535
    indexes = np.argwhere(valid_mask)
    spectra = stretched_cube[valid_mask]  # shape: (N_valid, num_bands)

    if zeros:
        spectra[spectra < 0] = 0

    if norm is None or norm == 'none':
        return spectra, indexes

    # Apply normalization chain
    SPECTRA = spectra.copy()

    for n in norm:
        if n == 'row':
            row_sums = np.sum(np.abs(SPECTRA), axis=1, keepdims=True)
            row_sums[row_sums == 0] = 1
            SPECTRA = SPECTRA / row_sums

        elif n == 'column':
            means = np.mean(SPECTRA, axis=0)
            stds = np.std(SPECTRA, axis=0)
            stds[stds == 0] = 1
            SPECTRA = (SPECTRA - means) / stds

        elif n == 'minmax':
            mins = np.min(SPECTRA, axis=0)
            maxs = np.max(SPECTRA, axis=0)
            denom = maxs - mins
            denom[denom == 0] = 1
            SPECTRA = (SPECTRA - mins) / denom

        elif n == 'L1':
            norms = np.linalg.norm(SPECTRA, ord=1, axis=0)
            norms[norms == 0] = 1
            SPECTRA = SPECTRA / norms

        else:
            print(f"Unknown norm: {n}")

    SPECTRA = np.nan_to_num(SPECTRA)

    return SPECTRA, indexes

def open_raw(path_img_IF , path_hdr_IF , path_img_SR , path_hdr_SR):
    
    img = envi.open(path_hdr_IF , path_img_IF)

    img_sr = envi.open(path_hdr_SR , path_img_SR)

    wavelength = np.array(img.metadata['wavelength']).astype(float)

    sr_names = img_sr.metadata['band names']

    return img , img_sr , wavelength , sr_names

In [8]:
path3e12 = "Desktop/FRT00003e12/" # insert here your own path
name = '03e12'

# Example usage:
path_if_mtrdr = path3e12 + "frt000" + name + "_07_if167" + "j_mtr3.img"
head_if_mtrdr = path3e12 + "frt000" + name + "_07_if167" + "j_mtr3.hdr"

path_sr_mtrdr = path3e12 + "frt000" + name + "_07_sr167" + "j_mtr3.img"
head_sr_mtrdr = path3e12 + "frt000" + name + "_07_sr167" + "j_mtr3.hdr"

print(path_if_mtrdr)

img , img_sr , wavelength , sr_names = open_raw(path_if_mtrdr , head_if_mtrdr , path_sr_mtrdr , head_sr_mtrdr)  # Provide your image data

Nbands = len(wavelength)
print(sr_names)

img , img_sr = np.array(img[:,:,:]) , np.array(img_sr[:,:,:])

# deleting pure reflectance parameters

sr_names_2 = np.delete(np.array(sr_names) ,
                       [[0],[1],[8],[10],[11],[14],[52],[53],[54],[55],[56],[57],[58],[59]])
print(sr_names_2)

img_sr_2 = np.delete(img_sr[:,:,:] , [[0],[1],[8],[10],[11],[14],[52],[53],[54],[55],[56],[57],[58],[59]] , axis = 2)

print(img_sr.shape)

Desktop/FRT00003e12/frt00003e12_07_if167j_mtr3.img


FileNotFoundError: Unable to locate file "Desktop/FRT00003e12/frt00003e12_07_if167j_mtr3.hdr". If the file exists, use its full path or place its directory in the SPECTRAL_DATA environment variable.

In [93]:
def unison_shuffled_copies(a, b , SEED = 311996):
    assert len(a) == len(b)
    p = np.random.RandomState(seed=SEED).permutation(len(a))
    return a[p], b[p]

# general normalization
#spectra , indexes = dimension_reduction_spectral_parameters( img_sr , norm = ['L1'] , zeros = True )# , 500 , 2500 , wavelength)
# auto stretch
#spectra , indexes = auto_stretch_rgb(img_sr_2, sr_names_2, n_bins=1000, plot=False, linearize=True, norm=['column'], zeros=True)

# mean 0 and std 1 norm
spectra = column_wise_norm(spectra)

print( np.count_nonzero(spectra.all() == np.zeros(img.shape[2]).all()) )
print( 'Done' )

print(spectra.shape)

1
Done
(394890, 277)


In [97]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class GumbelSoftmax(nn.Module):
    def __init__(self, temperature=1.0, hard=False):
        super(GumbelSoftmax, self).__init__()
        self.temperature = temperature
        self.hard = hard

    def forward(self, x):
        return F.gumbel_softmax(x, tau=self.temperature, hard=self.hard)

class Net(nn.Module):
    def __init__(self, encoded_space_dim, in_channels, n_layers_encoder, n_layers_decoder, out1, out2, act, drops , last):
        super(Net, self).__init__()

        self.encoded_space_dim = encoded_space_dim
        self.in_channels = in_channels
        self.n_layers_encoder = n_layers_encoder
        self.n_layers_decoder = n_layers_decoder
        self.out1 = out1
        self.out2 = out2
        self.drops = drops
    
        self.model = []
        if self.n_layers_encoder == 1:
            self.model.append(nn.Linear(self.in_channels, self.out1[0]))
            self.model.append(nn.BatchNorm1d(self.out1[0]))
            self.model.append(act)
            self.model.append(nn.Dropout(self.drops[0]))
            self.model.append(nn.Linear(self.out1[0], encoded_space_dim))
            
        elif self.n_layers_encoder == 0:
            self.model.append(nn.Linear(self.in_channels, encoded_space_dim))
            self.model.append(act)
        else:
            for i in range(self.n_layers_encoder):
                if i == self.n_layers_encoder-1:
                    self.model.append(nn.Linear(self.out1[i], self.encoded_space_dim))
                elif i == 0:
                    self.model.append(nn.Linear(self.in_channels, self.out1[i]))
                    self.model.append(nn.BatchNorm1d(self.out1[i]))
                    self.model.append(nn.Dropout(self.drops[i]))
                    self.model.append(act)
                    self.model.append(nn.Linear(self.out1[i], self.out1[i+1]))
                    self.model.append(nn.Dropout(self.drops[i+1]))
                    self.model.append(act)
                else:   
                    self.model.append(nn.Linear(self.out1[i], self.out1[i+1]))
                    self.model.append(nn.BatchNorm1d(self.out1[i+1]))
                    self.model.append(nn.Dropout(self.drops[i+1]))
                    self.model.append(act)
                    
        # Add GumbelSoftmax to nn.Sequential
        self.encoder = nn.Sequential(*self.model, GumbelSoftmax(temperature=1.0, hard=True))
        
        self.model2 = []
        
        if self.n_layers_decoder == 0:
            self.model2.append(nn.Linear(self.encoded_space_dim, self.in_channels))
        elif self.n_layers_decoder == 1:
            self.model2.append(nn.Linear(self.encoded_space_dim, self.out2[0]))
            self.model2.append(act)
            self.model2.append(nn.Linear(self.out2[0], self.in_channels))
        else:
            self.model2.append(nn.Linear(self.encoded_space_dim, self.out2[0]))
            self.model2.append(act)
            for i in range(self.n_layers_decoder-1):
                self.model2.append(nn.Linear(self.out2[i], self.out2[i+1]))
                self.model2.append(act)
            self.model2.append(nn.Linear(self.out2[self.n_layers_decoder-1], self.in_channels))
            self.model2.append(nn.Tanh())
        self.decoder = nn.Sequential(*self.model2)
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

def find_nearest(array, value):
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return idx

def weight_init(model, init_method):
    for module in model.modules():
        if isinstance(module, nn.Linear):
            if init_method == 'kaiming_normal':
                nn.init.kaiming_normal_(module.weight)
            elif init_method == 'kaiming_uniform':
                nn.init.kaiming_uniform_(module.weight)
            elif init_method == 'xavier_normal':
                nn.init.xavier_normal_(module.weight)
            elif init_method == 'xavier_uniform':
                nn.init.xavier_uniform_(module.weight)
            elif init_method == 'uniform':
                nn.init.uniform_(module.weight)
            elif init_method == 'normal':
                nn.init.normal_(module.weight)
            elif init_method == 'ones':
                nn.init.ones_(module.weight)
            #elif init_method == 'zeros':
            #    nn.init.zeros_(module.weight)
            elif init_method == 'eye':
                nn.init.eye_(module.weight)
            elif init_method == 'orthogonal':
                nn.init.orthogonal_(module.weight)
            else:
                raise ValueError("Invalid initialization method!")
            nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.BatchNorm1d):
            nn.init.constant_(module.weight, 1)
            nn.init.constant_(module.bias, 0)

INITS = ['kaiming_normal' , 'kaiming_uniform' , 'xavier_normal' , 'xavier_uniform' , 'uniform' , 'normal' , 'ones' ,# 'zeros' ,
         'eye' , 'orthogonal']
def RandomSearch_autoencoder(dataset , in_channels, criterion , encoded_space_dim ,
                             n_encmax , n_decmax , MINenc , MAXenc, MINdec, MAXdec,
                             activations , initializations = INITS,
                             fix_enc = False , fix_dec = False ,
                             try_epochs = 10 , N_try = 10 , Seed = torch.seed() ,
                             printer = 'off' , bs = 100 , num_pieces = 5 , val_split = 0.2):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(device)
    
    # Initialize names and arrays of hyperparameters
    hyperparanames = ['LR','outC','outL','n_conv_layers','n_lin_layers','encoded_space_dim','weight_initialization']
    
    rate , W , L1 = [] , [] , []
    train_l = []
    val_l = []
    Lenc = np.zeros((N_try , n_encmax))
    Ldec = np.zeros((N_try , n_decmax))
    enc = []
    ACT = []
    DROPS = []
    inits = []

    total_len = len(dataset)
    val_len = int(total_len * val_split)
    train_len = total_len - val_len
    
    train_set, val_set = random_split(dataset, [train_len, val_len], generator=torch.Generator().manual_seed(Seed))
    TL = DataLoader(train_set, batch_size=bs, shuffle=True, pin_memory=True)
    VL = DataLoader(val_set, batch_size=bs, shuffle=False, pin_memory=True)
    
    for i in range(N_try):
        
        losses = []
        validations = []
        
        ENCSPDIM = choice(encoded_space_dim)
        
        print('Try' , str(i))
        
        if fix_dec == False:
            n_dec = randint( 0 , n_decmax+1 )
        else:
            n_dec = n_decmax
            
        if fix_enc == False:
            n_enc = randint( 0 , n_encmax+1 )
        else:
            n_enc = n_encmax

        LR  = choice(np.array([1,5]))*choice(np.array([0.00001 , 0.0001 , 0.001 , 0.01 , 0.1]))
        
        outLenc = np.sort(choice(np.arange(MINenc*5 , MAXenc*5+5 , 5 , dtype = int)  , n_enc))[::-1]
        outLdec = np.sort(choice(np.arange(MINdec*5 , MAXdec*5+5 , 5 , dtype = int)  , n_dec))
        
        dropouts = choice([0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9] , n_enc+1)

        l1_reg = choice([1,1e-1,1e-2,1e-3,1e-4,1e-5,1e-6,1e-7,1e-8,1e-9,1e-10])

        initials = choice(np.array(initializations))
        inits.append(initials)
        
        act = choice(activations)
        ACT.append(act)
        DROPS.append(dropouts)

        # Generating the model with the randomly generated hyperparameters
        
        model = Net(ENCSPDIM , in_channels , n_enc , n_dec , outLenc , outLdec , act , dropouts , True).to(device)
        #print(model)
        weight = choice([1,1e-1,1e-2,1e-3,1e-4,1e-5,1e-6,1e-7,1e-8,1e-9,1e-10])

        weight_init(model, init_method = initials)
        
        optimizer = choice([optim.Adam(model.parameters() , lr = LR , weight_decay = weight)])

        for epoch in range(try_epochs):
            model.train()
            LOSS = 0
            for x in TL:
                x = x.float().to(device)
                y = model(x)
                loss = criterion(y, x)
                l1_lambda = l1_reg
                l1 = sum(param.abs().sum() for param in model.parameters())
                loss += l1_lambda * l1
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                LOSS += loss.item()
            LOSS /= len(TL)
            losses.append(LOSS)

        # Validation loop
        model.eval()
        VAL_LOSS = 0
        with torch.no_grad():
            for x in VL:
                x = x.float().to(device)
                y = model(x)
                val_loss = criterion(y, x)
                VAL_LOSS += val_loss.item()
        VAL_LOSS /= len(VL)
        val_l.append(VAL_LOSS)

        # Updating the arrays with the randomly generated values
        rate.append(LR)
        W.append(weight)
        enc.append(ENCSPDIM)
        L1.append(l1_reg)

        for j in range(n_enc):
            Lenc[i,j] = outLenc[j]
        for j in range(n_dec):
            Ldec[i,j] = outLdec[j]
        
        # Updating the arrays of the losses + accuracy with the last losses
        train_l.append(np.array(losses)[-1])#.mean())
        print('Final loss value = ' , train_l[i])

    J = np.argmin(np.asarray(val_l))#train_l))
    
    # Printing out the best values relatively to the chosen method
    print( '\x1b[4;34;43m'+'Best results is '+'\x1b[0m' , J , '\x1b[4;34;43m'+'. With values: '+'\x1b[0m' ,
           "\n" ,  hyperparanames , "\n" , rate[J] , Lenc[J] , Ldec[J] , W[J] , enc[J] , ACT[J] , DROPS[J] , inits[J] , L1[J] )#, seeds[J] )
    
    return rate[J] , Lenc[J] , Ldec[J] , W[J] , enc[J] , ACT[J] , DROPS[J] , inits[J] , L1[J]

In [81]:
shape = spectra2.shape#len(wavelength[a:b])#60
print(shape)

(744, 781, 277)


In [98]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(DEVICE)
DIM = 3

from sklearn.model_selection import train_test_split

train_ds , test_ds = train_test_split(spectra , test_size=0.1, random_state=311996)

train_ds , valid_ds = train_test_split(train_ds , test_size=0.33, random_state=200960)

print(len(train_ds))

cuda:0
238118


In [None]:
a = find_nearest(wavelength , 1000)
b = find_nearest(wavelength , 2600)

shape = 277  # 46
torch.manual_seed(533733874069400)
params = RandomSearch_autoencoder( spectra , shape , nn.HuberLoss() ,
                                   encoded_space_dim = np.arange(5,11).tolist() ,
                                   n_encmax = 1 , n_decmax = 2 ,
                                   MINenc = 2 , MAXenc = 15 ,
                                   MINdec = 2 , MAXdec = 15 ,
                                   activations = [ nn.Mish() , nn.GELU() , nn.SiLU() , nn.ReLU() ] ,
                                   fix_enc = False , fix_dec = True , 
                                   try_epochs = 20 , N_try = 60, Seed = torch.seed() , 
                                   printer = 'off' ,
                                   bs = spectra.shape[0] ,
                                   num_pieces = spectra.shape[0] )

cuda
Try 0
Final loss value =  5.63027206226252e-05
Try 1
Final loss value =  0.020442016422748566
Try 2


In [106]:
from torch.utils.data import DataLoader
import torch.optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
wav = len(wavelength[a:b])#60

DIM = params[4]#DIM
torch.manual_seed(533733874069400)#params[6])#205653840223300)
model = Net( encoded_space_dim = DIM , 
            in_channels = shape ,
             n_layers_encoder = np.count_nonzero(params[1]) ,
             n_layers_decoder = np.count_nonzero(params[2]) ,
             out1 = params[1].astype(int) , out2 = params[2].astype(int) ,
             act = nn.Mish(x),#params[5] ,
             drops = params[6] , last = True ).to(device)

weight_init(model, init_method = params[-2])

print(model)
 
loader = torch.utils.data.DataLoader( dataset = spectra ,
                                      batch_size = spectra.shape[0],
                                      shuffle = True )

Net(
  (encoder): Sequential(
    (0): Linear(in_features=277, out_features=6, bias=True)
    (1): Mish(inplace=True)
    (2): GumbelSoftmax()
  )
  (decoder): Sequential(
    (0): Linear(in_features=6, out_features=35, bias=True)
    (1): Mish(inplace=True)
    (2): Linear(in_features=35, out_features=50, bias=True)
    (3): Mish(inplace=True)
    (4): Linear(in_features=50, out_features=55, bias=True)
    (5): Mish(inplace=True)
    (6): Linear(in_features=55, out_features=60, bias=True)
    (7): Mish(inplace=True)
    (8): Linear(in_features=60, out_features=60, bias=True)
    (9): Mish(inplace=True)
    (10): Linear(in_features=60, out_features=277, bias=True)
    (11): Tanh()
  )
)


In [107]:
train_losses, validation_losses = train_cnn( model, nn.HuberLoss(), 
                                             train_ds, valid_ds, 
                                             500 , 25,
                                             params[-1], len(train_ds), 
                                             DEVICE, params[0], printer = True )

Epoch 1/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 2/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 3/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 4/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 5/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 6/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 7/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 8/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 9/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 10/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 11/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 12/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 13/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 
Epoch 14/500 -> Train Loss: 0.0000 | Validation Loss: 0.0000 


KeyboardInterrupt: 

In [None]:
plt.figure()
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.plot(train_losses , 'b' , linewidth = 1 , label = 'train loss')
plt.plot(validation_losses , 'r' , linewidth = 1 , label = 'validation loss')
plt.legend()
plt.show()

# Extract Low-dimensional Representations
encoded_data = model.encoder(torch.tensor(spectra).float().to(device)).detach().cpu().numpy()#model.encoder(torch.tensor(spectra).float()).detach().numpy()

def plot_n_encoded(spectra , model , DIM , plot = True):
    # Extract Low-dimensional Representations
    encoded_data = model.encoder(torch.tensor(spectra).float()).detach().numpy()
    
    if plot:
        fig , ax = plt.subplots(DIM,DIM)
        for i in range(DIM):
            for j in range(DIM):
                if i != j and i < j:
                    ax[i,j].plot(encoded_data[:,i] , encoded_data[:,j] , 'k.')
                    ax[i,j].set_xlabel('Encoded Dimension '+str(i+1) , fontsize = 5)
                    ax[i,j].set_ylabel('Encoded Dimension '+str(j+1) , fontsize = 5)
                else:
                    ax[i,j].axis('off')
        plt.show()
    return encoded_data

In [65]:
print(rgbs.shape)
print(sr_names)
i = np.where((np.array(sr_names) == 'R2529'))[0][0]
j = np.where((np.array(sr_names) == 'R1506'))[0][0]
k = np.where((np.array(sr_names) == 'R1080'))[0][0]
print(rgbs[:,:,[i,j,k]].shape)

sr_names = np.array(sr_names)

(744, 781, 60)
['R770' 'RBR' 'BD530_2' 'SH600_2' 'SH770' 'BD640_2' 'BD860_2' 'BD920_2'
 'RPEAK1' 'BDI1000VIS' 'R440' 'IRR1' 'BDI1000IR' 'OLINDEX3' 'R1330'
 'BD1300' 'LCPINDEX2' 'HCPINDEX2' 'VAR' 'ISLOPE1' 'BD1400' 'BD1435'
 'BD1500_2' 'ICER1_2' 'BD1750_2' 'BD1900_2' 'BD1900R2' 'BDI2000'
 'BD2100_2' 'BD2165' 'BD2190' 'MIN2200' 'BD2210_2' 'D2200' 'BD2230'
 'BD2250' 'MIN2250' 'BD2265' 'BD2290' 'D2300' 'BD2355' 'SINDEX2' 'ICER2_2'
 'MIN2295_2480' 'MIN2345_2537' 'BD2500_2' 'BD3000' 'BD3100' 'BD3200'
 'BD3400_2' 'CINDEX2' 'BD2600' 'IRR2' 'IRR3' 'R530' 'R600' 'R1080' 'R1506'
 'R2529' 'R3920']
(744, 781, 3)


In [66]:
RGB_autoencoder = np.zeros( (img_sr.shape[0] , img_sr.shape[1] , DIM) )

print(DIM , encoded_data.shape)

for j in range(DIM):
    E = encoded_data[:,j]
    for i in range(len(indexes[:,0])):
        x , y = int(indexes[i,0]) , int(indexes[i,1])
        RGB_autoencoder[x,y,j] = E[i]

false = pyfresco.SpectraExtract(img, Nbands, wavelength, 400, 2600)
fal = false.upload_map('FAL' , folder = 'FRT00003e12/')

I = np.where((np.array(sr_names) == 'R2529'))[0][0]
J = np.where((np.array(sr_names) == 'R1506'))[0][0]
K = np.where((np.array(sr_names) == 'R1080'))[0][0]
print(I , J , K)
fig , ax = plt.subplots(1,DIM+1)
ax[0].imshow( rgbs[:,:,[I,J,K]])
ax[0].axis('off')
ax[0].set_title('False color')
for j in range(DIM):
    ax[j+1].imshow( RGB_autoencoder[:,:,j] , cmap = 'viridis' )
    ax[j+1].axis('off')
    ax[j+1].set_title('Neuron '+str(j+1))
plt.tight_layout()
plt.show()

cmap = matplotlib.colormaps.get_cmap('viridis')

bounds = np.linspace(0,DIM,DIM+1 , dtype = int)
norm = mpl.colors.BoundaryNorm(bounds, cmap.N)

pixelsN = np.zeros((img.shape[0] , img.shape[1]))
for i in range(DIM):
    mappa = RGB_autoencoder[:,:,i]
    ind = np.argwhere(RGB_autoencoder[:,:,i] != 0)
    pixelsN[ind[:,0] , ind[:,1]] = i

for i in range(img.shape[0]):
    for j in range(img.shape[1]):
        if img[i,j,0] == 65535:
            pixelsN[i,j] = -1

fig , ax = plt.subplots(1,2)
ax[0].imshow( rgbs[ : , : , [I,J,K] ] )
im = ax[1].imshow(pixelsN , cmap = cmap)
plt.colorbar(im , ticks=bounds , norm = norm , boundaries=bounds)
ax[0].axis('off')
ax[1].axis('off')
plt.show()

10 (394890, 10)
58 57 56


In [67]:
import math

n_plots = DIM + 1  # 1 for false color + DIM neurons
n_cols = math.ceil(math.sqrt(n_plots))
n_rows = math.ceil(n_plots / n_cols)

fig, axes = plt.subplots(n_rows, n_cols, figsize=(3*n_cols, 3*n_rows))
axes = axes.flatten()  # Flatten to use as a 1D list

# Plot false color image
axes[0].imshow(rgbs[:, :, [I, J, K]])
axes[0].axis('off')
axes[0].set_title('False color')

# Plot each neuron's activation map
for j in range(DIM):
    axes[j+1].imshow(RGB_autoencoder[:, :, j], cmap='viridis')
    axes[j+1].axis('off')
    axes[j+1].set_title('Neuron ' + str(j+1))

# Hide unused axes if any
for i in range(n_plots, len(axes)):
    axes[i].axis('off')

plt.show()


In [68]:
from matplotlib.ticker import StrMethodFormatter
w1 , w2 = 400,2600
A = find_nearest(wavelength , w1)
B = find_nearest(wavelength , w2)
wav = wavelength[A:B]

means = []
stds = []

k , l = 0 , 0
fig , ax = plt.subplots(2,DIM , figsize = [5,5] , sharey='row')
for i in range(DIM):
    
    i_n = np.argwhere(RGB_autoencoder[:,:,i] == 1)

    
    S = np.array(img[:,:,:])[i_n[:,0],i_n[:,1],A:B]
    
    if 65535 in S:
        S = np.delete(S , np.argwhere(S == np.ones(B-A , dtype = float)*65535.0) , axis = 0)
    
    s , se = np.mean(S , axis = 0) , np.std(S , axis = 0)
    
    means.append(s)
    stds.append(se)
    
for i in range(DIM):
    ax[0,i].plot(wav , means[i] , 'k')
    ax[0,i].plot(wav , means[i]+stds[i] ,'k--')
    ax[0,i].plot(wav , means[i]-stds[i] ,'k--')
    ax[0,i].fill_between(wav , means[i]-stds[i] , means[i]+stds[i] , color = 'black' , alpha = 0.5)
    ax[0,i].set_title('Neuron '+str(i+1))
    
    ax[1,i].imshow(RGB_autoencoder[:,:,i])
    ax[1,i].set_xticks([])
    ax[1,i].set_yticks([])
    ax[0,i].set_xlabel('Wavelength [nm]')
        
ax[0,0].set_ylabel('Reflectance')

#plt.tight_layout()
plt.show()