# <center> Rozpoznawanie monet - projekt
### <center> EfficientNetB3

<a id="import"></a>
## Import modułów i konfiguracja

In [None]:
# standard library
import logging
import os
import shutil
import time
import warnings

# TF_CPP_MIN_LOG_LEVEL has to be in the environment before TensorFlow is
# imported; assigned after the import it does not quiet the native start-up logs.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# basic utils
import numpy as np
import pandas as pd

# data representation
import matplotlib.pyplot as plt
import seaborn as sns

# neural network
from sklearn.metrics import confusion_matrix, classification_report
import tensorflow as tf
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Dense, Dropout, BatchNormalization
from keras.optimizers import Adamax
from keras.models import Model

# config
sns.set_style('darkgrid')
pd.options.display.max_columns = None
pd.options.display.max_rows = 90 # visual aid

# keep the notebook output clean: silence TensorFlow's Python logger and Python warnings
logging.getLogger("tensorflow").setLevel(logging.ERROR)
warnings.simplefilter("ignore")
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

<a id="makedf"></a>
## Import i obsługa danych
załadowanie zbioru danych, preprocessing i wczytanie klas

zbiór danych [WorldCoins (kaggle)](https://www.kaggle.com/datasets/wanderdust/coin-images)

In [None]:
# Walk <sdir>/<split>/<class folder>/<image> and build one DataFrame per split
# with the columns 'filepaths' and 'labels'.
sdir=r'\dataset\coins\data'
classes=[]
datasets=os.listdir(sdir)
for d in datasets:
    dpath=os.path.join(sdir,d)
    if not os.path.isdir(dpath):
        continue
    print ('processing the ', d, ' dataset')
    filepaths = []
    labels=[]
    for klass in os.listdir(dpath):
        classpath=os.path.join(dpath, klass)
        if not os.path.isdir(classpath):
            # a stray file next to the class folders would make os.listdir() fail
            continue
        for f in os.listdir(classpath):
            # The label is encoded in the file name as '<prefix>__<label>.<ext>'.
            # Parse the file name only, so a '__' in a directory name cannot
            # shift the split.
            name_parts=f.split('__')
            if len(name_parts) < 2:
                # not a dataset image (no '__' separator) - nothing to label
                continue
            # NOTE(review): everything after the first '.' is dropped, so a label
            # that itself contains a dot would be truncated - confirm none do.
            fclass=name_parts[1].split('.')[0]
            filepaths.append(os.path.join(classpath,f))
            labels.append(fclass)
    Fseries=pd.Series(filepaths, name='filepaths')
    Lseries=pd.Series(labels, name='labels')
    split_df=pd.concat([Fseries, Lseries], axis=1)
    if d == 'train':
        train_df=split_df
    elif d == 'test':
        test_df=split_df
    else:
        # any other split folder is treated as the validation set
        valid_df=split_df
print('train_df lenght: ', len(train_df), '  test_df length: ', len(test_df), '  valid_df length: ', len(valid_df))

# Per-class overview of the training set: class names (in order of first
# appearance) and the number of images each one has.
classes = train_df['labels'].unique().tolist()
class_count = len(classes)
print('The number of classes in the dataset is: ', class_count)
print('{0:^30s} {1:^13s}'.format('CLASS', 'IMAGE COUNT'))

groups = train_df.groupby('labels')
classlist = list(classes)
countlist = [len(groups.get_group(name)) for name in classlist]
for label, n_images in zip(classlist, countlist):
    print('{0:^30s} {1:^13s}'.format(label, str(n_images)))

# Balance metrics used later when augmenting: the largest and the smallest
# class together with their image counts (first match wins on ties).
max_value = np.max(countlist)
min_value = np.min(countlist)
max_index = countlist.index(max_value)
min_index = countlist.index(min_value)
max_class, min_class = classlist[max_index], classlist[min_index]
print(max_class, ' has the most images= ', max_value, ' ', min_class, ' has the least images= ', min_value)


# average dimensions
# Estimated from a reproducible random sample of at most 100 training images;
# the sample size is capped so that small datasets do not make sample() raise.
sample_size = min(100, len(train_df))
train_df_sample = train_df.sample(n=sample_size, random_state=123, axis=0)
ht = 0
wt = 0
for fpath in train_df_sample['filepaths']:
    shape = plt.imread(fpath).shape  # (height, width[, channels])
    ht += shape[0]
    wt += shape[1]
if sample_size:
    # divide by the number of images actually read, not by a fixed 100
    print('average height= ', ht//sample_size, ' average width= ', wt//sample_size, 'aspect ratio= ', ht/wt)
else:
    print('train_df is empty - cannot compute average image dimensions')

<a id="balance"></a>
## balansowanie datasetu
rozszerzanie datasetu o obrazy augmentowane (odbicie lustrzane, obrót, przesunięcie, zoom) dla klas, które mają mniej niż `n` próbek

In [None]:
def balance(df, n, working_dir, img_size):
    """Return a copy of ``df`` extended with augmented images for small classes.

    For every class in ``df['labels']`` that has fewer than ``n`` rows, randomly
    transformed copies of its images are written to
    ``<working_dir>/aug/<label>/`` and appended to the returned frame. Classes
    that already have ``n`` or more rows are left untouched (they are not
    trimmed).

    If ``<working_dir>/aug`` already exists the user is asked on stdin whether
    to delete and regenerate it ('D'/'d') or to reuse the stored images (any
    other answer).

    Parameters
    ----------
    df : DataFrame with the columns 'filepaths' and 'labels'.
    n : target minimum number of images per class.
    working_dir : existing directory in which the 'aug' folder is created.
    img_size : (height, width) passed as ``target_size`` to the generator.

    Returns
    -------
    DataFrame with the rows of ``df`` followed by one row per augmented image;
    the input frame is not modified.
    """
    def collect_augmented(aug_dir):
        """Build a filepaths/labels frame from the images stored under aug_dir."""
        aug_fpaths = []
        aug_labels = []
        for klass in os.listdir(aug_dir):
            classpath = os.path.join(aug_dir, klass)
            if not os.path.isdir(classpath):
                continue
            for f in os.listdir(classpath):
                aug_fpaths.append(os.path.join(classpath, f))
                aug_labels.append(klass)
        return pd.DataFrame({'filepaths': aug_fpaths, 'labels': aug_labels})

    def merge(df, aug_df):
        """Append the augmented rows to df and renumber the index."""
        if aug_df.empty:
            return df.reset_index(drop=True)
        return pd.concat([df, aug_df], axis=0).reset_index(drop=True)

    def augment(df, n, working_dir, img_size):
        """Create the augmented images on disk and return df plus their rows."""
        aug_dir = os.path.join(working_dir, 'aug')
        os.mkdir(aug_dir)
        for label in df['labels'].unique():
            os.mkdir(os.path.join(aug_dir, label))
        # create and store the augmented images
        total = 0
        gen = None  # built on first use; not needed when every class is large enough
        groups = df.groupby('labels')  # group by class
        for label in df['labels'].unique():
            group = groups.get_group(label)  # rows of this class only
            delta = n - len(group)  # number of augmented images to create
            if delta <= 0:
                continue
            if gen is None:
                gen = ImageDataGenerator(horizontal_flip=True, rotation_range=20, width_shift_range=.2,
                                         height_shift_range=.2, zoom_range=.2)
            target_dir = os.path.join(aug_dir, label)  # where the images are written
            msg = '{0:40s} for class {1:^30s} creating {2:^5s} augmented images'.format(' ', label, str(delta))
            print(msg, '\r', end='')  # prints over on the same line
            aug_gen = gen.flow_from_dataframe(group, x_col='filepaths', y_col=None, target_size=img_size,
                                              class_mode=None, batch_size=1, shuffle=False,
                                              save_to_dir=target_dir, save_prefix='aug-', color_mode='rgb',
                                              save_format='jpg')
            # each batch drawn from the generator is also saved to target_dir
            aug_img_count = 0
            while aug_img_count < delta:
                images = next(aug_gen)
                aug_img_count += len(images)
            total += aug_img_count
        print('Total Augmented images created= ', total)
        # the composite training set: original rows + the images just written
        return merge(df, collect_augmented(aug_dir))

    df = df.copy()
    aug_dir = os.path.join(working_dir, 'aug')
    if 'aug' in os.listdir(working_dir):
        print(' Augmented images already exist. To delete these and create new images enter D, else enter U to use these images', flush=True)
        ans = input(' ')
        if ans == 'D' or ans == 'd':
            shutil.rmtree(aug_dir)  # start with a clean empty directory
            return augment(df, n, working_dir, img_size)
        # Reuse the stored images. Duplicated paths are dropped so that calling
        # this again on an already balanced frame does not add the rows twice.
        reused = merge(df, collect_augmented(aug_dir))
        return reused.drop_duplicates(subset='filepaths').reset_index(drop=True)
    return augment(df, n, working_dir, img_size)
        
   
# Settings for balancing the training set.
n = 120                # classes with fewer images than this get augmented copies
working_dir = r'./'    # augmented images are stored in <working_dir>/aug
img_size = (150, 150)  # target size of the augmented images
train_df = balance(train_df, n, working_dir, img_size)

<a id="generators"></a>
## Generacja zbiorów danych
ostateczne wytworzenie `train_gen`, `test_gen`, `final_test_gen` i `valid_gen`

**UWAGA:** dla zbioru testowego `test_batch_size * test_steps = sample_count`, aby podczas predykcji każda próbka została użyta dokładnie raz

In [None]:
batch_size = 30

# Training images are randomly transformed on the fly; validation and test
# images are fed through an untouched generator.
trgen = ImageDataGenerator(
    horizontal_flip=True,
    rotation_range=20,
    width_shift_range=.2,
    height_shift_range=.2,
    zoom_range=.2,
)
t_and_v_gen = ImageDataGenerator()

# training
msg = '{0:70s} for train generator'.format(' ')
print(msg, '\r', end='')  # prints over on the same line
train_gen = trgen.flow_from_dataframe(
    train_df,
    x_col='filepaths',
    y_col='labels',
    target_size=img_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=True,
    batch_size=batch_size,
)

# validation
msg = '{0:70s} for valid generator'.format(' ')
print(msg, '\r', end='')  # prints over on the same line
valid_gen = t_and_v_gen.flow_from_dataframe(
    valid_df,
    x_col='filepaths',
    y_col='labels',
    target_size=img_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=False,
    batch_size=batch_size,
)

# test
# Pick the largest batch size <= 80 that divides the test set exactly, so that
# test_batch_size * test_steps equals the number of test samples.
length = len(test_df)
exact_batch_sizes = [length // d for d in range(1, length + 1) if length % d == 0 and length / d <= 80]
exact_batch_sizes.sort(reverse=True)
test_batch_size = exact_batch_sizes[0]
test_steps = length // test_batch_size
msg = '{0:70s} for test generator'.format(' ')
print(msg, '\r', end='')  # prints over on the same line
test_gen = t_and_v_gen.flow_from_dataframe(
    test_df,
    x_col='filepaths',
    y_col='labels',
    target_size=img_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=False,
    batch_size=test_batch_size,
)

# generate report
class_lookup = train_gen.class_indices  # class name -> integer index
classes = list(class_lookup.keys())
class_indices = list(class_lookup.values())
class_count = len(classes)
labels = test_gen.labels
print ( 'test batch size: ' ,test_batch_size, '  test steps: ', test_steps, ' number of classes : ', class_count)

<a id="model"></a>
## Tworzenie modelu
tworzenie modelu na podstawie [EfficientNetB3](https://www.tensorflow.org/api_docs/python/tf/keras/applications/efficientnet/EfficientNetB3) ze wstępnie wytrenowanymi wagami (ImageNet). Dodajemy pojedynczą warstwę gęstą o 128 neuronach


In [6]:
# Network input: the image size used by the generators plus 3 colour channels.
img_shape = (*img_size[:2], 3)
model_name = 'EfficientNetB3'

# ImageNet-initialised backbone without its classifier; global max pooling
# turns the last feature map into a flat vector.
base_model = tf.keras.applications.efficientnet.EfficientNetB3(
    include_top=False,
    weights="imagenet",
    input_shape=img_shape,
    pooling='max',
)
base_model.trainable = True  # the backbone is trained together with the new head

# Classification head: batch norm -> dense(128, relu) -> dropout -> softmax.
x = base_model.output
x = BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001)(x)
x = Dense(128, activation='relu')(x)
x = Dropout(rate=.4, seed=123)(x)
output = Dense(class_count, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=output)

lr = .001  # learning rate
model.compile(
    optimizer=Adamax(learning_rate=lr),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

<a id="train"></a>
# Trening modelu
30 epok

In [None]:
# Train on train_gen for 30 epochs and evaluate on valid_gen along the way;
# the returned History object is plotted in the next cell.
history = model.fit(
    x=train_gen,
    epochs=30,
    verbose=1,
    validation_data=valid_gen,
    validation_steps=None,
    shuffle=False,
    initial_epoch=0,
)

## Plotowanie statystyk uczenia
Loss i Accuracy w relacji kolejnych epok

In [None]:
def tr_plot(tr_data, start_epoch):
    """Plot training and validation accuracy and loss against the epoch number.

    Parameters
    ----------
    tr_data : object whose ``history`` dict holds the per-epoch lists
        'accuracy', 'loss', 'val_accuracy' and 'val_loss'.
    start_epoch : number of epochs completed before this run; the x axis
        starts at ``start_epoch + 1``.

    Draws two side-by-side panels (accuracy, loss) and shows the figure.
    Switches the global matplotlib style to 'ggplot'.
    """
    tacc = tr_data.history['accuracy']
    tloss = tr_data.history['loss']
    vacc = tr_data.history['val_accuracy']
    vloss = tr_data.history['val_loss']
    # 1-based epoch numbers, continuing after start_epoch
    epochs = list(range(start_epoch + 1, start_epoch + len(tacc) + 1))
    plt.style.use('ggplot')
    fontsize = 20
    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20, 8))
    # Colours are given only via color=; the former 'r'/'g' format strings
    # were overridden by it and only triggered a redundancy warning.
    axes[0].plot(epochs, tacc, label='Accuracy - training', color="teal")
    axes[0].plot(epochs, vacc, label='Accuracy - validation', color="orange")
    axes[0].set_title('Accuracy metrics', weight='bold', fontsize=fontsize+5)
    axes[0].set_xlabel('Epoch', weight='bold', fontsize=fontsize)
    axes[0].set_ylabel('Accuracy', weight='bold', fontsize=fontsize)
    axes[0].legend()
    axes[1].plot(epochs, tloss, label='Loss - training', color="teal")
    axes[1].plot(epochs, vloss, label='Loss - validation', color="orange")
    axes[1].set_title('Loss metrics', weight='bold', fontsize=fontsize+5)
    axes[1].set_xlabel('Epochs', weight='bold', fontsize=fontsize)
    axes[1].set_ylabel('Loss', weight='bold', fontsize=fontsize)
    axes[1].legend()
    plt.show()
    
# Plot the curves of the training run above, numbering epochs from 1.
tr_plot(history, start_epoch=0)

<a id="result"></a>
## Wykonanie predykcji wytrenowanym modelem
test na batchach danych i funkcje do generowania metryk

In [None]:
def predictor(test_gen, test_steps):
    """Predict on the test generator and report accuracy and per-class metrics.

    Uses the module-level ``model`` for prediction and ``train_gen`` for the
    class names. Prints the accuracy and a classification report; when there
    are at most 30 classes a confusion-matrix heatmap is shown as well.

    Parameters
    ----------
    test_gen : generator passed to ``model.predict``; its ``labels`` attribute
        supplies the true integer class indices.
    test_steps : number of batches to draw from ``test_gen``.

    Returns
    -------
    (errors, tests) : number of misclassified samples and number of predictions.
    """
    classes = list(train_gen.class_indices.keys())
    class_count = len(classes)
    # NOTE(review): assumes test_gen encodes labels with the same class indices
    # as train_gen - verify that both generators see the same set of classes.
    preds = model.predict(test_gen, steps=test_steps, verbose=1)  # predict on the test set
    tests = len(preds)
    y_pred = np.argmax(preds, axis=1)
    # compare against exactly as many true labels as there are predictions
    y_true = np.asarray(test_gen.labels)[:tests]  # labels are integer values
    errors = int(np.count_nonzero(y_pred != y_true))
    acc = (1 - errors / tests) * 100
    print(f'there were {errors} in {tests} tests for an accuracy of {acc:6.2f}')
    # Passing the full label set keeps the matrix and the report aligned with
    # `classes` even when some class never occurs in the test data.
    label_ids = np.arange(class_count)
    if class_count <= 30:
        cm = confusion_matrix(y_true, y_pred, labels=label_ids)
        # plot the confusion matrix
        plt.figure(figsize=(16, 10))
        sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
        plt.xticks(label_ids + .5, classes, rotation=90)
        plt.yticks(label_ids + .5, classes, rotation=0)
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.title("Confusion Matrix")
        plt.show()
    clr = classification_report(y_true, y_pred, labels=label_ids, target_names=classes, digits=4)
    print("Classification Report:\n----------------------\n", clr)
    return errors, tests
# Evaluate on the test set; the counts are reused when naming the saved model.
errors, tests = predictor(test_gen, test_steps=test_steps)

<a id="save"></a>
## Zapis modelu do pliku

In [None]:
# Save the trained model as '<subject>_<accuracy>.h5' inside working_dir.
subject = 'coins'

# Test accuracy in percent, cut off (not rounded) after two decimals by
# slicing its string representation.
acc = str((1 - errors / tests) * 100)
index = acc.rfind('.')
acc = acc[:index + 3]

save_id = f'{subject}_{acc}.h5'
model_save_loc = os.path.join(working_dir, save_id)
model.save(model_save_loc)
print ('model was saved as ' , model_save_loc )
   