The code below extracts the image associated with the mask in the training set and saves it to an assigned directory.

In [None]:
import json
import os
import requests
from concurrent.futures import ThreadPoolExecutor

def download_and_save_image(image_url, image_path, timeout=30):
    """Download ``image_url`` and write the response body to ``image_path``.

    Args:
        image_url: URL to fetch with an HTTP GET.
        image_path: Destination file path; it is opened in binary write mode.
        timeout: Seconds to wait for the server before ``requests`` gives up.
            Without a timeout a stalled connection would block its worker
            thread indefinitely.

    The file is only written when the server answers with status 200. Any
    other status is reported and skipped, so one bad URL does not stop the
    rest of the downloads.
    """
    response = requests.get(image_url, timeout=timeout)
    if response.status_code != 200:
        print(f"Skipping {image_url}: HTTP status {response.status_code}")
        return

    with open(image_path, "wb") as img_file:
        img_file.write(response.content)

def process_entry(entry):
    """Download the image described by one entry of the JSON export.

    Args:
        entry: A dict with a ``'data_row'`` mapping that holds
            ``'external_id'`` (used as the output file name) and
            ``'row_data'`` (the URL passed to the downloader).

    The image is saved inside the module-level ``image_directory_path``.
    """
    data_row = entry['data_row']
    image_id = data_row['external_id']
    image_url = data_row['row_data']

    # The external id is used verbatim as the file name.
    image_path = os.path.join(image_directory_path, image_id)

    download_and_save_image(image_url, image_path)


# Set the paths
json_file_path = r"C:\Users\sebca\Downloads\actual_complete_training_dataset.json"
image_directory_path = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Image Directory"


# Create the directories if they don't exist
os.makedirs(image_directory_path, exist_ok=True)

# Read JSON data (JSON text is UTF-8; do not rely on the platform default encoding)
with open(json_file_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Use a ThreadPoolExecutor to process entries concurrently.
# Each task is submitted explicitly so that its outcome can be inspected:
# exceptions raised inside a worker are stored on the future and would
# otherwise never be seen.
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_entry, entry) for entry in data]

# Leaving the "with" block waits for every task, so all futures are finished here.
for future in futures:
    error = future.exception()
    if error is not None:
        print(f"Failed to process an entry: {error!r}")


The code below reads the JSON export from Labelbox, downloads the mask referenced by each entry, and saves the masks as PNGs to a folder of your choice.

In [None]:
import json
import requests
from pathlib import Path
import time
import concurrent.futures
#Downloads the masks using the urls given in the json file, but requires an API key in order to do so.
def download_mask(url, save_path, api_keys, retries=3, delay=5, timeout=60):
    """Download one mask from ``url`` and write it to ``save_path``.

    Each API key is tried in turn. For a given key the request is attempted up
    to ``retries`` times; server-side (5xx) errors and network errors are
    retried after ``delay`` seconds.

    Args:
        url: Mask URL to fetch.
        save_path: Destination path; opened in binary write mode.
        api_keys: Iterable of API keys, sent as ``Authorization: Bearer <key>``.
        retries: Maximum number of attempts per key.
        delay: Seconds to sleep between retried attempts.
        timeout: Seconds to wait for the server on each request, so that a
            URL that is slow to load cannot stall a worker forever.

    Raises:
        requests.exceptions.HTTPError: For a non-403 client error, or a server
            error that persists on the last attempt.
        requests.exceptions.ConnectionError, requests.exceptions.Timeout:
            When the network error persists on the last attempt.
        Exception: When no key produced a successful download.
    """
    for api_key in api_keys:
        headers = {'Authorization': f'Bearer {api_key}'}

        for attempt in range(retries):
            is_last_attempt = attempt >= retries - 1
            try:
                response = requests.get(url, headers=headers, timeout=timeout)
                if response.status_code == 200:
                    with open(save_path, 'wb') as file:
                        file.write(response.content)
                    return
                if response.status_code == 403:
                    # This key is not authorised for the mask; repeating the
                    # same request cannot help, so move on to the next key.
                    break
                # Not an authentication error: raises for 4xx/5xx responses.
                response.raise_for_status()

            except requests.exceptions.HTTPError as e:
                if e.response.status_code >= 500 and not is_last_attempt:
                    print(f"Server-side error ({e}), retrying in {delay} seconds...")
                    time.sleep(delay)
                else:
                    # Client error, or the server error survived every attempt.
                    raise

            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                if is_last_attempt:
                    raise
                print(f"Network error ({e}), retrying in {delay} seconds...")
                time.sleep(delay)

    raise Exception("Failed to download mask with all provided API keys")
#Finds the mask URL and extracts it for saving to the assigned directory.
def process_labelbox_data(labelbox_data):
    """Download every segmentation mask referenced by one export entry.

    Walks ``projects -> labels -> annotations -> objects`` and, for each object
    whose ``annotation_kind`` is ``'ImageSegmentationMask'``, downloads the
    mask URL into the module-level ``mask_output_folder`` using the
    module-level ``api_keys``.

    The file name depends only on the entry's ``external_id``, so when an entry
    carries several masks each download overwrites the previous one.

    NOTE(review): the training cells look up masks by replacing ".jpg" with
    ".png" in the image name; confirm the "<external_id>_mask.png" files
    written here are renamed to match before training.
    """
    external_id = labelbox_data['data_row']['external_id']
    all_projects = labelbox_data['projects']

    for project in all_projects.values():
        for label in project['labels']:
            for annotation in label['annotations']['objects']:
                # Only segmentation masks are of interest.
                if annotation['annotation_kind'] != 'ImageSegmentationMask':
                    continue

                mask_url = annotation['mask']['url']
                save_path = mask_output_folder / f"{external_id}_mask.png"
                print(f"Downloading mask for {external_id}...")
                download_mask(mask_url, save_path, api_keys)
                print(f"Downloaded mask for {external_id} to {save_path}")

# Loads your Labelbox JSON file (JSON text is UTF-8; do not rely on the platform default encoding)
labelbox_export_file = Path(r"C:\Users\sebca\Downloads\actual_complete_training_dataset.json")
with open(labelbox_export_file, 'r', encoding='utf-8') as file:
    labelbox_data_list = json.load(file)

mask_output_folder = Path(r'C:\Users\sebca\OneDrive\Final Year Project\Week 4\Mask Directory')

# Replace with your Labelbox API key - if using multiple accounts, you need to use multiple keys.
# Key1/Key2/Key3 are placeholders: they are not defined in this cell, so put your
# own key strings here before running.
api_keys = [ Key1, 
            Key2, 
            Key3
]

# Creates the output folder if it doesn't exist
mask_output_folder.mkdir(parents=True, exist_ok=True)

# Use ThreadPoolExecutor to process labelbox_data_list concurrently - makes the process a lot faster.
# Tasks are submitted explicitly so that a failed download is reported:
# exceptions raised inside a worker are stored on the future and would
# otherwise never be seen.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_labelbox_data, row) for row in labelbox_data_list]

# Leaving the "with" block waits for every task, so all futures are finished here.
for future in futures:
    error = future.exception()
    if error is not None:
        print(f"Mask download failed: {error!r}")


The code below is responsible for creating and training the initial machine learning model that can then be further trained later on.

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Concatenate, UpSampling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import TensorBoard

# Loads and preprocesses the training dataset
def load_dataset(image_dir, mask_dir):
    """Load matching image/mask pairs as normalised 256x256 grayscale arrays.

    Args:
        image_dir: Directory scanned for files whose name ends in ".jpg".
        mask_dir: Directory expected to hold, for each image, a mask with the
            same file name but ".png" in place of ".jpg".

    Returns:
        A tuple ``(images, masks)`` of numpy arrays. Entries at the same index
        belong together; pixel values are divided by 255.

    Files that cannot be read (image or mask) are reported and skipped, so the
    two arrays always have the same length.
    """
    images, masks = [], []

    for file in os.listdir(image_dir):
        if not file.endswith(".jpg"):
            continue

        print(f"Processing file: {file}")
        # Reads the image in grayscale and resizes it
        img_path = os.path.join(image_dir, file)
        print(f"Image path: {img_path}")
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

        # cv2.imread returns None instead of raising when a file is unreadable;
        # passing None on to cv2.resize would crash the whole load.
        if img is None:
            print(f"Unable to read image file: {img_path}")
            continue

        img = cv2.resize(img, (256, 256))
        img = img / 255.0

        mask_filename = file.replace(".jpg", ".png")  # Replaces .jpg with .png for mask files
        mask_path = os.path.join(mask_dir, mask_filename)
        print(f"Mask path: {mask_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        if mask is None:
            print(f"Unable to read mask file: {mask_path}")
            continue

        # Resizes and normalises the mask
        mask = cv2.resize(mask, (256, 256))
        mask = mask / 255.0

        images.append(img)
        masks.append(mask)

    print(f"Loaded {len(images)} images and {len(masks)} masks.")
    # Returns the lists as numpy arrays
    return np.array(images), np.array(masks)

#Defines the model architecture, which was selected as it was easy to run on standard hardware and is fairly standard.
def unet_model(input_size=(256, 256, 1)):
    """Build a small encoder/decoder segmentation network with one skip connection.

    Args:
        input_size: Shape of a single input sample, passed to ``tf.keras.Input``.

    Returns:
        An uncompiled Keras ``Model`` whose last layer is a 1-filter, 1x1
        convolution with a sigmoid activation.
    """
    def conv_pair(tensor, filters):
        # Two stacked 3x3 ReLU convolutions with 'same' padding.
        for _ in range(2):
            tensor = Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
        return tensor

    inputs = tf.keras.Input(input_size)

    # Encoding path: full-resolution features, then one 2x2 pooling step.
    full_res = conv_pair(inputs, 64)
    half_res = conv_pair(MaxPooling2D(pool_size=(2, 2))(full_res), 128)

    # Decoding path: upsample and join with the full-resolution features.
    upsampled = UpSampling2D(size=(2, 2))(half_res)
    merged = Concatenate(axis=3)([upsampled, full_res])
    decoded = conv_pair(merged, 64)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(decoded)
    return Model(inputs, outputs)

#Specifies the directories containing the images and masks for training.
image_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Image Directory"
mask_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Mask Directory"

images, masks = load_dataset(image_dir, mask_dir)

# Splits the dataset: 20% is held out and used as validation data during training
X_train, X_test, y_train, y_test = train_test_split(images, masks, test_size=0.2, random_state=42)

# Creates and compiles the U-Net model.
# "learning_rate" is the supported keyword; the old "lr" alias is deprecated
# and rejected by newer Keras releases.
model = unet_model()
model.compile(optimizer=Adam(learning_rate=1e-4), loss='binary_crossentropy', metrics=['accuracy'])

# Creates a TensorBoard callback
log_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\logs"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

# Trains the model with the TensorBoard callback, which is incredibly useful for tracking the progress of the model.
model.fit(X_train, y_train, batch_size=8, epochs=40, validation_data=(X_test, y_test), callbacks=[tensorboard_callback], verbose=1)

# Saves the trained model at the specified location for loading at a later date.
model.save(r'C:\Users\sebca\OneDrive\Final Year Project\Week 4\pot_segmentation_model.h5')

The code below preprocesses the training images, further trains the model that has already been created, and then runs the model on the input images so as to generate the predictions.

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras import backend as K

#Defines the training dataset and preprocesses it so it can be fed into the algorithm
def load_dataset(image_dir, mask_dir):
    """Load matching image/mask pairs as normalised 256x256 grayscale arrays.

    Args:
        image_dir: Directory scanned for files whose name ends in ".png".
        mask_dir: Directory expected to hold a mask for each image under the
            same file name (with any ".jpg" in the name replaced by ".png").

    Returns:
        A tuple ``(images, masks)`` of numpy arrays. Entries at the same index
        belong together; pixel values are divided by 255.

    Images or masks that cannot be read are skipped, so the two arrays always
    have the same length.
    """
    images, masks = [], []

    for file in os.listdir(image_dir):
        if not file.endswith(".png"):  # Change to input image type when needed
            continue

        img_path = os.path.join(image_dir, file)
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

        # cv2.imread returns None instead of raising when a file is unreadable;
        # passing None on to cv2.resize would crash the whole load.
        if img is None:
            continue

        img = cv2.resize(img, (256, 256))
        img = img / 255.0

        # Only has an effect when the image name contains ".jpg"; with the
        # ".png" filter above the mask normally shares the image's file name.
        mask_filename = file.replace(".jpg", ".png")
        mask_path = os.path.join(mask_dir, mask_filename)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        if mask is None:
            continue

        mask = cv2.resize(mask, (256, 256))
        mask = mask / 255.0

        images.append(img)
        masks.append(mask)

    return np.array(images), np.array(masks)
#Defines what the predicted images should look like and what they should be called.
def save_predicted_image(input_image, prediction, input_filename, output_dir):
    """Binarise one prediction and save it as a 512x512 PNG.

    Args:
        input_image: The image the prediction was made from. It is not used
            here and is kept only so existing callers keep working.
        prediction: Model output for one image; must reshape to (256, 256).
        input_filename: Name of the source image; its extension is replaced
            with ".png" to form the output name.
        output_dir: Directory the PNG is written to. It must already exist.
    """
    pred = prediction.reshape((256, 256))
    # Min-max scaling stretches this prediction to the 0-255 range, so the
    # threshold below is relative to the prediction's own minimum and maximum.
    pred = cv2.normalize(pred, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
    _, pred = cv2.threshold(pred, 127, 255, cv2.THRESH_BINARY)
    pred = 255 - pred  # Invert the binary mask
    output_filename = os.path.splitext(input_filename)[0] + '.png'

    # Resize the prediction to 512x512 before saving
    pred_resized = cv2.resize(pred, (512, 512))
    output_path = os.path.join(output_dir, output_filename)

    # cv2.imwrite signals failure through its return value rather than by
    # raising, so a failed write would otherwise go unnoticed.
    if not cv2.imwrite(output_path, pred_resized):
        print(f"Failed to write predicted image: {output_path}")
#Runs one batch of input images through the model; batching keeps memory use down.
def process_batch(images_batch):
    """Scale a batch of 256x256 images to [0, 1] and return the model output.

    Args:
        images_batch: Sequence of 256x256 grayscale images with 0-255 values.

    Returns:
        The first output of the module-level ``get_output`` function for the
        batch, reshaped beforehand to ``(batch, 256, 256, 1)``.
    """
    scaled = np.array(images_batch) / 255.0
    batch_len = scaled.shape[0]
    model_input = scaled.reshape(batch_len, 256, 256, 1)
    return get_output([model_input])[0]

# Load the training dataset: the images come from the "Training data for cleaning"
# folder and are paired with the masks in the Week 4 mask directory.
image_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\Training data for cleaning"
mask_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Mask Directory"

images, masks = load_dataset(image_dir, mask_dir)

# Holds out 20% of the pairs for validation; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(images, masks, test_size=0.2, random_state=42)

# Loads the previously saved model from disk so that it can be trained further
model_path = r'C:\Users\sebca\OneDrive\Final Year Project\Week 5\pot_segmentation_model_cleaning.h5'
model = load_model(model_path)
# Writes the training logs to a folder so that TensorBoard can display the model's progress as it trains
log_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\logs"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)
# Defines what the training cycles look like - batch size is the number of images processed at any one time,
# and the number of epochs is how many times the model goes over the complete dataset.
model.fit(X_train, y_train, batch_size=15, epochs=15, validation_data=(X_test, y_test), callbacks=[tensorboard_callback], verbose=1)

# Saves the trained model back to the path it was loaded from (overwrites the previous file)
model.save(model_path)

# Directories for prediction: images are read from input_dir and results are written to output_dir
input_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\Processed Images"
output_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\Cleaned Images New"

#Loads every readable image in a folder as grayscale and resizes it to a standard size.
def load_images_from_folder(folder):
    """Read all readable images in ``folder`` as 256x256 grayscale arrays.

    Args:
        folder: Directory whose entries are each passed to ``cv2.imread``.

    Returns:
        A tuple ``(images, filenames)`` of two lists in matching order.
        Entries that ``cv2.imread`` cannot decode are left out of both lists.
    """
    loaded = []
    for name in os.listdir(folder):
        picture = cv2.imread(os.path.join(folder, name), cv2.IMREAD_GRAYSCALE)
        if picture is None:
            continue  # not a readable image
        # Keep each resized image together with its file name.
        loaded.append((cv2.resize(picture, (256, 256)), name))

    images = [picture for picture, _ in loaded]
    filenames = [name for _, name in loaded]
    return images, filenames

test_images, input_filenames = load_images_from_folder(input_dir)
print("Loaded test images: ", len(test_images))

# The predictions are written with cv2.imwrite, which does not create missing
# directories, so make sure the output folder exists before predicting.
os.makedirs(output_dir, exist_ok=True)

# Loads the model again for prediction
model = load_model(model_path)
model.compile(loss='binary_crossentropy')

print("Model loaded and compiled.")

# Backend function mapping the first layer's input to the last layer's output;
# process_batch calls it to obtain the predictions.
get_output = K.function([model.layers[0].input], [model.layers[-1].output])
print("Custom predict function created.")

# Batch processing parameters
batch_size = 2
# Number of batches, rounded up so that a final partial batch is included
n_batches = len(test_images) // batch_size + (1 if len(test_images) % batch_size != 0 else 0)

all_predictions = []  # never filled in this cell; predictions are saved per batch instead
for batch_idx in range(n_batches):
    start_idx = batch_idx * batch_size
    end_idx = (batch_idx + 1) * batch_size
    batch_images = test_images[start_idx:end_idx]
    batch_filenames = input_filenames[start_idx:end_idx]
    print(f"Processing batch {batch_idx + 1}/{n_batches}")
    batch_predictions = process_batch(batch_images)
    # Images, predictions and file names share the same order within a batch
    for image, prediction, filename in zip(batch_images, batch_predictions, batch_filenames):
        save_predicted_image(image, prediction, filename, output_dir)

print("Predicted images saved.")


The code below processes the original training images into predictions, then outputs them into a new directory to serve as training data for the cleaning model.

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras import backend as K

#Defines the training dataset and preprocesses it so it can be fed into the algorithm
def load_dataset(image_dir, mask_dir):
    """Load matching image/mask pairs as normalised 256x256 grayscale arrays.

    Args:
        image_dir: Directory scanned for files whose name ends in ".jpg".
        mask_dir: Directory expected to hold, for each image, a mask with the
            same file name but ".png" in place of ".jpg".

    Returns:
        A tuple ``(images, masks)`` of numpy arrays. Entries at the same index
        belong together; pixel values are divided by 255.

    Images or masks that cannot be read are skipped, so the two arrays always
    have the same length.
    """
    images, masks = [], []

    for file in os.listdir(image_dir):
        if not file.endswith(".jpg"):
            continue

        img_path = os.path.join(image_dir, file)
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

        # cv2.imread returns None instead of raising when a file is unreadable;
        # passing None on to cv2.resize would crash the whole load.
        if img is None:
            continue

        img = cv2.resize(img, (256, 256))
        img = img / 255.0

        mask_filename = file.replace(".jpg", ".png")  # Replace .jpg with .png for mask files
        mask_path = os.path.join(mask_dir, mask_filename)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        if mask is None:
            continue

        mask = cv2.resize(mask, (256, 256))
        mask = mask / 255.0

        images.append(img)
        masks.append(mask)

    return np.array(images), np.array(masks)
#Defines what the predicted images should look like and what they should be called.
def save_predicted_image(input_image, prediction, input_filename, output_dir):
    """Binarise one prediction and save it as a 512x512 PNG.

    Args:
        input_image: The image the prediction was made from. It is not used
            here and is kept only so existing callers keep working.
        prediction: Model output for one image; must reshape to (256, 256).
        input_filename: Name of the source image; its extension is replaced
            with ".png" to form the output name.
        output_dir: Directory the PNG is written to. It must already exist.
    """
    pred = prediction.reshape((256, 256))
    # Min-max scaling stretches this prediction to the 0-255 range, so the
    # threshold below is relative to the prediction's own minimum and maximum.
    pred = cv2.normalize(pred, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
    _, pred = cv2.threshold(pred, 127, 255, cv2.THRESH_BINARY)
    pred = 255 - pred  # Invert the binary mask
    output_filename = os.path.splitext(input_filename)[0] + '.png'

    # Resize the prediction to 512x512 before saving
    pred_resized = cv2.resize(pred, (512, 512))
    output_path = os.path.join(output_dir, output_filename)

    # cv2.imwrite signals failure through its return value rather than by
    # raising, so a failed write would otherwise go unnoticed.
    if not cv2.imwrite(output_path, pred_resized):
        print(f"Failed to write predicted image: {output_path}")
#Runs one batch of input images through the model; batching keeps memory use down.
def process_batch(images_batch):
    """Scale a batch of 256x256 images to [0, 1] and return the model output.

    Args:
        images_batch: Sequence of 256x256 grayscale images with 0-255 values.

    Returns:
        The first output of the module-level ``get_output`` function for the
        batch, reshaped beforehand to ``(batch, 256, 256, 1)``.
    """
    normalised = np.array(images_batch) / 255.0
    count = normalised.shape[0]
    outputs = get_output([normalised.reshape(count, 256, 256, 1)])
    return outputs[0]

# Load the training dataset (original images and their masks)
image_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Image Directory"
mask_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Mask Directory"

images, masks = load_dataset(image_dir, mask_dir)

# Holds out 20% of the pairs for validation; only needed if the training call below is re-enabled.
X_train, X_test, y_train, y_test = train_test_split(images, masks, test_size=0.2, random_state=42)

# Loads the already-trained model. Training is currently disabled in this cell (see the commented-out fit call).
model_path = r'C:\Users\sebca\OneDrive\Final Year Project\Week 4\pot_segmentation_model.h5'
model = load_model(model_path)
# Writes the training logs to a folder so that TensorBoard can display the model's progress as it trains
log_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\logs"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)
# Optional extra training - batch size is the number of images processed at any one time,
# and the number of epochs is how many times the model goes over the complete dataset.
#model.fit(X_train, y_train, batch_size=15, epochs=1, validation_data=(X_test, y_test), callbacks=[tensorboard_callback], verbose=1)

# Saves the model back to the path it was loaded from; with training disabled this re-saves the loaded model.
model.save(model_path)

# Directories for prediction: the original training images are read from input_dir
# and their predictions are written to output_dir.
input_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Image Directory"
output_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\Training data for cleaning"

#Loads every readable image in a folder as grayscale and resizes it to a standard size.
def load_images_from_folder(folder):
    """Read all readable images in ``folder`` as 256x256 grayscale arrays.

    Args:
        folder: Directory whose entries are each passed to ``cv2.imread``.

    Returns:
        A tuple ``(images, filenames)`` of two lists in matching order.
        Entries that ``cv2.imread`` cannot decode are left out of both lists.
    """
    images, filenames = [], []
    for entry in os.listdir(folder):
        entry_path = os.path.join(folder, entry)
        pixels = cv2.imread(entry_path, cv2.IMREAD_GRAYSCALE)
        if pixels is None:
            continue  # not a readable image
        images.append(cv2.resize(pixels, (256, 256)))
        filenames.append(entry)  # same position as its image
    return images, filenames

test_images, input_filenames = load_images_from_folder(input_dir)
print("Loaded test images: ", len(test_images))

# The predictions are written with cv2.imwrite, which does not create missing
# directories, so make sure the output folder exists before predicting.
os.makedirs(output_dir, exist_ok=True)

# Loads the model again for prediction
model = load_model(model_path)
model.compile(loss='binary_crossentropy')

print("Model loaded and compiled.")

# Backend function mapping the first layer's input to the last layer's output;
# process_batch calls it to obtain the predictions.
get_output = K.function([model.layers[0].input], [model.layers[-1].output])
print("Custom predict function created.")

# Batch processing parameters
batch_size = 2
# Number of batches, rounded up so that a final partial batch is included
n_batches = len(test_images) // batch_size + (1 if len(test_images) % batch_size != 0 else 0)

all_predictions = []  # never filled in this cell; predictions are saved per batch instead
for batch_idx in range(n_batches):
    start_idx = batch_idx * batch_size
    end_idx = (batch_idx + 1) * batch_size
    batch_images = test_images[start_idx:end_idx]
    batch_filenames = input_filenames[start_idx:end_idx]
    print(f"Processing batch {batch_idx + 1}/{n_batches}")
    batch_predictions = process_batch(batch_images)
    # Images, predictions and file names share the same order within a batch
    for image, prediction, filename in zip(batch_images, batch_predictions, batch_filenames):
        save_predicted_image(image, prediction, filename, output_dir)

print("Predicted images saved.")

The code below creates a machine learning model designed specifically to clean up the predictions. It then trains it, preprocesses the files to be cleaned, and generates the cleaned results as outputs.

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Concatenate, UpSampling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

#Defines the training dataset and preprocesses it so it can be fed into the algorithm
def load_dataset(image_dir, mask_dir):
    """Load matching image/mask pairs as normalised 256x256 grayscale arrays.

    Args:
        image_dir: Directory scanned for files whose name ends in ".png".
        mask_dir: Directory expected to hold a mask for each image under the
            same file name (with any ".jpg" in the name replaced by ".png").

    Returns:
        A tuple ``(images, masks)`` of numpy arrays. Entries at the same index
        belong together; pixel values are divided by 255.

    Images or masks that cannot be read are skipped, so the two arrays always
    have the same length.
    """
    images, masks = [], []

    for file in os.listdir(image_dir):
        if not file.endswith(".png"):
            continue

        img_path = os.path.join(image_dir, file)
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

        # cv2.imread returns None instead of raising when a file is unreadable;
        # passing None on to cv2.resize would crash the whole load.
        if img is None:
            continue

        img = cv2.resize(img, (256, 256))
        img = img / 255.0

        # Only has an effect when the image name contains ".jpg"; with the
        # ".png" filter above the mask normally shares the image's file name.
        mask_filename = file.replace(".jpg", ".png")
        mask_path = os.path.join(mask_dir, mask_filename)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        if mask is None:
            continue

        mask = cv2.resize(mask, (256, 256))
        mask = mask / 255.0

        images.append(img)
        masks.append(mask)

    return np.array(images), np.array(masks)

#Defines the model architecture
def unet_model(input_size=(256, 256, 1)):
    """Build a small encoder/decoder segmentation network with one skip connection.

    Args:
        input_size: Shape of a single input sample, passed to ``tf.keras.Input``.

    Returns:
        An uncompiled Keras ``Model`` whose last layer is a 1-filter, 1x1
        convolution with a sigmoid activation.
    """
    def double_conv(features, width):
        # Two stacked 3x3 ReLU convolutions with 'same' padding.
        features = Conv2D(width, (3, 3), activation='relu', padding='same')(features)
        return Conv2D(width, (3, 3), activation='relu', padding='same')(features)

    inputs = tf.keras.Input(input_size)

    # Encoder: full-resolution block, 2x2 pooling, then a wider block.
    skip = double_conv(inputs, 64)
    pooled = MaxPooling2D(pool_size=(2, 2))(skip)
    bottom = double_conv(pooled, 128)

    # Decoder: upsample, join with the encoder features, then refine.
    restored = UpSampling2D(size=(2, 2))(bottom)
    joined = Concatenate(axis=3)([restored, skip])
    refined = double_conv(joined, 64)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(refined)
    return Model(inputs, outputs)

# Load the training dataset: the images come from the "Training data for cleaning"
# folder and are paired with the masks in the Week 4 mask directory.
image_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\Training data for cleaning"
mask_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 4\Mask Directory"

images, masks = load_dataset(image_dir, mask_dir)

# Holds out 20% of the pairs for validation; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(images, masks, test_size=0.2, random_state=42)

# Creates and compiles the U-Net model.
# "learning_rate" is the supported keyword; the old "lr" alias is deprecated
# and rejected by newer Keras releases.
model = unet_model()
model.compile(optimizer=Adam(learning_rate=1e-4), loss='binary_crossentropy', metrics=['accuracy'])

log_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\logs"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

# Trains the model with the TensorBoard callback
model.fit(X_train, y_train, batch_size=8, epochs=15, validation_data=(X_test, y_test), callbacks=[tensorboard_callback], verbose=1)

# Saves the trained model at the specified location for loading at a later date.
model_path = r'C:\Users\sebca\OneDrive\Final Year Project\Week 5\pot_segmentation_model_cleaning.h5'
model.save(model_path)

#Defines what the predicted images should look like and what they should be called.
def save_predicted_image(input_image, prediction, input_filename, output_dir):
    """Binarise one prediction and save it as a 512x512 PNG.

    Args:
        input_image: The image the prediction was made from. It is not used
            here and is kept only so existing callers keep working.
        prediction: Model output for one image; must reshape to (256, 256).
        input_filename: Name of the source image; its extension is replaced
            with ".png" to form the output name.
        output_dir: Directory the PNG is written to. It must already exist.
    """
    pred = prediction.reshape((256, 256))
    # Min-max scaling stretches this prediction to the 0-255 range, so the
    # threshold below is relative to the prediction's own minimum and maximum.
    pred = cv2.normalize(pred, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
    _, pred = cv2.threshold(pred, 127, 255, cv2.THRESH_BINARY)
    pred = 255 - pred  # Invert the binary mask
    output_filename = os.path.splitext(input_filename)[0] + '.png'

    # Resize the prediction to 512x512 before saving
    pred_resized = cv2.resize(pred, (512, 512))
    output_path = os.path.join(output_dir, output_filename)

    # cv2.imwrite signals failure through its return value rather than by
    # raising, so a failed write would otherwise go unnoticed.
    if not cv2.imwrite(output_path, pred_resized):
        print(f"Failed to write predicted image: {output_path}")

#Runs one batch of input images through the model; batching keeps memory use down.
def process_batch(images_batch):
    """Scale a batch of 256x256 images to [0, 1] and return the model output.

    Args:
        images_batch: Sequence of 256x256 grayscale images with 0-255 values.

    Returns:
        The first output of the module-level ``get_output`` function for the
        batch, reshaped beforehand to ``(batch, 256, 256, 1)``.
    """
    batch_array = np.array(images_batch) / 255.0
    four_d_shape = (batch_array.shape[0], 256, 256, 1)
    (predictions, *_) = get_output([batch_array.reshape(*four_d_shape)])
    return predictions

# Directories for prediction: images are read from input_dir and the cleaned results are written to output_dir
input_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\Processed Images"
output_dir = r"C:\Users\sebca\OneDrive\Final Year Project\Week 5\Cleaned Images"

#Loads every readable image in a folder as grayscale and resizes it to a standard size.
def load_images_from_folder(folder):
    """Read all readable images in ``folder`` as 256x256 grayscale arrays.

    Args:
        folder: Directory whose entries are each passed to ``cv2.imread``.

    Returns:
        A tuple ``(images, filenames)`` of two lists in matching order.
        Entries that ``cv2.imread`` cannot decode are left out of both lists.
    """
    target_shape = (256, 256)
    resized_images = []
    kept_names = []
    for candidate in os.listdir(folder):
        raw = cv2.imread(os.path.join(folder, candidate), cv2.IMREAD_GRAYSCALE)
        if raw is None:
            continue  # not a readable image
        resized_images.append(cv2.resize(raw, target_shape))
        kept_names.append(candidate)  # recorded in load order
    return resized_images, kept_names

test_images, input_filenames = load_images_from_folder(input_dir)
print("Loaded test images: ", len(test_images))

# The predictions are written with cv2.imwrite, which does not create missing
# directories, so make sure the output folder exists before predicting.
os.makedirs(output_dir, exist_ok=True)

# Loads the model again for prediction
model = load_model(model_path)
model.compile(loss='binary_crossentropy')

print("Model loaded and compiled.")

# Backend function mapping the first layer's input to the last layer's output;
# process_batch calls it to obtain the predictions.
get_output = K.function([model.layers[0].input], [model.layers[-1].output])
print("Custom predict function created.")

# Batch processing parameters
batch_size = 2
# Number of batches, rounded up so that a final partial batch is included
n_batches = len(test_images) // batch_size + (1 if len(test_images) % batch_size != 0 else 0)

all_predictions = []  # never filled in this cell; predictions are saved per batch instead
for batch_idx in range(n_batches):
    start_idx = batch_idx * batch_size
    end_idx = (batch_idx + 1) * batch_size
    batch_images = test_images[start_idx:end_idx]
    batch_filenames = input_filenames[start_idx:end_idx]
    print(f"Processing batch {batch_idx + 1}/{n_batches}")
    batch_predictions = process_batch(batch_images)
    # Images, predictions and file names share the same order within a batch
    for image, prediction, filename in zip(batch_images, batch_predictions, batch_filenames):
        save_predicted_image(image, prediction, filename, output_dir)

print("Predicted images saved.")
