

### INITIAL



In [1]:
%%capture

! git clone https://github.com/artsousa/HyperSI
! pip install -r HyperSI/requirements.txt

In [2]:
import sys
import pprint
import importlib
import numpy as np
import seaborn as sns
from google.colab import drive

repo_path = '/content/HyperSI'
sys.path.insert(0, repo_path)

from source import (
    utils,
    pipeline,
    hsiroutine,
    sample
)


importlib.reload(utils)
importlib.reload(sample)
importlib.reload(pipeline)
importlib.reload(hsiroutine)

drive.mount('/content/drive')

Mounted at /content/drive


### Paths

In [None]:
# import spectral as sp
# sp.settings.envi_support_nonlowecase_params = True

# teste = sp.open_image('/content/drive/Shareddrives/HSI_Fungos/Carrapatos/data/Fungo_Manisopliae_IP119_1_221221-163308/capture/Fungo_Manisopliae_IP119_1_221221-163308.hdr')

#### Preprocess

In [None]:
import os
import spectral as sp

samples_names = {}

data_ = "/content/drive/MyDrive/DATA/FUNGOS/data/"
key = "/content/drive/MyDrive/DATA/FUNGOS/data/Candida_albicans_48h_210716-114834/"
sample_name = key.split('/')[-2]

print(sample_name)
image = sp.open_image(os.path.join(data_, sample_name, 'capture' , sample_name + '.hdr'))

sample = image.load()

sample.shape

Candida_albicans_48h_210716-114834


(1423, 320, 256)

In [None]:
# idx = np.array([1, 2, 2, 2, 1, 1, 1, 1])
# print(hsiroutine.HsiRoutine.realIdx(idx, 2))

# ind, rm = hsiroutine.HsiRoutine.sum_idx_array(hsiroutine.HsiRoutine.realIdx(idx, 2))
# hsiroutine.HsiRoutine.rev_idx_array(ind, rm)

# teste = np.arange(24).reshape(3, 4, -1)

# print(teste[:, :, 0])
# print(teste[:, :, 1])
# print(teste.shape, '\n\n', )

# teste = teste.transpose(2, 0, 1)
# print(teste, teste.shape, '\n\n', )

# teste = teste.transpose(1, 2, 0)
# print(teste[:, :, 0])
# print(teste[:, :, 1])
# print(teste.shape, '\n\n', )

# teste = teste.transpose(2, 0, 1)
# teste_matrix = teste.T.reshape((teste.shape[1] * teste.shape[2], teste.shape[0]), order='F')
# teste_matrix

#### Functions and classes

In [4]:
import os
import sys
import cv2
import glob
import time
import torch
import traceback
import numpy as np
import pickle as pkl


from typing import Dict, Any
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
from sklearn.preprocessing import MinMaxScaler

from torch.utils.data import Dataset, DataLoader

def cluster_image(
    hypercube: np.array, 
    pipeline: pipeline.HsiPipeline, 
    sample_name: str,
    **kwargs
):

    matrix = pipeline._signal_filter(
        cube=hypercube, 
        order=kwargs['golay_order'], 
        window=kwargs['golay_window'], 
        dv=kwargs['golay_deriv'])
    
    print('P--/ --', end='')

    rows, cols = hypercube.shape[1:]
    cube = pipeline.routine.matrix2hsi(matrix, rows, cols)

    idx = pipeline.routine.removeBg(
        matrix[:, 100:250], pcs=kwargs['pcs'], k_clusters=kwargs['k_clusters']
    ) + 1

    print('R--/')

    image = cube[150, :, :]

    out_1 = pipeline.routine.getCluster(image, idx, 1, (1, 0, 0))
    out_2 = pipeline.routine.getCluster(image, idx, 2, (0, 0, 1))

    figure(figsize=(6, 6), dpi=100)
    # plt.subplot(1, 2, 1)
    plt.imshow(out_1)
    plt.axis('off')
    plt.show()

    figure(figsize=(6, 6), dpi=100)
    # plt.subplot(1, 2, 2)
    plt.imshow(out_2)
    plt.axis('off')
    plt.show()

    cluster = input('which image represents the sample? 1 or 2')
    plt.close('all')

    ind, rm = pipeline.routine.sum_idx_array(
        pipeline.routine.realIdx(idx, int(cluster))
    )
    
    sample_cluster = pipeline.routine.rev_idx_array(ind, rm)
    out_1 = pipeline.routine.getCluster(image, sample_cluster, 0, (1, 1, 1))
    
    figure(figsize=(6, 6), dpi=100)
    plt.imshow(pipeline.routine.rgbscale(out_1))
    plt.axis('off')
    plt.savefig(f"{sample_name}_bgremoved.png")
    
    plt.close()
    
    return sample_cluster


class FungusDataset(Dataset):
    def __init__(self, folder: str, 
                 samples: Dict, 
                 task: int = 0,
                 load_all: bool = True,
                 dest_folder: str = '',
                 first_time: bool = True,
                 process_images: bool = False,
                 **kwargs):

        self.data = {}
        self.task = task
        self.folder = folder
        self.samples = samples
        self.dest_folder = dest_folder
        self.pipeline = pipeline.HsiPipeline(data_folder=folder, samples=samples)

        self.__get_wavelength()
        
        if load_all:
            if first_time:
                self.__get_samples()
            elif process_images:
                self.__load_pkl(
                    preprocess_=True, method='kmeans', **kwargs
                )

    def __len__(self):
        return len(list(self.samples.keys()))

    def __getitem__(self, sample_name):
        return utils.Utils.load_hsisample(
            path=self.folder, name=sample_name, folder='capture'
        )

    def __show_sample(self, sample_name: str, save: bool = False):
        
        try:
            print(f"Sample: {sample_name} --", end='')
            fungus = utils.Utils.load_hsisample(
                path=self.folder, name=sample_name, folder='capture'
            )
            
            print('L--/ --', end='')

            image = fungus.normalized[150, :, :]

            rows, cols = fungus.normalized.shape[1:]
            image = self.pipeline.routine.getCluster(
                image, fungus.sample_cluster, 0, (1, 1, 1)
            )

            print(f'P--/ --{rows * cols}--/ --{(rows, cols)}--')
            figure(figsize=(6, 6), dpi=100)
            plt.imshow(self.pipeline.routine.rgbscale(image))
            plt.axis('off')

            if save:
                plt.savefig(f"{sample_name}_vis.png")

            plt.show()

        except Exception as e:
            traceback.print_tb(e.__traceback__)
            print(f"Sample {sample_name} not loaded... \n {e}")


    def visualize_images(self,):

        for hsisample in self.pipeline.samples:
            sample_name = hsisample.split("/")[-2]

            self.__show_sample(sample_name, save=False)


    def __load_pkl(self, preprocess_: bool = True, 
                   method: str = 'kmeans', **kwargs):

        for hsisample in self.pipeline.samples:
            
            sample_name = hsisample.split("/")[-2]
            print(f"Sample: {sample_name} --", end='')

            try:
                fungus = utils.Utils.load_hsisample(
                    path=self.folder, name=sample_name, folder='capture'
                )

                print(f"L--/ --", end='')
                if preprocess_:
                    fungus.sample_cluster = cluster_image(
                        fungus.normalized, 
                        self.pipeline, 
                        self.dest_folder + sample_name, **kwargs
                    )

                    fungus.save()
                    del fungus

                # image = fungus.normalized[150, :, :]
                
                # print(f"\n\nSample: {sample_name} {type(fungus)} -- " + 
                #       f"{fungus.normalized.shape} -- {image.shape}")
                
                # figure(figsize=(6, 6), dpi=100)
                # plt.imshow(image, cmap='gray')
                # plt.axis('off')
                # plt.show()

            except Exception as e:
                traceback.print_tb(e.__traceback__)
                print(f"Sample {sample_name} not loaded... \n {e}")

        return

    def __get_samples(self,):

        print('Loading samples...')
        for hsisample in self.pipeline.samples:
            sample_name = hsisample.split("/")[-2]

            try:
                
                st_time = time.time()
                fungus = sample.Sample(
                    self.pipeline.folder, sample_name
                )
                darkref = sample.Sample(
                    self.pipeline.folder, sample_name, sample_prefix="DARKREF_"
                )
                whiteref = sample.Sample(
                    self.pipeline.folder, sample_name, sample_prefix="WHITEREF_"
                )

                fungus.normalized = self.pipeline.routine.raw2mat(
                    image=fungus, dark=darkref, white=whiteref, inplace=False
                )

                fungus.image = None
                fungus.save()

                print(f"Sample: {sample_name} -- {time.time() - st_time}s --")

            except Exception as e:
                print(f"Sample {sample_name} not loaded... \n {e}")
            
        print('Done...')

        return

    def __get_wavelength(self):
        self.wavelength = {}

        for hsisample in self.pipeline.samples:
            sample_name = hsisample.split("/")[-2]

            path = os.path.join(
                self.pipeline.folder, sample_name, "capture", sample_name + ".hdr"
            )

            self.wavelength[sample_name] = self.pipeline.routine.get_wavelength(
                folder=self.folder, sample=sample_name, spectral_range=(0, -1)
            )

        unique = True
        original = 0
        for key in list(self.wavelength.keys()):
            if type(original) is int:
                original = self.wavelength[key]

            if sum((original - self.wavelength[key])[0]):
                unique = False
                print('not all wavelength are the same')
                break

        self.wavelength = original

#### load .raw samples

In [5]:
data_path = "/content/drive/Shareddrives/HSI_Fungos/Fungos/data"
# data_path = '/content/drive/Shareddrives/HSI_Fungos/Carrapatos/data'

fungus_samples = glob.glob(data_path + "/*/")
env_sample = [name for i, name in enumerate(fungus_samples) if "Meio" in name]
fungus_samples.remove(env_sample[0])

In [6]:
# ------------------------------------------------------------------------------

# fungus_samples = {
#     f"{data_path}/Aspergillus_niger_14d_210716-114349/": [1],
#     f"{data_path}/Candida_albicans_48h_210716-114834/": [2],
#     f"{data_path}/Aspergillus_niger_30d_210716-114543/": [3],
#     f"{data_path}/Aspergillus_niger_48h_210716-113748/": [4],
#     f"{data_path}/Aspergillus_niger_30d_210716-114626/": [5],
#     f"{data_path}/Aspergillus_niger_96h_210716-114013/": [6],
#     f"{data_path}/Aspergillus_terreus_48h_210716-112704/": [7],
#     f"{data_path}/Aspergillus_terreus_14d_210716-113405/": [8],
#     f"{data_path}/Aspergillus_terreus_30d_210716-113623/": [9],
#     f"{data_path}/Aspergillus_terreus_96h_210716-112952/": [10],
#     f"{data_path}/Candida_albicans_14d_210716-115055/": [11],
#     f"{data_path}/Candida_albicans_30d_210716-115154/": [12],
#     f"{data_path}/Candida_albicans_96h_210716-114946/": [13],
#     f"{data_path}/Fusarim_chlamydosporums_14d_210716-111105/": [14],
#     f"{data_path}/Fusarim_chlamydosporums_48h_210716-110338/": [15],
#     f"{data_path}/Fusarim_chlamydosporums_30d_210716-111246/": [16],
#     f"{data_path}/Fusarim_chlamydosporums_96h_210716-110832/": [17],
#     f"{data_path}/Penicillium_spp_30d_210716-112221/": [18],
#     f"{data_path}/Penicillium_spp_14d_210716-112032/": [19],
#     f"{data_path}/Penicillium_spp_48h_210716-111545/": [20],
#     f"{data_path}/Penicillium_spp_96h_210716-111758/": [21],
# }

fungus_samples = {sample: [i + 1] for i, sample in enumerate(fungus_samples)}
sample_names = [name.split("/")[-2] for name in list(fungus_samples.keys())]
sample_species = list(
    set(name.split("_")[0] + "_" + name.split("_")[1] for name in sample_names)
)
sample_species = [(group, i + 1) for i, group in enumerate(sample_species)]

# ------------------------------------------------------------------------------
# Training config
# Maturation time: 48h, 96h, 14d, 30d

training_cfg = {'val': '48h',
                'test': '96h'}

for key in list(fungus_samples.keys()): ## group by samples
    for group in sample_species:
        if group[0] in key:
            fungus_samples[key].append(group[1])

            # Define training and val samples
            for cfg in list(training_cfg.keys()):
                if training_cfg[cfg] in key:
                    if cfg == 'val':
                        res_ = 1
                        break
                    elif cfg == 'test':
                        res_ = 2
                        break 
                else:
                    res_ = 0

            fungus_samples[key].append(res_)

            break

# ------------------------------------------------------------------------------

preprocess_args = {'pcs': 2,
                   'k_clusters': 2,
                   'golay_order': 2,
                   'golay_deriv': 1,
                   'golay_window': 25}

# for key in list(fungus_samples.keys()):
#     for group in [sample_species[0]]:
#         if group[1] == fungus_samples[key][1]:
#             print(key, fungus_samples[key])
#             break
        # print('\n\n')

In [7]:
dataset = FungusDataset(
    folder=data_path, samples=fungus_samples,
    load_all=False, 
    first_time=False,
    dest_folder='/content/figures/',
    process_images=False,
    **preprocess_args
)

In [8]:
# dataset.samples ## dict per task
# cube.normalized ## hypercube complete
# cube.sample_cluster ## index of sample pixels

In [9]:
from pprint import pprint
from typing import List
import matplotlib
import warnings

from sklearn.model_selection import train_test_split


def plot_matrix(
    dataset: Dataset,
    matrix: np.array,
    desc: str,
    desc_size: np.array,
    group: int,
    fig: matplotlib.figure.Figure,
    axes: matplotlib.axes.Axes,
):

    plotargs = {
        "label": desc,
        "linewidth": 2,
        # 'color': dataset.pipeline.utils.colors[str(group)]
    }
    axes.set_ylabel("Pseudo Absorbance")
    axes.set_xlabel("Wavelength (nm)")
    axes.set_xlabel(axes.get_xlabel(), dataset.pipeline.properties["fontproperties"])
    axes.set_ylabel(axes.get_ylabel(), dataset.pipeline.properties["fontproperties"])

    x_axis = dataset.wavelength
    ind = np.linspace(0, x_axis.shape[1] - 1, num=5, dtype=int)

    axes.set_xticks(ind)
    axes.set_xticklabels(x_axis[0, ind])

    mean_matrix = np.mean(matrix, axis=0)

    return axes.plot(np.arange(mean_matrix.shape[0]), mean_matrix, **plotargs)


def plot_samples(dataset: Dataset, task: int, mean_group: bool, **kwargs):
    groups = get_group(dataset, task)

    # ------ Handle plots
    # plots = []
    # fig, axes = plt.subplots(figsize=(12, 6), dpi=1000)
    # plt.rcParams.update({'font.size': 10, 'legend.frameon': False})

    # for key in list(groups.keys()):
    #     print(f"group_key: {key} group: {groups[key]} \n")

    #     group_data, descs = group_samples(group=groups[key], dataset=dataset)
    #     print(f"shape: {group_data.shape}, description: {descs}")

    #     for desc in descs:
    #         if not mean_group:
    #             plots.append(plot_matrix(
    #                 dataset, group_data[desc[1][0]:desc[1][1] + 1],
    #                 desc[0], desc[1], key, fig, axes
    #             ))
    #         else:
    #             plots.append(plot_matrix(
    #                 dataset, group_data,
    #                 desc[0], np.array([]), key, fig, axes
    #             ))

    #         axes.legend(loc='center left',
    #                     bbox_to_anchor=(1, 0.5),
    #                     prop={'size': 12,})

    #     break

    # ------ Handle plots

    # plot_matrix(dataset, group, samples)

    return


def group_samples_plot(group: Dict, dataset: Dataset):

    samples = []
    data_group = np.array([])
    for hsisample in group:

        sample_name = hsisample[1].split("/")[-2]

        sample_data = dataset[sample_name]
        ind, _ = dataset.pipeline.routine.sum_idx_array(
            dataset.pipeline.routine.realIdx(sample_data.sample_cluster, 1)
        )

        ind, _ = train_test_split(ind, test_size=0.7, shuffle=True)

        matrix = dataset.pipeline.routine.hsi2matrix(sample_data.normalized)
        matrix = dataset.pipeline.routine.normalize_mean(matrix[ind, :])

        if not data_group.shape[0]:
            data_group = np.array([]).reshape(0, matrix.shape[1])

        samples.append(
            (sample_name, data_group.shape[0] + np.array([0, matrix.shape[0] - 1]))
        )
        data_group = np.concatenate([data_group, matrix], axis=0)

    return data_group, samples


def get_group(dataset: Dataset, task: int):

    groups = {}
    samples = dataset.samples
    for hsisample in list(samples.keys()):
        sample_name = "_".join(hsisample.split("/")[-2].split("_")[0:3])

        if samples[hsisample][task] not in list(groups.keys()):
            groups[samples[hsisample][task]] = [(sample_name, hsisample)]
        else:
            groups[samples[hsisample][task]].append((sample_name, hsisample))

    return groups


import pandas as pd


def groups2dataframe(groups: Dict, dataset: Dataset):

    gp_df = pd.DataFrame()

    for key in dataset.samples:  # dict keys
        line = {}
        columns = ["sample"] + [str(i) for i in range(len(dataset.samples[key]))]

        for column in columns:
            if column == "sample":
                line[column] = key
            else:
                line[column] = int(dataset.samples[key][int(column)])

        gp_df = gp_df.append(line, ignore_index=True)

    return gp_df


def _get_sample(
    sample_name: str,
    dataset: Dataset,
    preprocess: bool = False,
    sub_sampling: float = 0.3,
):

    sample_data = dataset[sample_name]
    ind, _ = dataset.pipeline.routine.sum_idx_array(
        dataset.pipeline.routine.realIdx(sample_data.sample_cluster, 1)
    )

    if sub_sampling > 0.0:
        ind, _ = train_test_split(ind, test_size=1 - sub_sampling, shuffle=True)

    matrix = dataset.pipeline.routine.hsi2matrix(sample_data.normalized)

    if preprocess:
        pass
    else:
        matrix = dataset.pipeline.routine.normalize_mean(matrix[ind, :])

    return matrix


def get_group_samples(group: List, output: int, sf: float):

    group_matrix = None

    for hsisample in group:
        sample_name = hsisample.split("/")[-2]
        matrix = _get_sample(sample_name, dataset, preprocess=False, sub_sampling=sf)

        if not (type(group_matrix) == np.ndarray):
            group_matrix = matrix
        else:
            group_matrix = np.concatenate([group_matrix, matrix], axis=0)

    return group_matrix, np.ones(group_matrix.shape[0], dtype=int) * output


def split_train_val(dataset: Dataset, task: int, split: int = -1, **kwargs):

    train, val, test = [], [], []
    groups_dict = get_group(dataset, task)
    groups_dframe = groups2dataframe(groups_dict, dataset)

    for gk in list(groups_dict.keys()):
        task_df = groups_dframe[groups_dframe[str(task)] == float(gk)]

        sets = task_df[str(split)].unique()
        split_df = None
        if split > 0:
            split_df = {
                "train": list(task_df[task_df[str(split)] == 0.0]["sample"].values),
                "teste": list(task_df[task_df[str(split)] == 2.0]["sample"].values),
            }

            if 1.0 in sets:
                split_df["valid"] = list(task_df[task_df[str(split)] == 1.0]["sample"].values),
                X_valid, y_valid = get_group_samples(split_df["valid"][0], gk, sf=0.2)
                val.append((X_valid, y_valid))

            X_train, y_train = get_group_samples(split_df["train"], gk, sf=0.2)        
            X_teste, y_teste = get_group_samples(split_df["teste"], gk, sf=0.2)
        
        test.append((X_teste, y_teste))
        train.append((X_train, y_train))

    def _concatenate(
        groups: List[np.array],
    ):
        group_y = None
        group_matrix = None

        for group in groups:

            if not (type(group_matrix) == np.ndarray):
                group_y = group[1]
                group_matrix = group[0]
            else:
                group_y = np.concatenate([group_y, group[1]], axis=0)
                group_matrix = np.concatenate([group_matrix, group[0]], axis=0)

        return group_matrix, group_y

    return _concatenate(train), _concatenate(val) if len(val) > 0 else (), _concatenate(test)


# -----------------------------------------------------------------------------

train, vall, test = split_train_val(dataset=dataset, task=1, split=2, **preprocess_args)


In [None]:
from sklearn import (
    svm,
    discriminant_analysis,
)
import seaborn as sns
from sklearn.utils import shuffle
from sklearn.metrics import classification_report


seed = 42
models = [
    # discriminant_analysis.LinearDiscriminantAnalysis(
    #     covariance_estimator=None,
    #     n_components=None,
    #     priors=None, shrinkage=None,
    #     solver='svd',store_covariance=False,
    #     tol=0.0001)
    svm.SVC(decision_function_shape="ovo", kernel="rbf", verbose=True, max_iter=-1)
]

for model in models:
    x_test, y_test = shuffle(test[0], test[1], random_state=42)
    x_valid, y_valid = shuffle(vall[0], vall[1], random_state=42)
    x_train, y_train = shuffle(train[0], train[1], random_state=42)

    cls = model.fit(x_train, y_train)

    pred_valid = cls.predict(x_valid)
    pred_teste = cls.predict(x_test)

    print("Validation: ")
    print(classification_report(y_valid, pred_valid), "\n\n")

    print("Test Set: ")
    print(classification_report(y_test, pred_teste), "\n\n")


[LibSVM]

In [None]:
pred_teste = cls.predict(x_test)
display(classification_report(y_test, pred_teste))

In [None]:
# for key in fungus_samples:
#     sample_name = key.split('/')[-2]
    
#     pkl_file = os.path.join(key, 'capture', sample_name + '.pkl')
#     if os.path.exists(pkl_file):
#         os.remove(pkl_file)

In [None]:
fungus.image

	Data Source:   '/content/drive/MyDrive/DATA/FUNGOS/data/Aspergillus_niger_14d_210716-114349/capture/Aspergillus_niger_14d_210716-114349.raw'
	# Rows:           1424
	# Samples:         320
	# Bands:           256
	Interleave:        BIL
	Quantization:  16 bits
	Data format:    uint16