**Import all the libraries.**


In [0]:
!pip install tensorflow-gpu=='2.0.0-beta1'

In [0]:
import tensorflow as tf
import pathlib
import random
import IPython.display as display
from sklearn.model_selection import train_test_split
import os
import matplotlib.pyplot as plt
import time
import numpy as np
import sklearn.metrics
from datetime import datetime
import io
import itertools
from packaging import version
from six.moves import range
import glob
from keras.models import load_model


%reload_ext tensorboard


**Mount to Google Drive**

In [0]:
from google.colab import drive
drive.mount('/content/drive')

**Define all the functions.**

In [0]:
def downloadUnzip(fileName, sourceURL, dest_dir):
    ''' 
        This function downloads the files from the source and unzips thems and stores them to a specified destination.
        
        Args:
            fileName(str): Name of file from source
            sourceURL(str): url of the source to download from
            dest_dir(str): Path on local to store the downloaded data
            
        Returns: Path of saved files on local to access
    '''
    
    data_root = tf.keras.utils.get_file(fileName, sourceURL, untar=True, cache_subdir=dest_dir)
    data_root = pathlib.Path(data_root)
    return data_root

def get_all_paths(data_root):
    ''' 
        This function allows us to get all image paths and randomly shuffles them.
        
        Args:
            data_root(str): root directory where data has been downloaded.
            
        Returns: Path all images
    '''
    all_image_paths = list(data_root.glob('*/*'))
    all_image_paths = [str(path) for path in all_image_paths]
    random.shuffle(all_image_paths)
    return all_image_paths

def get_path_and_label(data_root):
    ''' 
        This function helps us get the path and label of each image.
        
        Args:
            data_root(str): root directory where data has been downloaded.
            
        Returns: Path and label
    '''
    all_image_paths = get_all_paths(data_root)
    X = [p.rsplit('/', 2)[1] + '/' + p.rsplit('/', 1)[1] for p in all_image_paths]
    y = [p.rsplit('/', 1)[0].rsplit('/', 1)[1] for p in all_image_paths]
    return X, y

def caption_image(image_path):
    ''' 
        This function helps us caption each image. 
        
        Args:
            image_path(str): path of image to be captioned
            
        Returns: Path and caption
    '''
    image_rel = pathlib.Path(image_path).relative_to(data_root)
    return image_rel, str(image_rel).split('/')[0]

def display_image_and_caption(data_root):
    ''' 
        This function helps display the image. 
        
        Args:
            data_root(str): root directory where data has been downloaded.
    '''
    for n in range(3):
        all_image_paths = get_all_paths(data_root)
        image_path = random.choice(all_image_paths)
        display.display(display.Image(image_path))
        imPath, label = caption_image(image_path)
        print(label)
        print()

def split_data(data_root, test_size, valid_size):
    ''' 
        This function helps us split data into train, test and validation with ratio of 60:20:20

        Args:
            data_root(str): root directory where data has been downloaded.
            
        Returns: A list of list of all the paths for each set of dataset.
    '''
    X, y = get_path_and_label(data_root)
    X_train_temp, X_test, y_train_temp, y_test = train_test_split(X, y, test_size=test_size, stratify=y, random_state=None)
    X_train, X_valid, y_train, y_valid = train_test_split(X_train_temp, y_train_temp, test_size=valid_size, stratify=y_train_temp, random_state=None)
    X_data = [X_train, X_test, X_valid]
    return X_data

def write_dataset_to_file(filenames, X_data, root_file):
    ''' 
        This function writes all the datasets (train, test, validation) to files.

        Args:
            data_root(str): root directory where data has been downloaded.
            filenames(list): list filenames ['trainData', 'testData', 'validataionData']
    ''' 
    for d, l in zip(X_data, filenames):
        with open(root_file+l+'.txt', 'w') as f:
            for item in d:
                f.write("%s\n" % item)

def class_dictionary(data_root):
    ''' 
        This function make the class labels a dictionary as follows:
        {'daisy': 0, 'dandelion': 1, 'roses': 2, 'sunflowers': 3, 'tulips': 4}
        
        Args:
            data_root(str): root directory where data has been downloaded.
    '''
    all_image_paths = get_all_paths(data_root)
    label_names = sorted(item.name for item in data_root.glob('*/') if item.is_dir())
    label_to_index = dict((name, index) for index,name in enumerate(label_names))
    all_image_labels = [label_to_index[pathlib.Path(path).parent.name]
                        for path in all_image_paths]
    return label_to_index

# The following functions can be used to convert a value to a type compatible
# with tf.Example.
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def image_example(image_string, label, filename):
    '''
        This function Create a dictionary with features that may be relevant.
        
        Args:
            image_string(bytes): image as bytes
            label(str): Label of image
            filename(str): filename of image
            
        Returns: Tfrecords 
    '''
    image_shape = tf.image.decode_jpeg(image_string).shape
    for lab, val in label_to_index.items():    # for name, age in dictionary.iteritems():  (for Python 2.x)
        if lab == label:
            label_no = val
    feature = {
        'height': _int64_feature(image_shape[0]),
        'width': _int64_feature(image_shape[1]),
        'depth': _int64_feature(image_shape[2]),
        'label_num': _int64_feature(label_no),
        'image_raw': _bytes_feature(image_string),
        'filename' : _bytes_feature(str.encode(filename)),
        'label': _bytes_feature(str.encode(label)),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))

def write_tfrecords(data_dir, split_path, output_path):
    '''
        This function writes to tfrecords
        
        Args:
            data_dir(str): path of directory where images are
            split_path(str): path where split(train, test, validation) data path file is
            output_path(str): path of where to store the tfrecords
    '''
    with tf.io.TFRecordWriter(output_path) as writer:
        f = open(split_path, "r")
        for line in f:
            label = line.split('/')[0]
            image_string = open(data_dir+line.rstrip('\n'), 'rb').read()
            tf_example = image_example(image_string, label, line)
            writer.write(tf_example.SerializeToString())

def augment(image, random_preprocessing):
  '''
    This function adds different augmentations to our training images to add more variety to data.
    
    Args:
      image(tensor): The image we want to add the augmentation to.
      random_preprocessing(list): A list of dictionaries of different augmentations.
      
   Returns: Tensor of augmented image.
  '''
  selected_preprocessing_key = random.choice(random_preprocessing)
  selected_preprocessing = selected_preprocessing_key
  selected_preprocessing = [*selected_preprocessing]
  if selected_preprocessing[0] == 'random_flip':
    image = tf.image.random_flip_left_right(image)
  elif selected_preprocessing[0] == 'random_crop':
    vals = selected_preprocessing_key.get('random_crop')
    image = tf.image.random_crop(image, [vals[0], vals[1], 3])
  elif selected_preprocessing[0] == 'random_brightness':
    image = tf.image.random_brightness(image, 1000.)
  elif selected_preprocessing[0] == 'random_salt_pepper':
    vals = selected_preprocessing_key.get('random_salt_pepper')
    image = add_salt_pepper_noise(image, vals[0], vals[1])
  else:
    return image
    
  return image
  
def add_salt_pepper_noise(image, amount, salt_vs_pepper):
    '''
        This function adds salt and pepper noise to the images, one kind of augmentation.
        
        Args:
          image(tensor): The image we want to add the augmentation to.
          amount(float): How much percentage of the image we want to add the salt pepper to.
          salt_vs_pepper(float): Percentage of salt vs pepper we want to add.
          
        Returns: Tensor of image with salt pepper augmentation.
    '''
    image_numpy = image.numpy()
    row, col, ch = image_numpy.shape
    salt_vs_pepper = salt_vs_pepper
    amount = amount
    num_salt = np.ceil(amount * row * salt_vs_pepper)
    num_pepper = np.ceil(amount * row * (1.0 - salt_vs_pepper))
    #print(X_img.shape)

    salt_coords = []
    pepper_coords = []
       
    coords = [np.random.randint(0, i - 1, int(num_salt)) for i in image_numpy.shape]
    image_numpy[coords[0], coords[1], :] = 1

    # Add Pepper noise
    coords = [np.random.randint(0, i - 1, int(num_pepper)) for i in image_numpy.shape]
    image_numpy[coords[0], coords[1], :] = 0 
        
    return tf.convert_to_tensor(image_numpy)

def preprocess(image, h, w, is_training=False):
  '''
      This function is for preprocessing images.

      Args: 
          image(tensor): Image we would like to preprocess on.
          h(int): Height we would like to resize the image to.
          w(int): Height we would like to resize the image to.
          is_training(bool): If is_training is True, we add augmentation to images or else we just resize the images.

     Returns: Tensor of preprocessed image.

  '''

  if is_training:
    amount = 0.004
    salt_vs_pepper = 0.2
    random_preprocessing = [{'normal': ['']}, {'random_flip': ['']}, {'random_crop':[h, w]}, {'random_brigthness': ['']}, {'random_salt_pepper': [amount, salt_vs_pepper]}]
    image = augment(image, random_preprocessing)

  image = tf.image.resize(image, (h, w))
  image = image/255.

  return image
  
def create_image_feature_description():
    '''
        This function create a dictionary describing the features.
    
        Returns: Dictionary of image feature descriptions
    '''
    image_feature_description = {
        'height': tf.io.FixedLenFeature([], tf.int64),
        'width': tf.io.FixedLenFeature([], tf.int64),
        'depth': tf.io.FixedLenFeature([], tf.int64),
        'label_num': tf.io.FixedLenFeature([], tf.int64),
        'image_raw': tf.io.FixedLenFeature([], tf.string),
        'filename' : tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.string),
    }
    return image_feature_description

def parse_fn(example_proto):
  # Parse the input tf.Example proto using the dictionary above.
    image_feature_description = create_image_feature_description()
    parsed = tf.io.parse_single_example(example_proto, image_feature_description)
    image = tf.image.decode_image(parsed["image_raw"])
    image = tf.reshape(image, (parsed['height'], parsed['width'], 3))
    image = preprocess(image,160, 160, is_training=False)
    labs = tf.one_hot(parsed["label_num"], 5)
    return image, labs

def parse_fn_training(example_proto):
  # Parse the input tf.Example proto using the dictionary above.
    image_feature_description = create_image_feature_description()
    parsed = tf.io.parse_single_example(example_proto, image_feature_description)
    image = tf.image.decode_image(parsed["image_raw"])
    image = tf.reshape(image, (parsed['height'], parsed['width'], 3))
    #import pdb; pdb.set_trace()

    image = preprocess(image,160, 160, is_training=True)
    labs = tf.one_hot(parsed["label_num"], 5)
    return image, labs
  
def parse_fn_predict(example_proto):
  # Parse the input tf.Example proto using the dictionary above.
    image_feature_description = create_image_feature_description()
    parsed = tf.io.parse_single_example(example_proto, image_feature_description)
    image = tf.image.decode_image(parsed["image_raw"])
    image = tf.reshape(image, (parsed['height'], parsed['width'], 3))
    image = preprocess(image,160, 160, is_training=False)
    labs = tf.one_hot(parsed["label_num"], 5)
    filenames = parsed["filename"]
    return image, labs, filenames


def make_dataset(datasetPath, shuffle_buffer_size, batch_size, is_training=True, is_predict=False):
    '''
        This function makes dataset for each split data.
        
        Args:
            datasetPath(str): path of where tfrecords are saved
            shuffle_buffer_size(int): size of shuffle buffer
            batch_size(int): size of batch
        
        Returns: dataset
    '''
    dataset = tf.data.TFRecordDataset(datasetPath)
    if is_training:
      dataset = dataset.shuffle(buffer_size=shuffle_buffer_size)
      dataset = dataset.repeat()
      dataset = dataset.map(map_func=parse_fn_training)
    elif is_predict:
      dataset = dataset.map(map_func=parse_fn_predict)
    else:
      dataset = dataset.map(map_func=parse_fn)
    dataset = dataset.batch(batch_size=batch_size)
    return dataset
  
def create_callbacks(log_dir, checkpoint_path):
    '''
        This function creates model for logistic regression.
        
        Args:
            log_dir(str): path of log directory 
            checkpoint_path(str): path where to store checkpoint.
    '''
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
    checkpoint_dir = os.path.dirname(checkpoint_path)
    cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, save_best_only=True, save_weights_only=True, verbose=1)
    early_stop_callback = tf.keras.callbacks.EarlyStopping(
        # Stop training when `val_loss` is no longer improving
        monitor='val_loss',
        # "no longer improving" being defined as "no better than 1e-2 less"
        min_delta=1e-1,
        # "no longer improving" being further defined as "for at least 2 epochs"
        patience=1,
        verbose=1)
    return tensorboard_callback, cp_callback, early_stop_callback
  
def plot_to_image(figure):
  """Converts the matplotlib plot specified by 'figure' to a PNG image and
  returns it. The supplied figure is closed and inaccessible after this call."""
  # Save the plot to a PNG in memory.
  buf = io.BytesIO()
  plt.savefig(buf, format='png')
  # Closing the figure prevents it from being displayed directly inside
  # the notebook.
  plt.close(figure)
  buf.seek(0)
  # Convert PNG buffer to TF image
  image = tf.image.decode_png(buf.getvalue(), channels=4)
  # Add the batch dimension
  image = tf.expand_dims(image, 0)
  return image

def plot_image_grid(actual_image, actual_label, predicted_label):
  """
  Returns a matplotlib figure containing the plotted confusion matrix.

  Args:
    cm (array, shape = [n, n]): a confusion matrix of integer classes
    class_names (array, shape = [n]): String names of the integer classes
  """
  figure = plt.figure(figsize=(8, 8))
  plt.imshow(actual_image, interpolation='nearest', cmap=plt.cm.binary)
  plt.title('Actual: ' + str(actual_label) + ' ' + 'Predicted: ' + str(predicted_label))
  plt.tight_layout()
  return figure

def plot_confusion_matrix(cm, class_names):
  """
  Returns a matplotlib figure containing the plotted confusion matrix.

  Args:
    cm (array, shape = [n, n]): a confusion matrix of integer classes
    class_names (array, shape = [n]): String names of the integer classes
  """
  figure = plt.figure(figsize=(8, 8))
  plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
  plt.title("Confusion matrix")
  plt.colorbar()
  tick_marks = np.arange(len(class_names))
  plt.xticks(tick_marks, class_names, rotation=45)
  plt.yticks(tick_marks, class_names)

  # Normalize the confusion matrix.
  cm = np.around(cm.astype('float') / cm.sum(axis=1)[:, np.newaxis], decimals=2)

  # Use white text if squares are dark; otherwise black.
  threshold = cm.max() / 2.
  for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    color = "white" if cm[i, j] > threshold else "black"
    plt.text(j, i, cm[i, j], horizontalalignment="center", color=color)

  plt.tight_layout()
  plt.ylabel('True label')
  plt.xlabel('Predicted label')
  return figure

def accuracy(confusion_matrix):
    '''
      This function calculates the accuracy for the confusion matrix.
      
      Args:
        confusion_matrix(list): confusion matrix
        
      Returns: The accuracy of the confusion matrix.
    '''
    diagonal_sum = confusion_matrix.trace()
    sum_of_all_elements = confusion_matrix.sum()
    return diagonal_sum / sum_of_all_elements 
  
class ConfusionMatrixLogger(tf.keras.callbacks.Callback):
  '''
      ConfusionMatrixLogger logs and builds the confusion matrix.
      
      Arguments:
          ds_valid: dataset onto which we want to make predictions on to evaluate and contruct the confusion matrix on.
          file_writer_cm: Path to create a file with summary of confusion matrix.
          class_names: class names present in our dataset
          val_steps_per_epoch: number of steps to take per epoch.
  '''
  def __init__(self, ds_valid, file_writer_cm, class_names, val_steps_per_epoch):
    self.ds_valid = ds_valid
    self.file_writer_cm = file_writer_cm
    self.class_names = class_names
    self.val_steps_per_epoch = val_steps_per_epoch

  def on_epoch_end(self, epoch, logs=None):
    test_labels = []
    test_images = []
    for test_image, test_label in self.ds_valid.take(self.val_steps_per_epoch):
      test_labels.append(np.argmax(test_label.numpy(), axis=1))
      test_images.append(test_image)

    flattened_test_labels = [y for x in test_labels for y in x]
    
    # Use the model to predict the values from the validation dataset.
    test_pred_raw = self.model.predict(self.ds_valid, steps=self.val_steps_per_epoch)
    test_pred = np.argmax(test_pred_raw, axis=1)

    # Calculate the confusion matrix.
    cm = sklearn.metrics.confusion_matrix(flattened_test_labels, test_pred)
    # Log the confusion matrix as an image summary.
    figure = plot_confusion_matrix(cm, class_names=class_names)
    cm_image = plot_to_image(figure)
    
    acc = accuracy(cm)
    headings = []
    headings = list(class_names)
    headings = tf.convert_to_tensor(headings)
    headings = tf.reshape(headings, (1,-1))
    accuracy_head = []
    acc= tf.as_string(acc)
    accuracy_head = [acc] + ['']*(len(class_names)-1)
    acc_tensor = tf.convert_to_tensor(accuracy_head)
    acc_tensor = tf.reshape(acc_tensor, (1,-1))
    col_tensor = list(class_names)
    col_tensor.insert(0, 'True/Predicted')
    col_tensor.append('Accuracy')
    col_tensor = tf.convert_to_tensor(col_tensor)
    col_tensor = tf.reshape(col_tensor, (-1,1))
    
    updated_cm = tf.concat([headings, tf.as_string(cm)], 0)
    updated_cm = tf.concat([updated_cm, acc_tensor], 0)
    updated_cm = tf.concat([col_tensor, updated_cm], 1)
    
    r = random.randint(0,7)
    actual_label = flattened_test_labels[r]
    predicted_label = test_pred[r]
    actual_image =  test_images[0][r]
    
    figure = plot_image_grid(actual_image, actual_label, predicted_label)
    
    # Log the confusion matrix as an image summary.
    with self.file_writer_cm.as_default():
      tf.summary.image("Confusion Matrix", cm_image, step=epoch)
      #' '.join(str(r) for v in cm for r in v)
      tf.summary.text('Confusion Matrix2', updated_cm, step=epoch)
      tf.summary.image('True and Predicted labels', plot_to_image(figure), max_outputs=10, step=epoch)
      
def evaluate_model(model ,model_path, dataset_eval, dataset_predict, steps, file_to_write):
  '''
      This function evaluates the model.
      
      Args:
        model(): The model that has been fit and compiled, which needs to be evaluated.
        file_path(str): path where the data is that we want to evluate our model on.
        steps(int): number of steps to take per epoch.
        
      Returns:
        Accuracy, precision and recall.
  '''
  if(model):
    model.load_weights(model_path)

  else:
    model = load_model(model_path)

  loss, accuracy, precision, recall = model.evaluate(dataset_eval, steps=steps)
  
  predicted_classes = [] 

  for image, label, filename in dataset_predict:
    labels = np.argmax(label.numpy(), axis=1)
    classes = model.predict_classes(image)
    probabilities = model.predict_proba(image)
    for clas, fname, prob, actual in zip(classes, filename, probabilities, labels):
      for name, lab in label_to_index.items():    
        if lab == clas:
          predicted_label = name
        if lab == actual:
          actual_label = name
      predicted_classes.append(tuple((fname, actual_label, predicted_label, max(prob))))
            
  with open(file_to_write + 'predicted_classes_evaluate.txt', 'w') as fp:
    fp.write('\n'.join('%s %s %s %s' % x for x in predicted_classes))
        
  return accuracy, precision, recall
  

def make_prediction(image_path, model, model_path, file_to_write_path, height, width):
  '''
      This function makes the prediciton on new images.

      Args:
          image_path(str): path of images we want to predict the class for.
          checkpoint_path(str): path of the stored weights from the fitted model.
          model(): the model that has been fit and made by the training data.
          height(int): height we want to resize the image to.
          width(int): width we want to resize the image to.

      Returns: A dictionary, classes of images.
  '''
  if(model):
    model.load_weights(model_path)

  else:
    model = load_model(model_path)

  files = [f for f in glob.glob(image_path + "**/*.jpg", recursive=True)]

  images = []
  filenames = []
  actual_labels = []

  for f in files:
    with tf.io.gfile.GFile(f, 'rb') as fopen:
        imge = fopen.read()
    imge = tf.image.decode_image(imge)
    w,h,d = imge.numpy().shape
    image = tf.reshape(imge, (w, h, d))
    image = preprocess(image,height, width, is_training=False)
    image = tf.expand_dims(image, 0)
    images.append(image)
    filenames.append(f.rsplit('/', 1)[1])

  images = np.vstack(images)
  classes = model.predict_classes(images)
  probabilities = model.predict_proba(images)
  predicted_classes = []
  for clas, fname, prob in zip(classes, filenames[3:], probabilities):
    for name, lab in label_to_index.items():    
      if lab == clas:
        predicted_classes.append(tuple((fname, name, max(prob))))

    with open(file_to_write_path + 'predicted_classes.txt', 'w') as fp:
      fp.write('\n'.join('%s %s %s' % x for x in predicted_classes))

    
  print(predicted_classes)


**Download and unzip the data calling 'downloadUnzip( )'**

In [0]:
fileName = 'flower_photos'
#dest_dir = '/content/drive/My\ Drive/project_phase4/dataset'
dest_dir = '/content/sample_data/ML/dataset'

sourceURL = 'https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz'
data_root = downloadUnzip(fileName, sourceURL, dest_dir)

**Display Image & Caption calling 'display_image_and_caption( )'**

In [0]:
display_image_and_caption(data_root)


**Split data into training, validation, testing by calling 'split_data()'**

In [0]:
test_size = 0.20
valid_size = 0.20
X_data = split_data(data_root, test_size, valid_size)

**Make a dictionary of labels**

In [0]:
label_to_index = class_dictionary(data_root)
print(label_to_index)


**Write the split data (above function) into three different files by calling 'write_dataset_to_file()'**

In [0]:
filenames = ['trainData', 'testData', 'validataionData']
root_file = '/content/sample_data/ML/dataset/'
write_dataset_to_file(filenames, X_data, root_file)

**Write data into tfrecords by calling 'write_tfrecords()'** 

In [0]:
data_dir = '/content/sample_data/ML/dataset/flower_photos/'
write_tfrecords(data_dir, '/content/sample_data/ML/dataset/trainData.txt', '/content/sample_data/ML/dataset/trainImages.tfrecords') 
write_tfrecords(data_dir, '/content/sample_data/ML/dataset/validataionData.txt', '/content/sample_data/ML/dataset/testImages.tfrecords') 
write_tfrecords(data_dir, '/content/sample_data/ML/dataset/testData.txt', '/content/sample_data/ML/dataset/validImages.tfrecords')

**Dataset for each training, testing and validation are made with a specified batch size and shuffle buffer size by calling 'make_dataset()' three times**

In [0]:
shuffle_buffer_size = 1000
batch_size = 8
ds_train = make_dataset('/content/sample_data/ML/dataset/trainImages.tfrecords', shuffle_buffer_size, batch_size, is_training=True, is_predict=False)
ds_valid = make_dataset('/content/sample_data/ML/dataset/validImages.tfrecords', shuffle_buffer_size, batch_size, is_training=False, is_predict=False)
ds_test = make_dataset('/content/sample_data/ML/dataset/testImages.tfrecords', shuffle_buffer_size, batch_size, is_training=False, is_predict=False)
ds_test_predict = make_dataset('/content/sample_data/ML/dataset/testImages.tfrecords', shuffle_buffer_size, batch_size, is_training=False, is_predict=True)

**Find number of steps to take per epoch**

In [0]:
num_train = len(X_data[0])
num_test = len(X_data[1])
num_val = len(X_data[2])
tr_steps_per_epoch = num_train//batch_size
val_steps_per_epoch = num_val//batch_size
te_steps_per_epoch = num_test//batch_size

In [0]:
h = 160
w = 160
amount = 4
salt_vs_pepper = 0.2

random_preprocessing = [{'normal': ['']}, {'random_flip': ['']}, {'random_crop':[h, w]}, {'random_brigthness': ['']}, {'random_salt_pepper': [amount, salt_vs_pepper]}]

for images, labels in ds_train.take(1):
  image1 = images[0]
  image2 = augment(image1, random_preprocessing)
  plt.imshow(image1.numpy(), interpolation='nearest')
  plt.show()
  plt.imshow(image2.numpy(), interpolation='nearest')
  

**Feature Extraction**

In [0]:
IMG_SIZE = 160
IMG_SHAPE = (IMG_SIZE, IMG_SIZE, 3)
base_model = tf.keras.applications.MobileNetV2(input_shape=IMG_SHAPE, include_top=False, weights='imagenet')

In [0]:
feature_batch = base_model(images)
print(feature_batch.shape)

**Freeze the convolutional base**

In [0]:
base_model.trainable = False

In [0]:
#base_model.summary()


**Adding a classification head**

In [0]:
global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
feature_batch_average = global_average_layer(feature_batch)
print(feature_batch_average.shape)

In [0]:
prediction_layer = tf.keras.layers.Dense(5)
prediction_batch = prediction_layer(feature_batch_average)
print(prediction_batch.shape)

In [0]:
model = tf.keras.Sequential([base_model, global_average_layer, prediction_layer])

**Compile the model.**

In [0]:
base_learning_rate = 0.0001
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate), loss='categorical_crossentropy', metrics=['accuracy', tf.keras.metrics.Precision(),tf.keras.metrics.Recall()])

**Fit the model and make the confusion matrix for feature extraction.**

In [0]:
initial_epochs = 10
validation_steps = 20
log_dir="/content/sample_data/ML/models/feature_extraction/{}".format(time.strftime("%Y%m%d-%H%M%S"))
checkpoint_path = "/content/sample_data/ML/models/training/cp.ckpt/{}".format(time.strftime("%Y%m%d-%H%M%S"))
tensorboard_callback, cp_callback, early_stop_callback = create_callbacks(log_dir, checkpoint_path)
file_writer_cm = tf.summary.create_file_writer(log_dir + '/cm')
class_names = ['daisy', 'dandelion', 'roses', 'sunflowers', 'tulips']
history = model.fit(ds_train, epochs=initial_epochs, steps_per_epoch=tr_steps_per_epoch,validation_steps=val_steps_per_epoch, validation_data=ds_valid,callbacks=[tensorboard_callback, cp_callback, early_stop_callback, ConfusionMatrixLogger(ds_train, file_writer_cm, class_names, val_steps_per_epoch)])


**Evaluate model**

In [0]:
dataset_predict = ds_test_predict
dataset_eval = ds_test
steps = te_steps_per_epoch
file_to_write = '/content/sample_data/ML/'
model_path = checkpoint_path
accuracy, precision, recall = evaluate_model(model, model_path, dataset_eval, dataset_predict, steps, file_to_write)

print('Accuracy: ' + str(accuracy))
print('Precision: ' + str(precision))
print('Recall: ' + str(recall))

**Plot accuracy and loss for Feature Extraction.**








In [0]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
#plt.ylim([min(plt.ylim()),1])
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
#plt.ylim([0,1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()


**Display TensorBoard**

In [0]:
#%tensorboard --logdir /content/sample_data/ML/models/feature_extraction/
%tensorboard --logdir {log_dir}


**Make Predictions**

In [0]:
image_path = '/content/sample_data/ML/dataset/flower_photos/daisy/'
file_to_write_path = '/content/sample_data/ML/'
height = 160
width = 160
model_path = checkpoint_path
make_prediction(image_path, model, model_path, file_to_write_path, height, width)


**Fine Tuning**

**Un-freeze the top layers of the model**

In [0]:
base_model.trainable = True


In [0]:
print("Number of layers in the base model:", len(base_model.layers))


In [0]:
fine_tune_at = 100
for layer in base_model.layers[:fine_tune_at]:
    layer.trainable = False

**Compile the model.**

In [0]:
model.compile(loss='categorical_crossentropy', optimizer = tf.keras.optimizers.Adam(learning_rate=base_learning_rate/10), metrics=['accuracy',tf.keras.metrics.Precision(),tf.keras.metrics.Recall()])

In [0]:
model.summary()


**Fit the model and construct confusion matrix for Fine Tuning.**

In [0]:
fine_tune_epochs = 10
total_epochs = initial_epochs + fine_tune_epochs

log_dir="/content/sample_data/ML/models/fine_tune/{}".format(time.strftime("%Y%m%d-%H%M%S"))
checkpoint_path = "/content/sample_data/ML/models/training/cp.ckpt/{}".format(time.strftime("%Y%m%d-%H%M%S"))
tensorboard_callback, cp_callback, early_stop_callback = create_callbacks(log_dir, checkpoint_path)
file_writer_cm = tf.summary.create_file_writer(log_dir + '/cm_ft')
fine_history = model.fit(ds_train, epochs=initial_epochs, steps_per_epoch=tr_steps_per_epoch,validation_steps=val_steps_per_epoch, validation_data=ds_valid,callbacks=[tensorboard_callback, cp_callback ,ConfusionMatrixLogger(ds_train, file_writer_cm, class_names, val_steps_per_epoch)])


**Plot accuracy and loss for fine tuning**

In [0]:
acc += fine_history.history['accuracy']
val_acc += fine_history.history['val_accuracy']

loss += fine_history.history['loss']
val_loss += fine_history.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
#plt.ylim([0.8, 1])
plt.plot([initial_epochs-1,initial_epochs-1],
          plt.ylim(), label='Start Fine Tuning')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
#plt.ylim([0, 1.0])
plt.plot([initial_epochs-1,initial_epochs-1],
         plt.ylim(), label='Start Fine Tuning')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

**Display TensorBoard**

In [0]:
%tensorboard --logdir /content/sample_data/ML/models/fine_tune/