In [1]:
import tensorflow as tf
import h5py
import scipy
import time
import datetime
import pandas as pd

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam, Adamax





In [2]:
# Configurações para o treinamento
train_data_dir = './data'
batch_size = 40 # imagens por lote
num_epochs = 75 # épocas de treinamento
image_size = (200, 200) # tamanho imagens
num_classes = 5 # qtd de classes
classes = ['Raiva', 'Alegria', 'Neutro', 'Triste', 'Surpresa']

In [3]:
def print_in_color(txt_msg,fore_tupple,back_tupple,):
    #prints the text_msg in the foreground color specified by fore_tupple with the background specified by back_tupple 
    #text_msg is the text, fore_tupple is foregroud color tupple (r,g,b), back_tupple is background tupple (r,g,b)
    rf,gf,bf=fore_tupple
    rb,gb,bb=back_tupple
    msg='{0}' + txt_msg
    mat='\33[38;2;' + str(rf) +';' + str(gf) + ';' + str(bf) + ';48;2;' + str(rb) + ';' +str(gb) + ';' + str(bb) +'m' 
    print(msg .format(mat), flush=True)
    print('\33[0m', flush=True) # returns default print color to back to black
    return

In [5]:
class LRA(tf.keras.callbacks.Callback):
    def __init__(self,model, base_model, patience,stop_patience, threshold, factor, dwell, batches, initial_epoch,epochs, ask_epoch, csv_path=None):
        super(LRA, self).__init__()
        self.model=model
        self.base_model=base_model
        self.patience=patience # especifica quantas épocas sem melhoria antes que a taxa de aprendizado seja ajustada
        self.stop_patience=stop_patience # especifica quantas vezes ajustar a taxa de aprendizado sem melhoria para parar o treinamento
        self.threshold=threshold # especifica o limiar de acurácia de treinamento quando a taxa de aprendizado será ajustada com base na perda de validação
        self.factor=factor # fator pelo qual reduzir a taxa de aprendizado
        self.dwell=dwell
        self.batches=batches # número de lotes de treinamento para executar por época
        self.initial_epoch=initial_epoch
        self.epochs=epochs
        self.ask_epoch=ask_epoch
        self.ask_epoch_initial=ask_epoch # salvar este valor para restaurar se reiniciar o treinamento
        self.csv_path=csv_path
        # variáveis de callback
        self.count=0 # quantas vezes a taxa de aprendizado foi reduzida sem melhoria
        self.stop_count=0
        self.best_epoch=1 # época com a menor perda
        self.initial_lr=float(tf.keras.backend.get_value(model.optimizer.lr)) # obter a taxa de aprendizado inicial e salvá-la
        self.highest_tracc=0.0 # definir a maior acurácia de treinamento inicialmente como 0
        self.lowest_vloss=np.inf # definir a menor perda de validação inicialmente como infinito
        self.best_weights=self.model.get_weights() # definir os melhores pesos como os pesos iniciais do modelo
        self.initial_weights=self.model.get_weights() # salvar os pesos iniciais caso precisem ser restaurados
        self.data_dict={}
        for key in ['epoch','tr loss','tr acc','vloss','vacc','current lr','next lr','monitor','% improv','duration']:
            self.data_dict[key]=[]
    def on_train_begin(self, logs=None):
        if self.base_model != None:
            status=base_model.trainable
            if status:
                msg=' inicializando callback iniciando treinamento com base_model treinável'
            else:
                msg='inicializando callback iniciando treinamento com base_model não treinável'
        else:
            msg='inicializando callback e iniciando treinamento'
        print_in_color (msg, (244, 252, 3), (55,65,80))
        msg='{0:^8s}{1:^10s}{2:^9s}{3:^9s}{4:^9s}{5:^9s}{6:^9s}{7:^10s}{8:10s}{9:^8s}'.format('Epoch', 'Loss', 'Accuracy', 'V_loss','V_acc', 'LR', 'Next LR', 'Monitor','% Improv', 'Duration')
        print_in_color(msg, (244,252,3), (55,65,80))
        self.start_time= time.time()

    def on_train_end(self, logs=None):
        stop_time=time.time()
        tr_duration= stop_time- self.start_time            
        hours = tr_duration // 3600
        minutes = (tr_duration - (hours * 3600)) // 60
        seconds = tr_duration - ((hours * 3600) + (minutes * 60))
        if self.csv_path !=None:
            df=pd.DataFrame.from_dict(self.data_dict)
            now = datetime.now() 
            year = str(now.year)
            month=str(now.month)
            day=str(now.day)
            hour=str(now.hour)
            minute=str(now.minute)
            sec=str(now.second)
            label = month + '-'+ day + '-' + year + '-' + hour + '-' + minute + '-' + sec +'.csv'
            csv_path=self.csv_path + '-'+ label
            df.to_csv(csv_path, index=False) 
            print('dados de treinamento salvos como ', csv_path)

        self.model.set_weights(self.best_weights) # definir os pesos do modelo para os melhores pesos
        msg=f'Treinamento concluído - modelo definido com pesos da época {self.best_epoch} '
        print_in_color(msg, (0,255,0), (55,65,80))
        msg = f'o tempo decorrido de treinamento foi {str(hours)} horas, {minutes:4.1f} minutos, {seconds:4.2f} segundos'
        print_in_color(msg, (0,255,0), (55,65,80))   
        
    def on_train_batch_end(self, batch, logs=None):
        acc=logs.get('accuracy')* 100  # obter a acurácia de treinamento 
        loss=logs.get('loss')
        msg='{0:20s}processando lote {1:4s} de {2:5s} acurácia= {3:8.3f}  perda: {4:8.5f}'.format(' ', str(batch), str(self.batches), acc, loss)
        print(msg, '\r', end='') # imprime na mesma linha para mostrar a contagem de lotes em execução        
        
    def on_epoch_begin(self,epoch, logs=None):
        self.now= time.time()
        
    def on_epoch_end(self, epoch, logs=None):  # método executado no final de cada época
        later=time.time()
        duration=later-self.now 
        lr=float(tf.keras.backend.get_value(self.model.optimizer.lr)) # obter a taxa de aprendizado atual
        current_lr=lr
        v_loss=logs.get('val_loss')  # obter a perda de validação para esta época
        acc=logs.get('accuracy')  # obter a acurácia de treinamento 
        v_acc=logs.get('val_accuracy')
        loss=logs.get('loss')        
        if acc < self.threshold: # se a acurácia de treinamento estiver abaixo do limiar, ajustar a taxa de aprendizado com base na acurácia de treinamento
            monitor='accuracy'
            if epoch ==0:
                pimprov=0.0
            else:
                pimprov= (acc-self.highest_tracc )*100/self.highest_tracc
            if acc>self.highest_tracc: # a acurácia de treinamento melhorou na época                
                self.highest_tracc=acc # definir nova maior acurácia de treinamento
                self.best_weights=self.model.get_weights() # a acurácia de treinamento melhorou, então salvar os pesos
                self.count=0 # definir contagem para 0 já que a acurácia de treinamento melhorou
                self.stop_count=0 # definir contador de paradas para 0
                if v_loss<self.lowest_vloss:
                    self.lowest_vloss=v_loss
                color= (0,255,0)
                self.best_epoch=epoch + 1  # definir o valor da melhor época para esta época              
            else: 
                # a acurácia de treinamento não melhorou, verificar se isso aconteceu por um número de épocas igual à paciência
                # se sim, ajustar a taxa de aprendizado
                if self.count>=self.patience -1: # a taxa de aprendizado deve ser ajustada
                    color=(245, 170, 66)
                    lr= lr* self.factor # ajustar a taxa de aprendizado pelo fator
                    tf.keras.backend.set_value(self.model.optimizer.lr, lr) # definir a taxa de aprendizado no otimizador
                    self.count=0 # redefinir a contagem para 0
                    self.stop_count=self.stop_count + 1 # contar o número de ajustes consecutivos da taxa de aprendizado
                    self.count=0 # redefinir contador
                    if self.dwell:
                        self.model.set_weights(self.best_weights) # retornar a um ponto melhor no espaço N                        
                    else:
                        if v_loss<self.lowest_vloss:
                            self.lowest_vloss=v_loss                                    
                else:
                    self.count=self.count +1 # incrementar contador de paciência                    
        else: # a acurácia de treinamento está acima do limiar, então ajustar a taxa de aprendizado com base na perda de validação
            monitor='val_loss'
            if epoch ==0:
                pimprov=0.0
            else:
                pimprov= (self.lowest_vloss- v_loss )*100/self.lowest_vloss
            if v_loss< self.lowest_vloss: # verificar se a perda de validação melhorou 
                self.lowest_vloss=v_loss # substituir a menor perda de validação pela nova perda de validação                
                self.best_weights=self.model.get_weights() # a perda de validação melhorou, então salvar os pesos
                self.count=0 # redefinir contagem já que a perda de validação melhorou  
                self.stop_count=0  
                color=(0,255,0)                
                self.best_epoch=epoch + 1 # definir o valor da melhor época para esta época
            else: # a perda de validação não melhorou
                if self.count>=self.patience-1: # é necessário ajustar a taxa de aprendizado
                    color=(245, 170, 66)
                    lr=lr * self.factor # ajustar a taxa de aprendizado                    
                    self.stop_count=self.stop_count + 1 # incrementar contador de paradas porque a taxa de aprendizado foi ajustada 
                    self.count=0 # redefinir contador
                    tf.keras.backend.set_value(self.model.optimizer.lr, lr) # definir a taxa de aprendizado no otimizador
                    if self.dwell:
                        self.model.set_weights(self.best_weights) # retornar a um ponto melhor no espaço N
                else: 
                    self.count =self.count +1 # incrementar o contador de paciência                    
                if acc>self.highest_tracc:
                    self.highest_tracc= acc
        msg=f'{str(epoch+1):^3s}/{str(self.epochs):4s} {loss:^9.3f}{acc*100:^9.3f}{v_loss:^9.5f}{v_acc*100:^9.3f}{current_lr:^9.5f}{lr:^9.5f}{monitor:^11s}{pimprov:^10.2f}{duration:^8.2f}'
        print_in_color (msg,color, (55,65,80))
        key_list=['epoch','tr loss','tr acc','vloss','vacc','current lr','next lr','monitor','% improv','duration']
        val_list =[epoch + 1, loss, acc, v_loss, v_acc, current_lr, lr, monitor, pimprov, duration]
        for key, value in zip(key_list, val_list):
            self.data_dict[key].append(value)
        
        if self.stop_count> self.stop_patience - 1: # verificar se a taxa de aprendizado foi ajustada um número de vezes igual a stop_count sem melhoria
            msg=f' o treinamento foi interrompido na época {epoch + 1} após {self.stop_patience} ajustes da taxa de aprendizado sem melhoria'
            print_in_color(msg, (0,255,255), (55,65,80))
            self.model.stop_training = True # parar o treinamento
        else: 
            if self.ask_epoch !=None:                
                if epoch + 1 >= self.ask_epoch:
                    if base_model.trainable:
                        msg='digite H para interromper o treinamento ou um número inteiro para o número de épocas a serem executadas e perguntar novamente'
                    else:
                        msg='digite H para interromper o treinamento, F para ajustar o modelo ou um número inteiro para o número de épocas a serem executadas e perguntar novamente'
                    print_in_color(msg, (0,255,255), (55,65,80))
                    ans=input('')                     
                    if ans=='H' or ans=='h':
                        msg=f'você digitou H - o treinamento foi interrompido na época {epoch + 1} devido à entrada do usuário'
                        print_in_color(msg, (0,255,255), (55,65,80))
                        self.model.stop_training = True # parar o treinamento
                    elif ans == 'F' or ans=='f':
                        if base_model.trainable:
                            msg='você digitou F, mas base_model já está definido como treinável'
                        else:
                            msg='você digitou F - definindo base_model como treinável para ajuste fino do modelo'
                            self.base_model.trainable=True
                        print_in_color(msg, (0, 255,255), (55,65,80))
                        msg='Digite um número inteiro para o número de épocas a serem executadas e depois ser perguntado novamente'
                        print_in_color(msg, (0,2555,255), (55,65,80))
                        ans=input()
                        ans=int(ans)
                        self.ask_epoch +=ans
                        msg=f' você digitou {ans} o treinamento continuará até a época {self.ask_epoch} '  
                        print_in_color(msg, (0, 255,255), (55,65,80))    
                        msg='{0:^8s}{1:^10s}{2:^9s}{3:^9s}{4:^9s}{5:^9s}{6:^9s}{7:^10s}{8:^8s}'.format('Epoch', 'Loss', 'Accuracy',
                                                                                            'V_loss','V_acc', 'LR', 'Next LR', 'Monitor','% Improv', 'Duration')
                        print_in_color(msg, (244,252,3), (55,65,80))                         
                        self.count=0
                        self.stop_count=0                         
                    else:
                        ans=int(ans)
                        self.ask_epoch +=ans
                        msg=f' você digitou {ans} o treinamento continuará até a época {self.ask_epoch} '                        
                        print_in_color(msg, (0, 255,255), (55,65,80))
                        msg='{0:^8s}{1:^10s}{2:^9s}{3:^9s}{4:^9s}{5:^9s}{6:^9s}{7:^10s}{8:10s}{9:^8s}'.format('Epoch', 'Loss', 'Accuracy',
                                                                                            'V_loss','V_acc', 'LR', 'Next LR', 'Monitor','% Improv', 'Duration')
                        print_in_color(msg, (244,252,3), (55,65,80))

In [None]:
def saver(save_path, model, model_name, subject, accuracy,img_size, scalar,offset ,generator):    
    # Salvando o modelo
    save_id=str (model_name +  '-' + subject +'-'+ str(acc)[:str(acc).rfind('.')+3] + '.h5')
    model_save_loc=os.path.join(save_path, save_id)
    model.save(model_save_loc)
    print_in_color ('model was saved as ' + model_save_loc, (0,255,0),(55,65,80)) 
    # now create the class_df and convert to csv file    
    class_dict=generator.class_indices 
    height=[]
    width=[]
    scale=[]
    off=[]
    for i in range(len(class_dict)):
        height.append(img_size[0])
        width.append(img_size[1])
        scale.append(scalar) 
        off.append(offset)
    Index_series=pd.Series(list(class_dict.values()), name='class_index')
    Class_series=pd.Series(list(class_dict.keys()), name='class') 
    Height_series=pd.Series(height, name='height')
    Width_series=pd.Series(width, name='width')
    Scale_series=pd.Series(scale, name='scale by')
    Off_series=pd.Series(off, name='Offset')
    class_df=pd.concat([Index_series, Class_series, Height_series, Width_series, Scale_series, Off_series],axis=1)    
    csv_name='class_dict.csv'
    csv_save_loc=os.path.join(save_path, csv_name)
    class_df.to_csv(csv_save_loc, index=False) 
    print_in_color ('class csv file was saved as ' + csv_save_loc, (0,255,0),(55,65,80)) 
    return model_save_loc, csv_save_loc

In [None]:
def predictor(sdir, csv_path,  model_path, averaged=True, verbose=True):    
    # read in the csv file
    class_df=pd.read_csv(csv_path)    
    class_count=len(classes)    
    img_size= image_size 
    scale=1  
    # determine value to scale image pixels by
    try: 
        s=int(scale)
        s2=1
        s1=0
    except:
        split=scale.split('-')
        s1=float(split[1])
        s2=float(split[0].split('*')[1])
    path_list=[]
    paths=os.listdir(sdir)    
    for f in paths:
        path_list.append(os.path.join(sdir,f))
    if verbose:
        print (' Model is being loaded- this will take about 10 seconds')
    model=load_model(model_path)
    image_count=len(path_list) 
    image_list=[]
    file_list=[]
    good_image_count=0
    for i in range (image_count):        
        try:
            img=cv2.imread(path_list[i])
            img=cv2.resize(img, img_size)
            img=cv2.cvtColor(img, cv2.COLOR_BGR2RGB)            
            good_image_count +=1
            img=img*s2 - s1             
            image_list.append(img)
            file_name=os.path.split(path_list[i])[1]
            file_list.append(file_name)
        except:
            if verbose:
                print ( path_list[i], ' is an invalid image file')
    if good_image_count==1: # if only a single image need to expand dimensions
        averaged=True
    image_array=np.array(image_list)    
    # make predictions on images, sum the probabilities of each class then find class index with
    # highest probability
    preds=model.predict(image_array)    
    if averaged:
        psum=[]
        for i in range (class_count): # create all 0 values list
            psum.append(0)    
        for p in preds: # iterate over all predictions
            for i in range (class_count):
                psum[i]=psum[i] + p[i]  # sum the probabilities   
        index=np.argmax(psum) # find the class index with the highest probability sum        
        klass=class_df['class'].iloc[index] # get the class name that corresponds to the index
        prob=psum[index]/len(preds) * 100  # get the probability average         
        # to show the correct image run predict again and select first image that has same index
        for img in image_array:  #iterate through the images    
            test_img=np.expand_dims(img, axis=0) # since it is a single image expand dimensions 
            test_index=np.argmax(model.predict(test_img)) # for this image find the class index with highest probability
            if test_index== index: # see if this image has the same index as was selected previously
                if verbose: # show image and print result if verbose=1
                    plt.axis('off')
                    plt.imshow(img) # show the image
                    print (f'predicted class is {klass} with a probability of {prob:6.4f} % ')
                break # found an image that represents the predicted class      
        return klass, prob, img, None
    else: # create individual predictions for each image
        pred_class=[]
        prob_list=[]
        for i, p in enumerate(preds):
            index=np.argmax(p) # find the class index with the highest probability sum
            klass=class_df['class'].iloc[index] # get the class name that corresponds to the index
            image_file= file_list[i]
            pred_class.append(klass)
            prob_list.append(p[index])            
        Fseries=pd.Series(file_list, name='image file')
        Lseries=pd.Series(pred_class, name= 'class')
        Pseries=pd.Series(prob_list, name='probability')
        df=pd.concat([Fseries, Lseries, Pseries], axis=1)
        if verbose:
            length= len(df)
            print (df.head(length))
        return None, None, None, df

In [31]:
train_datagen = ImageDataGenerator(
    #rescale = 1.0/255,
    validation_split = 0.20, # 25% para validação
    #shear_range = 0.2, # inclinação
    #zoom_range = 0.2, # zoom
    horizontal_flip = True # espelhamento horizontal
)

In [32]:
train_generator = train_datagen.flow_from_directory(
    train_data_dir,
    target_size = image_size,
    batch_size = batch_size,
    class_mode = 'categorical',
    subset = 'training'
)

Found 11362 images belonging to 5 classes.


In [33]:
validation_generator = train_datagen.flow_from_directory(
    train_data_dir,
    target_size = image_size,
    batch_size = batch_size,
    class_mode = 'categorical',
    subset = 'validation'
)

Found 2838 images belonging to 5 classes.


In [34]:
model = Sequential() # instanciando o modelo
model.add(Conv2D(32, (3,3), input_shape=(image_size[0], image_size[1], 3), activation='relu')) # camada de convolução
model.add(MaxPooling2D(pool_size=(2,2))) # camada de pooling
model.add(Conv2D(64, (3,3), activation='relu')) # camada de convolução
model.add(MaxPooling2D(pool_size=(2,2))) # camada de pooling
model.add(Flatten()) # camada de achatamento ou vetorização
model.add(Dense(64, activation='relu')) # neurônios
model.add(Dropout(0.6)) # regularização 50% chance de desligar neurônios
model.add(Dense(5, activation='softmax')) # camada de saída (classificação binária)

In [35]:
# compilar o modelo
model.compile(
    Adamax(0.001),
    loss = 'categorical_crossentropy', # mais próximo de 0 melhor, 0.002 por exemplo
    metrics = ['accuracy']
)

In [36]:
# treinamento
model.fit(
    train_generator,
    epochs = num_epochs,
    validation_data = validation_generator
)

print('Treinamento concluído!')

# salvar o modelo
model.save('./models/ceMod4.h5')

Epoch 1/75
Epoch 2/75
Epoch 3/75
Epoch 4/75
Epoch 5/75
Epoch 6/75
Epoch 7/75
Epoch 8/75
Epoch 9/75
Epoch 10/75
Epoch 11/75
Epoch 12/75
Epoch 13/75
Epoch 14/75
Epoch 15/75
Epoch 16/75
Epoch 17/75
Epoch 18/75
Epoch 19/75
Epoch 20/75
Epoch 21/75
Epoch 22/75
Epoch 23/75
Epoch 24/75
Epoch 25/75
Epoch 26/75
Epoch 27/75
Epoch 28/75
Epoch 29/75
Epoch 30/75
Epoch 31/75
Epoch 32/75
Epoch 33/75
Epoch 34/75
Epoch 35/75
Epoch 36/75
Epoch 37/75
Epoch 38/75
Epoch 39/75
Epoch 40/75
Epoch 41/75
Epoch 42/75
Epoch 43/75
Epoch 44/75
Epoch 45/75
Epoch 46/75
Epoch 47/75
Epoch 48/75
Epoch 49/75
Epoch 50/75
Epoch 51/75
Epoch 52/75
Epoch 53/75
Epoch 54/75
Epoch 55/75
Epoch 56/75
Epoch 57/75
Epoch 58/75
Epoch 59/75
Epoch 60/75
Epoch 61/75
Epoch 62/75
Epoch 63/75
Epoch 64/75
Epoch 65/75
Epoch 66/75
Epoch 67/75
Epoch 68/75
Epoch 69/75
Epoch 70/75
Epoch 71/75
Epoch 72/75
Epoch 73/75
Epoch 74/75
Epoch 75/75
Treinamento concluído!
