In [None]:
#import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import joblib
import os
from datetime import datetime
from tensorflow.keras.layers import RandomFlip, RandomRotation, RandomBrightness, RandomZoom, GaussianNoise, RandomContrast
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, Flatten, Dense, MaxPooling2D, concatenate, Input, Dropout, BatchNormalization
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import roc_auc_score, roc_curve
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D
from keras_tuner import HyperModel, Hyperband
from tensorflow.keras.layers import ReLU, AveragePooling2D

In [None]:
class DataCollector:
  '''
  This class collects data from a csv file and image folder saved in google drive and
  creates datasets for training a machine learning model.
  '''
  def __init__(self, csv_path, img_folder):
    '''
    Parameters:
    -----------------------------------------
    csv_path: str
      - path to csv file in google drive
    img_folder: str
      - path to image folder in google drive
    '''
    if csv_path is None or img_folder is None:
        raise ValueError("Must provide data path")
    self.csv_path = csv_path
    self.img_folder = img_folder
    self.df = self.clean_data()
    self.scaler = None

  def clean_data(self):
    '''
    Read csv file into dataframe and add column for cleave quality.

    Returns: pandas.DataFrame
      - dataframe with cleave quality column
    '''
    try:
      df = pd.read_csv(self.csv_path)
    except FileNotFoundError:
      print("Csv file not found!")
      return None
    df['CleaveQuality'] = ((df['CleaveAngle'] <= 0.45) & (df['Misting'] == 0) & (df['Hackle'] == 0)).astype(int)
    # Clean image path to read from google drive'
    df['ImagePath'] = df['ImagePath'].str.replace(self.img_folder, "")
    return df

  def load_process_images(self, filename):
    '''
    Load a PNG image from the image folder and standardize it to 224x224.

    The image is decoded as a single grayscale channel, resized, expanded to
    three identical channels and divided by 255.

    Parameters:
    -----------------------------------------
    filename: str
      - image path, joined onto self.img_folder

    Returns: tf.tensor
      - float32 image with static shape (224, 224, 3)

    Raises:
      - the TensorFlow read error is re-raised when the file cannot be found
    '''
    def _load_image(file):
      # py_function passes an eager string tensor; recover the python str.
      file = file.numpy().decode('utf-8')
      full_path = os.path.join(self.img_folder, file)
      try:
        img_raw = tf.io.read_file(full_path)
      except tf.errors.NotFoundError:
        # tf.io.read_file signals a missing file with tf.errors.NotFoundError,
        # not the builtin FileNotFoundError. Report the offending path and
        # re-raise: None is not a usable float32 result for py_function.
        print(f"File not found: {full_path}")
        raise
      img = tf.image.decode_png(img_raw, channels=1)
      img = tf.image.resize(img, [224, 224])
      img = tf.image.grayscale_to_rgb(img)
      img = img / 255.0
      return img

    img = tf.py_function(_load_image, [filename], tf.float32)
    # py_function loses static shape information; restore it for batching.
    img.set_shape([224, 224, 3])
    return img

  def extract_data(self, feature_scaler_path=None):
    '''
    Pull image paths, scaled numerical features and labels out of the
    dataframe so they can be turned into datasets.

    A new MinMaxScaler is fitted on every row of the dataframe and kept in
    self.scaler.
    NOTE(review): the scaler is fitted here, before any train/test split is
    made by the create_* methods - confirm this is intended.

    Parameters:
    ------------------------------------

    feature_scaler_path: str
      - path (without extension) to store the pickled scaler; ".pkl" is appended
      - default: None (scaler is not written to disk)

    Returns: array, array, array
      - image paths, scaled features, and labels
    '''
    feature_columns = ['CleaveAngle', 'CleaveTension', 'ScribeDiameter', 'Misting', 'Hackle', 'Tearing']
    frame = self.df

    images = frame['ImagePath'].values
    raw_features = frame[feature_columns].values.astype(np.float32)
    labels = frame['CleaveQuality'].values.astype(np.float32)

    self.scaler = MinMaxScaler()
    features = self.scaler.fit_transform(raw_features)

    if feature_scaler_path:
      joblib.dump(self.scaler, f'{feature_scaler_path}.pkl')
    return images, features, labels

  def process_images_features(self, inputs, label):
    '''
    Wrapper for dataset mapping: swap the image path of one element for the
    loaded image and pass features and label through untouched.

    Parameters:
    -----------------------------------------
    inputs: tuple
      - (image path, numerical features)
    label:
      - target value for the element

    Returns: tuple
      - ((image, features), label)
    '''
    image_path, numeric_features = inputs
    return (self.load_process_images(image_path), numeric_features), label
  
  def create_kfold_datasets(self, images, features, labels, buffer_size, batch_size, n_splits=5):
    '''
    Build one (train, test) dataset pair per fold of a stratified k-fold
    split for binary classification.

    Parameters:
    --------------------------------------------------------------------

    images: array
      - image paths
    features: array
      - numerical features
    labels: array
      - target values for classification (also used for stratification)
    buffer_size: int
      - size of buffer to perform shuffling (training data only)
    batch_size: int
      - size to group data in for training
    n_splits: int
      - number of k folds
      - default: 5

    Returns: list of tuples
      - (train_ds, test_ds) for every fold
    '''
    splitter = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=24)

    def _fold_dataset(indices, shuffle):
      # Slice the three arrays with the fold indices and load the images lazily.
      ds = tf.data.Dataset.from_tensor_slices(
          ((images[indices], features[indices]), labels[indices]))
      ds = ds.map(lambda x, y: self.process_images_features(x, y))
      if shuffle:
        ds = ds.shuffle(buffer_size=buffer_size)
      return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return [
        (_fold_dataset(train_index, True), _fold_dataset(test_index, False))
        for train_index, test_index in splitter.split(X=features, y=labels)
    ]
  
  def create_datasets(self, images, features, labels, test_size, buffer_size, batch_size, random_state=None):
    '''
    Creates test and train datasets and splits into different batches after shuffling.

    The split is stratified on the labels. Only the training dataset is
    shuffled.

    Parameters:
    -----------------------------------------

    images: array
      - paths to images in google drive
    features: array
      - numerical parameters to label images
    labels: array
      - targets to qualify image quality
    test_size: float
      - decimal between 0 and 1 to represent test size of dataset
    buffer_size: int
      - size of buffer for shuffling data
    batch_size: int
      - size to group data into
    random_state: int
      - seed for the train/test split so it can be reproduced
      - default: None (a different split on every call)

    Returns: tf.data.Dataset, tf.data.Dataset
      - train and test datasets
    '''
    train_imgs, test_imgs, train_features, test_features, train_labels, test_labels = train_test_split(
        images, features, labels, stratify=labels, test_size=test_size, random_state=random_state)
    train_ds = tf.data.Dataset.from_tensor_slices(((train_imgs, train_features), train_labels))
    test_ds = tf.data.Dataset.from_tensor_slices(((test_imgs, test_features), test_labels))

    # Replace every image path with the loaded image.
    train_ds = train_ds.map(lambda x, y: self.process_images_features(x, y))
    test_ds = test_ds.map(lambda x, y: self.process_images_features(x, y))

    train_ds = train_ds.shuffle(buffer_size=buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    test_ds = test_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return train_ds, test_ds

In [None]:
class CustomModel:
    '''
    Class is used to define custom model using pre-trained MobileNetV2 model.
    '''
    def __init__(self, train_ds, test_ds):
      self.train_ds = train_ds
      self.test_ds = test_ds

    def build_pretrained_model(self, image_shape, param_shape):
      '''
      Utilize pretrained CNN to supplement small dataset

      The image branch runs augmentation, the frozen MobileNetV2 backbone,
      global average pooling and dropout. The numerical branch is two dense
      layers. Both branches are concatenated and fed to a sigmoid output.

      NOTE(review): MobileNetV2's own preprocess_input scales pixels to
      [-1, 1]; confirm the scale of the images fed to this model is intended.

      Parameters:
      ------------------------------------------------

      image_shape: tuple
        - dimensions of image
      param_shape: tuple
        - dimension of features

      Returns: tf.keras.Model
        - returns model to train
      '''
      pre_trained_model = MobileNetV2(input_shape=image_shape, include_top=False, weights="imagenet")
      pre_trained_model.trainable = False

      # Data augmentation pipeline
      data_augmentation = Sequential([
            # The documented mode values are lowercase.
            RandomFlip(mode="horizontal_and_vertical"),
            RandomRotation(factor=(0.2)),
            # value_range defaults to (0, 255); the image loaders in this file
            # divide by 255, so the brightness shift must be scaled to [0, 1].
            RandomBrightness(factor=(0.2), value_range=(0.0, 1.0)),
            RandomZoom(height_factor=0.1, width_factor=0.1),
            GaussianNoise(stddev=0.01),
            RandomContrast(0.2)
        ])
      # CNN for images
      image_input = Input(shape=image_shape)
      # Feed the augmented tensor (not the raw input) into the backbone, and
      # do not force training=True so augmentation is skipped at inference.
      x = data_augmentation(image_input)
      # training=False keeps the frozen backbone in inference mode.
      x = pre_trained_model(x, training=False)
      x = GlobalAveragePooling2D()(x)
      x = Dropout(0.5)(x)

      # Numerical features section
      params_input = Input(shape=param_shape)
      y = Dense(32, activation='relu')(params_input)
      y = Dense(16, activation='relu')(y)

      combined = concatenate([x, y])
      z = Dense(64, activation='relu')(combined)
      z = Dense(1, activation='sigmoid')(z)

      model = Model(inputs=[image_input, params_input], outputs=z)
      model.summary()
      return model

    def compile_model(self, image_shape, param_shape, learning_rate=0.001, metrics=['accuracy', 'precision', 'recall']):
      '''
      Compile model after calling build_model function

      Parameters:
      -------------------------------------
      image_shape: tuple
          - dimensions of images
      param_shape: tuple
          - dimensions of parameters
      learning_rate: float
          - learning rate for training model
        metrics: list
          - metrics to monitor during training
          - default: accuracy

      Returns:
      tf.keras.Model
          - Mode to be trained
      '''
      # Adaptive Moment Estimation optimizer
      # Set learning rate and then compile model
      # Loss functions is binary_crossentropy for binary classification
      #model = build_model((image_shape), (param_shape))
      model = self.build_pretrained_model(image_shape, param_shape)
      optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
      model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=metrics)
      return model
    
    def create_checkpoints(self, checkpoint_filepath="/content/drive/MyDrive/checkpoints.keras", monitor="val_accuracy", mode="max", save_best_only=True):
      '''
      Create a model checkpoint callback so progress is saved while training.

      Parameters:
      --------------------------------------

      checkpoint_filepath: str
        - path to save model checkpoints
        - default: /content/drive/MyDrive/checkpoints.keras
      monitor: str
        - metric to monitor during training
        - default: val_accuracy
      mode: str
        - direction in which the monitored metric counts as improving
        - default: max
      save_best_only: boolean
        - whether only the best model should be saved
        - default: True

      Returns: tf.keras.callbacks.ModelCheckpoint
        - checkpoint to use during training
      '''
      checkpoint_options = dict(
        filepath=checkpoint_filepath,
        monitor=monitor,
        mode=mode,
        save_best_only=save_best_only,
        verbose=1,
      )
      return tf.keras.callbacks.ModelCheckpoint(**checkpoint_options)
    
    def create_early_stopping(self, patience=3, mode='max', monitor="val_accuracy"):
      '''
      Create an early stopping callback that ends training once the monitored
      metric stops improving, and restores the best weights.

      Parameters:
      ----------------------------------------

      patience: int
        - number of epochs without improvement before training stops
        - default: 3
      mode: str
        - direction in which the monitored metric counts as improving
        - default: max
      monitor: str
        - metric to monitor during training
        - default: val_accuracy

      Returns: tf.keras.callbacks.EarlyStopping
        - early stopping callback
      '''
      stopping_options = dict(
        monitor=monitor,
        patience=patience,
        mode=mode,
        restore_best_weights=True,
      )
      return tf.keras.callbacks.EarlyStopping(**stopping_options)
    
    def train_model(self, model, checkpoints=None, epochs=5, initial_epoch=0, early_stopping=None, history_file=None, model_file=None):
      '''
      Train model with possible callbacks to prevent overfitting

      Parameters:
      -----------------------------------------

      model: tf.keras.Model
        - model to be trained
      checkpoints: tf.keras.callback.Checkpoints
        - checkpoints to save model
        - default: None
      epochs: int
        - number of training epochs to pass though
        - default: 5
      early_stopping: tf.keras.callback.EarlyStopping
        - early stopping callback to prevent overfitting
        - defatult: None
      history_file: str
        - file to save history to
      model_file: str
        - file to save model to

      Returns: tf.keras.Model
        - trained model
      '''
      callbacks = []
      if early_stopping:
        callbacks.append(early_stopping)
      if checkpoints:
        callbacks.append(checkpoints)

      if callbacks:
        history = model.fit(self.train_ds, epochs=epochs, initial_epoch=initial_epoch,
                    validation_data=(self.test_ds), callbacks=callbacks)
      else:
        print("Training without callbacks")
        history = model.fit(self.train_ds, epochs=epochs, initial_epoch=initial_epoch,
                    validation_data=(self.test_ds))
      if history_file:
        df = pd.DataFrame(history.history)
        df.to_csv(f"{history_file}.csv", index=False)
      else:
        print("History not saved")
      if model_file:
        model.save(f'{model_file}.keras')
      else:
        print("Model not saved")
      return history
    
    @staticmethod
    def train_kfold( datasets, image_shape, param_shape, learning_rate, metrics = ['accuracy', 'precision', 'recall'], checkpoints=None, epochs=5, initial_epoch=0, early_stopping=None, history_file=None, model_file=None):
      kfold_histories = []
      k_models = []
      train_datasets = [i[0] for i in datasets]
      test_datasets = [i[1] for i in datasets]

      callbacks=[]

      if early_stopping:
        callbacks.append(early_stopping)
      if checkpoints:
        callbacks.append(checkpoints)

      for fold, (train_ds, test_ds) in enumerate(zip(train_datasets, test_datasets)):
        print(f"\n=== Training fold {fold + 1} ===")

        custom_model = CustomModel(train_ds, test_ds)
        model = custom_model.compile_model(image_shape=image_shape, param_shape=param_shape, learning_rate=learning_rate, metrics=metrics)

        if callbacks:
          history = model.fit(train_ds, epochs=epochs, initial_epoch=initial_epoch,
                    validation_data=(test_ds), callbacks=callbacks)
        else:
          print("Training without callbacks")
          history = model.fit(train_ds, epochs=epochs, initial_epoch=initial_epoch,
                    validation_data=(test_ds))
          
        kfold_histories.append(history)
        k_models.append(model)
          
        if history_file:
          df = pd.DataFrame(history.history)
          df.to_csv(f"{history_file}_fold{fold+1}.csv", index=False)
        else:
          print("History not saved")
        if model_file:
          model.save(f'{model_file}_fold{fold+1}.keras')
        else:
         print("Model not saved")

      return k_models, kfold_histories
    
    @staticmethod
    def get_averages_from_kfold(kfold_histories):
      accuracy = []
      precision = []
      recall = []

      for history in kfold_histories:
        accuracy.append(max(history['accuracy']))
        precision.append(max(history['precision']))
        recall.append(max(history['recall']))

      avg_accuracy = np.mean(accuracy)
      avg_precision = np.mean(precision)
      avg_recall = np.mean(recall)

      print(f"Average Accuracy: {avg_accuracy:.2f}")
      print(f"Average Precision: {avg_precision:.2f}")
      print(f"Average Recall: {avg_recall:.2f}")


    def plot_metric(self, title, metric_1, metric_2, metric_1_label, metric_2_label, x_label, y_label):
      '''
      Draw two metric curves on one labelled plot and show it.

      Parameters:
      ----------------------------------------------

      title: str
        - title for plot
      metric_1, metric_2: sequences
        - values of the two metrics to be plotted against each other
      metric_1_label, metric_2_label: strs
        - legend labels for each metric
      x_label, y_label: strs
        - labels for graph axes
      '''
      plt.title(title)
      curves = ((metric_1, metric_1_label), (metric_2, metric_2_label))
      for values, legend_label in curves:
        plt.plot(values, label=legend_label)
      plt.xlabel(x_label)
      plt.ylabel(y_label)
      plt.legend(loc="lower right")
      plt.show()

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay

In [None]:
class TestPredictions:
  '''
  This class is used to test model performance on unseen data using metrics such as
  accuracy, precision, recall, and confusion matrix.
  '''
  def __init__(self, model_path, csv_path, scalar_path, img_folder):
    '''
    Remember the data locations and load the saved model.

    Parameters:
    ----------------------------------------------

    model_path: str
      - path to model in google drive
    csv_path: str
      - path to csv file in google drive
    scalar_path: str
      - path to scaler in google drive
    img_folder: str
      - path to image folder in google drive
    '''
    self.csv_path = csv_path
    self.scalar_path = scalar_path
    self.img_folder = img_folder
    # The model is loaded once here and reused for every prediction.
    self.model = tf.keras.models.load_model(model_path)

  def clean_data(self):
    '''
    Read csv file into dataframe and add column for cleave quality.

    Returns: pandas.DataFrame
      - dataframe with cleave quality column
    '''
    try:
      df = pd.read_csv(self.csv_path)
    except FileNotFoundError:
      print("File not found")
      return None
    df['CleaveQuality'] = ((df['CleaveAngle'] <= 0.45) & (df['Misting'] == 0) & (df['Hackle'] == 0)).astype(int)
    # Clean image path to read from google drive
    df['ImagePath'] = df['ImagePath'].str.replace("C:\\Thorlabs\\125PM\\", "")    
    pred_image_paths = df['ImagePath'].values
    pred_features = df[['CleaveAngle', 'CleaveTension', 'ScribeDiameter', 'Misting', 'Hackle', 'Tearing']].values.astype(np.float32)
    self.true_labels = list(df['CleaveQuality'])
    return pred_image_paths, pred_features

  def load_process_images(self, filename):
    '''
    Load a PNG image from the image folder and standardize it to 224x224.

    The image is decoded as a single grayscale channel, resized, expanded to
    three identical channels and divided by 255.

    Parameters:
    -----------------------------------------
    filename: str
      - image path, joined onto self.img_folder

    Returns: tf.tensor
      - float32 image with static shape (224, 224, 3)

    Raises:
      - the TensorFlow read error is re-raised when the file cannot be found
    '''
    def _load_image(file):
      # py_function passes an eager string tensor; recover the python str.
      file = file.numpy().decode('utf-8')
      full_path = os.path.join(self.img_folder, file)
      try:
        img_raw = tf.io.read_file(full_path)
      except tf.errors.NotFoundError:
        # tf.io.read_file signals a missing file with tf.errors.NotFoundError,
        # not the builtin FileNotFoundError. Report the offending path and
        # re-raise: None is not a usable float32 result for py_function.
        print(f"File not found: {full_path}")
        raise
      img = tf.image.decode_png(img_raw, channels=1)
      img = tf.image.resize(img, [224, 224])
      img = tf.image.grayscale_to_rgb(img)
      img = img / 255.0
      return img

    img = tf.py_function(_load_image, [filename], tf.float32)
    # py_function loses static shape information; restore it.
    img.set_shape([224, 224, 3])
    return img

  def test_prediction(self, image_path, feature_vector):
    '''
    Generate a prediction for one image and its numerical features.

    Parameters:
    ----------------------------------------------

    image_path: str
      - path to image to predict, relative to the image folder
    feature_vector: sequence of numbers
      - numerical features for the image, in the column order the scaler
        was fitted with

    Returns:
      - output of model.predict for the single sample
    '''
    image = self.load_process_images(image_path)
    # Add a leading batch dimension of size one.
    image = np.expand_dims(image, axis=0)

    # Load the scaler once per path instead of re-reading the file for every
    # prediction; gather_predictions calls this method in a loop.
    cached = getattr(self, '_scalar_cache', None)
    if cached is None or cached[0] != self.scalar_path:
      # NOTE: joblib.load unpickles the file - only use trusted scaler files.
      cached = (self.scalar_path, joblib.load(self.scalar_path))
      self._scalar_cache = cached
    scalar = cached[1]
    scaled_features = scalar.transform([feature_vector])

    prediction = self.model.predict([image, scaled_features])
    return prediction

  def gather_predictions(self):
    '''
    Gather multiple predictions from test data

    Returns: list
      - list of predictions
    '''

    pred_image_paths, pred_features = self.clean_data()
    predictions = []
    for img_path, feature_vector in zip(pred_image_paths, pred_features):
      prediction = self.test_prediction(img_path, feature_vector)
      predictions.append(prediction)

    # Set prediction labels to 0 or 1 based on probability
    pred_labels = [1 if pred[0][0] > 0.5 else 0 for pred in predictions]
    return pred_labels, predictions

  def display_confusion_matrix(self, pred_labels):
    '''
    Displays the confusion matrix comparing self.true_labels to the
    predicted labels.

    self.true_labels is set by clean_data, so that has to run first.

    Parameters:
    ----------------------------------------------

    pred_labels: list
      - list of predicted labels
    '''
    matrix = confusion_matrix(self.true_labels, pred_labels, labels=[0, 1])
    class_names = ['Bad Cleave', 'Good Cleave']
    ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=class_names).plot()
    plt.show()

  def display_classification_report(self, true_labels, pred_labels):
    '''
    Prints the classification report comparing true labels to predicted labels.

    Parameters:
    ----------------------------------------------

    true_labels: list
      - list of true labels
    pred_labels: list
      - list of predicted labels
    '''
    report = classification_report(true_labels, pred_labels)
    print(report)

  def plot_roc(self, title, true_labels, pred_probabilites):
    '''
    Plot the ROC curve with its AUC in the legend, next to the diagonal
    reference line.

    Parameters:
    ----------------------------------------------

    title: str
      - title for plot
    true_labels: list
      - list of true labels
    pred_probabilites: list
      - predicted probabilities; flattened to one dimension before use
    '''
    pred_probabilites = np.array(pred_probabilites).flatten()
    fpr, tpr, _ = roc_curve(true_labels, pred_probabilites)

    auc = roc_auc_score(true_labels, pred_probabilites)

    # AUC is a score between 0 and 1, not a percentage, so no "%" sign.
    plt.plot(fpr, tpr, label=f'ROC Curve (AUC={auc:.2f})')
    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.title(title)
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend(loc="lower right")
    plt.show()

In [None]:
class BuildHyperModel(HyperModel):
    '''
    This class builds a HyperModel to determine optimal hyperparameters.
    '''
    def __init__(self, image_shape, param_shape):
      '''
      Parameters:
      ----------------------------------------------

      image_shape: tuple
        - dimensions of image
      param_shape: tuple
        - dimensions of parameters
      '''
      # Run the HyperModel base initializer so the attributes it sets up
      # exist on this instance.
      super().__init__()
      self.image_shape = image_shape
      self.param_shape = param_shape

    def build(self, hp):
      '''
      Build hypermodel to perform hyperparameter search.

      Tuned hyperparameters: 'dropout', 'dense_param1', 'dense_param2',
      'dense_combined' and 'learning_rate'.

      Parameters:
      -------------------------

      hp: keras_tuner.engine.hyperparameters.HyperParameters
        - hyperparameters to be tuned

      Returns: tf.keras.Model
        - compiled model for one trial
      '''
      # Pre-trained base model, frozen
      pre_trained_model = MobileNetV2(
            input_shape=self.image_shape,
            include_top=False,
            weights="imagenet"
        )
      pre_trained_model.trainable = False

      # Data augmentation pipeline
      data_augmentation = Sequential([
            # The documented mode values are lowercase.
            RandomFlip(mode="horizontal_and_vertical"),
            RandomRotation(factor=(0.2)),
            # value_range defaults to (0, 255); the image loaders in this file
            # divide by 255, so the brightness shift must be scaled to [0, 1].
            RandomBrightness(factor=(0.2), value_range=(0.0, 1.0)),
            RandomZoom(height_factor=0.1, width_factor=0.1),
            GaussianNoise(stddev=0.01),
            RandomContrast(0.2)
        ])

      # Image input and processing
      image_input = Input(shape=self.image_shape)
      x = data_augmentation(image_input)
      # training=False keeps the frozen backbone in inference mode.
      x = pre_trained_model(x, training=False)
      x = GlobalAveragePooling2D()(x)
      x = Dropout(hp.Float('dropout', 0.2, 0.5, step=0.1))(x)

      # Param input and processing
      param_input = Input(shape=self.param_shape)
      y = Dense(
            hp.Int('dense_param1', min_value=16, max_value=128, step=16),
            activation='relu')(param_input)
      y = Dense(
            hp.Int('dense_param2', min_value=8, max_value=64, step=8),
            activation='relu')(y)

      # Combine image and parameter features
      combined = concatenate([x, y])

      z = Dense(
            hp.Int('dense_combined', min_value=16, max_value=128, step=16),
            activation='relu')(combined)
      z = Dense(1, activation='sigmoid')(z)

      model = Model(inputs=[image_input, param_input], outputs=z)

      model.compile(
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=hp.Choice('learning_rate', values=[0.0005, 0.001, 0.01])
            ),
            loss='binary_crossentropy',
            metrics=['accuracy', 'precision','recall']
        )

      return model

In [None]:
class HyperParameterTuning:
  '''
  This class is used to tune hyperparameters for model
  '''
  def __init__(self, image_shape, feature_shape, max_epochs=20, objective='val_accuracy', directory='/content/drive/MyDrive/Thorlabs', project_name='Cleave_Tuner3'):
    '''
    Create a Hyperband tuner around a BuildHyperModel for the given shapes.

    Parameters:
    ----------------------------------------------

    image_shape: tuple
      - dimensions of image
    feature_shape: tuple
      - dimensions of parameters
    max_epochs: int
      - maximum number of epochs to train for
      - default: 20
    objective: str
      - metric to monitor during tuning
      - default: val_accuracy
    directory: str
      - directory path to store hyperparameters
      - default: /content/drive/MyDrive/Thorlabs
    project_name: str
      - name of project
      - default: Cleave_Tuner3

    '''
    self.image_shape = image_shape
    self.feature_shape = feature_shape
    tuner_options = dict(
        objective=objective,
        max_epochs=max_epochs,
        directory=directory,
        project_name=project_name,
    )
    self.tuner = Hyperband(
        BuildHyperModel(self.image_shape, self.feature_shape),
        **tuner_options
    )
  def run_search(self, train_ds, test_ds):
    '''
    Run hyperparameter search

    Parameters:
    ----------------------------------------------

    train_ds: tf.data.Dataset
      - training dataset
    test_ds: tf.data.Dataset
      - testing dataset
      
    '''
  
    self.tuner.search(train_ds, validation_data=test_ds)

  def get_best_model(self):
    '''
    Get best model from hyperparameter search

    Returns: tf.keras.Model
      - best model from hyperparameter search
    '''
    return self.tuner.get_best_models(num_models=1)[0]

  def get_best_hyperparameters(self):
    '''
    Get best hyperparameters from hyperparameter search

    Returns: keras_tuner.engine.hyperparameters.HyperParameters
      - best hyperparameters from hyperparameter search
    '''
    return self.tuner.get_best_hyperparameters(num_trials=1)[0]