In [1]:
from osgeo import gdal
from osgeo import gdal_array
from osgeo import osr
import numpy as np
import pandas as pd
import image_utils as iu
import os
import os.path
from PIL import Image, ImageEnhance
from PIL import ImageFilter
from shutil import copy2
import matplotlib.pyplot as plt
import geotiff_utils as gu
import uuid
import shutil
import numpy_utils as nu
import PIL
PIL.Image.MAX_IMAGE_PIXELS = 3520956600

In [64]:
data_path = 'E:\\Staff\\Asare\\Ssm\\data'
segmentation_path = os.path.join(data_path, 'segmentation')
original_path = os.path.join(segmentation_path, 'original')
real_data_path = os.path.join(original_path, 'real')
mask_data_path = os.path.join(original_path, 'mask')
# divided images
divided_data_path = os.path.join(segmentation_path, 'divided')
real_divided_data_path = os.path.join(divided_data_path, 'real')
mask_divided_data_path = os.path.join(divided_data_path, 'mask')
#
final_data_path = os.path.join(segmentation_path, 'final')
real_final_data_path = os.path.join(final_data_path, 'real')
mask_final_data_path = os.path.join(final_data_path, 'mask')

In [None]:
#Create base dir's
iu.create_dir_if_not_exists(data_path)
iu.create_dir_if_not_exists(segmentation_path)

In [None]:
# stores the path of an image and a mask pair
class DataLabelPair:
    def __init__(self, data_real_path, label_path):
        self.data_real_path = data_real_path
        self.label_path = label_path

In [None]:
#Define the class mappings
class_encoding = {0: 'No Data',
                  1: 'Active-Mines',
                  2: 'Abandoned-Mines',
                  3: 'Polluted-Rivers'}

r_class_encoding = {v: k for k, v in class_encoding.items()}
# print(reversed_dict)
new_color_encoding = {3: [255, 255, 0], 
                      2: [180, 96, 0], 
                      1: [251, 72, 196],  
                      0: [0, 0, 0]}

In [5]:
def display_legend(np_image, color_encoding):
    flat_gh_im = np_image.reshape(-1, 1)
    #
    new_gh_im = [None] * flat_gh_im.shape[0]
    for x in range(flat_gh_im.shape[0]):
        new_gh_im[x] = color_encoding.get(flat_gh_im[x][0], [0, 0, 0])
    #
    new_gh_im = np.array(new_gh_im).reshape(
        np_image.shape[0], np_image.shape[1], 3)
    enc_im = Image.fromarray(np.array(new_gh_im, dtype=np.uint8))
    return enc_im

# Splits image into specified rows and columns
def split_geoTiff_image(geoTiff, max_grid_size, strides):
    parts = []
    width, height = geoTiff.RasterXSize, geoTiff.RasterYSize
    left = 0
    top = 0
    stride_width, stride_height = strides
    max_grid_width, max_grid_height = max_grid_size
    total_grids = np.ceil(height / stride_width) * np.ceil(width / stride_height)
    for r in range(0, height, stride_height):
        for c in range(0, width, stride_width):
            left, top = c, r
            right, bottom = min(
                    c + max_grid_width, width), min(r + max_grid_height,height)
            grid = gu.crop_geoTiff(
                    geoTiff, left, top, right, bottom, 'uint16')
            # part = gu.crop_geoTiff(geoTiff, left, top, right, bottom)
            parts.append(grid)
    return parts

def split_image(image, max_grid_size, strides):
    parts = []
    width, height = image.size
    left = 0
    top = 0
    stride_width, stride_height = strides
    max_grid_width, max_grid_height = max_grid_size
    for r in range(0, height, stride_height):
        for c in range(0, width, stride_width):
            left, top = c, r
            right, bottom = min(
                    c + max_grid_width, width), min(r + max_grid_height,height)
            grid = image.crop((left, top, right, bottom))
            # part = gu.crop_geoTiff(geoTiff, left, top, right, bottom)
            parts.append(grid)
    return parts


In [None]:
#Gets all mask files in the mask folder
original_mask_images_path = iu.get_all_files(mask_data_path, '*.png')
print(len(original_mask_images_path), 'Labeled Files')

In [34]:
images_original = []
for path in original_mask_images_path:
    image = DataLabelPair(os.path.join(real_data_path, path.replace('.png', '.tif')),
                          os.path.join(mask_data_path, path))
    images_original.append(image)

In [None]:
max_grid_size = (512,512)
strides = (128, 128)
# images are divided into smaller images.
# This reduces the dimension of the images and also increases the training data.
iu.create_dir_if_not_exists(mask_divided_data_path)
iu.create_dir_if_not_exists(real_divided_data_path)
iu.create_dir_if_not_exists(mask_divided_data_path)
i = 0
for image_original in images_original:
    real_image = gdal.Open(image_original.data_real_path)
    part_count = col_count * row_count
    np_real_image = gu.read_geoTiff_bands(real_image)
    mask_image = iu.read_image(image_original.label_path, 'L')
    real_image_parts = split_geoTiff_image(real_image, max_grid_size, strides)
    mask_image_parts = split_image(mask_image, max_grid_size, strides)
    for x in range(len(real_image_parts)):
        part_filename = str(uuid.uuid4())
        real_image_part = real_image_parts[x]
        mask_image_part = mask_image_parts[x]
        # Save the arrays to a .npz file
        nu.save_numpy_file(real_image_part, os.path.join(
            real_divided_data_path, part_filename + '.npz'))
        mask_image_part.save(os.path.join(
            mask_divided_data_path, part_filename + '.png'))
    i += 1
    iu.print_progress(i, len(images_original))

In [None]:
#Images that do not contain any information is deleted from storage.
divided_images_path = iu.get_all_files(mask_divided_data_path)
print(len(divided_images_path), 'Divided files')

In [None]:
images_divided = []
for path in divided_images_path:
    image = DataLabelPair(os.path.join(real_divided_data_path, path.replace('.png', '.npz')),
                          os.path.join(mask_divided_data_path, path))
    images_divided.append(image)

In [40]:
def overlay_mask(original_image, mask_image):
    # Open the image and mask
    image = original_image.convert('RGBA')
    mask = mask_image.convert('RGBA')

    # Make pixel value [0, 0, 0] transparent in the mask
    mask_data = mask.getdata()
    new_mask_data = []
    for item in mask_data:
        if item[:3] == (0, 0, 0):
            # Set transparency for [0, 0, 0]
            new_mask_data.append((0, 0, 0, 0))
        else:
            new_mask_data.append(item)
    mask.putdata(new_mask_data)

    # Overlay the mask on the image
    overlaid = Image.alpha_composite(image, mask)

    # Save the overlaid image
    return overlaid

0

In [19]:
#20  12
import random
indx = random.randint(0, len(divided_images_path))
np_gt = nu.load_numpy_file(images_divided[indx].data_real_path) 
real_im = gu.display_np_geoTiff(np_gt, (2, 1, 0))
#
np_gh_img = np.asarray(iu.read_image(
    images_divided[indx].label_path))
mask = display_legend(np_gh_img, new_color_encoding)
display_images({'Satellite Image': real_im, 'Mask': mask,
                'Overlay': overlay_mask(real_im, mask)}, (1, 3))

In [41]:
def zero_pad_array(input_array, new_shape):
    """
    Zero-pad the input_array to the specified new_shape.
    Args:
        input_array (numpy.ndarray): Input array of shape (height, width, x).
        new_shape (tuple): Desired new shape (new_height, new_width, x).

    Returns:
        numpy.ndarray: Zero-padded array of shape (new_height, new_width, x).
    """
    if input_array.shape[0] >= new_shape[0] and input_array.shape[1] >= new_shape[1]:
        return input_array
    # Calculate the amount of padding needed for each dimension
    pad_height = new_shape[0] - input_array.shape[0]
    pad_width = new_shape[1] - input_array.shape[1]
    # Create a tuple of padding values for each dimension
    pad_values = [(0, pad_height), (0, pad_width)] + [(0, 0)] * (input_array.ndim - 2)
    # Use numpy.pad to pad the array with zeros
    zero_padded_array = np.pad(input_array, pad_values, mode='constant', constant_values=0)
    return zero_padded_array


In [None]:

# Augments images by rotation.
def aug_image(image, new_size=None):
    width, height = image.size
    if new_size == None:
        new_size = (width, height)
    images_augmented = []
    np_image = np.asarray(image)
    np_image = zero_pad_array(np_image, new_size)
    image = Image.fromarray(np_image)
    # image = image.resize(new_size, resample=Image.BOX)
    new_im = image
    images_augmented.append(new_im)
    images_augmented.append(new_im.rotate(360 - 90))
    images_augmented.append(new_im.rotate(360 - 180))
    images_augmented.append(new_im.rotate(360 - 270))
    new_im = image.transpose(Image.FLIP_LEFT_RIGHT)
    images_augmented.append(new_im)
    images_augmented.append(new_im.rotate(360 - 90))
    images_augmented.append(new_im.rotate(360 - 180))
    images_augmented.append(new_im.rotate(360 - 270))
    return images_augmented


def aug_np_image(np_image, new_size=None):
    height, width, channels = np_image.shape[0], np_image.shape[1], np_image.shape[2]
    if new_size == None:
        new_size = (height, width)
    np_images_aug = []
    # np_image = nu.resize_np_image(np_image, new_size)
    np_image = zero_pad_array(np_image, new_size)
    new_np_im = np_image
    np_images_aug.append(new_np_im)
    np_images_aug.append(nu.rotate_np_image(new_np_im, 90))
    np_images_aug.append(nu.rotate_np_image(new_np_im, 180))
    np_images_aug.append(nu.rotate_np_image(new_np_im, 270))
    new_np_im = nu.flip_np_image_left_right(np_image)
    np_images_aug.append(new_np_im)
    np_images_aug.append(nu.rotate_np_image(new_np_im, 90))
    np_images_aug.append(nu.rotate_np_image(new_np_im, 180))
    np_images_aug.append(nu.rotate_np_image(new_np_im, 270))
    return np_images_aug

In [67]:
# images are augmented and saved on disk.
iu.create_dir_if_not_exists(final_data_path)
iu.create_dir_if_not_exists(real_final_data_path)
iu.create_dir_if_not_exists(mask_final_data_path)
i = 0
for divided_image in images_divided:
    mask_image = iu.read_image(divided_image.label_path, 'L')
    np_mask_image = np.asarray(mask_image)
    real_image = nu.load_numpy_file(
        divided_image.data_real_path.replace('.png', '.npz'))
    #The max reflectance value in the geotiff is clipped to 6000.
    real_image = np.clip(real_image, None, 6000)
    im_size = (512, 512)
    forest_p = get_class_percentages(np_mask_image).get(0, 0)
    real_aug_images = aug_np_image(real_image, im_size)
    mask_aug_images = aug_image(mask_image, im_size)
    for x in range(len(real_aug_images)):
        part_filename = str(uuid.uuid4())
        real_image_part = real_aug_images[x]
        mask_image_part = mask_aug_images[x]
        np_mask_image_part = np.asarray(mask_image_part)
        # np_mask_image_part = np.where(
        #     np_mask_image_part == 0, np_mask_image_part, 1)
        np_mask_image_part = np_mask_image_part.reshape(
            (np_mask_image_part.shape[0], np_mask_image_part.shape[1]))
        #
        nu.save_numpy_file(real_image_part, os.path.join(
            real_final_data_path, part_filename + '.npz'))

        mask_image_part = Image.fromarray(np_mask_image_part)
        mask_image_part.save(os.path.join(
            mask_final_data_path, part_filename + '.png'))
    # os.remove(divided_image.data_real_path.replace('.png', '.npz'))
    # os.remove(divided_image.label_path)
    i += 1
    iu.print_progress(i, len(images_divided))


In [70]:
final_images_path = iu.get_all_files(real_final_data_path, '*.npz')
len(final_images_path)

2306

In [75]:
images_final = []
for path in final_images_path:
    image = DataLabelPair(os.path.join(real_final_data_path, path),
                          os.path.join(mask_final_data_path, path.replace('.npz', '.png')))
    images_final.append(image)

In [None]:
#20  12
import random
indx = random.randint(0, len(final_images_path))
np_gt = nu.load_numpy_file(images_final[indx].data_real_path) 
real_im = gu.display_np_geoTiff(np_gt, (2, 1, 0))
#
np_gh_img = np.asarray(iu.read_image(
    images_final[indx].label_path))
mask = display_legend(np_gh_img, new_color_encoding)
display_images({'Satellite Image': real_im, 'Mask': mask,
                'Overlay': overlay_mask(real_im, mask)}, (1, 3))

In [24]:
# Splits dataset into training, validation and test sets.
def train_val_test_split(val_per, test_per, input_data, labels=None):
    if labels is not None and len(input_data) != len(labels):
        raise Exception("input data and label length mismatch")
    data_len = len(input_data)
    val_len = int(data_len * (val_per / 100))
    test_len = int(data_len * (test_per / 100))
    x_val = input_data[0: val_len]
    x_test = input_data[val_len: val_len + test_len]
    x_train = input_data[val_len + test_len: data_len]
    if labels is not None:
        y_val = labels[0: val_len]
        y_test = labels[val_len: val_len + test_len]
        y_train = labels[val_len + test_len: data_len]
        return (x_train, y_train), (x_val, y_val), (x_test, y_test)
    else:
        return x_train, x_val, x_test


In [25]:
train_images, val_images, test_images = train_val_test_split(
    20, 0, images_final)
print('Train length:', len(train_images))
print('Val length:', len(val_images))
print('Test length', len(test_images))

In [77]:
#create directories for all training data.
training_path = os.path.join(segmentation_path, 'training')
iu.create_dir_if_not_exists(training_path)
#train
train_path = os.path.join(training_path, 'train')
iu.create_dir_if_not_exists(train_path)
train_real_path = os.path.join(train_path, 'real')
iu.create_dir_if_not_exists(train_real_path)
train_mask_path = os.path.join(train_path, 'mask')
iu.create_dir_if_not_exists(train_mask_path)
#val
val_path = os.path.join(training_path, 'val')
iu.create_dir_if_not_exists(val_path)
val_real_path = os.path.join(val_path, 'real')
iu.create_dir_if_not_exists(val_real_path)
val_mask_path = os.path.join(val_path, 'mask')
iu.create_dir_if_not_exists(val_mask_path)
#test
test_path = os.path.join(training_path, 'test')
iu.create_dir_if_not_exists(test_path)
test_real_path = os.path.join(test_path, 'real')
iu.create_dir_if_not_exists(test_real_path)
test_mask_path = os.path.join(test_path, 'mask')
iu.create_dir_if_not_exists(test_mask_path)


In [None]:
# move training images to training folder
i = 0
for image in train_images:
    shutil.move(image.data_real_path, os.path.join(
        train_real_path, os.path.basename(image.data_real_path)))
    shutil.move(image.label_path, os.path.join(
        train_mask_path, os.path.basename(image.label_path)))
    i += 1
    iu.print_progress(i, len(train_images))

In [None]:
# move validation images to validation folder
i = 0
for image in val_images:
    shutil.move(image.data_real_path, os.path.join(
        val_real_path, os.path.basename(image.data_real_path)))
    shutil.move(image.label_path, os.path.join(
        val_mask_path, os.path.basename(image.label_path)))
    i += 1
    iu.print_progress(i, len(val_images))