In [3]:
import os
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn import metrics
from sklearn import preprocessing
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import seaborn
import joblib
from utils import load_data

In [4]:
# Avoid tensorflow warnings and info messages about my poor bad CPU
# TF_CPP_MIN_LOG_LEVEL='2' asks TensorFlow's native logger to hide INFO and WARNING lines.
# NOTE(review): tensorflow is already imported by the import cell above; this variable is
# presumably read when TensorFlow's native runtime is loaded, so this assignment may need
# to run before that import to have any effect -- confirm.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

**Deep learning part**

Note: See [here](https://www.tensorflow.org/api_docs/python/tf/keras/utils/image_dataset_from_directory) for dataset directory structure.

In [3]:
# Paths and names
# Root folders under which the training / validation arrays are exported as .npy files.
training_set_path = "training_set/"
validation_set_path = "validation_set/"

# exist_ok=True keeps the cell idempotent and avoids the check-then-create race
# of a separate os.path.exists() test.
for directory in (training_set_path, validation_set_path):
    os.makedirs(directory, exist_ok=True)

In [4]:
# Model & data parameters
model_name = 'retrain_mobilenet_v4'  # names the saved model file and the exported dataset folders
retrain_convolution = False          # False: the layers of the pretrained backbone are frozen
num_classes = 50                     # width of the final softmax layer (was assigned twice)

image_size = 192 #in pixels
validation_size = 0.2                # fraction of the samples held out by train_test_split
input_shape = (image_size, image_size, 3)  # (height, width, channels)

In [5]:
# Training hyper-parameters
# The learning rate is set explicitly to 1e-4 (Adam's Keras default would be 0.001).
learning_rate = 1e-4
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
epochs = 50
batch_size = 16

In [None]:
# Load the dataset and rescale it to the range [-1, 1]
# (assumes load_data returns pixel values in [0, 255] -- TODO confirm)
X, y = load_data((image_size, image_size))
# Cast before scaling: an in-place `/=` raises on integer arrays (e.g. uint8 pixels).
X = np.asarray(X, dtype=np.float32)
X /= 127.5
X -= 1

# Hold out `validation_size` of the samples for the final evaluation.
# A fixed random_state keeps the split (and the .npy files exported from it)
# identical across kernel restarts.
split_seed = 42
X, X_validation, y, y_validation = train_test_split(
    X, y, test_size=validation_size, random_state=split_seed)

In [6]:
# Callbacks (early stopping is currently disabled)
#early_callback = keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

In [7]:
# Keras image augmentation: random rotations (up to 20 degrees) and horizontal flips.
# validation_split reserves 20% of the samples given to `flow` for its 'validation' subset.
# Options left disabled: featurewise_center, featurewise_std_normalization,
# width_shift_range=0.2, height_shift_range=0.2
augmentation_options = dict(
    rotation_range=20,
    horizontal_flip=True,
    validation_split=0.2,
)
datagen = ImageDataGenerator(**augmentation_options)

# fit() computes the data-dependent statistics that the featurewise options rely on
datagen.fit(X)

In [None]:
# MobileNetV2 backbone (alpha=0.5) with ImageNet weights and no classification head.
# The other backbone cells also assign `pretrain`: run only the one you want to use.
pretrain = keras.applications.MobileNetV2(
    input_shape=input_shape,
    alpha=0.5,
    include_top=False,
    weights="imagenet",
)

In [9]:
# ResNet101 backbone with ImageNet weights and no classification head.
# The other backbone cells also assign `pretrain`: run only the one you want to use.
pretrain = keras.applications.ResNet101(
    input_shape=input_shape,
    include_top=False,
    weights="imagenet",
)

In [11]:
# DenseNet201 backbone with ImageNet weights and no classification head.
# The other backbone cells also assign `pretrain`: run only the one you want to use.
pretrain = keras.applications.DenseNet201(
    input_shape=input_shape,
    include_top=False,
    weights="imagenet",
)

In [None]:
# Custom network: a small CNN defined from scratch (alternative to the pretrained backbones)
M = keras.Sequential([
    # Two convolution + max-pooling stages
    layers.Conv2D(32, (3, 3), input_shape=input_shape, activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(32, (3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    # Dense classifier ending in one softmax unit per class
    layers.Dense(64, activation='relu'),
    layers.Dense(num_classes, activation='softmax'),
])
model = M
model.summary()

In [None]:
# Build the transfer-learning model on top of the selected backbone
# (if using the custom network, do not run this cell).

# Set the trainable flag of every backbone layer explicitly, in both directions,
# so that re-running the cell after changing `retrain_convolution` never keeps
# a stale frozen/unfrozen state from a previous run.
for layer in pretrain.layers:
    layer.trainable = retrain_convolution

pretrain_out = pretrain.output

# Classification head: max-pool -> flatten -> dropout -> softmax over the classes
M = layers.MaxPooling2D()(pretrain_out)
M = layers.Flatten()(M)
M = layers.Dropout(0.5)(M)
M = layers.Dense(num_classes, activation="softmax")(M)

# Assemble the full model (compilation happens in the training cells)
model = keras.Model(inputs=pretrain.input, outputs=M)
model.summary()

In [None]:
# Train on the raw arrays (no augmentation); 20% of X is held out by Keras for validation
model.compile(
    optimizer=optimizer,
    loss="categorical_crossentropy",
    metrics=["accuracy", "top_k_categorical_accuracy"],
)
history = model.fit(
    x=X,
    y=y,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    verbose=1,
)

In [None]:
# Train with on-the-fly augmentation from `datagen`
model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy", "top_k_categorical_accuracy"])
history = model.fit(
         datagen.flow(X, y, batch_size=batch_size, subset='training'),
         # Use the shared `batch_size` setting instead of a hard-coded 16.
         # NOTE(review): these validation batches come from the same augmenting
         # generator, so they are presumably rotated/flipped as well -- confirm
         # that this is intended.
         validation_data=datagen.flow(X, y, batch_size=batch_size, subset='validation'),
         epochs=epochs,
         verbose=1)

In [None]:
# Evaluate the trained model on the held-out validation split
scores = model.evaluate(x=X_validation, y=y_validation, verbose=1)

In [None]:
# Predicted class index for every validation sample
prediction = np.argmax(model.predict(X_validation), axis=-1)
# Ground-truth class index decoded from the one-hot labels
# (despite its name, `y_prediction` holds the true labels; the name is kept as is)
y_prediction = np.argmax(y_validation, axis=-1)

# Confusion matrix of this validation: rows = true class, columns = predicted class
cm = metrics.confusion_matrix(y_prediction, prediction)
plt.figure(figsize = (10,7))
# fmt="d": seaborn's default ".2g" would print counts >= 100 in scientific notation
seaborn.heatmap(cm, annot=True, fmt="d", linewidths=1)
plt.xlabel("Predicted class")
plt.ylabel("True class")
plt.savefig("dl_confusion_matrix")

In [None]:
# Top-1 accuracy per epoch: training curve first, validation curve second
# NOTE(review): the title hard-codes "alpha= 0.5", which only matches the MobileNetV2 cell.
for curve in ('accuracy', 'val_accuracy'):
    plt.plot(history.history[curve])
plt.title("Retrain " + model_name + " (alpha= 0.5) top 1 accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend(["Train", "Validation"], loc="upper left")
plt.show()

In [None]:
# Top-k categorical accuracy per epoch: training curve first, validation curve second
# NOTE(review): the title hard-codes "alpha= 0.5", which only matches the MobileNetV2 cell.
for curve in ('top_k_categorical_accuracy', 'val_top_k_categorical_accuracy'):
    plt.plot(history.history[curve])
plt.title("Retrain " + model_name + " (alpha= 0.5) top 5 accuracy")
plt.xlabel("Epoch")
plt.ylabel("Top 5 categorical accuracy")
plt.legend(["Train", "Validation"], loc="upper left")
plt.show()

In [None]:
# Loss per epoch: training curve first, validation curve second
# NOTE(review): the title hard-codes "alpha= 0.5", which only matches the MobileNetV2 cell.
for curve in ('loss', 'val_loss'):
    plt.plot(history.history[curve])
plt.title("Retrain " + model_name + " (alpha= 0.5) loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend(["Train", "Validation"], loc="upper left")
plt.show()

In [17]:
# Save the model
model_path = './models/' + model_name + '.keras'
# Nothing else in this notebook creates ./models/, so make sure it exists before
# saving (model.save presumably does not create missing parent folders).
os.makedirs(os.path.dirname(model_path), exist_ok=True)
model.save(model_path)

In [19]:
# Save the training and validation arrays so the Random Forest part can reload
# the same split from disk.
training_set_model_path = training_set_path + model_name
validation_set_model_path = validation_set_path + model_name

# exist_ok=True keeps the cell re-runnable without a separate existence check
for directory in (training_set_model_path, validation_set_model_path):
    os.makedirs(directory, exist_ok=True)

np.save(training_set_model_path + "/X_training.npy", X)
np.save(training_set_model_path + "/y_training.npy", y)
np.save(validation_set_model_path + "/X_validation.npy", X_validation)
np.save(validation_set_model_path + "/y_validation.npy", y_validation)

**Random Forest**
This part adds a random forest on top of the convolutional part of our network.

- 1. We extract the important features with the convolutional part of our trained model
- 2. We train our random forest on the same training set
- 3. We test its accuracy on the same validation set

In [6]:
# Paths and names
dl_model_name = "retrain_mobilenet_v4"
rf_model_path = "./random_forest.joblib"
training_set_path, validation_set_path = "training_set/", "validation_set/"

# Folders holding the .npy arrays of the deep learning model named above
training_set_model_path = f"{training_set_path}{dl_model_name}"
validation_set_model_path = f"{validation_set_path}{dl_model_name}"

# Used as a negative index into model.layers: the output of layer
# [-layers_to_remove] is what the feature extractor returns, i.e. the layers
# after it (the dense part) are cut off to keep only the convolutional part.
layers_to_remove = 3

In [8]:
# Random forest hyper-parameters
n_estimators: int = 320  # forwarded to RandomForestClassifier(n_estimators=...)

**1. Extract important features**

In [9]:
# Reload the trained deep learning model from ./models/
dl_model_path = './models/' + dl_model_name + '.keras'
model = keras.models.load_model(dl_model_path)

In [10]:
# Truncated model: same input as the CNN, output taken at layer [-layers_to_remove]
# (i.e. without the dense part)
feature_layer = model.layers[-layers_to_remove]
feature_extractor = keras.Model(inputs=model.input, outputs=feature_layer.output)

**2. Training part**

In [23]:
# Load the training set
X = np.load(training_set_model_path + "/X_training.npy")
y = np.load(training_set_model_path + "/y_training.npy")

# The random forest needs the class index as target, not the one-hot row:
# decode every row in one vectorised call instead of a Python loop.
y_encoded = np.argmax(y, axis=-1)

# Release the one-hot array to save RAM
del y

In [24]:
# Init the random forest; a fixed random_state makes the trained forest reproducible
rf_seed = 42
RF_model = RandomForestClassifier(n_estimators=n_estimators, random_state=rf_seed)

# Run the truncated CNN over the training images to get the features fed to the forest
X_for_RF = feature_extractor.predict(X)

In [None]:
# Fit the forest on the extracted CNN features and the class-index targets
RF_model.fit(X=X_for_RF, y=y_encoded)

In [None]:
# Persist the trained random forest to rf_model_path
joblib.dump(value=RF_model, filename=rf_model_path)

**3. Validation part**

In [11]:
# Load the validation set
X_validation = np.load(validation_set_model_path+ "/X_validation.npy")
y_validation = np.load(validation_set_model_path + "/y_validation.npy")

# The random forest works with the class index as target, not the one-hot row:
# decode every row in one vectorised call instead of a Python loop.
y_validation_encoded = np.argmax(y_validation, axis=-1)

# Release the one-hot array to save RAM
del y_validation

In [12]:
# Reload the random forest from disk (only needed if it was not trained in this session).
# joblib files are pickle-based: only load a file you created yourself or trust.
RF_model = joblib.load(filename=rf_model_path)

In [13]:
# Test on the validation set
# Use .predict() like the training part does: it processes the images in batches
# and returns a NumPy array, whereas calling the model directly pushes the whole
# array through in a single pass and returns a tensor.
X_validation_features = feature_extractor.predict(X_validation)
predicted_values = RF_model.predict(X_validation_features)

In [None]:
# Top-1 accuracy of the random forest on the validation set
print("Top 1 accuracy: ", metrics.accuracy_score(y_validation_encoded, predicted_values))

# Confusion matrix: rows = true class, columns = predicted class
cm = metrics.confusion_matrix(y_validation_encoded, predicted_values)
plt.figure(figsize=(10, 7))
# fmt="d": seaborn's default ".2g" would print counts >= 100 in scientific notation
seaborn.heatmap(cm, annot=True, fmt="d", linewidths=1)
plt.xlabel("Predicted class")
plt.ylabel("True class")
plt.savefig("RF_DL_confusion_matrix")