<a href="https://colab.research.google.com/github/ddecosmo-dev/thread-checker/blob/main/threadCheckerTrainer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#import libraries and download testing data from kaggle
#!kaggle datasets download -d devindecosmo/bolt-only-thread-checker-rev-1
import tensorflow as tf
from tensorflow.keras.layers import Rescaling
import numpy as np
import matplotlib.pyplot as plt
import os
import json
from google.colab import drive

# Mount Google Drive at /content/drive; later cells write the trained model
# and the class map under /content/drive/MyDrive.
drive.mount('/content/drive')

# Kaggle credentials: if no API key file is installed yet, prompt for an
# upload of 'kaggle.json', copy it into ~/.kaggle and restrict it to
# owner read/write. The check skips the prompt when the key already exists.
if not os.path.exists("/root/.kaggle/kaggle.json"):
    from google.colab import files
    print("Please upload your kaggle.json file")
    files.upload()
    !mkdir -p ~/.kaggle
    !cp kaggle.json ~/.kaggle/
    !chmod 600 ~/.kaggle/kaggle.json

# Name of the top-level folder expected to exist once the archive is unzipped.
# Change this string if the archive unpacks to a differently named folder.
dataset_folder = 'Thread Checker Rev 2, bolts only'

# Download and unzip only when the dataset folder is not already present,
# so re-running this cell does not fetch the data again.
if not os.path.exists(dataset_folder):
  print('Dataset not found. Downloading...')
  # Kaggle CLI download command (as copied from the dataset's Kaggle page)
  !kaggle datasets download -d devindecosmo/thread-checker-bolts-rev2

  # The archive name must match the .zip file produced by the download above
  !unzip -q thread-checker-bolts-rev2.zip
  print('Download and unzip complete.')
else:
  print('Dataset already exists.')

In [None]:
# --- Constants ---

# Dataset creation
VALIDATION_SPLIT = 0.2  # fraction of the images held out for validation
SEED = 123              # random seed; the train/validation split below is seeded with this value

# Data preparation
BATCH_SIZE = 16  # 32 is the common default
IMG_HEIGHT = 224
IMG_WIDTH = 224

# Model

# Training
EPOCHS = 10
LEARNING_RATE = 0.001


# Path to the nested training data directory, relative to the working directory.
# Note: this assumes the unzipped folder is named 'Thread Checker Rev 2, bolts only'.
TRAIN_DIR = 'Thread Checker Rev 2, bolts only/training'

In [None]:
# Preprocessing plan (future work): bounding-box model
#   1. Find a bolt dataset.
#   2. Train a model to predict bounding boxes and save it (may need to be a separate notebook).
#   3. Apply the bounding boxes to each training photo and re-save the images.
#   4. Continue with the rest of the pipeline as normal.

# Easy solution first: center-crop every image to a square.
import os
from PIL import Image

def crop_to_center_square(image_path, target_size=224):
    """
    Crop an image to its largest central square, resize it to
    ``target_size`` x ``target_size`` pixels and overwrite the original file.

    Args:
        image_path (str): Path to the image file to be processed.
        target_size (int): The final desired square dimension (e.g., 224).

    Returns:
        bool: True when the file was rewritten, False when the image could
        not be opened (the error is printed instead of being raised).
    """
    try:
        # The context manager releases the file handle as soon as the pixel
        # data has been copied into the converted RGB image.
        with Image.open(image_path) as source:
            img = source.convert('RGB')
    except Exception as e:
        print(f"Error opening image {image_path}: {e}")
        return False

    width, height = img.size
    shortest_side = min(width, height)

    # Integer arithmetic keeps the box exactly shortest_side pixels on both
    # axes. Fractional box coordinates get rounded by Pillow, which can yield
    # a non-square crop (e.g. for a 4x3 image) and a slightly distorted resize.
    left = (width - shortest_side) // 2
    top = (height - shortest_side) // 2
    right = left + shortest_side
    bottom = top + shortest_side

    img_cropped = img.crop((left, top, right, bottom))
    img_resized = img_cropped.resize((target_size, target_size), Image.LANCZOS)

    # Overwrite the original file
    img_resized.save(image_path)
    print(f"Processed and overwrote: {image_path}")
    return True

def process_directory(directory_path, target_size=224, recursive=True):
    """
    Overwrite every image under a directory with a center-square cropped
    and resized version.

    Sub-folders are visited by default because the training directory keeps
    its images inside one sub-folder per class; listing only the top level
    would skip every entry and process nothing.

    Args:
        directory_path (str): The path to the directory containing images.
        target_size (int): Side length in pixels passed on to
            crop_to_center_square.
        recursive (bool): When True (default) descend into sub-folders;
            when False only files directly inside directory_path are handled.
    """
    if not os.path.isdir(directory_path):
        print(f"Error: Directory not found at {directory_path}")
        return

    print(f"Starting to process images in directory: {directory_path}")

    image_extensions = ('.png', '.jpg', '.jpeg')
    processed_count = 0
    skipped_count = 0

    for current_dir, subdirs, filenames in os.walk(directory_path):
        if recursive:
            subdirs.sort()   # visit folders in a stable order
        else:
            subdirs.clear()  # prune the walk so nothing below the top level is visited

        for filename in sorted(filenames):
            # Check if the file is an image
            if filename.lower().endswith(image_extensions):
                crop_to_center_square(os.path.join(current_dir, filename), target_size)
                processed_count += 1
            else:
                skipped_count += 1
                print(f"Skipping non-image file: {filename}")

    print(f"\nProcessing complete. Processed {processed_count} images and skipped {skipped_count} files.")

# --- Run the preprocessing on the training data directory ---
# WARNING: the images are overwritten in place and the operation is
# irreversible, so back up the original images before running this cell.
train_images_directory = TRAIN_DIR
process_directory(directory_path=train_images_directory)

In [None]:
# Dataset import and creation
# (may need some scaling or bounding boxes depending on accuracy)

# Arguments shared by both subsets. The training and validation calls must use
# the same seed and validation_split, otherwise the two subsets can overlap.
split_kwargs = dict(
    validation_split=VALIDATION_SPLIT,
    seed=SEED,  # the seed makes the random split the same every time
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=BATCH_SIZE,
    label_mode='categorical',  # one-hot encoded labels
)

train_ds = tf.keras.utils.image_dataset_from_directory(
    TRAIN_DIR, subset="training", **split_kwargs)

# The validation set is the remaining VALIDATION_SPLIT fraction of the images
val_ds = tf.keras.utils.image_dataset_from_directory(
    TRAIN_DIR, subset="validation", **split_kwargs)

# Get the class names from the folder structure
class_names = train_ds.class_names
num_classes = len(class_names)
print("Found the following classes:", class_names)

# Optional preview of up to nine training images; set the flag to True to show it.
SHOW_SAMPLE_IMAGES = False
if SHOW_SAMPLE_IMAGES:
    plt.figure(figsize=(10, 10))
    for images, labels in train_ds.take(1):
        for i in range(min(9, images.shape[0])):
            ax = plt.subplot(3, 3, i + 1)

            # Labels are one-hot vectors, so recover the class index with argmax
            label_index = int(np.argmax(labels[i].numpy()))

            plt.imshow(images[i].numpy().astype("uint8"))
            plt.title(class_names[label_index])
            plt.axis("off")
    plt.show()

In [None]:
# --- 3. CONFIGURE DATASETS FOR PERFORMANCE ---
# Tune the input pipelines for speed during training.
AUTOTUNE = tf.data.AUTOTUNE

# Training pipeline: cache, shuffle with a buffer of 1000 elements, then prefetch.
train_ds = train_ds.cache()
train_ds = train_ds.shuffle(1000)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

# Validation pipeline: cache and prefetch only (no shuffling).
val_ds = val_ds.cache()
val_ds = val_ds.prefetch(buffer_size=AUTOTUNE)

print("\nDatasets are loaded, prepared, and ready for training.")

In [None]:
#data preparation

#resizing

#normalizing

#batching

In [None]:
#feature engineering

In [None]:
# Model selection and training
# --- 4. PREPARE THE MODEL (Transfer Learning) ---

# Data augmentation to reduce overfitting and improve generalization:
# random horizontal flips, rotations and zooms change the position and
# orientation of each image.
data_augmentation = tf.keras.Sequential(
    [
        tf.keras.layers.RandomFlip('horizontal'),
        tf.keras.layers.RandomRotation(0.2),
        tf.keras.layers.RandomZoom(0.2),
    ],
    name="data_augmentation",
)

# Normalization for MobileNetV2 is done by the Rescaling layer further down:
# pixel * (1/127.5) - 1 maps values from [0, 255] to [-1, 1].
# (Alternative: tf.keras.applications.mobilenet_v2.preprocess_input)

# Pre-trained base model from Keras, without the final ImageNet classifier.
# Its weights are frozen so that only the new final layers are trained.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=(IMG_HEIGHT, IMG_WIDTH, 3),
    include_top=False,
    weights='imagenet',
)
base_model.trainable = False

# Classification head placed on top of the base model
inputs = tf.keras.Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3))
augmented = data_augmentation(inputs)                                # 1. apply augmentation
rescaled = Rescaling(1./127.5, offset=-1)(augmented)                 # 2. preprocess for MobileNetV2
base_features = base_model(rescaled, training=False)                 # 3. run the frozen base model
pooled = tf.keras.layers.GlobalAveragePooling2D()(base_features)     # 4. pool features to a single vector
dropped = tf.keras.layers.Dropout(0.2)(pooled)                       # 5. dropout randomly turns off neurons to limit overfitting
outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(dropped)  # 6. final prediction layer

# Chain all the pieces together into the final model
model = tf.keras.Model(inputs, outputs)

# Print a summary of the model's architecture
model.summary()

In [None]:
# Configure the model for training: Adam optimizer at LEARNING_RATE,
# categorical cross-entropy loss, accuracy as the reported metric.
# TODO: come back and research alternatives.
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
model.compile(
    optimizer=adam_optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

print("Model compiled successfully.")

In [None]:
# Train the model
# Runs EPOCHS passes over train_ds with val_ds as validation data;
# the returned History object is kept for the metric plots.

print("Starting model training...")
fit_options = {'validation_data': val_ds, 'epochs': EPOCHS}
history = model.fit(train_ds, **fit_options)

print("Model training complete.")

Starting model training...
Epoch 1/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m62s[0m 2s/step - accuracy: 0.4166 - loss: 1.3022 - val_accuracy: 0.3623 - val_loss: 1.3098
Epoch 2/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 1s/step - accuracy: 0.4697 - loss: 1.0798 - val_accuracy: 0.4348 - val_loss: 1.1267
Epoch 3/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 1s/step - accuracy: 0.5900 - loss: 0.9576 - val_accuracy: 0.4493 - val_loss: 1.0683
Epoch 4/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 1s/step - accuracy: 0.5340 - loss: 0.9608 - val_accuracy: 0.5362 - val_loss: 0.9987
Epoch 5/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 1s/step - accuracy: 0.5898 - loss: 0.9319 - val_accuracy: 0.5362 - val_loss: 0.9543
Epoch 6/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 1s/step - accuracy: 0.6242 - loss: 0.8449 - val_accuracy: 0.5652 - val_loss: 0.9263
Epoch 7/10
[

In [None]:
# Training metrics
# Extract the accuracy history
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

# Extract the loss history
loss = history.history['loss']
val_loss = history.history['val_loss']

# Use the number of epochs actually recorded. range(EPOCHS) raises a length
# mismatch in plot() whenever the history holds a different number of epochs
# (interrupted run, or EPOCHS edited after training).
epochs_range = range(len(acc))

fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(12, 6))

# Plot Training and Validation Accuracy
ax_acc.plot(epochs_range, acc, label='Training Accuracy')
ax_acc.plot(epochs_range, val_acc, label='Validation Accuracy')
ax_acc.legend(loc='lower right')
ax_acc.set_title('Training and Validation Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')

# Plot Training and Validation Loss
ax_loss.plot(epochs_range, loss, label='Training Loss')
ax_loss.plot(epochs_range, val_loss, label='Validation Loss')
ax_loss.legend(loc='upper right')
ax_loss.set_title('Training and Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')

plt.show()

# TODO: additional validation to add
#   - confusion matrices, other parameters

In [None]:
# Save the trained model for GUI usage

# Define a path in your Google Drive.
# '/content/drive/MyDrive/' is the standard path to your main "My Drive" folder.
model_path = '/content/drive/MyDrive/24-679 AI ML Projects/Project 0/thread_checker_model.keras'

# Create the project folder first so saving does not fail on a Drive where
# the folder has not been created yet.
os.makedirs(os.path.dirname(model_path), exist_ok=True)

# Save the model to the specified path.
model.save(model_path)

print(f"Model saved to: {model_path}")

In [None]:
# Save a JSON class map (class label -> model output index) for the GUI
# --- CONFIGURE YOUR PATHS HERE ---
DATASET_PATH = TRAIN_DIR  # the directory the datasets were built from
OUTPUT_FILE_PATH = '/content/drive/MyDrive/24-679 AI ML Projects/Project 0/class_map.json' # Output JSON file

# --- SCRIPT TO BUILD THE MAP ---
# class_names comes from train_ds.class_names, so position i in that list is
# the class encoded at index i of the one-hot labels the model was trained on.
# Enumerating it keeps the JSON in step with the model's outputs; walking the
# folders again gives indices in filesystem order, which need not match.
print(f"Building class map for dataset: {DATASET_PATH}")
class_map = {label: index for index, label in enumerate(class_names)}

if not class_map:
    print("WARNING: No classes found. Did not generate a map.")
else:
    # Make sure the target folder exists before writing
    os.makedirs(os.path.dirname(OUTPUT_FILE_PATH), exist_ok=True)

    # Save the dictionary to a JSON file
    with open(OUTPUT_FILE_PATH, 'w') as f:
        json.dump(class_map, f, indent=2)
    print(f"✅ Successfully created class map with {len(class_map)} entries at {OUTPUT_FILE_PATH}")



In [None]:
## Optional fine-tuning
# Disabled by default; set RUN_FINE_TUNING = True to continue training with
# the base model unfrozen.
# NOTE: the model is saved in an earlier cell, so re-run that save after
# fine-tuning if the fine-tuned weights should be exported.
RUN_FINE_TUNING = False

if RUN_FINE_TUNING:
    # Unfreeze the base model
    base_model.trainable = True

    # Re-compile the model with a very low learning rate
    # (1e-5 is 100x smaller than the 0.001 used for the initial training)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Continue training for a few more epochs, starting right after the last
    # epoch recorded in `history` so that no epoch index is repeated.
    fine_tune_epochs = 10
    initial_epoch = len(history.epoch)
    total_epochs = initial_epoch + fine_tune_epochs

    history_fine = model.fit(train_ds,
                             epochs=total_epochs,
                             initial_epoch=initial_epoch,
                             validation_data=val_ds)