In [1]:
import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub

from pathlib import Path
from sklearn.model_selection import train_test_split

In [2]:
def fill_missing_data(data):
    """Impute missing values in ``data`` in place and flag where they were.

    Numeric columns that contain nulls get a companion
    ``<label>_is_missing`` boolean column and have their nulls replaced by
    the column median. Every non-numeric column gets a
    ``<label>_is_missing`` column (whether or not it has nulls) and is
    replaced by its categorical codes shifted by one, so that a missing
    value is encoded as 0.

    Args:
        data (pd.DataFrame): the frame to modify in place.

    Returns:
        None: ``data`` is mutated; nothing is returned.
    """
    # Materialise the (label, column) pairs up front: the loop body adds
    # columns to `data`, and growing a frame while iterating it is fragile.
    for label, content in list(data.items()):
        # f-string instead of `label + "..."` so non-string labels work too
        flag_label = f"{label}_is_missing"

        if pd.api.types.is_numeric_dtype(content):
            missing = pd.isnull(content)
            if missing.any():
                # Add a binary column which tells us if the data was missing
                data[flag_label] = missing
                # Fill missing numeric values with the median
                data[label] = content.fillna(content.median())
        else:
            # Add a binary column to indicate whether the value was missing
            data[flag_label] = pd.isnull(content)
            # Categorical codes use -1 for missing values; +1 moves that to 0
            data[label] = pd.Categorical(content).codes + 1


def string_cols_to_category(data):
    """Convert every string-typed column of ``data`` to an ordered category.

    The frame is modified in place; columns that are not string-typed are
    left untouched.

    Args:
        data (pd.DataFrame): the frame whose string columns are converted.
    """
    for label, content in data.items():
        # Skip anything pandas does not consider a string column
        if not pd.api.types.is_string_dtype(content):
            continue

        as_category = content.astype("category")
        data[label] = as_category.cat.as_ordered()


class DataModel(object):
    """Holds features/labels and the train/validation/test splits of them.

    The split results are stored on the instance: ``train``/``valid``/``test``
    as ``(X, y)`` tuples (see ``split_train_test_validation``) and the
    ``X_*``/``y_*`` attributes (see ``split_train_test`` and
    ``split_train_valid``).
    """
    train = ()
    valid = ()
    test = ()

    X_train = None
    X_test = None
    X_valid = None
    y_train = None
    y_test = None
    y_valid = None

    _TRAINING_SIZE = 0.7
    _VALIDATION_SIZE = 0.15
    _TEST_SIZE = 0.15
    _X = None
    _y = None

    # Absolute tolerance used when checking that the fractions sum to 1
    _SPLIT_TOLERANCE = 1e-9

    def __init__(self, X, y, train_size=0.7, valid_size=0.15, test_size=0.15):
        """Store the data and the split fractions.

        Args:
            X: the features (or, for ``split_train_valid``, the training
                frame).
            y: the labels (or, for ``split_train_valid``, the validation
                frame).
            train_size (float, optional): training fraction.
                Defaults to 0.7.
            valid_size (float, optional): validation fraction.
                Defaults to 0.15.
            test_size (float, optional): test fraction. Defaults to 0.15.

        Raises:
            ValueError: if the three fractions do not add up to 1.
        """
        split_size = train_size + valid_size + test_size
        # Compare with a tolerance: in binary floating point a valid split
        # such as 0.7 + 0.2 + 0.1 sums to 0.9999999999999999, not 1.0.
        if abs(split_size - 1) > self._SPLIT_TOLERANCE:
            msg = "Combined split sizes must equal 1."
            raise ValueError(f"{msg} Current split size={split_size}")

        self._TRAINING_SIZE = train_size
        self._VALIDATION_SIZE = valid_size
        self._TEST_SIZE = test_size
        self._X = X
        self._y = y

    def split_train_test_validation(self, data=None):
        """Slice the stored X/y sequentially into train, validation and test.

        Args:
            data (optional): object whose length determines the split
                positions. Only ``len(data)`` is used. Defaults to the
                stored features.

        Returns:
            tuple: ``(train, valid, test)``, each an ``(X, y)`` tuple.
        """
        if data is None:
            data = self._X

        len_df = len(data)
        train_split = round(self._TRAINING_SIZE * len_df)
        valid_split = round(train_split + self._VALIDATION_SIZE * len_df)

        self.train = (self._X[:train_split],
                      self._y[:train_split])

        self.valid = (self._X[train_split:valid_split],
                      self._y[train_split:valid_split])

        self.test = (self._X[valid_split:],
                     self._y[valid_split:])

        return (self.train, self.valid, self.test)

    def split_train_test(self, test_size=None):
        """Split the stored X/y into train and test sets.

        The result is stored in ``X_train``, ``X_test``, ``y_train`` and
        ``y_test``.

        Args:
            test_size (float, optional): test fraction handed to
                ``train_test_split``. A falsy value falls back to the
                fraction given at construction time.
        """
        if not test_size:
            test_size = self._TEST_SIZE

        (self.X_train,
         self.X_test,
         self.y_train,
         self.y_test) = train_test_split(
            self._X, self._y, test_size=test_size)

    def split_train_valid(self, column):
        """Separate the target ``column`` from two already-split frames.

        Here the stored ``X`` is used as the training frame and the stored
        ``y`` as the validation frame; both must support
        ``.drop(column, axis=1)`` and ``[column]``.

        Args:
            column: label of the target column.
        """
        # Split training and validation data into features and target
        self.X_train, self.y_train = self._X.drop(
            column, axis=1), self._X[column]
        self.X_valid, self.y_valid = self._y.drop(
            column, axis=1), self._y[column]


In [3]:
class Tensor_Image(DataModel):
    """Image-classification pipeline built on top of ``DataModel``.

    Turns image file paths and their labels into batched ``tf.data``
    datasets, builds a Keras model around a TensorFlow Hub layer, trains it
    and plots individual predictions on the validation data.
    """
    _IMG_SIZE = 0
    _BATCH_SIZE = 0

    # truth_labels: array of label strings; it is indexed with the argmax of
    # a label / prediction vector, so its order must match the class order.
    truth_labels: np.ndarray = []
    trained_model = None

    def __init__(self, X, y, train_size=0.7, valid_size=0.15, test_size=0.15,
                 img_size=224, batch_size=32):
        """Store the data, the split fractions and the image settings.

        Args:
            X: image file paths.
            y: labels matching ``X``.
            train_size (float, optional): training fraction.
                Defaults to 0.7.
            valid_size (float, optional): validation fraction.
                Defaults to 0.15.
            test_size (float, optional): test fraction. Defaults to 0.15.
            img_size (int, optional): height and width images are resized
                to. Defaults to 224.
            batch_size (int, optional): number of samples per batch.
                Defaults to 32.
        """
        super().__init__(X, y, train_size, valid_size, test_size)
        self._IMG_SIZE = img_size
        self._BATCH_SIZE = batch_size

    def split_train_test(self, test_size=None):
        """Split the stored X/y into training and validation sets.

        Unlike the parent implementation, the held-out part is stored in
        ``X_valid``/``y_valid`` rather than ``X_test``/``y_test``.

        Args:
            test_size (float, optional): held-out fraction handed to
                ``train_test_split``. A falsy value falls back to the
                fraction given at construction time.
        """
        if not test_size:
            test_size = self._TEST_SIZE

        (self.X_train,
         self.X_valid,
         self.y_train,
         self.y_valid) = train_test_split(
            self._X, self._y, test_size=test_size)

    def process_image(self, image_path):
        """
        Takes an image file path and turns the image into a Tensor.
        """
        # Read in an image file
        image = tf.io.read_file(image_path)
        # Turn our image into numerical Tensor with 3 color channels (RGB)
        image = tf.image.decode_jpeg(image, channels=3)
        # Convert the color channel values from 0-255 to 0-1 values
        image = tf.image.convert_image_dtype(image, tf.float32)
        # Resize the image
        image = tf.image.resize(image, size=[self._IMG_SIZE, self._IMG_SIZE])

        return image

    def get_image_label(self, image_path, label):
        """
        Takes an image file path name and the associated label,
        process the image and returns a tuple of (image, label)
        """
        image = self.process_image(image_path)
        return image, label

    def create_data_batches(self, X,
                            y=None, valid_data=False, test_data=False):
        """
        Creates batches of data out of image (X) and label (y) pairs.
        It shuffles the data if it's training data but doesn't shuffle
        if it's validation data.
        Also accepts test data as input (no labels).

        The batched dataset is stored in ``self.test``, ``self.valid`` or
        ``self.train`` respectively; nothing is returned.

        Raises:
            ValueError: if labels are missing for training/validation data.
        """
        # If the data is a test data set, we probably don't have labels
        if test_data:
            data = tf.data.Dataset.from_tensor_slices(
                (tf.constant(X)))  # only filepaths
            self.test = data.map(self.process_image).batch(self._BATCH_SIZE)
            return

        if y is None:
            raise ValueError(
                "Labels (y) are required for training and validation data.")

        # Turn filepaths and labels into Tensors
        data = tf.data.Dataset.from_tensor_slices((tf.constant(X),
                                                  tf.constant(y)))

        # If the data is a valid dataset, we don't need to shuffle it.
        if valid_data:
            self.valid = data.map(self.get_image_label).batch(self._BATCH_SIZE)
            return

        # Shuffle before mapping image processor function is faster
        data = data.shuffle(buffer_size=len(X))
        data = data.map(self.get_image_label)

        # Finally turn the training data into batches
        self.train = data.batch(self._BATCH_SIZE)

    def create_model(self, model_url: str,
                     input_shape: list = None,
                     output_shape: int = None,
                     activation="softmax", metrics="accuracy"):
        """Defines the layers in a Keras model in a sequential fashion, then
        compiles and builds the model.

        Args:
            model_url (str): model URL from TensorFlow Hub
            input_shape (list, optional): input shape to the model,
                including the batch dimension. Defaults to
                ``[None, img_size, img_size, 3]``.
            output_shape (int, optional): number of units of the output
                layer. Defaults to the number of ``truth_labels``.
            activation (str, optional): layer density activation.
                Defaults to "softmax".
            metrics (str, optional): metric to be evaluated by the model.
                Defaults to "accuracy".

        Raises:
            ValueError: if ``output_shape`` is not given and there are no
                ``truth_labels`` to infer it from.

        Returns:
            tf.keras.Sequential: the keras model
        """
        if not input_shape:
            # model.build() expects the batch dimension to be part of the
            # shape, hence the leading None.
            input_shape = [None, self._IMG_SIZE, self._IMG_SIZE, 3]

        if output_shape is None:
            output_shape = len(self.truth_labels)
        if not output_shape:
            raise ValueError(
                "output_shape must be given when truth_labels is empty.")

        # Setup model layers
        model = tf.keras.Sequential([
            hub.KerasLayer(model_url),
            tf.keras.layers.Dense(units=output_shape,
                                  activation=activation)
        ])

        # Compile the model
        model.compile(
            loss=tf.keras.losses.CategoricalCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[metrics]
        )

        # Build the model
        model.build(input_shape)

        return model

    # Create a function to build a TensorBoard callback
    def create_tensorboard_callback(self, logpath: str,
                                    strftime: str = "%Y%m%d-%H%M%S"):
        """Save logs to a directory and pass it to the model's fit() function

        Args:
            logpath (str): path to store log files
            strftime (str, optional): datetime string format.
                Defaults to "%Y%m%d-%H%M%S".

        Returns:
            tf.keras.callbacks.TensorBoard: the TensorBoard
        """
        # Create a log directory for storing TensorBoard logs
        logdir = Path(
            logpath, datetime.datetime.now().strftime(strftime))

        # Hand the directory over as a plain string
        return tf.keras.callbacks.TensorBoard(str(logdir))

    def train_model(self, input_shape: list,
                    output_shape: int,
                    model_url: str, epochs: int,
                    logpath: str, strftime: str = "%Y%m%d-%H%M%S"):
        """Trains a given model, stores it in ``trained_model`` and returns it.

        Training uses ``self.train`` and validates on ``self.valid``, so
        ``create_data_batches`` must have been called for both beforehand.

        Args:
            input_shape (list): input shape to the model, including the
                batch dimension. A falsy value lets ``create_model`` pick
                its default.
            output_shape (int): number of units of the output layer
            model_url (str): model URL from TensorFlow Hub
            epochs (int): the number of epochs
            logpath (str): path to store log files
            strftime (str, optional): datetime string format.
                Defaults to "%Y%m%d-%H%M%S".

        Returns:
            tf.keras.Sequential: the keras model
        """
        # Create a model
        model = self.create_model(input_shape=input_shape,
                                  output_shape=output_shape,
                                  model_url=model_url)

        # Create new TensorBoard session every time we train a model
        tensorboard = self.create_tensorboard_callback(logpath, strftime)

        # Create early stopping callback
        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor="val_accuracy",
            patience=3)

        # fit the model to the data passing it the callbacks we created
        model.fit(x=self.train,
                  epochs=epochs,
                  validation_data=self.valid,
                  validation_freq=1,
                  callbacks=[tensorboard, early_stopping])

        # Keep the fitted model on the instance and hand it back
        self.trained_model = model
        return model

    def get_pred_label(self, prediction_probabilities: np.ndarray):
        """Turns an array of prediction probabilities into a label

        Args:
            prediction_probabilities (np.ndarray):
                array of prediction probabilities for one sample. A 2-D
                array (a batch of predictions) is also accepted, in which
                case its first row is used.

        Returns:
            str: the label representation of the prediction
        """
        probabilities = np.asarray(prediction_probabilities)
        # Only strip a leading batch dimension. Indexing [0] on a 1-D vector
        # would reduce it to a scalar, and argmax would then always be 0.
        if probabilities.ndim > 1:
            probabilities = probabilities[0]

        pred_label = self.truth_labels[np.argmax(probabilities)]
        return pred_label

    def unbatchify(self, data):
        """Takes a batched dataset of (image, label) Tensors and returns
        separate lists of images and labels.

        Args:
            data (BatchDataset): batch dataset to be unbatched

        Returns:
            tuple(list, list): the images and the matching label strings
        """
        images = []
        labels = []

        # loop through unbatched data
        for image, label in data.unbatch().as_numpy_iterator():
            images.append(image)
            labels.append(self.truth_labels[np.argmax(label)])

        return images, labels

    def plot_pred(self, n=1):
        """View the prediction, ground truth, and image for sample n

        Predictions are made with ``trained_model`` on the validation
        batches in ``self.valid``.

        Args:
            n (int, optional): index of the validation sample to show.
                Defaults to 1.

        Raises:
            RuntimeError: if no model has been trained yet.
        """
        if self.trained_model is None:
            raise RuntimeError(
                "No trained model available; call train_model() first.")

        images, labels = self.unbatchify(self.valid)

        predictions = self.trained_model.predict(self.valid, verbose=1)
        pred_prob = predictions[n]
        true_label = labels[n]
        image = images[n]

        # Get the pred label
        pred_label = self.get_pred_label(pred_prob)

        # Plot image & remove ticks
        plt.imshow(image)
        plt.xticks([])
        plt.yticks([])

        # Change the color of the title depending if the pred is right or wrong
        color = "green" if pred_label == true_label else "red"

        # Change plot title to be predicted, probability of pred, and truth
        plt.title("{} {:2.0f}% {}".format(pred_label,
                                          np.max(pred_prob)*100,
                                          true_label),
                  color=color)


In [4]:
# Let's use the classes from the common utils for a full-blown test
import numpy as np
import pandas as pd

from pathlib import Path

# Seed both NumPy and TensorFlow: the train/validation split draws from
# NumPy's global generator, while the dataset shuffle and the weight
# initialisation draw from TensorFlow's.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

CWD_PATH = Path.cwd().parent
DOG_VISION = CWD_PATH / "images/Dog Vision"

labels_csv = pd.read_csv(DOG_VISION / "labels.csv")
labels = labels_csv['breed'].to_numpy()
unique_labels = np.unique(labels)

# Set the number of images
NUM_IMAGES = 1000
# Set the image size
IMG_SIZE = 224
# Setup input shape to the model: batch, height, width, color channels
INPUT_SHAPE = [None, IMG_SIZE, IMG_SIZE, 3]
# Setup output shape of the model (one unit per unique label)
OUTPUT_SHAPE = len(unique_labels)
# Setup model URL from TensorFlow Hub
MODEL_URL = "https://tfhub.dev/google/imagenet/mobilenet_v2_140_224/classification/5"
NUM_EPOCHS = 100
logpath = DOG_VISION / "cb_logs"
strftime = "%Y%m%d-%H%M%S"

# Setup X & y variables: image file paths and one boolean one-hot vector
# per image (True at the position of the image's label in unique_labels)
X = [str(DOG_VISION / f"train/{fname}.jpg") for fname in labels_csv["id"]]
y = [label == unique_labels for label in labels]

# Create the data model from the first NUM_IMAGES samples
data_model = Tensor_Image(X[:NUM_IMAGES], y[:NUM_IMAGES], img_size=IMG_SIZE)

# The model's output positions are mapped back to label strings through this
data_model.truth_labels = unique_labels

# Split them into training and validation
data_model.split_train_test(test_size=0.2)

# Create training and validation data batches
data_model.create_data_batches(data_model.X_train, data_model.y_train)
data_model.create_data_batches(data_model.X_valid, data_model.y_valid,
                               valid_data=True)

In [None]:
# Train the model on the training batches; the validation batches are only
# used to evaluate it after each epoch.
data_model.train_model(
    input_shape=INPUT_SHAPE,
    output_shape=OUTPUT_SHAPE,
    model_url=MODEL_URL,
    epochs=NUM_EPOCHS,
    logpath=logpath,
    strftime=strftime,
)

In [None]:
# summary() prints the table itself and returns None, so wrapping it in
# print() would only add a stray "None" line to the output.
data_model.trained_model.summary()

In [None]:
# Show the image, predicted label, top probability and true label for the
# validation sample at index 1 (plot_pred's default).
data_model.plot_pred(n=1)