In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install patchify

In [None]:
!pip install segmentation segmentation_models_pytorch

In [None]:
!pip install segmentation segmentation_models_pytorch.utils

In [5]:
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
import torch
import numpy as np
from matplotlib import pyplot as plt
from patchify import patchify, unpatchify
from PIL import Image
from sklearn.preprocessing import MinMaxScaler
from segmentation_models_pytorch import Unet
import segmentation_models_pytorch.utils
import torch

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/

In [None]:
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
! kaggle datasets download -d adrianboguszewski/landcoverai

In [None]:
! mkdir data
! unzip landcoverai.zip -d data

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt

print("Image and Mask filename: M-33-20-D-c-4-2.tif")
print()
temp_img = cv2.imread("data/images/M-33-20-D-c-4-2.tif", 1) # 3 channels / spectral bands
temp_img = cv2.cvtColor(temp_img, cv2.COLOR_BGR2RGB)
print("Image shape:", temp_img.shape)
print()

plt.figure(figsize=(12, 8))
plt.subplot(121)
plt.imshow(temp_img[:,:,0])
plt.title("One channel of image")
plt.subplot(122)
plt.imshow(temp_img)
plt.title("All channels of image")
plt.show()
print()

temp_mask = cv2.imread("data/masks/M-33-20-D-c-4-2.tif") # 3 channels but all same. Can also read with cv2.imread(path, 0) to get only one channel.
print("Mask shape:", temp_mask[:,:,0].shape)
print()
classes, count = np.unique(temp_mask[:,:,0], return_counts=True) # Visualize only one channel. All chanels are identical.
print("Classes are: ", classes, " and the counts are: ", count)
print()

plt.figure(figsize=(6, 4))
plt.imshow(temp_mask[:,:,0])
plt.title("Mask")
plt.show()
print()

In [None]:
# Patching the images
from patchify import patchify
from PIL import Image

root_directory = "data"

img_dir = os.path.join(root_directory, "images")
mask_dir = os.path.join(root_directory, "masks")

patch_size = 512

patches_img_dir = os.path.join(f"patches_{patch_size}", "images")
patches_img_dir = os.path.join(root_directory, patches_img_dir)
os.makedirs(patches_img_dir, exist_ok=True)
patches_mask_dir = os.path.join(f"patches_{patch_size}", "masks")
patches_mask_dir = os.path.join(root_directory, patches_mask_dir)
os.makedirs(patches_mask_dir, exist_ok=True)


def patching(data_dir, patches_dir, patch_size):
  for filename in os.listdir(data_dir):
    if filename.endswith('.tif'):
      img = cv2.imread(os.path.join(data_dir, filename), 1)
      max_height = (img.shape[0] // patch_size) * patch_size
      max_width = (img.shape[1] // patch_size) * patch_size
      img = img[0:max_height, 0:max_width]
      print(f"Patchifying {filename}...")
      patches = patchify(img, (patch_size, patch_size, 3), step = patch_size)  # non-overlapping
      print("Patches shape:", patches.shape)
      for i in range(patches.shape[0]):
        for j in range(patches.shape[1]):
          single_patch = patches[i, j, 0, :, :] # the 0 is an extra unncessary dimension added by patchify for multiple channels scenario
          cv2.imwrite(os.path.join(patches_dir, filename.replace(".tif", f"_patch_{i}_{j}.tif")), single_patch)

print()
print("Dividing images into patches...")
patching(img_dir, patches_img_dir, patch_size)
print("Dividing images into patches completed successfull!")

print()
print("Dividing masks into patches...")
patching(mask_dir, patches_mask_dir, patch_size)
print("Dividing masks into patches completed successfull!")

In [None]:
print(len(os.listdir(patches_img_dir)))
print(len(os.listdir(patches_mask_dir)))

In [13]:
def discard_useless_patches(patches_img_dir, patches_mask_dir):
  for filename in os.listdir(patches_mask_dir):
    img_path = os.path.join(patches_img_dir, filename)
    mask_path = os.path.join(patches_mask_dir, filename)
    img = cv2.imread(img_path)
    mask = cv2.imread(mask_path)
    classes, count = np.unique(mask, return_counts = True)
    
    if (count[0] / count.sum()) > 0.95:
      os.remove(img_path)
      os.remove(mask_path)

discard_useless_patches(patches_img_dir, patches_mask_dir)


In [None]:
print(len(os.listdir(patches_img_dir)))
print(len(os.listdir(patches_mask_dir)))

In [None]:
import os
import glob

import cv2

IMGS_DIR = "/content/drive/MyDrive/Semantic_segmentation_dataset2/images"
MASKS_DIR = "/content/drive/MyDrive/Semantic_segmentation_dataset2/masks"
OUTPUT_DIR = "/content/drive/MyDrive/Semantic_segmentation_dataset2/output/github.tif"

TARGET_SIZE = 512

img_paths = glob.glob(os.path.join(IMGS_DIR, "*.tif"))
mask_paths = glob.glob(os.path.join(MASKS_DIR, "*.tif"))

img_paths.sort()
mask_paths.sort()

os.makedirs(OUTPUT_DIR)
for i, (img_path, mask_path) in enumerate(zip(img_paths, mask_paths)):
    img_filename = os.path.splitext(os.path.basename(img_path))[0]
    mask_filename = os.path.splitext(os.path.basename(mask_path))[0]
    img = cv2.imread(img_path)
    mask = cv2.imread(mask_path)

    assert img_filename == mask_filename and img.shape[:2] == mask.shape[:2]

    k = 0
    for y in range(0, img.shape[0], TARGET_SIZE):
        for x in range(0, img.shape[1], TARGET_SIZE):
            img_tile = img[y:y + TARGET_SIZE, x:x + TARGET_SIZE]
            mask_tile = mask[y:y + TARGET_SIZE, x:x + TARGET_SIZE]

            if img_tile.shape[0] == TARGET_SIZE and img_tile.shape[1] == TARGET_SIZE:
                out_img_path = os.path.join(OUTPUT_DIR, "{}_{}.jpg".format(img_filename, k))
                cv2.imwrite(out_img_path, img_tile)

                out_mask_path = os.path.join(OUTPUT_DIR, "{}_{}_m.png".format(mask_filename, k))
                cv2.imwrite(out_mask_path, mask_tile)

            k += 1

    print("Processed {} {}/{}".format(img_filename, i + 1, len(img_paths)))

In [None]:


print("Patched images and masks are saved in:", OUTPUT_DIR)

In [17]:
splitfolder="/content/drive/MyDrive/Semantic_segmentation_dataset2/split_folder"

In [None]:
pip install split-folders


In [None]:
import os
import splitfolders  

# Define the directories
OUTPUT_DIR = "/content/drive/MyDrive/Semantic_segmentation_dataset2/output/github"  # actual path where the patches are stored

# Print the current working directory
print("Current working directory:", os.getcwd())

# Define the input folder using the actual OUTPUT_DIR path
patches_img_dir = OUTPUT_DIR
input_folder = os.path.abspath(patches_img_dir.strip())  
print("Input folder:", input_folder)

# Check if the input folder exists
if not os.path.exists(input_folder):
    raise ValueError(f"The provided input folder '{input_folder}' does not exist.")

# Update this with the correct root directory path
root_directory = "/content/drive/MyDrive/Semantic_segmentation_dataset2/split_folder"
output_folder = os.path.join(root_directory, "train_val_test")
print("Output folder:", output_folder)

# Create the output folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)

# Split the dataset into training and validation sets
splitfolders.ratio(input_folder, output=output_folder, seed=42, ratio=(.8, .2), group_prefix=None, move=False)  
# Define training and validation directories
train_dir = os.path.join(output_folder, "train")
val_dir = os.path.join(output_folder, "val")


print("Training directory:", train_dir)
print("Validation directory:", val_dir)




In [None]:
x_train_dir = os.path.join(train_dir, "images")
y_train_dir = os.path.join(train_dir, "masks")

x_val_dir = os.path.join(val_dir, "images")
y_val_dir = os.path.join(val_dir, "masks")

In [None]:
# helper function for data visualization
def visualize(**images):
    n = len(images)
    plt.figure(figsize=(16, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()

In [None]:
IMGS_DIR = "/content/drive/MyDrive/Semantic_segmentation_dataset2/images"
MASKS_DIR = "/content/drive/MyDrive/Semantic_segmentation_dataset2/masks"
OUTPUT_DIR = "/content/drive/MyDrive/Semantic_segmentation_dataset2/output/new2.tif"

In [None]:
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

class SegmentationDataset(Dataset):


    CLASSES = ['background', 'building', 'woodland', 'water', 'road']

    def __init__(
            self,
            images_dir,
            masks_dir,
            classes=None,
            augmentation=None,
            preprocessing=None,
    ):
        self.ids = os.listdir(images_dir)
        self.images = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks = [os.path.join(masks_dir, image_id) for image_id in self.ids]

        # convert str names to class values on masks
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):

        # read data
        image = cv2.imread(self.images[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image / 255
        mask = cv2.imread(self.masks[i], 0)

        # extract certain classes from mask (e.g. cars)
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float')

        # apply augmentations
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        # apply preprocessing
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask

    def __len__(self):
        return len(self.ids)

In [None]:
# Visualizing all classes in the mask
dataset = SegmentationDataset(x_train_dir, y_train_dir, classes=['background', 'building', 'woodland', 'water', 'road'])
image, mask = dataset[4] # get some sample
visualize(
    image = image,
    # Convert the predicted one-hot encoded mask back to normal
    mask = np.argmax(mask, axis=2)
)

# Visualizing selected classes in the mask
dataset = SegmentationDataset(x_train_dir, y_train_dir, classes=['background', 'water', 'woodland'])
image, mask = dataset[16] # get some sample
visualize(
    image = image,
    # Convert the predicted one-hot encoded mask back to normal
    mask = np.argmax(mask, axis=2)
)

In [None]:
import albumentations as album

def get_training_augmentation():
    train_transform = [
        album.HorizontalFlip(p=0.5),
        album.VerticalFlip(p=0.5),
        
    ]
    return album.Compose(train_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')


def get_preprocessing(preprocessing_fn):
   
    _transform = [
        album.Lambda(image=preprocessing_fn),
        album.Lambda(image=to_tensor, mask=to_tensor),
    ]
    return album.Compose(_transform)


In [None]:
# Visualize resulted augmented images and masks

augmented_dataset = SegmentationDataset(
    x_train_dir,
    y_train_dir,
    augmentation=get_training_augmentation(),
    classes=['background', 'building', 'woodland', 'water', 'road'],
)

for i in range(3):
    image, mask = augmented_dataset[5123]
    visualize(image=image, mask=np.argmax(mask, axis=2))

In [None]:
import torch
import segmentation_models_pytorch as smp
import segmentation_models_pytorch.utils

BATCH_SIZE = 16
ENCODER = 'efficientnet-b0'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = ['background', 'building', 'woodland', 'water']   # not training on 'road' class since it's instances in the data is too less
ACTIVATION = 'softmax2d'    # could be None for logits or 'softmax2d' for multiclass segmentation
DEVICE = 'cuda'
EPOCHS = 50

# create segmentation model with pretrained encoder
model = smp.Unet(
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
    classes=len(CLASSES),
    activation=ACTIVATION,
)

preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

In [None]:
train_dataset = SegmentationDataset(
    x_train_dir,
    y_train_dir,
    augmentation=get_training_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

val_dataset = SegmentationDataset(
    x_val_dir,
    y_val_dir,
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)

In [None]:


loss = smp.utils.losses.DiceLoss()
metrics = [
    smp.utils.metrics.IoU(threshold=0.5)
]

optimizer = torch.optim.Adam([
    dict(params=model.parameters(), lr=0.0003),
])

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

In [None]:
# create epoch runners
train_epoch = smp.utils.train.TrainEpoch(
    model,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

In [None]:
max_score = 0

for i in range(0, EPOCHS):

    print('\nEpoch: {}'.format(i))
    train_logs = train_epoch.run(train_loader)
    valid_logs = valid_epoch.run(valid_loader)

    # save model
    if max_score < valid_logs['iou_score']:
        max_score = valid_logs['iou_score']
        torch.save(model, f'/content/drive/MyDrive/Colab Notebooks/Personal_Projects/Image Segmentation/Landcover Semantic Segmentation/landcover_unet_{ENCODER}_epochs{i}_patch{patch_size}_batch{BATCH_SIZE}.pth')
        print('Model saved!')

    scheduler.step(valid_logs['dice_loss'])