# CNNs and Computer Vision Notes

## Working with Images

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

# Set the seed for NumPy
np.random.seed(42)

# Set the seed for TensorFlow
tf.random.set_seed(42)



In [None]:
# Import the mnist data from Keras
from tensorflow.keras.datasets import mnist
# Get data - it is already split into training and testing sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()



In [None]:
# Check the dataype of X_train
type(X_train)


In [None]:
# Check the shape of X_train
X_train.shape



In [None]:
# Previewing the type and shape of y_train
print(type(y_train))
y_train.shape



In [None]:
# Check first five values for y
y_train[0:5]



In [None]:
# Check Value Counts
pd.Series(y_train).value_counts()



In [None]:
# Checking the shape of a single image
img = X_train[0]
img.shape



In [None]:
# Viewing the values stored in the array for a single image
img



In [None]:
# Range of values for light intensity
np.min(img), np.max(img)



In [None]:
# Plot the data as an image
plt.imshow(img);



In [None]:
# Plot the data as an image
plt.imshow(img, cmap="gray")
plt.axis("off");



In [None]:
# To filter for rows that are 1's:
idx_1s = y_train == 1
idx_1s.sum()



In [None]:
# Proving we've isolated the 1's
y_train[idx_1s]



In [None]:
# Viewing the image data for 1's
X_train[idx_1s]



In [None]:
# Visualizing the first "1"
plt.imshow(X_train[idx_1s][0], cmap="gray")
plt.axis("off");



In [None]:
# Getting the list of unique classes
classes = np.unique(y_train)
classes



In [None]:
fig, axes = plt.subplots(ncols=10, figsize=(8, 1.5))  

for i, digit in enumerate(classes):
    # Get the axes
    ax = axes[i]
    # Filter for the current digit
    idx_digit = y_train == digit
    # Plot the first example digit
    ax.imshow(X_train[idx_digit][0], cmap="gray")
    ax.axis("off")
fig.tight_layout()



In [None]:
# Plotting 10 examples of each digit
n_example_rows = 10
fig, axes = plt.subplots(ncols=10, nrows=n_example_rows, figsize=(8, 8))
# axes = axes.flatten()x
for row in range(n_example_rows):
    row_axes = axes[row]
    for i, digit in enumerate(classes):
        # Get the axes
        ax = row_axes[i]
        # Filter for the current digit
        idx_digit = y_train == digit
        # Plot the first example digit
        ax.imshow(X_train[idx_digit][row], cmap="gray")
        ax.axis("off")
fig.tight_layout()



In [None]:
# New imports
from tensorflow.keras.preprocessing.image import (array_to_img, img_to_array, load_img, save_img)



In [None]:
# Try to view image with tensorflow
array_to_img(X_train[0])



In [None]:
# Viewing shape before reshaping
img.shape



In [None]:
# Saving the new dimensions of the new shape
new_shape = (*img.shape, 1)
new_shape



In [None]:
# Reshaping the image
reshaped_img = img.reshape(new_shape)
reshaped_img.shape



In [None]:
# Show image
array_to_img(reshaped_img)



In [None]:
# Adding extra final dimension  with tf.newaxis
X_train_reshaped_alt = X_train[...,tf.newaxis]
X_train_reshaped_alt.shape


In [None]:
# Showing first image from reshaped data
img_reshaped_alt = X_train_reshaped_alt[0]
array_to_img(img_reshaped_alt)



In [None]:
import requests
url = "https://upload.wikimedia.org/wikipedia/commons/d/d7/RGB_24bits_palette_sample_image.jpg"
resp = requests.get(url)
fname = "example-color-image.png"
with open(fname,'wb') as f:
    f.write(resp.content)



In [None]:
# Loading the file as a PIL Image using tensorflow's load_img
color_img = load_img(fname)
color_img



In [None]:
# Converting the color image to an array
color_img_data= img_to_array(color_img)
color_img_data.shape



In [None]:
# Viewing the pixel values for the first channel
channel0 = color_img_data[:,:,0]
channel0.shape



In [None]:
# Plot the red channel
fig, axes = plt.subplots()
axes.imshow(color_img_data[:,:,0], cmap='gray')
axes.set_title("Channel: Red");
axes.axis('off');



In [None]:
# loop through all 3 colors
fig, axes = plt.subplots(ncols=3, figsize=(10,4))
channels = ['red','green','blue']
for i, channel in enumerate(channels):
    axes[i].imshow(color_img_data[:,:,i], cmap='gray')
    axes[i].set_title(f"Channel: {channel}");
    axes[i].axis('off')



## CNNS in Python

In [None]:
# Imports
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, Dense, Flatten, MaxPooling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical



In [None]:
# Custom function for plotting each metric
def plot_history(history, figsize=(6,12), marker='o'):
       
    # Get list of metrics from history
    metrics = [c for c in history.history if not c.startswith('val_')]
    
    ## Separate row for each metric
    fig, axes = plt.subplots(nrows=len(metrics),figsize=figsize)
    
    # For each metric
    for i, metric_name in enumerate(metrics):
    
        # Get the axis for the current metric
        ax = axes[i]
    
        # Get metric from history.history
        metric_values = history.history[metric_name]
        # Get epochs from history
        epochs = history.epoch
    
        # Plot the training metric
        ax.plot(epochs, metric_values, label=metric_name, marker=marker)
    
        ## Check if val_{metric} exists. if so, plot:
        val_metric_name = f"val_{metric_name}"
        if val_metric_name in history.history:
            # Get validation values and plot
            metric_values = history.history[val_metric_name]
            ax.plot(epochs,metric_values,label=val_metric_name, marker=marker)
    
        # Final subplot adjustments 
        ax.legend()
        ax.set_title(metric_name)
    fig.tight_layout()
    return fig, axes


In [None]:
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

def classification_metrics(y_true, y_pred, label='',
                           output_dict=False, figsize=(8,4),
                           normalize='true', cmap='Blues',
                           colorbar=False):
    
    # Get the classification report
    report = classification_report(y_true, y_pred)
    ## Print header and report
    header = "-"*70
    print(header, f" Classification Metrics: {label}", header, sep='\n')
    print(report)
    
    ## CONFUSION MATRICES SUBPLOTS
    fig, axes = plt.subplots(ncols=2, figsize=figsize)
    
    # create a confusion matrix  of raw counts
    ConfusionMatrixDisplay.from_predictions(y_true, y_pred,
                normalize=None, cmap='gist_gray', values_format="d", colorbar=colorbar,
                ax = axes[0],);
    axes[0].set_title("Raw Counts")
    
    # create a confusion matrix with the test data
    ConfusionMatrixDisplay.from_predictions(y_true, y_pred,
                normalize=normalize, cmap=cmap, values_format=".2f", colorbar=colorbar,
                ax = axes[1]);
    axes[1].set_title("Normalized Confusion Matrix")
    
    # Adjust layout and show figure
    fig.tight_layout()
    plt.show()
    
    # Return dictionary of classification_report
    if output_dict==True:
        report_dict = classification_report(y_true, y_pred, output_dict=True)
        return report_dict



In [None]:
# Import the mnist data from Keras
from tensorflow.keras.datasets import mnist
# Get data - it is already split into training and testing sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()



In [None]:
# Check shape
X_train.shape



In [None]:
# Check shape
X_train.shape



In [None]:
# Reshape data
X_train = X_train[..., tf.newaxis]
X_test = X_test[..., tf.newaxis]



In [None]:
# Confirming new shape
X_train.shape



In [None]:
# Viewing the first label in original y_train
y_train[0]


In [None]:
# Create One-Hot-Encoded target for Tensorflow
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
y_train[0]



In [None]:
# Step 1: Define our network structure
# Save the input shape (skip the number of images)
input_shape = X_train.shape[1:]
input_shape



In [None]:
# Sequential model
model = Sequential()



In [None]:
from tensorflow.keras import layers
# Using rescaling layer to scale pixel values
scaling_layer = layers.Rescaling(1./255, input_shape=input_shape)
model.add(scaling_layer)



In [None]:
# Convolutional layer
model.add(
    Conv2D(
        filters=8,  # How many filters you want to use
        kernel_size=3,  # size of each filter
        input_shape=input_shape,  # What is the shape of your input features (we defined this above)
    )
) 
# Pooling layer
model.add(MaxPooling2D(pool_size=2))  # Size of pooling



In [None]:
# Flattening layer
model.add(Flatten())
# Output layer
model.add(
    Dense(10, activation="softmax")  # How many output possibilities we have
)  # What activation function are you using?



In [None]:
# Check model summary
model.summary()



In [None]:
# Step 2: Compile
model.compile(loss="categorical_crossentropy", optimizer="adam", 
              metrics=["accuracy"])



In [None]:
# Step 3: Fit our model
history = model.fit(X_train, y_train, 
                    validation_split=.2,
                    epochs=10)



In [None]:
# Call custom function to plot metrics
plot_history(history);



In [None]:
# Evaluate test data with model
model.evaluate(X_test, y_test, return_dict=True)



In [None]:
# Get predictions for sklearn metrics
y_test_pred = model.predict(X_test)
y_test_pred[0]



In [None]:
# Sum all the values for the first prediction
sum(y_test_pred[0])



In [None]:
# Find the max value
max(y_test_pred[0])



In [None]:
# Find index at the max value
y_test_pred[0].argmax()



In [None]:
# Getting Sklearn Metrics
classification_metrics(y_test.argmax(axis=1), y_test_pred.argmax(axis=1),
                          label='Test Data',
                         figsize=(10,8))



# TensorFlow Dataset Objects

In [None]:
import numpy as np
import tensorflow as tf
# Set the seed for NumPy
np.random.seed(42)
# Set the seed for TensorFlow
tf.random.set_seed(42)



In [1]:
import os, glob
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
from tensorflow.keras.utils import load_img, img_to_array, array_to_img
from tensorflow.keras import layers, models
from tensorflow.keras.utils import to_categorical



# Note: Have to find a way to unzip folder into notebook

In [4]:
# Checking the contents of xrays folder
data_dir = "Data/xrays/"
data_dir



'Data/xrays/'

In [5]:
# Gettting the list of folders from data dir
subfolders = os.listdir(data_dir)
subfolders



[]

In [11]:
unzip "~/Data/xray_archive.zip"

SyntaxError: invalid syntax (2034360810.py, line 1)

In [6]:
# Getting list of img file paths (no folders)
img_files = glob.glob(data_dir+"**/*")
len(img_files)



0

In [None]:
# Take a look at the first 5 filepaths
img_files[0:5]



In [None]:
# Preview an example image (at full size)
img_loaded = load_img(img_files[0])
img_data = img_to_array(img_loaded)
img_data.shape



In [None]:
# Data can be converted back to image
array_to_img(img_data)



In [None]:
# Saving image params as vars for reuse
batch_size = 32
img_height = 96
img_width = 96



In [None]:
# make the dataset from the main folder of images
ds = tf.keras.utils.image_dataset_from_directory(
  data_dir,
    shuffle=True,
    label_mode='categorical',
  seed=123,
  image_size=(img_height, img_width),
  batch_size=batch_size)
ds



In [None]:
# Determine number of batches in dataset
ds_size = len(ds)
ds_size



In [None]:
# taking a sample batch to see batch shape
example_batch_imgs,example_batch_y= ds.take(1).get_single_element()
example_batch_imgs.shape



In [None]:
# Preview y for first 5 of first batch
example_batch_y[0:5]



In [None]:
# checking the class names
class_names = ds.class_names
class_names



In [None]:
# Saving # of classes for reuse
num_classes = len(class_names)
num_classes



In [None]:
# Saving dictionary of integer:string labels
class_dict = dict(zip(range(num_classes), class_names))
class_dict



In [None]:
# Individual image shape
input_shape = example_batch_imgs[0].shape
input_shape



In [None]:
# Demo Unpacking shape
input_shape = [*input_shape]
input_shape



In [None]:
# Set the ratio of the train, validation, test split
split_train = 0.7
split_val = 0.2
split_test = .1 
# Calculate the number of batches for training and validation data 
n_train_batches =  int(ds_size * split_train)
n_val_batches = int(ds_size * split_val)
print(f"Use {n_train_batches} batches as training data")
print(f"Use {n_val_batches} batches as validation data")
print(f"The remaining {len(ds)- (n_train_batches+n_val_batches)} batches will be used as test data.")



In [None]:
# Use .take to slice out the number of batches 
train_ds = ds.take(n_train_batches)
# Confirm the length of the training set
len(train_ds)



In [None]:
# Skipover the training batches
val_ds = ds.skip(n_train_batches)
# Take the correct number of validation batches
val_ds = val_ds.take(n_val_batches)
# Confirm the length of the validation set
len(val_ds)



In [None]:
# Skip over all of the training + validation batches
test_ds = ds.skip(n_train_batches + n_val_batches)
# Confirm the length of the testing data
len(test_ds)



In [None]:
# The original (non-take/non-skip) dataset contains the class_names
class_names  = ds.class_names
class_names



In [None]:
# Write the building and compiling steps within a function
def build_model():
    # Instantatie model
    model = models.Sequential()
    # Scaling layer
    scaling_layer = layers.Rescaling(1./255, input_shape=input_shape)
    model.add(scaling_layer)
    
    # Convolutional layer
    model.add(
        layers.Conv2D(
            filters=8,  # How many filters you want to use
            kernel_size=3,  # size of each filter
            input_shape=input_shape,
            padding='same'
        )) 
    # Pooling layer
    model.add(layers.MaxPooling2D(pool_size=2))  # Size of pooling
    # Convolutional layer
    model.add(
        layers.Conv2D(
            filters=8,  # How many filters you want to use
            kernel_size=3,  # size of each filter
            input_shape=input_shape,
            padding='same'
        )) 
    # Pooling layer
    model.add(layers.MaxPooling2D(pool_size=2))  # Size of pooling
    
    # Flattening layer
    model.add(layers.Flatten())
    # Output layer
    model.add(
        layers.Dense(3, activation="softmax")  # How many output possibilities we have
    )  # What activation function are you using?
    
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    
    model.summary()
    return model



In [None]:
# Build the model
model1 = build_model()



In [None]:
import datetime as dt
# timing
start = dt.datetime.now()
    
# fit the neural network
epochs=5
history = model1.fit(
    train_ds,
    validation_data=val_ds,
    epochs=epochs, 
)
    
end = dt.datetime.now()
dur = end-start
print(f"Training time: {dur}")



In [None]:
# Use autotune to automatically determine best buffer sizes 
AUTOTUNE = tf.data.AUTOTUNE



In [None]:
# Make buffer size the same as the number of batches in train_ds
buffer_size = len(train_ds)
buffer_size



In [None]:
# Optimize training data
train_ds = train_ds.cache().shuffle(buffer_size= buffer_size,
                                   seed=42).prefetch(buffer_size=AUTOTUNE)
# Optimize validation data
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
# Optimize teset data
test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)



In [None]:
# Call build function to create identical model
model2 = build_model()



In [None]:
# See how long it takes to fit the optimized dataset
# timing
start = dt.datetime.now()
# fit the neural network
epochs=5
history = model2.fit(
  train_ds,
  validation_data=val_ds,
  epochs=epochs, 
)
end = dt.datetime.now()
dur2 = end-start
print(f"Training time: {dur2}")



In [None]:
print(f"The optimized dataset was {dur/dur2:.2f} times faster!")



# Flexible Evaluation Functions

In [None]:
# Iterating through dataset object to separate image data from label data
for images, labels in test_ds.as_numpy_iterator():
    print(f'Image data shape is {images.shape}')
    print(f'Label data shape is {labels.shape}')



In [None]:
# Previewing a label (y_true)
labels[0]



In [None]:
# Obtain predictions from images
y_probs = model2.predict(images, batch_size=1)
y_probs[0]



In [None]:
def get_true_pred_labels(model,ds):
    """Gets the labels and predicted probabilities from a Tensorflow model and Dataset object.
    Adapted from source: https://stackoverflow.com/questions/66386561/keras-classification-report-accuracy-is-different-between-model-predict-accurac
    """
    y_true = []
    y_pred_probs = []
    
    # Loop through the dataset as a numpy iterator
    for images, labels in ds.as_numpy_iterator():
        
        # Get prediction with batch_size=1
        y_probs = model.predict(images, batch_size=1, verbose=0)
        # Combine previous labels/preds with new labels/preds
        y_true.extend(labels)
        y_pred_probs.extend(y_probs)
    ## Convert the lists to arrays
    y_true = np.array(y_true)
    y_pred_probs = np.array(y_pred_probs)
    
    return y_true, y_pred_probs



In [None]:
%%time
# Using the function
y_test, y_pred_test = get_true_pred_labels(model2, test_ds)
print(f'Shape of y_test: {y_test.shape}')
print(f'Example of y_test: {y_test[0]}')
print(f'Shape of y_pred_test {y_pred_test.shape}')
print(f'Example of y_pred_test {y_pred_test[0]}')



In [None]:
def convert_y_to_sklearn_classes(y, verbose=False):
    # If already one-dimension
    if np.ndim(y)==1:
        if verbose:
            print("- y is 1D, using it as-is.")
        return y
        
    # If 2 dimensions with more than 1 column:
    elif y.shape[1]>1:
        if verbose:
            print("- y is 2D with >1 column. Using argmax for metrics.")   
        return np.argmax(y, axis=1)
    
    else:
        if verbose:
            print("y is 2D with 1 column. Using round for metrics.")
        return np.round(y).flatten().astype(int)



In [None]:
# Test function
y_preds_transformed = convert_y_to_sklearn_classes(y_pred_test, verbose = True)
y_preds_transformed[:5]



In [None]:
## PREVIOUS CLASSIFICATION_METRICS FUNCTION FROM INTRO TO ML
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np

def classification_metrics(y_true, y_pred, label='',
                           output_dict=False, figsize=(8,4),
                           normalize='true', cmap='Blues',
                           colorbar=False,values_format=".2f"):
    """Modified version of classification metrics function from Intro to Machine Learning.
    Updates:
    - Reversed raw counts confusion matrix cmap  (so darker==more).
    - Added arg for normalized confusion matrix values_format
    """
    # Get the classification report
    report = classification_report(y_true, y_pred)
    
    ## Print header and report
    header = "-"*70
    print(header, f" Classification Metrics: {label}", header, sep='\n')
    print(report)
    
    ## CONFUSION MATRICES SUBPLOTS
    fig, axes = plt.subplots(ncols=2, figsize=figsize)
    
    # Create a confusion matrix  of raw counts (left subplot)
    ConfusionMatrixDisplay.from_predictions(y_true, y_pred,
                                            normalize=None, 
                                            cmap='gist_gray_r',# Updated cmap
                                            values_format="d", 
                                            colorbar=colorbar,
                                            ax = axes[0]);
    axes[0].set_title("Raw Counts")

    
    # Create a confusion matrix with the data with normalize argument 
    ConfusionMatrixDisplay.from_predictions(y_true, y_pred,
                                            normalize=normalize,
                                            cmap=cmap, 
                                            values_format=values_format, #New arg
                                            colorbar=colorbar,
                                            ax = axes[1]);
    axes[1].set_title("Normalized Confusion Matrix")
    
    # Adjust layout and show figure
    fig.tight_layout()
    plt.show()
    
    # Return dictionary of classification_report
    if output_dict==True:
        report_dict = classification_report(y_true, y_pred, output_dict=True)
        return report_dict



## PLOT_HISTORY FUNCTION FROM WEEK 3
def plot_history(history,figsize=(6,8)):
    # Get a unique list of metrics 
    all_metrics = np.unique([k.replace('val_','') for k in history.history.keys()])

    # Plot each metric
    n_plots = len(all_metrics)
    fig, axes = plt.subplots(nrows=n_plots, figsize=figsize)
    axes = axes.flatten()

    # Loop through metric names add get an index for the axes
    for i, metric in enumerate(all_metrics):

        # Get the epochs and metric values
        epochs = history.epoch
        score = history.history[metric]

        # Plot the training results
        axes[i].plot(epochs, score, label=metric, marker='.')
        # Plot val results (if they exist)
        try:
            val_score = history.history[f"val_{metric}"]
            axes[i].plot(epochs, val_score, label=f"val_{metric}",marker='.')
        except:
            pass

        finally:
            axes[i].legend()
            axes[i].set(title=metric, xlabel="Epoch",ylabel=metric)

    # Adjust subplots and show
    fig.tight_layout()
    plt.show()



In [None]:
def evaluate_classification_network(model, 
                                    X_train=None, y_train=None, 
                                    X_test=None, y_test=None,
                                    history=None, history_figsize=(6,6),
                                    figsize=(6,4), normalize='true',
                                    output_dict = False,
                                    cmap_train='Blues',
                                    cmap_test="Reds",
                                    values_format=".2f", 
                                    colorbar=False):
    """Evaluates a neural network classification task using either
    separate X and y arrays or a tensorflow Dataset
    
    Data Args:
        X_train (array, or Dataset)
        y_train (array, or None if using a Dataset
        X_test (array, or Dataset)
        y_test (array, or None if using a Dataset)
        history (history object)
        """
    # Plot history, if provided
    if history is not None:
        plot_history(history, figsize=history_figsize)
    ## Adding a Print Header
    print("\n"+'='*80)
    print('- Evaluating Network...')
    print('='*80)
    ## TRAINING DATA EVALUATION
    # check if X_train was provided
    if X_train is not None:
        ## Check if X_train is a dataset
        if hasattr(X_train,'map'):
            # If it IS a Datset:
            # extract y_train and y_train_pred with helper function
            y_train, y_train_pred = get_true_pred_labels(model, X_train)
        else:
            # Get predictions for training data
            y_train_pred = model.predict(X_train)
        ## Pass both y-vars through helper compatibility function
        y_train = convert_y_to_sklearn_classes(y_train)
        y_train_pred = convert_y_to_sklearn_classes(y_train_pred)
        
        # Call the helper function to obtain regression metrics for training data
        results_train = classification_metrics(y_train, y_train_pred, 
                                         output_dict=True, figsize=figsize,
                                             colorbar=colorbar, cmap=cmap_train,
                                               values_format=values_format,
                                         label='Training Data')
        
        ## Run model.evaluate         
        print("\n- Evaluating Training Data:")
        print(model.evaluate(X_train, return_dict=True))
    
    # If no X_train, then save empty list for results_train
    else:
        results_train = []
    ## TEST DATA EVALUATION
    # check if X_test was provided
    if X_test is not None:
        ## Check if X_train is a dataset
        if hasattr(X_test,'map'):
            # If it IS a Datset:
            # extract y_train and y_train_pred with helper function
            y_test, y_test_pred = get_true_pred_labels(model, X_test)
        else:
            # Get predictions for training data
            y_test_pred = model.predict(X_test)
        ## Pass both y-vars through helper compatibility function
        y_test = convert_y_to_sklearn_classes(y_test)
        y_test_pred = convert_y_to_sklearn_classes(y_test_pred)
        
        # Call the helper function to obtain regression metrics for training data
        results_test = classification_metrics(y_test, y_test_pred, 
                                         output_dict=True, figsize=figsize,
                                             colorbar=colorbar, cmap=cmap_test,
                                              values_format=values_format,
                                         label='Test Data')
        
        ## Run model.evaluate         
        print("\n- Evaluating Test Data:")
        print(model.evaluate(X_test, return_dict=True))
      
    # If no X_test, then save empty list for results_test
    else:
        results_test = []
      
    # Store results in a dictionary
    results_dict = {'train':results_train,
                    'test': results_test}
    if output_dict == True:
        return results_dict



In [None]:
# Testing with the CNN + Dataset
evaluate_classification_network(model2, X_test=test_ds, history=history);



# Tuning CNNs with Keras Tuner

In [None]:
import numpy as np
import tensorflow as tf
# Set the seed for NumPy
np.random.seed(42)
# Set the seed for TensorFlow
tf.random.set_seed(42)

import os, glob
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
from tensorflow.keras.utils import load_img, img_to_array, array_to_img
from tensorflow.keras import layers, models
from tensorflow.keras.utils import to_categorical
import visualkeras as vk

import keras_tuner as kt
from keras_tuner import HyperParameters as hp

import os

folder = 'KerasTuner/'
os.makedirs(folder, exist_ok=True)


In [None]:
def get_true_pred_labels(model,ds):
    """Gets the labels and predicted probabilities from a Tensorflow model and Dataset object.
    Adapted from source: https://stackoverflow.com/questions/66386561/keras-classification-report-accuracy-is-different-between-model-predict-accurac
    """
    y_true = []
    y_pred_probs = []
    
    # Loop through the dataset as a numpy iterator
    for images, labels in ds.as_numpy_iterator():
        
        # Get prediction with batch_size=1
        y_probs = model.predict(images, batch_size=1, verbose=0)
        # Combine previous labels/preds with new labels/preds
        y_true.extend(labels)
        y_pred_probs.extend(y_probs)
    ## Convert the lists to arrays
    y_true = np.array(y_true)
    y_pred_probs = np.array(y_pred_probs)
    
    return y_true, y_pred_probs


In [None]:
def convert_y_to_sklearn_classes(y, verbose=False):
    # If already one-dimension
    if np.ndim(y)==1:
        if verbose:
            print("- y is 1D, using it as-is.")
        return y
        
    # If 2 dimensions with more than 1 column:
    elif y.shape[1]>1:
        if verbose:
            print("- y is 2D with >1 column. Using argmax for metrics.")   
        return np.argmax(y, axis=1)
    
    else:
        if verbose:
            print("y is 2D with 1 column. Using round for metrics.")
        return np.round(y).flatten().astype(int)



In [None]:
## PREVIOUS CLASSIFICATION_METRICS FUNCTION FROM INTRO TO ML
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np

def classification_metrics(y_true, y_pred, label='',
                           output_dict=False, figsize=(8,4),
                           normalize='true', cmap='Blues',
                           colorbar=False,values_format=".2f"):
    """Modified version of classification metrics function from Intro to Machine Learning.
    Updates:
    - Reversed raw counts confusion matrix cmap  (so darker==more).
    - Added arg for normalized confusion matrix values_format
    """
    # Get the classification report
    report = classification_report(y_true, y_pred)
    
    ## Print header and report
    header = "-"*70
    print(header, f" Classification Metrics: {label}", header, sep='\n')
    print(report)
    
    ## CONFUSION MATRICES SUBPLOTS
    fig, axes = plt.subplots(ncols=2, figsize=figsize)
    
    # Create a confusion matrix  of raw counts (left subplot)
    ConfusionMatrixDisplay.from_predictions(y_true, y_pred,
                                            normalize=None, 
                                            cmap='gist_gray_r',# Updated cmap
                                            values_format="d", 
                                            colorbar=colorbar,
                                            ax = axes[0]);
    axes[0].set_title("Raw Counts")

    
    # Create a confusion matrix with the data with normalize argument 
    ConfusionMatrixDisplay.from_predictions(y_true, y_pred,
                                            normalize=normalize,
                                            cmap=cmap, 
                                            values_format=values_format, #New arg
                                            colorbar=colorbar,
                                            ax = axes[1]);
    axes[1].set_title("Normalized Confusion Matrix")
    
    # Adjust layout and show figure
    fig.tight_layout()
    plt.show()
    
    # Return dictionary of classification_report
    if output_dict==True:
        report_dict = classification_report(y_true, y_pred, output_dict=True)
        return report_dict



## PLOT_HISTORY FUNCTION FROM WEEK 3
def plot_history(history,figsize=(6,8)):
    # Get a unique list of metrics 
    all_metrics = np.unique([k.replace('val_','') for k in history.history.keys()])

    # Plot each metric
    n_plots = len(all_metrics)
    fig, axes = plt.subplots(nrows=n_plots, figsize=figsize)
    axes = axes.flatten()

    # Loop through metric names add get an index for the axes
    for i, metric in enumerate(all_metrics):

        # Get the epochs and metric values
        epochs = history.epoch
        score = history.history[metric]

        # Plot the training results
        axes[i].plot(epochs, score, label=metric, marker='.')
        # Plot val results (if they exist)
        try:
            val_score = history.history[f"val_{metric}"]
            axes[i].plot(epochs, val_score, label=f"val_{metric}",marker='.')
        except:
            pass

        finally:
            axes[i].legend()
            axes[i].set(title=metric, xlabel="Epoch",ylabel=metric)

    # Adjust subplots and show
    fig.tight_layout()
    plt.show()



In [None]:
def evaluate_classification_network(model, 
                                    X_train=None, y_train=None, 
                                    X_test=None, y_test=None,
                                    history=None, history_figsize=(6,6),
                                    figsize=(6,4), normalize='true',
                                    output_dict = False,
                                    cmap_train='Blues',
                                    cmap_test="Reds",
                                    values_format=".2f", 
                                    colorbar=False):
    """Evaluates a neural network classification task using either
    separate X and y arrays or a tensorflow Dataset
    
    Data Args:
        X_train (array, or Dataset)
        y_train (array, or None if using a Dataset
        X_test (array, or Dataset)
        y_test (array, or None if using a Dataset)
        history (history object)
        """
    # Plot history, if provided
    if history is not None:
        plot_history(history, figsize=history_figsize)
    ## Adding a Print Header
    print("\n"+'='*80)
    print('- Evaluating Network...')
    print('='*80)
    ## TRAINING DATA EVALUATION
    # check if X_train was provided
    if X_train is not None:
        ## Check if X_train is a dataset
        if hasattr(X_train,'map'):
            # If it IS a Datset:
            # extract y_train and y_train_pred with helper function
            y_train, y_train_pred = get_true_pred_labels(model, X_train)
        else:
            # Get predictions for training data
            y_train_pred = model.predict(X_train)
        ## Pass both y-vars through helper compatibility function
        y_train = convert_y_to_sklearn_classes(y_train)
        y_train_pred = convert_y_to_sklearn_classes(y_train_pred)
        
        # Call the helper function to obtain regression metrics for training data
        results_train = classification_metrics(y_train, y_train_pred, 
                                         output_dict=True, figsize=figsize,
                                             colorbar=colorbar, cmap=cmap_train,
                                               values_format=values_format,
                                         label='Training Data')
        
        ## Run model.evaluate         
        print("\n- Evaluating Training Data:")
        print(model.evaluate(X_train, return_dict=True))
    
    # If no X_train, then save empty list for results_train
    else:
        results_train = []
    ## TEST DATA EVALUATION
    # check if X_test was provided
    if X_test is not None:
        ## Check if X_train is a dataset
        if hasattr(X_test,'map'):
            # If it IS a Datset:
            # extract y_train and y_train_pred with helper function
            y_test, y_test_pred = get_true_pred_labels(model, X_test)
        else:
            # Get predictions for training data
            y_test_pred = model.predict(X_test)
        ## Pass both y-vars through helper compatibility function
        y_test = convert_y_to_sklearn_classes(y_test)
        y_test_pred = convert_y_to_sklearn_classes(y_test_pred)
        
        # Call the helper function to obtain regression metrics for training data
        results_test = classification_metrics(y_test, y_test_pred, 
                                         output_dict=True, figsize=figsize,
                                             colorbar=colorbar, cmap=cmap_test,
                                              values_format=values_format,
                                         label='Test Data')
        
        ## Run model.evaluate         
        print("\n- Evaluating Test Data:")
        print(model.evaluate(X_test, return_dict=True))
      
    # If no X_test, then save empty list for results_test
    else:
        results_test = []
      
    # Store results in a dictionary
    results_dict = {'train':results_train,
                    'test': results_test}
    if output_dict == True:
        return results_dict



In [None]:
# Checking the contents of dataset folder (YOUR PATH MAY BE DIFFERENT!)
data_dir = "Data/CatsvsDogs/dataset/"
data_dir


In [None]:
# Getting the list of folders in the data_dir
os.listdir(data_dir)



In [None]:
# Getting list of all img file paths (ONLY, did not make recursive so no folders were included)
img_files = glob.glob(data_dir+"**/**/*")#, recursive=True)
len(img_files)



In [None]:
# Getting the # of images in the training_set folder
train_folder = data_dir+"training_set/"
print(train_folder)
len(glob.glob(train_folder + "**/*"))



In [None]:
test_folder = data_dir+"test_set/"
print(test_folder)
len(glob.glob(test_folder + "**/*"))



In [None]:
# Saving image params as vars for reuse
batch_size = 32
img_height = 64
img_width = 64



In [None]:
# make the dataset from the main folder of images
train_ds = tf.keras.utils.image_dataset_from_directory(
    train_folder,
    shuffle=True,
    label_mode='categorical',
    seed=123,
    image_size=(img_height, img_width),
    batch_size=batch_size)



In [None]:
# make the dataset from the main folder of images
test_ds, val_ds = tf.keras.utils.image_dataset_from_directory(
    test_folder,
    validation_split=.5,
    subset='both',
    shuffle=True,
    label_mode='categorical',
    seed=123,
    image_size=(img_height, img_width),
    batch_size=batch_size)



In [None]:
# Preview an example image (at full size)
img_loaded = load_img(img_files[0])
display(img_loaded)

#Get shape of image file
img_data = img_to_array(img_loaded)
img_data.shape


In [None]:
# Saving the class names
class_names = train_ds.class_names
class_names



In [None]:
# Saving a dictionary with class labels:names
class_dict = dict(zip(range(len(class_names)), class_names))
class_dict



In [None]:
# taking a sample banch to see batch shape
example_batch_imgs,example_batch_y= train_ds.take(1).get_single_element()
example_batch_imgs.shape



In [None]:
# Checking shape of a single y
display(example_batch_y[0])
print(example_batch_y[0].shape)



In [None]:
# Preview an image from the Dataset
array_to_img(example_batch_imgs[0])



In [None]:
# individual image shape
input_shape = example_batch_imgs[0].shape
input_shape



In [None]:
def build_model1():
    
    model = models.Sequential(name="Model1")
    # Using rescaling layer to scale pixel values
    model.add(layers.Rescaling(1./255, input_shape=input_shape))
    
    # Convolutional layer #1
    model.add( layers.Conv2D(filters=16,  kernel_size=3, padding='same') ) 
    # Pooling layer #1
    model.add( layers.MaxPooling2D(pool_size=2, strides=1))
    
    # Convolutional layer #2
    model.add( layers.Conv2D(filters=16, kernel_size=3, padding='same')) 
    # Pooling layer #2
    model.add(layers.MaxPooling2D(pool_size=2, strides=1))
    
    # Flattening layer
    model.add(layers.Flatten())
    # # Output layer
    model.add(
        layers.Dense(len(class_names), activation="softmax")
    )
    
    model.compile(optimizer='adam',
                  loss=tf.keras.losses.BinaryCrossentropy(),
                  metrics=['accuracy'])
    
    model.summary()
    return model



In [None]:
# Build model and visualize
model1 = build_model1()
vk.layered_view(model1, legend=True)



In [None]:
def get_callbacks(patience=3, monitor='val_accuracy'):
    early_stop = tf.keras.callbacks.EarlyStopping(patience=patience, monitor=monitor)
    return [early_stop]



In [None]:
# Build new model and fit 
model1 = build_model1()
history = model1.fit(
    train_ds, validation_data=val_ds, epochs=25, callbacks=get_callbacks()
)



In [None]:
# Evaluate the model using the evaluation function
evaluate_classification_network(
    model1, X_train=train_ds, X_test=test_ds, history=history);



In [None]:
def build_model_dense():
    model = models.Sequential(name='Model2')
    model.add(layers.Rescaling(1. / 255, input_shape=input_shape))
    # Conv2d/MaxPooling #1
    model.add(layers.Conv2D(16, kernel_size=3, padding='same'))
    model.add(layers.MaxPooling2D(2, strides=1))
    # Conv2d/MaxPooling #2
    model.add(layers.Conv2D(16, kernel_size=3,padding='same'))
    model.add(layers.MaxPooling2D(2, strides=1))
    model.add(layers.Flatten())
    ## NEW Hidden Dense layer
    model.add(layers.Dense(256, activation="relu"))
    model.add(layers.Dense(len(class_names), activation="softmax"))
    model.compile(
        optimizer="adam",
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[
            "accuracy"]
    )
    return model



In [None]:
model2 = build_model_dense()
display(vk.layered_view(model2, legend=True))
model2.summary()
# fit the neural network
history = model2.fit(
    train_ds, validation_data=val_ds, epochs=25, callbacks=get_callbacks()
)



In [None]:
evaluate_classification_network(
    model2, X_train=train_ds,  X_test=test_ds, history=history);



In [None]:
def build_tune_model_deep(hp):
    model = models.Sequential()
    model.add(layers.Rescaling(1.0 / 255, input_shape=input_shape))
    # Setting hp params and saving as var so they can be used at >1 layer
    n_filters = hp.Int('filters_1',min_value=16, max_value=64, step=16)
    pool_strides = hp.Choice('pool_strides',[1,2])
    
    model.add(layers.Conv2D(n_filters,
                            # Test using larger kernel size (in first Conv layer ONLY)
                            kernel_size= hp.Choice('kernel_size_1',[3,5]),
                            padding='same')
             )
    model.add(layers.MaxPooling2D(2, strides=pool_strides))
    for i in range(hp.Int('n_conv_layers',min_value=1, max_value=3)):
        # Double the number of filters vs. previous layer
        n_filters = n_filters * 2
        model.add(layers.Conv2D(n_filters, kernel_size=3, padding='same'))
        model.add(layers.MaxPooling2D(2, strides=pool_strides))
        
        # model.add(layers.Dropout(hp.Float('dropout_rate',min_value=0.1, max_value=0.5, step=.1)))
    # Final layers
    model.add(layers.Flatten())
    model.add(layers.Dense(n_filters, activation="relu"))
    # Test various dropout strengths
    model.add(layers.Dropout(hp.Float('dropout_rate_dense',min_value=0, max_value=0.5,
                                     step=.1)))
    model.add(layers.Dense(len(class_names), activation="softmax"))
    model.compile(
        optimizer="adam",
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[
            "accuracy"],
    )
    return model



In [None]:
# Define the tuner
tuner_hb = kt.Hyperband(build_tune_model_deep, objective='val_accuracy',
                        max_epochs=15, overwrite=True, seed=321,) 
# Preivew search summary
tuner_hb.search_space_summary()



In [None]:
# Start search
tuner_hb.search(train_ds, validation_data=val_ds, epochs=25)
# Obtain summary of results
tuner_hb.results_summary()



In [None]:
# Print the results for beset paramters
print(f"Best Params: \n {tuner_hb.get_best_hyperparameters()[0].values}" )
# Define the best model
best_model = tuner_hb.get_best_models()[0]
# Evalute the best model with the custom evaluation function
evaluate_classification_network(best_model, X_train=train_ds, X_test=test_ds);



# Transfer Learning

In [None]:
import numpy as np
import tensorflow as tf
# Set the seed for NumPy
np.random.seed(42)
# Set the seed for TensorFlow
tf.random.set_seed(42)

import os, glob
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import load_img, img_to_array, array_to_img
from tensorflow.keras import layers, models
import visualkeras as vk
tf.__version__



In [None]:
# Checking the contents of folder
data_dir = "Data/CatsvsDogs/dataset/"
# Training Folder 
train_folder = data_dir+"training_set/"
# Test folder
test_folder = data_dir+"test_set/"



In [None]:
# Saving image params as vars for reuse
batch_size = 32
img_height = 128
img_width = 128



In [None]:
# make the dataset from the main folder of images
train_ds = tf.keras.utils.image_dataset_from_directory(
    train_folder,
    shuffle=True,
    label_mode='categorical', 
    seed=123,
    image_size=(img_height,img_width),
    batch_size=batch_size)

## Split the test data 50/50 validation/test
test_ds, val_ds = tf.keras.utils.image_dataset_from_directory(
    test_folder,
    validation_split=.5,
    subset='both',
    shuffle=True,
    label_mode='categorical',
    seed=123,
    image_size=(img_width, img_height),
    batch_size=batch_size)



In [None]:
# checking the class names
class_names = train_ds.class_names
class_dict = dict(zip(range(len(class_names)), class_names))
class_dict



In [None]:
# Programmatically saving input_shape from training data
example_batch_imgs, example_batch_labels = train_ds.take(1).get_single_element()
input_shape = example_batch_imgs[0].shape
input_shape



In [None]:
# Use autotune to automatically determine best buffer sizes 
AUTOTUNE = tf.data.AUTOTUNE

# optimize
train_ds = train_ds.cache().shuffle(buffer_size= len(train_ds), seed=42).prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)



In [None]:
# Downloading the full model (for demonstration only)
vgg16_full = tf.keras.applications.VGG16(include_top = True, weights = 'imagenet')
vgg16_full.summary()



In [None]:
# Visualize the model with visualkeras
vk.layered_view(vgg16_full, legend=True)



In [None]:
# Downloading just the convolutional base
vgg16_base = tf.keras.applications.VGG16(include_top = False, weights = 'imagenet', input_shape=input_shape)
vgg16_base.summary()



In [None]:
vk.layered_view(vgg16_base, legend=True)



In [None]:
# Check if base is trainable
vgg16_base.trainable



In [None]:
# Prevent layers from base_model from changing
vgg16_base.trainable = False
vgg16_base.trainable



In [None]:
tf.keras.applications.vgg16.preprocess_input?



In [None]:
# Create a lambda layer for the preprocess input function for the model
lambda_layer_vgg16 = tf.keras.layers.Lambda(tf.keras.applications.vgg16.preprocess_input,                                       name='preprocess_input')
lambda_layer_vgg16



In [None]:
# Downloading just the convolutional base
vgg16_base = tf.keras.applications.VGG16(
    include_top=False, weights="imagenet", input_shape=input_shape
)
# Prevent layers from base_model from changing 
vgg16_base.trainable = False

# Create the preprocessing lamdba layer
# Create a lambda layer for the preprocess input function for the model
lambda_layer_vgg16 = tf.keras.layers.Lambda(
    tf.keras.applications.vgg16.preprocess_input, name="preprocess_input"
)


def build_vgg16_model():
    model = models.Sequential(name="VGG16")
    # Use input layer (lambda layer will handle rescaling).
    model.add(tf.keras.layers.Input(shape=input_shape))

    ## Adding preprocessing lamabda layer
    model.add(lambda_layer_vgg16)

    # Add pretrained base
    model.add(vgg16_base)

    # Flattening layer
    model.add(layers.Flatten())

    ## Adding a Hidden Dense Layer
    model.add(layers.Dense(256, activation="relu"))
    model.add(layers.Dropout(0.5))

    # Output layer
    model.add(layers.Dense(len(class_names), activation="softmax"))

    model.compile(
        optimizer=tf.keras.optimizers.legacy.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=["accuracy"],
    )
    model.summary()
    return model


In [None]:
def get_callbacks(monitor='val_accuracy', patience=3, restore_best_weights=False):
    early_stopping = tf.keras.callbacks.EarlyStopping(patience=patience, 
                                                      restore_best_weights=restore_best_weights)
    return [early_stopping]



In [None]:
model_vgg16 = build_vgg16_model()
model_vgg16.summary()



In [None]:
model_vgg16=build_vgg16_model()
history = model_vgg16.fit(train_ds, validation_data=val_ds,epochs=20, 
                    callbacks=get_callbacks()
                         )
evaluate_classification_network(model_vgg16,X_test=test_ds,history=history);



In [None]:
# Download EfficientNet base
efficientnet_base =tf.keras.applications.EfficientNetB0(include_top=False, input_shape=input_shape)

# Make it not-trainable
efficientnet_base.trainable=False
vk.layered_view(efficientnet_base, legend=True)



In [None]:
# add preprocessing lambda layer
lambda_layer_efficient = tf.keras.layers.Lambda(tf.keras.applications.efficientnet.preprocess_input, 
                                      name='preprocess_input_enet')

def build_efficientnet_model():
    model = models.Sequential(name="EfficientNetB0")
    # Use input layer (lambda layer will handle rescaling).
    model.add(tf.keras.layers.Input(shape=input_shape))

    ## Adding preprocessing lamabda layer
    model.add(lambda_layer_efficient)

    # Add pretrained base
    model.add(efficientnet_base)

    # Flattening layer
    model.add(layers.Flatten())

    ## Adding a Hidden Dense Layer
    model.add(layers.Dense(256, activation="relu"))
    model.add(layers.Dropout(0.5))

    # Output layer
    model.add(layers.Dense(len(class_names), activation="softmax"))

    model.compile(
        optimizer=tf.keras.optimizers.legacy.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=["accuracy"],
    )
    model.summary()
    return model


In [None]:
# Build, fit, and evaluate EfficientNet Model
model_effnet = build_efficientnet_model()
history = model_effnet.fit(train_ds, validation_data=val_ds,epochs=20, 
                    callbacks=get_callbacks()
                   )
evaluate_classification_network(model_effnet, X_test=test_ds, history=history);



In [None]:
# Download Inception Base and prevent it from learning
inception_base = tf.keras.applications.InceptionV3(include_top=False, input_shape=input_shape)

# prevent training of transfer model
inception_base.trainable = False
vk.layered_view(inception_base, legend=True)



In [None]:
# add preprocessing lambda layer
lambda_layer_inception = tf.keras.layers.Lambda(tf.keras.applications.inception_v3.preprocess_input, 
                                      name='preprocess_input_inceptv3')

def build_inception_model():
    model = models.Sequential(name="InceptionV3")
    # Use input layer (lambda layer will handle rescaling).
    model.add(tf.keras.layers.Input(shape=input_shape))

    ## Adding preprocessing lamabda layer
    model.add(lambda_layer_inception)

    # Add pretrained base
    model.add(inception_base)

    # Flattening layer
    model.add(layers.Flatten())

    ## Adding a Hidden Dense Layer
    model.add(layers.Dense(256, activation="relu"))
    model.add(layers.Dropout(0.5))

    # Output layer
    model.add(layers.Dense(len(class_names), activation="softmax"))

    model.compile(
        optimizer=tf.keras.optimizers.legacy.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=["accuracy"],
    )
    model.summary()
    return model



In [None]:
# Build, fit, and evaluate EfficientNet Model
model_inception = build_inception_model()
history = model_inception.fit(train_ds, validation_data=val_ds,epochs=20, 
                    callbacks=get_callbacks()
                   )
evaluate_classification_network(model_inception, X_test=test_ds, history=history);



In [None]:
# Select the best model
best_model = model_vgg16 #model_effnet



In [None]:
import os

folder = 'BestModels/'
os.makedirs(folder, exist_ok=True)


In [None]:
# saving the best model
model_fname = 'BestModels/best-transfer-learning-to-explain.keras'
best_model.save(model_fname)



In [None]:
loaded_model = tf.keras.models.load_model(model_fname)
loaded_model.summary()
evaluate_classification_network(loaded_model, X_test=test_ds, history=history);

