In [None]:
# Imports
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Activation, Dropout, Flatten, Dense
from keras.preprocessing.image import load_img, img_to_array

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, auc, roc_curve

import tensorflow as tf
from tensorflow.keras.utils import normalize
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory

import os
import cv2
from PIL import Image
import numpy as np
import itertools 
import pandas as pd
import matplotlib.pyplot as plt
import shutil

device_name = tf.test.gpu_device_name()
print(tf.__version__)
if device_name != '/device:GPU:0':
  print('GPU device not found')
else:
  print('Found GPU at: {}'.format(device_name))

2.8.0
Found GPU at: /device:GPU:0


In [None]:
# Connect to google drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Dataset restore
!rm -r /content/drive/MyDrive/Egyetem/dataset
!unzip /content/drive/MyDrive/Egyetem/dataset7.0.zip -d /content/drive/MyDrive/Egyetem/

# dataset 
#   |
#   |- missing
#   |     |- img.jpg
#   |     |- ...
#   |- present
#   .      |- img.jpg
#   .      |- ...
#   .      .
#          .
#          .

In [None]:
for f in os.listdir("/content/drive/MyDrive/Egyetem/dataset"):
  print(f, len(os.listdir("/content/drive/MyDrive/Egyetem/dataset/"+f)), "db")  

In [None]:
# OPTIONAL!
def augment_images(path, num):
  i=0
  for img in os.listdir(path):
    if ".jpg" in img:
      i = i + 1
      if i > 15:
        break
      # Initialising the ImageDataGenerator class.
      # We will pass in the augmentation parameters in the constructor.
      datagen = ImageDataGenerator(            
            horizontal_flip = True,
            brightness_range = (0.5, 1.5))
          
      # Loading a sample image 
      img = load_img(os.path.join(path,img)) 
      # Converting the input sample image to an array
      x = img_to_array(img)
      # Reshaping the input image
      x = x.reshape((1, ) + x.shape) 
      
      # Generating and saving 5 augmented samples 
      # using the above defined parameters. 
      i = 0
      for batch in datagen.flow(x, batch_size = 1,
                                save_to_dir = path, 
                                save_prefix ='image', save_format ='jpg'):
          i += 1
          if i > num:
              break

augment_images("/content/drive/MyDrive/Egyetem/dataset/missing", 5)
print("Missing Done")
augment_images("/content/drive/MyDrive/Egyetem/dataset/present", 5)
print("Present Done")
augment_images("/content/drive/MyDrive/Egyetem/dataset/uncertain", 5)
print("Uncertain Done")

In [None]:
for f in os.listdir("/content/drive/MyDrive/Egyetem/dataset"):
  print(f, len(os.listdir("/content/drive/MyDrive/Egyetem/dataset/"+f)))  

In [None]:
# Defive valieables
img_size = 56
class_names = ["missing", "present", "uncertain"] # Class folder names!

In [None]:
# Create dataset

def create_dataset(image_directory):
  dataset = [] 
  label = []
  for folder in os.listdir(image_directory):
    images = os.listdir(image_directory + '/'+ folder)
    for i, image_name in enumerate(images):      
        if (image_name.split('.')[1] == 'jpg'): # Whatch out for file names and formats!
            image = cv2.imread(image_directory + '/' +folder+ '/' + image_name)
            image = Image.fromarray(image, 'RGB')
            image = image.resize((img_size, img_size)) 
            image = np.flip(image, axis=-1) # Fix colors  
            dataset.append(np.array(image).astype('float32') / 255.0)
            label.append(class_names.index(folder))            

  return np.array(dataset), np.array(label)


dataset, label = create_dataset('/content/drive/MyDrive/Egyetem/dataset/')
print("Dataset size is ", dataset.shape)
print("Label size is ", label.shape)

In [None]:
from collections import Counter

k = list(Counter(label).keys())
v = list(Counter(label).values())

for i in k:
  print(class_names[i]+":", v[i],"db") 

In [None]:
# Split dataset
X_train, X_test, y_train, y_test = train_test_split(dataset, label, test_size = 0.2)#, random_state = 0)
print("Train size is ", X_train.shape)
print("Test size is ", X_test.shape)

In [None]:
# Train images preview
def preview(X_train, y_train):
  plt.figure(figsize=(10,10))
  for i in range(25):
      plt.subplot(5,5,i+1)
      plt.xticks([])
      plt.yticks([])
      plt.grid(False)
      plt.imshow(X_train[i], cmap=plt.cm.binary)
      plt.xlabel(class_names[y_train[i]])
  plt.show()

preview(X_train, y_train)

In [None]:
# Define the model
def create_model():
  model = Sequential()

  model.add(Conv2D(32, (3, 3), padding='same', input_shape=(img_size, img_size, 3)))
  model.add(Activation('relu'))  
  model.add(Conv2D(32, (3, 3)))
  model.add(Activation('relu'))  
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Conv2D(64, (3, 3), padding='same'))
  model.add(Activation('relu'))
  model.add(Conv2D(64, (3, 3)))
  model.add(Activation('relu'))  
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Conv2D(64, (3, 3), padding='same'))
  model.add(Activation('relu'))
  model.add(Conv2D(64, (3, 3)))
  model.add(Activation('relu'))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Flatten())
  model.add(Dense(512))
  model.add(Activation('relu'))
  model.add(Dropout(0.25))
  model.add(Dense(3))
  model.add(Activation('softmax'))

  #model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy', metrics=['accuracy'])  
  model.compile(optimizer='SGD', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    
  return model




model = create_model()

print(model.summary())
print("Layer count:", len(model.layers))

In [None]:
# Callbacks
checkpoint = tf.keras.callbacks.ModelCheckpoint("/content/drive/MyDrive/Egyetem/checkpoints", save_best_only=True)
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5) # Ha 5 epoch alatt nem csökken a veszteség, leáll
print_batch_callback = tf.keras.callbacks.LambdaCallback(
    on_epoch_end=lambda a,b: print("\n", str((int(a)+1))+". Epoch done.", f"Loss: {b['loss']}, Accuracy: {b['accuracy']}, Val_loss: {b['val_loss']}, Val_accuracy: {b['val_accuracy']}", "\n")
)

In [None]:
# Train model
def train_model(model):
  return model.fit(
                      X_train, 
                      y_train, 
                      batch_size = 16,
                      verbose = 1, 
                      epochs = 100, # Stop early miatt úgyis kisebb!
                      validation_data=(X_test, y_test),
                      validation_split=0.2,    
                      callbacks = [checkpoint, print_batch_callback, stop_early],#
                      shuffle = True # False
                    )
  
history = train_model(model)

In [None]:
# Show train history
def plot_train_history(result):
    plt.figure(1)      
    plt.plot(result.history['accuracy'])              
    plt.plot(result.history['val_accuracy'])          
    plt.title('Model accuracy')  
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Valid'], loc='upper left')      
    plt.show()    
    
    plt.plot(result.history['loss'])      
    plt.plot(result.history['val_loss'])      
    plt.title('Model loss')  
    plt.ylabel('Loss') 
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Valid'], loc='upper left')  
    plt.show()   
    
plot_train_history(history)

In [None]:
# Evaluate model
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=2)
print('\nTest accuracy:', test_acc)

In [None]:
# Save model
model.save('/content/drive/MyDrive/Egyetem/model_new.h5')

In [None]:
# Load model
model = tf.keras.models.load_model('/content/drive/MyDrive/Egyetem/model_new.h5')

In [None]:
# Show confusion matrix
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')    

    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
            horizontalalignment="center",
            color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')



pred = model.predict(x=X_test, steps=len(X_test), verbose=0)
predictions = []
for i in pred:    
  predictions.append(np.argmax(i))

cm = confusion_matrix(y_true=y_test, y_pred=predictions)
plot_confusion_matrix(cm=cm, classes=class_names, title='Confusion Matrix')

In [None]:
def plot_image(i, predictions_array, true_label, img):
  true_label, img = true_label[i], img[i]
  plt.grid(False)
  plt.xticks([])
  plt.yticks([])

  plt.imshow(img, cmap=plt.cm.binary)

  

  predicted_label = np.argmax(predictions_array)
  if true_label == predicted_label:
    color = 'blue'    
  else:
    color = 'red' 
    print(predictions_array, np.argmax(predictions_array), predictions_array[np.argmax(predictions_array)]) 

  

  plt.xlabel("{} {}% ({})".format(class_names[predicted_label],                                
                                max(predictions_array)*100  ,
                                class_names[true_label]),
                                color=color)

In [None]:
# Visualize predictions
predictions = model.predict(X_test)
num_rows = 5
num_cols = 4
num_images = num_rows*num_cols
plt.figure(figsize=(2*2*num_cols, 2*num_rows))
for i in range(num_images):
  plt.subplot(num_rows, num_cols, i+1)
  plot_image(i, predictions[i], y_test, X_test)  
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt 
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import roc_curve, auc, roc_auc_score

target= class_names

# set plot figure size
fig, c_ax = plt.subplots(1,1, figsize = (12, 8))

# function for scoring roc auc score for multi-class
def multiclass_roc_auc_score(y_test, y_pred, average="macro"):
    pp = []
    for p in y_pred:
      pp.append(np.argmax(p))
    
    y_pred = pp
    
    lb = LabelBinarizer()
    lb.fit(y_test)
    y_test = lb.transform(y_test)
    y_pred = lb.transform(y_pred)

    for (idx, c_label) in enumerate(target):
        fpr, tpr, thresholds = roc_curve(y_test[:,idx].astype(int), y_pred[:,idx])
        c_ax.plot(fpr, tpr, label = '%s (AUC:%0.2f)'  % (c_label, auc(fpr, tpr)))
    c_ax.plot(fpr, fpr, 'b-', label = 'Random Guessing')
    return roc_auc_score(y_test, y_pred, average=average)


score = multiclass_roc_auc_score(y_test, predictions)

c_ax.legend()
c_ax.set_xlabel('False Positive Rate')
c_ax.title.set_text('ROC AUC score: '+str(score))
c_ax.set_ylabel('True Positive Rate')
plt.show()

In [None]:
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.models import Model
from numpy import expand_dims

ixs = [2, 5, 9]
outputs = [model.layers[i].output for i in ixs]
model = Model(inputs=model.inputs, outputs=outputs)
# load the image with the required shape
img = load_img('/content/drive/MyDrive/Egyetem/dataset/present/20220311113736806578.jpg', target_size=(img_size, img_size))
# convert the image to an array
img = img_to_array(img)
# expand dimensions so that it represents a single 'sample'
img = expand_dims(img, axis=0)
# prepare the image (e.g. scale pixel values for the vgg)
img = preprocess_input(img)
# get feature map for first hidden layer
feature_maps = model.predict(img)

square = 4
for fmap in feature_maps:
	# plot all 64 maps in an 8x8 squares
	ix = 1
	for _ in range(square):
		for _ in range(square):
			# specify subplot and turn of axis
			ax = plt.subplot(square, square, ix)
			ax.set_xticks([])
			ax.set_yticks([])
			# plot filter channel in grayscale
			plt.imshow(fmap[0, :, :, ix-1], cmap='gray')
			ix += 1
	# show the figure
	plt.show()

In [None]:
#Convert rmsprop optimalized model to tflite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT] #Uses default optimization strategy to reduce the model size
tflite_model = converter.convert()
open("/content/drive/MyDrive/Egyetem/model_new.tflite", "wb").write(tflite_model)
tflite_size = os.path.getsize("/content/drive/MyDrive/Egyetem/model_new.tflite")/1048576  #Convert to MB
print("Tflite Model size: ", tflite_size, "MB")

In [None]:
#Convert SGD optimalized model to tflite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float32]
converter.inference_input_type = tf.float32
converter.inference_output_type = tf.float32

tflite_model = converter.convert()

open("/content/drive/MyDrive/Egyetem/model_new.tflite", "wb").write(tflite_model)
tflite_size = os.path.getsize("/content/drive/MyDrive/Egyetem/model_new.tflite")/1048576  #Convert to MB
print("Tflite Model size: ", tflite_size, "MB")



## **Tensorflow Lite model usage**



In [None]:
import numpy as np
import tensorflow as tf

from keras.preprocessing.image import load_img, img_to_array
from time import time
from matplotlib import pyplot as plt
import os
import random
from tensorflow.keras.utils import normalize

In [None]:
# Image loader
def load_image(path):
  image = load_img(path, target_size=(img_size, img_size))
  image = (img_to_array(image)).astype('float32') / 255.0
  image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))      
  return image

In [None]:
# Returns random image from  folder
def rand_image():
  i = random.choice(os.listdir("/content/drive/MyDrive/Egyetem/real"))
  image = load_image("/content/drive/MyDrive/Egyetem/real/"+i)    
  return image

In [None]:
tflite_model_path = "/content/drive/MyDrive/Egyetem/model_new.tflite"

# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
interpreter.allocate_tensors()

# Get input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

In [None]:
# Load image
input_data = rand_image() # Random igame from folder
#input_data = load_image("/content/drive/MyDrive/Egyetem/t1.jpg") # One exact image

plt.figure()
plt.imshow(input_data[0])  
plt.grid(False)
plt.show()

interpreter.set_tensor(input_details[0]['index'], input_data)

time_before=time()
interpreter.invoke()
time_after=time()
total_tflite_time = time_after - time_before

output_data_tflite = interpreter.get_tensor(output_details[0]['index'])

print("Sum", np.sum(output_data_tflite[0]))
print("Prediction:", output_data_tflite[0],
      max(output_data_tflite[0])*100,
      class_names[np.argmax(output_data_tflite[0])])

In [None]:
# Evaulate lite model
good = 0
bad = 0
counter = 0
for img in X_test:  
  img = img.reshape((1, img.shape[0], img.shape[1], img.shape[2]))    
  interpreter.set_tensor(input_details[0]['index'], img)  
  interpreter.invoke()    
  output_data_tflite = interpreter.get_tensor(output_details[0]['index'])
  result = np.argmax(output_data_tflite[0])

  #print("Result:", result)
  if result == y_test[counter]:    
    good += 1
  else:
    bad += 1
    # Show false predicted images
    plt.figure()
    plt.imshow(img[0])  
    plt.grid(False)
    plt.show()

  counter += 1

print("Result accuracy:", good/(good+bad))