In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import backend as K
from dataset_utils import read_many_hdf5
from sklearn.model_selection import train_test_split

if tf.test.gpu_device_name(): 
  print(f"Default GPU Device: {tf.test.gpu_device_name()}")

In [None]:
def coeff_determination(y_true, y_pred):
    SS_res =  K.sum(K.square( y_true-y_pred ))
    SS_tot = K.sum(K.square( y_true - K.mean(y_true) ) )
    return ( 1 - SS_res/(SS_tot + K.epsilon()) )

In [None]:
class SEBlock(layers.Layer):
  def __init__(self, shape, ratio=16):
    super(SEBlock, self).__init__()
    self.dense1 = layers.Dense(shape//ratio, activation="relu")
    self.dense2 = layers.Dense(shape, activation="sigmoid")
  
  def call(self, input_tensor):
    x = layers.GlobalAveragePooling2D()(input_tensor)
    x = self.dense1(x)
    x = self.dense2(x)
    return layers.Multiply()([input_tensor, x])


class SE_ResNetBlock(layers.Layer):
  def __init__(self, filters):
    super(SE_ResNetBlock, self).__init__()
    self.conv2a = layers.Conv2D(filters, (1,1))
    self.bna = layers.BatchNormalization()

    self.se = SEBlock(filters)

    self.conv2b = layers.Conv2D(filters, (1,1))
    self.bnb = layers.BatchNormalization()

  def call(self, input_tensor):
    x1 = self.conv2a(input_tensor)
    x = self.bna(x1)

    x = self.conv2b(x)
    x = self.bnb(x)

    x = self.se(x)

    x = layers.Add()([x, x1])
    return tf.nn.relu(x)

class ViCTORIA(tf.keras.Model):
  def __init__(self, filters, nb_blocks=6):
    super(ViCTORIA, self).__init__(name="ViCTORIA")

    self.conv2a = layers.Conv2D(filters, (1, 1))
    self.bna = layers.BatchNormalization()

    self.blocks = []
    for _ in range(nb_blocks):
      self.blocks.append(SE_ResNetBlock(filters))

    self.conv2b = layers.Conv2D(filters, (1, 1))
    self.bnb = layers.BatchNormalization()
    
    self.dense = layers.Dense(1, activation="linear")
  
  def call(self, input_tensor):
    x = self.conv2a(input_tensor)
    x = self.bna(x)
    x = tf.nn.relu(x)

    for block in self.blocks:
      x = block(x)

    x = self.conv2b(x)
    x = self.bnb(x)
    x = tf.nn.relu(x)

    x = layers.Flatten()(x)

    return self.dense(x)

In [None]:
directory = "E:/IA/Deep_ViCTORIA/Datasets/SE_ResNet/"
positions, scores = read_many_hdf5(129090, directory, "_train")

nb_models = 4
positions_train, scores_train = [], []
positions_valid, scores_valid = [], []
for i in range(nb_models):
  positions_train_t, positions_valid_t, scores_train_t, scores_valid_t = train_test_split(
    positions, scores, test_size=0.2, random_state=42+i)
  positions_train.append(positions_train_t)
  positions_valid.append(positions_valid_t)
  scores_train.append(scores_train_t)
  scores_valid.append(scores_valid_t)
print(f"Training sets shape ≈ {positions_train[0].shape}, Validation sets shape ≈ {positions_valid[0].shape}")


In [None]:
models = []
offset_blocks = 2
for i in range(1, nb_models+1):
  model = ViCTORIA(filters=64, nb_blocks=i+offset_blocks)
  model.build((1, 8, 8, 15))
  model.compile(loss='mean_absolute_error', optimizer="adam", metrics=[coeff_determination])
  models.append(model)
models[0].summary()

In [None]:
nb_epochs = 20
histories = []
for i, model in enumerate(models):
  print(f"Model {i+1}/{len(models)}...", end=" ")
  history = model.fit(positions_train[i], scores_train[i], verbose=1, epochs=nb_epochs)
  histories.append(history)
  print("Done.")

In [None]:
def plot_validation_results(histories, validation_Xs, validation_ys):
  epochs = range(1, nb_epochs + 1)

  _, axs = plt.subplots(1, 2, figsize=(20, 5))
  _, ax_score  = plt.subplots(1, 1, figsize=(10, 5))

  for i, history in enumerate(histories):
    X_valid = validation_Xs[i]
    y_valid = validation_ys[i]

    loss = history.history["loss"]
    score = history.history["coeff_determination"]
    axs[0].plot(epochs, loss, label=f"{i + offset_blocks + 1} SE-ResNet blocks")

    axs[1].plot(epochs, score, label=f"{i + offset_blocks + 1} SE-ResNet blocks")

    preds = models[i].predict(X_valid)
    r2_score = coeff_determination(tf.convert_to_tensor(y_valid), preds).numpy()
    ax_score.scatter(y_valid, preds, label = f"{i + offset_blocks + 1} SE-ResNet blocks $R^2 = {r2_score:.2f}$")


  axs[0].set_xlabel("Epoch")
  axs[0].set_ylabel("(Mean Absolute Error)")
  axs[0].set_title('Training loss')
  axs[1].set_xlabel("Epoch")
  axs[1].set_ylabel("($R^2$)")
  axs[1].set_title('Training score')
  ax_score.set_xlabel("True values")
  ax_score.set_ylabel("Predictions")
  ax_score.set_title("Results on validation set")

  ma = np.max(y_valid)
  mi = np.min(y_valid)
  x = np.linspace(ma, mi, 100)
  ax_score.plot(x, x, label = "Predictions = True values")
  ax_score.legend()
  for ax in axs:
    ax.legend()

def get_best_model(models, validation_Xs, validation_ys):
  preds = []
  for i in range(len(models)):
    pred = models[i].predict(validation_Xs[i])
    r2_score = coeff_determination(tf.convert_to_tensor(validation_ys[i]), pred).numpy()
    preds.append(r2_score)
  idx = np.argmax(preds)
  return models[idx]

def plot_regression_results(model, X_test, y_test, path=None):
  preds = model.predict(X_test)
  r2_score = coeff_determination(tf.convert_to_tensor(y_test), preds).numpy()
  ma = np.max(y_test)
  mi = np.min(y_test)
  x = np.linspace(ma, mi, 100)
  plt.figure(figsize=(10,5))
  plt.plot(x, x, color='r', label = "Predictions = True values")
  plt.scatter(y_test, preds, label = f"$R^2 = {r2_score:.2f}$")
  plt.xlabel("True values")
  plt.ylabel("Predictions")
  plt.title("Results of ViCTORIA SE-ResNet")
  plt.legend()
  if path:
    plt.savefig(path)

In [None]:
plot_validation_results(histories, positions_valid, scores_valid)

In [None]:
positions_test, scores_test = read_many_hdf5(99296, directory, "_test")
model = get_best_model(models, positions_valid, scores_valid)
plot_regression_results(model, positions_test, scores_test, path="result.png")