In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf

# data

## loading

In [None]:
# Directory prefix of the input CSV files; the f-strings below rely on the
# trailing slash.
path = 'data/'

# Each table is read with its first CSV column as the index.
data = pd.read_csv(f'{path}data.csv', index_col=0)
contents = pd.read_csv(f'{path}metadata_composition.csv', index_col=0)

# The wavelength table is converted to a NumPy array with its length-1 axes
# dropped.
wavelengths = np.squeeze(
    pd.read_csv(f'{path}metadata_wvl.csv', index_col=0).to_numpy()
)

In [None]:
from src.visualization import plot_spectra

# Column-wise mean over all rows of `data`, plotted with `wavelengths`
# passed as the calibration.
mean_spectrum_loaded = data.mean(axis=0).to_numpy()
plot_spectra([mean_spectrum_loaded], calibration=wavelengths)

## preprocessing

In [None]:
# remove wavelengths without useful information

# Closed (lower, upper) wavelength intervals to discard. Add or remove
# entries here; the mask below adapts automatically.
MASKS = [
  (200,245.5),
  (712.17,714),
  (796,808),
  (848.3,1000)
]

# One boolean per wavelength: True when the value lies outside every interval
# in MASKS. Iterating over MASKS (rather than indexing entries 0..3 by hand)
# keeps the mask correct when the list grows, shrinks, or is empty.
mask_cond = [
  not any(lower <= x <= upper for lower, upper in MASKS)
  for x in wavelengths
]

# Apply the same mask to the wavelength axis and to the columns of `data`
# so the two stay aligned.
wavelengths = wavelengths[mask_cond]

data = data.loc[:,mask_cond]

In [None]:
def normalize_individually(spectra, wavelengths, ranges):
    """Rescale each wavelength band of a spectrum so that it sums to one.

    Parameters
    ----------
    spectra : array-like
        A single spectrum (1-D) or a stack of spectra whose last axis is
        aligned with ``wavelengths``. NumPy arrays and pandas objects are
        accepted; the input is never modified.
    wavelengths : array-like
        1-D wavelength axis with one entry per position along the last axis
        of ``spectra``.
    ranges : dict
        Maps an arbitrary key to a ``(lower, upper)`` pair. Both bounds are
        inclusive. Bands are processed in the dict's iteration order.

    Returns
    -------
    numpy.ndarray
        A new floating-point array of the same shape as ``spectra`` in which
        every band has been divided by its own sum. Positions outside all
        bands are returned unchanged.
    """
    # Always work on an independent copy: a pandas object may hand out a view
    # of (or a read-only window onto) its own buffer, and dividing that in
    # place would either corrupt the caller's data or raise.
    normalized = np.array(spectra, copy=True)
    if not np.issubdtype(normalized.dtype, np.floating):
        # In-place true division is not defined for integer arrays.
        normalized = normalized.astype(float)

    wavelengths = np.asarray(wavelengths)

    for lower, upper in ranges.values():
        in_band = (wavelengths >= lower) & (wavelengths <= upper)
        if not in_band.any():
            continue

        # Index the last axis so a stack of spectra is normalized per spectrum.
        band = normalized[..., in_band]
        totals = band.sum(axis=-1, keepdims=True)

        # A band that sums to zero is left untouched instead of turning into
        # NaN/inf through a division by zero.
        safe_totals = np.where(totals == 0, 1.0, totals)
        normalized[..., in_band] = band / safe_totals

    return normalized


# (lower, upper) wavelength limits of the bands handed to
# `normalize_individually`, keyed by an arbitrary label.
normalization_ranges = {
    '1':( 243,     342     ),
    '2':( 379,     465     ),
    '3':( 537,     620.079 ),
    '4':( 620.08,  712.17  ),
    '5':( 712.171, 852.78  ),
}


def _normalize_row(spectrum):
    """Normalize one row of `data` and wrap the result in a Series."""
    # NOTE(review): the Series is built without an explicit index, so the
    # columns of the resulting frame are presumably positional rather than
    # the original column labels -- confirm this is intended.
    normalized = normalize_individually(
        spectra=spectrum,
        wavelengths=wavelengths,
        ranges=normalization_ranges,
    )
    return pd.Series(normalized)


# axis=1 passes each row of `data` to the helper.
data = data.apply(func=_normalize_row, axis=1)

In [None]:
# Same mean-spectrum plot as before, now computed from the masked and
# normalized `data`.
mean_spectrum_preprocessed = data.mean(axis=0).to_numpy()
plot_spectra([mean_spectrum_preprocessed], calibration=wavelengths)

## train test split

In [None]:
# `shuffle` stays imported in case another cell still refers to it; this cell
# uses a dedicated, seeded `Random` instance instead.
from random import Random, shuffle

COMPOUND = 'SiO2'
TEST_FOLD = 3
MAX_DISTANCE_MM = 4000  # rows with `distance_mm` at or above this are excluded
SPLIT_SEED = 0          # fixes the training-order shuffle so re-runs are repeatable

# Conditions shared by both splits: the compound's outlier flag must be 'Keep'
# and the distance must be below the limit.
usable_rows = (
    (contents[f'{COMPOUND}_outliers'] == 'Keep')
    & (contents['distance_mm'] < MAX_DISTANCE_MM)
)
fold_of_row = contents[f'{COMPOUND}_Folds']


def _names_with_spectra(row_mask):
    """Return the labels of `contents` selected by `row_mask` that also occur in `data.index`."""
    return [
        idx
        for idx in contents[row_mask].index.tolist()
        if idx in data.index
    ]


# Training rows: every usable row outside the test fold, excluding fold 0.
train_names = _names_with_spectra(
    usable_rows & (fold_of_row != TEST_FOLD) & (fold_of_row != 0)
)

# Test rows: usable rows that belong to the held-out fold.
test_names = _names_with_spectra(usable_rows & (fold_of_row == TEST_FOLD))

# NOTE(review): `np.nan_to_num` turns missing target values into 0.0 -- confirm
# this is preferred over dropping those samples.
X_test = data.loc[test_names,:].to_numpy()
y_test = np.nan_to_num(contents.loc[test_names, f'{COMPOUND}'].to_numpy())

# A private, seeded generator gives the same permutation on every run and
# leaves the global `random` state untouched.
Random(SPLIT_SEED).shuffle(train_names)
X_train = data.loc[train_names,:].to_numpy()
y_train = np.nan_to_num(contents.loc[train_names,f'{COMPOUND}'].to_numpy())

# Append a trailing length-1 axis: (rows, columns) -> (rows, columns, 1).
X_test, X_train = (np.reshape(m, (m.shape[0], m.shape[1], 1)) for m in (X_test, X_train))



# models

## cnn baseline

### architecture

In [None]:
def Compile_Branching(
  L = 1e-2,
  L_b = 1e-3,
  input_shape = (data.shape[1],) + (1,),
  print_summary:bool = False,
  lr = 1
):
  """Build and compile the branching 1-D CNN regressor.

  The network is a stack of three Conv1D layers followed by four parallel
  branches whose outputs are summed, flattened, and fed to a single-unit
  Dense layer with batch normalization applied to its output.

  Args:
    L: factor used for both the l1 and l2 penalty on the Dense kernel.
    L_b: factor used for both the l1 and l2 penalty on the Dense bias.
    input_shape: shape given to `tf.keras.Input`. The default is evaluated
      once, when this function is defined, from the global `data`; re-run
      the definition if the number of columns in `data` changes.
    print_summary: when true, `model.summary()` is called before returning.
    lr: learning rate handed to the Adam optimizer.

  Returns:
    The `tf.keras.Model` named 'branching_cnn', compiled with
    mean-squared-error loss and a root-mean-squared-error metric.
  """
  layers = tf.keras.layers

  def relu_conv(inputs, filters, kernel_size, **extra):
    """Apply a stride-1 ReLU Conv1D with a fresh HeNormal initializer to `inputs`."""
    return layers.Conv1D(
      filters=filters,
      kernel_size=kernel_size,
      strides=1,
      activation='relu',
      kernel_initializer=tf.keras.initializers.HeNormal(seed=None),
      **extra
    )(inputs)

  model_input = tf.keras.Input(shape=input_shape)

  # ----------------------------- Block1
  # Three stacked 8-filter, width-3 convolutions using the layer's default padding.
  stem = model_input
  for _ in range(3):
    stem = relu_conv(stem, filters=8, kernel_size=3)

  # ----------------------------- Block2
  # Every branch starts from `stem`, uses "same" padding and ends with 4 filters.

  # ----------------------------- Branch1: a single width-1 convolution
  branch1 = relu_conv(stem, filters=4, kernel_size=1, padding="same")

  # ----------------------------- Branch2: width-1, then one width-3 convolution
  branch2 = relu_conv(stem, filters=4, kernel_size=1, padding="same")
  branch2 = relu_conv(branch2, filters=4, kernel_size=3, padding="same")

  # ----------------------------- Branch3: width-1, then two width-3 convolutions
  branch3 = relu_conv(stem, filters=4, kernel_size=1, padding="same")
  for _ in range(2):
    branch3 = relu_conv(branch3, filters=4, kernel_size=3, padding="same")

  # ----------------------------- Branch4: max pooling, then a width-1 convolution
  branch4 = layers.MaxPool1D(pool_size=2, strides=1, padding="same")(stem)
  branch4 = relu_conv(branch4, filters=4, kernel_size=1, padding="same")

  # ----------------------------- Merge
  # The branches are combined with `tf.keras.layers.add` (a sum, not a concatenation).
  merged = layers.add([branch1, branch2, branch3, branch4])

  # ----------------------------- Output
  flattened = layers.Flatten()(merged)
  dense_output = layers.Dense(
    1,
    activation='relu',
    kernel_initializer=tf.keras.initializers.HeNormal(seed=None),
    kernel_regularizer=tf.keras.regularizers.l1_l2(l1=L, l2=L),
    bias_regularizer=tf.keras.regularizers.l1_l2(l1=L_b, l2=L_b)
  )(flattened)
  model_output = layers.BatchNormalization()(dense_output)

  model = tf.keras.Model(model_input, model_output, name='branching_cnn')
  model.compile(
    optimizer=tf.optimizers.Adam(learning_rate=lr),
    loss='mean_squared_error',
    metrics=[tf.metrics.RootMeanSquaredError()]
  )

  if print_summary:
    model.summary()

  return model

### training

In [None]:
# Build the model with the default arguments of `Compile_Branching`.
cnn_baseline = Compile_Branching()

# Train on the training split; the test split is passed as `validation_data`.
cnn_baseline.fit(
    X_train,
    y_train,
    epochs=500,
    batch_size=32,
    verbose=2,
    validation_data=(X_test, y_test),
)

In [None]:
# Rendering options for the architecture diagram, spelled out explicitly so
# the result does not depend on the library's defaults.
plot_options = dict(
    to_file="model.png",
    show_shapes=False,
    show_dtype=False,
    show_layer_names=True,
    rankdir="TB",
    expand_nested=False,
    dpi=96,
    layer_range=None,
    show_layer_activations=False,
)

# Draw `cnn_baseline` to the file named in `to_file`.
tf.keras.utils.plot_model(cnn_baseline, **plot_options)

## rnn

### architecture

### training