In [1]:
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import torch
import torchvision
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torch.nn as nn
import torchvision.transforms.functional as TF
from tqdm import tqdm
import torch.optim as optim

In [2]:
class RoadDataset(Dataset):
    """Image / binary-mask pairs read from two parallel directories.

    Every file name found in ``image_dir`` is looked up under the same name
    in ``mask_dir``.
    """

    def __init__(self, image_dir, mask_dir, transform=None):
        """
        Args:
            image_dir: directory holding the input images.
            mask_dir: directory holding one mask per image, under the same
                file name as the image.
            transform: optional callable invoked as
                ``transform(image=..., mask=...)`` that returns a mapping
                with ``"image"`` and ``"mask"`` entries.
        """
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        # os.listdir() yields entries in arbitrary, platform-dependent order.
        # Sorting pins the index -> file mapping so that index-based splits
        # (e.g. a seeded random_split) select the same files on every run.
        self.images = sorted(os.listdir(image_dir))

    def __len__(self):
        """Return the number of files listed in ``image_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Load the ``index``-th image and its mask.

        Returns:
            ``(image, mask)``. Without a transform, ``image`` is the RGB
            image as a NumPy array and ``mask`` is a NumPy array of
            1.0 / 0.0 values; with a transform, both are whatever the
            transform returns under ``"image"`` and ``"mask"``.
        """
        file_name = self.images[index]
        img_path = os.path.join(self.image_dir, file_name)
        mask_path = os.path.join(self.mask_dir, file_name)
        image = np.array(Image.open(img_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"), dtype=np.float32)  # grayscale
        # Binarise: grayscale values above 4 become foreground (1.0), the rest 0.0.
        mask = np.where(mask > 4.0, 1.0, 0.0)

        if self.transform is not None:
            augmentations = self.transform(image=image, mask=mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask

In [3]:
def get_loaders(
    train_dir,
    train_maskdir,
    batch_size,
    train_transform,
    val_transform,
    num_workers=4,
    pin_memory=True,
    val_fraction=0.2,
    seed=42,
):
    """Build training and validation loaders from one image/mask directory pair.

    Two ``RoadDataset`` objects are created over the same directories, one per
    transform. Both are split with identically seeded generators and identical
    lengths; the training part is taken from the first and the validation part
    from the second, so each part gets its own transform.

    Args:
        train_dir: directory containing the images.
        train_maskdir: directory containing the masks.
        batch_size: batch size used by both loaders.
        train_transform: transform passed to the training dataset.
        val_transform: transform passed to the validation dataset.
        num_workers: ``DataLoader`` worker count.
        pin_memory: ``DataLoader`` ``pin_memory`` flag.
        val_fraction: share of the samples held out for validation. The
            default of 0.2 gives 80/20 on a 100-image dataset.
        seed: seed for the split generators.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.

    Raises:
        ValueError: if ``val_fraction`` is outside ``[0, 1]``.
    """
    if not 0.0 <= val_fraction <= 1.0:
        raise ValueError(f"val_fraction must be within [0, 1], got {val_fraction}")

    train_dataset = RoadDataset(
        image_dir=train_dir,
        mask_dir=train_maskdir,
        transform=train_transform,
    )

    val_dataset = RoadDataset(
        image_dir=train_dir,
        mask_dir=train_maskdir,
        transform=val_transform,
    )

    # Derive the split sizes from the dataset instead of assuming 100 samples.
    n_total = len(train_dataset)
    n_val = int(round(n_total * val_fraction))
    lengths = [n_total - n_val, n_val]

    # A fresh generator with the same seed for each call keeps the two
    # partitions aligned index-for-index.
    train_ds, _ = random_split(
        train_dataset,
        lengths,
        generator=torch.Generator().manual_seed(seed))

    _, val_ds = random_split(
        val_dataset,
        lengths,
        generator=torch.Generator().manual_seed(seed))

    train_loader = DataLoader(
        train_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=True,
    )

    val_loader = DataLoader(
        val_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=False,
    )

    return train_loader, val_loader


In [4]:
# Resolution fed to the network (the source images are 400x400 pixels).
image_height = 400
image_width = 400

# Dataset locations.
train_dir = "data/train_images/"
train_maskdir = "data/train_masks/"
test_dir = "data/test_images/"

# DataLoader settings.
batch_size = 4
num_workers = 0
pin_memory = True

# Training pipeline: resize, random rotation and flips, rescale, convert to tensor.
# With mean 0 and std 1, Normalize only divides the pixel values by max_pixel_value.
# NOTE(review): 237 keeps values in [0, 1] only if no pixel exceeds 237 - confirm
# against the data (8-bit images can reach 255).
train_transform = A.Compose([
    A.Resize(height=image_height, width=image_width),
    A.Rotate(limit=35, p=1.0),  # alternative that was tried: A.Rotate(limit=90, p=1.0)
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.1),
    A.Normalize(
        mean=[0.0, 0.0, 0.0],
        std=[1.0, 1.0, 1.0],
        max_pixel_value=237.0,
    ),
    ToTensorV2(),
])

# Validation pipeline: same resize and rescale, no random augmentation.
val_transform = A.Compose([
    A.Resize(height=image_height, width=image_width),
    A.Normalize(
        mean=[0.0, 0.0, 0.0],
        std=[1.0, 1.0, 1.0],
        max_pixel_value=237.0,
    ),
    ToTensorV2(),
])

train_loader, val_loader = get_loaders(
    train_dir=train_dir,
    train_maskdir=train_maskdir,
    batch_size=batch_size,
    train_transform=train_transform,
    val_transform=val_transform,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

In [5]:
class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolution -> batch norm -> ReLU stages.

    The convolutions use stride 1 and padding 1 and carry no bias term.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        layers = []
        stage_in = in_channels
        # First stage maps in_channels -> out_channels, second keeps out_channels.
        for _ in range(2):
            layers += [
                nn.Conv2d(
                    stage_in, out_channels,
                    kernel_size=3, stride=1, padding=1, bias=False,
                ),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
            stage_in = out_channels
        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        """Apply both stages to ``x``."""
        return self.conv(x)

class UNET(nn.Module):
    """Encoder/decoder segmentation network with skip connections.

    Encoder: one ``DoubleConv`` per entry of ``features``, each followed by
    2x2 max pooling. A bottleneck ``DoubleConv`` doubles the last feature
    count. Decoder: for each feature size in reverse order, a stride-2
    transposed convolution, concatenation with the matching encoder output,
    and a ``DoubleConv``. A final 1x1 convolution maps to ``out_channels``;
    no activation is applied to the output.
    """

    def __init__(
            self, in_channels=3, out_channels=1, features=(64, 128, 256, 512),
    ): # out_channels = 1: binary class
        """
        Args:
            in_channels: channel count of the input.
            out_channels: channel count of the output map.
            features: channel widths of the encoder stages, shallowest first.
                Any sequence is accepted.
        """
        super(UNET, self).__init__()
        # The default is an immutable tuple so that no list object is shared
        # between calls; copy to a list so every sequence type behaves alike.
        features = list(features)
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part of UNET
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part of UNET: self.ups alternates [upsample, DoubleConv, upsample, ...]
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2,
                ) # input has feature*2 channels (previous stage / bottleneck)
            )
            # feature*2 again: upsampled map concatenated with the skip connection
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        """Run the network on ``x`` and return the ``final_conv`` output."""
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)  # kept before pooling, for the decoder
            x = self.pool(x)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]  # deepest first, matching self.ups

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            # Pooling floors odd sizes, so the upsampled map can come out
            # smaller than the skip connection; resize it to match.
            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:]) # height and width

            concat_skip = torch.cat((skip_connection, x), dim=1) # concatenate along channel axis
            x = self.ups[idx+1](concat_skip)

        return self.final_conv(x)

In [10]:
# Hyperparameters and runtime settings.
# NOTE(review): BATCH_SIZE, NUM_WORKERS, IMAGE_HEIGHT, IMAGE_WIDTH, PIN_MEMORY and
# LOAD_MODEL are not referenced in the visible cells, which use the lowercase
# batch_size / num_workers / image_height / image_width / pin_memory instead -
# confirm whether these are still needed.
LEARNING_RATE = 1e-4  # step size passed to the optimizer
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # use the GPU when one is available
BATCH_SIZE = 4
NUM_EPOCHS = 5  # number of passes of the training loop
NUM_WORKERS = 0
IMAGE_HEIGHT = 400  # source images are 400 pixels high
IMAGE_WIDTH = 400  # source images are 400 pixels wide
PIN_MEMORY = True
LOAD_MODEL = False

def train_fn(loader, model, optimizer, loss_fn, scaler):
    """Run one pass over ``loader``, updating ``model`` after every batch.

    Args:
        loader: iterable yielding ``(data, targets)`` batches.
        model: network being trained.
        optimizer: optimizer stepped once per batch through ``scaler``.
        loss_fn: callable invoked as ``loss_fn(predictions, targets)``.
        scaler: gradient scaler providing ``scale``, ``step`` and ``update``.
    """
    loop = tqdm(loader)

    for data, targets in loop:
        data = data.to(device=DEVICE)
        targets = targets.float().unsqueeze(1).to(device=DEVICE) # add a channel dimension

        # forward
        # Request CUDA autocast only when the batch is actually on a CUDA
        # device; asking for it on a CPU run just triggers a warning.
        with torch.cuda.amp.autocast(enabled=data.is_cuda):
            predictions = model(data)
            loss = loss_fn(predictions, targets)

        # backward
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # update tqdm loop
        loop.set_postfix(loss=loss.item())

In [162]:
import os
import shutil

# Flatten the per-folder layout under data/test/ into one directory.
# Files that share a name in different folders overwrite each other.
determination = 'data/test_images/'
os.makedirs(determination, exist_ok=True)  # no separate exists() check needed

path = 'data/test/'
folders = os.listdir(path)
for folder in folders:
    folder_path = os.path.join(path, str(folder))
    if not os.path.isdir(folder_path):
        # Stray files directly under data/test/ (e.g. OS metadata files)
        # cannot be listed as folders; skip them.
        continue
    for file_name in os.listdir(folder_path):
        source = os.path.join(folder_path, str(file_name))
        deter = os.path.join(determination, str(file_name))
        shutil.copyfile(source, deter)

In [11]:
# Test file names ordered by the integer that follows the underscore in the
# stem, so that e.g. "x_10.png" comes after "x_2.png".
path_list = sorted(
    os.listdir(test_dir),
    key=lambda name: int(name.split(".")[0].split("_")[1]),
)

class RoadData_test_set(Dataset):
    """Unlabelled images read from a single directory, in numeric file order."""

    def __init__(self, image_dir, transform=None):
        """
        Args:
            image_dir: directory containing the images. File names must carry
                an integer after the first underscore of the stem
                (``<prefix>_<number>.<ext>``).
            transform: optional callable invoked as ``transform(image=...)``
                that returns a mapping with an ``"image"`` entry.
        """
        self.image_dir = image_dir
        self.transform = transform
        # List the directory this dataset was given rather than relying on a
        # module-level listing, so the file names always belong to image_dir.
        self.images = sorted(os.listdir(image_dir), key=self._image_number)

    @staticmethod
    def _image_number(file_name):
        """Return the integer between the first underscore and the first dot."""
        return int(file_name.split(".")[0].split("_")[1])

    def __len__(self):
        """Return the number of files listed in ``image_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Load the ``index``-th image as RGB and apply the transform, if any."""
        img_path = os.path.join(self.image_dir, self.images[index])
        image = np.array(Image.open(img_path).convert("RGB"))

        if self.transform is not None:
            augmentations = self.transform(image=image)
            image = augmentations["image"]

        return image

# Inference pipeline: resize, divide pixel values by 237, convert to tensor.
test_transform = A.Compose([
    A.Resize(height=image_height, width=image_width),
    A.Normalize(
        mean=[0.0, 0.0, 0.0],
        std=[1.0, 1.0, 1.0],
        max_pixel_value=237.0,
    ),
    ToTensorV2(),
])

test_dataset = RoadData_test_set(image_dir=test_dir, transform=test_transform)

# One image per batch, in dataset order.
test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=0,
    pin_memory=pin_memory,
)

In [8]:
# Network with 3 input channels and a single output channel, moved to DEVICE.
model = UNET(in_channels=3, out_channels=1)
model = model.to(DEVICE)

# Loss applied to the raw model outputs.
loss_fn = nn.BCEWithLogitsLoss()

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [12]:
def check_accuracy(loader, model, device="cuda"):
    """Print the share of elements where the thresholded prediction equals the target.

    Predictions are ``sigmoid(model(x)) > 0.5``. The model is switched to eval
    mode for the pass and returned to the mode it was in before the call, even
    if evaluation raises.

    Args:
        loader: iterable yielding ``(x, y)`` batches; ``y`` gets a channel
            dimension added at position 1 before comparison.
        model: network to evaluate.
        device: device the batches are moved to.
    """
    num_correct = 0
    num_pixels = 0
    was_training = model.training
    model.eval()

    try:
        with torch.no_grad():
            for x, y in loader:
                x = x.to(device)
                y = y.to(device).unsqueeze(1) # the grayscale mask has no channel axis, add one
                preds = torch.sigmoid(model(x))
                preds = (preds > 0.5).float()
                # .item() keeps the running count a plain Python int
                num_correct += (preds == y).sum().item()
                num_pixels += torch.numel(preds)
    finally:
        model.train(was_training)

    # An empty loader would otherwise divide by zero.
    accuracy = num_correct / num_pixels * 100 if num_pixels else 0.0
    print(
        f"Got {num_correct}/{num_pixels} with acc {accuracy:.2f}"
    )

# def save_predictions_as_imgs(
#     loader, model, folder="saved_images/", device="cuda"
# ):
#     model.eval()
#     for idx, (x, y) in enumerate(loader):
#         x = x.to(device=device)
#         with torch.no_grad():
#             preds = torch.sigmoid(model(x))
#             preds = (preds > 0.5).float()
#         torchvision.utils.save_image(
#             preds, f"{folder}/pred_{idx}.png"
#         )
#         torchvision.utils.save_image(y.unsqueeze(1), f"{folder}{idx}.png")

#     model.train()

def save_predictions_as_imgs(
    test_loader, model, folder="saved_images", device="cuda"
):
    """Write one thresholded prediction image per batch of ``test_loader``.

    Predictions are ``sigmoid(model(x)) > 0.5`` and are saved as
    ``{folder}/pred_{n}.png`` with ``n`` starting at 1. ``folder`` is created
    if it does not exist. The model is switched to eval mode for the pass and
    returned to the mode it was in before the call, even if saving raises.

    Args:
        test_loader: iterable yielding input batches (no targets).
        model: network used for prediction.
        folder: output directory.
        device: device the batches are moved to.
    """
    if folder:
        # save_image does not create missing directories.
        os.makedirs(folder, exist_ok=True)

    was_training = model.training
    model.eval()
    try:
        for idx, x in enumerate(test_loader):
            x = x.to(device=device)
            with torch.no_grad():
                preds = torch.sigmoid(model(x))
                preds = (preds > 0.5).float()
            torchvision.utils.save_image(
                preds, f"{folder}/pred_{idx+1}.png"
            )
    finally:
        model.train(was_training)

In [13]:
# Enable gradient scaling only when CUDA is present; DEVICE falls back to
# "cpu" otherwise, where requesting the CUDA scaler merely produces a warning
# before it disables itself.
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())

for epoch in range(NUM_EPOCHS):
    train_fn(train_loader, model, optimizer, loss_fn, scaler)

    # check accuracy on the validation split after every epoch
    check_accuracy(val_loader, model, device=DEVICE)

# # print some examples to a folder
# save_predictions_as_imgs(
#     test_loader, model, folder="saved_images", device=DEVICE
# )

100%|██████████| 20/20 [58:00<00:00, 174.02s/it, loss=0.6]    


Got 772798/3200000 with acc 24.15


100%|██████████| 20/20 [17:21<00:00, 52.09s/it, loss=0.496]


Got 1362677/3200000 with acc 42.58


100%|██████████| 20/20 [21:51<00:00, 65.58s/it, loss=0.449]


Got 2567570/3200000 with acc 80.24


100%|██████████| 20/20 [23:13<00:00, 69.68s/it, loss=0.402]


Got 2577736/3200000 with acc 80.55


100%|██████████| 20/20 [22:33<00:00, 67.67s/it, loss=0.358]


Got 2795451/3200000 with acc 87.36


In [15]:
# Write the thresholded predictions for the test images into saved_images/.
save_predictions_as_imgs(test_loader, model, folder="saved_images", device=DEVICE)