# Importing Libraries
---

In [None]:
# Importing Libraries
import os, math, json, cv2, numpy as np, pandas as pd, tensorflow as tf
tf.keras.mixed_precision.set_global_policy('float32')
from tensorflow.keras import layers, models, losses, optimizers, callbacks, metrics
from tensorflow.keras.applications import efficientnet_v2
import matplotlib.pyplot as plt, matplotlib.patches as patches
from sklearn.metrics import classification_report

# Global Parameters
---

In [None]:
# Network input size as (height, width, channels); images are resized to the first two values.
IMG_SIZE = (224, 224, 3)
# Number of samples per batch used when building the train and test datasets.
BATCH_SIZE = 32
# Maximum number of training epochs passed to model.fit (early stopping may end sooner).
EPOCHS = 100

# DataFrame creation: extracting information from the dataset's text files
---

In [None]:
# Root folder of the extracted CUB_200_2011 dataset.
# Set the CUB_200_2011_DIR environment variable to point the notebook at another
# location without editing it; the original local path is kept as the fallback.
BASE = os.environ.get("CUB_200_2011_DIR", "/Users/debanjan_5402/Downloads/archive/CUB_200_2011/CUB_200_2011")
# Annotation text files and the image folder, all located directly under BASE.
CLASS_TXT = f"{BASE}/classes.txt"; IMG_TXT = f"{BASE}/images.txt"
SPLIT_TXT = f"{BASE}/train_test_split.txt"; LABEL_TXT = f"{BASE}/image_class_labels.txt"
BBOX_TXT = f"{BASE}/bounding_boxes.txt"; IMG_DIR = f"{BASE}/images"

In [None]:
def _read_table(path, names, dtypes):
    """Load one header-less, space-separated annotation file into a DataFrame."""
    return pd.read_csv(path, sep=' ', header=None, names=names, dtype=dtypes)


# Class table: drop the leading "NNN." from each name and subtract 1 from the ids
# (presumably 1-based in the file, which makes them 0-based here).
class_df = _read_table(CLASS_TXT, ['class', 'class_name'], {'class': 'int32', 'class_name': 'str'})
class_df['class_name'] = class_df['class_name'].str.replace(r'^\d{3}\.', '', regex=True)
class_df['class'] = class_df['class'] - 1

# Lookup tables in both directions plus the number of classes.
LABEL_TO_CLASS = dict(zip(class_df['class'], class_df['class_name']))
CLASS_TO_LABEL = {name: idx for idx, name in LABEL_TO_CLASS.items()}
NUM_CLASSES = len(LABEL_TO_CLASS)

# Image table: prefix every listed path with the image directory.
img_df = _read_table(IMG_TXT, ['img_id', 'img_path'], {'img_id': 'int32'})
img_df['img_path'] = [os.path.join(IMG_DIR, rel_path) for rel_path in img_df['img_path']]

# Train/test flag per image.
split_df = _read_table(SPLIT_TXT, ['img_id', 'is_train'], {'img_id': 'int32', 'is_train': 'int32'})

# Class id per image, shifted by the same -1 as the class table.
label_df = _read_table(LABEL_TXT, ['img_id', 'class_id'], {'img_id': 'int32', 'class_id': 'int32'})
label_df['class_id'] = label_df['class_id'] - 1

# Bounding boxes: pack the four columns into one float32 array [x, y, w, h] per row.
bbox_df = _read_table(BBOX_TXT, ['img_id', 'x', 'y', 'w', 'h'], {'img_id': 'int32'})
bbox_df['bbox'] = bbox_df.apply(
    lambda row: np.array([row.x, row.y, row.w, row.h], dtype=np.float32), axis=1)
bbox_df = bbox_df.drop(columns=['x', 'y', 'w', 'h'])

# Join everything on the image id, in the order split -> label -> bbox.
df = img_df
for part in (split_df, label_df, bbox_df):
    df = df.merge(part, on='img_id')

# Data Augmentation
---

In [None]:
# Random flip
def random_flip(img, bbox, iw, ih, prob=0.5):
    """Horizontally flip an image and its bounding box with probability `prob`.

    Args:
        img: Image tensor accepted by `tf.image.flip_left_right`.
        bbox: Tensor unstacked as [xmin, ymin, xmax, ymax], in the same units as `iw`.
        iw: Image width used to mirror the x coordinates.
        ih: Image height (unused; kept so all augmentations share one signature).
        prob: Probability of applying the flip.

    Returns:
        (img, bbox), either flipped or unchanged.
    """
    xmin, ymin, xmax, ymax = tf.unstack(bbox)

    def do_flip():
        img_f = tf.image.flip_left_right(img)
        # Mirroring swaps the roles of the left and right edges.
        return img_f, tf.stack([iw - xmax, ymin, iw - xmin, ymax])

    # A uniform sample in [0, 1) is below `prob` with probability `prob`.
    # The previous `> prob` test flipped with probability 1 - prob instead.
    return tf.cond(tf.random.uniform(()) < prob, do_flip, lambda: (img, bbox))

In [None]:
def random_rotation(img, bbox, iw, ih, max_angle=10.0):
    """Rotate an image about its centre by a random angle and update its bounding box.

    Args:
        img: Image tensor of rank 3 (a batch axis is added and removed internally).
        bbox: Tensor unstacked as [xmin, ymin, xmax, ymax] in pixel coordinates.
        iw: Image width in pixels.
        ih: Image height in pixels.
        max_angle: Angle is drawn uniformly from [-max_angle, max_angle] degrees.

    Returns:
        (rotated_img, rotated_bbox). The image keeps its size (borders are filled by
        reflection). The box is the axis-aligned hull of the rotated corners, clipped
        to the image.
    """
    # 1. Draw the rotation angle and convert it to radians
    angle = tf.random.uniform([], -max_angle, max_angle) * math.pi / 180.0

    # 2. Build the projective transform for a rotation about the image centre
    iw_f, ih_f = tf.cast(iw, tf.float32), tf.cast(ih, tf.float32)
    c_x, c_y = iw_f / 2.0, ih_f / 2.0

    cos_angle = tf.cos(angle); sin_angle = tf.sin(angle)

    a0 = cos_angle; a1 = -sin_angle; a2 = c_x - a0 * c_x - a1 * c_y
    a3 = sin_angle; a4 = cos_angle; a5 = c_y - a3 * c_x - a4 * c_y
    transform = [a0, a1, a2, a3, a4, a5, 0.0, 0.0]

    rotated_img = tf.raw_ops.ImageProjectiveTransformV2(
        images=tf.expand_dims(img, 0),
        transforms=tf.expand_dims(transform, 0),
        output_shape=tf.stack([tf.cast(ih, tf.int32), tf.cast(iw, tf.int32)]),
        fill_mode="REFLECT", interpolation="BILINEAR"
    )
    rotated_img = tf.squeeze(rotated_img, 0)

    # 3. Rotate the bounding box corners.
    # The op's matrix maps OUTPUT pixel coordinates to INPUT coordinates, so image
    # content moves by the inverse transform. The corners live in input coordinates
    # and must therefore use the inverse rotation (the transpose of the matrix above).
    # The previous code applied the forward rotation, turning the box the opposite
    # way from the image.
    xmin, ymin, xmax, ymax = tf.unstack(bbox)
    corners = tf.stack([xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax])
    corners = tf.reshape(corners, [-1, 2])
    corners_centered = corners - [c_x, c_y]

    rotated_x = cos_angle * corners_centered[:, 0] + sin_angle * corners_centered[:, 1]
    rotated_y = -sin_angle * corners_centered[:, 0] + cos_angle * corners_centered[:, 1]

    rotated_corners = tf.stack([rotated_x, rotated_y], axis=1) + [c_x, c_y]

    # 4. Axis-aligned hull of the rotated corners (indexed, not unpacked, so it
    # also works on symbolic tensors), kept inside the image bounds.
    min_coords = tf.reduce_min(rotated_corners, axis=0)
    max_coords = tf.reduce_max(rotated_corners, axis=0)

    x1_r = tf.clip_by_value(min_coords[0], 0.0, iw_f)
    y1_r = tf.clip_by_value(min_coords[1], 0.0, ih_f)
    x2_r = tf.clip_by_value(max_coords[0], 0.0, iw_f)
    y2_r = tf.clip_by_value(max_coords[1], 0.0, ih_f)
    rotated_bbox = tf.stack([x1_r, y1_r, x2_r, y2_r])

    return rotated_img, rotated_bbox

In [None]:
# Random Scaling
def random_scale(img, bbox, iw, ih, low=0.9, high=1.1):
    """Randomly zoom an image in or out while keeping its original size.

    The image is resized by a factor drawn uniformly from [low, high) and then
    centre-cropped (factor > 1) or zero-padded (factor < 1) back to (ih, iw).
    The bounding box receives the same scale and shift and is clipped to the image.

    Previously the crop/pad step targeted the already-resized size (a no-op), while
    the box was still shifted by a crop offset and clipped to the original size, so
    the returned box did not match the returned image.

    Args:
        img: Image tensor of rank 3.
        bbox: Tensor indexed as [xmin, ymin, xmax, ymax] in pixel coordinates.
        iw: Image width in pixels.
        ih: Image height in pixels.
        low: Lower bound of the scale factor.
        high: Upper bound of the scale factor.

    Returns:
        (img, bbox, iw, ih), where iw/ih describe the returned image. They equal
        the inputs because the original size is restored.
    """
    scale = tf.random.uniform((), low, high)
    iw_f, ih_f = tf.cast(iw, tf.float32), tf.cast(ih, tf.float32)
    iw_i, ih_i = tf.cast(iw_f, tf.int32), tf.cast(ih_f, tf.int32)

    # Integer size of the resized image (at least 1 pixel per side).
    new_h = tf.maximum(tf.cast(ih_f * scale, tf.int32), 1)
    new_w = tf.maximum(tf.cast(iw_f * scale, tf.int32), 1)

    # 1. Resize the image, then crop/pad it back to the original size
    img_s = tf.image.resize(img, tf.stack([new_h, new_w]), method='bicubic')
    img_s = tf.image.resize_with_crop_or_pad(img_s, ih_i, iw_i)

    # 2. Scale the box by the ratio actually applied after integer rounding
    scale_x = tf.cast(new_w, tf.float32) / iw_f
    scale_y = tf.cast(new_h, tf.float32) / ih_f

    # 3. Offset introduced by the centred crop (positive) or pad (negative).
    # Both cases floor the half-difference, mirroring resize_with_crop_or_pad.
    def _center_offset(resized, target):
        diff = resized - target
        return tf.cast(tf.where(diff >= 0, diff // 2, -((-diff) // 2)), tf.float32)

    offset_x = _center_offset(new_w, iw_i)
    offset_y = _center_offset(new_h, ih_i)

    # 4. Transform the box and clip it to the image
    clipped_x_min = tf.clip_by_value(bbox[0] * scale_x - offset_x, 0.0, iw_f)
    clipped_y_min = tf.clip_by_value(bbox[1] * scale_y - offset_y, 0.0, ih_f)
    clipped_x_max = tf.clip_by_value(bbox[2] * scale_x - offset_x, 0.0, iw_f)
    clipped_y_max = tf.clip_by_value(bbox[3] * scale_y - offset_y, 0.0, ih_f)

    bbox_s = tf.stack([clipped_x_min, clipped_y_min, clipped_x_max, clipped_y_max], axis=-1)

    return img_s, bbox_s, iw, ih

In [None]:
def random_erasing(img, iw, ih, p=0.5, erase_area=0.1):
    """With probability `p`, zero out one random square patch of the image.

    Args:
        img: Image tensor of rank 3 (height, width, channels).
        iw: Image width in pixels.
        ih: Image height in pixels.
        p: Probability of erasing a patch.
        erase_area: Patch area as a fraction of ih * iw. The patch is square unless
            it has to be clamped to fit a very elongated image.

    Returns:
        The image, with the patch set to zero when erasing was applied.
    """
    # A uniform sample is below `p` with probability `p`.
    # The previous `> p` test erased with probability 1 - p instead.
    if tf.random.uniform(()) < p:
        # Never trust iw/ih beyond the real tensor size: scatter indices must be valid.
        img_shape = tf.shape(img)
        ih_int = tf.minimum(tf.cast(ih, tf.int32), img_shape[0])
        iw_int = tf.minimum(tf.cast(iw, tf.int32), img_shape[1])

        target = erase_area * tf.cast(ih, tf.float32) * tf.cast(iw, tf.float32)
        side = tf.cast(tf.sqrt(target), tf.int32)
        # Clamp so the patch always fits inside the image.
        e_h = tf.minimum(side, ih_int)
        e_w = tf.minimum(side, iw_int)

        # maxval is exclusive; the +1 keeps it above minval even when the patch
        # spans the whole side, and lets the patch touch the far edge.
        x1 = tf.random.uniform((), 0, iw_int - e_w + 1, tf.int32)
        y1 = tf.random.uniform((), 0, ih_int - e_h + 1, tf.int32)

        # Create (row, col) indices for every pixel of the region to erase
        xx, yy = tf.meshgrid(tf.range(x1, x1 + e_w), tf.range(y1, y1 + e_h))
        indices = tf.stack([yy, xx], axis=-1)
        indices = tf.reshape(indices, [-1, 2])

        # One zero vector (all channels) per erased pixel
        updates = tf.zeros((e_h * e_w, img_shape[-1]), dtype=img.dtype)

        # Scatter the zeros into the image
        img = tf.tensor_scatter_nd_update(img, indices, updates)
    return img

In [None]:
def augment_data(img, label, bbox, iw, ih):
    """Apply the full random augmentation chain to one sample.

    Geometric steps (flip, rotation, scale) update the image and bounding box
    together; the remaining steps only change pixel values.

    Args:
        img: Image tensor.
        label: Class label, passed through unchanged.
        bbox: Bounding box handed to the geometric augmentations.
        iw: Image width.
        ih: Image height.

    Returns:
        (img, label, bbox, iw, ih), where iw and ih are the values returned by
        random_scale.
    """
    
    # Geometric augmentations: each returns the transformed image and box.
    img, bbox = random_flip(img, bbox, iw, ih, prob=0.5)
    img, bbox = random_rotation(img, bbox, iw, ih, max_angle=5.0)
    # random_scale also returns the width/height to use from here on.
    img, bbox, iw, ih = random_scale(img, bbox, iw, ih, low=0.95, high=1.05)

    # Photometric augmentations: the bounding box is unaffected.
    img = random_erasing(img, iw, ih, p=0.5, erase_area=0.05)
    img = tf.image.random_brightness(img, 0.05)
    img = tf.image.random_contrast(img, 0.95, 1.05)
    img = tf.image.random_hue(img, 0.02)
    img = tf.image.random_saturation(img, 0.95, 1.05)
    return img, label, bbox, iw, ih

# Data Loading Pipeline
---

In [None]:
# Data pipeline
def read_point(path, label, bbox):
    """Decode one JPEG and convert its box from [x, y, w, h] to corner format.

    Returns:
        (image, label, [x, y, x + w, y + h], image_width, image_height), with the
        width and height given as float32 scalars.
    """
    raw_bytes = tf.io.read_file(path)
    image = tf.image.decode_jpeg(raw_bytes, channels=3)

    dims = tf.shape(image)
    height = tf.cast(dims[0], tf.float32)
    width = tf.cast(dims[1], tf.float32)

    left, top, box_w, box_h = tf.unstack(bbox)
    corners = tf.stack([left, top, left + box_w, top + box_h])
    return image, label, corners, width, height

from tensorflow.keras.applications.efficientnet_v2 import preprocess_input
def normalise(img, label, bbox, iw, ih):
    """Resize an image to the network input size and normalise image and box.

    Args:
        img: Image tensor of any spatial size.
        label: Class label, passed through unchanged.
        bbox: Tensor unstacked as [x1, y1, x2, y2] in pixels of the incoming image.
        iw: Width of the incoming image, used to normalise x coordinates.
        ih: Height of the incoming image, used to normalise y coordinates.

    Returns:
        (img, (label, bbox)): the image min-max scaled to [0, 255] and passed
        through `preprocess_input`, and the box as fractions of width/height
        clipped to [0, 1].
    """
    img = tf.image.resize(img, IMG_SIZE[:2], method='bicubic')

    # Min-max scale to [0, 255]. The floor on the range avoids a 0/0 division
    # (NaN pixels) for a constant image, which then maps to all zeros.
    lo, hi = tf.reduce_min(img), tf.reduce_max(img)
    img = (img - lo) / tf.maximum(hi - lo, 1e-8) * 255.0
    img = preprocess_input(img)

    x1, y1, x2, y2 = tf.unstack(bbox)
    box = tf.stack([x1/iw, y1/ih, x2/iw, y2/ih])
    # The bbox head ends in a sigmoid, so targets outside [0, 1] are unreachable.
    box = tf.clip_by_value(box, 0.0, 1.0)
    return img, (label, box)

def get_ds(df, batch_size, augment, shuffle):
    """Build a batched `tf.data` pipeline from the merged annotation DataFrame.

    Args:
        df: DataFrame with `img_path`, `class_id` and `bbox` columns.
        batch_size: Number of samples per batch.
        augment: If True, apply `augment_data` to every sample.
        shuffle: If True, shuffle the samples (reshuffled on every iteration).

    Returns:
        A dataset yielding (images, (labels, bboxes)) batches.
    """
    ds = tf.data.Dataset.from_tensor_slices((df.img_path.values,
                                             df.class_id.values,
                                             np.stack(df.bbox.to_numpy())))
    # Shuffle the lightweight (path, label, bbox) records BEFORE decoding.
    # Shuffling after read_point/augment_data with a buffer of len(df) kept every
    # decoded full-size image in memory at once.
    if shuffle:
        ds = ds.shuffle(len(df))
    ds = ds.map(read_point, num_parallel_calls=tf.data.AUTOTUNE)
    if augment:
        ds = ds.map(augment_data, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.map(normalise, num_parallel_calls=tf.data.AUTOTUNE)
    # A single prefetch at the end overlaps the whole input pipeline with training.
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

# Training split (is_train == 1): augmented and shuffled.
train_ds = get_ds(df[df.is_train==1], BATCH_SIZE, augment=True, shuffle=True)
# Test split (is_train == 0) in file order; used as validation data during training.
test_ds  = get_ds(df[df.is_train==0], BATCH_SIZE, augment=False, shuffle=False)
# Same test split, shuffled; used by the evaluation and prediction-visualisation cells.
test_ds_eval = get_ds(df[df.is_train==0], BATCH_SIZE, augment=False, shuffle=True)

# Dataset Visualisation
---

In [None]:
# Visualization
def viz_batch(imgs, labels, bboxes, label_map, cols=8):
    """Show a batch of images in a grid with their boxes and class names.

    Args:
        imgs: Array of images indexed along axis 0; each is cast to uint8 for display.
        labels: Integer class ids, one per image.
        bboxes: Boxes as [x1, y1, x2, y2], multiplied by the image width/height for
            plotting (i.e. expected as fractions of the image size).
        label_map: Mapping from class id to class name.
        cols: Number of grid columns.
    """
    bs = imgs.shape[0]
    if bs == 0:
        return  # nothing to draw; plt.subplots cannot build a 0-row grid
    rows = math.ceil(bs/cols)
    # squeeze=False always returns a 2-D array, so flatten() works for a 1x1 grid too.
    fig, axes = plt.subplots(rows, cols, figsize=(4*cols, 4*rows), squeeze=False)
    axes = axes.flatten()

    # IMG_SIZE is (height, width, channels): x scales with width, y with height.
    img_h, img_w = IMG_SIZE[:2]
    to_pixels = np.array([img_w, img_h, img_w, img_h], dtype=np.float32)

    for i in range(bs):
        img = imgs[i]
        img = tf.cast(img, tf.uint8)
        axes[i].imshow(img); axes[i].axis('off')
        x1,y1,x2,y2 = bboxes[i]*to_pixels
        rect = patches.Rectangle((x1,y1), x2-x1, y2-y1,
                                 linewidth=2, edgecolor='red', facecolor='none')
        axes[i].add_patch(rect)
        axes[i].text(x1, y1-5, label_map[int(labels[i])],
                     color='white', backgroundcolor='red', fontsize=8)

    # Hide the frames of grid cells left empty by a partial last row.
    for ax in axes[bs:]:
        ax.axis('off')
    plt.tight_layout(); plt.show()

# Visual sanity check: draw the first training batch with its boxes and class names.
for imgs, (labels, bboxes) in train_ds.take(1):
    viz_batch(imgs.numpy(), labels.numpy(), bboxes.numpy(), LABEL_TO_CLASS)

# Loss & Metric
---

In [None]:
# Custom IoU metric & focal-EIoU loss
def iou(y_true, y_pred):
    """Intersection-over-union of boxes given as [x1, y1, x2, y2] on the last axis.

    Returns one IoU value per box pair; 1e-8 is added to the union to avoid a
    division by zero.
    """
    tx1, ty1, tx2, ty2 = tf.unstack(y_true, axis=-1)
    px1, py1, px2, py2 = tf.unstack(y_pred, axis=-1)

    # Overlap extent along each axis, floored at zero for disjoint boxes.
    overlap_w = tf.maximum(0., tf.minimum(tx2, px2) - tf.maximum(tx1, px1))
    overlap_h = tf.maximum(0., tf.minimum(ty2, py2) - tf.maximum(ty1, py1))
    inter = overlap_w * overlap_h

    true_area = (tx2 - tx1) * (ty2 - ty1)
    pred_area = (px2 - px1) * (py2 - py1)
    union = true_area + pred_area - inter
    return inter / (union + 1e-8)

@tf.keras.utils.register_keras_serializable()
def focal_eiou(y_true, y_pred, alpha=0.25, gamma=2.0):
    """Focal-weighted box regression loss combining Huber and (1 - IoU).

    Per sample: alpha * (1 - IoU)**gamma * (huber + (1 - IoU)).
    NOTE(review): despite the name, no EIoU centre-distance or aspect terms are
    computed here — confirm whether that is intended.

    Args:
        y_true: Ground-truth boxes, [x1, y1, x2, y2] on the last axis.
        y_pred: Predicted boxes in the same layout.
        alpha: Scale of the focal weight.
        gamma: Exponent of the focal weight.

    Returns:
        One loss value per box.
    """
    iou_score = iou(y_true, y_pred)
    # Functional Huber: averages over the 4 coordinates only, giving one value per
    # sample. `losses.Huber()` built a new Loss object on every call and reduced
    # to a single batch-wide scalar, mixing batch-level and per-sample terms.
    huber = losses.huber(y_true, y_pred)
    focal = alpha * tf.pow(1 - iou_score, gamma)
    return focal * (huber + (1 - iou_score))

# Model Architecture
---

In [None]:
# Build model
def feature_extractor(trainable=False):
    """Wrap an ImageNet-initialised EfficientNetV2-S backbone (no top) as a model.

    Args:
        trainable: Value assigned to the backbone's `trainable` attribute.

    Returns:
        A Keras model named 'feat_ext' mapping an IMG_SIZE input to the backbone output.
    """
    base = efficientnet_v2.EfficientNetV2S(include_top=False, weights='imagenet', input_shape=IMG_SIZE)
    base.trainable = trainable
    # The backbone is called on a fresh Input so it is nested as a single layer.
    inp = layers.Input(IMG_SIZE)
    return models.Model(inp, base(inp), name='feat_ext')

def build_detector():
    """Build the two-headed model: class probabilities plus one bounding box.

    Returns:
        A Keras model named 'detector' with outputs 'class_probs' (softmax over
        NUM_CLASSES) and 'bbox' (4 sigmoid values).
    """
    inp = layers.Input(IMG_SIZE)
    # Backbone created with trainable=False.
    feat = feature_extractor(trainable=False)(inp)
    # Shared trunk: global pooling followed by two dense layers feeding both heads.
    x = layers.GlobalAveragePooling2D()(feat)
    x = layers.Dense(1024, activation='relu')(x)
    x = layers.Dense(512, activation='relu')(x)
    # Output layer names are the keys used for losses and metrics in model.compile.
    cls = layers.Dense(NUM_CLASSES, activation='softmax', name='class_probs')(x)
    bbox = layers.Dense(4, activation='sigmoid', name='bbox')(x)
    return models.Model(inp, [cls, bbox], name='detector')

model = build_detector()
lr = 1e-4

# Batches per training epoch, derived from the split size instead of a hard-coded
# 188, so the schedule stays correct if BATCH_SIZE or the dataset changes.
steps_per_epoch = math.ceil(int((df.is_train == 1).sum()) / BATCH_SIZE)

# Cosine decay with warm restarts: the first cycle lasts 5 epochs, each later cycle
# is twice as long (t_mul) and restarts at 0.8x the previous peak (m_mul).
sched = optimizers.schedules.CosineDecayRestarts(lr, first_decay_steps=5 * steps_per_epoch, t_mul=2.0, m_mul=0.8, alpha=0.1)

# Build and compile model
opt = optimizers.AdamW(learning_rate=sched, weight_decay=1e-4)
# Losses, loss weights and metrics are keyed by the output layer names.
model.compile(optimizer=opt, loss={'class_probs': losses.SparseCategoricalCrossentropy(),
                                   'bbox': focal_eiou},
            loss_weights={'class_probs': 1.0, 'bbox': 2.0},
            metrics={'class_probs': [metrics.SparseCategoricalAccuracy(name='accuracy'), 
                                     metrics.SparseTopKCategoricalAccuracy(k=100, name='top_100')],
                      'bbox': [iou]})

# Training

In [None]:
# Train with best-model checkpointing and early stopping, both monitoring val_loss
history = model.fit(train_ds, validation_data=test_ds, epochs=EPOCHS,
                    callbacks=[callbacks.ModelCheckpoint("best_detector_EfficientNet.keras", monitor='val_loss', save_best_only=True, verbose=1),
                                callbacks.EarlyStopping(monitor='val_loss', min_delta=1e-5, patience=10, restore_best_weights=True, verbose=1)],
                    verbose=1)

# Cast every logged value to a built-in float: json.dump raises TypeError on
# NumPy scalars, which would lose the history after a full training run.
history_for_json = {name: [float(v) for v in values]
                    for name, values in history.history.items()}
with open('training_history_EfficientNet.json', 'w') as f:
    json.dump(history_for_json, f)

##### Details
- Approx. 3 min 20 s per epoch.
- Memory consumption peaks at the start of each epoch (around 17 GB, possibly due to the large shuffle buffer size). It then decreases rapidly and is around 5–6 GB during validation.


# Model Evaluation

In [None]:
# Evaluate
# Restore the weights from the checkpoint file written during training.
model.load_weights('best_detector_EfficientNet.keras')
# return_dict=True yields a {metric name: value} mapping instead of a flat list.
res = model.evaluate(test_ds_eval, return_dict=True)
print(res)

In [None]:
# Classification report & mean IoU
# Collect ground truth and predictions over the whole evaluation set.
# Both are gathered in the same pass, so they stay aligned although test_ds_eval
# is shuffled.
all_labels, all_preds, all_true, all_pred = [], [], [], []
for imgs, (cls_true, bb_true) in test_ds_eval:
    # predict_on_batch handles one batch directly; calling model.predict inside
    # a loop sets up a new prediction pipeline on every iteration.
    cls_pred, bb_pred = model.predict_on_batch(imgs)
    all_labels += cls_true.numpy().tolist()
    all_preds  += np.argmax(cls_pred, axis=1).tolist()
    all_true   += bb_true.numpy().tolist()
    all_pred   += bb_pred.tolist()

In [None]:
# Compute IoU for all boxes in one vectorised call instead of one TensorFlow call
# per sample. reshape(-1, 4) keeps the last axis at 4 even for an empty list.
true_boxes = np.asarray(all_true, dtype=np.float32).reshape(-1, 4)
pred_boxes = np.asarray(all_pred, dtype=np.float32).reshape(-1, 4)
mean_iou = np.mean(iou(tf.constant(true_boxes), tf.constant(pred_boxes)).numpy())
print(f"Mean IoU: {mean_iou:.4f}")

In [None]:
# Visualize predictions
def viz_pred(imgs, bb_true, bb_pred, label_true, cls_pred, label_map, cols=8, dpi=1000):
    """Show ground-truth (green) and predicted (red) boxes and labels for a batch.

    Args:
        imgs: Array of images indexed along axis 0; each is cast to uint8 for display.
        bb_true: Ground-truth boxes [x1, y1, x2, y2], multiplied by the image
            width/height for plotting (i.e. expected as fractions of the image size).
        bb_pred: Predicted boxes in the same layout (array or tensor).
        label_true: Ground-truth class ids.
        cls_pred: Per-class scores; argmax gives the predicted class and the max is
            printed as a percentage (array or tensor).
        label_map: Mapping from class id to class name.
        cols: Number of grid columns.
        dpi: Figure resolution. The default keeps the previous fixed value; with a
            32-image batch this renders a very large figure, so pass e.g. 100 for
            quick previews.
    """
    # Accept tensors as well as arrays so indexing and scaling behave uniformly.
    bb_true = np.asarray(bb_true); bb_pred = np.asarray(bb_pred)
    cls_pred = np.asarray(cls_pred)
    label_pred = np.argmax(cls_pred, axis=1); max_prob = np.max(cls_pred, axis=1)

    bs = imgs.shape[0]
    if bs == 0:
        return  # nothing to draw; plt.subplots cannot build a 0-row grid
    rows = math.ceil(bs/cols)

    # squeeze=False always returns a 2-D array, so no special case for a single subplot.
    fig, axes = plt.subplots(rows, cols, figsize=(4*cols,4*rows), dpi=dpi, squeeze=False)
    axes = axes.flatten()

    # IMG_SIZE is (height, width, channels): x scales with width, y with height.
    img_h, img_w = IMG_SIZE[:2]
    to_pixels = np.array([img_w, img_h, img_w, img_h], dtype=np.float32)

    for i in range(bs):
        img = tf.cast(imgs[i], tf.uint8)
        axes[i].imshow(img); axes[i].axis('off')

        # IoU between this image's ground-truth and predicted box
        iou_score = iou(y_true=tf.expand_dims(bb_true[i], axis=0), y_pred=tf.expand_dims(bb_pred[i], axis=0)).numpy().item()

        for b, l, col in zip([bb_true[i], bb_pred[i]], [label_true[i], label_pred[i]], ['green','red']):
            x1,y1,x2,y2 = b*to_pixels
            rect = patches.Rectangle((x1,y1), x2-x1, y2-y1, linewidth=2, edgecolor=col, facecolor='none')
            axes[i].add_patch(rect)
            # Ground truth is captioned above its box, the prediction below its box.
            if col=='green': axes[i].text(x1, y1-5, f"GT: {label_map[int(l)]}", color='white', backgroundcolor=col, fontsize=8)
            else: axes[i].text(x1, y2+5, f"PR: {label_map[int(l)]} ({max_prob[i]*100:.2f}%) \nIoU:{iou_score:.2f}", color='white', backgroundcolor=col, fontsize=8)

    # Hide the frames of grid cells left empty by a partial last row.
    for ax in axes[bs:]:
        ax.axis('off')
    plt.tight_layout(); plt.show()

# Take one batch from the evaluation dataset and call the model on it directly.
for imgs, (labs, bbs) in test_ds_eval.take(1):
    cls_pred, bb_pred = model(imgs)

# Plot ground truth vs. prediction for that batch. Images and targets are converted
# to NumPy; the model outputs are passed on as returned by the call above.
viz_pred(imgs.numpy(), bbs.numpy(), bb_pred, labs.numpy(), cls_pred, LABEL_TO_CLASS)

# Plotting History

In [None]:
def plot_all_history_metrics(history_data):
    """Plot every metric of a training history, one subplot per metric.

    Args:
        history_data: Mapping of metric name to a list of per-epoch values. A key
            'val_<name>' is drawn on the same axes as '<name>' when present.
    """
    # Training metrics are the keys without the 'val_' prefix (e.g. 'loss', 'bbox_iou').
    metric_names = sorted(key for key in history_data if not key.startswith('val_'))
    num_metrics = len(metric_names)
    if num_metrics == 0:
        return  # nothing to plot; plt.subplots cannot build a 0-row grid

    cols = 3  # Plotting three columns for better aspect ratio
    # Ceiling division. The previous `(num_metrics + 1) // cols` produced too few
    # rows (IndexError) for e.g. 4 or 7 metrics and zero rows for a single metric.
    rows = math.ceil(num_metrics / cols)

    # squeeze=False always returns a 2-D array, so flatten() is safe for any grid.
    fig, axes = plt.subplots(rows, cols, figsize=(6 * cols, 4 * rows), dpi=1000, squeeze=False)
    axes = axes.flatten()

    for ax, metric in zip(axes, metric_names):
        pretty = metric.replace("_", " ").title()
        val_metric = f'val_{metric}'

        ax.plot(history_data[metric], label=f'Training {metric}', color='blue')
        if val_metric in history_data:
            # Overlay the validation curve when the history contains one
            ax.plot(history_data[val_metric], label=f'Validation {metric}', color='red', linestyle='dashed')
            ax.set_title(f'Training vs. Validation {pretty}', fontsize=10)
        else:
            ax.set_title(f'Training {pretty}', fontsize=10)

        ax.set_xlabel('Epoch', fontsize=10)
        ax.set_ylabel(pretty, fontsize=10)
        ax.legend(fontsize=9)
        ax.grid(True)

    # Remove any unused subplots in the last row
    for ax in axes[num_metrics:]:
        fig.delaxes(ax)

    plt.tight_layout()
    plt.show()

# Reload the per-epoch history that the training cell saved as JSON
with open('training_history_EfficientNet.json', 'r') as f:
    training_history_data = json.load(f)

# Draw one subplot per metric found in the loaded history
plot_all_history_metrics(training_history_data)

# Appendix
---

### A

In [None]:
# Per-class precision/recall/F1 on the evaluation set.
# `labels` pins the class ids to match `target_names`; without it the call raises
# when any class is missing from both the true and the predicted labels.
print(classification_report(all_labels, all_preds,
      labels=list(range(NUM_CLASSES)),
      target_names=[LABEL_TO_CLASS[i] for i in range(NUM_CLASSES)]))