In [1]:
import os
import json
import numpy as np
import pandas as pd
import cv2
import tensorflow as tf
import numba
from numba import njit
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models
from tqdm import tqdm

2025-11-20 00:05:10.689345: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1763597110.922262      48 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1763597110.982373      48 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

# 1. RLE ENCODE / DECODE FUNCTIONS

In [2]:
@numba.jit(nopython=True)
def _rle_encode_jit(x, fg_val=1):
    dots = np.where(x.T.flatten() == fg_val)[0]
    run_lengths = []
    prev = -2
    for b in dots:
        if b > prev + 1:
            run_lengths.extend((b + 1, 0))
        run_lengths[-1] += 1
        prev = b
    return run_lengths

def rle_encode(masks, fg_val=1):
    return ';'.join([json.dumps(_rle_encode_jit(x, fg_val)) for x in masks])

In [3]:
@numba.njit
def _rle_decode_jit(mask_rle, height, width):
    if len(mask_rle) % 2 != 0:
        raise ValueError("Odd RLE length")
    starts, lengths = mask_rle[0::2], mask_rle[1::2]
    starts -= 1
    ends = starts + lengths
    for i in range(len(starts) - 1):
        if ends[i] > starts[i+1]:
            raise ValueError("Overlapping mask")
    img = np.zeros(height * width, dtype=np.bool_)
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    return img

def rle_decode(mask_rle, shape):
    mask_rle = json.loads(mask_rle)
    mask_rle = np.asarray(mask_rle, dtype=np.int32)
    h, w = shape
    return _rle_decode_jit(mask_rle, h, w).reshape(shape, order="F")

# 2. PATHS AND DATA SPLIT

In [4]:
DATA = "/kaggle/input/recodai-luc-scientific-image-forgery-detection"

train_img_dir = f"{DATA}/train_images/forged"
auth_img_dir  = f"{DATA}/train_images/authentic"
train_mask_dir = f"{DATA}/train_masks"

supp_img_dir = f"{DATA}/supplemental_images"
supp_mask_dir = f"{DATA}/supplemental_masks"

test_dir = f"{DATA}/test_images"

In [5]:
forged_images = sorted(os.listdir(train_img_dir))
auth_images = sorted(os.listdir(auth_img_dir))

print("Forged:", len(forged_images))
print("Authentic:", len(auth_images))

Forged: 2751
Authentic: 2377


# 3. Data Pipeline

In [6]:
def load_masks(image_id):
    """
    Returns a list of binary masks for the given image_id.
    There may be multiple masks.
    """
    masks = []
    for folder in [train_mask_dir, supp_mask_dir]:
        path = os.path.join(folder, image_id.replace(".png", ".npy"))
        if os.path.exists(path):
            arr = np.load(path)
            if arr.ndim == 2:
                masks.append(arr)
            else:
                for i in range(arr.shape[0]):
                    masks.append(arr[i])
    return masks

In [8]:
IMG_SIZE = 512

In [9]:
def load_image(path):
    img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    return img.astype(np.float32) / 255.

In [10]:
def load_mask_list(masks):
    final = np.zeros((IMG_SIZE, IMG_SIZE), dtype=np.float32)
    for m in masks:
        m2 = cv2.resize(m.astype(np.float32), (IMG_SIZE, IMG_SIZE))
        final = np.maximum(final, m2)
    return final

In [11]:
X, Y = [], []

for img_name in tqdm(forged_images):
    path = os.path.join(train_img_dir, img_name)
    img = load_image(path)
    masks = load_masks(img_name)
    if len(masks) == 0:
        continue
    mask = load_mask_list(masks)
    X.append(img)
    Y.append(mask)

X = np.array(X)[...,None]
Y = np.array(Y)[...,None]

print("Train samples:", X.shape)

100%|██████████| 2751/2751 [02:29<00:00, 18.43it/s]


Train samples: (2751, 512, 512, 1)


In [None]:
X_train, X_val, Y_train, Y_val = train_test_split(X,Y,test_size=0.1,random_state=42)

# 4. Modelling

In [None]:
def conv_block(x, f):
    x = layers.Conv2D(f, 3, padding="same", activation="relu")(x)
    x = layers.Conv2D(f, 3, padding="same", activation="relu")(x)
    return x

In [None]:
def build_unet(input_shape=(256,256,1)):
    inputs = layers.Input(input_shape)

    c1 = conv_block(inputs, 32); p1 = layers.MaxPool2D()(c1)
    c2 = conv_block(p1, 64); p2 = layers.MaxPool2D()(c2)
    c3 = conv_block(p2, 128); p3 = layers.MaxPool2D()(c3)

    b = conv_block(p3, 256)

    u3 = layers.UpSampling2D()(b)
    u3 = layers.Concatenate()([u3, c3])
    c6 = conv_block(u3,128)

    u2 = layers.UpSampling2D()(c6)
    u2 = layers.Concatenate()([u2,c2])
    c7 = conv_block(u2,64)

    u1 = layers.UpSampling2D()(c7)
    u1 = layers.Concatenate()([u1,c1])
    c8 = conv_block(u1,32)

    outputs = layers.Conv2D(1,1,activation="sigmoid")(c8)

    return models.Model(inputs, outputs)

In [None]:
model = build_unet()
model.summary()

In [None]:
# def focal_loss(gamma=2., alpha=0.25):
#     def loss(y_true, y_pred):
#         bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
#         bce_exp = tf.exp(-bce)
#         return alpha * (1 - bce_exp)**gamma * bce
#     return loss

# model.compile(optimizer="adam", loss=focal_loss())

In [None]:
def dice_loss(y_true, y_pred, smooth=1e-6):
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return 1 - ((2. * intersection + smooth) / 
                (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth))

In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-4),
    loss=lambda y_true, y_pred: 
         0.5 * tf.keras.losses.binary_crossentropy(y_true, y_pred) + 
         0.5 * dice_loss(y_true, y_pred),
    metrics=["accuracy"]
)

# 5. Model Training

In [None]:
history = model.fit(
    X_train, Y_train,
    validation_data=(X_val, Y_val),
    epochs=1,
    batch_size=16
)

# 6. Predict test set

In [None]:
test_images = sorted(os.listdir(test_dir))

In [None]:
sub_rows = []
for img_name in tqdm(test_images):
    img = load_image(os.path.join(test_dir, img_name))
    p = model.predict(img[None,...,None])[0,...,0]
    p_bin = (p > 0.5).astype(np.uint8)

    if p_bin.sum() == 0:
        sub_rows.append([img_name.replace(".png",""), "authentic"])
        continue

    # RLE requires list-of-masks:
    masks = [p_bin]
    rle = rle_encode(masks)

    sub_rows.append([img_name.replace(".png",""), rle])

# 7. Submission file

In [None]:
sub = pd.DataFrame(sub_rows, columns=["case_id", "annotation"])
sub.to_csv("submission.csv", index=False)

print(sub.head())
print("Submission saved!")