# $f_s$ from $\tau$

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.model_selection import train_test_split
import os
import datetime
import tensorflow.keras.backend as K

%matplotlib inline
plt.rcParams['figure.figsize'] = [16, 6]

########## CONFIGURATION ##########
TAU_PATH = './npy-tau-fs/tau.npy'
FS_PATH = './npy-tau-fs/fs.npy'

np.random.seed(10) #10
tf.random.set_seed(10) #10

NOISE_STDDEV = [0, 1e-2]
############################

TITLE = 'tau-fs'
MODEL_H5_PATH = f'./models/{TITLE}.h5'

def standarize(data, mean, std):
    return (data - mean) / std

def destandarize(data, mean, std):
    return (data * std) + mean

def mae_destandarize(y_true, y_pred):
    y_true_d = destandarize(y_true, y_mean, y_std)
    y_pred_d = destandarize(y_pred, y_mean, y_std)
    return K.mean(K.abs(y_pred_d - y_true_d))

def mae_percentage(y_true, y_pred):
    y_true_d = destandarize(y_true, y_mean, y_std)
    y_pred_d = destandarize(y_pred, y_mean, y_std)
    mae = K.mean(K.abs(y_pred_d - y_true_d))
    return (mae * 100) / y_mean

def predict(data):
    return destandarize(model.predict(standarize(data, x_mean, x_std)), y_mean, y_std)

def predict_single(data):
    data_map = standarize(data, x_mean, x_std)
    pred = model.predict(np.expand_dims(data_map, axis=0))[0]
    return destandarize(pred, y_mean, y_std)

def predict(data):
    assert data.ndim > 1
    data_s = standarize(data, x_mean, x_std)
    if data_s.ndim == 2:
        data_s = np.expand_dims(data_s, axis=0)
        pred = model.predict(data_s)[0]
    else:
        pred = model.predict(data_s)
    return destandarize(pred, y_mean, y_std)

def add_noise(data, rng, noise_stddev):
    assert np.size(noise_stddev) == 1 or np.size(noise_stddev) == 2
    if np.size(noise_stddev) == 1:
        data += rng.normal(0, noise_stddev, data.shape)
    else:
        s = rng.uniform(noise_stddev[0], noise_stddev[1], len(data))
        for i, d in enumerate(data):
            d += rng.normal(0, s[i], d.shape)
    return data

## Training

In [None]:
x = np.load(os.path.join(TAU_PATH))
y = np.load(os.path.join(FS_PATH))

x_mean = np.mean(x[:].mean())
x_std = np.mean(x[:].std())

y_mean = np.mean(y[:].mean())
y_std = np.mean(y[:].std())

##### NOISE
x = add_noise(x, np.random.RandomState(0), NOISE_STDDEV)
# tau cannot be higher than 1
x[x > 1] = 1
#####

x_train, x_valid, y_train, y_valid = train_test_split(x, y, test_size=0.2, shuffle=True)

x_train = standarize(x_train, x_mean, x_std)
x_valid = standarize(x_valid, x_mean, x_std)

y_train = standarize(y_train, y_mean, y_std)
y_valid = standarize(y_valid, y_mean, y_std)

#################### U-NET
y_train = np.expand_dims(y_train, axis=y_train.ndim)
y_valid = np.expand_dims(y_valid, axis=y_valid.ndim)

x_train = np.expand_dims(x_train, axis=x_train.ndim)
x_valid = np.expand_dims(x_valid, axis=x_valid.ndim)

## U-net

In [None]:
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, BatchNormalization, Activation, Dense, Dropout,GlobalAveragePooling2D
from tensorflow.keras.layers import Lambda, RepeatVector, Reshape
from tensorflow.keras.layers import Conv2D, Conv2DTranspose
from tensorflow.keras.layers import MaxPooling2D, GlobalMaxPool2D
from tensorflow.keras.layers import concatenate, add, subtract
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam

def conv_block(tensor, filters, kernel_size=3):
    c = Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        activation = 'relu',
        kernel_initializer='he_normal',
        padding='same'
    )(tensor)
    c = BatchNormalization()(c)
    c = Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        activation = 'relu',
        kernel_initializer='he_normal',
        padding='same'
    )(c)
    c = BatchNormalization()(c)
    return c

def deconv_block(tensor, filters, kernel_size=3):
    d = Conv2DTranspose(
        filters=filters,
        kernel_size=kernel_size,
        strides=(2,2),
        activation='relu',
        kernel_initializer='he_normal',
        padding='same'
    )(tensor)
    d = BatchNormalization()(d)
    return d
    
def model_unet(input_img, depth=4, first_filters=64, kernel_size=3, dropout=None, skip_connection=False):
    # Contracting Path
    c = []
    for i in range(depth):
        if i == 0:
            c.append(conv_block(input_img, filters=(2**i)*first_filters))
        else:
            c.append(MaxPooling2D(pool_size=2)(c[i-1]))
            c[i] = conv_block(c[i], filters=(2**i)*first_filters)
        if dropout is not None:
            c[i] = Dropout(dropout)(c[i])
    # Expanding Path
    e = c[-1]
    for i in reversed(range(depth-1)):
        e = deconv_block(e, filters=(2**i)*first_filters)
        e = concatenate([e, c[i]])
        e = conv_block(e, filters=(2**i)*first_filters)
        if dropout is not None:
            c[i] = Dropout(dropout)(c[i])
        if i == 0:
            e = Conv2D(filters=1, kernel_size=1, activation='linear') (e)
    if skip_connection:
        e = add([input_img, e])
    model = Model(inputs=[input_img], outputs=[e])
    return model

inputs = Input(shape=(x_train.shape[1],x_train.shape[2],1))
#model = model_unet(inputs, n_filters=8, dropout=0.2)
model = model_unet(inputs, depth=4, first_filters=8, skip_connection=False)

model.compile(
    #optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.0001),
    optimizer= Adam(lr = 0.0005),
    loss='mse',
    metrics=['mae', mae_destandarize, mae_percentage]
)

es_cb = EarlyStopping(monitor='val_mae_percentage', mode='min', verbose=2, patience=10)
cp_cb = ModelCheckpoint(MODEL_H5_PATH, monitor='val_mae_percentage', verbose=1, save_best_only=True, mode='min')

history = model.fit(x=x_train,
        y=y_train,
        epochs=100,
        #batch_size=64,
        validation_data=(x_valid, y_valid),
        callbacks=[es_cb, cp_cb])

model = tf.keras.models.load_model(
    MODEL_H5_PATH,
    custom_objects={'mae_destandarize': mae_destandarize, 'mae_percentage': mae_percentage}
)

## Model summary

In [None]:
model.summary()

## Inference over the test set

In [None]:
x = np.load(TAU_PATH)
y = np.load(FS_PATH)

x_mean = np.mean(x[:].mean())
x_std = np.mean(x[:].std())

y_mean = np.mean(y[:].mean())
y_std = np.mean(y[:].std())

x_test = np.load(TAU_PATH.replace('.npy', '_test.npy'))
y_test = np.load(FS_PATH.replace('.npy', '_test.npy'))

##### NOISE
x_test = add_noise(x_test, np.random.RandomState(0), NOISE_STDDEV)
x_test[x_test > 1] = 1
#####

model = tf.keras.models.load_model(
    MODEL_H5_PATH,
    custom_objects={'mae_destandarize': mae_destandarize, 'mae_percentage': mae_percentage}
)

x_test = x_test.reshape(x_test.shape[0],x_test.shape[1],x_test.shape[2],1)

y_test_pred = predict(x_test)

x_test = x_test.reshape(x_test.shape[0],x_test.shape[1],x_test.shape[2])
y_test_pred = y_test_pred.reshape(y_test_pred.shape[0],y_test.shape[1],y_test.shape[2])

# to ppm
y_test = y_test * 1e6
y_test_pred = y_test_pred * 1e6

## Plot

In [None]:
# CONFIGURATION

N = 1

# ==========

import matplotlib.ticker as ticker
XTICKS = 2
YTICKS = 8
%matplotlib inline
plt.rcParams['figure.figsize'] = [14, 12]

fig, ax = plt.subplots(2, 4)

def aximshow(ax, data, title=None, cmap=None, vmin=None, vmax=None):
    im = ax.imshow(data, cmap=cmap, vmin=vmin, vmax=vmax) 
    ax.xaxis.set_major_locator(
        ticker.FixedLocator(np.linspace(0, data.shape[1]-1, XTICKS)))
    ax.set_xticklabels(np.around(np.linspace(0, 0.394, XTICKS), 1))
    ax.yaxis.set_major_locator(
        ticker.FixedLocator(np.linspace(0, data.shape[0]-1, YTICKS)))
    ax.set_yticklabels(np.around(np.linspace(7.2864, 1.0, YTICKS), 1))
    ax.set_xlabel(r'$r$ (cm)')
    ax.set_ylabel(r'$z$ (cm)')
    ax.set_title(title)
    return im

def remove_tick_labels(ax):
    ax.tick_params(labelleft=False, labelbottom=False)
    ax.set_xlabel('')
    ax.set_ylabel('')

aximshow(ax[0,0], x_test[N], title=r'$\tau_{\lambda=600~nm}$', cmap='jet', vmin=x_test[N].min(), vmax=x_test[N].max())
ax[0,1].set_axis_off()
ax[0,2].set_axis_off()
ax[0,3].set_axis_off()

# ==========

vmin = y_test_pred.min()
vmax = y_test_pred.max()

abs_err = np.abs(y_test[N] - y_test_pred[N])

im = aximshow(ax[1,0], y_test[N], title=r'$f_s$ Groundtruth', cmap='jet', vmin=vmin, vmax=vmax)
fig.colorbar(im, ax=ax[1,0], label=r'$f_s$ (ppm)')
im = aximshow(ax[1,1], y_test_pred[N], title=r'$f_s$ Prediction', cmap='jet', vmin=vmin, vmax=vmax)
fig.colorbar(im, ax=ax[1,1], label=r'$f_s$ (ppm)')
im = aximshow(ax[1,2], abs_err, title=r'$f_s$ Abs. Error', cmap='jet', vmin=vmin, vmax=vmax)
fig.colorbar(im, ax=ax[1,2], label=r'$f_s$ (ppm)')
im = aximshow(ax[1,3], abs_err, title=r'$f_s$ Abs. Error' + '\n(Different Scale)', cmap='jet')
fig.colorbar(im, ax=ax[1,3], label=r'$f_s$ (ppm)')
remove_tick_labels(ax[1,1])
remove_tick_labels(ax[1,2])
remove_tick_labels(ax[1,3])

print(f'Image {N}, Abs. error max:\t {abs_err.max()}')
print(f'Image {N}, Abs. error min:\t {abs_err.min()}')
print(f'Image {N}, Abs. error mean:\t {abs_err.mean()}')
print(f'Image {N}, Abs. error stddev:\t {abs_err.std()}')
print(f'Image {N}, Abs. error %:\t\t {abs_err.mean()*100/y_test[N].mean()}')