In [None]:
import os
os.environ["SM_FRAMEWORK"] = "tf.keras"

import numpy as np
import pandas as pd
import cv2
import tensorflow as tf
import segmentation_models as sm
import albumentations as A
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
BASE_PATH = '/kaggle/input/terra-seg-rugged-terrain-segmentation/offroad-seg-kaggle'
TRAIN_IMG_DIR = os.path.join(BASE_PATH, 'train_images')
TRAIN_MASK_DIR = os.path.join(BASE_PATH, 'train_masks')
TEST_IMG_DIR = os.path.join(BASE_PATH, 'test_images_padded')

IMG_HEIGHT = 256
IMG_WIDTH = 256
BATCH_SIZE = 16
EPOCHS = 15
BACKBONE = 'resnet34'

preprocess_input = sm.get_preprocessing(BACKBONE)

print("Configuration Set")

In [None]:
class TerraSegGenerator(tf.keras.utils.Sequence):
    def __init__(self, image_ids, img_dir, mask_dir=None, batch_size=16, 
                 img_size=(256, 256), is_train=True, augment=False):
        self.image_ids = image_ids
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.batch_size = batch_size
        self.img_size = img_size
        self.is_train = is_train
        self.augment = augment
        
        self.augmentation = A.Compose([
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.2),
            A.RandomRotate90(p=0.5),
            A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.1, rotate_limit=15, p=0.5),
            A.OneOf([
                A.RandomBrightnessContrast(p=1),
                A.HueSaturationValue(p=1),
            ], p=0.3),
        ])

    def __len__(self):
        return int(np.ceil(len(self.image_ids) / self.batch_size))

    def __getitem__(self, index):
        batch_ids = self.image_ids[index * self.batch_size : (index + 1) * self.batch_size]
        images, masks = [], []
        
        for img_id in batch_ids:
            img_path = os.path.join(self.img_dir, img_id)
            img = cv2.imread(img_path)
            if img is None: continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, self.img_size)
            
            if self.is_train:
                mask_path = os.path.join(self.mask_dir, img_id) 
                mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
                mask = cv2.resize(mask, self.img_size, interpolation=cv2.INTER_NEAREST)
                mask = (mask > 0).astype(np.float32)

                if self.augment:
                    augmented = self.augmentation(image=img, mask=mask)
                    img, mask = augmented['image'], augmented['mask']
                
                masks.append(np.expand_dims(mask, axis=-1))
            
            img = preprocess_input(img)
            images.append(img)
        
        if self.is_train:
            return np.array(images).astype(np.float32), np.array(masks).astype(np.float32)
        else:
            return np.array(images).astype(np.float32)

In [None]:
all_ids = [f for f in os.listdir(TRAIN_IMG_DIR) if f.endswith(('.png', '.jpg'))]
train_ids, val_ids = train_test_split(all_ids, test_size=0.2, random_state=42)

train_gen = TerraSegGenerator(train_ids, TRAIN_IMG_DIR, TRAIN_MASK_DIR, BATCH_SIZE, (IMG_HEIGHT, IMG_WIDTH), is_train=True, augment=True)
val_gen = TerraSegGenerator(val_ids, TRAIN_IMG_DIR, TRAIN_MASK_DIR, BATCH_SIZE, (IMG_HEIGHT, IMG_WIDTH), is_train=True, augment=False)

model = sm.Unet(BACKBONE, encoder_weights='imagenet', classes=1, activation='sigmoid')

model.compile(
    optimizer=Adam(learning_rate=0.0001),
    loss=sm.losses.bce_jaccard_loss,
    metrics=[sm.metrics.iou_score]
)

callbacks = [
    ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_iou_score', mode='max', verbose=1),
    ReduceLROnPlateau(monitor='val_iou_score', factor=0.5, patience=3, min_lr=1e-6, verbose=1, mode='max'),
    EarlyStopping(monitor='val_iou_score', patience=8, restore_best_weights=True, verbose=1, mode='max')
]

print("Starting Training")
history = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=EPOCHS,
    callbacks=callbacks
)

In [None]:
def rle_encode(img):
    pixels = img.flatten(order='F')
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

print("\nGenerating Submission")
if os.path.exists('best_model.keras'):
    model.load_weights('best_model.keras')

test_ids = sorted([f for f in os.listdir(TEST_IMG_DIR) if f.endswith(('.png', '.jpg'))])
submission_data = []

for img_id in tqdm(test_ids):
    path = os.path.join(TEST_IMG_DIR, img_id)
    img = cv2.imread(path)
    if img is None: continue
    
    orig_h, orig_w = img.shape[:2]
    
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_resized = cv2.resize(img_rgb, (IMG_WIDTH, IMG_HEIGHT))
    img_batch = preprocess_input(np.array([img_resized], dtype=np.float32))
    
    pred_mask = model.predict(img_batch, verbose=0)[0]
    
    pred_binary = (pred_mask > 0.5).astype(np.uint8)
    final_mask = cv2.resize(pred_binary, (orig_w, orig_h), interpolation=cv2.INTER_NEAREST)
    
    rle = rle_encode(final_mask)
    clean_id = img_id.split('.')[0] 
    submission_data.append([clean_id, rle])

df_sub = pd.DataFrame(submission_data, columns=['image_id', 'encoded_pixels'])
df_sub.to_csv('submission.csv', index=False)
print("\n'submission.csv' generated")