### Libraries

In [5]:
import os
import sys
import itertools
import yaml
from patoolib import extract_archive

# Local Module Imports
sys.path.append("../src")  # adds source code directory
from utils import wkt_to_masc
from log_setup import logger
from polygon_handle import masks_to_polygons

### Main directories

In [3]:
# Project layout, resolved relative to the working directory: the project root
# is taken to be the parent of the directory the notebook is run from.
current_dir = os.getcwd()
base_dir = os.path.dirname(current_dir)

# Sub-folders of the project root and the YAML configuration file
dataset_dir, data_dir = (
    os.path.join(base_dir, folder) for folder in ("dataset", "data")
)
config_file = os.path.join(base_dir, "config.yml")

#### Dataset extraction and config file loading

In [7]:
# Extracting the .rar archive requires the `unrar` program:
#   - Linux:   sudo apt-get install unrar
#   - Windows: https://www.rarlab.com/rar_add.htm (unrarw32.exe)

archive_path = os.path.join(dataset_dir, "BurnedAreaUAV_dataset_v1.rar")
dataset_path = os.path.join(dataset_dir, "BurnedAreaUAV_dataset")

# Unpack only when the extracted dataset folder is missing; the archive is
# deleted once extraction has returned.
if not os.path.exists(dataset_path):
    extract_archive(archive_path, program="unrar", outdir=dataset_dir)
    os.remove(archive_path)

# Load the project configuration from the YAML file
with open(config_file, mode="r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

#### Output directory structure creation

In [23]:
# Components of the output tree: outputs/<model>/<key>/<sub_key>/<sub_sub_key>
model_names = ["CVAE", "pstg", "shape"]
keys = ["interpol", "extrapol"]
sub_keys = ["full", "sampled"]
sub_sub_keys = ["PNG", "WKT"]

for model_name in model_names:
    # Only the CVAE model gets both key folders; the others get "interpol" only
    model_keys = keys if model_name == "CVAE" else ["interpol"]
    for key, sub_key, sub_sub_key in itertools.product(
        model_keys, sub_keys, sub_sub_keys
    ):
        dir_path = os.path.join(
            base_dir, "outputs", model_name, key, sub_key, sub_sub_key
        )
        os.makedirs(dir_path, exist_ok=True)
        # Drop an empty .gitkeep file into every leaf directory
        gitkeep_path = os.path.join(dir_path, ".gitkeep")
        with open(gitkeep_path, "w") as f:
            pass

#### Masks directory creation

In [12]:
# Mask directories: one "masks" sub-folder under each data directory named in
# the config file (paths in the config are joined onto the project root).
train_masks_dir = os.path.join(base_dir, config["data"]["train_dir"], "masks")
test_masks_dir = os.path.join(base_dir, config["data"]["test_dir"], "masks")
train_sampled_masks_dir = os.path.join(
    base_dir, config["data"]["train_sampled_dir"], "masks"
)
unet_masks_dir = os.path.join(base_dir, config["data"]["unet_gen_dir"], "masks")

# exist_ok=True makes the cell safe to re-run and avoids the
# check-then-create race of `if not os.path.exists(...): os.makedirs(...)`.
for masks_dir in (
    train_masks_dir,
    test_masks_dir,
    train_sampled_masks_dir,
    unet_masks_dir,
):
    os.makedirs(masks_dir, exist_ok=True)

#### WKT files to masks conversion

In [13]:
# full training set
wkt_to_masc(
    wkt_file=os.path.join(base_dir, config["data"]["train_wkt"]),
    images_path=train_masks_dir,
    orig_dims=config["data"]["original_vid_dims"][::-1],
    height=config["data"]["input_size"][0],
    width=config["data"]["input_size"][1],
)

# test set
wkt_to_masc(
    wkt_file=os.path.join(base_dir, config["data"]["test_wkt"]),
    images_path=test_masks_dir,
    orig_dims=config["data"]["original_vid_dims"][::-1],
    height=config["data"]["input_size"][0],
    width=config["data"]["input_size"][1],
)

# sampled training set
wkt_to_masc(
    wkt_file=os.path.join(base_dir, config["data"]["sampled_masks_wkt"]),
    images_path=train_sampled_masks_dir,
    orig_dims=config["data"]["original_vid_dims"][::-1],
    height=config["data"]["input_size"][0],
    width=config["data"]["input_size"][1],
)

INFO - 
    --------------------------------------
    # [1mProperties of the resulting masks[0m
    # Width: 512, Height: 512
    # Number of masks to create: 226
    --------------------------------------
    
100%|██████████| 226/226 [00:00<00:00, 628.07it/s]
INFO - 
    --------------------------------------
    # [1mProperties of the resulting masks[0m
    # Width: 512, Height: 512
    # Number of masks to create: 23
    --------------------------------------
    
100%|██████████| 23/23 [00:00<00:00, 658.00it/s]
INFO - 
    --------------------------------------
    # [1mProperties of the resulting masks[0m
    # Width: 512, Height: 512
    # Number of masks to create: 13
    --------------------------------------
    
100%|██████████| 13/13 [00:00<00:00, 683.53it/s]


### U-Net data distance-based sampling

In [4]:
from shapely.wkt import loads
from glob import glob

In [8]:
# Project root (parent of the working directory) and its configuration file
current_dir = os.getcwd()
BASE_DIR = os.path.dirname(current_dir)
config_file = os.path.join(BASE_DIR, "config.yml")

# Parse config.yml into the `config` dictionary
with open(config_file, mode="r", encoding="utf-8") as config_stream:
    config = yaml.safe_load(config_stream)

In [10]:
# List of generated frames paths
msks_paths = sorted(glob(os.path.join(BASE_DIR, "data/unet_generated/masks", "*.png")))

# Convert the masks to polygons and save them as a WKT file
masks_to_polygons(
    msks_paths,
    out_dim=tuple(config["data"]["original_vid_dims"]),
    save_path=os.path.join(BASE_DIR, "data/unet_generated/WKT", "masks.wkt"),
)

INFO - Converting masks to polygons...
INFO - Saved polygons to /home/tiagociic/Projectos/spatiotemporal-vae-reconstruction/data/unet_generated/WKT/masks.wkt


In [None]:
from scipy.spatial.distance import jaccard
import numpy as np
from tqdm import tqdm
from rasterio.features import rasterize


def calculate_distances(polygons: list, out_shape: tuple = (720, 1280)):
    """
    Calculates the Jaccard distance between the binary mask of the first
    polygon and the binary mask of every subsequent polygon.

    Each polygon is rasterized on its own: ``rasterize`` burns all the shapes
    it receives into one single 2-D array, so rasterizing the whole list at
    once and indexing the result would compare raster rows instead of
    per-polygon masks.

    Args:
        polygons (list): Geometries accepted by ``rasterize``
            (presumably shapely polygons loaded from WKT; confirm against
            the caller).
        out_shape (tuple): The (rows, cols) shape of each rasterized mask.
            Default is (720, 1280).

    Returns:
        dict: A dictionary with a single key, "Jaccard distance", whose value
        is a list with one entry per subsequent polygon (``len(polygons) - 1``
        entries). An entry is 0 whenever either of the two compared masks is
        empty. An empty ``polygons`` list yields an empty list.

    """
    distances = {"Jaccard distance": []}
    if len(polygons) == 0:
        return distances

    # Reference mask: the first polygon alone, flattened once up front
    mask_t0 = rasterize([polygons[0]], out_shape).flatten()
    t0_is_empty = not mask_t0.any()

    # Calculate Jaccard distances against the reference mask
    for i in tqdm(range(1, len(polygons))):
        mask_tn = rasterize([polygons[i]], out_shape).flatten()
        if t0_is_empty or not mask_tn.any():
            # Existing convention: an empty mask on either side counts as 0
            distances["Jaccard distance"].append(0)
        else:
            distances["Jaccard distance"].append(jaccard(mask_t0, mask_tn))

    return distances


def gen_similar_poly_samples(polygons, threshold=0.15, out_shape=(720, 1280)):
    """
    Generate a set of samples from a list of polygons based on their similarity.

    Starting from the first polygon as reference, the list is walked forward
    until a polygon whose Jaccard distance to the reference reaches
    ``threshold`` is found. That polygon is recorded as a sample and becomes
    the new reference. The walk always advances by at least one polygon, so a
    ``threshold`` of 0 or less records every subsequent polygon instead of
    looping forever.

    Args:
        polygons (list): Geometries accepted by ``rasterize``
            (presumably shapely polygons loaded from WKT; confirm against
            the caller).
        threshold (float): The Jaccard distance threshold for creating a new
            sample. Defaults to 0.15.
        out_shape (tuple): The (rows, cols) shape of the rasterized polygons.
            Defaults to (720, 1280).

    Returns:
        dict: A dictionary with two keys: "index" and "Jaccard distance".
            "index" holds the position in ``polygons`` of each sample and
            "Jaccard distance" the distance between that sample and the
            reference it was compared against. The first polygon is never
            listed. The last recorded sample may be below ``threshold`` when
            the end of the list is reached first. Fewer than two polygons
            yield empty lists.

    """
    # Instantiate dictionary to store index and distance values
    samples = {"index": [], "Jaccard distance": []}
    last_idx = len(polygons) - 1
    idx = 0

    if last_idx > 0:
        # Rasterize the initial reference polygon
        reference_mask = rasterize([polygons[0]], out_shape).flatten()

    while idx < last_idx:
        # do-while: step forward at least once, then keep going while the
        # distance stays below the threshold and polygons remain
        while True:
            idx += 1
            candidate_mask = rasterize([polygons[idx]], out_shape).flatten()
            jaccard_distance = jaccard(reference_mask, candidate_mask)
            if idx >= last_idx or not jaccard_distance < threshold:
                break

        # Append index and distance to dictionary
        samples["index"].append(idx)
        samples["Jaccard distance"].append(jaccard_distance)
        print(f"Index: {idx}, Jaccard distance: {jaccard_distance:.4f}  ", end="\r")

        # The sampled polygon is the next reference; reuse its mask
        reference_mask = candidate_mask

    logger.info(f"Number of resulting samples: {len(samples['index'])}")

    return samples