In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from PIL import Image
import matplotlib.image as mpimg

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image as image_utils

from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense

In [None]:
# Dataset locations used by the loading cells below.
# original_image_dir is relative to the notebook's working directory; it is read
# by the "CNN on original data" and autoencoder cells.
original_image_dir = './Jaws_labeled_images'
# Absolute, user-specific paths: these must be edited to run the notebook on
# another account or machine.
train_image_dir = '/blue/bsc4892/adrian.l/Jaws_augmented_images'
# NOTE(review): 'valiation' looks like a typo for 'validation' -- confirm it
# matches the directory name on disk before changing it.
valid_image_dir = '/blue/bsc4892/adrian.l/Jaws/Jaws_valiation_images'

In [None]:
# Input image dimensions, etc.
resized_height = 224  # target image height passed to image_dataset_from_directory
resized_width = 224   # target image width passed to image_dataset_from_directory
num_channel = 3  # not referenced by any other visible cell (loaders use color_mode='rgb')
num_classes = 18  # width of the softmax output layer of every classifier below
batch_size = 32   # images per batch for the dataset loaders

In [None]:
# Load the training images from train_image_dir as a batched tf.data.Dataset:
# batches of `batch_size` RGB images resized to (resized_height, resized_width),
# with one-hot labels inferred from the sub-directory names.
train_images = image_dataset_from_directory(
    train_image_dir,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    batch_size=batch_size,
    image_size=(resized_height, resized_width),
    shuffle=True,
    seed=42,
    interpolation='bilinear',
    follow_links=False,
    crop_to_aspect_ratio=False,
)
print(train_images)

In [None]:
# Load the held-out validation images with the same settings as the training
# loader. The batch size comes from the shared `batch_size` constant instead of
# a second hard-coded 32, so the two loaders cannot drift apart.
valid_images = image_dataset_from_directory(
    valid_image_dir,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    image_size=(resized_height, resized_width),
    batch_size=batch_size,
    shuffle=True,
    seed=42,
    interpolation='bilinear',
    follow_links=False,
    crop_to_aspect_ratio=False,
)
print(valid_images)

In [None]:
# Preview up to nine images from the first training batch in a 3x3 grid.
# Note that the images have been loaded with 3 color channels!
class_names = train_images.class_names
print(class_names)

plt.figure(figsize=(10, 10))
for images, labels in train_images.take(1):
    # A small dataset can yield a first batch with fewer than nine images;
    # cap the loop so indexing never runs past the end of the batch.
    num_preview = min(9, int(images.shape[0]))
    for i in range(num_preview):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        # Labels are one-hot vectors (label_mode='categorical'), so the class
        # index is the position of the maximum, not the label itself.
        plt.title(class_names[int(np.argmax(labels[i].numpy()))])
        plt.axis("off")
        print(images[i].shape)

In [None]:
# Materialise the whole training dataset as two NumPy arrays.
# One pass over the dataset converts every (images, labels) batch to NumPy.
batches = [(image_batch.numpy(), label_batch.numpy())
           for image_batch, label_batch in train_images]
images = [image_batch for image_batch, _ in batches]
labels = [label_batch for _, label_batch in batches]

# Stack the per-batch arrays along the sample axis.
image_array = np.concatenate(images, axis=0)
label_array = np.concatenate(labels, axis=0)

# Shapes are (num_images, height, width, channels) and (num_images, num_classes).
print("Images shape:", image_array.shape)
print("Labels shape:", label_array.shape)

In [None]:
# Import from tensorflow.keras like every other cell in this notebook; mixing
# the standalone `keras` package with `tensorflow.keras` can pull in a
# different Keras build than the one the models are created with.
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Random geometric augmentation applied on the fly by datagen.flow().
datagen = ImageDataGenerator(
    rotation_range=40,        # random rotation, degrees
    width_shift_range=0.2,    # horizontal shift, fraction of width
    height_shift_range=0.2,   # vertical shift, fraction of height
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',      # fill pixels exposed by a transform with the nearest edge value
)



In [None]:
# from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

# def augment_images(image, label):
#     image = datagen.random_transform(image)
#     return image, label
# input_shape = (224, 224, 3)

# augmented_dataset = train_images.map(augment_images)

In [None]:
# # test script with one image
# img = load_img('./Jaws_labeled_images/Carcharhinidae/IMG_0180_Large.png')
# x = img_to_array(img)
# x = x.reshape((1,) + x.shape)
# print(x.shape)

In [None]:
# Draw ten augmented batches from the (unlabeled) generator and display the
# first image of each one.
augmented_images = datagen.flow(image_array)

for i in range(10):
    batch = next(augmented_images)
    augmented_image = batch[0]
    plt.imshow(augmented_image.astype('uint8'))
    plt.show()

In [None]:
# # # Reshape the image to (1, height, width, channels) for the datagen.flow() method
# # image = np.expand_dims(x, axis=0)

# # Generate augmented images indefinitely
# augmented_images = datagen.flow(image_array, label_array)

# x, y = next(augmented_images)
# print(x.shape, y.shape)
# # fig, ax = plt.subplots(nrows=4, ncols=8)
# for i in range(batch_size):
#     image = x[i]
# #     ax.flatten()[i].imshow(np.squeeze(image))
#     plt.imshow(image.astype('uint8'))
#     plt.show()



In [None]:
# Labeled augmentation generator; also consumed by later cells.
# The batch size is passed explicitly so it follows the notebook-wide
# `batch_size` instead of the generator's own default.
augmented_images = datagen.flow(image_array, label_array, batch_size=batch_size)

x, y = next(augmented_images)

# Size the grid from the images actually returned: a batch can be smaller than
# `batch_size` when fewer images are available, and a fixed 4x8 grid indexed by
# range(batch_size) would then raise IndexError.
num_cols = 8
num_rows = max(1, -(-len(x) // num_cols))  # ceiling division

# squeeze=False keeps `axes` two-dimensional even for a single row.
fig, axes = plt.subplots(num_rows, num_cols, figsize=(10, 10), squeeze=False)

for ax in axes.flat:
    ax.axis('off')  # hide ticks on every cell, including unused ones
for ax, augmented_image in zip(axes.flat, x):
    ax.imshow(augmented_image.astype('uint8'))

plt.tight_layout()
plt.show()

In [None]:
# Pull a fixed number of batches from the labeled augmentation generator and
# flatten them into one image array and one label array.
num_batches = 100

drawn_batches = [next(augmented_images) for _ in range(num_batches)]

# One entry per generated sample, in the order the batches were drawn.
generated_images = np.array(
    [sample for x_batch, _ in drawn_batches for sample in x_batch]
)
generated_labels = np.array(
    [target for _, y_batch in drawn_batches for target in y_batch]
)

# Sanity-check the result.
print("Shape of generated images:", generated_images.shape)
print("Shape of generated labels:", generated_labels.shape)

In [None]:
# Display the first five generated (augmented) images, one figure each.
for i in range(5):
    plt.imshow(generated_images[i].astype('uint8')) # Plot the augmented image
    plt.show()

## CNN on original data

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout


# Baseline CNN: three conv/max-pool stages (32, 64, 128 filters) followed by a
# 512-unit dense layer with dropout and a softmax over `num_classes`.
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)))
model.add(MaxPooling2D(2, 2))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(2, 2))
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(2, 2))
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

In [None]:
# `Adam` is not used in this cell (the optimizer is given by name), but the
# transfer-learning cells further down call Adam(...) and rely on this import.
from tensorflow.keras.optimizers import Adam

# Categorical cross-entropy matches the one-hot labels produced by
# label_mode='categorical'.
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory

# Settings shared by both halves of the 80/20 split of original_image_dir.
# Using the same seed and validation_split in both calls is what keeps the two
# subsets disjoint.
_split_kwargs = dict(
    label_mode='categorical',
    image_size=(224, 224),
    batch_size=batch_size,
    shuffle=True,
    seed=42,
    validation_split=0.2,
)

# Training subset (80%)
train_images = image_dataset_from_directory(
    original_image_dir,
    subset='training',
    labels='inferred',
    color_mode='rgb',
    interpolation='bilinear',
    **_split_kwargs,
)

# Validation subset (20%)
val_ds = image_dataset_from_directory(
    original_image_dir,
    subset="validation",
    **_split_kwargs,
)

# Overlap data loading with training by prefetching up to 32 batches.
train_images = train_images.prefetch(buffer_size=32)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping


# Stop training once the validation loss stops improving. This callback object
# is reused by several later fit() calls in the notebook.
early_stopper = EarlyStopping(
    monitor='val_loss',  # Monitor the validation loss
    patience=5,          # Number of epochs with no improvement after which training will be stopped
    restore_best_weights=True  # Restores model weights from the epoch with the best value of the monitored quantity
)

# Train the model
history = model.fit(
    train_images,  
    epochs=10,  
    validation_data=val_ds,
    callbacks=[early_stopper]
)

In [None]:
# Evaluate on the validation split; evaluate() returns [loss, accuracy] because
# the model was compiled with metrics=['accuracy'].
val_loss, val_accuracy = model.evaluate(val_ds)
print("Validation loss:", val_loss)
print("Validation accuracy:", val_accuracy)

In [None]:
import matplotlib.pyplot as plt

# Per-epoch metrics recorded by the last model.fit() call.
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(acc) + 1)

# Two side-by-side panels: accuracy on the left, loss on the right.
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(10, 5))

acc_ax.plot(epochs, acc, 'bo', label='Training acc')
acc_ax.plot(epochs, val_acc, 'b', label='Validation acc')
acc_ax.set_title('Training and Validation Accuracy')
acc_ax.legend()

loss_ax.plot(epochs, loss, 'ro', label='Training loss')
loss_ax.plot(epochs, val_loss, 'r', label='Validation loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.legend()

plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

# Collect predictions and true labels in ONE pass over val_ds.
# val_ds is created with shuffle=True, so each iteration can return the samples
# in a different order; calling model.predict(val_ds) and then iterating val_ds
# a second time for the labels pairs predictions with the wrong labels.
batch_predictions = []
batch_true_labels = []
for x_batch, y_batch in val_ds:
    probabilities = model.predict_on_batch(x_batch)
    batch_predictions.append(np.argmax(probabilities, axis=1))  # probabilities -> class index
    batch_true_labels.append(np.argmax(y_batch.numpy(), axis=1))  # one-hot -> class index

val_predictions = np.concatenate(batch_predictions)
true_labels = np.concatenate(batch_true_labels)

# Fix the label set so the matrix always has one row/column per class name,
# even when some class never occurs in the validation split.
# NOTE(review): assumes class_names is in the same order as the classes of
# val_ds -- confirm both come from directories with identical sub-folders.
conf_mat = confusion_matrix(
    true_labels, val_predictions, labels=np.arange(len(class_names))
)

# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

In [None]:
# plot_history comes from the project-local helpers_plot_history module, which
# must be importable from the notebook's working directory.
from helpers_plot_history import plot_history
plot_history(history)

## CNN on augmented images 

In [None]:
# Using the same model but augmented images

In [None]:
# Settings shared by the training and validation loaders.
# NOTE(review): both loaders keep only one side of a 0.2 split even though they
# read two different directories -- 80% of train_image_dir is used for training
# and only 20% of valid_image_dir for validation. Confirm this is intended.
_split_kwargs = dict(
    label_mode='categorical',
    image_size=(224, 224),
    batch_size=batch_size,
    shuffle=True,
    seed=42,
    validation_split=0.2,
)

# Training data: augmented images
train_images = image_dataset_from_directory(
    train_image_dir,
    subset='training',
    labels='inferred',
    color_mode='rgb',
    interpolation='bilinear',
    **_split_kwargs,
)

# Validation data: held-out validation images
val_ds = image_dataset_from_directory(
    valid_image_dir,
    subset="validation",
    **_split_kwargs,
)

# Overlap data loading with training by prefetching up to 32 batches.
train_images = train_images.prefetch(buffer_size=32)

In [None]:
# Train the model on the augmented dataset loaded in the previous cell.
# (`filtered_train_images` was never defined anywhere in the notebook and
# raised NameError; `train_images` is the dataset built from train_image_dir.)
# NOTE(review): `model` is not re-created before this call, so training resumes
# from whatever weights it currently holds -- confirm that is intended for the
# original-vs-augmented comparison.
history = model.fit(
    train_images,
    epochs=10,
    validation_data=val_ds,
    callbacks=[early_stopper]
)

In [None]:
# Evaluate the model trained on augmented images against val_ds
# (returns [loss, accuracy]).
val_loss, val_accuracy = model.evaluate(val_ds)
print("Validation loss:", val_loss)
print("Validation accuracy:", val_accuracy)

In [None]:
# Plot the curves of the most recent fit(); plot_history is imported from
# helpers_plot_history in an earlier cell.
plot_history(history)

In [None]:
# Collect predictions and true labels in ONE pass over val_ds.
# val_ds is created with shuffle=True, so each iteration can return the samples
# in a different order; calling model.predict(val_ds) and then iterating val_ds
# a second time for the labels pairs predictions with the wrong labels.
batch_predictions = []
batch_true_labels = []
for x_batch, y_batch in val_ds:
    probabilities = model.predict_on_batch(x_batch)
    batch_predictions.append(np.argmax(probabilities, axis=1))  # probabilities -> class index
    batch_true_labels.append(np.argmax(y_batch.numpy(), axis=1))  # one-hot -> class index

val_predictions = np.concatenate(batch_predictions)
true_labels = np.concatenate(batch_true_labels)

# Fix the label set so the matrix always has one row/column per class name,
# even when some class never occurs in the validation split.
# NOTE(review): assumes class_names is in the same order as the classes of
# val_ds -- confirm both come from directories with identical sub-folders.
conf_mat = confusion_matrix(
    true_labels, val_predictions, labels=np.arange(len(class_names))
)

# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

## Transfer Learning

### MobileNetV2

In [None]:
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D


In [None]:
# Load the base MobileNetV2 model with weights pre-trained on ImageNet
# (include_top=False drops the ImageNet classification head).
# NOTE(review): Keras documents that MobileNetV2 expects inputs passed through
# mobilenet_v2.preprocess_input (scaled to [-1, 1]); no such step is visible in
# this notebook, which feeds raw 0-255 images -- confirm.
base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze the layers of the base model
base_model.trainable = False

In [None]:
# Create a new classifier on top of the frozen MobileNetV2 base: global average
# pooling, a 512-unit dense layer with dropout, and a softmax over num_classes.
# (The closing `])` was missing, which made this cell a SyntaxError.)
model = Sequential([
    base_model,
    GlobalAveragePooling2D(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')
])

In [None]:
# Compile with Adam at learning rate 1e-4. `Adam` is imported in the cell that
# compiles the baseline CNN, so that cell must have been run first.
model.compile(optimizer=Adam(learning_rate=0.0001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
# Load the data.
# image_dataset_from_directory needs directory *paths*. The original code
# passed `train_images` / `valid_images`, which are tf.data.Dataset objects
# created by earlier cells, so both calls failed. Use the path variables.
# NOTE(review): each loader keeps only one side of a 0.2 split although the two
# directories are separate (80% of the training directory, 20% of the
# validation directory) -- confirm this is intended.
train_images = image_dataset_from_directory(
    train_image_dir,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    image_size=(224, 224),
    batch_size=batch_size,
    shuffle=True,
    seed=42,
    validation_split=0.2,
    subset='training',
    interpolation='bilinear'
)

val_ds = image_dataset_from_directory(
    valid_image_dir,
    validation_split=0.2,
    subset="validation",
    seed=42,
    image_size=(224, 224),
    batch_size=batch_size,
    label_mode='categorical',
    shuffle=True
)

# Prefetch up to 32 batches so loading overlaps with training
train_images = train_images.prefetch(buffer_size=32)

In [None]:
# Early stopping callback: halt after 5 epochs without val_loss improvement and
# roll back to the best weights. Rebinds the notebook-global `early_stopper`.
early_stopper = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

# Train the model (only the new head is trainable; the base model is frozen)
history = model.fit(
    train_images,
    epochs=10,
    validation_data=val_ds,
    callbacks=[early_stopper]
)

In [None]:
# Evaluate the model on val_ds (returns [loss, accuracy])
val_loss, val_accuracy = model.evaluate(val_ds)
print("Validation loss:", val_loss)
print("Validation accuracy:", val_accuracy)

### ResNet50

In [None]:
from tensorflow.keras.applications import ResNet50


In [None]:
# Load the base ResNet50 model with weights pre-trained on ImageNet
# (include_top=False drops the ImageNet classification head).
# NOTE(review): Keras documents a model-specific resnet50.preprocess_input for
# this network; no such step is visible in this notebook -- confirm.
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze the layers of the base model
base_model.trainable = False

In [None]:
# Stack a fresh classification head on the frozen ResNet50 base:
# pooled features -> 512-unit dense layer -> dropout -> softmax over num_classes.
model = Sequential()
model.add(base_model)
model.add(GlobalAveragePooling2D())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

In [None]:
# Compile with Adam at learning rate 1e-4. `Adam` is imported in the cell that
# compiles the baseline CNN, so that cell must have been run first.
model.compile(optimizer=Adam(learning_rate=0.0001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
# Load the data.
# image_dataset_from_directory needs directory *paths*. The original code
# passed `train_images` / `valid_images`, which are tf.data.Dataset objects
# created by earlier cells, so both calls failed. Use the path variables.
# NOTE(review): each loader keeps only one side of a 0.2 split although the two
# directories are separate (80% of the training directory, 20% of the
# validation directory) -- confirm this is intended.
train_images = image_dataset_from_directory(
    train_image_dir,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    image_size=(224, 224),
    batch_size=batch_size,
    shuffle=True,
    seed=42,
    validation_split=0.2,
    subset='training',
    interpolation='bilinear'
)

val_ds = image_dataset_from_directory(
    valid_image_dir,
    validation_split=0.2,
    subset="validation",
    seed=42,
    image_size=(224, 224),
    batch_size=batch_size,
    label_mode='categorical',
    shuffle=True
)

# Prefetch up to 32 batches so loading overlaps with training
train_images = train_images.prefetch(buffer_size=32)

In [None]:
# Train the model for up to 20 epochs, reusing the `early_stopper` callback
# created in an earlier cell.
history = model.fit(
    train_images,
    epochs=20,
    validation_data=val_ds,
    callbacks=[early_stopper]
)

In [None]:
# Evaluate the ResNet50-based model on val_ds (returns [loss, accuracy]).
val_loss, val_accuracy = model.evaluate(val_ds)
print("Validation loss:", val_loss)
print("Validation accuracy:", val_accuracy)

### VGG16

In [None]:
from tensorflow.keras.applications import VGG16


In [None]:
# Load the base VGG16 model with ImageNet weights and without its
# classification head (include_top=False).
# NOTE(review): Keras documents a model-specific vgg16.preprocess_input for
# this network; no such step is visible in this notebook -- confirm.
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze the layers of the base model
base_model.trainable = False

In [None]:
# Stack a fresh classification head on the frozen VGG16 base:
# pooled features -> 512-unit dense layer -> dropout -> softmax over num_classes.
model = Sequential()
model.add(base_model)
model.add(GlobalAveragePooling2D())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

In [None]:
# Compile with Adam at learning rate 1e-4. `Adam` is imported in the cell that
# compiles the baseline CNN, so that cell must have been run first.
model.compile(optimizer=Adam(learning_rate=0.0001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
# Load the data.
# image_dataset_from_directory needs directory *paths*. The original code
# passed `train_images` / `valid_images`, which are tf.data.Dataset objects
# created by earlier cells, so both calls failed. Use the path variables.
# NOTE(review): each loader keeps only one side of a 0.2 split although the two
# directories are separate (80% of the training directory, 20% of the
# validation directory) -- confirm this is intended.
train_images = image_dataset_from_directory(
    train_image_dir,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    image_size=(224, 224),
    batch_size=batch_size,
    shuffle=True,
    seed=42,
    validation_split=0.2,
    subset='training',
    interpolation='bilinear'
)

val_ds = image_dataset_from_directory(
    valid_image_dir,
    validation_split=0.2,
    subset="validation",
    seed=42,
    image_size=(224, 224),
    batch_size=batch_size,
    label_mode='categorical',
    shuffle=True
)

# Prefetch up to 32 batches so loading overlaps with training
train_images = train_images.prefetch(buffer_size=32)

In [None]:
# Train the VGG16-based model for up to 20 epochs, reusing the `early_stopper`
# callback created in an earlier cell.
history = model.fit(
    train_images,
    epochs=20,
    validation_data=val_ds,
    callbacks=[early_stopper]
)

In [None]:
# Evaluate the VGG16-based model on val_ds (returns [loss, accuracy]).
val_loss, val_accuracy = model.evaluate(val_ds)
print("Validation loss:", val_loss)
print("Validation accuracy:", val_accuracy)

## Unsupervised learning

In [None]:
from tensorflow.keras.layers import UpSampling2D, Input, Conv2DTranspose
from tensorflow.keras.models import Model


In [None]:
# Define the encoder part of the autoencoder
# Symbolic input for channels-last 224x224 RGB images; the layers in the next
# cell are chained onto it.
input_img = Input(shape=(224, 224, 3))  # Adapt this if using `channels_first` image data format

In [None]:
# Encoder: three conv + 2x2 max-pool stages; each pool halves the spatial size.
x = input_img
for filters in (32, 64, 128):
    x = Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)
encoded = x

# Decoder: three stride-2 transposed convolutions undo the three poolings,
# then a 3-channel sigmoid convolution produces the reconstructed image.
for filters in (128, 64, 32):
    x = Conv2DTranspose(filters, (3, 3), strides=2, activation='relu', padding='same')(x)
decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

# Full model: image in, reconstruction out.
autoencoder = Model(input_img, decoded)
autoencoder.compile(optimizer='adam', loss='binary_crossentropy')

In [None]:
# Load your data
# With label_mode=None the dataset yields image batches only (no labels).
# This rebinds the notebook-global `train_images` to an UNLABELED dataset, so
# any later cell that trains a classifier must reload labeled data first.
train_images = image_dataset_from_directory(
    original_image_dir,
    label_mode=None,  # No labels needed as this is unsupervised
    color_mode='rgb',
    image_size=(224, 224),
    batch_size=32,
    shuffle=True
)

In [None]:
def _to_autoencoder_pair(batch):
    """Scale a batch of 0-255 images to [0, 1] and return it as (input, target)."""
    scaled = batch / 255.0
    return scaled, scaled


# The sigmoid decoder output lies in [0, 1], so inputs and targets are scaled
# to the same range; the autoencoder reconstructs its own input.
normalized_images = train_images.map(_to_autoencoder_pair).prefetch(
    buffer_size=tf.data.AUTOTUNE
)

In [None]:
# Train the autoencoder.
# `normalized_images` is a tf.data.Dataset that is already batched, so
# `batch_size` must not be passed to fit(): Keras documents that it should not
# be specified for dataset inputs, and newer Keras versions raise ValueError.
# No validation data is supplied, hence early stopping monitors the training loss.
history = autoencoder.fit(
    normalized_images,
    epochs=50,
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='loss', patience=5)],
)

In [None]:
# Reconstruction loss on the SAME data the autoencoder was trained on; no
# held-out set is used here, so this is not a validation score. The variable
# name `val_loss` is kept so the notebook's global state is unchanged.
val_loss = autoencoder.evaluate(normalized_images)
print("Reconstruction loss (training data):", val_loss)

## Toy model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Dense,
    Conv2D,
    MaxPool2D,
    Flatten,
    Dropout,
    BatchNormalization,
)

# Small CNN: two conv/max-pool stages, a third convolution, then a 512-unit
# dense layer with dropout and a softmax over num_classes.
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)))
model.add(MaxPool2D((2, 2)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPool2D((2, 2)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))


# Convolutional layers are more compact and have fewer parameters than dense layers

In [None]:
# Fit the generator on the training data
# NOTE(review): ImageDataGenerator.fit is documented as only needed when
# featurewise_center, featurewise_std_normalization or zca_whitening is set;
# none of the generators constructed in this notebook set them -- confirm this
# call is still wanted.
datagen.fit(image_array)

# Compile the new model
# NOTE(review): the next cell begins with an identical model.compile(...) call.
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Reload the labeled training data. By this point `train_images` has been
# rebound by the autoencoder section to a dataset loaded with label_mode=None,
# which yields images without targets and cannot train a classifier.
train_images = image_dataset_from_directory(
    train_image_dir,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    image_size=(resized_height, resized_width),
    batch_size=batch_size,
    shuffle=True,
    seed=42,
)

early_stopper = EarlyStopping(
    monitor='val_loss',  # Monitor the validation loss
    patience=5,          # Number of epochs with no improvement after which training will be stopped
    restore_best_weights=True  # Restores model weights from the epoch with the best value of the monitored quantity
)

# Train the model; `valid_images` is the labeled validation dataset loaded near
# the top of the notebook.
history = model.fit(
    train_images,
    epochs=10,
    validation_data=valid_images,
    verbose=1,
    callbacks=[early_stopper]
)

In [None]:
# plot_history comes from the project-local helpers_plot_history module.
from helpers_plot_history import plot_history
plot_history(history)

In [None]:
# Continue training the same model on batches drawn from the `augmented_images`
# generator, validating against `valid_images`. `history` is overwritten.
# NOTE(review): `augmented_images` must be the labeled generator
# (datagen.flow(image_array, label_array)); the test-script cells at the end of
# the notebook rebind the name to an unlabeled one -- check execution order.
history= model.fit(augmented_images,
          epochs=20,
#          steps_per_epoch=4,
          validation_data=valid_images)

## Test scripts

The cells below are scratch test scripts.

In [None]:


import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define an ImageDataGenerator with augmentation parameters.
# NOTE: this rebinds the notebook-global `datagen` and `augmented_images`.
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Synthetic sample image: uniform noise scaled to [0, 255).
# np.random.random returns values in [0, 1); casting those to uint8 for
# display truncates every pixel to 0 and shows a black image.
image = np.random.random((224, 224, 3)) * 255.0

# Reshape the image to (1, height, width, channels) for the datagen.flow() method
image = np.expand_dims(image, axis=0)

# Generate augmented images indefinitely
augmented_images = datagen.flow(image)

# Specify how many augmented images you want to generate
num_images_to_generate = 5

# Generate and plot the specified number of augmented images
for i in range(num_images_to_generate):
    augmented_image = next(augmented_images)[0]  # Retrieve the augmented image from the generator
    plt.imshow(augmented_image.astype('uint8'))  # Plot the augmented image
    plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define an ImageDataGenerator with augmentation parameters.
# NOTE: this rebinds the notebook-global `datagen` and `augmented_images`.
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Synthetic sample image: uniform noise scaled to [0, 255).
# np.random.random returns values in [0, 1); casting those to uint8 for
# display truncates every pixel to 0 and shows a black image.
image = np.random.random((224, 224, 3)) * 255.0

# Reshape the image to (1, height, width, channels) for the datagen.flow() method
image = np.expand_dims(image, axis=0)

# Generate augmented images indefinitely
augmented_images = datagen.flow(image)

# Generate and plot 32 augmented images in an 8x4 grid plot
num_images_to_generate = 32
num_rows = 8
num_cols = 4

fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 30))

for i in range(num_images_to_generate):
    row = i // num_cols
    col = i % num_cols
    augmented_image = next(augmented_images)[0]  # Retrieve the augmented image from the generator
    axes[row, col].imshow(augmented_image.astype('uint8'))  # Plot the augmented image
    axes[row, col].axis('off')  # Turn off axis labels

plt.tight_layout()
plt.show()
