In [None]:
import numpy as np
import os
import shutil
from glob import glob
from PIL import Image
from tqdm import tqdm
import zipfile
import albumentations as A
from albumentations.pytorch import  ToTensorV2
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms.functional as Func

In [None]:
# Extract both competition archives (train.zip and train_masks.zip) into the
# Kaggle working directory.
save_directory = "/kaggle/working/"
for path_to_zip_file in (
    "/kaggle/input/carvana-image-masking-challenge/train.zip",
    "/kaggle/input/carvana-image-masking-challenge/train_masks.zip",
):
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(save_directory)

In [None]:
# Create the folders the later cells write into. They must live under
# /kaggle/working (the directory every other cell uses); makedirs with
# exist_ok=True keeps this cell safe to re-run.
for directory in ('/kaggle/working/validation',
                  '/kaggle/working/validation_masks',
                  '/kaggle/working/saved_images'):
    os.makedirs(directory, exist_ok = True)

In [None]:
# Hold out the first VALIDATION_SIZE files (in sorted name order) of the image
# folder and of the mask folder as the validation split.
VALIDATION_SIZE = 1526

for source_dir, target_dir in (('/kaggle/working/train', '/kaggle/working/validation'),
                               ('/kaggle/working/train_masks', '/kaggle/working/validation_masks')):
    # shutil.move() renames the source onto the target path when the target is
    # not an existing directory, which would overwrite one file with the next.
    # Make sure the target really is a directory before moving anything.
    os.makedirs(target_dir, exist_ok = True)

    # Only move what is still missing, so re-running the cell does not push a
    # further VALIDATION_SIZE files out of the training folder.
    still_needed = max(VALIDATION_SIZE - len(os.listdir(target_dir)), 0)
    for file in sorted(os.listdir(source_dir))[:still_needed]:
        shutil.move(os.path.join(source_dir, file), target_dir)

In [None]:
# Image folder and mask folder for each split.
TRAIN_DIR, TRAIN_MASK_DIR = "/kaggle/working/train", "/kaggle/working/train_masks"
VALIDATION_DIR, VALIDATION_MASK_DIR = "/kaggle/working/validation", "/kaggle/working/validation_masks"

In [None]:
# Report how many files each split folder holds. The folders come from the
# shared *_DIR constants so this report cannot drift from what the loaders read.
for label, directory in (("Training set:  ", TRAIN_DIR),
                         ("Training masks:", TRAIN_MASK_DIR),
                         ("Validation set:  ", VALIDATION_DIR),
                         ("Validation masks:", VALIDATION_MASK_DIR)):
    print(label, len(os.listdir(directory)))

In [None]:
# Optimisation settings.
LEARNING_RATE = 1e-4
BATCH_SIZE = 16
EPOCHS = 10

# Hardware and data-loading settings.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
NUM_WORKERS = 2
PIN_MEMORY = True

# Resolution every image and mask is resized to.
IMAGE_HEIGHT = IMAGE_WIDTH = 256

# When True, main() starts from the saved checkpoint.
LOAD_MODEL = False

In [None]:
class ImageDataset(Dataset):
    """Dataset pairing every image in ``image_dir`` with its mask in ``mask_dir``.

    The mask for ``<name>.jpg`` is looked up as ``<name>_mask.gif``.

    Args:
        image_dir: Directory holding the input images.
        mask_dir: Directory holding the masks.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``"image"`` and ``"mask"`` entries.
    """

    def __init__(self, image_dir, mask_dir, transform = None):
        self.image_dir = image_dir
        self.mask_dir  = mask_dir
        self.transform = transform
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs.
        self.images = sorted(os.listdir(image_dir))

    def __len__(self):
        """Return the number of entries found in ``image_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Load the image/mask pair at ``index`` and apply ``transform`` if set.

        Returns:
            ``(image, mask)``. Without a transform, ``image`` is the RGB image
            as a NumPy array and ``mask`` is a float32 array in which the
            value 255 has been replaced by 1.
        """
        image_name = self.images[index]
        image_path = os.path.join(self.image_dir, image_name)
        # Rewrite only the file name, so a ".jpg" that happens to appear in
        # the directory path is left alone.
        mask_path = os.path.join(self.mask_dir, image_name.replace(".jpg", "_mask.gif"))

        image = np.array(Image.open(image_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"), dtype = np.float32)
        mask[mask == 255.0] = 1.0

        if self.transform is not None:
            augmentations = self.transform(image=image, mask=mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask

In [None]:
class ConvModel(nn.Module):
    """Two stacked 3x3 convolution -> batch norm -> ReLU stages.

    Both convolutions use stride 1 and padding 1 and carry no bias term. The
    first maps ``in_channels`` to ``out_channels``; the second keeps
    ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # First stage consumes in_channels, second stage consumes out_channels.
        for stage_inputs in (in_channels, out_channels):
            stages.append(nn.Conv2d(stage_inputs, out_channels,
                                    kernel_size = 3, stride = 1, padding = 1,
                                    bias = False))
            stages.append(nn.BatchNorm2d(out_channels))
            stages.append(nn.ReLU(inplace = True))
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through both stages and return the result."""
        return self.conv(x)


class UNET(nn.Module):
    """U-Net style encoder/decoder assembled from ``ConvModel`` blocks.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the final 1x1 convolution.
        features: Channel widths of the encoder stages, shallowest first. The
            decoder mirrors them in reverse order.

    Raises:
        ValueError: If ``features`` is empty.
    """

    def __init__(self, in_channels = 3, out_channels = 1, features = [64, 128, 256, 512]):
        super(UNET, self).__init__()
        # `features` is only read, never mutated, so the shared list default is safe.
        if len(features) == 0:
            raise ValueError("features must contain at least one channel width")

        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size = 2, stride = 2)

        # Encoder: one ConvModel per width; each is followed by pooling in forward().
        for feature in features:
            self.downs.append(ConvModel(in_channels, feature))
            in_channels = feature

        # Decoder: `ups` alternates [transposed conv, ConvModel] per width. The
        # ConvModel takes feature * 2 channels because the skip connection is
        # concatenated with the upsampled tensor.
        for feature in reversed(features):
            self.ups.append(nn.ConvTranspose2d(feature * 2,
                                               feature,
                                               kernel_size = 2,
                                               stride = 2))
            self.ups.append(ConvModel(feature * 2, feature))

        # Built once, after the loops, instead of being re-created per decoder stage.
        self.bottleneck = ConvModel(features[-1], features[-1] * 2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size = 1)

    def forward(self, x):
        """Return the ``out_channels``-channel output of the network for ``x``."""
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        # The deepest skip connection is consumed first by the decoder.
        skip_connections = skip_connections[::-1]

        # Step by 2: even slots hold the transposed conv, odd slots the ConvModel.
        for index in range(0, len(self.ups), 2):
            x = self.ups[index](x)
            skip_connection = skip_connections[index // 2]
            # Pooling an odd-sized map drops a row/column, so the upsampled
            # tensor can end up smaller than the skip; resize it to match
            # before concatenating.
            if x.shape != skip_connection.shape:
                x = Func.resize(x, size = skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim = 1)
            x = self.ups[index + 1](concat_skip)

        return self.final_conv(x)

def test():
    """Smoke test: a single-channel UNET must return a tensor shaped like its input.

    The 161x161 input is odd-sized, so pooling drops a row/column and the
    decoder has to resize before concatenating with the skip connection.
    """
    sample = torch.rand((3, 1, 161, 161))
    network = UNET(in_channels = 1, out_channels = 1)
    output = network(sample)
    print(output.shape)
    print(sample.shape)
    assert output.shape == sample.shape


# Run the smoke test when this cell is executed as the main module.
if __name__ == '__main__':
    test()

In [None]:
def save_checkpoint(state, filename = "model_checkpoint.pth.tar"):
    """Write ``state`` to ``filename`` with ``torch.save`` after announcing it on stdout."""
    print("Saving Checkpoint")
    torch.save(obj = state, f = filename)

def load_checkpoint(checkpoint, model):
    """Copy the weights stored under ``checkpoint["state_dict"]`` into ``model``."""
    print("\nLoading Checkpoint")
    weights = checkpoint["state_dict"]
    model.load_state_dict(weights)

def get_loaders(train_dir,
                train_maskdir,
                val_dir,
                val_maskdir,
                batch_size,
                train_transform,
                val_transform,
                num_workers = 4,
                pin_memory = True):
    """Build the training and validation ``DataLoader`` pair.

    Each loader wraps an ``ImageDataset`` over its image/mask directories with
    its own transform. Only the training loader shuffles.

    Returns:
        ``(train_loader, val_loader)``.
    """
    # Options that are identical for both loaders.
    loader_options = dict(batch_size = batch_size,
                          num_workers = num_workers,
                          pin_memory = pin_memory)

    train_set = ImageDataset(image_dir = train_dir,
                             mask_dir = train_maskdir,
                             transform = train_transform)
    train_loader = DataLoader(train_set, shuffle = True, **loader_options)

    val_set = ImageDataset(image_dir = val_dir,
                           mask_dir = val_maskdir,
                           transform = val_transform)
    val_loader = DataLoader(val_set, shuffle = False, **loader_options)

    return train_loader, val_loader

def check_accuracy(loader, model, device = "cuda"):
    """Print and return the pixel accuracy of ``model`` over ``loader``.

    Model outputs are passed through a sigmoid and thresholded at 0.5; a pixel
    counts as correct when the thresholded value equals the target.

    Args:
        loader: Iterable of ``(x, y)`` batches. ``y`` gets a channel axis
            inserted at dim 1 before the comparison.
        model: Module to evaluate. It is put in eval mode for the duration of
            the call and returned to the mode it was in on entry.
        device: Device the batches are moved to.

    Returns:
        Accuracy as a percentage, or ``0.0`` when ``loader`` yields no batches.
    """
    correct_pixels = 0
    num_pixels = 0
    was_training = model.training
    model.eval()

    try:
        with torch.no_grad():
            for x, y in loader:
                x = x.to(device)
                y = y.to(device).unsqueeze(1)
                prediction = torch.sigmoid(model(x))
                prediction = (prediction > 0.5).float()
                # .item() keeps the running total a plain Python int.
                correct_pixels += (prediction == y).sum().item()
                num_pixels += torch.numel(prediction)
    finally:
        # Restore the caller's mode even if a batch raised.
        model.train(was_training)

    # Guard against an empty loader instead of dividing by zero.
    accuracy = correct_pixels / num_pixels * 100 if num_pixels else 0.0
    print(f"Accuracy: {accuracy:.2f}")
    return accuracy


def save_predictions(loader, model, folder = "saved_images/", device = "cuda"):
    """Save thresholded predictions and their targets as PNG files.

    For batch number ``i`` this writes ``prediction_<i>.png`` (sigmoid output
    thresholded at 0.5) and ``<i>.png`` (the target with a channel axis
    inserted at dim 1) into ``folder``.

    Args:
        loader: Iterable of ``(x, y)`` batches.
        model: Module used for prediction. It is put in eval mode for the
            duration of the call and returned to the mode it was in on entry,
            so a following training step does not silently run in eval mode.
        folder: Output directory; created if it does not exist.
        device: Device the inputs are moved to.
    """
    os.makedirs(folder, exist_ok = True)
    was_training = model.training
    model.eval()

    try:
        for index, (x, y) in enumerate(loader):
            x = x.to(device = device)
            with torch.no_grad():
                prediction = torch.sigmoid(model(x))
                prediction = (prediction > 0.5).float()
            # os.path.join gives the same location whether or not `folder`
            # ends with a separator.
            torchvision.utils.save_image(prediction, os.path.join(folder, f"prediction_{index}.png"))
            torchvision.utils.save_image(y.unsqueeze(1), os.path.join(folder, f"{index}.png"))
    finally:
        # Restore the caller's mode even if saving raised.
        model.train(was_training)

In [None]:
def train_func(loader, model, optimizer, loss_fn, scaler):
    """Run one pass over ``loader``, updating ``model`` with mixed-precision steps.

    Args:
        loader: Iterable of ``(data, targets)`` batches.
        model: Module being trained.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        loss_fn: Callable invoked as ``loss_fn(predictions, targets)``.
        scaler: Gradient scaler used to scale the loss and step the optimizer.
    """
    # Make sure layers such as BatchNorm are in training mode no matter what
    # an earlier evaluation helper left behind.
    model.train()
    loop = tqdm(loader)

    for data, targets in loop:
        data = data.to(device=DEVICE)
        # Targets get a channel axis so they line up with the model output.
        targets = targets.float().unsqueeze(1).to(device = DEVICE)

        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)

        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Show the latest batch loss on the progress bar.
        loop.set_postfix(loss = loss.item())


def main():
    """Train the UNET for ``EPOCHS`` epochs.

    After every epoch the model and optimizer state are checkpointed, pixel
    accuracy on the validation loader is printed, and the validation
    predictions are written to ``saved_images/``. When ``LOAD_MODEL`` is set,
    model weights are first restored from ``model_checkpoint.pth.tar``.
    """
    train_transform = A.Compose(
        [A.Resize(height = IMAGE_HEIGHT, width = IMAGE_WIDTH),
         A.Rotate(limit = 35, p = 1.0),
         A.HorizontalFlip(p = 0.5),
         A.VerticalFlip(p = 0.1),
         # mean 0 / std 1 with max_pixel_value 255 only rescales pixels to [0, 1].
         A.Normalize(mean = [0.0, 0.0, 0.0],
                     std = [1.0, 1.0, 1.0],
                     max_pixel_value = 255.0),
         ToTensorV2(),])

    # Validation uses the same resize/normalisation but no random augmentation.
    validation_transform = A.Compose(
        [A.Resize(height = IMAGE_HEIGHT, width = IMAGE_WIDTH),
         A.Normalize(mean = [0.0, 0.0, 0.0],
                     std = [1.0, 1.0, 1.0],
                     max_pixel_value = 255.0),
         ToTensorV2(),])

    model = UNET(in_channels = 3, out_channels = 1).to(DEVICE)
    # The model returns raw logits; the sigmoid is applied inside the loss.
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr = LEARNING_RATE)

    train_loader, val_loader = get_loaders(TRAIN_DIR,
                                           TRAIN_MASK_DIR,
                                           VALIDATION_DIR,
                                           VALIDATION_MASK_DIR,
                                           BATCH_SIZE,
                                           train_transform,
                                           validation_transform,
                                           NUM_WORKERS,
                                           PIN_MEMORY)

    if LOAD_MODEL:
        # map_location lets a checkpoint saved on a GPU be restored on a
        # CPU-only session (and vice versa).
        # NOTE(review): only the model weights are restored; the optimizer
        # state stored in the checkpoint is not loaded back.
        load_checkpoint(torch.load("model_checkpoint.pth.tar", map_location = DEVICE), model)

    scaler = torch.cuda.amp.GradScaler()

    for _ in range(EPOCHS):
        train_func(train_loader, model, optimizer, loss_fn, scaler)

        checkpoint = {"state_dict": model.state_dict(),
                      "optimizer":optimizer.state_dict()}
        save_checkpoint(checkpoint)

        check_accuracy(val_loader, model, device = DEVICE)

        save_predictions(val_loader, model, folder = "saved_images/", device = DEVICE)

if __name__ == "__main__":
    main()

In [None]:
# From here on, main() restores the model weights from the saved checkpoint
# before training instead of starting from fresh weights.
LOAD_MODEL = True

In [None]:
# NOTE(review): this redefines the train_func from the earlier training cell
# with the same logic; consider keeping a single definition.
def train_func(loader, model, optimizer, loss_fn, scaler):
    """Run one pass over ``loader``, updating ``model`` with mixed-precision steps.

    Args:
        loader: Iterable of ``(data, targets)`` batches.
        model: Module being trained.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        loss_fn: Callable invoked as ``loss_fn(predictions, targets)``.
        scaler: Gradient scaler used to scale the loss and step the optimizer.
    """
    # Force training mode: an earlier evaluation helper may have left the
    # model in eval mode, which would freeze BatchNorm statistics.
    model.train()
    progress = tqdm(loader)

    for data, targets in progress:
        data = data.to(device=DEVICE)
        # Targets get a channel axis so they line up with the model output.
        targets = targets.float().unsqueeze(1).to(device = DEVICE)

        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)

        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Show the latest batch loss on the progress bar.
        progress.set_postfix(loss = loss.item())


# NOTE(review): this redefines main() from the earlier training cell; the only
# functional difference is the accuracy check performed before training.
def main():
    """Optionally resume from the checkpoint, evaluate, then train for ``EPOCHS`` epochs.

    When ``LOAD_MODEL`` is set, model weights are restored from
    ``model_checkpoint.pth.tar``. Validation pixel accuracy is printed once
    before training. After every epoch the model and optimizer state are
    checkpointed, accuracy is printed again, and the validation predictions
    are written to ``saved_images/``.
    """
    train_transform = A.Compose(
        [A.Resize(height = IMAGE_HEIGHT, width = IMAGE_WIDTH),
         A.Rotate(limit = 35, p = 1.0),
         A.HorizontalFlip(p = 0.5),
         A.VerticalFlip(p = 0.1),
         # mean 0 / std 1 with max_pixel_value 255 only rescales pixels to [0, 1].
         A.Normalize(mean = [0.0, 0.0, 0.0],
                     std = [1.0, 1.0, 1.0],
                     max_pixel_value = 255.0),
         ToTensorV2(),])

    # Validation uses the same resize/normalisation but no random augmentation.
    validation_transform = A.Compose(
        [A.Resize(height = IMAGE_HEIGHT, width = IMAGE_WIDTH),
         A.Normalize(mean = [0.0, 0.0, 0.0],
                     std = [1.0, 1.0, 1.0],
                     max_pixel_value = 255.0),
         ToTensorV2(),])

    model = UNET(in_channels = 3, out_channels = 1).to(DEVICE)
    # The model returns raw logits; the sigmoid is applied inside the loss.
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr = LEARNING_RATE)

    train_loader, val_loader = get_loaders(TRAIN_DIR,
                                           TRAIN_MASK_DIR,
                                           VALIDATION_DIR,
                                           VALIDATION_MASK_DIR,
                                           BATCH_SIZE,
                                           train_transform,
                                           validation_transform,
                                           NUM_WORKERS,
                                           PIN_MEMORY)

    if LOAD_MODEL:
        # map_location lets a checkpoint saved on a GPU be restored on a
        # CPU-only session (and vice versa).
        # NOTE(review): only the model weights are restored; the optimizer
        # state stored in the checkpoint is not loaded back.
        load_checkpoint(torch.load("model_checkpoint.pth.tar", map_location = DEVICE), model)

    # Baseline accuracy of the starting weights, before any further training.
    check_accuracy(val_loader, model, device = DEVICE)
    scaler = torch.cuda.amp.GradScaler()

    for _ in range(EPOCHS):
        train_func(train_loader, model, optimizer, loss_fn, scaler)

        checkpoint = {"state_dict": model.state_dict(),
                      "optimizer":optimizer.state_dict()}
        save_checkpoint(checkpoint)

        check_accuracy(val_loader, model, device = DEVICE)

        save_predictions(val_loader, model, folder = "saved_images/", device = DEVICE)

if __name__ == "__main__":
    main()