In [None]:
import os
import cv2
import numpy as np
from glob import glob
from scipy.io import loadmat
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.python.client import device_lib



In [None]:
# Restrict CUDA to the first GPU. The variable must be spelled
# CUDA_VISIBLE_DEVICES exactly; a misspelled name is silently ignored.
# NOTE(review): TensorFlow is imported in an earlier cell; confirm the variable
# is still honoured when set here (it must be set before the GPU is first used).
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Training configuration.
BATCH_SIZE = 2
NUM_CLASSES = 1
DATA_DIR = "./"
NUM_TRAIN_IMAGES = 282
NUM_VAL_IMAGES = 44

# Each list is sorted and truncated independently; images and masks are later
# paired by position, so this presumably relies on matching file names in the
# image and mask folders — verify against the dataset layout.
train_images = sorted(glob(os.path.join(DATA_DIR, "Images/*")))[:NUM_TRAIN_IMAGES]
train_masks = sorted(glob(os.path.join(DATA_DIR, "Masks/*")))[:NUM_TRAIN_IMAGES]
val_images = sorted(glob(os.path.join(DATA_DIR, "Val_Images/*")))[:NUM_VAL_IMAGES]
val_masks = sorted(glob(os.path.join(DATA_DIR, "Val_Masks/*")))[:NUM_VAL_IMAGES]

def read_image(image_path, mask=False):
    """Read an image file and convert it to the tensor form used by the model.

    Args:
        image_path: Path of the file to read (passed to ``tf.io.read_file``).
        mask: When True the file is treated as a segmentation mask; otherwise
            it is treated as a network input image.

    Returns:
        For masks, a single-channel tensor of 0.0/1.0 values: the image is
        converted to grayscale, scaled by 1/255 and thresholded at 0.5.
        For input images, the result of ResNet50 ``preprocess_input``.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_png(raw_bytes, channels=3)

    # The target size is read from the decoded image itself, so the spatial
    # dimensions are unchanged by this call.
    own_size = [tf.shape(decoded)[0], tf.shape(decoded)[1]]
    resized = tf.image.resize(images=decoded, size=own_size)

    if not mask:
        return tf.keras.applications.resnet50.preprocess_input(resized)

    grayscale = tf.image.rgb_to_grayscale(resized)
    scaled = tf.cast(grayscale, tf.float32) / 255.0
    return tf.where(scaled >= 0.5, 1.0, 0.0)  # Apply threshold for binary conversion

# def read_image(image_path, mask=False, guassain=True):
#     image = tf.io.read_file(image_path)
#     image = tf.image.decode_png(image, channels=3)
#     if mask:
#         image = tf.image.resize(images=image, size=[tf.shape(image)[0], tf.shape(image)[1]])
#         image = tf.image.rgb_to_grayscale(image)
#         image = tf.cast(image, tf.float32) / 255.0
#         image = tf.where(image >= 0.5, 1.0, 0.0)  # Apply threshold for binary conversion
#     else:
#         image = tf.image.resize(images=image, size=[tf.shape(image)[0], tf.shape(image)[1]])
#         image = tf.keras.applications.resnet50.preprocess_input(image)

#         # Add Gaussian noise (GaussianNoise layer; this is not a smoothing filter)
#         if guassain:
#             image = tf.cast(image, tf.uint8)
#             image = tf.image.convert_image_dtype(image, tf.float32)
#             image = tf.image.resize(images=image, size=[tf.shape(image)[0], tf.shape(image)[1]])
#             image = tf.image.convert_image_dtype(image, tf.uint8)
#             image = tf.cast(image, tf.float32)
#             image = tf.expand_dims(image, axis=0)
#             image = tf.keras.layers.GaussianNoise(stddev=1.0)(image)
#             image = tf.squeeze(image, axis=0)

#     return image


def load_data(image_list, mask_list):
    """Load one (image, mask) pair.

    Despite the parameter names, each argument is handed to ``read_image`` as
    a single path; ``data_generator`` maps this function over the dataset.

    Returns:
        Tuple ``(image, mask)`` as produced by ``read_image``.
    """
    return read_image(image_list), read_image(mask_list, mask=True)

def data_generator(image_list, mask_list):
    """Build a batched ``tf.data`` pipeline of (image, mask) pairs.

    Args:
        image_list: Image file paths.
        mask_list: Mask file paths, paired with ``image_list`` by position.

    Returns:
        A dataset yielding batches of ``BATCH_SIZE`` decoded pairs; a final
        incomplete batch is dropped.
    """
    path_pairs = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
    decoded_pairs = path_pairs.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
    return decoded_pairs.batch(BATCH_SIZE, drop_remainder=True)

# Batched pipelines for the training and validation splits.
train_dataset, val_dataset = (
    data_generator(train_images, train_masks),
    data_generator(val_images, val_masks),
)

In [None]:
# def convolution_block(
#     block_input,
#     num_filters=256,
#     kernel_size=3,
#     dilation_rate=1,
#     padding="same",
#     use_bias=False,
# ):
#     x = layers.Conv2D(
#         num_filters,
#         kernel_size=kernel_size,
#         dilation_rate=dilation_rate,
#         padding="same",
#         use_bias=use_bias,
#         kernel_initializer=keras.initializers.HeNormal(),
#     )(block_input)
#     x = layers.BatchNormalization()(x)
#     return tf.nn.relu(x)

def convolution_block(
    block_input,
    filters=256,
    kernel_size=3,
    dilation_rate=1,
    padding="same",
    use_bias=False,
):
    """Apply Conv2D, then BatchNormalization, then ReLU to ``block_input``.

    Args:
        block_input: Input tensor.
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        dilation_rate: Dilation rate of the convolution.
        padding: Padding mode passed to ``Conv2D``.
        use_bias: Whether the convolution has a bias term.

    Returns:
        The activated output tensor.
    """
    conv_layer = layers.Conv2D(
        filters,
        kernel_size=kernel_size,
        dilation_rate=dilation_rate,
        padding=padding,
        use_bias=use_bias,
        kernel_initializer=keras.initializers.HeNormal(),
    )
    features = conv_layer(block_input)
    normalized = layers.BatchNormalization()(features)
    return tf.nn.relu(normalized)


In [None]:
def DilatedSpatialPyramidPooling(dspp_input):
    """Combine parallel convolution branches with different dilation rates.

    Branches: a 1x1 convolution and three 3x3 convolutions with dilation rates
    6, 12 and 18. When the input's static height and width are known, an extra
    branch average-pools with a window of (height // 4, width // 4), applies a
    1x1 convolution block and upsamples by the same factor; with unknown
    spatial dimensions that branch is omitted.

    Args:
        dspp_input: Feature-map tensor.

    Returns:
        The concatenated branches passed through a final 1x1 convolution block.
    """
    static_shape = tf.keras.backend.int_shape(dspp_input)
    height, width = static_shape[1], static_shape[2]
    branches = []

    if height is not None and width is not None:
        window = (height // 4, width // 4)
        pooled = layers.AveragePooling2D(pool_size=window)(dspp_input)
        pooled = convolution_block(pooled, kernel_size=1, use_bias=True)
        branches.append(
            layers.UpSampling2D(size=window, interpolation="bilinear")(pooled)
        )

    branches.append(convolution_block(dspp_input, kernel_size=1, dilation_rate=1))
    for rate in (6, 12, 18):
        branches.append(
            convolution_block(dspp_input, kernel_size=3, dilation_rate=rate)
        )

    merged = layers.Concatenate(axis=-1)(branches)
    return convolution_block(merged, kernel_size=1)

def DeeplabV3Plus(num_classes):
    """Build a DeepLabV3+-style segmentation model on an ImageNet ResNet50.

    The output of layer ``conv4_block6_2_relu`` goes through the dilated
    pyramid pooling and is upsampled 4x, then concatenated with a 48-filter
    1x1 projection of ``conv2_block3_2_relu``. Two 256-filter convolution
    blocks, another 4x upsampling and a 1x1 sigmoid convolution produce the
    output.

    Args:
        num_classes: Number of channels of the final 1x1 convolution.

    Returns:
        A ``keras.Model`` mapping the ResNet50 input to the sigmoid output.
    """
    backbone = keras.applications.ResNet50(
        weights="imagenet", include_top=False, input_shape=(None, None, 3)
    )

    # Deep branch: pyramid pooling on the later backbone stage, then 4x upsampling.
    deep_features = backbone.get_layer("conv4_block6_2_relu").output
    deep_features = DilatedSpatialPyramidPooling(deep_features)
    upsampled_deep = layers.UpSampling2D(size=(4, 4), interpolation="bilinear")(
        deep_features
    )

    # Shallow branch: earlier backbone stage projected to 48 channels.
    shallow_features = backbone.get_layer("conv2_block3_2_relu").output
    shallow_features = convolution_block(shallow_features, filters=48, kernel_size=1)

    decoder = layers.Concatenate(axis=-1)([upsampled_deep, shallow_features])
    for _ in range(2):
        decoder = convolution_block(decoder, filters=256)
    decoder = layers.UpSampling2D(size=(4, 4), interpolation="bilinear")(decoder)

    classifier = layers.Conv2D(
        num_classes, kernel_size=(1, 1), padding="same", activation="sigmoid"
    )
    return keras.Model(inputs=backbone.input, outputs=classifier(decoder))



In [None]:
# with tf.device("/GPU:0"):
#     model = DeeplabV3Plus(num_classes=NUM_CLASSES)
#     model.build((None, None, None, 3))
#     model.summary()
#     loss = keras.losses.BinaryCrossentropy(from_logits=False)
#     model.compile(
#         optimizer=keras.optimizers.Adam(learning_rate=0.00075),
#         loss=loss,
#         metrics=["accuracy"],
#     )

#     history = model.fit(train_dataset, validation_data=val_dataset, epochs=50)

# plt.plot(history.history["loss"])
# plt.title("Training Loss")
# plt.ylabel("loss")
# plt.xlabel("epoch")
# plt.show()

# plt.plot(history.history["accuracy"])
# plt.title("Training Accuracy")
# plt.ylabel("accuracy")
# plt.xlabel("epoch")
# plt.show()

# plt.plot(history.history["val_loss"])
# plt.title("Validation Loss")
# plt.ylabel("val_loss")
# plt.xlabel("epoch")
# plt.show()

# plt.plot(history.history["val_accuracy"])
# plt.title("Validation Accuracy")
# plt.ylabel("val_accuracy")
# plt.xlabel("epoch")
# plt.show()

In [None]:
# Learning-rate / epoch experiments and what was observed:
# lr 0.001,   50 epochs -> overfitting
# lr 0.005,   50 epochs -> overfitting, poor results
# lr 0.0005,  50 epochs -> chat said it's good, but there are many spikes in the learning curve
# lr 0.0005,  25 epochs -> looks the same as above
# lr 0.001,   25 epochs -> good scenario?
# lr 0.0015,  25 epochs -> overfitting
# lr 0.00075, 25 epochs -> better
# lr 0.00075, 50 epochs -> better

# model.save("model_any_new_00075_50.h5")

In [None]:
# Load the saved model from its .h5 file so the cells below can evaluate and
# predict with it (the training cell that would produce this file is commented out).
loaded_model = keras.models.load_model("model_any_new_00075_50.h5")

def prediction_mask(model, image_path):
    """Run ``model`` on a single image file and return its prediction.

    Args:
        model: Model exposing ``predict``.
        image_path: Path of the image, loaded through ``read_image``.

    Returns:
        The first (and only) element of the prediction for a batch of one.
    """
    single_image_batch = tf.expand_dims(read_image(image_path), axis=0)
    return model.predict(single_image_batch)[0]


# Directory holding the test images, resolved relative to DATA_DIR like the
# other dataset paths in this notebook instead of a machine-specific absolute path.
test_image_dir = os.path.join(DATA_DIR, "Test/old_test/")


In [None]:
def test_data_generator(image_list, mask_list):
    """Build the evaluation pipeline: decoded (image, mask) pairs, one per batch.

    Args:
        image_list: Test image file paths.
        mask_list: Test mask file paths, paired with ``image_list`` by position.

    Returns:
        A dataset yielding batches of size 1.
    """
    path_pairs = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
    decoded_pairs = path_pairs.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
    return decoded_pairs.batch(1)  # Batch size of 1 for testing set

# Collect the test images and masks; both lists are sorted so that entries
# pair up by position.
test_image_pattern = os.path.join(DATA_DIR, "Test/old_test/*")
test_mask_pattern = os.path.join(DATA_DIR, "Test/old_test_mask/*")
test_images = sorted(glob(test_image_pattern))
test_masks = sorted(glob(test_mask_pattern))

test_dataset = test_data_generator(test_images, test_masks)

# evaluate() returns the loss followed by the compiled metric(s).
test_metrics = loaded_model.evaluate(test_dataset)
loss, accuracy = test_metrics
print("Testing Loss:", loss)
print("Testing Accuracy:", accuracy)

In [None]:
import math

# Earlier inputs, kept for reference:
# ref_image_path = "C:/Users/kaewp/DeepLabV3/images2/images2/frame-1612860770101.jpg"
# ref_2_image_path = "C:/Users/kaewp/DeepLabV3/images2/images2/frame-1612866803296.jpg"
# test_image_path = "images2/images2/frame-1612865449492.jpg"

# Two reference frames and the frame to be measured.
ref_image_path = "Test/old_test/test2.png"
ref_2_image_path = "Test/old_test/test27.png"
test_image_path = "Test/old_test/test18.png"

# Predictions at or above this value count as foreground.
threshold = 0.5


def _predict_binary_mask(model, image_path, cutoff):
    """Predict a mask for ``image_path`` and binarise it to a uint8 0/255 image."""
    probabilities = prediction_mask(model, image_path)
    binary = np.where(probabilities >= cutoff, 1, 0).astype(np.uint8) * 255
    # The target size is the mask's own (width, height), so the resolution is
    # unchanged. NOTE(review): presumably this call is what turns an
    # (H, W, 1) prediction into the 2-D image Canny expects — confirm.
    return cv2.resize(binary, (binary.shape[1], binary.shape[0]))


ref_mask = _predict_binary_mask(loaded_model, ref_image_path, threshold)
ref_2_mask = _predict_binary_mask(loaded_model, ref_2_image_path, threshold)
test_mask = _predict_binary_mask(loaded_model, test_image_path, threshold)

# Outline of each binary mask.
ref_edge = cv2.Canny(ref_mask, 0, 255)
ref_2_edge = cv2.Canny(ref_2_mask, 0, 255)
test_edge = cv2.Canny(test_mask, 0, 255)

# Real-world heights used by draw_line to scale pixel distances
# (units are not stated in this notebook).
real_height = 3.35
real_ref_height = 3.18

# State shared with the draw_line mouse callback; everything starts unset.
real_dis_magnitude = None
ref_point = ref_2_point = test_point = None
collision_ref_2_edge = collision_ref_edge = collision_test_edge = None
line_start_point = line_end_point = None
saved_line_start_point = saved_line_end_point = None
line_points = []

# Drawing style; colour tuples are passed to OpenCV drawing calls (BGR order).
line_thickness = 2
line_BGR_color = (0, 255, 255)  # Yellow
dot_radius = 3
dot_ref_color = (0, 0, 255)  # Red
dot_edge_color = (255, 255, 0)  # Cyan

def draw_line(event, x, y, flags, param):
    """Mouse callback: measure along a line dragged in the "Edge Image" window.

    A left-button press records the line start; the release records the end
    and walks the line against ``ref_edge``, ``ref_2_edge`` and ``test_edge``
    with ``line_edge_collision``. If both reference edges are hit, the pixel
    distance between the two hits is stored in ``real_dis_magnitude``. If the
    test edge is hit as well, the estimate printed is

        pixel_distance(ref hit, test hit) * |real_ref_height - real_height|
        / real_dis_magnitude

    Args:
        event: OpenCV mouse event code.
        x, y: Mouse position reported by OpenCV.
        flags, param: Unused; required by the OpenCV callback signature.
    """
    global line_start_point, line_end_point
    global composite_image
    global saved_line_start_point, saved_line_end_point
    global ref_point, ref_2_point, test_point
    global collision_ref_edge, collision_ref_2_edge, collision_test_edge
    global real_dis_magnitude
    global ref_edge, ref_2_edge, test_edge
    if event == cv2.EVENT_LBUTTONDOWN:
        line_start_point = (x, y)
    elif event == cv2.EVENT_LBUTTONUP:
        if line_start_point is None:
            # A release with no recorded press (e.g. the drag began outside
            # the window) has no start point to trace from.
            print("No line start point recorded; press and drag inside the window.")
            return
        line_end_point = (x, y)
        # Draw on a copy so every new line starts from the clean composite.
        composite_image_copy = composite_image.copy()
        collision_ref_edge, collision_ref_point = line_edge_collision(line_start_point, line_end_point, ref_edge)
        collision_ref_2_edge, collision_ref_2_point = line_edge_collision(line_start_point, line_end_point, ref_2_edge)
        collision_test_edge, collision_test_point = line_edge_collision(line_start_point, line_end_point, test_edge)
        if collision_ref_edge:
            saved_line_start_point = line_start_point
            saved_line_end_point = line_end_point
            ref_point = collision_ref_point
            cv2.line(composite_image_copy, line_start_point, line_end_point, line_BGR_color, line_thickness)
            cv2.circle(composite_image_copy, collision_ref_point, dot_radius, dot_ref_color, -1)
            print("Collision detected between the line and the reference edge. Dot drawn at collision point.")
            cv2.imshow("Edge Image", composite_image_copy)
        else:
            print("No collision detected between the line and the reference edge.")

        if collision_ref_2_edge:
            ref_2_point = collision_ref_2_point
            # cv2.circle(composite_image_copy, collision_ref_2_point, dot_radius, dot_edge_color, -1)
            print("Collision detected between the line and the edge. Dot drawn at collision point.")
            cv2.imshow("Edge Image", composite_image_copy)
        else:
            print("No collision detected between the line and the edge.")
        if collision_ref_2_edge and collision_ref_edge:
            print("passed value", ref_point[0], ref_point[1], ref_2_point[0], ref_2_point[1])
            print("real", collision_ref_point[0], collision_ref_point[1], collision_ref_2_point[0], collision_ref_2_point[1])
            # Pixel distance between the two reference hits; it is the
            # denominator of the estimate below.
            real_dis_magnitude = cal_distance(collision_ref_point[0], collision_ref_point[1], collision_ref_2_point[0], collision_ref_2_point[1])
            print(real_dis_magnitude)
        else:
            print("UNKNOWN")
        if collision_ref_edge and collision_ref_2_edge and collision_test_edge:
            cv2.circle(composite_image_copy, collision_test_point, dot_radius, dot_edge_color, -1)
            temp_dis = cal_distance(collision_ref_point[0], collision_ref_point[1], collision_test_point[0], collision_test_point[1])
            if real_dis_magnitude:
                estimate_dis = (temp_dis * abs(real_ref_height - real_height)) / real_dis_magnitude
                print("estimate distance =", estimate_dis)
            else:
                # Both reference hits fell on the same pixel, so the scale
                # would require dividing by zero.
                print("estimate distance = UNKNOWN (reference points coincide)")
            cv2.imshow("Edge Image", composite_image_copy)

def line_edge_collision(start_point, end_point, edge_mask):
    """Find the first non-zero pixel of ``edge_mask`` along a line segment.

    The segment is walked pixel by pixel from ``start_point`` to ``end_point``
    (both inclusive) with an integer error-accumulation line walk. Positions
    outside the mask are skipped rather than indexed, so coordinates beyond
    the array do not raise and negative coordinates do not wrap around to the
    opposite side.

    Args:
        start_point: ``(x, y)`` where the walk starts.
        end_point: ``(x, y)`` where the walk ends.
        edge_mask: 2-D array indexed as ``edge_mask[y, x]``; non-zero marks an edge.

    Returns:
        ``(True, (x, y))`` for the first edge pixel hit, else ``(False, None)``.
    """
    x0, y0 = start_point
    x1, y1 = end_point
    height, width = edge_mask.shape[:2]

    dx = abs(x1 - x0)
    dy = abs(y1 - y0)
    sx = -1 if x0 > x1 else 1
    sy = -1 if y0 > y1 else 1
    err = dx - dy

    while True:
        # Only sample positions that actually lie inside the mask.
        if 0 <= x0 < width and 0 <= y0 < height and edge_mask[y0, x0] != 0:
            return True, (x0, y0)

        if x0 == x1 and y0 == y1:
            break

        # Step in x and/or y depending on the accumulated error.
        e2 = 2 * err
        if e2 > -dy:
            err -= dy
            x0 += sx
        if e2 < dx:
            err += dx
            y0 += sy

    return False, None

def cal_distance(x1, y1, x2, y2):
    """Return the Euclidean distance between points (x1, y1) and (x2, y2)."""
    return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)

# Load the frame first: cv2.imread returns None instead of raising when the
# file cannot be read, so fail here with a clear message.
background_image = cv2.imread(test_image_path)
if background_image is None:
    raise FileNotFoundError(f"Could not read image: {test_image_path}")

cv2.namedWindow("Edge Image")
cv2.setMouseCallback("Edge Image", draw_line)

mask = test_mask

# Bring the mask and every edge map to the background resolution. The overlays
# below combine them with the background pixel by pixel, and draw_line indexes
# all three edge maps with mouse coordinates from the displayed image.
background_size = (background_image.shape[1], background_image.shape[0])
if background_image.shape[:2] != mask.shape[:2]:
    mask = cv2.resize(mask, background_size)
if background_image.shape[:2] != test_edge.shape[:2]:
    test_edge = cv2.resize(test_edge, background_size)
if background_image.shape[:2] != ref_edge.shape[:2]:
    ref_edge = cv2.resize(ref_edge, background_size)
if background_image.shape[:2] != ref_2_edge.shape[:2]:
    ref_2_edge = cv2.resize(ref_2_edge, background_size)

# Convert the single-channel mask and edge maps to 3 channels and colour
# every non-zero pixel.
mask_rgb = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
mask_rgb[np.where(mask_rgb[:, :, 0])] = (0, 0, 255)
edge_rgb = cv2.cvtColor(ref_2_edge, cv2.COLOR_GRAY2BGR)
edge_rgb[np.where(edge_rgb[:, :, 0])] = (255, 255, 0)
test_edge_rgb = cv2.cvtColor(test_edge, cv2.COLOR_GRAY2BGR)
test_edge_rgb[np.where(test_edge_rgb[:, :, 0])] = (255, 0, 0)
ref_edge_rgb = cv2.cvtColor(ref_edge, cv2.COLOR_GRAY2BGR)
ref_edge_rgb[np.where(ref_edge_rgb[:, :, 0])] = (0, 255, 255)

# Create a composite image by overlaying the three edge maps on the background
# (the filled-mask overlay is currently disabled).
# composite_image = cv2.bitwise_or(background_image, mask_rgb)
composite_image = cv2.bitwise_or(background_image, ref_edge_rgb)
composite_image = cv2.bitwise_or(composite_image, edge_rgb)
composite_image = cv2.bitwise_or(composite_image, test_edge_rgb)

# Display the composite image; mouse events are handled by draw_line until a
# key is pressed.
cv2.imshow("Edge Image", composite_image)

cv2.waitKey(0)

# Close the window
cv2.destroyAllWindows()

In [None]:
import tkinter as tk

def get_input():
    """Submit-button handler: print the text in ``entry`` and close ``root``.

    NOTE(review): a later cell defines another ``get_input`` with a different
    signature; whichever cell runs last replaces the other.
    """
    print("Entered value:", entry.get())
    root.destroy()  # Close the GUI window

# Build a minimal Tk window: a text entry stacked above a Submit button.
root = tk.Tk()
entry = tk.Entry(root)
button = tk.Button(root, text="Submit", command=get_input)

# Pack in display order (entry first, then the button).
for widget in (entry, button):
    widget.pack()

# Run the GUI event loop; get_input ends it by destroying the root window.
root.mainloop()


In [2]:
import cv2
import numpy as np
def get_input(event, x, y, flags, param):
    """Mouse callback: on a left-button press, prompt for a value and close all windows.

    Only ``event`` is inspected; the remaining arguments are unused here.
    The prompt uses ``input()``, so the callback waits until a value is entered.
    """
    if event != cv2.EVENT_LBUTTONDOWN:
        return
    value = input("Enter a value: ")
    print("Entered value:", value)
    cv2.destroyAllWindows()

# Create a blank white image to serve as the GUI window
window = np.full((200, 400, 3), 255, dtype=np.uint8)

# Create a named window and set the mouse callback
cv2.namedWindow("Input Window")
cv2.setMouseCallback("Input Window", get_input)

# Display the GUI window and wait for a key press (a click is handled by get_input)
cv2.imshow("Input Window", window)
cv2.waitKey(0)

# Close the window when a key ended the wait instead of a click, so it is not
# left open.
cv2.destroyAllWindows()


: 

: 