<a href="https://colab.research.google.com/github/LuisFernandoRosa/CNN-Training-Structure/blob/main/Couro_newmodel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

###**importações**

In [None]:
!pip install --quiet neural_structured_learning

[?25l[K     |██▊                             | 10kB 23.8MB/s eta 0:00:01[K     |█████▍                          | 20kB 4.6MB/s eta 0:00:01[K     |████████▏                       | 30kB 5.1MB/s eta 0:00:01[K     |██████████▉                     | 40kB 5.5MB/s eta 0:00:01[K     |█████████████▋                  | 51kB 5.6MB/s eta 0:00:01[K     |████████████████▎               | 61kB 5.9MB/s eta 0:00:01[K     |███████████████████             | 71kB 6.5MB/s eta 0:00:01[K     |█████████████████████▊          | 81kB 6.9MB/s eta 0:00:01[K     |████████████████████████▌       | 92kB 7.5MB/s eta 0:00:01[K     |███████████████████████████▏    | 102kB 7.4MB/s eta 0:00:01[K     |██████████████████████████████  | 112kB 7.4MB/s eta 0:00:01[K     |████████████████████████████████| 122kB 7.4MB/s 
[?25h

In [None]:
import os
import cv2
import csv
import random
import datetime
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.keras import Input
from tensorflow.random import set_seed
from tensorflow.keras.optimizers import Adagrad
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.applications import ResNet50, MobileNetV2
from tensorflow.keras.models import Model, save_model
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.layers import Dense, Dropout, AveragePooling2D, Flatten
from tensorflow.keras.metrics import Accuracy, Precision, FalsePositives, AUC, CategoricalAccuracy

from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

import albumentations as alb
import neural_structured_learning as nsl

####**preprocessing**
*   Definition of the functions for preprocessing methods.

In [None]:
def preprocessing_original_image(img,cla,batch,l_cla):
  img = cv2.resize(img,(224,224), interpolation = cv2.INTER_CUBIC)
  batch.append(img)
  l_cla.append(to_categorical(cla,Config.class_quant)) 

In [None]:
def preprocessing_augmentation(img,cla,batch,l_cla):
  rot = alb.augmentations.transforms.Rotate(always_apply= True)
  gns = alb.augmentations.transforms.GaussNoise(always_apply=True)
  bri = alb.augmentations.transforms.RandomBrightness(always_apply=True)

  batch.append(img)
  l_cla.append(to_categorical(cla,Config.class_quant)) 

  np.random.seed(42)
  aimg = rot.apply(img,np.random.randint(10,70))
  batch.append(aimg)
  l_cla.append(to_categorical(cla,Config.class_quant)) 
  
  np.random.seed(42)
  aimg = bri.apply(img,np.random.uniform(0.6,0.9))
  batch.append(aimg)
  l_cla.append(to_categorical(cla,Config.class_quant)) 

  np.random.seed(42)
  aimg = gns.apply(img)
  batch.append(aimg)
  l_cla.append(to_categorical(cla,Config.class_quant)) 

In [None]:
def preprocessing_4p4(img,cla,batch,l_cla):
  simg = img[0:int(img.shape[0]/2) , 0:int(img.shape[1]/2),:]   
  simg = cv2.resize(simg,Config.input_hw,cv2.INTER_CUBIC)
  batch.append(simg)
  l_cla.append(to_categorical(cla, Config.class_quant))

  simg = img[int(img.shape[0]/2) : img.shape[0] , 0:int(img.shape[1]/2),:]   
  simg = cv2.resize(simg,Config.input_hw,cv2.INTER_CUBIC)
  batch.append(simg)
  l_cla.append(to_categorical(cla, Config.class_quant))

  simg = img[0:int(img.shape[0]/2) , int(img.shape[1]/2) : img.shape[1],:]   
  simg = cv2.resize(simg,Config.input_hw,cv2.INTER_CUBIC)
  batch.append(simg)
  l_cla.append(to_categorical(cla, Config.class_quant))

  simg = img[int(img.shape[0]/2) : img.shape[0] , int(img.shape[1]/2) : img.shape[1],:]   
  simg = cv2.resize(simg,Config.input_hw,cv2.INTER_CUBIC)
  batch.append(simg)
  l_cla.append(to_categorical(cla, Config.class_quant))

####**pos-processing**
* Definition of the funtions that will handle with the outputs of the network.


In [None]:
def processing_predict_original(pob, id, Config):
  Config.predict[0].extend(pob)
  Config.predict[1].append(to_categorical(Config.class_dict[Config.data_frame.iloc[id]['targets']],Config.class_quant))

In [None]:
def processing_predict_4p4(pob, id, Config):
  Config.predict[0].extend(pob)
  for q in range(4):
    Config.predict[1].append(to_categorical(Config.class_dict[Config.data_frame.iloc[id]['targets']],Config.class_quant))
  
  Config.multi_method = True
  Config.predict_method = [[],[],[],[],[]]
  Config.method_name = ['add','mult','likelihood','votação','real']

  z = np.zeros(Config.class_quant)
  l = np.zeros(Config.class_quant)
  o = np.ones(Config.class_quant)
  for i in pob:
    z = np.add(z,i)
    l = np.add(z,np.log(i))
    o = np.multiply(o,i)    
  l = np.exp(l)

  Config.predict_method[0].append(z)
  Config.predict_method[1].append(o)
  Config.predict_method[2].append(l)
  
  m = 0
  pob = np.argmax(pob,axis=1) 
  pob = list(pob)
  for i in pob:
    c = pob.count(i)
    m = i if c > m else m
  Config.predict_method[3].append(to_categorical(m,Config.class_quant))
  Config.predict_method[4].append(to_categorical(Config.class_dict[Config.data_frame.iloc[id]['targets']],Config.class_quant))

####**Save**
* Function to save the confusion matrix and all type of output.

In [None]:
def matrix(y_pred,y_real):
  y_pred = np.argmax(y_pred,axis=1)
  print(y_pred)
  y_real = np.argmax(y_real,axis=1)
  print(y_real)

  cf = confusion_matrix(y_real, y_pred,labels = np.arange(0,Config.class_quant))
  fig, ax = plt.subplots(figsize = (10,10))

  Cm = ConfusionMatrixDisplay(cf, np.arange(0,Config.class_quant ))

  Cm.plot(include_values=True, cmap="Blues", ax= ax, xticks_rotation="vertical")
  # plt.show()

In [None]:
def save_mc():
  os.mkdir(Config.save_path  + '/matriz-confusao')

  if (Config.multi_method):
    for i in range(len(Config.method_name) - 1):
      matrix(Config.predict_method[i],Config.predict_method[len(Config.method_name) - 1])
      plt.savefig(Config.save_path + '/matriz-confusao/cm-' + Config.method_name[i] + '.png', format='png')

  matrix(Config.predict[0],Config.predict[1])
  plt.savefig(Config.save_path + '/matriz-confusao/cm-pred_real.png', format='png')

In [None]:
def save_csv():
  os.mkdir(Config.save_path  + '/valores')
  if (Config.multi_method):
    for i in range(len(Config.method_name) ):
      f = open(Config.save_path + '/valores/valores-'+ Config.method_name[i] + '.csv', 'a', newline ='\n')
      w = csv.writer(f , delimiter = ',')
      for r in range(len(Config.predict_method[i])):
        w.writerow(Config.predict_method[i][r])
      f.close()

  f = open(Config.save_path + '/valores/valores-predP.csv', 'a', newline ='\n')
  w = csv.writer(f , delimiter = ',')
  for r in range(len(Config.predict[0])):
    w.writerow(Config.predict[0][r])
  f.close()

  f = open(Config.save_path + '/valores/valores-realP.csv', 'a', newline ='\n')
  w = csv.writer(f , delimiter = ',')
  for r in range(len(Config.predict[1])):
    w.writerow(Config.predict[1][r])
  f.close()

##**Config**
* Configuration, and creating data frame.

In [None]:
class Config(object):
  def __init__(self):
    self.model_name = 'Mobile-1'
    self.save_path = '/content/drive/My Drive/models/' + self.model_name + '-' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    self.base_path = '/content/drive/My Drive/Base/Florestais/Especies Pericia Federal2'
    self.fold_num = 10
    self.epochs = 50
    self.batch_size = 36
    self.optmizer = Adagrad()
    self.adversarial = False
    self.validation_split = 0.2

    self.base_model = 0
    self.input_shape = (224,224,3)
    self.predict_function = processing_predict_4p4
    self.preprocessing_function = preprocessing_3p3
    self.loss = CategoricalCrossentropy()
    self.metrics = [CategoricalAccuracy(), Precision(), FalsePositives(), AUC()]
    self.multi_method = True

    self.seed = 5
    self.freeze = True
    self.img_quant = 0
    self.data_frame = None
    self.class_quant = 0
    
    self.class_dict = {}
    self.ext = ['png','jpg','jpeg']
    self.input_hw = (self.input_shape[1],self.input_shape[0])
    self.predict = [[],[]]
    self.predict_method = None
    self.method_name = None
    self.initializer = tf.keras.initializers.glorot_normal(seed = self.seed)


    os.mkdir(self.save_path)

  def create_data_frame(self, base_path):
    imagens = []
    classes = []

    os.chdir(base_path)
    class_dir = os.listdir('.')
    class_dir = [fn for fn in class_dir if len(fn.split('.')) < 2]
    class_dir = sorted(class_dir)
    self.class_quant = len(class_dir)

    for class_name, class_num in zip(class_dir, range(self.class_quant)):
      cla = [fn for fn in os.listdir(class_name) if fn.split('.')[-1] in self.ext]
      for img in cla:
        imagens.append(base_path + '/' + class_name + '/' + img)
        classes.append(class_name)
      self.class_dict[class_name] = class_num

    self.data_frame = pd.DataFrame({'inputs' : imagens, 'targets' : classes})
    self.img_quant = self.data_frame.shape[0]
    
    

##**Generator**
* Generator of images.

In [None]:
class Generator(object):
  def __init__(self, Config, indice_treinamento, indice_teste, validation_split, batch_size):
    self.train = None
    self.val = None
    self.control = 0
    self.control1 = 0

    np.random.shuffle(indice_treinamento)

    if validation_split != None:
      self.train, self.val = train_test_split(indice_treinamento, test_size = validation_split)
    else:
      self.train = indice_treinamento

  def train_generator(self,Config):
    batch = []
    l_cla  = []

    while True:
      img = cv2.imread(Config.data_frame.iloc[self.train[self.control]]['inputs'])
      img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
      c = Config.class_dict[Config.data_frame.iloc[self.train[self.control]]['targets']]

      Config.preprocessing_function(img,c,batch,l_cla)

      if len(batch) == Config.batch_size:
        batch = np.array(batch, np.float32) / 255.
        yield (np.array(batch, np.float32),np.array(l_cla, np.float32))
        batch = []
        l_cla  = []
      self.control += 1
      if self.control == self.train.shape[0]:
        self.control = 0

  def val_generator(self,Config):
    batch = []
    l_cla  = []

    while True:
      img = cv2.imread(Config.data_frame.iloc[self.val[self.control1]]['inputs'])
      img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
      c = Config.class_dict[Config.data_frame.iloc[self.val[self.control1]]['targets']]

      Config.preprocessing_function(img,c,batch,l_cla)

      if len(batch) == Config.batch_size:
        batch = np.array(batch, np.float32) / 255.
        yield (np.array(batch, np.float32),np.array(l_cla,np.float32))
        batch = []
        l_cla  = []
      self.control1 += 1
      if self.control1 == self.val.shape[0]:
        self.control1 = 0
  
  def adversarial_train_gernerator(self, Config):
    batch = []
    l_cla  = []

    while True:
      img = cv2.imread(Config.data_frame.iloc[self.train[self.control]]['inputs'])
      img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
      c = Config.class_dict[Config.data_frame.iloc[self.train[self.control]]['targets']]

      preprocessing_original_image(img,c,batch,l_cla)

      if len(batch) == Config.batch_size:
        batch = np.array(batch, np.float32) / 255.
        yield {"inputs" : np.array(batch, np.float32),"label" : np.array(l_cla,np.float32)}
        batch = []
        l_cla  = []
      self.control += 1
      if self.control == self.train.shape[0]:
        self.control = 0

  def adversarial_val_generator(self, Config):
    batch = []
    l_cla  = []

    while True:
      img = cv2.imread(Config.data_frame.iloc[self.val[self.control1]]['inputs'])
      img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
      c = Config.class_dict[Config.data_frame.iloc[self.val[self.control1]]['targets']]

      preprocessing_original_image(img,c,batch,l_cla)

      if len(batch) == Config.batch_size:
        batch = np.array(batch, np.float32) / 255.
        yield {"inputs" : np.array(batch, np.float32),"label" : np.array(l_cla,np.float32)}
        batch = []
        l_cla  = []
      self.control1 += 1
      if self.control1 == self.val.shape[0]:
        self.control1 = 0

  def test_generator_prepro(self,Config,indice):
    batch = []
    l_cla = []
    img = cv2.imread(Config.data_frame.iloc[indice]['inputs'])
    img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
    c = Config.class_dict[Config.data_frame.iloc[indice]['targets']]

    Config.preprocessing_function(img,c,batch,l_cla)

    batch = np.array(batch, np.float32) / 255.
    return np.array(batch, np.float32)
    batch = []
    l_cla  = []
  
  def test_generator_cru(self,Config,indice):
    batch = []
    l_cla = []
    img = cv2.imread(Config.data_frame.iloc[indice]['inputs'])
    img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
    c = Config.class_dict[Config.data_frame.iloc[indice]['targets']]

    preprocessing_original_image(img,c,batch,l_cla)

    batch = np.array(batch, np.float32) / 255.
    return np.array(batch, np.float32)
    batch = []
    l_cla  = []  


##**Model**
* Model creation.


In [None]:
def compiled_model(in_shape, num_classe, base_model, Ploss, opt, Pmetrics):
  if (base_model == 0) :
    b_model = MobileNetV2(include_top=False, weights='imagenet',input_tensor= Input(shape = in_shape, name = 'inputs'))
    # b_model = ResNet50(include_top=False, weights='imagenet',input_tensor= Input(shape = in_shape, name = 'inputs'))
  else :
    b_model = DenseNet121(include_top=False, weights='imagenet',input_tensor= Input(shape = in_shape, name = 'inputs'))
  
  if not (Config.freeze):
    print('Treinando pesos ResNet')
    for layer in b_model.layers:
      layer.trainable = False


  Rede = b_model.output
  Rede = AveragePooling2D()(Rede)
  Rede = Flatten()(Rede)
  Rede = Dropout(0.4)(Rede) #Dropout(0.5,seed = Config.seed)(Rede)
  Rede = Dense(Config.class_quant, activation='softmax',kernel_initializer= Config.initializer, bias_initializer= Config.initializer)(Rede)


  Rede = Model(inputs = b_model.input, outputs = Rede)

  if (Config.adversarial):
    adversarial_config = nsl.configs.make_adv_reg_config(multiplier=0.2, adv_step_size=0.05)
    Adv_Rede = nsl.keras.AdversarialRegularization(Rede, adv_config = adversarial_config)#, label_keys=('targets',))
    Adv_Rede.compile(loss = Ploss, optimizer = opt, metrics = Pmetrics,run_eagerly = None)
    return Adv_Rede
    
  else:
    Rede.compile(loss = Ploss, optimizer = opt, metrics = Pmetrics)#, run_eagerly = None
    return Rede

In [None]:
%load_ext tensorboard
# Clear any logs from previous runs
# !rm -rf ./logs/ 

In [None]:
Config = Config()

np.random.seed(Config.seed)
set_seed(Config.seed)
random.seed(Config.seed)
tf.compat.v1.set_random_seed(Config.seed)
tf.random.set_global_generator(tf.random.Generator.from_seed(5))


Config.create_data_frame(Config.base_path)

In [None]:
Folds = StratifiedKFold(n_splits = Config.fold_num, shuffle = True, random_state = Config.seed)

##**Training**
* Traning model.

In [None]:
zeros = np.zeros(Config.data_frame.shape[0])
print('config.dataframe shape: ')
print(Config.data_frame.shape[0])
n = 0

for indice_treinamento, indice_teste in Folds.split(zeros,zeros):

  tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) # tirar os avisos do tensorflow

  Modelo = compiled_model(in_shape = Config.input_shape, num_classe = Config.class_quant, base_model = Config.base_model, Ploss = Config.loss ,opt = Config.optmizer ,Pmetrics = Config.metrics)

  gen = Generator(Config, indice_treinamento, indice_teste, Config.validation_split, Config.batch_size)

  log_dir = Config.save_path + "/logs/fit" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

  tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq = 1)

  if Config.adversarial:
    Modelo.fit(
      x = gen.adversarial_train_gernerator(Config), 
      batch_size = Config.batch_size, 
      epochs = Config.epochs,
      steps_per_epoch = int(((Config.img_quant * (1 - Config.validation_split)) / Config.batch_size) + 1),
      validation_data = gen.adversarial_val_generator(Config),
      validation_steps = int(((Config.img_quant * Config.validation_split) / Config.batch_size) + 1),
      callbacks = [tensorboard_callback]
      )
  else: 
    Modelo.fit(
        x = gen.train_generator(Config), 
        batch_size = Config.batch_size, 
        epochs = Config.epochs,
        steps_per_epoch = int(((Config.img_quant * (1 - Config.validation_split)) / Config.batch_size) + 1),
        validation_data = gen.val_generator(Config),
        validation_steps = int(((Config.img_quant * Config.validation_split) / Config.batch_size) + 1),
        callbacks = [tensorboard_callback]
        )

  for id in indice_teste:
    Config.predict_function(Modelo.predict(gen.test_generator_cru(Config,id)),id,Config)
   
  print('OUTRO FOLD : ' + str(n) + " <<<<<<<<<<<<<<<<<<<<<<<<<<<<<" )
    
    
  save_model(Modelo, Config.save_path + '/pesos/' + '/pesos-{}.tf'.format(n), save_format = "tf")
  n +=  1

  tf.keras.backend.clear_session()
  del Modelo
  del gen


In [None]:
save_mc()
save_csv()

In [None]:
# %tensorboard --logdir '/content/drive/My Drive/...'