In [None]:
# download dataset

import requests

URL = 'https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_5340.zip'
dataset_file_name = 'dataset.zip'

# Stream the archive to disk in chunks instead of buffering the whole body in memory.
# raise_for_status() makes an HTTP error fail loudly here rather than saving an
# error page as 'dataset.zip' and failing later in the unzip cell.
# The timeout applies to connecting and to each read, not to the total download time.
with requests.get(URL, stream=True, timeout=60) as response:
    response.raise_for_status()
    # The with-block guarantees the file is flushed and closed before the unzip cell runs.
    with open(dataset_file_name, "wb") as dataset_file:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            dataset_file.write(chunk)

In [None]:
# unzip dataset

import zipfile

dest_folder = 'dataset'

# open the downloaded archive read-only (the default mode) and unpack every member
with zipfile.ZipFile(dataset_file_name) as archive:
    archive.extractall(path=dest_folder)

In [None]:
# get images stats: paths, resolutions and corrupted files

import os
from PIL import Image

# folders holding the raw images of each class; they are passed to
# get_images_stats below, which lists every entry found inside them
cats_folder = './dataset/PetImages/Cat/'
dogs_folder = './dataset/PetImages/Dog/'

def get_images_stats(dir):
    """Scan a folder and collect readable image paths, resolution counts and unreadable entries.

    Args:
        dir: path of the folder whose entries are inspected (not recursive).

    Returns:
        A tuple ``(file_names, resolutions, corrupted_file_names)`` where
        ``file_names`` lists the paths that could be opened as images,
        ``resolutions`` maps each image size (as reported by ``Image.size``)
        to the number of images having it, and ``corrupted_file_names`` lists
        the paths that could not be opened.

    Note:
        Per the Pillow documentation ``Image.open`` is lazy and does not decode
        pixel data, so a file with a valid header but damaged content may still
        be reported as readable.
    """
    resolutions = {}
    file_names = []
    corrupted_file_names = []
    for img in os.listdir(dir):
        # Build the path outside the try block so the handler always refers
        # to the entry currently being inspected.
        composed_path = os.path.join(dir, img)
        try:
            # The context manager closes the underlying file handle; the dataset
            # has thousands of files and leaked handles can exhaust the OS limit.
            with Image.open(composed_path) as opened_img:
                img_size = opened_img.size
        except Exception:
            # Best-effort scan: anything that cannot be opened is reported as
            # corrupted, while KeyboardInterrupt/SystemExit still propagate.
            corrupted_file_names.append(composed_path)
            continue
        resolutions[img_size] = resolutions.get(img_size, 0) + 1
        file_names.append(composed_path)
    return file_names, resolutions, corrupted_file_names

cats_imgs, cats_resolutions, corrupted_cat_imgs = get_images_stats(cats_folder)
dogs_imgs, dogs_resolutions, corrupted_dog_imgs = get_images_stats(dogs_folder)

# Sum the per-resolution counts of both classes. A plain dict merge
# ({**cats, **dogs}) would overwrite the cat count with the dog count for every
# resolution present in both folders, skewing the weighted average computed later.
total_resolutions = dict(cats_resolutions)
for resolution, dogs_count in dogs_resolutions.items():
    total_resolutions[resolution] = total_resolutions.get(resolution, 0) + dogs_count

In [None]:
# calculate average image resolution

# number of images seen across all resolutions
counter = sum(total_resolutions.values())

# count-weighted sums of widths (index 0) and heights (index 1) of every resolution
weighted_width = sum(size[0] * occurrences for size, occurrences in total_resolutions.items())
weighted_height = sum(size[1] * occurrences for size, occurrences in total_resolutions.items())

# floor division keeps the averages as whole pixels
average_resolution = {
    'width': weighted_width // counter,
    'height': weighted_height // counter,
}

print('average_resolution', average_resolution)

In [None]:
# remove corrupted images

corrupted = corrupted_cat_imgs + corrupted_dog_imgs
for cor_img in corrupted:
  # Only unreadable entries whose name ends in 'jpg' are deleted; any other
  # unreadable entry is left in place.
  # The existence check makes the cell safe to re-run: without it a second run
  # raises FileNotFoundError because the files were already removed.
  if cor_img.endswith('jpg') and os.path.exists(cor_img):
    os.remove(cor_img)

In [None]:
# create train and test sets folders

train_cats_folder = './dataset/train/cats/'
train_dogs_folder = './dataset/train/dogs/'
test_cats_folder = './dataset/test/cats/'
test_dogs_folder = './dataset/test/dogs/'

# create each destination folder (and missing parents); existing folders are left untouched
for split_folder in (train_cats_folder, train_dogs_folder, test_cats_folder, test_dogs_folder):
    os.makedirs(split_folder, exist_ok=True)

In [None]:
# train and test split: images are copied from the dataset folder to the destination folder (train or test)
# train percentage is set to 85 %

import shutil
import random
from PIL import Image
from tqdm import tqdm

def train_test_folders_split(imgs_paths, train_split, train_dir_path, test_dir_path):
    """Copy images into a train folder and a test folder using a reproducible shuffled split.

    Args:
        imgs_paths: list of source image paths; it is not modified.
        train_split: fraction (0..1) of the images that go to the train folder.
        train_dir_path: destination folder for the train images.
        test_dir_path: destination folder for the remaining (test) images.

    Returns:
        List of the source paths whose copy failed (empty when every copy succeeded).
    """
    # Shuffle a copy with a private fixed-seed generator: the permutation is the
    # one produced by random.seed(42) + random.shuffle, but the caller's list and
    # the global random state are left untouched.
    shuffled_paths = list(imgs_paths)
    random.Random(42).shuffle(shuffled_paths)
    num_train_imgs = int(len(shuffled_paths) * train_split)

    # The progress-bar label is the parent folder name of the first image;
    # an empty input gets an empty label instead of raising IndexError.
    source_label = os.path.basename(os.path.dirname(shuffled_paths[0])) if shuffled_paths else ''

    fails = []
    progress = tqdm(enumerate(shuffled_paths), total=len(shuffled_paths), desc=f"{source_label} splitting")
    for i, img_path in progress:
        # The first num_train_imgs shuffled images are the train set, the rest the test set.
        dest_dir = train_dir_path if i < num_train_imgs else test_dir_path
        try:
            # os.path.join works whether or not the destination ends with a separator.
            shutil.copy(img_path, os.path.join(dest_dir, os.path.basename(img_path)))
        except Exception:
            # Best-effort copy: record the failure and keep going, while still
            # letting KeyboardInterrupt/SystemExit stop the loop.
            fails.append(img_path)
    return fails

train_percentage = 0.85

# split each class separately so both keep the same train/test proportion
cats_fails = train_test_folders_split(cats_imgs, train_percentage, train_cats_folder, test_cats_folder)
dogs_fails = train_test_folders_split(dogs_imgs, train_percentage, train_dogs_folder, test_dogs_folder)

# blank line after the progress bars, then the number and list of failed copies per class
print()
for failed_copies in (cats_fails, dogs_fails):
    print(len(failed_copies), failed_copies)

In [None]:
# create the images data generators
# the train set generator also applies transformations to the images
# given the different resolutions, images are all resized to a common resolution of 128x128

from keras.preprocessing.image import ImageDataGenerator

reshape_target_size = (128, 128)
batch_size = 64
# fraction of 'dataset/train/' held out for validation; shared by both generators below
validation_split_fraction = 0.15

# Training images are rescaled to [0, 1] and randomly augmented.
train_datagen = ImageDataGenerator(
    rotation_range=15,
    rescale=1./255,
    shear_range=0.1,
    zoom_range=0.2,
    horizontal_flip=True,
    width_shift_range=0.1,
    height_shift_range=0.1,
    validation_split=validation_split_fraction
)

# Validation images are only rescaled. Drawing the validation subset from
# train_datagen would randomly augment it as well, making val_loss noisy and
# distorting the early-stopping decision.
# NOTE(review): this relies on flow_from_directory assigning files to the
# 'training'/'validation' subsets by their position in the directory listing,
# so two generators with the same validation_split see the same disjoint
# partitions - confirm for the installed Keras version.
validation_datagen = ImageDataGenerator(
    rescale=1./255,
    validation_split=validation_split_fraction
)

train_generator = train_datagen.flow_from_directory(
    'dataset/train/',
    target_size = reshape_target_size,
    batch_size=batch_size,
    class_mode='binary',
    subset='training'
)

validation_generator = validation_datagen.flow_from_directory(
    'dataset/train/',
    target_size = reshape_target_size,
    batch_size=batch_size,
    class_mode='binary',
    subset='validation'
)

# Test images are only rescaled, like the validation ones.
test_datagen = ImageDataGenerator(
    rescale=1./255
)

test_generator = test_datagen.flow_from_directory(
    'dataset/test/',
    target_size = reshape_target_size,
    batch_size=32,
    class_mode='binary',
)

In [None]:
# plot some images just to check them

import matplotlib.pyplot as plt

# The built-in next() works on every Keras version that implements the iterator
# protocol; the .next() method is not available on iterators in newer releases.
batch_images, batch_labels = next(validation_generator)

plt.figure(figsize=(10, 10))
# show up to 9 images in a 3x3 grid (fewer if the batch happens to be smaller)
for i in range(min(9, len(batch_images))):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(batch_images[i])
    plt.title(f"Label: {batch_labels[i]}")
    plt.axis("off")
plt.tight_layout()
plt.show()

In [None]:
# definition of the model:
# 3 convolutional layers followed by a fully connected layer
# dropout is applied so that the model can generalize better
# adam optimizer

from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout

# shape of a single image taken from the batch fetched in the plotting cell
input_shape = batch_images[0].shape

model = Sequential([
    # convolutional block 1
    Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # convolutional block 2
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # convolutional block 3
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # classifier head: one hidden dense layer and a single sigmoid output unit
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid'),
])

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

model.summary()

In [None]:
# I define an early stopping to avoid overfitting:
# the validation loss is observed and if this does not
# improve in the last 10 epochs the training stops and
# returns the model instance that presented the minimum
# validation loss during training

from keras.callbacks import EarlyStopping

epochs = 100

# stop when val_loss has not improved for 10 epochs and restore the best weights seen
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    verbose=1,
    restore_best_weights=True,
)

# whole batches per epoch; integer division leaves out a trailing partial batch
train_steps = train_generator.samples // batch_size
validation_steps = validation_generator.samples // batch_size

history = model.fit(
    train_generator,
    steps_per_epoch=train_steps,
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=validation_steps,
    callbacks=[early_stopping],
)

In [None]:
# save the trained model to 'classifier.keras'; the commented-out cell at the end
# of the notebook reloads this file to continue training
model.save('classifier.keras')

In [None]:
# plot training curves

# (history key, legend label) for each curve, all drawn on the same axes
curves = (
    ('loss', 'Train Loss'),
    ('val_loss', 'Validation Loss'),
    ('accuracy', 'Train Accuracy'),
    ('val_accuracy', 'Validation Accuracy'),
)
for metric_key, curve_label in curves:
    plt.plot(history.history[metric_key], label=curve_label)

plt.xlabel('Epochs')
plt.ylabel('Metrics')
plt.legend()
plt.show()

In [None]:
import pandas as pd

# one row per epoch, one column per metric recorded by model.fit
df = pd.DataFrame(data=history.history)
# write the table to disk without the positional index column
df.to_csv(path_or_buf='history.csv', index=False)

In [None]:
# evaluation of the model on the test set reporting the various metrics

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import numpy as np

total_predictions = []
total_labels = []

# the explicit break bounds the loop to a single pass of len(test_generator) batches
last_batch_index = len(test_generator) - 1
for batch_index, (images, labels) in enumerate(test_generator):
  probabilities = model.predict(images, verbose=0)
  # threshold the sigmoid outputs at 0.5, then keep the single output column
  hard_preds = np.where(probabilities < 0.5, 0, 1)
  total_predictions.extend(hard_preds[:, 0])
  total_labels.extend(labels)
  if batch_index == last_batch_index:
    break

print("Accuracy: ", accuracy_score(total_labels, total_predictions))
print("Precision:", precision_score(total_labels, total_predictions))
print("Recall:   ", recall_score(total_labels, total_predictions))
print("F1-score: ", f1_score(total_labels, total_predictions))

In [None]:
# confusion matrix

import seaborn as sns

# Class names ordered by their numeric label, so tick i always names class i,
# matching the row/column order of the confusion matrix. Passing the
# class_indices dict directly only works through implicit key iteration.
class_names = sorted(test_generator.class_indices, key=test_generator.class_indices.get)

plt.figure(figsize=(5, 4))
sns.heatmap(confusion_matrix(total_labels, total_predictions),
            annot=True,
            fmt='d',
            cmap='Blues',
            xticklabels=class_names,
            yticklabels=class_names)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

In [None]:
# continue training by loading the previous model

# from keras.models import load_model

# model_two = load_model('classifier.keras')
# history_two = model_two.fit(
#     train_generator,
#     steps_per_epoch=train_generator.samples // batch_size,
#     epochs=epochs,
#     validation_data=validation_generator,
#     validation_steps= validation_generator.samples // batch_size,
#     callbacks=[early_stopping]
# )