https://www.tensorflow.org/tutorials/images/transfer_learning

In [None]:
import os
from pathlib import Path
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
import tensorflow as tf
from tensorflow import keras

# Config

In [None]:
# Seed handed to the pandas sampling calls so the split is repeatable.
RANDOM_STATE = 42

# One-off data-preparation steps; enable only for the run that needs them.
UNZIP_IMAGES = False
RESIZE_IMAGES = False
MOVE_IMAGES = False

# Fixes the error raised about libiomp5.dll.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

In [None]:
# Load the table that maps each image file name (`id`) to its `has_cactus`
# label. The path is built from components so it resolves on every OS, not
# only where the backslash is the path separator.
id_label = pd.read_csv(Path('data') / 'train.csv')
id_label.head()

In [None]:
# Number of non-null values in each column.
id_label.count()

In [None]:
# Rows per `has_cactus` value, to inspect the class balance.
id_label.groupby('has_cactus').count()

### Observations
- The data is skewed towards images that have a cactus
- i.e. by always guessing 1, the accuracy would be 75%
- Need to augment the `has_cactus == 0` data

## Separate data into train and validation set

In [None]:
VALIDATION_SET_SIZE = 3500 # 20%

# Row labels of each class.
neg_indices = id_label[id_label['has_cactus'] == 0].index
pos_indices = id_label[id_label['has_cactus'] == 1].index

# Draw half of the validation set from each class.
# - `replace=False`: an image must not be picked twice (the NumPy default
#   samples with replacement).
# - seeded generator: the draw is reproducible across runs.
# NOTE: `pos_val` / `neg_val` are reassigned by the later `DataFrame.sample`
# cell, which is the split actually used downstream.
_rng = np.random.default_rng(RANDOM_STATE)
neg_val = _rng.choice(neg_indices, VALIDATION_SET_SIZE // 2, replace=False)
pos_val = _rng.choice(pos_indices, VALIDATION_SET_SIZE // 2, replace=False)

In [None]:
# `no_cactus` is the complement of `has_cactus` (1 - has_cactus), stored as
# int64 so it can be summed and used as a sampling weight.
_complement = np.ones(len(id_label)) - id_label['has_cactus']
id_label['no_cactus'] = pd.Series(_complement, dtype=np.int64)
id_label.head()


In [None]:
# Confirm that `no_cactus` is the opposite of `has_cactus`:
# the two column sums must add up to the total number of rows.
assert id_label['has_cactus'].sum() + id_label['no_cactus'].sum() == id_label.shape[0]
# `where` keeps only rows in which both flags are equal (others become NaN),
# so every count displayed here is expected to be 0.
id_label.where(id_label['has_cactus'] == id_label['no_cactus']).count()

In [None]:
# Split the validation set 50:50 between the classes so that a model biased
# towards predicting positive performs poorly on the validation set.
# The 0/1 columns act as sampling weights: rows with weight 0 are never drawn,
# so `pos_val` contains only cactus rows and `neg_val` only no-cactus rows.
pos_val = id_label.sample(VALIDATION_SET_SIZE // 2, weights='has_cactus', random_state=RANDOM_STATE)
neg_val = id_label.sample(VALIDATION_SET_SIZE // 2, weights='no_cactus' , random_state=RANDOM_STATE)

In [None]:
# Sanity check: each count is expected to equal VALIDATION_SET_SIZE // 2.
print('pos count:', pos_val['has_cactus'].sum())
print('neg count:', neg_val['no_cactus'].sum())

In [None]:
# The training set is what remains once both validation samples are removed.
_without_pos = id_label.drop(index=pos_val.index)
train_df = _without_pos.drop(index=neg_val.index)
train_df.shape

In [None]:
# Validation set: the positive sample followed by the negative sample.
val_df = pd.concat((pos_val, neg_val), axis=0)
val_df.shape

In [None]:
# Class distribution left in the training set after the validation rows were removed.
train_df['has_cactus'].value_counts()

In [None]:
# Look the counts up by label. `value_counts()` orders by frequency, so
# unpacking it positionally would silently swap the two values if the negative
# class ever became the more frequent one.
_class_counts = train_df['has_cactus'].value_counts()
pos_count = _class_counts.loc[1]
neg_count = _class_counts.loc[0]

print(f'Pos:Neg ratio = {pos_count / neg_count:.2f}')

- About 4x as many positives as negatives
- Can increase the weight of a negative by 4
- Could downsample but this may reduce the training data too much

In [None]:
def resize_images(src: Path, new_size: tuple[int, int]) -> None:
    """Resize every image file directly inside ``src``, overwriting it in place.

    Args:
        src: Directory whose immediate files are resized.
        new_size: Target size passed to ``Image.resize``.

    Entries that are not regular files (for example class sub-folders created
    inside the same directory) are skipped instead of being opened as images.
    """
    for img_path in src.iterdir():
        if not img_path.is_file():
            continue
        with Image.open(img_path, 'r') as img:
            resized = img.resize(new_size)
        # The source file is closed by the `with` block before the resized
        # copy is written over it.
        resized.save(img_path)

In [None]:
def move_images_to_folder(src: Path, dst: Path, df: pd.DataFrame) -> None:
    """Move the images listed in ``df`` from ``src`` into class folders under ``dst``.

    Args:
        src: Directory that currently holds the image files.
        dst: Directory in which the ``has_cactus`` and ``no_cactus`` folders
            are created (if missing) and filled.
        df: Rows with an ``id`` column (file name) and a ``has_cactus`` column
            (1 for the positive class).
    """
    has_cactus = dst.joinpath('has_cactus')
    no_cactus = dst.joinpath('no_cactus')
    class_dirs = [has_cactus, no_cactus]

    ensure_folders_exist(class_dirs)

    # Every label yielded by `iterrows()` comes from `df.index`, so no
    # membership check is needed before moving the file.
    for _, row in df.iterrows():
        move_to_class_folder(src.joinpath(row['id']), class_dirs, row)


def ensure_folders_exist(paths: list[Path]) -> None:
    """Create each directory in ``paths``, including any missing ancestors.

    Args:
        paths: Directories that must exist after the call.

    Raises:
        FileExistsError: If one of the paths exists but is not a directory.
    """
    for path in paths:
        # `parents=True` also covers a missing grandparent, and
        # `exist_ok=True` removes the check-then-create race.
        path.mkdir(parents=True, exist_ok=True)


def move_to_class_folder(img_path: Path, class_dirs: list[Path], row: pd.Series) -> None:
    """Move one image into the folder that matches its label.

    ``class_dirs[0]`` receives rows whose ``has_cactus`` equals 1 and
    ``class_dirs[1]`` receives all others. The file keeps the name ``row['id']``.
    """
    target_dir = class_dirs[0 if row['has_cactus'] == 1 else 1]
    img_path.rename(target_dir.joinpath(row['id']))

In [None]:
# Paths are built from components so they resolve on every OS; a literal such
# as 'data\\train' is a single file name wherever '\\' is not the separator.
TRAIN_ZIP = Path('data') / 'train.zip'
TRAIN_DIR = Path('data') / 'train'
VAL_DIR = Path('data') / 'validation'
IMG_SIZE = (96, 96)

if UNZIP_IMAGES:
    with zipfile.ZipFile(TRAIN_ZIP, 'r') as my_zip:
        my_zip.extractall(TRAIN_ZIP.parent)

if RESIZE_IMAGES:
    resize_images(TRAIN_DIR, IMG_SIZE) # images come in `train.zip` so are extracted to `train`

if MOVE_IMAGES:
    # Training images are sorted into class folders inside TRAIN_DIR itself;
    # validation images are moved out of TRAIN_DIR into VAL_DIR.
    move_images_to_folder(TRAIN_DIR, TRAIN_DIR, train_df)
    move_images_to_folder(TRAIN_DIR, VAL_DIR, val_df)

In [None]:
def count_images(src: Path) -> int:
    """Return the number of regular files found anywhere beneath ``src``.

    Sub-directories are descended into; entries that are neither a directory
    nor a regular file are ignored.
    """
    total = 0
    pending = [src]
    while pending:
        folder = pending.pop()
        for entry in folder.iterdir():
            if entry.is_dir():
                pending.append(entry)
            elif entry.is_file():
                total += 1
    return total

# Total number of files below each split directory (all class folders included).
print(count_images(TRAIN_DIR))
print(count_images(VAL_DIR))

In [None]:
# Check that every class folder holds as many files as its DataFrame has rows
# of that class (validation first, then training).
for _split_dir, _split_df in ((VAL_DIR, val_df), (TRAIN_DIR, train_df)):
    for _label in ('has_cactus', 'no_cactus'):
        assert count_images(_split_dir.joinpath(_label)) == _split_df[_label].sum()

## Preprocess data

In [None]:
BATCH_SIZE = 32

# Both splits are loaded with the same settings; only the directory differs.
_loader_options = dict(shuffle=True, batch_size=BATCH_SIZE, image_size=IMG_SIZE)

train_dataset = keras.utils.image_dataset_from_directory(TRAIN_DIR, **_loader_options)

val_dataset = keras.utils.image_dataset_from_directory(VAL_DIR, **_loader_options)

In [None]:
# Class names as reported by the dataset object.
# NOTE(review): these presumably come from the sub-directory names; confirm
# which label index `has_cactus` maps to, since it need not match the 0/1
# values in train.csv.
class_names = train_dataset.class_names
print(class_names)

In [None]:
# Show the first nine images of one training batch in a 3x3 grid, each titled
# with its class name.
plt.figure(figsize=(10, 10))
for images, labels in train_dataset.take(1):
  for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    # Cast to uint8 so imshow treats the pixel values as 0-255 integers.
    plt.imshow(images[i].numpy().astype("uint8"))
    plt.title(class_names[labels[i]])
    plt.axis("off")


In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# Rebind both datasets to prefetching versions of themselves.
train_dataset, val_dataset = (
    _ds.prefetch(buffer_size=AUTOTUNE) for _ds in (train_dataset, val_dataset)
)
# test_dataset = test_dataset.prefetch(buffer_size=AUTOTUNE)



In [None]:
# Augmentation pipeline: a random horizontal flip followed by a random
# rotation (factor 0.2).
_augmentation_layers = [
    keras.layers.RandomFlip('horizontal'),
    keras.layers.RandomRotation(0.2),
]
data_augmentation = keras.Sequential(_augmentation_layers)

In [None]:
# Apply the augmentation nine times to the first image of one training batch
# and show the results in a 3x3 grid.
for image, _ in train_dataset.take(1):
  plt.figure(figsize=(10, 10))
  first_image = image[0]
  for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    # The augmentation model is called on a batch, so add a leading axis.
    augmented_image = data_augmentation(tf.expand_dims(first_image, 0))
    # presumably pixel values are in 0-255 here; dividing by 255 brings them
    # into the range imshow expects for floats (TODO confirm).
    plt.imshow(augmented_image[0] / 255)
    plt.axis('off')


In [None]:
# Changes the range of pixel values to [-1, 1] for MobileNet.
preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input

In [None]:
# Input shape = image size plus a trailing dimension of 3.
IMG_SHAPE = (*IMG_SIZE, 3)

# MobileNetV2 with ImageNet weights and without its top layers.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=IMG_SHAPE,
    include_top=False,
    weights='imagenet',
)

In [None]:
# Run one training batch through the base model to see the shape of the
# features it produces. Only the shape is inspected; the batch is passed
# without going through `preprocess_input`.
image_batch, label_batch = next(iter(train_dataset))
feature_batch = base_model(image_batch)
print(feature_batch.shape)

In [None]:
# Mark the base model as non-trainable.
base_model.trainable = False

In [None]:
# Print the base model's summary.
base_model.summary()