In [None]:
# Use data from Google Drive
#
# Run this cell only inside Google Colab: it mounts the user's Google Drive at
# /content/drive and points `base_data_directory` at the dataset folder there.
# NOTE(review): the following cell assigns `base_data_directory` again, so
# executing both cells in order leaves the local path in effect. Run only the
# cell that matches your environment.

from google.colab import drive
drive.mount('/content/drive')
base_data_directory = '/content/drive/MyDrive/lion_no_lion'

In [None]:
# Use data from local directory
#
# Resolves `../data` (relative to the current working directory) to an
# absolute path. `base_data_directory` is the root below which the image
# folders, the saved model and the training history file are looked up.
# NOTE(review): this assignment replaces the Google Drive path when the
# previous cell has been executed as well.

import os
base_data_directory = os.path.realpath('../data')

In [None]:
# Initialize Tensorflow

# The tensorflow package also provides the Xception model that is used further
# down in this notebook
import tensorflow as tf
print("Tensorflow version " + tf.__version__)

# Pick the first available backend, in this order of preference: TPU, GPU, CPU.
# The chosen backend is exposed as `distribution_strategy`.
try:
    # Raises ValueError when the runtime has no TPU attached
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    distribution_strategy = tf.distribute.TPUStrategy(tpu)
    print(f'Running on a TPU w/{tpu.num_accelerators()["TPU"]} cores')
except ValueError:
    print("WARNING: Not connected to a TPU runtime; Will try GPU")
    available_gpus = tf.config.list_physical_devices('GPU')
    if not available_gpus:
        # Neither TPU nor GPU: fall back to the default (CPU) strategy
        print('WARNING: Not connected to TPU or GPU runtime; Will use CPU context')
        distribution_strategy = tf.distribute.get_strategy()
    else:
        distribution_strategy = tf.distribute.MirroredStrategy()
        print(f'Running on {len(available_gpus)} GPUs')

In [None]:
# Settings for the different rows in the table
#
# Choose a row with `notebook_number`; the remaining settings (image size,
# augmentation, batch size, model type and image folders) follow from it.

# Set the notebook number to run.
notebook_number = 4

# Load an existing model and its weights from disk (True) or create a fresh new
# model (False).
load_model_from_file = False

# Load previous training history from file (True).
load_history_from_file = False

# How many epochs to train for. Notebooks 1-3 override this value below.
epochs = 300

# Learning rate of the Adam optimizer used for the "light" model. It is defined
# here for every notebook so that the compile cell never hits an undefined
# `alpha` on a fresh kernel (previously only notebook 1 assigned it, although
# notebooks 2 and 3 also train the light model).
# NOTE(review): 1e-5 mirrors notebook 1 - confirm it is the intended rate for
# notebooks 2 and 3.
alpha = 1e-5

# No changes below this line.
if notebook_number == 1:
    epochs = 2_100
    image_dimensions = (128, 128) # height, width
    with_augmentation = False
    batch_size = 16
    model_version = "light"
    alpha = 1e-5
    lion_directories = [
        # f'{base_data_directory}/lion_1',
        f'{base_data_directory}/lion',
    ]
    no_lion_directories = [
        # f'{base_data_directory}/no_lion_1',
        f'{base_data_directory}/no_lion',
    ]
elif notebook_number == 2:
    epochs = 1_200
    image_dimensions = (256, 256) # height, width
    with_augmentation = False
    batch_size = 32
    model_version = "light"
    lion_directories = [
        # f'{base_data_directory}/lion_1',
        f'{base_data_directory}/lion',
    ]
    no_lion_directories = [
        # f'{base_data_directory}/no_lion_1',
        f'{base_data_directory}/no_lion',
    ]
elif notebook_number == 3:
    epochs = 900
    image_dimensions = (256, 256) # height, width
    with_augmentation = True
    batch_size = 32
    model_version = "light"
    lion_directories = [
        f'{base_data_directory}/lion',
    ]
    no_lion_directories = [
        f'{base_data_directory}/no_lion',
    ]
elif notebook_number == 4:
    image_dimensions = (128, 128) # height, width
    with_augmentation = False
    batch_size = 16
    model_version = "pre-trained"
    lion_directories = [
        f'{base_data_directory}/lion_1',
    ]
    no_lion_directories = [
        f'{base_data_directory}/no_lion_1',
    ]
elif notebook_number == 5:
    image_dimensions = (128, 128) # height, width
    with_augmentation = False
    batch_size = 16
    model_version = "pre-trained"
    lion_directories = [
        f'{base_data_directory}/lion',
    ]
    no_lion_directories = [
        f'{base_data_directory}/no_lion',
    ]
elif notebook_number == 6:
    image_dimensions = (512, 512) # height, width
    with_augmentation = False
    batch_size = 16
    model_version = "pre-trained"
    lion_directories = [
        f'{base_data_directory}/lion',
        f'{base_data_directory}/cougar',
    ]
    no_lion_directories = [
        f'{base_data_directory}/no_lion',
        f'{base_data_directory}/nocougar',
    ]
else:
    # ValueError is a subclass of Exception, so existing handlers still match
    raise ValueError(f'Unknown notebook {notebook_number}')

# The file names encode notebook number, model type and image size, so the
# different rows of the table never overwrite each other's results.
model_file = f'{base_data_directory}/model_weights_{notebook_number}_{model_version}_{image_dimensions[0]}_{image_dimensions[1]}.keras'
history_file = f'{base_data_directory}/model_history_{notebook_number}_{model_version}_{image_dimensions[0]}_{image_dimensions[1]}.pickle'

In [None]:
# Copy images to working directory on runtime
#
# All images are gathered in `work/lion` and `work/no_lion`, the directory
# layout from which the datasets are created later on.

import glob
import os
import shutil


def _find_images(directories):
    """Return the paths of all files matching `*JPG` directly inside `directories`."""
    images = []
    for directory in directories:
        images += glob.glob(os.path.join(directory, '*JPG'))
    return images


def _copy_without_overwriting(images, destination):
    """Copy `images` into `destination`, keeping files that share a base name.

    Images from different source directories can carry the same file name.
    Copying them into one directory under that name would silently replace the
    earlier file, so a clashing file is stored under a name prefixed with its
    source directory instead. Files without a clash keep their original name.
    """
    used_names = set()
    for image in images:
        name = os.path.basename(image)
        if name in used_names:
            stem, extension = os.path.splitext(name)
            parent = os.path.basename(os.path.dirname(image))
            candidate = f'{parent}_{name}'
            counter = 1
            while candidate in used_names:
                candidate = f'{parent}_{stem}_{counter}{extension}'
                counter += 1
            name = candidate
        used_names.add(name)
        shutil.copy(image, os.path.join(destination, name))


# Find image names in the source directories
lion_images = _find_images(lion_directories)
no_lion_images = _find_images(no_lion_directories)

print(f'Found {len(lion_images)} images tagged as `lion`')
print(f'Found {len(no_lion_images)} images tagged as `no-lion`')
print(f'In total {len(lion_images) + len(no_lion_images)} images')

# Start from an empty working directory so that images of an earlier run
# cannot leak into this one
shutil.rmtree('work', ignore_errors=True)
os.makedirs('work/lion')
os.makedirs('work/no_lion')

print(f'Copying images to working directory {os.path.realpath("work")}')
_copy_without_overwriting(lion_images, 'work/lion')
_copy_without_overwriting(no_lion_images, 'work/no_lion')
print('Copied all images')

In [None]:
# Create datasets

# The pre-trained model is fed three-channel images, the light model
# single-channel ones
color_mode = 'rgb' if model_version == 'pre-trained' else 'grayscale'
print(f'Using color_mode \'{color_mode}\'')

# Augmentation layers, applied in this order in the runs that use augmentation
augmentation_layers = [
    tf.keras.layers.RandomFlip('horizontal'),
    tf.keras.layers.RandomRotation(0.01),
    tf.keras.layers.RandomZoom(0.05),
    tf.keras.layers.RandomBrightness((-0.1, 0.1)),
    tf.keras.layers.RandomContrast(0.1),
    # tf.keras.layers.RandomCrop(200, 200),
    # tf.keras.layers.Rescaling(1./255),
]

def image_augmentation(image):
    """Run `image` through all augmentation layers.

    Returns `image` unchanged when `with_augmentation` is False.
    """
    if not with_augmentation:
        return image
    augmented = image
    for augmentation in augmentation_layers:
        augmented = augmentation(augmented)
    return augmented

def _augment_images(img, label):
    """Augment the images of one dataset element and pass the labels through."""
    return image_augmentation(img), label

# Split the images in `work` 80/20 into training and validation data. The seed
# is fixed so that the same training session can be reproduced.
training_dataset, validation_dataset = tf.keras.preprocessing.image_dataset_from_directory(
    'work',
    color_mode=color_mode,
    image_size=image_dimensions,
    batch_size=batch_size,
    shuffle=True,
    seed=123,
    validation_split=0.2,
    subset='both',
)

# Only the training data is augmented; both datasets are prefetched
training_dataset = training_dataset.map(
    _augment_images,
    num_parallel_calls=tf.data.AUTOTUNE,
).prefetch(tf.data.AUTOTUNE)
validation_dataset = validation_dataset.prefetch(tf.data.AUTOTUNE)

print(f'image dimensions {image_dimensions}')

In [None]:
# Plot a few images from the training dataset

import matplotlib.pyplot as plt
import numpy as np

# One batch is enough for a preview
images = training_dataset.take(1)

# Every element of the iterator is an (images, labels) pair
for image in images.as_numpy_iterator():
    print(f'shape {np.shape(image[0][0])}')
    # A batch may hold fewer than nine images (small dataset or batch size),
    # so never index past its end
    number_of_plots = min(9, len(image[0]))
    plt.figure(figsize=(18, 18))
    for i in range(number_of_plots):
        plt.subplot(3, 3, i + 1)
        plt.imshow(image[0][i].astype("uint8"), cmap='gray')
        plt.axis('off')
    plt.show()

In [None]:
# Define callbacks for training.

import pickle

class StoreHistory(tf.keras.callbacks.Callback):
    """Keras callback that accumulates training metrics and pickles them.

    `history` maps a metric name to a list of values. All metrics reported by
    Keras, plus `batch_size`, receive one entry per epoch; `duration` is
    appended to once per training run by the training cell. After every epoch
    the complete dictionary is written to `history_file`.

    When `load_history_from_file` is True and `history_file` exists, the
    previously stored history is loaded and extended by the new training run.
    """

    # Metric used to count the recorded epochs. The first dictionary key must
    # not be used for this: it is `duration`, which grows per run, not per
    # epoch.
    EPOCH_COUNT_KEY = 'accuracy'

    def __init__(self):
        super().__init__()
        self.history = {}
        self.number_epochs = 0
        history_file_exists = os.path.isfile(history_file)
        if history_file_exists and load_history_from_file:
            print(f'Loading history from file {history_file}')
            # NOTE: unpickling can execute arbitrary code. Only load history
            # files that were written by this notebook.
            with open(history_file, 'rb') as f:
                self.history = pickle.load(f)
            self.number_epochs = self._count_epochs()
            print(f'Loaded history of {self.number_epochs} previous epochs')
            # Metrics without any entry yet (e.g. `duration` when no run has
            # recorded one) have no last value and are left out
            last_values = [
                f'{key}: {values[-1]:.4f}'
                for key, values in self.history.items()
                if len(values) > 0
            ]
            print(f'Epoch {self.number_epochs}: ' + ' - '.join(last_values))
        else:
            print(f'Creating new history file {history_file}')
        for key in ['duration', 'accuracy']:
            self.history.setdefault(key, [])

    def _count_epochs(self):
        """Return the number of epochs recorded so far."""
        return len(self.history.get(self.EPOCH_COUNT_KEY, []))

    def on_train_begin(self, logs=None):
        """Remember how many epochs were recorded before this training run."""
        self.number_epochs = self._count_epochs()
        print(f'Starting new training with {self.number_epochs} previous epochs')

    def on_epoch_end(self, epoch, logs=None):
        """Append the metrics of the finished epoch and save the history."""
        logs = logs or {}
        self.history.setdefault('batch_size', []).append(batch_size)
        for key in logs:
            self.history.setdefault(key, []).append(logs[key])
        with open(history_file, 'wb') as f:
            pickle.dump(self.history, f)
            # `epoch` restarts at 0 for every run, hence the offset
            print(f'Epoch {epoch + self.number_epochs + 1} history pickled and saved to file')

# Saves the complete model (not only its weights) to `model_file`, but only
# after epochs in which `val_accuracy` improved (`save_best_only=True`), so the
# file holds the best model seen during the training run.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=model_file,
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

# Multiplies the learning rate by `factor` once `val_loss` has not decreased
# for `patience` epochs; the rate is never reduced below `min_lr`.
reduce_learning_rate = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.75,  # New lr = lr * factor.
    patience=50,
    verbose=1,
    mode='min',
    min_lr=1e-8,  # Lower bound on the learning rate.
)

In [None]:
# Create / load training history

def get_best_epoch(history, key):
    """Find the last epoch at which the metric `key` reached its maximum.

    Args:
        history: Object with a `history` dictionary that maps metric names to
            per-epoch lists of values.
        key: Name of the metric to maximise.

    Returns:
        Tuple `(accuracy, val_accuracy, loss, val_loss, epoch)` holding the
        metrics recorded at the best epoch and the zero-based index of that
        epoch. All five entries are 0 when `key` is missing or has no values.
    """
    recorded = history.history
    if key not in recorded:
        return (0, 0, 0, 0, 0)
    series = recorded[key]
    if len(series) == 0:
        return (0, 0, 0, 0, 0)

    best_value, best_epoch = 0, 0
    for epoch, value in enumerate(series):
        # `>=` instead of `>`: among equal values the latest epoch wins
        if value >= best_value:
            best_value, best_epoch = value, epoch

    metrics_at_best = tuple(
        recorded[name][best_epoch]
        for name in ('accuracy', 'val_accuracy', 'loss', 'val_loss')
    )
    return metrics_at_best + (best_epoch,)

# Create the history callback; it loads the stored history when requested
full_history = StoreHistory()

# Summarise the history available so far
(
    best_accuracy,
    best_val_accuracy,
    best_loss,
    best_val_loss,
    best_epoch,
) = get_best_epoch(full_history, 'accuracy')
recorded_seconds = sum(full_history.history["duration"])
recorded_epochs = len(full_history.history["accuracy"])
print(f'Total time {recorded_seconds} for {recorded_epochs} epochs')
print(f'Best epoch {best_epoch} - accuracy: {best_accuracy:.4f} - val_accuracy: {best_val_accuracy:.4f} - loss: {best_loss:.4f} - val_loss: {best_val_loss:.4f}')

In [None]:
# Define the two models.

def pre_trained_model():
    """Build a binary classifier on top of a frozen, ImageNet-trained Xception.

    Returns:
        An uncompiled `tf.keras.Sequential` model. It takes RGB images of size
        `image_dimensions` with pixel values in [0, 255] and outputs one
        sigmoid value per image.
    """
    # Use the Xception model with imagenet weights as base model
    base_model = tf.keras.applications.Xception(
        weights='imagenet',
        include_top=False,
        input_shape=(*image_dimensions, 3),
    )

    print(f'Number of layers in the base model: {len(base_model.layers)}')
    # Ask the model itself for its output shape; the per-layer `output_shape`
    # attribute is not available in every Keras version
    print(f'shape of output layer: {base_model.output_shape}')

    # We do not want to change the weights in the Xception model (imagenet
    # weights are frozen)
    base_model.trainable = False

    # The imagenet weights expect pixel values in [-1, 1], whereas the datasets
    # deliver values in [0, 255]; the rescaling layer converts between the two.
    # Global average pooling then averages each of the Xception feature maps
    # over its spatial dimensions, and the dense sigmoid layer maps the
    # resulting feature vector to a single output between 0 and 1. Only the
    # parameters of the dense layer are trained.
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(*image_dimensions, 3)),
        tf.keras.layers.Rescaling(1.0 / 127.5, offset=-1.0),
        base_model,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])

    return model

# Note: on a TPU runtime the light model does not train properly, the loss
# turns into `nan` after only one epoch. On GPU runtimes it works.
def light_model():
    """Build the small residual network that is trained from scratch.

    Returns:
        An uncompiled `tf.keras.Model` that takes single-channel images of
        size `image_dimensions` and outputs one value per image. The output
        layer has no activation.
    """
    layers = tf.keras.layers
    inputs = tf.keras.Input(shape=(*image_dimensions, 1))

    # Entry block: scale pixels by 1/255, then a strided convolution
    x = layers.Rescaling(1.0 / 255)(inputs)
    x = layers.Conv2D(128, 1, strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    # Input of the current block, needed for its residual connection
    previous_block_activation = x

    for size in [256, 512, 728]:
        # Two rounds of activation -> separable convolution -> normalisation
        for _ in range(2):
            x = layers.Activation("relu")(x)
            x = layers.SeparableConv2D(size, 1, padding="same")(x)
            x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(1, strides=2, padding="same")(x)

        # Project the block input to the new shape and add it back
        residual = layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = layers.add([x, residual])
        previous_block_activation = x

    # Exit block
    x = layers.SeparableConv2D(1024, 1, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.1)(x)

    outputs = layers.Dense(1, activation=None)(x)
    return tf.keras.Model(inputs, outputs)

In [None]:
# Build model
#
# Prepares model so we can run it: either loads the stored model from
# `model_file` or creates a new one of the configured `model_version`.

import os

with distribution_strategy.scope():
    model_file_exists = os.path.isfile(model_file)
    if load_model_from_file and model_file_exists:
        print(f'Loading model from file {model_file}')
        model = tf.keras.models.load_model(model_file)
        print('Loaded model from file')
    else:
        if load_model_from_file:
            # Loading was requested but is impossible; say so instead of
            # silently starting from scratch
            print(f'WARNING: Model file {model_file} not found')
        print('Creating new model')
        if model_version == "pre-trained":
            print('Creating new Xception model')
            model = pre_trained_model()
            channels = 3  # RGB input
        elif model_version == "light":
            print('Creating new light model')
            model = light_model()
            channels = 1  # grayscale input
        else:
            raise Exception(f'unknown model version {model_version}')

        model.build(input_shape=(None, *image_dimensions, channels))

    print(f'Number of layers in the model: {len(model.layers)}')

In [None]:
# Compile model
#
# The pre-trained model ends in a sigmoid, so its loss works on probabilities.
# The light model has no output activation, so its loss works on logits.

with distribution_strategy.scope():
    if model_version == 'pre-trained':
        print('Compiling pre-trained model')
        compile_arguments = dict(
            optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
            loss='binary_crossentropy',
            metrics=['accuracy'],
        )
    elif model_version == 'light':
        print('Compiling light model')
        compile_arguments = dict(
            optimizer=tf.keras.optimizers.Adam(learning_rate=alpha),
            loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
            metrics=[tf.keras.metrics.BinaryAccuracy(name="accuracy")],
        )
    else:
        raise Exception(f'Unknown model version {model_version}')
    model.compile(**compile_arguments)
    model.summary()

# Some notes

## Accuracy

The accuracy ($A$) measures how often a machine learning model correctly predicts the outcome. It is calculated by dividing the number of correct predictions by the total number of predictions. In terms of the entries of the following confusion matrix, this is the formula given below the table.

|                     | Lion (expected)     | no Lion (expected)  |
| ------------------- | ------------------- | ------------------- |
| Lion (predicted)    | true positive (TP)  | false positive (FP) |
| no Lion (predicted) | false negative (FN) | true negative (TN)  |

$$
A = \frac{ \mathrm{TP} + \mathrm{TN} }{ \mathrm{TP} + \mathrm{TN} + \mathrm{FP} + \mathrm{FN} }
$$

## Loss

The binary cross entropy (loss) is defined as

$$
- \frac{1}{N} \sum \left[ y_{i} \log p_{i} + (1 - y_{i}) \log (1 - p_{i}) \right]
$$

where the sum goes over all $ N $ images in the dataset, $ y_{i} $ is the expected label of image $ i $ (0 for lion, 1 for no lion), and $ p_{i} $ is the predicted probability that image $ i $ is "no lion". Accordingly, $ (1 - p_{i}) $ is the predicted probability that the image shows a lion.

In [None]:
# Train
#
# Runs the training and records how long this run took.

import pickle
from datetime import datetime

start_time = datetime.now()
print(start_time)
history = model.fit(
    training_dataset,
    epochs=epochs,
    validation_data=validation_dataset,
    callbacks=[
        checkpoint,
        reduce_learning_rate,
        full_history,
    ]
)
end_time = datetime.now()
print(end_time)

duration = (end_time - start_time).total_seconds()
print(f'This run took {duration} seconds')

# One `duration` entry per training run (not per epoch)
full_history.history.setdefault('duration', []).append(duration)

# The history callback writes the history file at the end of every epoch, i.e.
# before the duration of this run is known. Write the file once more so that
# the duration is not lost when the kernel is restarted.
with open(history_file, 'wb') as f:
    pickle.dump(full_history.history, f)

print(f'total time {sum(full_history.history["duration"])} for {len(full_history.history["accuracy"])} epochs')

In [None]:
# Print some stats of training so far

recorded_seconds = sum(full_history.history["duration"])
recorded_epochs = len(full_history.history["accuracy"])
print(f'Total time {recorded_seconds} for {recorded_epochs} epochs')

# Report the best epoch twice: judged by training accuracy first, then by
# validation accuracy
for metric in ('accuracy', 'val_accuracy'):
    (
        best_accuracy,
        best_val_accuracy,
        best_loss,
        best_val_loss,
        best_epoch,
    ) = get_best_epoch(full_history, metric)
    print(f'Best {metric} - epoch {best_epoch} - accuracy: {best_accuracy:.4f} - val_accuracy: {best_val_accuracy:.4f} - loss: {best_loss:.4f} - val_loss: {best_val_loss:.4f}')

In [None]:
# Plot training progress

import matplotlib.pyplot as plt

# Accuracy on the left, loss on the right
fig, (accuracy_axes, loss_axes) = plt.subplots(1, 2, figsize=(18, 10))

accuracy_axes.plot(full_history.history['accuracy'], label='Training Accuracy')
accuracy_axes.plot(full_history.history['val_accuracy'], label='Validation Accuracy')
accuracy_axes.legend(loc='lower right')
accuracy_axes.set_ylabel('Accuracy')
# Keep the automatic lower limit, but let the axis end at 1
accuracy_axes.set_ylim([min(accuracy_axes.get_ylim()), 1])
accuracy_axes.set_title('Training and Validation Accuracy')

loss_axes.plot(full_history.history['loss'], label='Training Loss')
loss_axes.plot(full_history.history['val_loss'], label='Validation Loss')
loss_axes.legend(loc='upper right')
loss_axes.set_ylabel('Cross Entropy')
loss_axes.set_ylim([0, 1.0])
loss_axes.set_title('Training and Validation Loss')
plt.show()

In [None]:
# Load best model from disk.
#
# Replaces the in-memory `model` by the one stored in `model_file`, the file
# the checkpoint callback writes whenever `val_accuracy` improves.
# NOTE(review): raises if no checkpoint has been written to `model_file` yet.

with distribution_strategy.scope():
    model = tf.keras.models.load_model(model_file)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Show the predictions for the first batch of the validation dataset

images, labels = next(iter(validation_dataset))

# Predict the labels for the images
predictions = model.predict(images)
if model_version != 'pre-trained':
    # The light model has no output activation: it returns logits, which have
    # to pass through a sigmoid before they can be read as probabilities
    predictions = tf.sigmoid(predictions).numpy()

# Plot the images and their predicted labels. The batch may hold fewer than
# nine images, so never index past its end.
plt.figure(figsize=(18, 18))
for i in range(min(9, len(predictions))):
    probability = predictions[i][0]  # probability of class 1, i.e. "no lion"
    label = int(labels[i])
    plt.subplot(3, 3, i + 1)
    plt.imshow(images[i].numpy().astype("uint8"), cmap='gray')
    plt.title(f"Predicted: {probability:.2f} ({'lion' if probability < 0.5 else 'no lion'}), Actual: {label} ({'lion' if label == 0 else 'no lion'})")
    plt.axis('off')

plt.show()

In [None]:
import glob
import os
import numpy as np
import matplotlib.pyplot as plt

# Classify all images of one directory and show each one with its prediction

# The directory is looked up below `base_data_directory`, like all other data
# classification_directory = f'{base_data_directory}/stable/angle 1'
classification_directory = f'{base_data_directory}/stable/angle 2/Lion'
# classification_directory = f'{base_data_directory}/cougar'
# classification_directory = f'{base_data_directory}/lion'
# classification_directory = f'{base_data_directory}/nocougar'
classification_image_files = glob.glob(os.path.join(classification_directory,"*JPG"))
if not classification_image_files:
    print(f'No images found in {classification_directory}')

# Load the images with the number of channels the model was trained on
classification_color_mode = 'rgb' if model_version == 'pre-trained' else 'grayscale'

classifications = []
for image_file in classification_image_files:
    image_data = tf.keras.utils.load_img(
        image_file,
        color_mode=classification_color_mode,
        target_size=image_dimensions,
    )
    image_data = tf.keras.utils.img_to_array(image_data)
    prediction = model.predict(np.expand_dims(image_data, 0))[0][0]
    if model_version != 'pre-trained':
        # The light model returns a logit; convert it into a probability
        prediction = float(tf.sigmoid(prediction))
    classifications.append({
        'path': image_file,
        'image': image_data,
        'prediction': prediction,  # probability of class 1, i.e. "no lion"
    })

    plt.figure(figsize=(8, 8))
    plt.imshow(np.array(classifications[-1]['image'].astype("uint8")), cmap='gray')
    plt.title(f'{(1-classifications[-1]["prediction"])*100:.2f}% lion - {classifications[-1]["prediction"]*100:.2f}% no_lion')
    plt.axis('off')
    plt.show()