# Utility functionality for datasets

### References : TODO

### Imports

In [1]:
import os
import cv2
import numpy as np
import sys
from osgeo import gdal

## Functions

In [4]:
# Recursively walk a directory tree and apply a function to every file of a given datatype.
#   dir      : root directory of the walk
#   rel_path : path of the directory currently visited, relative to `dir` ('' for the root itself)
#   datatype : file extension to match, without the leading dot (the file's extension is lower-cased
#              before comparison, `datatype` is used as given)
#   dir_out  : output root, passed through untouched to `f`
#   f        : callback invoked as f(dir, rel_path, fpath, datatype, dir_out, fparams) per matching file
#   fparams  : extra parameters passed through untouched to `f`
# Files with a different extension are skipped.
def rec_dir(dir, rel_path, datatype, dir_out, f, fparams):
    current_dir = os.path.join(dir, rel_path)

    for fname in os.listdir(current_dir):
        fpath = os.path.join(current_dir, fname)
        ftype = os.path.splitext(fpath)[-1].lower()

        if os.path.isfile(fpath) and ftype == '.' + datatype:
            f(dir, rel_path, fpath, datatype, dir_out, fparams)
        elif os.path.isdir(fpath):
            # Only descend into real directories: recursing into a non-matching regular file
            # would make os.listdir raise NotADirectoryError.
            rec_dir(dir, os.path.join(rel_path, fname), datatype, dir_out, f, fparams)

In [5]:
tiff_driver = gdal.GetDriverByName('GTiff')

# Convert a given raster file to a .tiff file.
# The signature matches the callback expected by rec_dir; `dir`, `datatype` and `params` are unused.
# The output is written to <dir_out>/<rel_path>/<input name without its extension>.tiff, creating
# the output directory when needed. Exits the program if GDAL cannot open the input file.
# Note: Reference is ChangeDetectionToolbox (raster2tiff script) to convert the images recursively in a folder.
# Their references: https://gis.stackexchange.com/questions/42584/how-to-call-gdal-translate-from-python-code,
# https://gdal.org/tutorials/raster_api_tut.html#using-createcopy
def convert_to_tiff(dir, rel_path, fpath, datatype, dir_out, params):
    # Strip only the final extension, so that 'a.b.bmp' and 'a.c.bmp' do not both map to 'a.tiff'
    fname = os.path.splitext(os.path.basename(fpath))[0]

    # gdal.Open returns None (instead of raising) when exceptions are not enabled
    data_in = gdal.Open(fpath)
    if data_in is None:
        sys.exit('Could not open raster file: ' + fpath)

    dir_file_out = os.path.join(dir_out, rel_path)
    os.makedirs(dir_file_out, exist_ok=True)

    tiff_driver.CreateCopy(os.path.join(dir_file_out, fname + '.tiff'), data_in, 0)

In [6]:
# Thresholding util function: map a value onto one of the two binary levels.
# Returns the module-level `max` when `value` lies above `threshold` (or equals it while
# `inclusive_upper` is set), and the module-level `min` otherwise.
# Both names are looked up as globals at call time, so the thresholding settings (`min`, `max`)
# must have been assigned before this function is called.
def threshold_val(value, inclusive_upper, threshold):
    if inclusive_upper:
        is_change = value >= threshold
    else:
        is_change = value > threshold

    return max if is_change else min

# Check if a file is a binary change map for the selected dataset:
#   'AirChange' : the part of the file name before its first '.' must be 'gt'
#   'LEVIR'     : the relative path must contain 'label'
# Any other dataset name terminates the program through sys.exit.
def is_binary_file(dir, rel_path, fpath, dataset):
    if dataset == 'LEVIR':
        return 'label' in rel_path

    if dataset == 'AirChange':
        stem, _, _ = os.path.basename(fpath).partition('.')
        return stem == 'gt'

    # TODO adjust this to other datasets
    sys.exit('Invalid dataset selected')

In [7]:
# Downsample an image and, for binary change maps, threshold the result back to two levels.
# The signature matches the callback expected by rec_dir; `datatype` is unused.
# Keys read from `params`:
#   'dataset'           : dataset name, forwarded to is_binary_file
#   'downsample_factor' : inverse scale factor (e.g. 2 halves both dimensions)
#   'interpolation_alg' : OpenCV interpolation flag passed to cv2.resize
#   'inclusive_upper'   : forwarded to threshold_val (binary maps only)
#   'threshold'         : forwarded to threshold_val (binary maps only)
# The result is written to <dir_out>/<rel_path>/<original file name>.
# Exits the program if the image cannot be read, or if a binary map has differing channels.
def downsample_and_threshold(dir, rel_path, fpath, datatype, dir_out, params):
    dir_file_out = os.path.join(dir_out, rel_path)
    os.makedirs(dir_file_out, exist_ok=True)

    # cv2.imread returns None (instead of raising) when the file is missing or cannot be decoded
    image = cv2.imread(fpath)
    if image is None:
        sys.exit('Could not read image: ' + fpath)

    # Evaluate once; the answer is needed both before and after resizing
    is_binary = is_binary_file(dir, rel_path, fpath, params['dataset'])

    if is_binary:
        img_max = np.maximum.reduce(image, 2)
        img_min = np.minimum.reduce(image, 2)

        # Safety check: all channels of a binary map must carry the same value per pixel
        if not np.all(img_max == img_min):
            sys.exit('Invalid binary result file')

        # 'Reduce' the image to a single channel
        image = img_max

    # Resize the image (cv2.resize expects dsize as (width, height), i.e. (columns, rows))
    size_1 = int(image.shape[0] * 1 / params['downsample_factor'])
    size_2 = int(image.shape[1] * 1 / params['downsample_factor'])
    resized = cv2.resize(image, dsize=(size_2, size_1), interpolation=params['interpolation_alg'])

    # Apply threshold: interpolation can introduce intermediate values into a binary map
    if is_binary:
        with np.nditer(resized, op_flags=['readwrite']) as iterator:
            for value in iterator:
                value[...] = threshold_val(value, params['inclusive_upper'], params['threshold'])

    # Write to file
    cv2.imwrite(os.path.join(dir_file_out, os.path.basename(fpath)), resized)

## Convert to .tiff

In [23]:
# ------ Settings ------
# => Set these variables to the desired values before running the conversion below!

# Dataset and directories
dir_in = '' # The root directory of the dataset to convert
dir_out = '' # The root directory to save the converted dataset to
datatype = 'bmp' # The file extension of the input images, without the leading dot

In [24]:
# rec_dir(dir_in, '', datatype, dir_out, convert_to_tiff, {})

## Downsample and threshold

In [18]:
# ------ Settings ------
# => These variables should be set to the desired settings!

# Thresholding binary maps
# NOTE: `min` and `max` shadow the Python builtins of the same name from here on.
# threshold_val reads these two module-level names to obtain its return values.
min = 0     # Minimum value of a band in one pixel
max = 255   # Maximum value of a band in one pixel
threshold = max / 2     # Threshold for setting a pixel to be 'true' (i.e., change)
inclusive_upper = True  # Whether a value equal to the threshold value should be set to 'true' (i.e., change)

# Downsampling
interpolation_alg = cv2.INTER_AREA  # The interpolation algorithm for downsampling
downsample_factor = 4   # The inverse factor of downsampling (e.g., 2 results in 1/2 of the size)

# Dataset and directories
dir_in = '' # The directory of the dataset
dir_out = dir_in + '_factor_' + str(downsample_factor) # The directory to save the modified dataset to
# Whether to add the downsampling info to the directory name (form of '_factor_' + factor).
# NOTE: this flag is only stored in fparams; dir_out above always carries the suffix.
add_size_info = True
datatype = 'tiff' # The image datatype (file extension without the leading dot)
dataset = 'LEVIR' # The dataset name, selected from: AirChange, LEVIR

In [19]:
# Bundle the settings into the parameter dictionary that is handed to every per-file call
fparams = dict(
    downsample_factor=downsample_factor,
    interpolation_alg=interpolation_alg,
    threshold=threshold,
    inclusive_upper=inclusive_upper,
    add_size_info=add_size_info,  # TODO consider this, or better to have a file in the directory with all information
    dataset=dataset,
)

# Walk the input directory and process every file of the selected datatype
rec_dir(dir_in, '', datatype, dir_out, downsample_and_threshold, fparams)

## Randomly sample LEVIR-CD

In [2]:
# ------ Settings ------
# => These variables should be set to the desired settings!

num_samples = 120 # Number of image pairs to sample
dir_in = '' # The directory of the *training* dataset (must contain the sub-folders 'A', 'B' and 'label')
dir_out = '' # The directory to save the sampled dataset to

In [3]:
import random

# Randomly pick `num_samples` file names from <dir_in>/A and copy the files with that name from
# the 'A', 'B' and 'label' sub-folders of `dir_in` into the same sub-folders of `dir_out`.
# Label images are reduced to a single channel before they are written.

# Create and set folders
a_folder_in = os.path.join(dir_in, 'A')
b_folder_in = os.path.join(dir_in, 'B')
label_folder_in = os.path.join(dir_in, 'label')

a_folder_out = os.path.join(dir_out, 'A')
b_folder_out = os.path.join(dir_out, 'B')
label_folder_out = os.path.join(dir_out, 'label')

for folder_out in (a_folder_out, b_folder_out, label_folder_out):
    os.makedirs(folder_out, exist_ok=True)

# Check how many images there are
all_names = os.listdir(a_folder_in)
num_tot = len(all_names)
if num_samples > num_tot:
    sys.exit('Invalid sample amount')

# Randomly sample a set size from the list
samples = random.sample(all_names, num_samples)

# Create copies
for sample in samples:
    a_img = cv2.imread(os.path.join(a_folder_in, sample))
    b_img = cv2.imread(os.path.join(b_folder_in, sample))
    label_img = cv2.imread(os.path.join(label_folder_in, sample))

    # cv2.imread returns None (instead of raising) when a file is missing or cannot be decoded,
    # e.g. when a name from 'A' has no counterpart in 'B' or 'label'
    if a_img is None or b_img is None or label_img is None:
        sys.exit('Could not read all images for sample: ' + sample)

    img_max = np.maximum.reduce(label_img, 2)
    img_min = np.minimum.reduce(label_img, 2)

    # Safety check: all channels of a label must carry the same value per pixel
    if not np.all(img_max == img_min):
        sys.exit('Invalid binary result file')

    # 'Reduce' the label image
    label_img = img_max

    # Write only after the whole sample has been validated, so a failing sample leaves no partial output
    cv2.imwrite(os.path.join(a_folder_out, sample), a_img)
    cv2.imwrite(os.path.join(b_folder_out, sample), b_img)
    cv2.imwrite(os.path.join(label_folder_out, sample), label_img)