In [10]:
import os
import json
from PIL import Image
import numpy as np
import keras.api._v2.keras as keras
import random
import helper as hp

In [11]:
MODEL_NAME = "it_is_just_a_test"
MODEL_PATH = os.path.join("..", "tmp", "models")

RESSOURCES_PATH = os.path.join("..", "tmp", "train", "ressources")
TRAIN_DATA_FOLDER = "data_02"

In [12]:
# Image properties
IMAGE_HEIGHT_PX = 180
IMAGE_WIDTH_PX = 320
NUM_CHANNELS = 3
NUM_CLASSES = 4
NUM_POSITIONS = 8

In [13]:
color_mapping = { 'red': 0, 'yellow': 1, 'blue': 2, '': 3 }
label_mapping = { 0: 'red', 1: 'yellow', 2: 'blue', 3: ''}

In [14]:
NORMALIZE_VALUE = 255

def map_labels_to_nummeric(label):
    mapped_label = []

    for pos in label.values():
        mapped_label.append(color_mapping[pos])

    return mapped_label

# Normalize the images so that all values are between 0 and 1
def normalize_images(images):
    return images / NORMALIZE_VALUE

In [15]:
# Base-Model generation
LOSS_FUNCTION = 'categorical_crossentropy'

input_branch_1 = keras.layers.Input(shape=(180, 320, 3))
input_branch_2 = keras.layers.Input(shape=(180, 320, 3))

# Shared convolutional layers for image processing
convolutional_layers = [
    keras.layers.Conv2D(32, (3, 3), activation='relu'),
    keras.layers.MaxPooling2D((2, 2)),
    keras.layers.Conv2D(64, (3, 3), activation='relu'),
    keras.layers.MaxPooling2D((2, 2)),
    keras.layers.Conv2D(128, (3, 3), activation='relu'),
    keras.layers.MaxPooling2D((2, 2)),
    keras.layers.Flatten()
]

# Process first image
x1 = input_branch_1
for layer in convolutional_layers:
    x1 = layer(x1)

# Process second image
x2 = input_branch_2
for layer in convolutional_layers:
    x2 = layer(x2)

x = keras.layers.Concatenate(axis=-1)([x1, x2])
x = keras.layers.Dense(256, activation='relu')(x)
x = keras.layers.Dropout(0.2)(x)
x = keras.layers.Dense(NUM_POSITIONS * NUM_CLASSES, activation='softmax')(x)  # Output layer with 8 * 4 units

output = keras.layers.Reshape((NUM_POSITIONS, NUM_CLASSES))(x)

# Build the model with the two input branches and the output layer
base_model = keras.models.Model(inputs=[input_branch_1, input_branch_2], outputs=output)

optimizer = keras.optimizers.legacy.Adam(learning_rate=0.001)

base_model.compile(optimizer=optimizer, loss=LOSS_FUNCTION, metrics=['accuracy', 'mean_squared_error'])

In [34]:
# Data loading
IN_DEBUG_MODE = False
IMAGE_FOLDER = "Images"
LABELS_FOLDER = "Labels"
JSON_NAME = "scene_results.json"

def get_data(stage):
    labels = []
    images = []

    scene_results_path = os.path.join(RESSOURCES_PATH, TRAIN_DATA_FOLDER, stage, JSON_NAME)

    if IN_DEBUG_MODE:
        print("CURRENT STAGE: " + stage + "\n")
        print("READING SCENE RESULTS AT: " + scene_results_path)

    with open(scene_results_path, 'r') as file:
        scene_results = json.load(file)

    for result in scene_results:
        img = [];

        image_not_found = False

        for img_path in result["imagePaths"]:

            if IN_DEBUG_MODE:
                print("READING IMAGE AT: " + img_path)

            try:
                i = Image.open(os.path.join(RESSOURCES_PATH, TRAIN_DATA_FOLDER, img_path))
            except:
                image_not_found = True
                continue


            # Scale down image (resize)
            i = i.resize((IMAGE_WIDTH_PX, IMAGE_HEIGHT_PX))

            # Channel order of Pillow is different than OpenCV
            i = np.array(i)[:, :, ::-1]

            i = hp.Preprocess.start(i)

            img.append(i)

        if image_not_found == False:
            images.append(img)
            try:
                # np.array(images) # is only necessary to check if the data is homogenous
                labels.append(result["positions"])
            except:
                print(f"Images shape got inhomogenous at: ${result['imagePaths']}")
                images.pop()

            

    if IN_DEBUG_MODE:
        print("\n\n")

    return [np.array(images), np.array(labels)]

In [42]:
# Train model

IN_DEBUG_MODE = False

def fit_model(model, train_data, verify_data):
    train_images = normalize_images(np.array(train_data[0]))
    numberic_train_labels = np.array([map_labels_to_nummeric(label) for label in train_data[1]])
    train_labels = keras.utils.to_categorical(numberic_train_labels, num_classes=NUM_CLASSES)

    verify_images = normalize_images(np.array((verify_data[0])))
    numberic_verify_labels = np.array([map_labels_to_nummeric(label) for label in verify_data[1]])
    verify_labels = keras.utils.to_categorical(numberic_verify_labels, num_classes=NUM_CLASSES)

    if IN_DEBUG_MODE: 
        print("----- SHAPES ------\n")
        print(f"Train labels shape: {train_labels.shape}")
        print(f"Train images shape: {train_images.shape}")

        print(f"Verify labels shape: {verify_labels.shape}")
        print(f"Verify images shape: {verify_images.shape}\n\n")

    model.fit(
        [train_images[:, 0], train_images[:, 1]], train_labels, 
        epochs=10, 
        validation_data=([verify_images[:, 0], verify_images[:, 1]], verify_labels), verbose=1)
    
    return model

train_data = get_data("Train")
verify_data = get_data("Verify")
trained_model = fit_model(base_model, train_data, verify_data)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [43]:
# Save trained model 
trained_model.save(os.path.join(MODEL_PATH, MODEL_NAME))

INFO:tensorflow:Assets written to: ../tmp/models/it_is_just_a_test/assets


INFO:tensorflow:Assets written to: ../tmp/models/it_is_just_a_test/assets


In [46]:
# Load model
model = keras.models.load_model(os.path.join(MODEL_PATH, MODEL_NAME))


In [47]:
# Test model
test_data = get_data("Test")
test_labels = np.array([map_labels_to_nummeric(label) for label in test_data[1]])

if IN_DEBUG_MODE:
    model.summary()

test_images = normalize_images(np.array((test_data[0])))
predictions = model.predict([test_images[:, 0], test_images[:, 1]])

label_index = random.randint(0, len(test_labels)-1)

print("\n\n")
print(f"------ PREDICTION: Index {label_index + 1} --------\n")
predicted_nummeric = np.argmax(predictions, axis=-1)
predicted_readable = np.vectorize(label_mapping.get)(predicted_nummeric)

actual_readable = np.vectorize(label_mapping.get)(test_labels)


print("NUMMERIC: \n")
print(predicted_nummeric[label_index])
print("READABLE: \n")
print(predicted_readable[label_index])
print("\n\n")

print(f"------ ACTUAL: Index {label_index + 1} ------ \n")
print("NUMMERIC: \n")
print(test_labels[label_index])
print("READABLE: \n")
print(actual_readable[label_index])




------ PREDICTION: Index 1 --------

NUMMERIC: 

[0 0 2 0 1 0 2 0]
READABLE: 

['red' 'red' 'blue' 'red' 'yellow' 'red' 'blue' 'red']



------ ACTUAL: Index 1 ------ 

NUMMERIC: 

[1 3 2 1 1 3 2 1]
READABLE: 

['yellow' '' 'blue' 'yellow' 'yellow' '' 'blue' 'yellow']
