# 🎓 Library

In [None]:
# Misc
import numpy as np
import pandas as pd
import os

# Model
import tensorflow as tf
import keras
from keras.api.utils import image_dataset_from_directory
from keras.api.models import Sequential, Model
from keras.api.layers import Dense, Dropout, Conv2D, MaxPooling2D, BatchNormalization, GlobalAveragePooling2D, Input, Concatenate, UpSampling2D
from keras.api.optimizers import Adam
from keras.api.losses import SparseCategoricalCrossentropy

# Metrics
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

# Tuning
import keras_tuner as kt

# Plot
import matplotlib.pyplot as plt
import plotly.graph_objects as go

# Turn off XLA JIT compilation
tf.config.optimizer.set_jit(False)

# Report the GPUs TensorFlow can see, if there are any
_gpus = tf.config.list_physical_devices("GPU")
if _gpus:
    print(f"GPU available: {_gpus}")

In [None]:
# Environment variables

# Raw data root: the Kaggle input mount when it exists, otherwise ./data
# under the current working directory
if os.path.exists("/kaggle/input"):
    _raw_root = "/kaggle/input"
else:
    _raw_root = f"{os.getcwd()}/data"
RAW_DATA = os.path.join(_raw_root, "navires-2025")

# Class name -> integer label; the position in this list is the label value
LABEL_TO_VALUE = {
    name: value
    for value, name in enumerate([
        'coastguard', 'containership', 'corvette', 'cruiser',
        'cv', 'destroyer', 'ferry', 'methanier', 'sailing',
        'smallfish', 'submarine', 'tug', 'vsmallfish',
    ])
}

# Integer label -> class name (inverse of LABEL_TO_VALUE)
VALUE_TO_LABEL = dict(zip(LABEL_TO_VALUE.values(), LABEL_TO_VALUE.keys()))

# Loader settings
BATCH_SIZE = 32
IMG_SIZE = 32
SEED = 42

# Folder holding the unpacked ships32 images
DATA_PATH = f"{os.getcwd()}/data/ships32"

print(f"RAW_DATA: {RAW_DATA}")
print(f"DATA_PATH: {DATA_PATH}")

In [None]:
# If the ships32 folder does not exist in data, unpack the ships.tgz file
!if [ ! -d data ]; then mkdir data; fi
!if [ ! -d data/ships32 ]; then tar xzf {RAW_DATA}/ships.tgz -C data; fi


## Step 0: Extract data

Mapping should be:
```python
{'coastguard':0, 'containership':1, 'corvette':2, 'cruiser':3, 'cv':4, 'destroyer':5, 'ferry':6, 'methanier':7, 'sailing':8, 'smallfish':9, 'submarine':10, 'tug':11, 'vsmallfish':12}
```
We will use the `image_dataset_from_directory` function from `keras.utils` to get a `tf.data.Dataset` object.

The dataset is already batched, so each element contains multiple images at once.

In [None]:
# Load every image under DATA_PATH as a single batched dataset. No validation
# split is requested here; the train/validation split is done later in the notebook.
# Labels are inferred by the loader and returned as integers.
features = dict(
    directory=DATA_PATH,
    labels="inferred",
    label_mode="int",
    batch_size=BATCH_SIZE,
    image_size=(IMG_SIZE, IMG_SIZE),
    seed=SEED,
)

ds = image_dataset_from_directory(**features)

Display a few of the images to check the data is loaded correctly.

In [None]:
def display_batch(batch):
    """Display up to nine images of each batch in a 3x3 grid, titled with their class name.

    Args:
        batch: Iterable yielding ``(images, labels)`` pairs, for example
            ``ds.take(1)``. Images are cast to ``uint8`` for display and each
            label is looked up in ``VALUE_TO_LABEL``.
    """
    for images, labels in batch:
        # A batch can hold fewer than nine images (e.g. a final partial batch),
        # so never index past the end of it
        shown = min(9, int(images.shape[0]))

        plt.figure(figsize=(12, 6))
        for i in range(shown):
            plt.subplot(3, 3, i + 1)

            # Remove ticks and grid
            plt.xticks([])
            plt.yticks([])
            plt.grid(False)

            # Display the image
            plt.imshow(images[i].numpy().astype("uint8"))

            # Display the label
            plt.title(VALUE_TO_LABEL[int(labels[i].numpy())])

        plt.show()

In [None]:
# Visual sanity check: plot images from one batch of the freshly loaded dataset
display_batch(ds.take(1))

## Step 1: Preprocessing

We normalize the pixel values to the range [0, 1] by dividing by 255.0. After this step, `display_batch` shows the images as black, because it casts the pixel values to `uint8`.

### Data augmentation

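We enlarge the dataset with randomly flipped, hue/brightness-shifted and slightly rotated copies of the images. Zoom or crop could be added to increase it further.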

In [None]:
# Number of batches times the batch size: an estimate of the image count
# (an upper bound if the last batch is not full)
ds.cardinality().numpy() * BATCH_SIZE

In [None]:
# Random rotation layer, applied to a third copy of the dataset below
rotation_layer = keras.layers.RandomRotation(factor=0.05, seed=SEED)

def augment(image, label):
    """Return a randomly flipped, hue-shifted and brightness-shifted copy of ``image``.

    Args:
        image: Image tensor (or batch of images). Assumed to hold float pixel
            values on the 0-255 scale, since rescaling to [0, 1] only happens
            later in the notebook.
        label: Label(s) of ``image``; returned unchanged.

    Returns:
        The ``(augmented_image, label)`` pair.
    """
    image = tf.image.random_flip_left_right(image, seed=SEED)
    image = tf.image.random_hue(image, max_delta=0.05, seed=SEED)
    # For float images the brightness delta is added to the pixel values as-is,
    # so a 5% jitter has to be expressed in 0-255 pixel units to have any effect
    image = tf.image.random_brightness(image, max_delta=0.05 * 255.0, seed=SEED)
    # Keep the shifted pixels inside the valid range
    image = tf.clip_by_value(image, 0.0, 255.0)
    return image, label

# First flip images and hue
more_images = ds.map(augment)
augmented_ds = ds.concatenate(more_images).shuffle(1000, seed=SEED)

# Then, rotate all images (training=True makes the random layer active outside fit())
more_images = ds.map(lambda x, y: (rotation_layer(x, training=True), y))
augmented_ds = augmented_ds.concatenate(more_images).shuffle(1000, seed=SEED)

display_batch(augmented_ds.take(1))

In [None]:
# Estimated image count after augmentation (batches times batch size;
# an upper bound if some batches are not full)
augmented_ds.cardinality().numpy() * BATCH_SIZE

### Resize and rescale

In [None]:
def preprocess(images, labels):
    """Rescale pixel values by 1/255 and pass the labels through unchanged.

    Args:
        images: Image values to rescale (divided by 255.0).
        labels: Labels belonging to ``images``.

    Returns:
        The ``(rescaled_images, labels)`` pair.
    """
    # Only rescaling happens here; no resizing is performed
    return images / 255.0, labels

In [None]:
# Rescale every batch with preprocess(). The result is bound to the same name,
# so running this cell a second time would divide the pixels by 255 again.
augmented_ds = augmented_ds.map(preprocess)


### Train split

In [None]:
# Number of batches in the augmented dataset
total_count = augmented_ds.cardinality().numpy()
train_count = int(0.8 * total_count)
# Validation gets every batch not used for training, so rounding never drops data
val_count = int(total_count - train_count)

# Shuffle before splitting for randomness.
# reshuffle_each_iteration=False keeps this permutation fixed: with the default
# (True) the order changes on every pass, so take()/skip() would select different
# batches each time and the two subsets would not form a stable partition.
# NOTE(review): any upstream stage that re-randomises per iteration (random
# augmentation, other shuffles) can still change which samples end up in each
# batch - confirm the split is stable before trusting validation metrics.
augmented_ds = augmented_ds.shuffle(
    buffer_size=total_count,
    seed=SEED,
    reshuffle_each_iteration=False,
)

train_ds = augmented_ds.take(train_count)
val_ds = augmented_ds.skip(train_count).take(val_count)

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# Cache both subsets and let tf.data choose the prefetch buffer size
train_ds, val_ds = (
    subset.cache().prefetch(buffer_size=AUTOTUNE)
    for subset in (train_ds, val_ds)
)

In [None]:
# Estimated number of training images (batches times batch size)
train_ds.cardinality().numpy() * BATCH_SIZE

## Step 2: Model

### Tuning hyperparameters

In [None]:
def build_model(hp):
    """Build and compile a CNN whose sizes are drawn from the tuner's search space.

    The network stacks four blocks (two Conv2D + BatchNormalization pairs,
    max pooling, dropout), followed by global average pooling, one dense
    layer and a logits layer with one unit per class.

    Args:
        hp: Hyperparameter container providing ``Int`` and ``Float``; it is
            asked for ``filters_{i}``, ``dropout_{i}``, ``dense_units`` and ``lr``.

    Returns:
        The compiled ``Sequential`` model.
    """
    n_blocks = 4
    n_classes = len(LABEL_TO_VALUE)

    model = Sequential()
    model.add(Input(shape=(IMG_SIZE, IMG_SIZE, 3)))

    for block in range(n_blocks):
        # Both convolutions of a block share the same number of filters
        n_filters = hp.Int(f"filters_{block}", min_value=32, max_value=512, step=32)
        for _ in range(2):
            model.add(Conv2D(filters=n_filters, kernel_size=3, activation="relu", padding="same"))
            model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=2))
        drop_rate = hp.Float(f"dropout_{block}", 0.1, 0.5, step=0.1)
        model.add(Dropout(rate=drop_rate))

    # Classification head; the last layer outputs raw logits
    model.add(GlobalAveragePooling2D())
    dense_units = hp.Int("dense_units", 32, 512, step=32)
    model.add(Dense(units=dense_units, activation="relu"))
    model.add(Dense(units=n_classes))

    learning_rate = hp.Float("lr", 1e-4, 1e-2, sampling="log")
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss=SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )
    return model

In [None]:
# Hyperband search over build_model, ranked by validation accuracy;
# its files go under keras_tuner_dir/ship_model_tuning
tuner = kt.Hyperband(
    hypermodel=build_model,
    objective="val_accuracy",
    max_epochs=20,
    factor=3,
    project_name="ship_model_tuning",
    directory="keras_tuner_dir",
)


In [None]:
# Run only if you want to tune the model
# tuner.search(train_ds, validation_data=val_ds, epochs=20, verbose=1)

In [None]:
# model = tuner.get_best_models(num_models=1)[0]
# print(model.summary())

### Model A: Normal CNN
After testing several models, we keep the following architecture, built with the saved hyperparameters.

In [None]:
def _conv_block(filters, dropout_rate):
    """Return the layers of one block: two Conv2D + BatchNormalization pairs, max pooling, dropout."""
    return [
        Conv2D(filters=filters, kernel_size=(3, 3), activation="relu", padding="same"),
        BatchNormalization(),
        Conv2D(filters=filters, kernel_size=(3, 3), activation="relu", padding="same"),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(dropout_rate),
    ]


# Feature extractor: four convolutional blocks followed by global average pooling.
# The input shape is declared with an explicit Input object (as build_model does)
# rather than the deprecated `input_shape=` argument on the first layer.
model = Sequential(
    [
        Input(shape=(IMG_SIZE, IMG_SIZE, 3)),
        *_conv_block(320, 0.1),
        *_conv_block(256, 0.3),
        *_conv_block(384, 0.1),
        *_conv_block(288, 0.3),
        GlobalAveragePooling2D(),
    ],
    name="ship_model",
)

In [None]:
# Classification head: one hidden dense layer, then one logit per class
output = Sequential(name="ship_output")
output.add(Dense(288, activation="relu"))
output.add(Dense(len(LABEL_TO_VALUE)))

# Append the head to the feature extractor.
# Running this cell again would append a second head to the same model.
model.add(output)

In [None]:
# The model outputs logits, hence from_logits=True
model.compile(
    optimizer=Adam(learning_rate=0.00025431),
    loss=SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"],
)

# All three callbacks watch the validation loss
callbacks = [
    # Stop when val_loss stops improving and restore the best weights
    keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=6,
        min_delta=0.0008,
        restore_best_weights=True,
        verbose=1,
    ),
    # Multiply the learning rate by 0.3 on a plateau, never going below 1e-6
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.3,
        patience=4,
        cooldown=2,
        min_delta=0.0008,
        min_lr=1e-6,
        verbose=1,
    ),
    # Keep only the best model on disk
    keras.callbacks.ModelCheckpoint(
        filepath="data/model.keras",
        monitor="val_loss",
        save_best_only=True,
        mode="min",
    ),
]


In [None]:
# Print the number of layers in the model (the message is in French), then the full summary
print("Nombre de couches : ", len(model.layers))
model.summary()

## Model B: U-Net
The U-Net architecture consists of an encoder-decoder structure with skip connections, which allows the model to capture both high-level and low-level features in the input images.

In [None]:
# inputs = Input(shape=(IMG_SIZE, IMG_SIZE, 3))

# encoder_filters = [320, 256, 384]
# dropouts = [0.1, 0.3, 0.1]
# skips = []
# x = inputs

# # Encoder (Conv + BN per block)
# for filters, drop in zip(encoder_filters, dropouts):
#     x = Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
#     x = BatchNormalization()(x)
#     skips.append(x)
#     x = MaxPooling2D((2, 2))(x)
#     x = Dropout(drop)(x)

# # Bottleneck
# x = Conv2D(384, (3, 3), activation='relu', padding='same')(x)
# x = BatchNormalization()(x)

# # Decoder
# for filters, skip in zip(reversed(encoder_filters), reversed(skips)):
#     x = UpSampling2D((2, 2))(x)
#     x = Concatenate()([x, skip])
#     x = Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
#     x = BatchNormalization()(x)

# # Output head
# x = GlobalAveragePooling2D()(x)
# x = Dense(288, activation="relu")(x)
# outputs = Dense(len(LABEL_TO_VALUE))(x)

# model = Model(inputs=inputs, outputs=outputs, name="ship_unet_bn")

# model.compile(
#     optimizer=Adam(learning_rate=0.00025431),
#     loss=SparseCategoricalCrossentropy(from_logits=True),
#     metrics=['accuracy']
# )


## Step 3: Training & Evaluation

In [None]:
# Train for at most 45 epochs; the callbacks may stop earlier and save the best model
hist = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=45,
    callbacks=callbacks,
    verbose=1,
)

In [None]:
# One line per loss series recorded by fit(): validation first, then training
fig = go.Figure(
    data=[
        go.Scatter(y=hist.history[key], mode="lines", name=label)
        for key, label in (("val_loss", "Validation Loss"), ("loss", "Train Loss"))
    ]
)

# update_layout returns the figure, so it is rendered as the cell output
fig.update_layout(
    title="Validation loss per epoch",
    legend_title="Dataset",
    xaxis_title="Epoch",
    yaxis_title="Loss",
    xaxis={"tickmode": "linear"},
)

### Loading and evaluating the model

In [None]:
# Reload the best model written by the ModelCheckpoint callback during training.
# The path must match the callback's filepath ('data/model.keras').
model = keras.models.load_model("data/model.keras")

In [None]:
# Loss and accuracy of the reloaded model.
# NOTE(review): despite the "test_" names, this is measured on val_ds, not on a separate test set.
test_loss, test_acc = model.evaluate(val_ds, verbose=2)

In [None]:
# Report the metrics returned by model.evaluate
print(f"Test accuracy: {test_acc}")
print(f"Test loss: {test_loss}")

In [None]:
# Predicted class index (argmax over the model outputs) for every image in train_ds.
# NOTE(review): these predictions are on the training subset - confirm that is intended for the report.
y_pred = model.predict(train_ds).argmax(axis=1)

In [None]:
# True class index of every image in train_ds, read one sample at a time
y_labels = [int(true_label.numpy()) for _, true_label in train_ds.unbatch()]

In [None]:
# classification_report expects (y_true, y_pred) in that order; passing them the
# other way round swaps precision and recall in the table.
# `labels` pins the row order so each row matches its name in `target_names`.
report = classification_report(
    y_labels,
    y_pred,
    labels=list(VALUE_TO_LABEL.keys()),
    target_names=list(VALUE_TO_LABEL.values()),
)
print(report)

In [None]:
# Row-normalised confusion matrix in percent: each row (true class) sums to 100.
# normalize="true" lets scikit-learn do the division, so a class without any
# sample gives a row of zeros instead of NaN and a division warning.
cm = confusion_matrix(
    y_labels,
    y_pred,
    labels=list(LABEL_TO_VALUE.values()),
    normalize="true",
) * 100

heat = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(VALUE_TO_LABEL.values()))
fig, ax = plt.subplots(figsize=(10, 10))

heat.plot(ax=ax)
# Tilt the class names on the x axis
plt.xticks(rotation=45)
plt.show()

# Results

In [None]:
# Load the competition images from the archive's 'X' array. The context manager
# closes the underlying file once the array has been read.
# NOTE(review): allow_pickle=True permits unpickling objects stored in the file,
# which can execute arbitrary code - only use it with a trusted archive.
with np.load(f"{RAW_DATA}/ships_competition.npz", allow_pickle=True) as competition:
    X_test = competition['X']

# Divide by 255, as done for the training images
X_test = X_test.astype('float32') / 255

In [None]:
# Predicted class index for every competition image
res = np.argmax(model.predict(X_test), axis=1)

# One row per image: the row index is written as "Id", the prediction as "Category"
df = pd.DataFrame({"Category": res})
df.to_csv("data/reco_nav.csv", index_label="Id")

In [None]:
# Show the first lines of the CSV that was just written
!head data/reco_nav.csv

In [None]:
# Render a link to the generated CSV as the cell output.
# NOTE(review): this import sits at the bottom of the notebook; the other imports are in the first cell.
from IPython.display import FileLink
FileLink(r'data/reco_nav.csv')