# SERLI - AI4Industry - 2025

---
Notebook du Groupe rouge foncé (on aurait préféré le bleu)
---

## Import des librairies

In [37]:
import cv2
import os
import json
from tqdm import tqdm
import numpy as np
import threading
from scipy.ndimage import label
import time

import tensorflow as tf
from tensorflow import keras as K
from tensorflow.keras import layers as L
from tensorflow.keras.models import Model
from tensorflow.data import Dataset

## Variables globales

In [38]:
DATASET_FOLDER = "dataset/"
DATASET_LIGHT_FOLDER = "dataset_light/dataset/"
USE_LIGHT_DATASET = True

USE_MEMORY_SAVING = True # If False, can crash the kernel
USE_OPENCV = True

IMAGE_SIZE = (848, 480)
MASK_FILE = "mask1.png"

SEED = int(time.time())
BATCH_SIZE = 8
TEST_SPLIT = 0.1
VALIDATION_SPLIT = 0.2

LIMIT_LAT_MIN = 47.3947574
LIMIT_LON_MIN = -1.1866685
LIMIT_LAT_MAX = 47.3968351
LIMIT_LON_MAX = -1.1841471

## Import des données

In [None]:
data_lock = threading.Lock()

def read_subfolder(subfolder_path, image_size, image_list, label_list):
    print(f"Reading subfolder: {subfolder_path}")
    for file in os.listdir(subfolder_path):
        if file.endswith('.png'):
            image_path = os.path.join(subfolder_path, file)
            file_name = file.split(".")[0]
            json_path = os.path.join(subfolder_path, file_name + ".json")

            if USE_LIGHT_DATASET:
                index = int(int(file_name.split("_")[1])/10)
            else:
                index = int(file_name.split("_")[1])

            if os.path.exists(json_path) and os.path.exists(image_path):
                with data_lock:
                    image_list[index-1] = str(image_path)
                    with open(json_path) as json_file:
                        data = json.load(json_file)
                        label_list[index-1] = data
            else:
                print(f"json file does not exist for: {file}")
    print(f"Finished reading subfolder: {subfolder_path}")

if os.path.exists(DATASET_FOLDER) and os.path.exists(DATASET_LIGHT_FOLDER):
    print("Dataset folder exists")
    
    folder = DATASET_FOLDER if not USE_LIGHT_DATASET else DATASET_LIGHT_FOLDER
    dataset_len = 0
    for subfolders in os.listdir(folder):
        subfolder_path = os.path.join(folder, subfolders)
        for file in os.listdir(subfolder_path):
            if file.endswith('.png'):
                dataset_len += 1
                
    image_list = np.zeros((dataset_len), dtype=object)
    label_list = np.zeros((dataset_len), dtype=object)
    
    threads = []

    # Lancer un thread pour chaque sous-dossier
    for subfolder in os.listdir(folder):
        subfolder_path = os.path.join(folder, subfolder)
        if os.path.isdir(subfolder_path):
            thread = threading.Thread(target=read_subfolder, args=(subfolder_path, IMAGE_SIZE, image_list, label_list))
            threads.append(thread)
            thread.start()

    # Attendre que tous les threads soient terminés
    for thread in threads:
        thread.join()
else:
    raise Exception("Dataset folder does not exist")


In [None]:
print (f"Dataset length: {len(image_list)}")
print (f"Label length: {len(label_list)}")
print (f"Image list: {image_list[:5]}")
print (f"Label list: {label_list[:5]}")

In [41]:
if USE_OPENCV:
    mask = cv2.imread(MASK_FILE, cv2.IMREAD_GRAYSCALE)
    mask = cv2.resize(mask, (IMAGE_SIZE[1], IMAGE_SIZE[0]))
else:
    mask = tf.io.read_file(MASK_FILE)
    mask = tf.image.decode_png(mask, channels=1)
    mask = tf.image.resize(mask, IMAGE_SIZE)
    mask = tf.cast(mask, tf.float32) / 255.0

def load_image(image_path):
    if USE_OPENCV:
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        image = cv2.resize(image, (IMAGE_SIZE[1], IMAGE_SIZE[0]))
        image = image / 255.0
        image = cv2.bitwise_and(image, image, mask=mask)
        image = np.expand_dims(image, axis=-1)
    else:
        image = tf.io.read_file(image_path)
        image = tf.image.decode_png(image, channels=1)
        image = tf.image.resize(image, IMAGE_SIZE)
        image = tf.cast(image, tf.float32) / 255.0
        image = image * mask
    
    return image

def compute_position(position):
    lat = position["lat"]
    lon = position["lon"]
    x = (lon - LIMIT_LON_MIN) / (LIMIT_LON_MAX - LIMIT_LON_MIN)
    y = (lat - LIMIT_LAT_MIN) / (LIMIT_LAT_MAX - LIMIT_LAT_MIN)
    return (x, y)

def image_generator(frames, positions):
    for image_path, position in zip(frames, positions):
        if os.path.exists(image_path):
            yield load_image(image_path), compute_position(position)
        else:
            print(f"Image manquante : {image_path}")

In [None]:
if USE_MEMORY_SAVING:
    # Générateur d'images
    generator = lambda: image_generator(image_list, label_list)

    # Création du dataset à partir du générateur
    dataset = Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 1), dtype=tf.float32),
            tf.TensorSpec(shape=(2,), dtype=tf.float32)  # Ajustez selon le format de `positions`
        )
    )

    # Mélange, division et batch du dataset
    dataset = dataset.shuffle(buffer_size=1000, seed=SEED)

    dataset_length = len(image_list)
    dataset = dataset.apply(tf.data.experimental.assert_cardinality(dataset_length))

    validation_size = int(dataset_length * VALIDATION_SPLIT)
    test_size = int(dataset_length * TEST_SPLIT)

    dataset_validation = dataset.take(validation_size).batch(BATCH_SIZE)
    dataset_test = dataset.skip(validation_size).take(test_size).batch(BATCH_SIZE)
    dataset_train = dataset.skip(validation_size + test_size).batch(BATCH_SIZE)
    
    dataset_train = dataset_train.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    dataset_validation = dataset_validation.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    dataset_test = dataset_test.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

else:
    dataset_length = len(image_list)

    images = np.zeros((dataset_length, IMAGE_SIZE[0], IMAGE_SIZE[1], 1), dtype=np.float32)
    positions = np.zeros((dataset_length, 2), dtype=np.float32)
    
    for i in tqdm(range(dataset_length)):
        image = load_image(image_list[i])
        images[i] = image
        positions[i] = compute_position(label_list[i])

    dataset_images = Dataset.from_tensor_slices(images)
    dataset_positions = Dataset.from_tensor_slices(positions)

    dataset = Dataset.zip((dataset_images, dataset_positions)).shuffle(SEED)

    dataset_validation = dataset.take(int(dataset_length * VALIDATION_SPLIT))
    dataset_test = dataset.skip(len(dataset_validation)).take(int(dataset_length * TEST_SPLIT))
    dataset_train = dataset.skip(len(dataset_validation) + len(dataset_test)).take(dataset_length - len(dataset_validation) - len(dataset_test))

    dataset_validation = dataset_validation.batch(BATCH_SIZE)
    dataset_test = dataset_test.batch(BATCH_SIZE)
    dataset_train = dataset_train.batch(BATCH_SIZE)

print("Datasets shapes:")
print(dataset_train.element_spec)

## Création du modèle

In [43]:
def create_model(input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 1), output_shape=2, model_name="cnn_model"):
    inputs = L.Input(shape=input_shape, name=f'{model_name}_input')
    x = L.Conv2D(3, kernel_size=7, activation='relu', padding='same', name=f'{model_name}_conv2D_3_7')(inputs)
    x = L.MaxPooling2D(pool_size=2, name=f'{model_name}_maxpool_3_7')(x)
    x = L.Conv2D(16, kernel_size=5, activation='relu', padding='same', name=f'{model_name}_conv2D_16_5')(x)
    x = L.MaxPooling2D(pool_size=2, name=f'{model_name}_maxpool_16_5')(x)
    x = L.Conv2D(48, kernel_size=3, activation='relu', padding='same', name=f'{model_name}_conv2D_48_3')(x)
    x = L.MaxPooling2D(pool_size=2, name=f'{model_name}_maxpool_48_3')(x)
    x = L.Conv2D(24, kernel_size=3, activation='relu', padding='same', name=f'{model_name}_conv2D_24_3')(x)
    x = L.MaxPooling2D(pool_size=2, name=f'{model_name}_maxpool_24_3')(x)
    x = L.Conv2D(12, kernel_size=3, activation='relu', padding='same', name=f'{model_name}_conv2D_12_3')(x)
    x = L.MaxPooling2D(pool_size=2, name=f'{model_name}_maxpool_12_3')(x)
    x = L.Dropout(0.5, name=f'{model_name}_dropout')(x)
    x = L.Flatten(name=f'{model_name}_flatten')(x)
    x = L.Dense(256, activation='relu', name=f'{model_name}_dense_256')(x)
    x = L.Dense(128, activation='relu', name=f'{model_name}_dense_128')(x)
    outputs = L.Dense(output_shape, activation='softmax', name=f'{model_name}_output')(x)
    
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam', loss='mse', metrics=['accuracy'])
    return model

In [None]:
model = create_model()
model.summary()

In [None]:
model.fit(dataset_train, validation_data=dataset_validation, epochs=10)