<a href="https://colab.research.google.com/github/Bumper-Car/Vroomie_AI/blob/main/LaneDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive (Colab-only; google.colab is not
# available in a plain Jupyter environment).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Switch to the training folder on the mounted Drive. Later cells use paths
# relative to this directory ('train/...', 'val/...', 'output').
# NOTE(review): the absolute path is hard-coded to one user's Drive layout.
import os
os.chdir('/content/drive/MyDrive/Vroomie/training')

In [None]:
import copy
import glob
import os
import shutil
import subprocess
import sys
import time
import xml.etree.ElementTree as ET

import cv2
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.nn import init
from torch.nn.modules.loss import _Loss
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader, Dataset

from torchvision import transforms

In [None]:
# Use the first CUDA GPU when one is visible to PyTorch, otherwise the CPU.
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(DEVICE)

cuda:0


# AIHUBDrivingDataset

In [None]:
def _part_sort_key(part_path):
    """Sort key for ``<name>.part<suffix>`` files: numeric suffixes sort by value."""
    suffix = part_path.rsplit(".part", 1)[1]
    if suffix.isdigit():
        # Numeric compare so that e.g. part10 comes after part2.
        return (0, int(suffix), "")
    return (1, 0, suffix)


def _merge_split_parts(target_dir):
    """Concatenate each ``<name>.part*`` group in ``target_dir`` into ``<name>`` and delete the parts."""
    part_files = glob.glob(os.path.join(glob.escape(target_dir), "*.part*"))
    prefixes = sorted({path.rsplit(".part", 1)[0] for path in part_files})
    for prefix in prefixes:
        part_list = sorted(glob.glob(f"{glob.escape(prefix)}.part*"), key=_part_sort_key)
        with open(prefix, "wb") as outfile:
            for part in part_list:
                with open(part, "rb") as infile:
                    # Stream the part instead of loading it fully into memory.
                    shutil.copyfileobj(infile, outfile)
                os.remove(part)
        print(f"병합 완료: {os.path.basename(prefix)}")


def _unzip_archives(root_dir):
    """Extract every ``.zip`` below ``root_dir`` into a sibling directory named after the archive."""
    for root, _dirs, files in os.walk(root_dir):
        for file in files:
            if file.endswith(".zip"):
                zip_path = os.path.join(root, file)
                unzip_dir = os.path.splitext(zip_path)[0]
                os.makedirs(unzip_dir, exist_ok=True)
                subprocess.run(["unzip", "-q", zip_path, "-d", unzip_dir])
                print(f"압축 해제 완료: {zip_path}")


def download_and_extract(base_url, dataset_key, file_keys, api_key, output_dir=""):
    """Download one archive with curl, unpack it, and remove the downloaded tar.

    Steps on a successful download: extract ``download.tar`` inside
    ``output_dir``, merge any ``*.part*`` files found below it, unzip every
    resulting ``.zip``, then delete ``download.tar``. On a failed download the
    curl stderr and whatever was written to ``download.tar`` are printed and
    the partial file is removed.

    Args:
        base_url: API base URL; the request goes to
            ``{base_url}/{dataset_key}.do?fileSn={file_keys}``.
        dataset_key: Dataset identifier placed in the URL path.
        file_keys: Value of the ``fileSn`` query parameter.
        api_key: Sent in the ``apikey`` request header.
        output_dir: Directory to download into and extract under. An empty
            string (the default) means the current working directory.

    Returns:
        None. Progress is reported through ``print``.
    """
    # An empty string is not a usable ``cwd`` for subprocess and os.walk("")
    # visits nothing, so treat it as the current directory.
    output_dir = output_dir or "."
    os.makedirs(output_dir, exist_ok=True)

    download_path = os.path.join(output_dir, "download.tar")
    download_url = f"{base_url}/{dataset_key}.do?fileSn={file_keys}"

    print("다운로드를 시작합니다...")
    result = subprocess.run(
        ["curl", "-L", "-C", "-", "-o", download_path, "-H", f"apikey:{api_key}", download_url],
        capture_output=True,
        text=True
    )

    if result.returncode != 0:
        print(f"다운로드 실패\n{result.stderr}")
        if os.path.exists(download_path):
            with open(download_path, "rb") as f:
                print(f.read().decode(errors="ignore"))
            os.remove(download_path)
        return

    print("다운로드 성공.")

    # tar runs with cwd=output_dir, so hand it an absolute archive path;
    # a path relative to the caller's cwd would not resolve from there.
    subprocess.run(["tar", "-xvf", os.path.abspath(download_path)], cwd=output_dir)

    # Merge split archives in every directory below output_dir.
    for root, _dirs, _files in os.walk(output_dir):
        _merge_split_parts(root)

    # Extract the merged .zip files.
    _unzip_archives(output_dir)

    # Remove the downloaded tar.
    os.remove(download_path)
    print("모든 병합 및 압축 해제 완료.")


In [None]:
# SECURITY: the AIHub API key must not be stored in the notebook. Provide it
# through the AIHUB_API_KEY environment variable before running this cell.
# A key that was previously committed here should be treated as leaked and
# rotated.
AIHUB_API_KEY = os.environ.get("AIHUB_API_KEY", "")
if not AIHUB_API_KEY:
    print("AIHUB_API_KEY is not set; the AIHub download cells will fail until it is provided.")

In [None]:
# Fetch every required archive of AIHub dataset "180". The calls differ only
# in the file serial number, so they are driven from one list.
AIHUB_BASE_URL = "https://api.aihub.or.kr"
AIHUB_DATASET_KEY = "180"
AIHUB_FILE_KEYS = ["38568", "38569", "38576", "38577", "38578", "38579"]

for file_key in AIHUB_FILE_KEYS:
    download_and_extract(
        base_url=AIHUB_BASE_URL,
        dataset_key=AIHUB_DATASET_KEY,
        file_keys=file_key,
        api_key=AIHUB_API_KEY
    )

In [None]:
def flatten_dataset(image_root, xml_root, target_img_dir, target_xml_dir):
    """Copy nested images and XML labels into two flat directories.

    Every file ending in ``.jpg`` anywhere under ``image_root`` is copied into
    ``target_img_dir`` and every file ending in ``.xml`` under ``xml_root`` is
    copied into ``target_xml_dir``. Only the base name is kept, so files that
    share a name in different sub-folders overwrite one another. A summary of
    the number of copies is printed.
    """
    def _copy_matching(src_root, suffix, dst_dir):
        # Walk src_root and copy each file whose name ends with `suffix`.
        copied = 0
        for folder, _subdirs, names in os.walk(src_root):
            for name in names:
                if not name.endswith(suffix):
                    continue
                shutil.copy2(os.path.join(folder, name), os.path.join(dst_dir, name))
                copied += 1
        return copied

    for directory in (target_img_dir, target_xml_dir):
        os.makedirs(directory, exist_ok=True)

    img_count = _copy_matching(image_root, '.jpg', target_img_dir)
    xml_count = _copy_matching(xml_root, '.xml', target_xml_dir)

    print(f"복사 완료: 이미지 {img_count}개, XML {xml_count}개")

In [None]:
# flatten_dataset(
#     image_root="train/원천데이터",
#     xml_root="train/라벨링데이터",
#     target_img_dir="train/원천데이터_flat",
#     target_xml_dir="train/라벨링데이터_flat"
# )

In [None]:
# flatten_dataset(
#     image_root="val/원천데이터",
#     xml_root="val/라벨링데이터",
#     target_img_dir="val/원천데이터_flat",
#     target_xml_dir="val/라벨링데이터_flat"
# )

In [None]:
# Lane-marking label names, grouped by colour (dash and solid variant each).
LANE_LABELS = (
    {'Lane_White_Dash', 'Lane_White_Solid'}
    | {'Lane_Yellow_Dash', 'Lane_Yellow_Solid'}
    | {'Lane_Blue_Dash', 'Lane_Blue_Solid'}
)

In [None]:
class AIHubDrivingDataset(Dataset):
    """Lane-segmentation samples built from images and per-image XML labels.

    Every ``.xml`` file in ``label_dir`` is one sample. The XML names its image
    through a ``<filename>`` element and lists polylines as ``<line>``
    elements; lines whose ``<name>`` contains ``'Lane'`` are drawn into a
    binary mask and an instance mask.

    Args:
        image_dir: Directory containing the images named by the XML files.
        label_dir: Directory containing the ``.xml`` label files.
        transform: Optional callable applied to the RGB ``PIL.Image``.
        target_transform: Optional callable applied to each mask (a 2-D
            ``uint8`` NumPy array) before it is returned.
        img_size: Stored on the instance; not used by this class.
    """

    def __init__(self, image_dir, label_dir, transform=None, target_transform=None, img_size=(320, 640)):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        self.target_transform = target_transform
        self.img_size = img_size

        # Sorted so that sample indices are stable between runs.
        self.label_list = sorted([f for f in os.listdir(label_dir) if f.endswith('.xml')])

    def __len__(self):
        """Number of XML label files found in ``label_dir``."""
        return len(self.label_list)

    @staticmethod
    def _lane_points(line):
        """Return the ``(x, y)`` integer control points of one ``<line>`` element.

        The children of ``<controlPt>`` are read pairwise (x then y). A
        trailing unpaired value is ignored, and a missing ``<controlPt>``
        yields an empty list.
        """
        control = line.find('controlPt')
        if control is None:
            return []
        coords = list(control)
        return [(int(cx.text), int(cy.text)) for cx, cy in zip(coords[0::2], coords[1::2])]

    @staticmethod
    def _to_binary_mask(label_image):
        """Convert a drawn mask (NumPy array or tensor) to a ``uint8`` 0/1 array."""
        if torch.is_tensor(label_image):
            label_image = label_image.cpu().numpy()
        values = np.asarray(label_image).astype(np.uint8)
        if values.ndim == 3:
            # Multi-channel input: a pixel is foreground when every channel is non-zero.
            return (values != 0).all(axis=2).astype(np.uint8)
        return (values != 0).astype(np.uint8)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            Tuple ``(image, label_binary, label_instance_image)``: the
            (optionally transformed) image, a ``uint8`` NumPy mask with 1 on
            lane pixels, and the (optionally transformed) instance mask.

        Raises:
            ValueError: The XML has no usable ``<filename>`` element.
            FileNotFoundError: The referenced image could not be read.
        """
        xml_path = os.path.join(self.label_dir, self.label_list[idx])
        root = ET.parse(xml_path).getroot()

        filename = root.find('filename')
        if filename is None or not filename.text:
            raise ValueError(f"{xml_path}: missing <filename> element")
        img_path = os.path.join(self.image_dir, filename.text)

        # cv2.imread signals failure by returning None rather than raising.
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if image is None:
            raise FileNotFoundError(f"Could not read image '{img_path}' referenced by '{xml_path}'")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        height, width = image.shape[:2]
        label_binary_image = np.zeros((height, width), dtype=np.uint8)
        label_instance_image = np.zeros((height, width), dtype=np.uint8)

        instance_id = 1
        for line in root.findall('line'):
            name = line.find('name')
            if name is None or 'Lane' not in (name.text or ''):
                continue

            pts = self._lane_points(line)
            if len(pts) < 2:
                continue

            # int32 is the point type cv2.polylines works with.
            pts = np.array([pts], dtype=np.int32)
            cv2.polylines(label_binary_image, pts, isClosed=False, color=255, thickness=5)
            # NOTE(review): the mask is uint8 and instance_id * 50 + 2 exceeds 255
            # from the 6th lane on — verify such lanes still get distinct values.
            cv2.polylines(label_instance_image, pts, isClosed=False, color=instance_id * 50 + 2, thickness=5)
            instance_id += 1

        image = Image.fromarray(image)

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label_binary_image = self.target_transform(label_binary_image)
            label_instance_image = self.target_transform(label_instance_image)

        # Works whether or not target_transform turned the mask into a tensor.
        label_binary = self._to_binary_mask(label_binary_image)

        return image, label_binary, label_instance_image

# Model

In [None]:
class FocalLoss(nn.Module):
    '''
    Focal loss over per-pixel class logits.

    Only consider two class now: foreground, background.

    Args:
        gamma: Focusing exponent applied to ``(1 - p)``.
        alpha: Per-class weights, indexed by class id.
        n_class: Number of classes (size of dim 1 of the logits).
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-pixel loss.
        device: Kept for backward compatibility and stored on the instance.
            The computation always runs on the device of ``input``.
    '''
    def __init__(self, gamma=2, alpha=(0.5, 0.5), n_class=2, reduction='mean', device=None):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction
        self.n_class = n_class
        self.device = device

    def forward(self, input, target):
        '''
        Args:
            input: Logits of shape ``(N, n_class, H, W)``.
            target: Integer class ids of shape ``(N, H, W)``.

        Returns:
            The reduced loss (scalar tensor) or the per-pixel loss.
        '''
        pt = F.softmax(input, dim=1)
        # Keep probabilities away from 0 and 1 so log() stays finite.
        pt = pt.clamp(min=0.000001, max=0.999999)

        loss = 0
        for i in range(self.n_class):
            # One-hot slice for class i, created on the logits' device and dtype
            # instead of a device fixed at construction time.
            class_mask = (target == i).to(device=pt.device, dtype=pt.dtype)
            loss = loss - self.alpha[i] * (1 - pt[:, i, ...]) ** self.gamma * class_mask * torch.log(pt[:, i, ...])

        if self.reduction == 'mean':
            loss = torch.mean(loss)
        elif self.reduction == 'sum':
            loss = torch.sum(loss)

        return loss

In [None]:
class DiscriminativeLoss(_Loss):
    """Discriminative (pull/push) loss on per-pixel embeddings.

    For each image, pixels that share a non-zero instance label are pulled to
    within ``delta_var`` of their mean embedding, and the means of different
    instances are pushed at least ``delta_dist`` apart. Label 0 is background
    and takes no part in either term.

    Args:
        delta_var: Margin of the variance (pull) term.
        delta_dist: Margin of the distance (push) term.
        norm: Must be 1 or 2. Stored but not used by the computation, which
            relies on the default ``torch.norm``.
        alpha, beta, gamma, usegpu: Stored on the instance; not used by the
            computation.
        size_average: Accepted for backward compatibility; not used.
    """

    def __init__(self, delta_var=0.5, delta_dist=1.5, norm=2, alpha=1.0, beta=1.0, gamma=0.001,
                 usegpu=False, size_average=True):
        super(DiscriminativeLoss, self).__init__(reduction='mean')
        self.delta_var = delta_var
        self.delta_dist = delta_dist
        self.norm = norm
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.usegpu = usegpu
        assert self.norm in [1, 2]

    def forward(self, input, target):
        """Return ``(var_loss, dist_loss, reg_loss)`` for embeddings ``input`` and labels ``target``."""
        return self._discriminative_loss(input, target)

    def _discriminative_loss(self, embedding, seg_gt):
        """Compute the three loss terms, each averaged over the batch.

        Args:
            embedding: Tensor of shape ``(B, embed_dim, H, W)``.
            seg_gt: Instance labels with ``B * H * W`` elements; 0 is background.

        Returns:
            ``(var_loss, dist_loss, reg_loss)``; ``reg_loss`` is always zero
            because the regularisation term is disabled.
        """
        batch_size, embed_dim, H, W = embedding.shape
        embedding = embedding.reshape(batch_size, embed_dim, H*W)
        seg_gt = seg_gt.reshape(batch_size, H*W)

        var_loss = torch.tensor(0, dtype=embedding.dtype, device=embedding.device)
        dist_loss = torch.tensor(0, dtype=embedding.dtype, device=embedding.device)
        reg_loss = torch.tensor(0, dtype=embedding.dtype, device=embedding.device)

        for b in range(batch_size):
            embedding_b = embedding[b]  # (embed_dim, H*W)
            seg_gt_b = seg_gt[b]  # (H*W)

            labels = torch.unique(seg_gt_b)
            # Drop the background label; without this the background is treated
            # as one more lane and the empty-image branch below is never taken.
            labels = labels[labels != 0]
            num_lanes = len(labels)
            if num_lanes == 0:
                # No lanes in this image: add a zero that still depends on the
                # embedding so every output stays connected to the graph.
                _nonsense = embedding.sum()
                _zero = torch.zeros_like(_nonsense)
                var_loss = var_loss + _nonsense * _zero
                dist_loss = dist_loss + _nonsense * _zero
                reg_loss = reg_loss + _nonsense * _zero
                continue

            centroid_mean = []
            for lane_idx in labels:
                seg_mask_i = (seg_gt_b == lane_idx)

                embedding_i = embedding_b[:, seg_mask_i]  # (embed_dim, n_pixels)
                pixel_count = torch.sum(seg_mask_i)
                mean_i = torch.sum(embedding_i, dim=1) / pixel_count
                centroid_mean.append(mean_i)
                # ---------- var_loss -------------
                var_loss = var_loss + torch.sum(F.relu(
                    torch.norm(embedding_i - mean_i.reshape(embed_dim, 1), dim=0) - self.delta_var) ** 2) / pixel_count / num_lanes
            centroid_mean = torch.stack(centroid_mean)  # (n_lane, embed_dim)

            if num_lanes > 1:
                centroid_mean1 = centroid_mean.reshape(-1, 1, embed_dim)
                centroid_mean2 = centroid_mean.reshape(1, -1, embed_dim)

                dist = torch.norm(centroid_mean1 - centroid_mean2, dim=2)   # shape (num_lanes, num_lanes)
                # Lift the diagonal to the margin so a centroid is not penalised
                # for its zero distance to itself.
                dist = dist + torch.eye(num_lanes, dtype=dist.dtype,
                                        device=dist.device) * self.delta_dist

                # divided by two for double calculated loss above, for implementation convenience
                dist_loss = dist_loss + torch.sum(F.relu(-dist + self.delta_dist) ** 2) / (
                        num_lanes * (num_lanes - 1)) / 2

            # reg_loss is not used in original paper
            # reg_loss = reg_loss + torch.mean(torch.norm(centroid_mean, dim=1))

        var_loss = var_loss / batch_size
        dist_loss = dist_loss / batch_size
        reg_loss = reg_loss / batch_size

        return var_loss, dist_loss, reg_loss

In [None]:
def compute_loss(net_output, binary_label, instance_label, loss_type = 'FocalLoss'):
    """Combine the binary-segmentation loss and the instance-embedding loss.

    Args:
        net_output: Mapping with the keys ``"binary_seg_logits"``,
            ``"instance_seg_logits"`` and ``"binary_seg_pred"``.
        binary_label: Target passed to the binary segmentation loss.
        instance_label: Target passed to the discriminative loss.
        loss_type: ``'FocalLoss'`` selects the focal loss; any other value
            selects ``nn.CrossEntropyLoss``.

    Returns:
        ``(total_loss, binary_loss, instance_loss, out)`` where the losses are
        already weighted and ``out`` is ``net_output["binary_seg_pred"]``.
    """
    # Weights of the individual terms.
    weight_binary = 10    #1.7
    weight_var = 0.3
    weight_dist = 1.0

    # Every loss_type other than 'FocalLoss' falls back to cross-entropy.
    if loss_type == 'FocalLoss':
        segmentation_criterion = FocalLoss(gamma=2, alpha=[0.25, 0.75])
    else:
        segmentation_criterion = nn.CrossEntropyLoss()

    # Same binding as the positional call (0.5, 1.5, 1.0, 1.0, 0.001).
    embedding_criterion = DiscriminativeLoss(
        delta_var=0.5, delta_dist=1.5, norm=1.0, alpha=1.0, beta=0.001)

    binary_loss = segmentation_criterion(net_output["binary_seg_logits"], binary_label)
    var_loss, dist_loss, _reg_loss = embedding_criterion(
        net_output["instance_seg_logits"], instance_label)

    binary_loss = binary_loss * weight_binary
    instance_loss = var_loss * weight_var + dist_loss * weight_dist
    total_loss = binary_loss + instance_loss

    return total_loss, binary_loss, instance_loss, net_output["binary_seg_pred"]

In [None]:
def train_model(model, optimizer, scheduler, dataloaders, dataset_sizes, device, loss_type = 'FocalLoss', num_epochs=25):
    """Train ``model`` and return it with the best validation weights loaded.

    Each epoch runs a ``'train'`` phase followed by a ``'val'`` phase. The
    weights with the lowest validation total loss are kept and loaded back
    into the model before returning.

    Args:
        model: Network whose output is accepted by ``compute_loss``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Optional LR scheduler stepped once per epoch after the
            training phase; ``None`` disables scheduling.
        dataloaders: Mapping with ``'train'`` and ``'val'`` iterables yielding
            ``(inputs, binary_labels, instance_labels)`` batches.
        dataset_sizes: Mapping with the sample count of each phase, used to
            average the accumulated losses.
        device: Device the batches are moved to.
        loss_type: Forwarded to ``compute_loss``.
        num_epochs: Number of epochs to run.

    Returns:
        ``(model, training_log)`` where ``training_log`` has the keys
        ``'epoch'`` (list) and ``'training_loss'`` / ``'val_loss'`` (arrays).
    """
    since = time.time()
    training_log = {'epoch':[], 'training_loss':[], 'val_loss':[]}
    best_loss = float("inf")

    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        training_log['epoch'].append(epoch)
        current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        print('[{}] Epoch {}/{}'.format(current_time, epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            # train(True) enables training mode, train(False) is eval mode.
            model.train(is_train)

            running_loss = 0.0
            running_loss_b = 0.0
            running_loss_i = 0.0

            # Iterate over data.
            for inputs, binarys, instances in dataloaders[phase]:
                inputs = inputs.type(torch.FloatTensor).to(device)
                binarys = binarys.type(torch.LongTensor).to(device)
                instances = instances.type(torch.FloatTensor).to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # Track gradients only while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    total_loss, binary_loss, instance_loss, _ = compute_loss(
                        outputs, binarys, instances, loss_type)

                    # backward + optimize only if in training phase
                    if is_train:
                        total_loss.backward()
                        optimizer.step()

                # Accumulate sample-weighted sums so the epoch value is a
                # per-sample average even when the last batch is smaller.
                batch_samples = inputs.size(0)
                running_loss += total_loss.item() * batch_samples
                running_loss_b += binary_loss.item() * batch_samples
                running_loss_i += instance_loss.item() * batch_samples

            if is_train and scheduler is not None:
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_binary_loss = running_loss_b / dataset_sizes[phase]
            epoch_instance_loss = running_loss_i / dataset_sizes[phase]
            print('{} Total Loss: {:.4f} Binary Loss: {:.4f} Instance Loss: {:.4f}'.format(
                phase, epoch_loss, epoch_binary_loss, epoch_instance_loss))

            if is_train:
                training_log['training_loss'].append(epoch_loss)
            else:
                training_log['val_loss'].append(epoch_loss)
                # Snapshot the weights whenever validation improves.
                if epoch_loss < best_loss:
                    best_loss = epoch_loss
                    best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val_loss: {:.4f}'.format(best_loss))
    training_log['training_loss'] = np.array(training_log['training_loss'])
    training_log['val_loss'] = np.array(training_log['val_loss'])

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, training_log

In [None]:
class InitialBlock(nn.Module):
    """First ENet block: halves the resolution and widens to ``out_ch`` channels.

    A stride-2 convolution producing ``out_ch - in_ch`` channels runs next to
    a 2x2 max-pool of the input; the two results are concatenated along the
    channel axis.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        conv_out = out_ch - in_ch
        self.input_channel = in_ch
        self.conv_channel = conv_out

        conv_layers = [
            nn.Conv2d(in_ch, conv_out, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(conv_out),
            nn.PReLU(),
        ]
        self.conv = nn.Sequential(*conv_layers)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return the channel-wise concatenation of the conv and pool branches."""
        branches = (self.conv(x), self.maxpool(x))
        return torch.cat(branches, dim=1)

In [None]:
class BottleneckModule(nn.Module):
    """ENet bottleneck block with a convolutional branch and a shortcut branch.

    Args:
        in_ch: Input channels.
        out_ch: Output channels.
        module_type: One of ``'downsampling'``, ``'upsampling'``,
            ``'regular'``, ``'asymmetric'`` or ``'dilated'``.
        padding: Padding of the block's main (non-1x1) convolution.
        dilated: Dilation of the main convolution (``'dilated'`` only).
        asymmetric: Kernel length of the two 1-D convolutions
            (``'asymmetric'`` only).
        dropout_prob: Probability of the trailing ``Dropout2d``.

    Raises:
        ValueError: ``module_type`` is not one of the supported values.
    """

    def __init__(self, in_ch, out_ch, module_type, padding = 1, dilated = 0, asymmetric = 5, dropout_prob = 0):
        super(BottleneckModule, self).__init__()
        self.input_channel = in_ch
        self.activate = nn.PReLU()

        self.module_type = module_type
        if self.module_type == 'downsampling':
            self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
            self.conv = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size = 2, stride = 2),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 3, stride=1, padding=padding),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Dropout2d(p=dropout_prob)
            )
        elif self.module_type == 'upsampling':
            self.maxunpool = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)    # Use upsample instead of maxunpooling
            )

            self.conv = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.ConvTranspose2d(out_ch, out_ch, kernel_size=2, stride=2),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Dropout2d(p=dropout_prob)
            )
        elif self.module_type == 'regular':
            self.conv = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 3, stride=1, padding=padding),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Dropout2d(p=dropout_prob)
            )
        elif self.module_type == 'asymmetric':
            self.conv = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, (asymmetric, 1), stride=1, padding=(padding, 0)),
                nn.Conv2d(out_ch, out_ch, (1, asymmetric), stride=1, padding=(0, padding)),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Dropout2d(p=dropout_prob)
            )
        elif self.module_type == 'dilated':
            self.conv = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 3, stride=1, padding=padding, dilation=dilated),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Conv2d(out_ch, out_ch, kernel_size = 1),
                nn.BatchNorm2d(out_ch),
                nn.PReLU(),
                nn.Dropout2d(p=dropout_prob)
            )
        else:
            # Raising a plain string is itself a TypeError; raise a real exception.
            raise ValueError("Module Type error")

    def forward(self, x):
        """Add the convolutional branch to the shortcut branch and apply PReLU."""
        if self.module_type == 'downsampling':
            conv_branch = self.conv(x)
            maxp_branch = self.maxpool(x)
            bs, conv_ch, h, w = conv_branch.size()
            maxp_ch = maxp_branch.size()[1]
            # Zero-pad the pooled shortcut up to the conv branch's channel
            # count. new_zeros follows the input's device and dtype, so the
            # block does not depend on a global device.
            padding = maxp_branch.new_zeros(bs, conv_ch - maxp_ch, h, w)

            maxp_branch = torch.cat([maxp_branch, padding], 1)
            output = maxp_branch + conv_branch
        elif self.module_type == 'upsampling':
            conv_branch = self.conv(x)
            maxunp_branch = self.maxunpool(x)
            output = maxunp_branch + conv_branch
        else:
            # regular / asymmetric / dilated: identity shortcut.
            output = self.conv(x) + x

        return self.activate(output)

In [None]:
def weights_init_kaiming(m):
    """Initialise one layer in place, chosen by its class name.

    Classes whose name contains ``Conv`` or ``Linear`` get Kaiming-normal
    weights (``fan_in``). Classes whose name contains ``BatchNorm`` get
    weights drawn from N(1.0, 0.02) and zero bias. Other layers are left
    untouched.
    """
    layer_kind = type(m).__name__
    if 'Conv' in layer_kind or 'Linear' in layer_kind:
        init.kaiming_normal_(m.weight.data, a=0, mode='fan_in')
    elif 'BatchNorm' in layer_kind:
        init.normal_(m.weight.data, 1.0, 0.02)
        init.constant_(m.bias.data, 0.0)

In [None]:
class ENet_Encoder(nn.Module):
    """ENet encoder: the initial block followed by three bottleneck stages.

    Stage 1 downsamples 16 -> 64 channels, stage 2 downsamples 64 -> 128, and
    stage 3 repeats stage 2's regular / dilated / asymmetric pattern at 128
    channels without downsampling.
    """

    def __init__(self, in_ch=3, dropout_prob=0):
        super().__init__()

        def block(c_in, c_out, kind, **extra):
            # Every bottleneck in the encoder shares the same dropout probability.
            return BottleneckModule(c_in, c_out, module_type=kind, dropout_prob=dropout_prob, **extra)

        self.initial_block = InitialBlock(in_ch, 16)

        # Stage 1
        self.bottleneck1_0 = block(16, 64, 'downsampling', padding=1)
        self.bottleneck1_1 = block(64, 64, 'regular', padding=1)
        self.bottleneck1_2 = block(64, 64, 'regular', padding=1)
        self.bottleneck1_3 = block(64, 64, 'regular', padding=1)
        self.bottleneck1_4 = block(64, 64, 'regular', padding=1)

        # Stage 2
        self.bottleneck2_0 = block(64, 128, 'downsampling', padding=1)
        self.bottleneck2_1 = block(128, 128, 'regular', padding=1)
        self.bottleneck2_2 = block(128, 128, 'dilated', padding=2, dilated=2)
        self.bottleneck2_3 = block(128, 128, 'asymmetric', padding=2, asymmetric=5)
        self.bottleneck2_4 = block(128, 128, 'dilated', padding=4, dilated=4)
        self.bottleneck2_5 = block(128, 128, 'regular', padding=1)
        self.bottleneck2_6 = block(128, 128, 'dilated', padding=8, dilated=8)
        self.bottleneck2_7 = block(128, 128, 'asymmetric', padding=2, asymmetric=5)
        self.bottleneck2_8 = block(128, 128, 'dilated', padding=16, dilated=16)

        # Stage 3
        self.bottleneck3_0 = block(128, 128, 'regular', padding=1)
        self.bottleneck3_1 = block(128, 128, 'dilated', padding=2, dilated=2)
        self.bottleneck3_2 = block(128, 128, 'asymmetric', padding=2, asymmetric=5)
        self.bottleneck3_3 = block(128, 128, 'dilated', padding=4, dilated=4)
        self.bottleneck3_4 = block(128, 128, 'regular', padding=1)
        self.bottleneck3_5 = block(128, 128, 'dilated', padding=8, dilated=8)
        self.bottleneck3_6 = block(128, 128, 'asymmetric', padding=2, asymmetric=5)
        self.bottleneck3_7 = block(128, 128, 'dilated', padding=16, dilated=16)

        # Re-initialise every Conv2d and BatchNorm2d in the encoder.
        for module in self.modules():
            if isinstance(module, (nn.Conv2d, nn.BatchNorm2d)):
                weights_init_kaiming(module)

    def forward(self, x):
        """Run the input through the initial block and all bottlenecks in order."""
        stage1 = (self.bottleneck1_0, self.bottleneck1_1, self.bottleneck1_2,
                  self.bottleneck1_3, self.bottleneck1_4)
        stage2 = (self.bottleneck2_0, self.bottleneck2_1, self.bottleneck2_2,
                  self.bottleneck2_3, self.bottleneck2_4, self.bottleneck2_5,
                  self.bottleneck2_6, self.bottleneck2_7, self.bottleneck2_8)
        stage3 = (self.bottleneck3_0, self.bottleneck3_1, self.bottleneck3_2,
                  self.bottleneck3_3, self.bottleneck3_4, self.bottleneck3_5,
                  self.bottleneck3_6, self.bottleneck3_7)

        x = self.initial_block(x)
        for layer in stage1 + stage2 + stage3:
            x = layer(x)

        return x

In [None]:
class ENet_Decoder(nn.Module):
    """ENet decoder: two upsampling stages and a final transposed convolution.

    Takes the 128-channel encoder features, upsamples to 64 and then 16
    channels, and maps to ``out_ch`` channels with a stride-2
    ``ConvTranspose2d``.
    """

    def __init__(self, out_ch=1, dropout_prob=0):
        super().__init__()

        # Keyword arguments shared by every bottleneck in the decoder.
        shared = dict(padding=1, dropout_prob=dropout_prob)

        self.bottleneck4_0 = BottleneckModule(128, 64, module_type='upsampling', **shared)
        self.bottleneck4_1 = BottleneckModule(64, 64, module_type='regular', **shared)
        self.bottleneck4_2 = BottleneckModule(64, 64, module_type='regular', **shared)

        self.bottleneck5_0 = BottleneckModule(64, 16, module_type='upsampling', **shared)
        self.bottleneck5_1 = BottleneckModule(16, 16, module_type='regular', **shared)

        self.fullconv = nn.ConvTranspose2d(16, out_ch, kernel_size=2, stride=2)

        # Re-initialise Conv2d and BatchNorm2d layers only; fullconv is a
        # ConvTranspose2d and keeps its default initialisation.
        for module in self.modules():
            if isinstance(module, (nn.Conv2d, nn.BatchNorm2d)):
                weights_init_kaiming(module)

    def forward(self, x):
        """Apply the decoder layers in order and return the ``out_ch``-channel map."""
        pipeline = (
            self.bottleneck4_0, self.bottleneck4_1, self.bottleneck4_2,
            self.bottleneck5_0, self.bottleneck5_1,
            self.fullconv,
        )
        for layer in pipeline:
            x = layer(x)

        return x

In [None]:
class LaneNet(nn.Module):
    """Two-headed lane network: one shared ENet encoder and two ENet decoders.

    The binary decoder outputs 2-channel segmentation logits and the instance
    decoder outputs a ``no_of_instances``-channel embedding. Both outputs are
    resized to ``output_size``. All sub-modules are moved to the global
    ``DEVICE`` on construction.

    Args:
        in_ch: Number of input image channels.
        arch: Backbone name; it is printed and stored, and an ENet encoder is
            built whatever the value.
        output_size: ``(height, width)`` of the returned maps.
    """

    def __init__(self, in_ch = 3, arch="ENet", output_size=(270, 480)):
        super().__init__()
        # no of instances for segmentation
        self.no_of_instances = 3  # if you want to output RGB instance map, it should be 3.
        print("Use {} as backbone".format(arch))
        self._arch = arch
        self.output_size = output_size

        self._encoder = ENet_Encoder(in_ch)
        self._decoder_binary = ENet_Decoder(2)
        self._decoder_instance = ENet_Decoder(self.no_of_instances)
        for part in (self._encoder, self._decoder_binary, self._decoder_instance):
            part.to(DEVICE)

        self.relu = nn.ReLU().to(DEVICE)
        self.sigmoid = nn.Sigmoid().to(DEVICE)

    def _resize(self, feature_map):
        # Bilinear resize of a decoder output to the configured output size.
        return F.interpolate(feature_map, size=self.output_size, mode='bilinear', align_corners=True)

    def forward(self, input_tensor):
        """Return the embedding, the per-pixel class prediction and the logits.

        Returns:
            Dict with ``'instance_seg_logits'`` (sigmoid of the instance
            decoder output), ``'binary_seg_pred'`` (argmax over the class
            dimension, kept as a size-1 dim) and ``'binary_seg_logits'``.
        """
        features = self._encoder(input_tensor)
        binary_logits = self._resize(self._decoder_binary(features))
        instance_map = self._resize(self._decoder_instance(features))

        class_probs = F.softmax(binary_logits, dim=1)
        binary_pred = torch.argmax(class_probs, dim=1, keepdim=True)

        return {
            'instance_seg_logits': self.sigmoid(instance_map),
            'binary_seg_pred': binary_pred,
            'binary_seg_logits': binary_logits
        }

# Training

In [None]:
# --- Configuration ---
save_path = 'output'
dataset = ''
batch_size = 4
model_type = 'ENet'
learning_rate = 0.0001
training_epochs = 25
loss_type = 'FocalLoss'
(resize_height, resize_width) = (270, 480)
SEED = 42

# Seed the RNGs so weight initialisation, shuffling and augmentation repeat
# between runs.
torch.manual_seed(SEED)
np.random.seed(SEED)

os.makedirs(save_path, exist_ok=True)

# Dataset locations
train_image_dir = os.path.join(dataset, 'train/원천데이터_flat')
train_label_dir = os.path.join(dataset, 'train/라벨링데이터_flat')
val_image_dir = os.path.join(dataset, 'val/원천데이터_flat')
val_label_dir = os.path.join(dataset, 'val/라벨링데이터_flat')

data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((resize_height, resize_width)),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    # Validation uses no random augmentation, so the validation loss (and the
    # best-model selection based on it) is comparable from epoch to epoch.
    'val': transforms.Compose([
        transforms.Resize((resize_height, resize_width)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Masks are resized with nearest-neighbour interpolation so label values are
# not blended.
target_transform = transforms.Compose([
    transforms.Lambda(lambda mask: torch.from_numpy(
        cv2.resize(mask, (resize_width, resize_height), interpolation=cv2.INTER_NEAREST)
    ).long())
])

# Datasets and DataLoaders
train_dataset = AIHubDrivingDataset(
    image_dir=train_image_dir,
    label_dir=train_label_dir,
    transform=data_transforms['train'],
    target_transform=target_transform
)
val_dataset = AIHubDrivingDataset(
    image_dir=val_image_dir,
    label_dir=val_label_dir,
    transform=data_transforms['val'],
    target_transform=target_transform
)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

dataloaders = {'train': train_loader, 'val': val_loader}
dataset_sizes = {'train': len(train_dataset), 'val': len(val_dataset)}

# Model: the output size is tied to the resize constants so the predictions
# always match the label resolution.
model = LaneNet(arch=model_type, output_size=(resize_height, resize_width))
model.to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
print(f"{training_epochs} epochs with {len(train_dataset)} training samples\n")

model, log = train_model(
    model,
    optimizer,
    scheduler=None,
    dataloaders=dataloaders,
    dataset_sizes=dataset_sizes,
    device=DEVICE,
    loss_type=loss_type,
    num_epochs=training_epochs
)

# Save the training log
df = pd.DataFrame(log)
train_log_save_filename = os.path.join(save_path, 'training_log.csv')
df.to_csv(train_log_save_filename, index=False, encoding='utf-8')
print(f"Training log saved: {train_log_save_filename}")

# Save the model (best validation weights, as returned by train_model)
model_save_filename = os.path.join(save_path, 'best_model.pth')
torch.save(model.state_dict(), model_save_filename)
print(f"Model saved: {model_save_filename}")

Use ENet as backbone
25 epochs with 200 training samples

[2025-05-26 06:05:09] Epoch 0/24
----------
train Total Loss: 0.8115 Binary Loss: 0.2095 Instance Loss: 0.6020
val Total Loss: 0.4986 Binary Loss: 0.1414 Instance Loss: 0.3573

[2025-05-26 06:08:55] Epoch 1/24
----------
train Total Loss: 0.3873 Binary Loss: 0.1128 Instance Loss: 0.2745
val Total Loss: 0.3259 Binary Loss: 0.0937 Instance Loss: 0.2322

[2025-05-26 06:09:23] Epoch 2/24
----------
train Total Loss: 0.2816 Binary Loss: 0.0802 Instance Loss: 0.2013
val Total Loss: 0.2722 Binary Loss: 0.0703 Instance Loss: 0.2019

[2025-05-26 06:09:52] Epoch 3/24
----------
train Total Loss: 0.2373 Binary Loss: 0.0617 Instance Loss: 0.1757
val Total Loss: 0.2342 Binary Loss: 0.0562 Instance Loss: 0.1780

[2025-05-26 06:10:22] Epoch 4/24
----------
train Total Loss: 0.2106 Binary Loss: 0.0499 Instance Loss: 0.1607
val Total Loss: 0.2096 Binary Loss: 0.0465 Instance Loss: 0.1631

[2025-05-26 06:10:51] Epoch 5/24
----------
train Total L

In [None]:
!cp output/best_model.pth /content/drive/MyDrive/Vroomie/inference/model/lane_detection_model.pth