# Introduction to Libraries

In [None]:
import os #to work with the operating system
import shutil #to save files if needed
import numpy as np #to create numpy array for manipulating image and storing
from PIL import Image, ImageDraw #for displaying image, opening image and drawing on image
import matplotlib.pyplot as plt #to display grid
import json #to easily read json file
import random #to pick random image
import tensorflow as tf #to load up tensors of image and masks
import keras #to create and train the deep learning model
from keras import layers #to work with the layers of the model
import multiprocessing #to allow for multiple gpus to be use for speed
# loading dependencies



In [None]:
# do this first because can't do later
def configure_tensorflow_gpu():
    """Configure TensorFlow CPU threading and GPU memory growth.

    Intended to run once, before any other TensorFlow work: the threading
    setting cannot be changed after the TensorFlow runtime has started.

    Returns:
        int: the CPU core count reported by ``multiprocessing.cpu_count()``.
    """
    # turn on multiple cores.  Once started you cannot change it.
    num_cores = multiprocessing.cpu_count()
    try:
        tf.config.threading.set_intra_op_parallelism_threads(num_cores)
    except RuntimeError as e:
        # Raised when the runtime is already initialized (e.g. the cell is
        # re-run); keep going so the GPU configuration below still happens.
        print(f"could not set intra-op threads: {e}")
    # tf.config.threading.set_inter_op_parallelism_threads(2)
    # Make sure it is taking advantage of the gpu instead of cpu.  Faster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                # Allocate GPU memory on demand instead of reserving it all up front.
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            print(f"no gpu from {e}")
    else:
        print("no gpu")
    # NOTE(review): this is set after `import tensorflow`; presumably it has to be
    # set before the import to affect TensorFlow's native logging -- confirm.
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'
    return num_cores

In [None]:
# Run this before any other TensorFlow work: the threading settings cannot be changed once TensorFlow has started.
configure_tensorflow_gpu()

In [None]:
def get_image_path(img_num):
    """Return the path of ``img{img_num}.png`` inside the project image folder."""
    # Folder where the author stored the images; change it to match your own layout.
    image_dir = "../final_project/google_collab_imgs"
    return os.path.join(image_dir, f"img{img_num}.png")

# Model Design & Implementation


# Training Process

In [None]:
# created smaller function to help just get the json file.  I used hardcoded values of 293 images with 2 augmented images each
def get_json_path(img_num):
    """Return the annotation JSON path for an image number.

    Numbers 1-293 map to themselves; 294-586 and 587 and above are folded
    back onto the same annotation files by subtracting 293 and 586.
    """
    if img_num < 294:
        source_num = img_num
    elif img_num < 587:
        source_num = img_num - 293
    else:
        source_num = img_num - 586
    return os.path.join("../final_project/google_collab_imgs", f"img{source_num}.json")

In [None]:
# We initially did this step ourselves so we could look at the augmentations and observe testing. We know there is functionality
# where the model will do this automatically, but we wanted to do it ourselves.
def augment_img(img_num):
    """Load image ``img_num`` and return one randomly augmented copy.

    The image is read as RGB, converted to a NumPy array and passed to
    ``random_img_augmentation`` together with the [x, y] centre returned by
    ``get_x_y_coordinates_from_json``.

    Returns:
        The array returned by ``random_img_augmentation``.
    """
    img_path = get_image_path(img_num)
    # The context manager releases the file handle even if decoding fails.
    with Image.open(img_path) as img:
        img_array = np.array(img.convert("RGB"))
    xy = get_x_y_coordinates_from_json(img_num)
    return random_img_augmentation(img_array, xy)


In [None]:
def get_x_y_coordinates_from_json(img_num):
    """Return ``[mean_x, mean_y]`` of the points of the first shape in the image's JSON."""
    # get_json_path folds augmented image numbers back onto the 293 source annotations
    with open(get_json_path(img_num), 'r') as annotation_file:
        annotation = json.load(annotation_file)

    # Only the first annotated shape is used.
    first_shape_points = annotation['shapes'][0]['points']
    x_total = sum(pt[0] for pt in first_shape_points)
    mean_x = x_total / len(first_shape_points)
    y_total = sum(pt[1] for pt in first_shape_points)
    mean_y = y_total / len(first_shape_points)
    return [mean_x, mean_y]

In [None]:
# We also initially did this step ourselves so we could look at the augmentations and observe testing; it can be done automatically, but we wanted to do it ourselves.
def augment_img(img_num):
    """Load image ``img_num`` and return one randomly augmented copy.

    NOTE: ``augment_img`` is also defined in an earlier cell; this later
    definition is the one in effect once both cells have run.

    The image is read as RGB, converted to a NumPy array and passed to
    ``random_img_augmentation`` together with the [x, y] centre returned by
    ``get_x_y_coordinates_from_json``.

    Returns:
        The array returned by ``random_img_augmentation``.
    """
    img_path = get_image_path(img_num)
    # The context manager releases the file handle even if decoding fails.
    with Image.open(img_path) as img:
        img_array = np.array(img.convert("RGB"))
    xy = get_x_y_coordinates_from_json(img_num)
    return random_img_augmentation(img_array, xy)



In [None]:
def random_img_augmentation(img, xy):
    """Apply one randomly chosen augmentation to an image array.

    One of 'darken', 'lighten', 'blur' or 'add_holes' is picked at random.
    Flips and rotations were left out on purpose: the photographed bills are
    already in all kinds of positions. The numeric ranges were tuned by trial
    and error to stress the model without making the image unreadable.

    Args:
        img: image array accepted by ``PIL.Image.fromarray``.
        xy: ``[x, y]`` centre around which the black "holes" are scattered, so
            that they land near the money instead of far away from it.

    Returns:
        numpy.ndarray: the augmented image.
    """
    choice = np.random.choice(['darken', 'lighten', 'blur', 'add_holes'])
    # ImageFilter is not among the file-level PIL imports, so import it here.
    from PIL import Image, ImageFilter
    pil_img = Image.fromarray(img)
    if choice == 'darken':
        # 0.9 gave no visible difference, so a much stronger darkening is used.
        # Bounds are passed as (low, high); numpy documents low > high as undefined.
        factor = np.random.uniform(0.1, 0.3)
        pil_img = pil_img.point(lambda p: p * factor)
    elif choice == 'lighten':
        factor = np.random.uniform(2, 4)
        # Clamp so brightened values stay within the 8-bit range.
        pil_img = pil_img.point(lambda p: min(255, p * factor))
    elif choice == 'blur':
        blur_val = np.random.uniform(5, 13)
        pil_img = pil_img.filter(ImageFilter.GaussianBlur(radius=blur_val))
    elif choice == 'add_holes':
        avg_x, avg_y = xy
        draw = ImageDraw.Draw(pil_img)
        num_holes = np.random.randint(3, 10)
        for _ in range(num_holes):
            # Top-left corner within 400 px of the centre; each side is 50-199 px.
            x1 = int(avg_x) + np.random.randint(-400, 400)
            y1 = int(avg_y) + np.random.randint(-400, 400)
            x2 = x1 + np.random.randint(50, 200)
            y2 = y1 + np.random.randint(50, 200)
            draw.rectangle([x1, y1, x2, y2], fill=(0, 0, 0))
    return np.array(pil_img)

In [None]:
# binary masks with the img_number coming in, the label_array of what we want masked crop, money, one, five, ten, etc
# bill_imagined wasn't working well here.
def create_binary_mask_from_json(img_num, label_array, img_shape):
    """Rasterize the annotated shapes of one image into one mask per label.

    Args:
        img_num: image number; ``get_json_path`` maps it to the annotation
            file, so augmented copies reuse the source image's JSON.
        label_array: labels to rasterize; mask ``i`` belongs to ``label_array[i]``.
        img_shape: ``(width, height)`` of the image.

    Returns:
        numpy.ndarray: uint8 array of shape ``(len(label_array), height, width)``
        with 255 inside the shapes carrying the label and 0 elsewhere.
    """
    json_path = get_json_path(img_num)
    width, height = img_shape

    with open(json_path, 'r') as f:
        tags = json.load(f)

    masks = np.zeros((len(label_array), height, width), dtype=np.uint8)
    for i, label in enumerate(label_array):
        temp_img = Image.new('L', (width, height), 0)
        draw = ImageDraw.Draw(temp_img)

        for shape in tags['shapes']:
            if shape['label'] != label:
                continue
            points = shape['points']

            if len(points) == 2:
                # Two points are treated as opposite rectangle corners. Sort
                # them because PIL requires x1 >= x0 and y1 >= y0, while the
                # corners may be stored in either order.
                x0, x1 = sorted((points[0][0], points[1][0]))
                y0, y1 = sorted((points[0][1], points[1][1]))
                draw.rectangle([(x0, y0), (x1, y1)], fill=255)
            else:
                draw.polygon([tuple(point) for point in points], fill=255)
        masks[i] = np.array(temp_img)
    return masks

In [None]:
def create_segmentation_dataset(label_array=["money"], iters=2, target_size=(256, 256)):
    """Build the image and mask arrays for segmentation training.

    For each of the 293 source images this appends ``iters`` randomly
    augmented copies followed by the original image, all paired with the same
    multi-channel mask (augmentation does not move the annotated shapes).
    Images that fail to load or process are reported and skipped.

    Args:
        label_array: labels to turn into mask channels, one channel per label
            in this order. The default list is never mutated here.
        iters: number of augmented copies generated per source image.
        target_size: size passed to ``PIL.Image.resize`` for images and masks.

    Returns:
        tuple: ``(images, masks)`` as NumPy arrays. Images are RGB scaled to
        [0, 1]; masks are float32 0/1 with one channel per label.
    """
    images = []
    masks = []
    imgs_processed = 0
    first_img, last_img = 1, 293
    # Every source image contributes itself plus `iters` augmented copies.
    total_expected = (last_img - first_img + 1) * (iters + 1)

    for img_num in range(first_img, last_img + 1):
        img_path = get_image_path(img_num)
        try:
            with Image.open(img_path) as raw_image:
                image = raw_image.convert('RGB')
            img_size = image.size
            image_resized = np.array(image.resize(target_size)) / 255.0
            mask_arrays = create_binary_mask_from_json(img_num, label_array, img_size)

            # resize all masks and keep them separate for multi-channel approach
            processed_masks = []
            for mask_array in mask_arrays:
                # NEAREST keeps the resized mask strictly two-valued.
                mask_resized = np.array(Image.fromarray(mask_array).resize(target_size, Image.NEAREST))
                mask_resized = (mask_resized > 127).astype(np.float32)
                mask_resized = np.expand_dims(mask_resized, axis=-1)
                processed_masks.append(mask_resized)

            # Create multi-channel mask: concatenates along channel dimension
            combined_mask = np.concatenate(processed_masks, axis=-1)  # Shape: (H, W, num_labels)
            for _ in range(iters):
                augmented_img_array = augment_img(img_num)
                image_augmented = Image.fromarray(augmented_img_array)
                image_augmented_resized = np.array(image_augmented.resize(target_size)) / 255.0
                images.append(image_augmented_resized)
                masks.append(combined_mask)  # SAME JSON because SAME IMAGE JUST AUGMENTED
                imgs_processed += 1
            imgs_processed += 1
            images.append(image_resized)
            masks.append(combined_mask)
            if imgs_processed % 50 == 0:
                print(f"processed {imgs_processed}/{total_expected}")
        except Exception as e:
            # Best effort: report the failing image and carry on with the rest.
            print(f"Error processing {img_path}: {e}")
            continue
    images = np.array(images)
    masks = np.array(masks)
    return images, masks

In [None]:
# Build the dataset with one mask channel per label, in this order. Later cells select channels by
# position (masks[:,:,:,-2] for "money"), so the order here must stay in sync with them.
images, masks = create_segmentation_dataset(label_array = ["one","five","ten","twenty","fifty","hundred","money","crop"])

In [None]:
def plot_image_and_masks(image, masks, label_array, img_num):
    """Show an image next to an overlay of each of its mask channels.

    The panels are laid out three per row: the image first, then one overlay
    per channel of ``masks`` (last axis), titled with the matching entry of
    ``label_array``. ``img_num`` only appears in the first panel's title.
    """
    n_cols = 3
    mask_count = masks.shape[-1]
    panel_count = mask_count + 1  # the image itself plus one panel per mask
    n_rows = (panel_count + n_cols - 1) // n_cols  # ceiling division

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 5 * n_rows))
    panels = axes.flatten()

    # First panel: the image on its own
    image_panel = panels[0]
    image_panel.imshow(image)
    image_panel.set_title(f"Original Image num{img_num}")
    image_panel.axis("off")

    # Following panels: each mask channel drawn over the image
    for channel, panel in enumerate(panels[1:panel_count]):
        panel.imshow(image)
        panel.imshow(masks[:, :, channel], cmap="gray", alpha=0.7)
        panel.set_title(f"Mask: {label_array[channel]}")
        panel.axis("off")

    # Blank out whatever grid cells are left over
    for spare in panels[panel_count:]:
        spare.axis("off")

    plt.tight_layout()
    plt.show()

In [None]:
# Inspect the shape of one processed image (the entry at index 1).
images[1].shape

In [None]:
# Pick a random dataset entry and show it with every mask channel.
# NOTE(review): sample_index+1 is a position in the images array, not a source image number
# (create_segmentation_dataset appends several entries per source image), so the number shown
# in the title does not identify an img{N}.png file.
sample_index = random.randint(0, images.shape[0] - 1)
sample_image = images[sample_index]
sample_masks = masks[sample_index]
plot_image_and_masks(sample_image, sample_masks, ["one","five","ten","twenty","fifty","hundred","money","crop"],sample_index+1)

In [None]:
# Hyperparameters
IMG_SIZE = 256          # side length of the square model input
BATCH_SIZE = 8          # 8 seems to be pretty standard
AUTOTUNE = tf.data.AUTOTUNE
EPOCHS = 10             # can increase if needed
# NOTE(review): make_unet_1 is called without base_num_filters in this file, so its default (32)
# is what the model uses, not this value -- confirm which one is intended.
BASE_FILTERS = 96
LEARNING_RATE = 5e-4    # made a high learning rate, for quicker learning
DROPOUT_RATE = 0.2      # dropout for some regularization
L2_REG = 1e-4           # weight decay, tried various
# NOTE(review): AUGMENT_DATA is not read anywhere in the visible part of the file.
AUGMENT_DATA = False     # False, because we did it on our own, so we could see the image results and have greater control

In [None]:
def double_conv_block(x, num_filters, dropout_rate=0.0):
    """Apply two 3x3 same-padded Conv2D + BatchNormalization stages to ``x``.

    Args:
        x: input Keras tensor.
        num_filters: number of filters of both convolutions.
        dropout_rate: when greater than 0, a Dropout layer with this rate is
            inserted between the two convolution stages. Callers pass
            DROPOUT_RATE here, so the argument must actually take effect.

    Returns:
        The output Keras tensor (spatial size unchanged).
    """
    # First Conv2D layer with L2 regularization
    x = layers.Conv2D(filters=num_filters, kernel_size=(3, 3), padding="same", activation="relu",
                     kernel_regularizer=keras.regularizers.l2(L2_REG))(x)
    x = layers.BatchNormalization()(x)
    # Regularize between the two stages; skipped entirely for a rate of 0.
    if dropout_rate > 0:
        x = layers.Dropout(dropout_rate)(x)
    # Second Conv2D layer
    x = layers.Conv2D(filters=num_filters, kernel_size=(3, 3), padding="same", activation="relu")(x)
    x = layers.BatchNormalization()(x)
    return x


In [None]:
def encoder_block(x, num_filters):
    """Run one encoder stage and return ``(features, pooled)``.

    ``features`` keeps the input's height and width and is what the decoder
    later uses as the skip connection; ``pooled`` is ``features`` after a
    2x2 max pool.
    """
    features = double_conv_block(x, num_filters, dropout_rate=DROPOUT_RATE)
    pooled = layers.MaxPooling2D(pool_size=(2, 2), padding="valid")(features)
    return features, pooled

In [None]:
def decoder_block(x, skip, num_filters):
    """Run one decoder stage: upsample ``x``, join it with ``skip``, refine.

    A stride-2 transposed convolution upsamples ``x``, the result is
    concatenated with the encoder tensor ``skip``, and a double conv block
    (called with half of DROPOUT_RATE) refines the merged features.
    """
    upsample = layers.Conv2DTranspose(filters=num_filters, kernel_size=(3, 3), strides=2, padding="same")
    merged = layers.Concatenate()([upsample(x), skip])
    # Less dropout in decoder
    return double_conv_block(merged, num_filters, dropout_rate=DROPOUT_RATE/2)

In [None]:
def make_unet_1(input_shape=(256, 256, 3), base_num_filters=32, num_classes=1, final_act=None):
    """Assemble a four-level U-Net and return it as a ``keras.Model`` named 'U-Net'.

    Args:
        input_shape: ``(height, width, channels)``; height and width must be
            multiples of 16 because the encoder halves them four times.
        base_num_filters: filters of the first encoder level; each deeper
            level doubles it, up to 16x in the bottleneck.
        num_classes: number of output channels.
        final_act: output activation; when None, 'sigmoid' is used for one
            class and 'softmax' otherwise.
    """
    height, width, _ = input_shape
    # multiples of 16 needed
    assert height % 16 == 0 and width % 16 == 0

    inputs = keras.Input(shape=input_shape)

    # Encoder: keep each level's features for the matching decoder level
    skips = []
    x = inputs
    for multiplier in (1, 2, 4, 8):
        skip, x = encoder_block(x, base_num_filters * multiplier)
        skips.append(skip)

    # Bottleneck
    x = double_conv_block(x, base_num_filters * 16, dropout_rate=DROPOUT_RATE)

    # Decoder: consume the skips deepest-first
    for multiplier in (8, 4, 2, 1):
        x = decoder_block(x, skips.pop(), base_num_filters * multiplier)

    # Head
    if final_act is not None:
        act = final_act
    elif num_classes == 1:
        act = 'sigmoid'
    else:
        act = 'softmax'
    outputs = layers.Conv2D(num_classes, 1, activation=act, padding='same')(x)

    return keras.Model(inputs, outputs, name='U-Net')

In [None]:
# Single-output U-Net used for the "money" mask.
# NOTE(review): base_num_filters is not passed, so make_unet_1's default of 32 applies rather than BASE_FILTERS -- confirm this is intended.
model_money = make_unet_1(input_shape=(IMG_SIZE,IMG_SIZE,3), num_classes=1, final_act='sigmoid')

In [None]:
# gets just the masks associated with the money.
# Channel -2 is "money" only because of the label order passed to create_segmentation_dataset
# (..., "money", "crop"). Integer indexing drops the channel axis from the result.
money_masks = masks[:,:,:,-2]

In [None]:
# Sanity check: show one random image side by side with its money mask.
sample_index = random.randint(0, images.shape[0] - 1)
sample_image = images[sample_index]
sample_money_mask = money_masks[sample_index]
plt.figure(figsize=(10, 5))
# Left panel: the image
plt.subplot(1, 2, 1)
plt.imshow(sample_image)
plt.title("Original Image")
plt.axis("off")

# Right panel: the money mask for the same dataset entry
plt.subplot(1, 2, 2)
plt.imshow(sample_money_mask, cmap="gray")
plt.title("Money Mask")
plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
# Train/validation split for the money-mask model.
dataset_money = tf.data.Dataset.from_tensor_slices((images, money_masks))
# Shuffle ONCE with a fixed order. With the default reshuffle_each_iteration=True the order is
# redrawn on every pass, so take()/skip() below would select a different validation subset each
# epoch and validation samples would leak into training.
dataset_money = dataset_money.shuffle(len(images), seed=21, reshuffle_each_iteration=False)

val_split = 0.15
val_size = int(len(images) * val_split)
train_size = len(images)- val_size

# The first val_size samples of the fixed order are validation; the rest are training.
val_dataset_money = dataset_money.take(val_size).batch(BATCH_SIZE)
# Reshuffle only the training portion each epoch so batches still vary between epochs.
train_dataset_money = dataset_money.skip(val_size).shuffle(train_size).batch(BATCH_SIZE)
# NOTE(review): augmented copies of one source image can still end up on both sides of the
# split, because the split is per dataset entry rather than per source image.
print(f"Train samples: {train_size}, Val samples: {val_size}")


In [None]:
# Train/validation split over the full multi-channel masks.
# This cell rebinds val_split, val_size and train_size from the money-mask split cell.
dataset = tf.data.Dataset.from_tensor_slices((images, masks))
# Shuffle ONCE with a fixed order. With the default reshuffle_each_iteration=True the order is
# redrawn on every pass, so take()/skip() below would select a different validation subset each
# epoch and validation samples would leak into training.
dataset = dataset.shuffle(len(images), seed=21, reshuffle_each_iteration=False)

val_split = 0.2
val_size = int(len(images) * val_split)
train_size = len(images)- val_size

# The first val_size samples of the fixed order are validation; the rest are training.
val_dataset = dataset.take(val_size).batch(BATCH_SIZE)
# Reshuffle only the training portion each epoch so batches still vary between epochs.
train_dataset = dataset.skip(val_size).shuffle(train_size).batch(BATCH_SIZE)
print(f"Train samples: {train_size}, Val samples: {val_size}")


In [None]:
# Adam with the notebook-level LEARNING_RATE; per-pixel binary cross-entropy matches the
# single sigmoid output channel the model was built with.
model_money.compile(
    optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss='binary_crossentropy', #might want to retry dice loss/combined loss
    metrics=['accuracy']
)

In [None]:
# Train the money-mask model. The datasets are already batched with .batch(BATCH_SIZE) where
# they are built, so batch_size is not passed to fit(): Keras documents that it must not be
# specified for dataset inputs.
history_money = model_money.fit(
            train_dataset_money,
            validation_data=val_dataset_money,
            epochs=EPOCHS,
        )

In [None]:
def plot_history(hist, log_scale=False):
    """Plot training and validation loss per epoch from a Keras History object.

    Reads ``hist.history["loss"]`` and ``hist.history["val_loss"]``; with
    ``log_scale=True`` the y axis is logarithmic.
    """
    curves = (
        ("loss", "blue", "-", "train"),
        ("val_loss", "red", "--", "val"),
    )
    plt.figure(figsize=(8,5))
    for key, colour, dash, legend_name in curves:
        plt.plot(hist.history[key], color=colour, linestyle=dash, label=legend_name)

    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training vs Validation Loss")
    plt.legend()
    plt.grid(True, which="both", ls=":")
    if log_scale:
        # Switch the axis and relabel it to match
        plt.yscale("log")
        plt.ylabel("Loss (log scale)")
    plt.show()

In [None]:
# Loss curves of the money-mask training run.
plot_history(history_money)

In [None]:
def plot_predictions_grid(images, masks, preds=None, num_samples=9):
    """Plot rows of image / ground-truth overlay / (optional) prediction overlay.

    Args:
        images: indexable batch of images.
        masks: ground-truth masks, indexed in step with ``images``.
        preds: optional predicted masks; when given, a third column is drawn.
        num_samples: maximum number of rows; clamped to ``len(images)``.
    """
    n = min(num_samples, len(images))
    if n <= 0:
        # Nothing to draw; plt.subplots rejects a grid with zero rows.
        return
    cols = 3 if preds is not None else 2
    # squeeze=False keeps axes two-dimensional even when there is a single row.
    fig, axes = plt.subplots(nrows=n, ncols=cols, figsize=(5*cols, 3*n), squeeze=False)

    # Iterate over the clamped count so a short batch cannot be over-indexed.
    for idx in range(n):
        imag = images[idx]
        mask = masks[idx]

        # Original Image
        axes[idx, 0].set_title("Image")
        axes[idx, 0].imshow(imag)
        axes[idx, 0].axis("off")

        # Ground Truth Mask
        axes[idx, 1].set_title("Ground Truth Mask")
        axes[idx, 1].imshow(imag)
        # "gray" is the spelling used by the other plotting cells in this file.
        axes[idx, 1].imshow(mask, cmap="gray", alpha=0.7)
        axes[idx, 1].axis("off")

        # Predicted Mask
        if preds is not None:
            axes[idx, 2].set_title("Predicted Mask")
            axes[idx, 2].imshow(imag)
            axes[idx, 2].imshow(preds[idx], cmap="gray", alpha=0.7)
            axes[idx, 2].axis("off")

    plt.tight_layout()
    plt.show()

In [None]:
# Predict on one batch and compare the thresholded output with the ground truth.
# NOTE(review): the batch comes from train_dataset_money, so these plots show the training
# split, not held-out data.
images_samp_money, masks_samp_money = next(iter(train_dataset_money))
preds = model_money.predict(images_samp_money)
preds = (preds > 0.6).astype("float32")  # threshold for the binary mask; of the values we tried, 0.6 worked best

plot_predictions_grid(images_samp_money, masks_samp_money, preds, num_samples=3)



In [None]:
# Creating masks

In [None]:
# End creating masks and images

In [None]:
# Start with loading data

In [None]:
# End loading data

In [None]:
# Start building UNET

In [None]:
# End creating UNET

In [None]:
# Start train model

In [None]:
# End train model

In [None]:
# creating file structure (YOLO vs ViT)

In [None]:
# end creating file structure

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam

# Template image classifier: three Conv2D + MaxPooling2D stages followed by a dense head.
# NOTE(review): this `model` is only built, compiled and summarized here; no fit() call for it
# is visible in this file.
# Example: 256x256 RGB images
input_shape = (256, 256, 3)
num_classes = 3  # change this to your number of classes

model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
    MaxPooling2D((2, 2)),

    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),

    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),

    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')  # for multi-class
])

# categorical_crossentropy pairs with the softmax output above.
model.compile(
    optimizer=Adam(learning_rate=1e-3),
    loss='categorical_crossentropy',   # use 'binary_crossentropy' + Dense(1, sigmoid) for binary
    metrics=['accuracy']
)

model.summary()


# Evaluate Results

In [None]:
# Start visualizations

In [None]:
def plot_history(hist, log_scale=False):
    """Plot training and validation loss per epoch from a Keras History object.

    Reads ``hist.history["loss"]`` and ``hist.history["val_loss"]``; with
    ``log_scale=True`` the y axis is logarithmic.
    """
    curves = (
        ("loss", "blue", "-", "train"),
        ("val_loss", "red", "--", "val"),
    )
    plt.figure(figsize=(8,5))
    for key, colour, dash, legend_name in curves:
        plt.plot(hist.history[key], color=colour, linestyle=dash, label=legend_name)

    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training vs Validation Loss")
    plt.legend()
    plt.grid(True, which="both", ls=":")
    if log_scale:
        # Switch the axis and relabel it to match
        plt.yscale("log")
        plt.ylabel("Loss (log scale)")
    plt.show()

In [None]:
# `history` is never defined in this file (the recorded output is a NameError); the only
# training history produced by a fit() call here is history_money.
plot_history(history_money)

NameError: name 'history' is not defined

In [None]:
# Get a batch of held-out data and predict its money masks.
# Uses model_money / val_dataset_money: `model` in this file is the Sequential classifier, whose
# per-class output cannot be compared pixel by pixel with segmentation masks.
test_images, test_masks = next(iter(val_dataset_money))
test_predictions = model_money.predict(test_images)
test_predictions_binary = (test_predictions > 0.95).astype("float32")  # I tried various thresholds...
# could tweak this more 0.95 is high, but seems to work should try on more images, ie our test images that we didn't train the model on


NameError: name 'val_dataset' is not defined

In [None]:
def calculate_iou(y_true, y_pred):
    """Return the Intersection over Union (IoU) of two binary masks.

    Both arrays are flattened first, so any matching shapes work. Two empty
    masks (union of zero) count as a perfect match and give 1.0.
    """
    truth = y_true.flatten()
    guess = y_pred.flatten()

    # For 0/1 masks the element-wise product marks pixels set in both.
    overlap = np.sum(truth * guess)
    combined = np.sum(truth) + np.sum(guess) - overlap

    if combined != 0:
        return overlap / combined
    return 1.0 if overlap == 0 else 0.0

# One IoU score per sample of the evaluation batch, in batch order
ious = [
    calculate_iou(test_masks[sample_idx].numpy(), test_predictions_binary[sample_idx])
    for sample_idx in range(len(test_images))
]

NameError: name 'test_images' is not defined

In [None]:
def plot_segmentation_results(images, true_masks, pred_masks, num_samples=6, iou_scores=None):
    """Plot comparison of original images, true masks, and predicted masks.

    Args:
        images: batch of image tensors (``.numpy()`` is called on each item).
        true_masks: batch of ground-truth mask tensors.
        pred_masks: predicted masks, indexable in step with ``images``.
        num_samples: maximum number of rows; clamped to ``len(images)``.
        iou_scores: per-sample IoU values shown in the prediction titles.
            When None, the notebook-level ``ious`` list is used, as before.
    """
    n = min(num_samples, len(images))
    if n <= 0:
        # Nothing to draw; plt.subplots rejects a grid with zero rows.
        return
    if iou_scores is None:
        # Backward-compatible fallback to the list computed in an earlier cell.
        iou_scores = ious
    # squeeze=False keeps axes two-dimensional even when there is a single row.
    fig, axes = plt.subplots(nrows=n, ncols=3, figsize=(15, 3*n), squeeze=False)

    for i in range(n):
        img = images[i].numpy()
        # Drop size-1 axes so the masks can be drawn as 2-D images.
        true_mask = np.squeeze(true_masks[i].numpy())
        pred_mask = np.squeeze(pred_masks[i])

        # Original Image
        axes[i, 0].imshow(img)
        axes[i, 0].set_title(f"Original Image {i+1}")
        axes[i, 0].axis("off")

        # True Mask Overlay
        axes[i, 1].imshow(img)
        axes[i, 1].imshow(true_mask, cmap="Reds", alpha=0.5)
        axes[i, 1].set_title(f"True Mask {i+1}")
        axes[i, 1].axis("off")

        # Predicted Mask Overlay
        axes[i, 2].imshow(img)
        axes[i, 2].imshow(pred_mask, cmap="Blues", alpha=0.5)
        axes[i, 2].set_title(f"Predicted Mask {i+1}\nIoU: {iou_scores[i]:.3f}")
        axes[i, 2].axis("off")

    plt.tight_layout()
    plt.show()

In [None]:
# Get a batch of held-out data and predict its money masks.
# Uses model_money / val_dataset_money: `model` in this file is the Sequential classifier, whose
# per-class output cannot be compared pixel by pixel with segmentation masks.
test_images, test_masks = next(iter(val_dataset_money))
test_predictions = model_money.predict(test_images)
test_predictions_binary = (test_predictions > 0.95).astype("float32")  # Use better threshold


NameError: name 'val_dataset' is not defined

In [None]:
# Show image / true-mask overlay / predicted-mask overlay for the evaluation batch.
plot_segmentation_results(test_images, test_masks, test_predictions_binary)

NameError: name 'test_images' is not defined