In [None]:
# imports
import statistics
import re
import keras

import keras.backend as K
import numpy as np

from google.colab import files
from os import listdir
from os.path import isfile, join
from collections import Counter
from PIL import Image
from matplotlib.image import imread
from matplotlib import pyplot as plt
from random import shuffle
from math import ceil
from scipy import ndimage

from sklearn.model_selection import train_test_split
# from sklearn.model_selection import KFold

from keras.models import Model
from keras.models import load_model
from keras.models import Sequential
from keras.layers import Input, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, MaxPooling2D, Dropout
from keras.utils import plot_model

In [None]:
# configuration

# number of images loaded per batch
img_to_load = 1000
# height and width of the square crop fed to the model
crop_size = 200
# fraction of every class that goes to the training set (1 keeps everything for training)
train = 1

In [None]:
# helper that keeps only the central region of an image

def crop_center(img,cropx,cropy):
    """Return the central ``cropy`` x ``cropx`` window of an image array.

    Args:
        img: array whose first two axes are (rows, columns); any further
            axes (e.g. colour channels) are kept untouched.
        cropx: width of the window, in columns.
        cropy: height of the window, in rows.

    Returns:
        The slice ``img[starty:starty+cropy, startx:startx+cropx]`` centred
        on the image.

    Raises:
        ValueError: if the requested window is larger than the image. The
            start offsets would become negative and the slice would silently
            select the wrong region.
    """
    # Only the first two axes matter, so multi-channel images work as well.
    y, x = img.shape[:2]
    if cropx > x or cropy > y:
        raise ValueError(
            f'crop of {cropy}x{cropx} does not fit in an image of {y}x{x}')
    startx = x//2-(cropx//2)
    starty = y//2-(cropy//2)
    return img[starty:starty+cropy,startx:startx+cropx]

In [None]:
# collect the path of every training image

# Each path is derived from its directory variable, so the location is
# written only once per class.
ER_path = '/gdrive/My Drive/IDAO/train/ER/'
ER_files = [join(ER_path, f) for f in listdir(ER_path) if isfile(join(ER_path, f))]

NR_path = '/gdrive/My Drive/IDAO/train/NR/'
NR_files = [join(NR_path, f) for f in listdir(NR_path) if isfile(join(NR_path, f))]

# One shuffled list holding the files of both directories.
all_files = ER_files + NR_files
shuffle(all_files)

In [None]:
# Group the image paths by particle type and energy.
# This is needed so that the split below takes the same fraction of every class.

images = {'NR_1_keV': [], 'NR_6_keV': [], 'NR_20_keV': [], 'ER_3_keV': [], 'ER_10_keV': [], 'ER_30_keV': []}

# Fill the buckets from all_files; without this step every list stays empty
# and the train/test split has nothing to work with.
# NOTE(review): assumes each file name embeds its class as e.g. '_NR_6_keV',
# matching how the labels are later parsed from the paths - confirm.
for path in all_files:
    for particle_type in images:
        if f'_{particle_type}' in path:
            images[particle_type].append(path)
            break

In [None]:
# Split every class at the `train` fraction: the leading part of each list
# goes to X_train and the remainder to X_test.

X_train = []
X_test = []

for particle_type, class_files in images.items():
    cut = int(train*len(class_files))
    X_train.extend(class_files[:cut])
    X_test.extend(class_files[cut:])
shuffle(X_train)
shuffle(X_test)

In [None]:
# Build Y_train and Y_test from the file paths stored in X_train and X_test.

def _energy_target(path):
    """Regression target: the digits right before '_keV' in the path, divided by 30.0."""
    return int(re.findall(r"\d+(?=_keV)", path)[0])/30.0

def _class_target(path):
    """Classification target: 1 when the path contains '_NR_', otherwise 0."""
    return 1 if '_NR_' in path else 0

Y_train_r = list(map(_energy_target, X_train))
Y_test_r = list(map(_energy_target, X_test))
Y_train_c = list(map(_class_target, X_train))
Y_test_c = list(map(_class_target, X_test))

#for i in range(10):
#  print(f"{Y_train_c[i]} {Y_train_r[i]} {X_train[i]}")

In [None]:
def create_model_v0():
    """Build and compile model variant 0, a CNN with two output heads.

    Three Conv2D -> ReLU -> 2x2 max-pooling stages (32, 64 and 64 filters)
    are flattened into a 128-64-32 ReLU dense stack. The stack feeds a
    sigmoid output named 'classification' and a linear output named
    'regression'.

    Returns:
        The Model compiled with Adam (learning rate 0.0005), binary
        cross-entropy / accuracy for 'classification' and mean absolute
        error / mean squared error for 'regression'.
    """
    inputs = Input(shape=(crop_size, crop_size, 1), name='input')

    # Convolutional feature extractor.
    features = inputs
    for n_filters, kernel in ((32, (2, 2)), (64, (3, 3)), (64, (3, 3))):
        features = Conv2D(n_filters, kernel)(features)
        features = Activation('relu')(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    features = Flatten()(features)

    # Shared fully connected trunk.
    for n_units in (128, 64, 32):
        features = Dense(n_units)(features)
        features = Activation('relu')(features)

    # Two heads on top of the shared trunk.
    class_head = Dense(1)(features)
    class_head = Activation('sigmoid', name='classification')(class_head)

    energy_head = Dense(1)(features)
    energy_head = Activation('linear', name='regression')(energy_head)

    model = Model(inputs=inputs, outputs=[class_head, energy_head])
    head_losses = {'classification': 'binary_crossentropy',
                   'regression': 'mean_absolute_error'}
    head_metrics = {'classification': 'accuracy',
                    'regression': 'mean_squared_error'}
    model.compile(loss=head_losses,
                  optimizer=keras.optimizers.Adam(learning_rate=0.0005),
                  metrics=head_metrics)
    return model

In [None]:
def create_model_v1():
    """Build and compile model variant 1, a CNN with two output heads.

    A first Conv2D(64, 2x2) -> ReLU stage without pooling is followed by two
    Conv2D(64, 3x3) -> ReLU -> 2x2 max-pooling stages. The result is
    flattened into a 128-64-32 ReLU dense stack that feeds a sigmoid output
    named 'classification' and a linear output named 'regression'.

    Returns:
        The Model compiled with Adam (learning rate 0.0005), binary
        cross-entropy / accuracy for 'classification' and mean absolute
        error / mean squared error for 'regression'.
    """
    inputs = Input(shape=(crop_size, crop_size, 1), name='input')

    # First convolution keeps the full resolution (no pooling).
    features = Conv2D(64, (2, 2))(inputs)
    features = Activation('relu')(features)

    # Two identical convolution + pooling stages.
    for _ in range(2):
        features = Conv2D(64, (3, 3))(features)
        features = Activation('relu')(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    features = Flatten()(features)

    # Shared fully connected trunk.
    for n_units in (128, 64, 32):
        features = Dense(n_units)(features)
        features = Activation('relu')(features)

    # Two heads on top of the shared trunk.
    class_head = Dense(1)(features)
    class_head = Activation('sigmoid', name='classification')(class_head)

    energy_head = Dense(1)(features)
    energy_head = Activation('linear', name='regression')(energy_head)

    model = Model(inputs=inputs, outputs=[class_head, energy_head])
    head_losses = {'classification': 'binary_crossentropy',
                   'regression': 'mean_absolute_error'}
    head_metrics = {'classification': 'accuracy',
                    'regression': 'mean_squared_error'}
    model.compile(loss=head_losses,
                  optimizer=keras.optimizers.Adam(learning_rate=0.0005),
                  metrics=head_metrics)
    return model

In [None]:
def create_model_v2():
    """Build and compile model variant 2, a CNN with two output heads.

    Three Conv2D -> ReLU -> max-pooling stages (32 filters 2x2 kernel with
    2x2 pooling, 64 filters 3x3 kernel with 2x2 pooling, 128 filters 3x3
    kernel with 3x3 pooling) are flattened into a 128-128-64-32 ReLU dense
    stack. The stack feeds a sigmoid output named 'classification' and a
    linear output named 'regression'.

    Returns:
        The Model compiled with Adam (learning rate 0.0005), binary
        cross-entropy / accuracy for 'classification' and mean absolute
        error / mean squared error for 'regression'.
    """
    inputs = Input(shape=(crop_size, crop_size, 1), name='input')

    # (filters, kernel size, pooling size) of each convolutional stage.
    conv_stages = (
        (32, (2, 2), (2, 2)),
        (64, (3, 3), (2, 2)),
        (128, (3, 3), (3, 3)),
    )
    features = inputs
    for n_filters, kernel, pool in conv_stages:
        features = Conv2D(n_filters, kernel)(features)
        features = Activation('relu')(features)
        features = MaxPooling2D(pool_size=pool)(features)

    features = Flatten()(features)

    # Shared fully connected trunk.
    for n_units in (128, 128, 64, 32):
        features = Dense(n_units)(features)
        features = Activation('relu')(features)

    # Two heads on top of the shared trunk.
    class_head = Dense(1)(features)
    class_head = Activation('sigmoid', name='classification')(class_head)

    energy_head = Dense(1)(features)
    energy_head = Activation('linear', name='regression')(energy_head)

    model = Model(inputs=inputs, outputs=[class_head, energy_head])
    head_losses = {'classification': 'binary_crossentropy',
                   'regression': 'mean_absolute_error'}
    head_metrics = {'classification': 'accuracy',
                    'regression': 'mean_squared_error'}
    model.compile(loss=head_losses,
                  optimizer=keras.optimizers.Adam(learning_rate=0.0005),
                  metrics=head_metrics)
    return model

In [None]:
# Load every training image, crop its centre and stack the crops into one
# array of shape (number of images, crop_size, crop_size, 1).
train_crops = []
for begin in range(0, len(X_train), img_to_load):
    limit = min(begin + img_to_load, len(X_train))
    for file in X_train[begin:limit]:
        im = imread(file)
        # The target shape follows crop_size: a hard-coded 150x150 shape
        # cannot hold a crop_size x crop_size crop and makes reshape raise.
        # np.array copies the crop, so the full-size image is not kept alive.
        crop = np.array(crop_center(im, crop_size, crop_size))
        train_crops.append(crop.reshape((crop_size, crop_size, 1)))
    print(f'loaded {begin} to {limit}')

# Stack once at the end; appending to the array inside the loop would copy
# everything loaded so far for each new image.
if train_crops:
    X_train_img = np.stack(train_crops, axis=0)
else:
    # Keep an array (not a list) so later .astype / .shape calls still work.
    X_train_img = np.empty((0, crop_size, crop_size, 1), dtype=np.float32)
del train_crops

# Column vectors with one label per image; np.asarray also accepts an empty list.
Y_train_img_c = np.asarray(Y_train_c).reshape((len(Y_train_c), 1))
Y_train_img_r = np.asarray(Y_train_r).reshape((len(Y_train_r), 1))

In [None]:
# Store the images and both label arrays as half-precision floats.
X_train_img, Y_train_img_c, Y_train_img_r = (
    arr.astype(np.float16)
    for arr in (X_train_img, Y_train_img_c, Y_train_img_r)
)

In [None]:
# Display the shape of the loaded training array as a sanity check.
X_train_img.shape

In [None]:
# Repeated hold-out evaluation of the three architectures: every "fold" is an
# independent random 70/30 split of the training data, not a K-fold partition.
# The indices used on `preds` assume model.evaluate returns
# 0=total loss, 1=c_loss, 2=r_loss, 3=c_acc, 4=r_mse.
all_losses = []
all_c_losses = []
all_r_losses = []
all_c_accs = []
all_r_mses = []

n_folds = 5
# Explicit list of constructors instead of eval() on a generated string.
model_builders = [create_model_v0, create_model_v1, create_model_v2]
models = len(model_builders)
for m_num, build_model in enumerate(model_builders):
    loss = []
    c_loss = []
    r_loss = []
    c_acc = []
    r_mse = []
    for i in range(n_folds):
        print(f"Model {m_num} Training on Fold: {i+1}")
        t_x, val_x, t_y_c, val_y_c, t_y_r, val_y_r = train_test_split(
            X_train_img, Y_train_img_c, Y_train_img_r,
            test_size=0.3, random_state=np.random.randint(1, 1000, 1)[0])
        # Reset Keras' global state so models built in earlier iterations
        # do not pile up in memory.
        K.clear_session()
        model = build_model()
        model.fit(t_x, {'classification': t_y_c, 'regression': t_y_r}, epochs=10, batch_size=64)
        preds = model.evaluate(val_x, {'classification': val_y_c, 'regression': val_y_r}, batch_size=32, verbose=1)
        loss.append(preds[0])
        c_loss.append(preds[1])
        r_loss.append(preds[2])
        c_acc.append(preds[3])
        r_mse.append(preds[4])
    all_losses.append(loss)
    all_c_losses.append(c_loss)
    all_r_losses.append(r_loss)
    all_c_accs.append(c_acc)
    all_r_mses.append(r_mse)

In [None]:
# == Report per-fold and average scores ==
# Label of every metric next to the per-model, per-fold values collected for it.
score_tables = (
    ('loss', all_losses),
    ('c_loss', all_c_losses),
    ('r_loss', all_r_losses),
    ('c_acc', all_c_accs),
    ('r_mse', all_r_mses),
)
for m in range(models):
    print(f'Model {m}')
    print('Score per fold')
    for f in range(n_folds):
        fold_scores = ' - '.join(
            f'{label}: {round(table[m][f], 4)}' for label, table in score_tables)
        print(f'> Fold {f+1} - {fold_scores}')
    print('Average scores for all folds:')
    for label, table in score_tables:
        print(f'> avg {label}: {round(np.mean(table[m]), 4)} (+- {round(np.std(table[m]), 4)})')
    print('------------------------------------------------------------------------')

In [None]:
# Load the images used for testing, crop their centre and stack the crops
# into one array of shape (number of images, crop_size, crop_size, 1).
test_crops = []
for file in X_test:
    im = imread(file)
    # The target shape follows crop_size: a hard-coded 150x150 shape cannot
    # hold a crop_size x crop_size crop and makes reshape raise.
    # np.array copies the crop, so the full-size image is not kept alive.
    crop = np.array(crop_center(im, crop_size, crop_size))
    test_crops.append(crop.reshape((crop_size, crop_size, 1)))

# X_test is empty when train == 1, so the empty case must still yield arrays.
if test_crops:
    X_test_img = np.stack(test_crops, axis=0)
else:
    X_test_img = np.empty((0, crop_size, crop_size, 1), dtype=np.float32)
del test_crops

print('loaded test images')

# Column vectors with one label per image; np.asarray also accepts an empty list.
Y_test_img_c = np.asarray(Y_test_c).reshape((len(Y_test_c), 1))

Y_test_img_r = np.asarray(Y_test_r).reshape((len(Y_test_r), 1))

In [None]:
!pip install optuna

In [None]:
import optuna
from keras.backend import clear_session

In [None]:
def objective(trial):
    """Optuna objective: build, train and score one sampled two-headed CNN.

    The trial chooses the filters, kernel size, activation and pooling size
    of three convolutional stages, the units and activation of three dense
    layers, and the Adam learning rate. The model is trained for 10 epochs
    on a random 70% of the training arrays and evaluated on the other 30%.

    Args:
        trial: the Optuna trial used to sample the hyper-parameters.

    Returns:
        The first value returned by ``model.evaluate`` on the held-out split.
    """
    # Drop whatever state earlier Keras models left behind.
    clear_session()

    # Search spaces shared by several layers.
    filter_choices = [32, 64]
    kernel_choices = [2, 3, 4]
    pool_choices = [2, 3]
    activation_choices = ["relu", "linear", "sigmoid"]

    inputs = Input(shape=(crop_size, crop_size, 1), name='input')

    # Convolutional stage 1.
    x = Conv2D(filters=trial.suggest_categorical("filters1", filter_choices),
               kernel_size=trial.suggest_categorical("kernel_size1", kernel_choices))(inputs)
    x = Activation(activation=trial.suggest_categorical("activation1", activation_choices))(x)
    x = MaxPooling2D(pool_size=trial.suggest_categorical("pool_size1", pool_choices))(x)

    # Convolutional stage 2.
    x = Conv2D(filters=trial.suggest_categorical("filters2", filter_choices),
               kernel_size=trial.suggest_categorical("kernel_size2", kernel_choices))(x)
    x = Activation(activation=trial.suggest_categorical("activation2", activation_choices))(x)
    x = MaxPooling2D(pool_size=trial.suggest_categorical("pool_size2", pool_choices))(x)

    # Convolutional stage 3.
    x = Conv2D(filters=trial.suggest_categorical("filters3", filter_choices),
               kernel_size=trial.suggest_categorical("kernel_size3", kernel_choices))(x)
    x = Activation(activation=trial.suggest_categorical("activation3", activation_choices))(x)
    x = MaxPooling2D(pool_size=trial.suggest_categorical("pool_size3", pool_choices))(x)

    x = Flatten()(x)

    # Fully connected trunk.
    x = Dense(units=trial.suggest_categorical("units1", [128, 64]))(x)
    x = Activation(trial.suggest_categorical("activation4", activation_choices))(x)
    x = Dense(units=trial.suggest_categorical("units2", [128, 64, 32]))(x)
    x = Activation(trial.suggest_categorical("activation5", activation_choices))(x)
    x = Dense(units=trial.suggest_categorical("units3", [64, 32]))(x)
    x = Activation(trial.suggest_categorical("activation6", activation_choices))(x)

    # Two heads on top of the shared trunk.
    class_head = Dense(1)(x)
    class_head = Activation('sigmoid', name='classification')(class_head)

    energy_head = Dense(1)(x)
    energy_head = Activation('linear', name='regression')(energy_head)

    model = Model(inputs=inputs, outputs=[class_head, energy_head])
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    head_losses = {'classification': 'binary_crossentropy',
                   'regression': 'mean_absolute_error'}
    head_metrics = {'classification': 'accuracy',
                    'regression': 'mean_squared_error'}
    model.compile(loss=head_losses,
                  optimizer=keras.optimizers.Adam(learning_rate=lr),
                  metrics=head_metrics)

    # A fresh random 70/30 split is drawn for every trial.
    split_seed = np.random.randint(1,1000, 1)[0]
    t_x, val_x, t_y_c, val_y_c, t_y_r, val_y_r = train_test_split(
        X_train_img, Y_train_img_c, Y_train_img_r,
        test_size=0.3, random_state=split_seed)

    train_targets = {'classification': t_y_c, 'regression': t_y_r}
    model.fit(t_x, train_targets,
              shuffle=True,
              batch_size=64,
              epochs=10,
              verbose=False)

    # Score the model on the held-out split; the first entry is the value to minimise.
    val_targets = {'classification': val_y_c, 'regression': val_y_r}
    score = model.evaluate(val_x, val_targets, batch_size=32, verbose=1)
    return score[0]

In [None]:
# Run one garbage-collection pass now.
# (gc_after_trial=True is the per-trial equivalent accepted by study.optimize.)
import gc
gc.collect()

In [None]:
# Search for the hyper-parameters that minimise the value returned by `objective`.
study = optuna.create_study(direction="minimize")

# gc_after_trial=True already runs a garbage collection after every trial,
# so no extra gc.collect() callback is needed.
study.optimize(objective, n_trials=300, gc_after_trial=True)

In [None]:
# Display the best trial found by the study.
study.best_trial

In [None]:
# Display the hyper-parameters of the best trial.
study.best_trial.params

In [None]:
# Plot the optimization history of the study.
fig = optuna.visualization.plot_optimization_history(study)
fig.show()

In [None]:
# Plot the hyper-parameter importances computed from the study.
fig = optuna.visualization.plot_param_importances(study)
fig.show()

In [None]:
# Keep the cell running until it is interrupted by hand.
# Sleeping between iterations avoids the 100% CPU usage of an empty busy
# loop, and the interrupt is caught so that stopping the cell leaves no traceback.
import time

try:
    while True:
        time.sleep(60)
except KeyboardInterrupt:
    pass

KeyboardInterrupt: ignored