<a href="https://colab.research.google.com/github/GiuliaPais/SMML_final_project/blob/main/Appendix_A_SMML_final_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Appendix A - Neural networks for image classification**
### *Final assignment for the course "Statistical Methods for Machine Learning", Università degli Studi di Milano Statale.*

   *Academic year 2022/2023*

   *Author: Giulia Pais*



---



# Setup

In [1]:
# Install most recent version of TF
# NOTE(review): the upgrade is unpinned; the run recorded in this notebook
# printed TensorFlow 2.11.0, newer releases may behave differently.
!pip install --upgrade -q tensorflow

[K     |████████████████████████████████| 588.3 MB 21 kB/s 
[K     |████████████████████████████████| 439 kB 80.5 MB/s 
[K     |████████████████████████████████| 1.7 MB 51.6 MB/s 
[K     |████████████████████████████████| 6.0 MB 58.5 MB/s 
[?25h

In [2]:
# `validators` is used by DataFetcher to check the dataset download URL
!pip install -q validators

  Building wheel for validators (setup.py) ... [?25l[?25hdone


In [3]:
import tensorflow as tf
print(tf.__version__)
import pandas as pd
import plotly.express as px
import numpy as np
import validators
import subprocess
import os
import ipywidgets as widgets

from tensorflow import keras
from keras.models import Sequential
from keras.layers import Conv2D, Dense, Dropout, Rescaling, Flatten, BatchNormalization, MaxPooling2D
from pathlib import Path
from google.colab import drive
from IPython.display import display

2.11.0


In [4]:
# Try to create a MirroredStrategy; if that raises, fall back to whatever
# strategy TensorFlow currently reports, so `strategy.scope()` can be used
# later either way.
try:
  strategy = tf.distribute.MirroredStrategy()
except Exception:
  strategy = tf.distribute.get_strategy()



## Data fetching

Modify the following options to change data location and retrieval.


*   **Data origin**: the source of the data. If set to `drive`, the Google Drive directory is mounted in `/content/drive` (requires login and access to Google Drive); if set to `url`, a zipped version of the dataset is downloaded from the given URL and unzipped in the destination folder; if set to `folder`, simply provide a path on the local file system to the uncompressed dataset.
*   **Subfolder or path to zipped dataset (`drive`)**: a subfolder, or the path to a file, inside the mounted Drive folder
*   **Unzip data**: boolean option that controls whether the data is unzipped
*   **Destination path**: path to the folder where the dataset should be unzipped

In [5]:
#@title Data source options

# Default location of the zipped dataset, used as initial value of the URL field
UNIMI_URL = "https://unimibox.unimi.it/index.php/s/eNGYGSYmqynNMqF/download"

output = widgets.Output()
# Selector for where the dataset comes from
DATA_ORIGIN = widgets.Dropdown(
    options=["drive", "url", "folder"],
    value="drive",
    description="Data origin",
    layout=widgets.Layout(width="100%")
)
# The text fields below are created hidden (display='none'); the callbacks
# registered further down in this cell decide which ones are shown.
DRIVE_SUBPATH = widgets.Text(
    value="MyDrive/CatsDogs.zip",
    description="Subfolder or path to zipped dataset",
    layout=widgets.Layout(display='none', width="100%")
)
DOWNLOAD_URL = widgets.Text(
    value=UNIMI_URL,
    description="Download URL",
    layout=widgets.Layout(display='none', width="100%")
)
# Whether the archive has to be unzipped
UNZIP_DATA = widgets.Checkbox(
    value=True,
    description="Unzip data",
    indent=False,
    layout=widgets.Layout(width="100%")
)
# Destination folder of the unzipped archive
UNZIP_PATH = widgets.Text(
    value="/content/dataset",
    description="Where should files be unzipped?",
    layout=widgets.Layout(display='none', width="100%")
)
# Location of an already uncompressed dataset (used with the "folder" origin)
DATA_FOLDER_PATH = widgets.Text(
    value="/content/dataset/CatsDogs",
    description="Path pointing to the folder containing the image category folders",
    layout=widgets.Layout(display='none', width="100%")
)

def _apply_origin_layout(origin):
  '''
  Updates the data source form for the selected data origin.

  Shows the input field that belongs to `origin`, hides the other two and
  adjusts the "Unzip data" checkbox:
    - "drive":  drive sub-path shown, checkbox left as is and enabled
    - "url":    download URL shown, checkbox forced to True and disabled
    - "folder": folder path shown, checkbox forced to False and disabled
  Any other value leaves the form untouched.

  Parameters:
  -----------
  origin: str
      the currently selected value of the DATA_ORIGIN dropdown
  '''
  if origin not in ("drive", "url", "folder"):
    return
  # Exactly one of the three text fields is visible at a time
  DRIVE_SUBPATH.layout.display = "block" if origin == "drive" else "none"
  DOWNLOAD_URL.layout.display = "block" if origin == "url" else "none"
  DATA_FOLDER_PATH.layout.display = "block" if origin == "folder" else "none"
  if origin == "url":
    ## Downloads always unzip
    UNZIP_DATA.value = True
  elif origin == "folder":
    ## No need unzipping
    UNZIP_DATA.value = False
  # Only the drive origin lets the user choose whether to unzip
  UNZIP_DATA.disabled = origin != "drive"

def on_display_origin(dropdown):
  '''Callback run when the dropdown is displayed: syncs the form with its value.'''
  _apply_origin_layout(dropdown.value)

def on_change_origin(change):
  '''Observer for the dropdown `value` trait: syncs the form with the new value.'''
  _apply_origin_layout(change['new'])

def on_display_unzip(check):
  '''Callback run when the checkbox is displayed: the unzip path is visible only if it is ticked.'''
  UNZIP_PATH.layout.display = "block" if check.value else "none"

def on_change_unzip(change):
  '''Observer for the checkbox `value` trait: the unzip path is visible only if the new value is True.'''
  UNZIP_PATH.layout.display = "block" if change['new'] is True else "none"

# Keep the form in sync both when the widgets are first displayed and
# whenever their value changes
DATA_ORIGIN.observe(on_change_origin, names='value')
DATA_ORIGIN.on_displayed(on_display_origin)
UNZIP_DATA.on_displayed(on_display_unzip)
UNZIP_DATA.observe(on_change_unzip, names="value")


# Container holding all the data source controls
data_source_box = widgets.Box([DATA_ORIGIN, DRIVE_SUBPATH, DOWNLOAD_URL,
                               DATA_FOLDER_PATH,
                               UNZIP_DATA, UNZIP_PATH], 
                              layout=widgets.Layout(display="flex nowrap",
                                                    flex_flow="column",
                                                    align_items="stretch",
                                                    width="50%",
                                                    border="solid 2px",
                                                    padding="10px"))

display(data_source_box, output)

Box(children=(Dropdown(description='Data origin', layout=Layout(width='100%'), options=('drive', 'url', 'folde…

Output()

In [6]:
class DataFetcher():
  '''
  Retrieves the image dataset according to the "Data source options" widgets
  and exposes the folder that contains the images through `data_path`.

  The data origin and the "Unzip data" flag are read from the widgets when
  the object is created; the remaining widget values are read by `fetch()`.

  Parameters:
  -----------
  force_download: bool
      when True the archive is downloaded even if `dataset.zip` already exists
      (only relevant for the `url` origin)
  '''
  def __init__(self, force_download=False):
    self.force_download = force_download
    self._origin = DATA_ORIGIN.value
    self._download_url = None
    self._unzip = UNZIP_DATA.value
    self._unzip_path = None
    self._drive_subfold = None
    self._folder_provided = None
    self._final_path = None

  @property
  def data_path(self):
    '''Path of the dataset folder, or None if `fetch()` has not been run yet.'''
    return self._final_path

  def _navigate_subpath(self, path):
    '''
    Returns the deepest folder under `path` that contains every `.jpg` image.

    Parameters:
    -----------
    path: str or Path
        folder to search recursively

    Raises:
    -------
    FileNotFoundError
        if no `.jpg` file exists under `path`
    '''
    path = Path(path)
    # Work on the folders holding the images: commonpath compares whole path
    # components, so sibling folders sharing a name prefix (e.g. "Cats" and
    # "Cars") cannot produce a truncated, non existing path.
    image_folders = {str(image.parent) for image in path.rglob("*.jpg")}
    if not image_folders:
      raise FileNotFoundError(f"No .jpg images found under {path}")
    return Path(os.path.commonpath(list(image_folders)))


  def _download_from_url(self):
    '''
    Downloads the archive from the configured URL to `dataset.zip` with wget.

    The download is skipped when `dataset.zip` already exists, unless
    `force_download` is set. A failed download is reported on standard output.

    Raises:
    -------
    ValueError
        if the URL is missing or not valid
    '''
    if not (self._download_url and validators.url(self._download_url)):
      raise ValueError(f"Invalid url supplied: {self._download_url}")
    archive = Path("dataset.zip")
    if archive.exists() and not self.force_download:
      print("Dataset already downloaded, skipping")
      return
    print("Downloading...")
    try:
      subprocess.run(
          ["wget", "-O", "dataset.zip", self._download_url],
          stdin=subprocess.PIPE, stdout=subprocess.PIPE, 
          stderr=subprocess.STDOUT, text=True, check=True
      )
      print("Download completed")
    except subprocess.CalledProcessError as ce:
      print("Download failed - details:")
      print(ce.stdout)
      # A failed wget may leave an empty or partial output file behind;
      # remove it so the next run does not mistake it for a finished download.
      if archive.exists():
        archive.unlink()

  def _unzip_dataset(self, archive_path):
    '''
    Unzips `archive_path` in the folder chosen in the widgets.

    The destination folder is always recorded in `_unzip_path`, also when
    unzipping is disabled, so that `fetch()` can look there for data that was
    unzipped by a previous run. Failures of `unzip` are reported on standard
    output.

    Parameters:
    -----------
    archive_path: str or Path
        path to the zip archive
    '''
    self._unzip_path = Path(UNZIP_PATH.value)
    if not self._unzip:
      return
    print("Unzipping in destination folder...")
    try:
      subprocess.run(
          ["unzip", "-qu", str(archive_path), "-d", str(self._unzip_path)],
          stdin=subprocess.PIPE, stdout=subprocess.PIPE, 
          stderr=subprocess.STDOUT, text=True, check=True
      )
      print(f"Dataset succesfully unzipped at {self._unzip_path}")
    except subprocess.CalledProcessError as ce:
      print("Unzipping failed - details:")
      print(ce.stdout)
    

  def fetch(self):
    '''
    Makes the dataset available locally and stores its folder in `data_path`.

    - `url`: downloads the archive, unzips it and locates the images
    - `drive`: mounts Google Drive; a file is treated as a zip archive to be
      unzipped, anything else as a folder that already contains the images
    - any other origin: uses the folder typed in the widget as is

    Raises:
    -------
    ValueError
        if the download URL is not valid or the provided folder does not exist
    FileNotFoundError
        if no `.jpg` image can be found in the resulting location
    '''
    if self._origin == "url":
      self._download_url = DOWNLOAD_URL.value
      self._download_from_url()
      self._unzip_dataset("dataset.zip")
      self._final_path = self._navigate_subpath(self._unzip_path)
      return
    if self._origin == "drive":
      drive.mount('/content/drive')
      self._drive_subfold = DRIVE_SUBPATH.value
      drive_complete_path = Path('/content/drive').joinpath(self._drive_subfold)
      if drive_complete_path.is_file():
        self._unzip_dataset(drive_complete_path)
        self._final_path = self._navigate_subpath(self._unzip_path)
      else:
        self._final_path =  self._navigate_subpath(drive_complete_path)
    else:
      self._folder_provided = Path(DATA_FOLDER_PATH.value)
      if not self._folder_provided.is_dir():
        raise ValueError(f"{self._folder_provided} is not a folder")
      self._final_path = self._folder_provided

In [7]:
# Fetch the dataset as configured in the data source form and keep its location
data_fetcher = DataFetcher()
data_fetcher.fetch()
data_path = data_fetcher.data_path
# Same location as a plain string
folder = str(data_path)

Mounted at /content/drive
Unzipping in destination folder...
Dataset succesfully unzipped at /content/dataset


## General options and global vars

⚠ By default the option to re-train the models is disabled: the training histories are loaded from the tabular files produced during the original training phase. To re-train the models, tick the corresponding option ("Re-train models?") below (training may take from half an hour to one hour depending on the runtime).

In [8]:
#@title Options

GITHUB_URL = "https://github.com/GiuliaPais/SMML_final_project.git"
AUTOTUNE = tf.data.AUTOTUNE
# Folder (inside the cloned repository) holding the saved training histories
MODEL_HISTORIES_FOLD = "/content/SMML_final_project/baseline_search_hist"
METRICS = ["accuracy"]

# Clone the project repository only when the histories are not available yet
# (re-running the cell would otherwise attempt a clone that cannot succeed),
# and report a failed clone instead of silently ignoring it.
if not Path(MODEL_HISTORIES_FOLD).is_dir():
  clone_result = subprocess.run(
      ["git", "clone", GITHUB_URL],
      stdin=subprocess.PIPE, stdout=subprocess.PIPE,
      stderr=subprocess.STDOUT, text=True
  )
  if clone_result.returncode != 0:
    print("Cloning the project repository failed - details:")
    print(clone_result.stdout)

# Input widgets for the general options; their values are read in the next cell
batch_size = widgets.IntText(
    value=32,
    description="Batch size"
)
# A single value is used for both image height and width
img_size =  widgets.IntText(
    value=128,
    description="Image size (both height and width)"
)
color_mode = widgets.Dropdown(
    options=["rgb", "rgba", "grayscale"],
    value="rgb",
    description="Color mode"
)
seed = widgets.IntText(
    value=3562,
    description="Seed"
)
# Unticked by default: saved histories are loaded instead of re-training
train_models = widgets.Checkbox(
    value=False,
    description="Re-train models?"
)

# Container holding all the option controls
options_box = widgets.Box([batch_size, img_size, color_mode, seed, train_models], 
                          layout=widgets.Layout(display="flex nowrap",
                                                flex_flow="column",
                                                align_items="stretch",
                                                width="50%",
                                                border="solid 2px",
                                                padding="10px"))

display(options_box, output)


Box(children=(IntText(value=32, description='Batch size'), IntText(value=128, description='Image size (both he…

Output()

In [9]:
#@title

# Freeze the current widget selections into plain constants used by the rest
# of the notebook
BATCH_SIZE = batch_size.value
IMG_HEIGHT = IMG_WIDTH = img_size.value
COLOR_MODE = color_mode.value
# Number of channels implied by the colour mode: 3 for rgb, 4 for rgba,
# 1 for anything else
CHANNELS = {"rgb": 3, "rgba": 4}.get(COLOR_MODE, 1)
SEED = seed.value

TRAIN_MODELS = train_models.value



# Utilities

In [10]:
def get_balanced_datasets(data_folder, train_split=0.8, test_split=0.2):
  '''
  Returns stratified datasets according to the percentages specified in input. This ensures all datasets
  have an (almost) equal proportion of examples from both classes.
  By default, the function reserves 20% of the whole dataset for testing.

  The images are read in a fixed (non shuffled) order so that the training and
  test portions of each class never overlap; each resulting set is shuffled
  (seeded with SEED), batched, cached and prefetched.

  Parameters:
  -----------
  data_folder: str or Path
      the path to the folder that contains the sub-folders with images
      (expected sub-folders: `Cats` and `Dogs`)
  train_split: float
      percentage of the whole dataset to reserve for training
  test_split: float
      percentage of the whole dataset to reserve for testing

  Returns:
  --------
  tuple
      (train_ds, test_ds)

  Raises:
  -------
  ValueError
      if the two percentages do not sum up to 1.0
  '''
  # Ensure percentages sum up to 1 (with a tolerance, since exact equality on
  # floats can reject valid pairs because of rounding)
  if abs((train_split + test_split) - 1.0) > 1e-9:
    raise ValueError("dataset splits must sum up to 1.0")

  # Accept plain strings too: `.glob` below needs a Path
  data_folder = Path(data_folder)

  # Derive the number of images to include in each set
  image_count_cats = len(list(data_folder.glob('Cats/*.jpg')))
  image_count_dogs = len(list(data_folder.glob('Dogs/*.jpg')))

  train_size_cats, train_size_dogs = (int(image_count_cats*train_split), int(image_count_dogs*train_split))
  test_size_cats, test_size_dogs = (int(image_count_cats*test_split), int(image_count_dogs*test_split))

  # Images left out by the integer truncation are shared between the two sets
  residual_cats = image_count_cats - (train_size_cats + test_size_cats)
  test_size_cats += residual_cats // 2
  train_size_cats += (residual_cats - (residual_cats // 2))

  residual_dogs = image_count_dogs - (train_size_dogs + test_size_dogs)
  test_size_dogs += residual_dogs // 2
  train_size_dogs += (residual_dogs - (residual_dogs // 2))

  # Get images and create a tf dataset.
  # shuffle=False is essential: with the default shuffling the dataset is
  # re-shuffled every time it is iterated, and since the `take`/`skip`
  # pipelines below iterate it independently, the same image could end up
  # in both the training and the test set.
  dataset = tf.keras.utils.image_dataset_from_directory(
    str(data_folder),
    batch_size=None,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    label_mode='binary',
    shuffle=False,
    color_mode=COLOR_MODE
  )
  # Split by label value (0 is used for cats, 1 for dogs); elements that raise
  # an error while being produced are skipped
  cats_ds = dataset.filter(lambda x, y: tf.reduce_all(tf.math.equal(y, 0))).ignore_errors()
  dogs_ds = dataset.filter(lambda x, y: tf.reduce_all(tf.math.equal(y, 1))).ignore_errors()

  # Create separate datasets for each
  train_cats_ds, train_dogs_ds = (cats_ds.take(train_size_cats), dogs_ds.take(train_size_dogs))
  test_cats_ds, test_dogs_ds = (cats_ds.skip(train_size_cats), dogs_ds.skip(train_size_dogs))

  # Obtain the 2 sets: the shuffle buffer covers the whole set so the two
  # classes get fully mixed
  train_ds = (train_cats_ds.concatenate(train_dogs_ds)
              .shuffle(train_size_cats + train_size_dogs, seed=SEED)
              .batch(BATCH_SIZE).cache().prefetch(buffer_size=AUTOTUNE))
  test_ds = (test_cats_ds.concatenate(test_dogs_ds)
             .shuffle(test_size_cats + test_size_dogs, seed=SEED)
             .batch(BATCH_SIZE).cache().prefetch(buffer_size=AUTOTUNE))

  return (train_ds, test_ds)

In [11]:
def get_csv_logger_callback(model_version):
  '''
  Returns a CSVLogger callback that writes the training history of a model
  to `<MODEL_HISTORIES_FOLD>/<model_version>_history.csv`.

  Parameters:
  -----------
  model_version: str
      name of the model, used as prefix of the history file
  '''
  # Make sure the destination folder exists (e.g. when the project repository
  # could not be cloned), otherwise the log file cannot be created
  os.makedirs(MODEL_HISTORIES_FOLD, exist_ok=True)
  hist_filename = f"{MODEL_HISTORIES_FOLD}/{model_version}_history.csv"
  return tf.keras.callbacks.CSVLogger(hist_filename)

In [12]:
def get_history_plots(history, model_version, log_y=True):
  '''
  Builds a line plot of the training history of a model, with one facet per
  metric and one line for training and one for validation values.

  Parameters:
  -----------
  history: pandas.DataFrame, dict or object with a `history` dict attribute
      per-epoch metrics, one column/key per metric (validation metrics are
      prefixed with `val_`). An `epoch` column is added (0, 1, 2, ...) when
      missing. Objects exposing a `history` attribute (as returned by
      `model.fit`) are supported through that attribute.
  model_version: int or str
      version shown in the plot title
  log_y: bool
      if True the base 10 logarithm of the values is plotted

  Returns:
  --------
  tuple
      (plotly figure, long-format DataFrame used to draw it)
  '''
  if isinstance(history, pd.DataFrame):
    hist_df = history
  elif hasattr(history, "history"):
    hist_df = pd.DataFrame(history.history)
  else:
    hist_df = pd.DataFrame.from_dict(history)

  # Histories read back from CSV may carry the saved index as an
  # "Unnamed: 0" column: it is not a metric and must not be plotted
  hist_df = hist_df.loc[:, ~hist_df.columns.astype(str).str.startswith("Unnamed")]
  if "epoch" not in hist_df.columns:
    hist_df = hist_df.assign(epoch=range(len(hist_df)))

  hist_df = hist_df.melt(id_vars=["epoch"])
  # Only a leading "val_" marks a validation metric
  is_validation = hist_df["variable"].str.startswith("val_")
  hist_df = hist_df.assign(type=np.where(is_validation, "validation", "training"))
  hist_df["variable"] = hist_df["variable"].str.replace(r"^val_", "", regex=True)
  if log_y:
    # Base 10, as stated by the axis title below
    hist_df["value_log"] = np.log10(hist_df["value"])

  fig = px.line(hist_df, x="epoch", y="value_log" if log_y else "value", color="type", 
                facet_col="variable", markers=True, 
                title=f"Baseline version {model_version}",
                template="plotly_white")
  fig.update_layout(
    yaxis_title="value (log10)" if log_y else "value"
  )
  return (fig, hist_df)

  

# Finding a baseline

In [13]:
# Build the training and test sets from the fetched dataset folder
# (default split: 80% training, 20% test)
train_set, test_set = get_balanced_datasets(data_path)

Found 25000 files belonging to 2 classes.


Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089


In [14]:
def baseline_model_v0():
  '''
  Builds and compiles the first baseline network: input rescaling, three
  convolution + max pooling stages (64, 128 and 256 filters), three dense
  layers of 512 units and a single sigmoid output unit.
  '''
  net = Sequential(name="baseline_v0")
  net.add(Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, CHANNELS)))

  # Convolutional part: each stage is a 3x3 convolution followed by max pooling
  for n_filters in (64, 128, 256):
    net.add(Conv2D(filters=n_filters, kernel_size=(3, 3), padding="same", activation="relu"))
    net.add(keras.layers.MaxPool2D(padding="same"))

  net.add(Flatten())

  # Fully connected part
  for _ in range(3):
    net.add(Dense(units=512, activation="relu"))
  net.add(Dense(units=1, activation="sigmoid"))

  net.compile(optimizer=keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=METRICS)

  return net

In [15]:
# Only when re-training is requested: create baseline v0 inside the
# distribution strategy scope and print its summary
if TRAIN_MODELS:
  with strategy.scope():
    model = baseline_model_v0()
    model.summary()

In [16]:
# Train baseline v0 when requested; the CSV logger writes the per-epoch
# metrics to the history file. The history is then always loaded from that
# file, so `history_v0` is a DataFrame whether or not training was run.
if TRAIN_MODELS:
  v0_csv_cb = get_csv_logger_callback("baseline_v0")
  model.fit(train_set, validation_data=test_set, epochs=30, callbacks=[v0_csv_cb])
history_v0 = pd.read_csv(f"{MODEL_HISTORIES_FOLD}/baseline_v0_history.csv")

In [17]:
# Show the per-epoch metrics of baseline v0
history_v0

Unnamed: 0,epoch,accuracy,loss,val_accuracy,val_loss
0,0,0.51,2.39,0.53,0.68
1,1,0.55,0.68,0.62,0.65
2,2,0.55,0.68,0.57,0.69
3,3,0.6,0.65,0.62,0.65
4,4,0.59,0.66,0.5,0.69
5,5,0.53,0.68,0.5,0.69
6,6,0.57,0.67,0.62,0.64
7,7,0.52,0.69,0.5,0.69
8,8,0.5,0.69,0.5,0.69
9,9,0.5,0.69,0.5,0.69


In [18]:
# Plot the training and validation curves of baseline v0 on a log scale
baseline_v0_plot, hist_v0_df = get_history_plots(history_v0, 0, log_y=True)
baseline_v0_plot.show()

In [19]:
def baseline_model_v1():
  '''
  Builds and compiles the second baseline network: input rescaling, four
  convolution + batch normalization + max pooling stages (64, 128, 256 and
  512 filters), three dense layers of 1024 units each followed by batch
  normalization, and a single sigmoid output unit.
  '''
  net = Sequential(name="baseline_v1")
  net.add(Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, CHANNELS)))

  # Convolutional part: 3x3 convolution, batch normalization, max pooling
  for n_filters in (64, 128, 256, 512):
    net.add(Conv2D(filters=n_filters, kernel_size=(3, 3), padding="same", activation="relu"))
    net.add(BatchNormalization())
    net.add(keras.layers.MaxPool2D(padding="same"))

  net.add(Flatten())

  # Fully connected part: every hidden dense layer is batch-normalized
  for _ in range(3):
    net.add(Dense(units=1024, activation="relu"))
    net.add(BatchNormalization())
  net.add(Dense(units=1, activation="sigmoid"))

  net.compile(optimizer=keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=METRICS)

  return net

In [20]:
# Only when re-training is requested: create baseline v1 inside the
# distribution strategy scope and print its summary
if TRAIN_MODELS:
  with strategy.scope():
    model = baseline_model_v1()
    model.summary()

In [21]:
# Train baseline v1 when requested; the CSV logger writes the per-epoch
# metrics to the history file. The history is then always loaded from that
# file, so `history_v1` is a DataFrame whether or not training was run.
if TRAIN_MODELS:
  v1_csv_cb = get_csv_logger_callback("baseline_v1")
  model.fit(train_set, validation_data=test_set, epochs=30, callbacks=[v1_csv_cb])
history_v1 = pd.read_csv(f"{MODEL_HISTORIES_FOLD}/baseline_v1_history.csv")

In [25]:
# Plot the training and validation curves of baseline v1 on a log scale
baseline_v1_plot, hist_v1_df = get_history_plots(history_v1, 1, log_y=True)
baseline_v1_plot.show()

In [26]:
def baseline_model_v2():
  '''
  Builds and compiles the third baseline network: input rescaling, six
  convolution + batch normalization + max pooling + dropout stages
  (64, 64, 128, 128, 256 and 512 filters, dropout rate 0.3), two dense layers
  of 1024 units each followed by batch normalization, and a single sigmoid
  output unit.
  '''
  net = Sequential(name="baseline_v2")
  net.add(Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, CHANNELS)))

  # Convolutional part: 3x3 convolution, batch normalization, max pooling
  # and dropout in every stage
  for n_filters in (64, 64, 128, 128, 256, 512):
    net.add(Conv2D(filters=n_filters, kernel_size=(3, 3), padding="same", activation="relu"))
    net.add(BatchNormalization())
    net.add(keras.layers.MaxPool2D(padding="same"))
    net.add(Dropout(0.3))

  net.add(Flatten())

  # Fully connected part: every hidden dense layer is batch-normalized
  for _ in range(2):
    net.add(Dense(units=1024, activation="relu"))
    net.add(BatchNormalization())

  net.add(Dense(units=1, activation="sigmoid"))

  net.compile(optimizer=keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=METRICS)

  return net

In [27]:
# Only when re-training is requested: create baseline v2 inside the
# distribution strategy scope and print its summary
if TRAIN_MODELS:
  with strategy.scope():
    model = baseline_model_v2()
    model.summary()

In [28]:
# Train baseline v2 when requested; the CSV logger writes the per-epoch
# metrics to the history file. The history is then always loaded from that
# file, so `history_v2` is a DataFrame whether or not training was run.
if TRAIN_MODELS:
  v2_csv_cb = get_csv_logger_callback("baseline_v2")
  model.fit(train_set, validation_data=test_set, epochs=30, callbacks=[v2_csv_cb])
history_v2 = pd.read_csv(f"{MODEL_HISTORIES_FOLD}/baseline_v2_history.csv")

In [29]:
# Plot the training and validation curves of baseline v2 on a log scale
baseline_v2_plot, hist_v2_df = get_history_plots(history_v2, 2, log_y=True)
baseline_v2_plot.show()