In [0]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow.keras
from tensorflow.keras.models import load_model
from tensorflow.keras import optimizers, models, layers, regularizers, Input
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.utils import plot_model
from tensorflow.keras import backend as k

# Everything below assumes a GPU runtime, so stop immediately when none is attached.
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

# The train/val/test arrays are stored in one .npz archive under /content/drive,
# so Google Drive has to be mounted before this cell can succeed.
try:
  imgs = np.load('/content/drive/My Drive/Colab Notebooks/AP_dataset_float1RGB.npz')
  x_train = imgs['x_train']
  print(np.shape(x_train))
  x_val = imgs['x_val']
  print(np.shape(x_val))
  x_test = imgs['x_test']
  print(np.shape(x_test))
  y_train = imgs['y_train']
  print(np.shape(y_train))
  y_val = imgs['y_val']
  print(np.shape(y_val))
  y_test = imgs['y_test']
  print(np.shape(y_test))
except Exception as err:
  # Catch Exception rather than everything so Ctrl-C / SystemExit still propagate,
  # and report the underlying error instead of hiding it.
  print('Drive is not mounted or file paths are wrong.')
  print('Dataset load failed with {}: {}'.format(type(err).__name__, err))

# c_train / c_val / c_test are the targets fed to the second ('Confidence') output
# when BBCSnet is fitted and evaluated later in the notebook, so they must be loaded
# for that cell to run on a fresh kernel. Kept in a separate try so that a missing
# label file is reported on its own and does not affect the arrays loaded above.
try:
  Cdata = np.load('/content/drive/My Drive/Colab Notebooks/CSlabels.npz')
  c_train = Cdata['c_train']
  print(np.shape(c_train))
  c_val = Cdata['c_val']
  print(np.shape(c_val))
  c_test = Cdata['c_test']
  print(np.shape(c_test))
except Exception as err:
  print('Drive is not mounted or file paths are wrong.')
  print('Confidence label load failed with {}: {}'.format(type(err).__name__, err))

# BBnet2 = load_model('/content/drive/My Drive/Colab Notebooks/Models/BBnet2.h5')

In [0]:
# Mount Google Drive at /content/drive. The dataset archive and the model
# checkpoint paths used throughout this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# BBnet2 is the previously trained network that gets split up below. On a fresh
# kernel nothing has defined it yet, so restore it from its saved checkpoint;
# a BBnet2 that already lives in the kernel is left untouched.
if 'BBnet2' not in globals():
  BBnet2 = load_model('/content/drive/My Drive/Colab Notebooks/Models/BBnet2.h5')

# Split the network into a shared trunk (everything up to the third-from-last layer)
# and its final two layers, so the trunk output can feed an additional head.
# NOTE(review): the Den64 / Den8 names suggest the last two layers are the
# Dense(64) and Dense(8) layers that close BBnet() -- confirm against the checkpoint.
BBnet2_Base = models.Model(BBnet2.inputs, BBnet2.layers[-3].output)
BBnet2_Den64 = BBnet2.layers[-2]
BBnet2_Den8 = BBnet2.layers[-1]

# Freeze every reused piece so only newly added layers receive weight updates.
BBnet2_Base.trainable = False
BBnet2_Den64.trainable = False
BBnet2_Den8.trainable = False

In [0]:
class Train:
  """Build, compile, fit and evaluate one of the notebook's networks.

  The network is selected by ``model_name`` ('CSnet' or 'BBnet'). Checkpoints and
  the optional architecture plot are written to the Drive models folder using the
  file stem ``model_name + str(model_number)``.

  Attributes:
    version: Value appended to the model name in output file names.
    name: Name of the network builder to use.
    epoch: Number of training epochs.
    bs: Batch size used for both fitting and evaluation.
    callbacks: Keras callback list, or None when callbacks are disabled.
  """

  # Folder that receives checkpoints (.h5) and architecture plots (.png).
  MODEL_DIR = '/content/drive/My Drive/Colab Notebooks/Models/'

  def __init__(self, epochs, batch_size, model_name='GateNet',  model_number=None):
    self.version = model_number
    self.name = model_name
    self.epoch = epochs
    self.bs = batch_size
    self.callbacks = None

  def _artifact_path(self, suffix):
    """Return the Drive path for this model's artifact ending in ``suffix``."""
    return self.MODEL_DIR + self.name + str(self.version) + suffix

  def update_parameters(self, new_epochs, new_bach_size):
    """Replace the epoch count and batch size used by later ``train`` calls."""
    self.epoch = new_epochs
    # Use the argument that was passed in; referring to a bare `batch_size` here
    # raises NameError because no such name exists in this scope.
    self.bs = new_bach_size

  def set_callbacks(self, use_cb=True, e_stop_patience=5, rlr_patience=1, rlr_cool=5, rlr_factor=0.005, rlr_min_delta=0.5):
    """Configure the callbacks handed to ``fit``.

    Args:
      use_cb: When False, callbacks are disabled (``self.callbacks = None``).
      e_stop_patience: ``patience`` for EarlyStopping (monitors training loss).
      rlr_patience: ``patience`` for ReduceLROnPlateau (monitors training loss).
      rlr_cool: ``cooldown`` for ReduceLROnPlateau.
      rlr_factor: ``factor`` the learning rate is multiplied by on a plateau.
      rlr_min_delta: ``min_delta`` for ReduceLROnPlateau.
    """
    if not use_cb:
      self.callbacks = None
      return

    self.callbacks = [
        EarlyStopping(monitor='loss', patience=e_stop_patience),
        # The checkpoint keeps only the weights with the best validation loss.
        ModelCheckpoint(filepath=self._artifact_path('.h5'),
                        monitor='val_loss', save_best_only=True),
        ReduceLROnPlateau(monitor='loss', min_delta=rlr_min_delta, cooldown=rlr_cool,
                          factor=rlr_factor, patience=rlr_patience, verbose=1),
    ]

  def train(self, x_tra, y_tra, x_v, y_v, x_t, y_t, opt='rmsprop', los='mse', met=['mae'], plot=False):
    """Build the selected network, fit it, and evaluate it on the test split.

    Args:
      x_tra, y_tra: Training inputs and targets.
      x_v, y_v: Validation inputs and targets.
      x_t, y_t: Test inputs and targets.
      opt: Optimizer passed to ``compile``.
      los: Loss passed to ``compile``.
      met: Metrics passed to ``compile``.
      plot: When True, write an architecture diagram next to the checkpoints.

    Returns:
      Tuple ``(t_loss, v_loss, performance)``: per-epoch training loss, per-epoch
      validation loss, and the result of ``evaluate`` on the test split.

    Raises:
      ValueError: If ``self.name`` is neither 'CSnet' nor 'BBnet'.
    """
    # Resolve the builder first so an unsupported name fails with a clear message
    # instead of an unbound `network` variable further down.
    if self.name == 'CSnet':
      network = CSnet()
    elif self.name == 'BBnet':
      network = BBnet()
    else:
      raise ValueError("Unknown model_name {!r}; expected 'CSnet' or 'BBnet'".format(self.name))
    network.compile(optimizer=opt, loss=los, metrics=met)

    if plot:
      plot_model(network, show_shapes=False, to_file=self._artifact_path('Graph.png'))

    training_data = network.fit(x_tra, y_tra, epochs=self.epoch, batch_size=self.bs, validation_data=(x_v, y_v), callbacks=self.callbacks, shuffle=True)
    performance = network.evaluate(x_t, y_t, batch_size=self.bs)

    t_loss = training_data.history['loss']
    v_loss = training_data.history['val_loss']
    # performance[1] is the first compiled metric, i.e. MAE with the default `met`.
    print('Test MAE:', performance[1])
    return t_loss, v_loss, performance

In [0]:
 def ResBlock(X, filter_size, channels):
    """Bottleneck residual block whose shortcut is the unmodified input.

    Main path: 1x1 conv -> BN -> ReLU -> (filter_size) conv with 'same' padding
    -> BN -> ReLU -> 1x1 conv -> BN. The result is added to X and passed through
    a final ReLU.

    Args:
      X: Keras functional tensor entering the block.
      filter_size: Kernel size of the middle convolution.
      channels: Three filter counts, one per convolution in order. Because the
        shortcut is X itself, the Add layer needs the third count to match X.

    Returns:
      The output tensor of the block.
    """
    reduce_ch, spatial_ch, restore_ch = channels
    skip = X

    # 1x1 convolution
    out = layers.Conv2D(reduce_ch, 1)(X)
    out = layers.BatchNormalization(axis=-1)(out)
    out = layers.Activation('relu')(out)

    # spatial convolution; 'same' padding keeps the feature-map size
    out = layers.Conv2D(spatial_ch, kernel_size=filter_size, padding='same')(out)
    out = layers.BatchNormalization(axis=-1)(out)
    out = layers.Activation('relu')(out)

    # closing 1x1 convolution; no activation until after the merge
    out = layers.Conv2D(restore_ch, 1)(out)
    out = layers.BatchNormalization(axis=-1)(out)

    merged = layers.Add()([out, skip])
    return layers.Activation('relu')(merged)

In [0]:
def ConvBlock(X, filter_size, channels, stride=2):
    """Bottleneck residual block with a convolutional (projection) shortcut.

    Main path: strided 1x1 conv -> BN -> ReLU -> (filter_size) conv with 'same'
    padding -> BN -> ReLU -> 1x1 conv -> BN. The shortcut applies its own strided
    1x1 conv + BN to X so both paths agree in shape before they are added.

    Args:
      X: Keras functional tensor entering the block.
      filter_size: Kernel size of the middle convolution.
      channels: Three filter counts, one per main-path convolution in order; the
        third is also used for the shortcut convolution.
      stride: Stride of the first main-path convolution and of the shortcut.

    Returns:
      The output tensor of the block.
    """
    reduce_ch, spatial_ch, out_ch = channels
    skip = X

    # main path
    main = layers.Conv2D(reduce_ch, 1, strides=stride)(X)
    main = layers.BatchNormalization(axis=-1)(main)
    main = layers.Activation('relu')(main)

    main = layers.Conv2D(spatial_ch, kernel_size=filter_size, padding='same')(main)
    main = layers.BatchNormalization(axis=-1)(main)
    main = layers.Activation('relu')(main)

    main = layers.Conv2D(out_ch, 1)(main)
    main = layers.BatchNormalization(axis=-1)(main)

    # projection shortcut: same stride and channel count as the main path
    skip = layers.Conv2D(out_ch, 1, strides=stride)(skip)
    skip = layers.BatchNormalization(axis=-1)(skip)

    merged = layers.Add()([main, skip])
    return layers.Activation('relu')(merged)

In [0]:
def CSnet(): # model 1
    """Build the residual network that outputs one sigmoid score per image.

    Layout: 7x7/2 conv stem with BN, ReLU and 3x3/2 max pooling, followed by two
    residual stages (one ConvBlock plus two, respectively three, ResBlocks),
    average pooling, and a Dense(64) -> Dense(1, sigmoid) head.

    Returns:
      An uncompiled Keras Model named 'CSnet' taking (216, 324, 3) inputs.
    """
    img_input = Input(shape=(216, 324, 3), name='GateImageTensor')

    # Stem. The BatchNormalization output must be assigned back to X; binding it
    # to a different name leaves the normalization out of the graph entirely.
    X = layers.Conv2D(128, (7, 7), strides=2)(img_input)
    X = layers.BatchNormalization(axis=-1)(X)
    X = layers.Activation('relu')(X)
    X = layers.MaxPooling2D(3, strides=2)(X)

    # Stage 1: stride 1, so the resolution is kept and channels become 256.
    X = ConvBlock(X, 5, [64, 64, 256], 1)
    X = ResBlock(X, 3, [64, 64, 256])
    X = ResBlock(X, 3, [64, 64, 256])

    # Stage 2: stride 2, channels become 512.
    X = ConvBlock(X, 5, [128, 128, 512], 2)
    X = ResBlock(X, 3, [128, 128, 512])
    X = ResBlock(X, 3, [128, 128, 512])
    X = ResBlock(X, 3, [128, 128, 512])
    X = layers.AveragePooling2D(2, padding='same')(X)

    # Head
    X = layers.Flatten()(X)
    X = layers.Dense(64, activation='relu')(X)
    CS = layers.Dense(1, activation='sigmoid')(X)
    network = models.Model(img_input, CS, name='CSnet')
    return network

In [0]:
def InceptionBlock(X, channels):
  """Inception-style block with five parallel, downsampling branches.

  Every branch ends in a convolution with ``channels`` filters and contains
  exactly one stride-2 step with 'same' padding, so all branch outputs share the
  same spatial size and can be concatenated along the channel axis.

  Args:
    X: Keras functional tensor entering the block.
    channels: Number of filters used by every convolution in the block.

  Returns:
    Tensor with 5 * ``channels`` channels.
  """
  # a: strided 1x1 convolution
  branch_a = layers.Conv2D(channels, 1, activation='relu', padding='same', strides=2)(X)

  # b: 1x1 -> strided 3x3
  branch_b = layers.Conv2D(channels, 1, activation='relu', padding='same')(X)
  branch_b = layers.Conv2D(channels, 3, activation='relu', padding='same', strides=2)(branch_b)

  # c: strided average pooling -> 3x3
  branch_c = layers.AveragePooling2D(3, padding='same', strides=2)(X)
  branch_c = layers.Conv2D(channels, 3, activation='relu', padding='same')(branch_c)

  # d: 1x1 -> 3x3 -> strided 3x3
  branch_d = layers.Conv2D(channels, 1, activation='relu', padding='same')(X)
  branch_d = layers.Conv2D(channels, 3, activation='relu', padding='same')(branch_d)
  branch_d = layers.Conv2D(channels, 3, activation='relu', padding='same', strides=2)(branch_d)

  # e: 1x1 -> 5x5 -> strided 5x5. The last layer has to be applied to branch_e;
  # otherwise a Layer object instead of a tensor reaches the concatenation.
  branch_e = layers.Conv2D(channels, 1, activation='relu', padding='same')(X)
  branch_e = layers.Conv2D(channels, 5, activation='relu', padding='same')(branch_e)
  branch_e = layers.Conv2D(channels, 5, activation='relu', padding='same', strides=2)(branch_e)

  output = layers.concatenate([branch_a, branch_b, branch_c, branch_d, branch_e], axis=-1)

  return output

In [0]:
def BBnet(channels=(64, 64, 64, 64)):
  """Build the inception-style network with an 8-value linear output.

  Layout: 7x7/2 conv stem with BN, ReLU and 3x3/2 max pooling, then one
  InceptionBlock + BatchNormalization per entry of ``channels``, average pooling,
  and a Dense(64) -> Dense(8) head.

  Args:
    channels: Filter count handed to each InceptionBlock, one entry per block.
      NOTE(review): the default of 64 per block is a placeholder -- the block
      widths had not been chosen yet and should be tuned.

  Returns:
    An uncompiled Keras Model named 'BBnet' taking (216, 324, 3) inputs.
  """
  gate_input = Input(shape=(216, 324, 3))

  # Stem. The BatchNormalization output must be assigned back to X; binding it
  # to a different name leaves the normalization out of the graph entirely.
  X = layers.Conv2D(128, (7, 7), strides=2)(gate_input)
  X = layers.BatchNormalization(axis=-1)(X)
  X = layers.Activation('relu')(X)
  X = layers.MaxPooling2D(3, strides=2)(X)

  # InceptionBlock requires an explicit filter count for every call.
  for block_channels in channels:
    X = InceptionBlock(X, block_channels)
    X = layers.BatchNormalization()(X)
  X = layers.AveragePooling2D(2, padding='same')(X)

  X = layers.Flatten()(X)
  X = layers.Dense(64, activation='relu')(X)
  BB = layers.Dense(8)(X)

  # The model name has to be a string, not the builder function itself.
  net = models.Model(gate_input, BB, name='BBnet')
  return net

In [0]:
def BBCSnet():
  """Build a two-output model on top of the pieces split off from BBnet2.

  The first output reuses the two final BBnet2 layers on the BBnet2 trunk. The
  second output is a new Dense/BatchNormalization stack on the same trunk
  features, ending in a single sigmoid unit named 'Confidence'.

  Returns:
    An uncompiled Keras Model named 'BBCSnet' with outputs [BB, CS].
  """
  trunk_features = BBnet2_Base.output

  # Output 1: the existing BBnet2 head, applied unchanged.
  box_out = BBnet2_Den8(BBnet2_Den64(trunk_features))

  # Output 2: new confidence head.
  conf = layers.Dense(64, activation='relu')(trunk_features)
  conf = layers.BatchNormalization()(conf)
  conf = layers.Dense(64, activation='relu')(conf)
  conf = layers.BatchNormalization()(conf)
  conf = layers.Dense(64, activation='relu')(conf)
  conf_out = layers.Dense(1, activation='sigmoid', name='Confidence')(conf)

  return models.Model(BBnet2_Base.input, [box_out, conf_out], name='BBCSnet')

In [0]:
# Where the best checkpoint and the (optional) architecture plot are written.
model_path = '/content/drive/My Drive/Colab Notebooks/Models/BBCSnet2.h5'
model_graph_path ='/content/drive/My Drive/Colab Notebooks/Models/BBCSnet2Graph.png'

# Early stopping and LR reduction watch the training loss; the checkpoint keeps
# only the weights with the best validation loss.
callbacks_list = [EarlyStopping(monitor='loss', patience=15),
                  ModelCheckpoint(filepath=model_path,
                                  monitor='val_loss', save_best_only=True),
                  ReduceLROnPlateau(monitor='loss', min_delta=.001, cooldown=15, factor=0.3, patience=1, verbose=1)]

network = BBCSnet()
# One loss per output, in output order: MSE for the first (BB) output and MAE for
# the second (Confidence) output. The weights make the confidence term dominate.
network.compile(optimizer='rmsprop', loss=['mse', 'mae'], loss_weights=[.0000001, 1000])
# plot_model(network, show_shapes=False, to_file=model_graph_path)

training_data = network.fit(x_train, [y_train, c_train], epochs=100, batch_size=32, validation_data=(x_val, [y_val, c_val]), callbacks=callbacks_list, shuffle=True)
performance = network.evaluate(x_test , [y_test, c_test], batch_size=32)

# No metrics were compiled, so evaluate() yields the weighted total loss followed
# by one loss per output: index 1 is the BB MSE and index 2 the confidence MAE.
print(np.shape(performance))
print('Test BB MSE:', performance[1])
print('Test Confidence MAE:', performance[2])

In [0]:
# Train a BBnet for up to 100 epochs (batch size 32); artifacts are saved under
# the 'BBnet6' file stem. The learning-rate reduction factor is lowered to 0.002
# and the architecture diagram is written out as well.
BBnet3 = Train(epochs=100, batch_size=32, model_name='BBnet', model_number=6)
BBnet3.set_callbacks(rlr_factor=0.002)
BBnet3.train(
    x_tra=x_train, y_tra=y_train,
    x_v=x_val, y_v=y_val,
    x_t=x_test, y_t=y_test,
    plot=True,
)