<a href="https://colab.research.google.com/github/RejectHumanity/LIBS_library/blob/main/TFL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import os
import pandas as pd
import numpy as np

%cd drive/My Drive/Colab Notebooks/Pavel/LIBS_library

Mounted at /content/drive
/content/drive/.shortcut-targets-by-id/1wxEeCnOnXIpjt5GzQ7lGh3mWPgTlIrUW/LIBS_library


# Data

In [None]:
from src.dataset_manager import DatasetManager

# Dataset manager for the 'newFL' collection, created with caching disabled.
dm = DatasetManager('newFL', cache=False)

# Map dimensions handed to the plotting helpers further down:
# `dim` for the training data, `val_dim` for the validation data.
dim = (250, 153)
val_dim = (250, 41)
# Earlier loading variants, kept for reference.
#s1_val = pd.read_hdf('./Outside_files/TFL_big_map.hdf', key='catalina_bigmap_small').astype('float16')
#s1, s2 = (s.astype('float16') for s in dm.load_datasets(['s1', 's2']))
#s1, s2 = (s.astype('float16') for s in dm.load_datasets(['s1_val', 's2_val']))
# Training pair (s1, s2) and validation pair (s1_val, s2_val).
s1, s2 = dm.load_datasets(['s1_done', 's2_done'])
s1_val, s2_val = dm.load_datasets(['s1_val_done', 's2_val_done'])

# First five entries of each training set, kept as a small debugging sample.
debug1, debug2 = s1[:5], s2[:5]

### Preprocessing

In [None]:
# Disabled preprocessing step (never runs because of `if False`).
# When enabled it aligns the two datasets with match_wavelengths and then
# normalizes them: the pipeline is fitted on s1 and only applied to s2.
if False:
  from sklearn.pipeline import Pipeline
  from src.preprocessing import Normalizer
  from src.preprocessing import match_wavelengths

  s1, s2 = match_wavelengths(s1, s2)

  pipe = Pipeline([('norm', Normalizer())])

  s1 = pipe.fit_transform(s1)
  s2 = pipe.transform(s2)

### Data check

In [None]:
from src.visualization import spectrometer_step_comparison 
from src.visualization import plot_spectra
from src.utils import spectra_pd_to_np
from src.visualization import plot_map
from src.map_utils import IndexType

display = False

In [None]:
# Optional check (only when `display` is True): pass the column labels of both
# validation sets to spectrometer_step_comparison and show the resulting figure.
if display:
  spectrometer_step_comparison([s1_val.columns, s2_val.columns], ['S1', 'S2']).show()

In [None]:
# Optional check (only when `display` is True): draw a map of each training
# set using the `dim` grid and horizontal indexing.
if display:
  plot_map(pd.DataFrame(s1), dim, index_type=IndexType.HORIZONTAL).show()
  plot_map(pd.DataFrame(s2), dim, index_type=IndexType.HORIZONTAL).show()

In [None]:
from src.preprocessing import match_wavelengths
# Optional check (only when `display` is True): plot the spectrum at position
# 200 from s1 and from s2 together, labelled by the s1_val column labels.
if display:
  representatives = [200]

  plot_spectra(np.vstack([s1[representatives], s2[representatives]]), calibration = s1_val.columns, labels=['s1', 's2']).show()

In [None]:
# Optional check (only when `display` is True): plot a handful of s2 spectra
# selected by position.
if display:
  representatives = [250, 300, 500, 1000]
  plot_spectra(spectra_pd_to_np(representatives, pd.DataFrame(s2)), calibration = s1_val.columns).show()

In [None]:
# Optional check (only when `display` is True): plot the mean spectrum of s1
# next to the mean spectrum of s2 (mean taken over axis 0).
if display:
  plot_spectra(np.vstack([s1.mean(axis=0), s2.mean(axis=0)]), calibration = s1_val.columns, labels=['s1', 's2']).show()

# Models - first iteration

In [None]:
from src.models.autoencoder import AutoEncoder, load_ae_model
from src.models.transfer_project import SimpleMLPRegressor
from src.models.base import load_model

# base class was moved
# Disabled first-iteration pipeline (never runs because of `if False`):
# an autoencoder fitted on s1 and an MLP that maps s2 onto the encoder output.
# With retrain=False both models are loaded from ./models instead.
if False:
  retrain = False
  # NOTE(review): `layers` is only present as the comment below and is not
  # assigned in the visible cells, so the retrain branch would raise NameError.
  #layers=[1000, 400]
  if retrain:
    autoencoder = AutoEncoder(code_size = 8, layers=layers).fit(s1, s1, verbose=1)
  else:
    autoencoder = load_ae_model('./models/TFL_AE')

  if retrain:
    # the regression target is the encoder's representation of s1
    mlp = SimpleMLPRegressor(layers=layers).fit(s2, autoencoder.encoder_.predict(s1), verbose=1)
  else:
    mlp = load_model('./models/TFL-MLP')

### Model check

In [None]:
display = False

In [None]:
# Optional check (only when `display` is True): for three selected spectra,
# plot the original s1 spectrum, its autoencoder reconstruction, and the
# decoder output obtained from the MLP prediction on the matching s2 spectrum.
# NOTE(review): s1_test / s2_test are not defined in the visible cells.
if display:
  representatives = [50, 250, 300]

  plot_spectra(
      np.vstack(
          (
              s1_test.iloc[representatives].to_numpy(),
              autoencoder.predict(s1_test.iloc[representatives]),
              autoencoder.decoder_.predict(mlp.predict(s2_test.iloc[representatives]))
          )
      ),
      calibration=s1_test.columns,
      # each label is repeated once per representative so labels line up with the stacked rows
      labels=[label for label in ['original', 'autoencoder', 'mlp + decoder'] for _ in range(len(representatives))],
  ).show()

In [None]:
# Optional check (only when `display` is True): compare the mean original
# spectrum with the mean autoencoder reconstruction and the mean output of the
# MLP + decoder chain.
# NOTE(review): s1_test / s2_test are not defined in the visible cells.
if display:
  plot_spectra(
      np.vstack(
          (
              s1_test.mean(),
              np.mean(autoencoder.predict(s1_test), axis=0),
              np.mean(autoencoder.decoder_.predict(mlp.predict(s2_test)), axis=0)
          )
      ),
       calibration=s1_test.columns,
       labels = ['original', 'autoencoder', 'mlp + decoder'],
  ).show()

# Models - second iteration

## Models

In [None]:
from src.models.base import NNRegressor

from tensorflow.keras.models import clone_model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras import Model
from tensorflow.keras.optimizers import Adam

from typing import Iterable, Optional


In [None]:
class PartialFitMixin:
  """Mixin that trains an estimator on consecutive chunks of the data.

  The host class has to provide ``fit(X, y, *args, **kwargs)``.
  """

  def partial_fit(self, X, y, n_fits=2, *args, **kwargs):
    """Call ``fit`` on ``n_fits`` consecutive slices of ``X`` and ``y``.

    The first ``n_fits - 1`` slices hold ``len(X) // n_fits`` samples each;
    the last slice takes everything that is left. Between two fits the Keras
    backend session is cleared and the garbage collector is run.

    Args:
      X: Sliceable collection of inputs.
      y: Sliceable collection of targets, aligned with ``X``.
      n_fits: Number of ``fit`` calls; capped at ``len(X)`` so that no call
        receives an empty slice.
      *args, **kwargs: Forwarded unchanged to every ``fit`` call.

    Returns:
      self

    Raises:
      ValueError: If ``n_fits`` is smaller than 1.
    """
    if n_fits < 1:
      raise ValueError('n_fits must be a positive integer, got {!r}'.format(n_fits))
    # Never ask for more chunks than there are samples (step would become 0
    # and the leading fits would run on empty slices).
    n_fits = max(1, min(n_fits, len(X)))
    step = len(X) // n_fits
    for i in range(n_fits - 1):
      chunk = slice(i * step, (i + 1) * step)
      self.fit(X[chunk], y[chunk], *args, **kwargs)
      # `tf` and `gc` are module-level names imported in a later notebook cell.
      tf.keras.backend.clear_session()
      gc.collect()
    # The final chunk also absorbs the remainder of the integer division.
    tail = slice((n_fits - 1) * step, None)
    self.fit(X[tail], y[tail], *args, **kwargs)
    return self

## Add to src

In [None]:
from keras.callbacks import Callback
import tensorflow as tf
import gc

class ClearMemory(Callback):
  """Callback that clears the Keras backend session and runs the garbage collector after each epoch."""

  def on_epoch_end(self, epoch, logs=None):
    """Hook invoked at the end of every epoch; `epoch` and `logs` are not used."""
    keras_backend = tf.keras.backend
    keras_backend.clear_session()
    gc.collect()

In [None]:
def baselined_scorer_from_loss(loss, baseline_loss):
  """Build a scorer that rates a loss relative to a fixed baseline loss.

  The returned function computes ``1 - loss(y_true, y_pred) / baseline_loss``:
  1 means zero loss, 0 means the loss equals the baseline.

  Args:
    loss: Callable ``loss(y_true, y_pred)``.
    baseline_loss: Loss value of the reference predictor (the divisor).

  Returns:
    A function named ``score`` taking ``(y_true, y_pred)``.
  """
  def score(y_true, y_pred):
    relative_loss = loss(y_true, y_pred) / baseline_loss
    return 1 - relative_loss
  return score

In [None]:
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils import check_array, check_X_y
from sklearn.utils.validation import check_is_fitted

class Average_Dummy(BaseEstimator, RegressorMixin):
  """Baseline regressor that always predicts the feature-wise mean of its training inputs.

  Args:
    add_val: When True, the rows of ``X_val`` passed to ``fit`` are included
      in the mean.
  """

  def __init__(self, add_val:bool=False, *args, **kwargs):
      self.add_val = add_val
      super().__init__(*args, **kwargs)

  def fit(self, X, y, X_val=None, *args, **kwargs):
    """Store the mean over the rows of ``X`` (plus ``X_val`` if ``add_val`` is set).

    ``y`` is validated but not used for the prediction.

    Raises:
      ValueError: If ``add_val`` is True and no ``X_val`` is given.
    """
    X = check_array(X)
    check_array(y)
    if self.add_val:
      if X_val is None:
        raise ValueError('X_val is required when add_val=True')
      X = np.vstack([X, X_val])
    self.prediction_ = np.mean(X, axis=0)
    return self

  def predict(self, X, *args, **kwargs):
    """Return the stored mean repeated once for every row of ``X``."""
    check_is_fitted(self, 'prediction_')
    X = check_array(X)
    # Use this estimator's own state and the size of the given input rather
    # than the notebook globals `baseline` and `s1`.
    return np.tile(self.prediction_, (X.shape[0], 1))


## Autoencoder

In [None]:
class Autoencoder(NNRegressor, PartialFitMixin):
  """Dense autoencoder exposing its encoder and decoder as separate models.

  Args:
    layers: Hidden layer widths. The first occurrence of the smallest width
      is the bottleneck; entries before it form the encoder, entries after it
      the decoder. At least one entry has to follow the bottleneck. If the
      smallest width comes first it is used for both the first encoder layer
      and the encoder output.
    regularizer: Kernel regularizer applied to every Dense layer.
    metrics: Metrics handed to ``compile`` for all three models.
    **kwargs: Forwarded to ``NNRegressor``.

  After ``build`` the instance holds ``encoder_``, ``decoder_`` and
  ``model_`` (decoder applied to encoder).
  """

  def __init__(self, layers:Iterable[int]=(200, 8, 200), regularizer=None, metrics=[], **kwargs):
    self.layers = layers
    self.regularizer = regularizer
    self.metrics = metrics
    super().__init__(**kwargs)


  def build(self):
    """Create and compile the encoder, the decoder and the combined model.

    Reads ``n_features_in_`` and ``n_features_out_`` (presumably set by the
    base class before ``build`` is called - confirm against NNRegressor).
    """
    # Materialise once so that any iterable works with index() and slicing.
    layers = list(self.layers)
    bottleneck = layers.index(min(layers))
    regularizer = self.regularizer

    def dense_leaky(x, units, name=None):
      # One Dense layer followed by a leaky-ReLU activation layer.
      x = Dense(units, kernel_regularizer=regularizer, name=name)(x)
      return tf.keras.layers.Activation('leaky_relu')(x)

    encoder_in = Input(shape=(self.n_features_in_,), name='encoder_in')
    x = dense_leaky(encoder_in, layers[0])
    for nodes in layers[1:bottleneck]:
      x = dense_leaky(x, nodes)
    x = dense_leaky(x, layers[bottleneck], name='encoder_out')
    self.encoder_ = Model(encoder_in, x, name='encoder')

    # Pass the shape as a tuple, like every other Input in this class.
    decoder_in = Input(shape=(layers[bottleneck],), name='decoder_in')
    x = dense_leaky(decoder_in, layers[bottleneck + 1])
    for nodes in layers[bottleneck + 2:]:
      x = dense_leaky(x, nodes)
    decoder_out = Dense(self.n_features_out_, activation='relu', kernel_regularizer=regularizer, name='decoder_out')(x)
    self.decoder_ = Model(decoder_in, decoder_out, name="decoder")

    autoencoder_in = Input(shape=(self.n_features_in_,), name='encoder_in')
    self.model_ = Model(autoencoder_in, self.decoder_(self.encoder_(autoencoder_in)), name="autoencoder")

    # Each model gets its own optimizer instance.
    for model in (self.encoder_, self.decoder_, self.model_):
      model.compile(loss='mse', optimizer=Adam(), metrics=self.metrics, run_eagerly=True)

In [None]:
from tensorflow.keras.losses import MeanSquaredError

# Baseline that predicts the mean spectrum of s1; its MSE on s1 is the
# reference loss for the baselined score used as a training metric.
baseline = Average_Dummy().fit(s1, s1)
scorer = baselined_scorer_from_loss(MeanSquaredError(), MeanSquaredError()(s1, baseline.predict(s1).astype('float16')).numpy().astype('float32'))

# Training branch is disabled; the stored model is loaded instead.
if False:
  from tensorflow.keras import regularizers

  # shuffles the rows of both arrays in place
  np.random.shuffle(s1)
  np.random.shuffle(s1_val)

  gc.collect()

  autoencoder = Autoencoder((128, 8, 128), metrics=[scorer], regularizer=regularizers.l2(1e-6))
  autoencoder.partial_fit(s1, s1, n_fits=3, n_iter=10, batch_size=16, callbacks=[ClearMemory()], X_val=s1_val, y_val=s1_val, verbose=1)
  # NOTE(review): saved under a different name than the one loaded in the else branch.
  autoencoder.save('./model/TFL_Autoencoder_reg_8_second_attempt')
else:
  from src.models.base import load_model
  # "score" is the name of the function returned by baselined_scorer_from_loss
  autoencoder = load_model('./model/TFL_Autoencoder_reg_8', custom_objects = {"score": scorer})

## Inserter

In [None]:
class Inserter(NNRegressor, PartialFitMixin):
  """Dense network, optionally followed by a frozen copy of a decoder.

  Args:
    layers: Hidden layer widths (ignored when ``encoder`` is given).
    decoder: Optional Keras model appended, frozen, after the network.
    regularizer: Kernel regularizer applied to every Dense layer.
    encoder: Optional Keras model used for initialization; its layer widths
      replace ``layers`` and its weights are copied into the built model.
    metrics: Metrics handed to ``compile`` of the final model.
    **kwargs: Forwarded to ``NNRegressor``.
  """

  def __init__(self, layers:Iterable[int]=(1000, 400), decoder=None, regularizer=None, encoder=None, metrics=[], **kwargs):
    """
    encoder is used for initialization; layers get overridden by encoder.
    """
    self.layers = layers
    self.decoder = decoder
    self.regularizer = regularizer
    self.encoder = encoder
    self.metrics = metrics
    super().__init__(**kwargs)


  def build(self):
    """Create and compile ``model_`` (plus ``encoder_``/``decoder_`` when a decoder is given)."""
    if self.encoder is not None:
      # A list, not a generator: the widths are indexed and sliced below.
      layers = [layer.output_shape[1] for layer in self.encoder.layers if layer.trainable_weights]
    else:
      layers = list(self.layers)
    regularizer = self.regularizer

    net_in = Input(shape=(self.n_features_in_,), name='encoder_input')
    x = Dense(layers[0], kernel_regularizer=regularizer)(net_in)
    x = tf.keras.layers.Activation('leaky_relu')(x)
    for n in layers[1:]:
      x = Dense(n, kernel_regularizer=regularizer)(x)
      x = tf.keras.layers.Activation('leaky_relu')(x)
    # NOTE(review): with a decoder attached this layer feeds the decoder, so
    # its width presumably has to equal the decoder's input size - confirm.
    x = Dense(self.n_features_out_, activation='relu', kernel_regularizer=regularizer, name='output')(x)

    if self.decoder is not None:
      # add decoder to the output
      self.encoder_ = Model(net_in, x, name="encoder")
      # clone_model creates freshly initialised weights, so the trained
      # weights have to be copied over explicitly before freezing.
      self.decoder_ = clone_model(self.decoder)
      self.decoder_.set_weights(self.decoder.get_weights())
      # fix decoder (it should be trained by the Extractor model)
      self.decoder_.trainable = False
      # The outer model's input must be the tensor the sub-models are called
      # on; otherwise the graph is disconnected.
      model_in = Input(shape=(self.n_features_in_,), name='input')
      decoded = self.decoder_(self.encoder_(model_in))
      self.model_ = Model(model_in, decoded, name="Inserter")
      self.encoder_.compile(loss=self.loss, optimizer=Adam())
      self.decoder_.compile(loss=self.loss, optimizer=Adam())
    else:
      self.model_ = Model(net_in, x, name='Inserter')

    self.model_.compile(loss='mse', optimizer=Adam(), metrics=self.metrics)
    if self.encoder is not None:
      # NOTE(review): the built model has an extra 'output' layer on top of
      # the encoder's widths, so the weight lists may differ in length - verify.
      self.model_.set_weights(self.encoder.get_weights())

In [None]:
def make_weird_scorer(scorer):
  """Placeholder factory: the returned ``score`` ignores its inputs and yields None.

  ``scorer`` is currently unused.
  """
  def score(y_true, y_pred):
    return None
  return score

In [None]:
from tensorflow.keras.losses import cosine_similarity
# Training branch is disabled; the stored Inserter is loaded instead.
if False:
  # Regression targets are the autoencoder's codes of the s1 spectra.
  # (`auto` was an undefined name here; the model is called `autoencoder`.)
  s1 = autoencoder.encoder_.predict(s1)
  s1_val = autoencoder.encoder_.predict(s1_val)

  # Shuffle inputs and targets with one shared permutation to keep rows paired.
  p = np.random.permutation(len(s1))
  s1, s2 = s1[p], s2[p]

  p = np.random.permutation(len(s1_val))
  s1_val, s2_val = s1_val[p], s2_val[p]

  gc.collect()
  
  # The Inserter learns the mapping s2 spectrum -> s1 code.
  insert = Inserter(layers=[256, 128], metrics=[cosine_similarity])
  insert.partial_fit(s2, s1, n_fits=3, n_iter=10, batch_size=16, callbacks=[ClearMemory()], X_val=s2_val, y_val=s1_val, verbose=1)
  insert.save('./model/TFL_Inserter')
else:
  insert = load_model('./model/TFL_Inserter')

# Visualization

## Sample Spectra

In [None]:
from enum import Enum, auto
from typing import TypeVar, Tuple, Iterable, Optional, Callable
from itertools import starmap

import numpy as np

import plotly.graph_objects as go

from src.preprocessing import LabelCropp

T = TypeVar('T')

In [None]:
class IndexType(Enum):
  """
  Ways in which a flat sequence of measurements is laid out on a 2D grid.
  """
  # filled row by row, every second row reversed
  HORIZONTAL_SNAKE = auto()
  # filled column by column, every second column reversed
  VERTICAL_SNAKE   = auto()
  # filled row by row
  HORIZONTAL       = auto()
  # filled column by column
  VERTICAL         = auto()

def reshape(values:np.array, dimensions: Tuple[int, int], index_type: IndexType) -> np.array:
  """
  Arrange `values` on a grid of shape `dimensions` according to `index_type`.

  Modifies values! The array is resized in place (refcheck disabled); for the
  vertical layouts the returned array is a transposed view of it.
  """
  column_major = index_type in (IndexType.VERTICAL_SNAKE, IndexType.VERTICAL)
  target_shape = dimensions[::-1] if column_major else dimensions
  values.resize(target_shape, refcheck=False)

  grid = np.transpose(values) if column_major else values

  # snake layouts run back and forth: flip every second row / column
  if index_type == IndexType.HORIZONTAL_SNAKE:
    grid[1::2, :] = grid[1::2, ::-1]
  elif index_type == IndexType.VERTICAL_SNAKE:
    grid[:, 1::2] = grid[::-1, 1::2]

  return grid

In [None]:
def plot_map(values: np.array,
             dim: Tuple[int, int],
             index_type: IndexType=IndexType.HORIZONTAL,
             *args,
             **kwargs,
             ):
  """
  Draw `values` as a heatmap on a grid of shape `dim`.

  `values` is rearranged by `reshape` (which modifies it in place). Extra
  positional and keyword arguments are forwarded to `go.Heatmap`.
  Returns the plotly figure.
  """
  grid = reshape(values, dim, index_type)
  heatmap = go.Heatmap(*args, z=grid, **kwargs)
  return go.Figure(data=heatmap)

In [None]:
def rowwise_cosine(y_true, y_pred):
  """
  Cosine similarity between matching rows of two 2D arrays.

  https://stackoverflow.com/questions/49218285/cosine-similarity-between-matching-rows-in-numpy-ndarrays
  """
  dot_products = np.einsum('ij,ij->i', y_true, y_pred)
  norm_products = np.linalg.norm(y_true, axis=1) * np.linalg.norm(y_pred, axis=1)
  return dot_products / norm_products
  
def rowwise_mse(y_true, y_pred):
  """Mean squared error of every row of `y_pred` against the matching row of `y_true`."""
  residuals = np.subtract(y_true, y_pred)
  squared = np.square(residuals)
  return squared.mean(1)

def rowwise_rmse(y_true, y_pred):
  """Root mean squared error of every row of `y_pred` against the matching row of `y_true`."""
  residuals = np.subtract(y_true, y_pred)
  mean_squared = np.square(residuals).mean(1)
  return np.sqrt(mean_squared)

In [None]:
def error_map(y_true: Iterable[T],
              y_pred: Iterable[T],
              dim: Tuple[int, int],
              index_type: IndexType=IndexType.HORIZONTAL,
              rowwise_error: Callable[[Iterable[T], Iterable[T]], Iterable[float]]=rowwise_cosine,
              title: Optional[str]=None,
              add_stats: bool=False,
              *args,
              **kwargs
              ):
  """
  Draw a heatmap of a per-row measure between `y_true` and `y_pred`.

  Args:
    y_true, y_pred: Row-aligned data handed to `rowwise_error`.
    dim: Grid shape passed to `plot_map`.
    index_type: Layout of the rows on the grid.
    rowwise_error: Function returning one value per row (default:
      `rowwise_cosine`, i.e. a similarity rather than an error).
    title: Optional figure title.
    add_stats: Append the mean, minimum and maximum of the values to the title.
    *args, **kwargs: Forwarded to `plot_map`.

  Returns:
    The figure produced by `plot_map`, with the title applied when there is one.
  """
  values = rowwise_error(y_true, y_pred)

  # Statistics have to be taken before plot_map, which rearranges `values` in place.
  if add_stats:
    stats = ' (avg: {}, min: {}, max: {})'.format(np.mean(values), np.min(values), np.max(values))
    title = (title or '') + stats

  fig = plot_map(values, dim, index_type, *args, **kwargs)
  # The title used to be computed and then dropped; attach it to the figure.
  if title:
    fig.update_layout(title_text=title)
  return fig

In [None]:
def spectra_intensity(spectra: np.array,
                      start: Optional[T]=None,
                      end: Optional[T]=None,
                      calibration: Optional[Iterable[T]]=None,
                      ) -> Iterable[float]:
  """
  Sum every spectrum over the label range from `start` to `end`.

  Args:
    spectra: 2D array with one spectrum per row.
    start: First label of the range; defaults to the first calibration label.
    end: Last label of the range; defaults to the last calibration label.
    calibration: Labels of the columns of `spectra`; defaults to the column
      positions 0..n_columns-1.

  Returns:
    One summed value per row of the cropped data.
  """
  if calibration is None:
    # Calibration labels the columns, so it must have one entry per column
    # (shape[1]), not one per spectrum (shape[0]).
    calibration = np.arange(spectra.shape[1])
  if start is None:
    start = calibration[0]
  if end is None:
    end = calibration[-1]

  cropped = LabelCropp(label_from=start, label_to=end, labels=calibration).fit_transform(spectra)
  return np.sum(cropped, axis=1)

In [None]:
def intensity_map(spectra: np.array,
                  dim: Tuple[int, int],
                  index_type: IndexType=IndexType.HORIZONTAL,
                  start: Optional[T]=None,
                  end: Optional[T]=None,
                  calibration: Optional[Iterable[T]]=None,
                  *args,
                  **kwargs
                  ):
  """
  Draw a heatmap of the per-spectrum intensity computed by `spectra_intensity`.

  `start`, `end` and `calibration` select the summed range; `dim` and
  `index_type` define the grid; remaining arguments go to `plot_map`.
  """
  per_spectrum_totals = spectra_intensity(spectra, start=start, end=end, calibration=calibration)
  return plot_map(per_spectrum_totals, dim, index_type, *args, **kwargs)


In [None]:
# Intensity map of s1 over the full label range, on the reversed `dim` grid.
intensity_map(s1, dim=dim[::-1], index_type=IndexType.HORIZONTAL)

In [None]:
# Row-wise RMSE between the s1 and s2 spectra, drawn on the reversed `dim` grid.
error_map(s1, s2, rowwise_error=rowwise_rmse, dim=dim[::-1])

In [None]:
# Same RMSE map as above, kept in `f` so that its layout can be adjusted afterwards.
f = error_map(s1, s2, rowwise_error=rowwise_rmse, dim=dim[::-1])

In [None]:
# Anchor the y-axis scale to the x-axis.
f.update_layout(yaxis = dict(scaleanchor = 'x'))

## Error maps

In [None]:
# Plot the first ten entries of s1.
plot_spectra(s1[:10])

In [None]:
from src.visualization import plot_map
#, wave_from=16601, wave_to=16609
#, wave_from=11111, wave_to=11132

#plot_map(pd.DataFrame(s1_val), (41, 248), index_type=IndexType.HORIZONTAL).show()

In [None]:
from src.visualization import error_map, reduce_plot
from sklearn.metrics import mean_squared_error

# NOTE: `error_map` here is the version imported from src.visualization just
# above, which replaces the function of the same name defined earlier.

# Reconstruction error of the autoencoder on the s1 validation data.
p1 = reduce_plot(error_map(autoencoder.predict(s1_val), s1_val, val_dim, error_function=mean_squared_error))

# Transfer error: the Inserter is trained on s2 inputs, so it is fed s2_val;
# its codes are decoded and compared with the s1 validation data.
p2 = reduce_plot(error_map(autoencoder.decoder_.predict(insert.predict(s2_val)), s1_val, val_dim, error_function=mean_squared_error))