In [29]:
import scipy.io as sio
import yaml
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt

from torch import nn
from typing import Dict, List
import torchvision.transforms as T

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import Compose, ToTensor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline, make_pipeline
from torchinfo import summary

from sklearn.metrics import accuracy_score
import seaborn as sns

import utils as u
import RPNetRFextractor as rp

from sklearn.decomposition import PCA
from sklearn.svm import SVC, LinearSVC


from sklearn.model_selection import train_test_split

# Read the project configuration once. The context manager guarantees the file
# handle is closed even if YAML parsing raises (the bare open() call leaked it).
with open('config.yaml') as config_file:
    configuration = yaml.safe_load(config_file)

# Top-level sections of the configuration, looked up by their literal keys.
data_path = configuration["data"]
parameters = configuration["parameters"]


# Build data tools

In [2]:
# create data tools : data loader and data set

class HSI_Dataset(Dataset):
    """Map-style dataset pairing flattened hyperspectral samples with labels.

    Item ``i`` is the tuple ``(X[i], y[i])``; when a ``transform`` was given it
    is applied to the sample only, never to the label.
    """

    def __init__(self, X, y, transform=None, scale=True):
        """Convert the inputs to float32 tensors and optionally rescale ``X``.

        Args:
            X: FLATTENED HSI volume dataset; its first axis indexes samples.
            y: 1D FLATTENED ground truth, one entry per sample of ``X``.
            transform: optional callable applied to each sample in
                ``__getitem__``.
            scale: when True, divide ``X`` by its global maximum.

        Raises:
            AssertionError: if ``X`` and ``y`` hold a different number of
                samples.
        """
        super().__init__()
        samples = torch.tensor(X, dtype=torch.float32)
        labels = torch.tensor(y, dtype=torch.float32)

        # Check consistency before storing anything on the instance.
        assert samples.shape[0] == len(labels)

        if scale and samples.numel() > 0:
            peak = samples.max()
            # An all-zero input would turn into NaN when divided by its own
            # maximum, so it is left untouched.
            if peak != 0:
                samples = samples / peak

        self.X = samples
        self.y = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.y)

    def __getitem__(self, i):
        """Return ``(sample, label)`` for index ``i``, transforming the sample if configured."""
        if self.transform is None:
            return self.X[i], self.y[i]
        return self.transform(self.X[i]), self.y[i]

    
    # def __getitems__(self, index_list):
    #     # Implement batch loading of items here
    #     return [(self.X[idx], self.y[idx]) for idx in index_list]


In [3]:
# Load the "KSC" dataset through the project helper and unpack its two parts.
# `data` itself is kept as well: the split cell below reads data[1].
data = u.load_data("KSC")
X, y = data

In [108]:
# Ask the project helper for the three index selections, derived from the labels.
train_data, test_data, unsup_data = u.data_split(data[1])

# Training uses the labelled-train and unsupervised selections together
# (presumably fine here because the training loop discards the labels - confirm).
train_unsup_idx = train_data + unsup_data
X_train = X[train_unsup_idx]
y_train = y[train_unsup_idx]

X_test = X[test_data]
y_test = y[test_data]

print(X_train.shape, y_train.shape)

(313325, 176) (313325,)


In [109]:

# Both splits are divided by the SAME reference value (the training maximum) so
# that train and test reconstruction errors are directly comparable. Letting each
# HSI_Dataset rescale by its own maximum would put the two splits on different
# scales. With scale=False the dataset stores the values exactly as passed in.
scale_ref = X_train.max()
train_ds = HSI_Dataset(X_train / scale_ref, y_train, scale=False)
test_ds = HSI_Dataset(X_test / scale_ref, y_test, scale=False)

# Number of samples per mini-batch, shared by both loaders.
BATCH_SIZE = 1000

# Only the training batches are shuffled; the test order stays fixed.
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)

# Autoencoder-based approach

In [110]:

# Auto-encoders
# https://www.jeremyjordan.me/autoencoders/


class Encoder(nn.Module):
    """Fully connected encoder built from ``Linear -> ReLU`` stages.

    Layer widths run ``n_in -> n_hidden[0] -> ... -> n_hidden[-1] -> n_out``.
    Every stage, including the last one, ends with a ReLU, so the encoded
    output is non-negative.
    """

    def __init__(self, n_in: int, n_out: int, n_hidden: List[int]):
        """Store the layer sizes and build one ``Linear -> ReLU`` stage per width pair."""
        super().__init__()
        self.n_in = n_in
        self.n_out = n_out
        self.n_hidden = n_hidden

        # Consecutive entries of `widths` are the (input, output) sizes of a stage.
        widths = [n_in] + n_hidden + [n_out]
        stages = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stages.append(nn.Sequential(nn.Linear(fan_in, fan_out), nn.ReLU()))

        self.encoder = nn.Sequential(*stages)

    def forward(self, x: torch.Tensor):
        """Run ``x`` through all encoder stages."""
        return self.encoder(x)
    
class Decoder(nn.Module):
    """Fully connected decoder built from ``ReLU -> Linear`` stages.

    Layer widths run ``n_in -> n_hidden[0] -> ... -> n_hidden[-1] -> n_out``.
    Each stage applies the ReLU first, so the final output comes straight from
    a linear layer with no activation after it.
    """

    def __init__(self, n_in: int, n_out: int, n_hidden: List[int]):
        """Store the layer sizes and build one ``ReLU -> Linear`` stage per width pair."""
        super().__init__()
        self.n_in = n_in
        self.n_out = n_out
        self.n_hidden = n_hidden

        # widths[k] -> widths[k + 1] is the k-th linear map.
        widths = [n_in] + n_hidden + [n_out]
        stages = [
            nn.Sequential(nn.ReLU(), nn.Linear(widths[k], widths[k + 1]))
            for k in range(len(widths) - 1)
        ]

        self.decoder = nn.Sequential(*stages)

    def forward(self, x: torch.Tensor):
        """Run ``x`` through all decoder stages."""
        return self.decoder(x)
    
class AE(nn.Module):
    """Autoencoder: an ``Encoder`` down to a bottleneck followed by a ``Decoder``.

    The encoder maps ``n_in`` features to ``bottleneck`` features; the decoder
    maps them back to ``n_in`` features.
    """

    def __init__(self, n_in: int,
                 bottleneck: int,
                 n_hidden_encoder: List[int],
                 n_hidden_decoder: List[int] = None):
        """Build the encoder/decoder pair.

        Args:
            n_in: number of input (and reconstructed) features.
            bottleneck: size of the encoded representation.
            n_hidden_encoder: hidden layer widths of the encoder, in order.
            n_hidden_decoder: hidden layer widths of the decoder, in order.
                When None, the encoder widths are used in reverse.
        """
        super().__init__()
        self.n_in = n_in
        self.bottleneck = bottleneck
        self.n_hidden_encoder = n_hidden_encoder
        if n_hidden_decoder is None:
            # Base case: symmetrical architecture, the decoder mirrors the encoder.
            self.n_hidden_decoder = n_hidden_encoder[::-1]
        else:
            self.n_hidden_decoder = n_hidden_decoder

        self.encoder = Encoder(n_in, bottleneck, n_hidden_encoder)
        self.decoder = Decoder(bottleneck, n_in, self.n_hidden_decoder)

    def forward(self, x: torch.Tensor):
        """Encode then decode ``x`` and return the reconstruction."""
        return self.decoder(self.encoder(x))

    def encode(self, x: torch.Tensor):
        """Return the bottleneck representation of ``x``."""
        return self.encoder(x)

    def decode(self, y: torch.Tensor):
        """Map a bottleneck representation ``y`` back to input space."""
        # Must go through the decoder sub-module; calling self.decode here
        # would recurse until RecursionError.
        return self.decoder(y)


# Training

In [116]:
# 176 input features (the second axis of X_train printed above), encoder hidden
# widths 200 then 75, and a 10-dimensional bottleneck. No decoder widths are
# given, so AE falls back to the reversed encoder widths.
model = AE(n_in=176, bottleneck=10, n_hidden_encoder=[200, 75])

# Mean squared error between two tensors; used below as the reconstruction loss.
loss_fn = nn.MSELoss()

optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

In [135]:
EPOCH = 10

# Per-epoch mean losses, stored as plain Python floats. epoch_test_loss is not
# filled yet: the evaluation code below is still commented out.
epoch_train_loss, epoch_test_loss = [], []

for i in range(EPOCH):
    print(f"Start epoch {i+1}")
    running_loss = 0.0

    ### Training
    model.train()
    for x, _ in train_dl:  # labels are ignored: the target is the input itself
        # 1. Forward pass: reconstruct the batch
        x2 = model(x)

        # 2. Reconstruction loss, in the usual (prediction, target) order
        loss = loss_fn(x2, x)

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backwards
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Accumulate a float, not the tensor: summing `loss` itself keeps
        # autograd history alive across the whole epoch.
        running_loss += loss.item()

    # Mean batch loss for the epoch
    running_loss = running_loss / len(train_dl)
    epoch_train_loss.append(running_loss)
    print(1000 * running_loss)

    
        
    # ### Testing
    # model.eval()
    # with torch.inference_mode():
    #     # 1. Forward pass
    #     x2 = 
    #     test_pred = torch.round(torch.sigmoid(test_logits))
    #     # 2. Calculate loss/accuracy
    #     test_loss = loss_fn(test_logits,
    #                         y_test)
    #     test_acc = accuracy_fn(y_true=y_test,
    #                            y_pred=test_pred)


Start epoch 1
0.8828024729155004
Start epoch 2
0.8817700436338782
Start epoch 3
0.8802605443634093
Start epoch 4
0.8802353404462337
Start epoch 5
0.8753378642722964
Start epoch 6
0.8798339404165745
Start epoch 7
0.8780044154264033
Start epoch 8
0.8713873685337603
Start epoch 9
0.8755896706134081
Start epoch 10
0.8678772137500346


In [30]:
# Unpack the three axis sizes of X; this raises unless X has exactly three axes.
# NOTE(review): the HSI_Dataset docstring and the (313325, 176) shape printed
# earlier suggest X is 2-D when it comes from u.load_data - confirm this cell
# still runs on a fresh kernel.
L, H, W = X.shape

# Merge the first two axes, then take the standard deviation along the last
# axis: one value per row of the reshaped array.
ds = X.reshape(L*H, W).std(axis=1)

In [31]:
# Distinct standard-deviation values above 1000 and how often each one occurs.
# The values are deliberately NOT bound to `u`: that name is the alias of the
# project `utils` module (`import utils as u`), and overwriting it breaks
# u.load_data / u.data_split when those cells are re-run.
std_values, v = np.unique(ds[ds > 1000], return_counts=True)

In [32]:
# Histogram of X averaged over its first axis.
# NOTE(review): the recorded run of this cell ended in a KeyboardInterrupt -
# check the shape of X.mean(axis=0) before re-running.
sns.histplot(data=X.mean(axis=0))

<Axes: ylabel='Count'>

Error in callback <function flush_figures at 0x16a087880> (for post_execute), with arguments args (),kwargs {}:


KeyboardInterrupt: 

In [19]:
# Display the occurrence counts returned by np.unique(..., return_counts=True).
v

array([    1,     7,     1,     8,     4,    92,   237,    41,   708,
        1356,  1650,   269,  4951, 15085])

In [152]:


# Histogram of the standard-deviation values that exceed 1000.
# NOTE(review): the recorded run of this cell ended in a KeyboardInterrupt.
sns.histplot(data=ds[ds > 1000])

KeyboardInterrupt: 

In [5]:
# Scratch check of how nn.Sequential lays out and numbers its layers.
nn.Sequential(
    nn.Linear(10, 15),
    nn.ReLU(),
    nn.Linear(15, 17),
)

Sequential(
  (0): Linear(in_features=10, out_features=15, bias=True)
  (1): ReLU()
  (2): Linear(in_features=15, out_features=17, bias=True)
)

# Testing the 1D CNN approach

# 2D CNN approach

# GANs