# HSI Library チュートリアル
### 目次
1. indian-pinesをダウンロード
2. ライブラリをダウンロード
3. ライブラリを利用したバンド削減
4. DeepLearningモデルの訓練

## 1. indian-pinesをダウンロード
まずチュートリアルに使用するindian-pinesのデータをダウンロードします。

In [None]:
# 保存先を指定
WORKDIR='./data'

# データのダウンロード
!wget https://www.ehu.eus/ccwintco/uploads/2/22/Indian_pines.mat -P $WORKDIR/indian_pines/ # data
!wget https://www.ehu.eus/ccwintco/uploads/c/c4/Indian_pines_gt.mat -P $WORKDIR/indian_pines/ # ground truth

.matファイルでダウンロードされたデータを、このライブラリで扱うために.npyファイルへ変換します

In [None]:
import glob
import os
import pathlib
import numpy as np
from scipy.io import loadmat

def mat2npy(data_path):
    print(data_path)
    local, ext = os.path.splitext(data_path)
    if ext != ".mat":
        print(f'{ext=}')
        return
    
    data = loadmat(data_path)
    print(data_path)
    for key in data:
        if isinstance(data[key], np.ndarray):
            print(data[key].shape)
            print(np.unique(data[key]))
            save_path = os.path.join(
                local,
            )
            np.save(save_path, data[key]) # npyに変換して保存

# データのロード
data_paths = [f'{WORKDIR}/indian_pines/Indian_pines.mat', f'{WORKDIR}/indian_pines/indian_pines_gt.mat']
for data in data_paths:
    mat2npy(data) 
            

## 2.ライブラリをダウンロード
続いてライブラリをインストールします。

In [None]:
# ryeを利用している場合
!rye add hsi_feature_extraction --git https://github.com/SuperHotDogCat/HSI-library.git 

In [None]:
# pipの場合
%pip install https://github.com/SuperHotDogCat/HSI-library.git 

## 3. ライブラリを利用したバンド削減
実際にライブラリを使用して、HSIのバンド数を削減するアルゴリズムを実行します。
ここでは、セレクションメソッドの一つである**VIF**を例に上げて説明します。

In [None]:
# データのあるディレクトリ
# WORKDIR='./data' 

import glob
import os
import pathlib
import numpy as np

# データのロード
data_path = f'{WORKDIR}/indian_pines/Indian_pines.npy'
label_path = f'{WORKDIR}/indian_pines/Indian_pines_gt.npy'

# ライブラリでは（batch, channels, height, widt)の形でデータが渡されることを前提としているので、それに合わせてトランスポーズする。
data = np.load(data_path)[np.newaxis,:32,:32,:].transpose([0,3,1,2])

print(f'アルゴリズム適用前のシェイプ: {data.shape}')

# ライブラリからVIFExtractorをインポート
from hsi_feature_extraction.selection.vif import VIFExtractor
processor = VIFExtractor()

# VIFの学習
processor.fit(data)

# 学習したVIFの適用
processed_data = processor(data)

print(f'削減後のシェイプ: {processed_data.shape}')


## 4. 削減ライブラリを利用したDeepLearningモデルの訓練
ライブラリを用いたDeeplerarningモデルの学習の例を示します。 
基本は普通の場合と変わりませんが、学習の前にバンド削減アルゴリズムをfitし、学習時にはそれを適用する必要があります。



In [None]:
from typing import Optional
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
import segmentation_models_pytorch as smp
import pytorch_lightning as pl

from hsi_feature_extraction.utils.utils import sampling_square_image
from hsi_feature_extraction.utils.data import (
    do_nothing,
    do_nothing_sampling,
    HSIDataWithSampling,
)

data_config = {
        "n_channels": 220,
        "n_classes": 17,
    }

# モデルとトレーニング、バリデーションの定義
class LightningModel(pl.LightningModule):
    def __init__(
        self,
        n_channels: int,
        n_classes: int,
        model_name: str = "unet",
    ):
        super().__init__()

        self.model = smp.Unet(
            in_channels=n_channels, classes=n_classes
        )
        self.n_classes = n_classes
        self.save_hyperparameters({"model_name": model_name})

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        """
        x: (batch_size, channels, height, width)
        label: (batch_size, height, width)
        """
        x, label = batch
        logits = self.model(x)
        loss = F.cross_entropy(logits, label)
        self.log("training_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """
        x: (batch_size, channels, height, width)
        label: (batch_size, height, width)
        """
        x, label = batch
        logits = self.model(x)
        loss = F.cross_entropy(logits, label)
        self.log("validation_loss", loss)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(lr=1e-4, params=self.parameters())
        return optimizer

# dataクラスの作成 
class LightningHSIData(pl.LightningDataModule):
    def __init__(
        self,
        data_path: str,
        label_path: Optional[str] = None,
        sample_square_size: int = 64,
        sampler=do_nothing_sampling,
        transform=do_nothing,
        split_ratio=0.75,
    ):
        super().__init__()
        self.dataset = HSIDataWithSampling(
            data_path=data_path,
            label_path=label_path,
            sample_square_size=sample_square_size,
            sampler=sampler,
            transform=transform,
        )
        self.split_ratio = split_ratio

    def prepare_data(self):
        pass

    def prepare_data_per_node(self):
        pass

    def setup(self, stage: Optional[str] = None):
        dataset_size = len(self.dataset)
        indices = list(range(dataset_size))
        random.shuffle(indices)

        train_size = int(dataset_size * self.split_ratio)
        valid_size = dataset_size - train_size

        train_indices = indices[:train_size]
        valid_indices = indices[train_size : train_size + valid_size]

        self.train_dataset = Subset(self.dataset, train_indices)
        self.valid_dataset = Subset(self.dataset, valid_indices)
        del self.dataset

    def train_dataloader(self):
        return DataLoader(self.train_dataset)

    def val_dataloader(self):
        return DataLoader(self.valid_dataset)

# シェイプ変換
class ChannelTranspose(nn.Module):
    def forward(self, x):
        if len(x.shape) == 3:
            return x.transpose((2, 0, 1))
        return x.transpose((0, 3, 1, 2))

# ライブラリ適用
class Pipeline:
    def __init__(self, *args):
        self.transforms = args
    def __call__(self, x):
        for transform in self.transforms:
            x = transform(x)
        return x

# main
trainer = pl.Trainer(devices=1, max_epochs=3, enable_checkpointing=False)


processor = VIFExtractor()
data = np.load(data_path)[np.newaxis,:32,:32,:].transpose([0,3,1,2])
print(data.shape)
processor.fit(data) #バンド削減アルゴリズムの学習
del data

data_loader = LightningHSIData(
    data_path=data_path,
    label_path=label_path,
    sample_square_size=64,
    sampler=sampling_square_image,
    transform=Pipeline(
        ChannelTranspose(),
        processor #　バンド削減の適用
    ),
)
data_loader.setup()
model = LightningModel(processor.get_num_channels(), data_config["n_classes"])
trainer.fit(
    model,
    data_loader.train_dataloader(),
    data_loader.val_dataloader(),
)