---

[Pyspatiotemporalgeom](http://www.cs.siue.edu/~marmcke/docs/pyspatiotemporalgeom/) based interpolation

---

**Note:** this library was writen in Python 2. It had to be converted to 
          Python 3 compatible code.

In [1]:
from watermark import watermark
print(watermark(author="\033[1m" + "Tiago Ribeiro"+ "\033[0m", 
                github_username="\033[1m" + "Tiago1Ribeiro"+ "\033[0m", 
                current_date=True, current_time=True, python=True, 
                updated=True, iversions=True, globals_= globals())
                )

Author: [1mTiago Ribeiro[0m

Github username: [1mTiago1Ribeiro[0m

Last updated: 2023-04-04 12:05:14

Python implementation: CPython
Python version       : 3.10.9
IPython version      : 8.11.0



### Setup `pyspatiotemporalgeom`  library for Python 3

```bash
# Clones the repository with the Python3 version of pyspatiotemporalgeom lib.    
git clone https://github.com/CIIC-C-T-Polytechnic-of-Leiria/pyspatiotemporalgeom-for-Python-3.git
# Moves pyspatiotemporalgeom folder to the working directory 
mv pyspatiotemporalgeom-for-Python-3/pyspatiotemporalgeom .
# Removes the cloned repository
rm -rf pyspatiotemporalgeom-for-Python-3 
# on Windows: rmdir /s /q pyspatiotemporalgeom-for-Python-3 
```

In [3]:
import os
import re
from geomet import wkt
from typing import List, Union
import cv2
import matplotlib.pyplot as plt
import numpy as np
from glob import glob
# for debugging purposes
from IPython.display import clear_output

import sys
sys.path.insert(0,"working_dir\ESS_Data_Lab_misc\pyspatiotemporalgeom-for-Python-3\pyspatiotemporalgeom")

import pyspatiotemporalgeom.structureRegion as structureRegion
from pyspatiotemporalgeom.componentMovingRegion import cIntervalRegion
from pyspatiotemporalgeom.utilities import hsegLibrary

#### Data Sources

In [5]:
DATA_DIR = "D:\BurnedAreaUAV_files\BurnedAreaUAV_dataset\BurnedAreaUAV_dataset" # Replace with your data directory# Directory to save interpolated polygons
OUT_DIR = "E://BurnedAreaUAV_files//Interpolation//pyspatiotemporalgeom_interpol"
# WKT containing the manually annotated polygons
WKT_FILE = os.path.join(DATA_DIR, 'WKT//train_valid.wkt')
# Directory to save PNG format interpolated polygons
OUT_DIR_PNG = os.path.join(OUT_DIR, 'PNGs')
# Directory to save WKT format interpolated polygons
OUT_WKT_FILE = os.path.join(OUT_DIR, "pyspatial_interpol.wkt")

In [6]:
# Convert WKT to list of shapely polygons
with open(WKT_FILE, 'r') as wkt_file:
    multipolygons_file = wkt_file.readlines()
    multipolygons = [wkt.loads(multipolygons_file[i]) for i in range(len(multipolygons_file))]

#### Auxiliary functions

create_CIR

In [7]:
def create_CIR(source_SL: List, dest_SL: List, source_time: int, dest_time: int) -> Union[cIntervalRegion, None]:
    """
    Creates a Component Interval Region.
    This describes the movement of a moving region over a single time interval.

    Args:
        source_SL (List): The source segment list.
        dest_SL (List): The destination segment list.
        source_time (int): The initial instant of the time interval.
        dest_time (int): The final instant of the time interval.

    Returns:
        Union[cIntervalRegion, None]: A Component Interval Region or None.

    Note:
        source_time and dest_time must be integer timestamps.

    Adapted from: https://github.com/most-ieeta/SPT-DataLab/blob/master/pyspatiotemporalgeom/mckenney_final.py
    """
    src_sr = structureRegion.structuralRegion()
    src_sr_id = src_sr.addFace(source_SL)

    dst_sr = structureRegion.structuralRegion()
    dst_sr_id = dst_sr.addFace(dest_SL)

    cir = cIntervalRegion()

    cir.sourceSR = src_sr
    cir.destSR = dst_sr
    cir.sourceTime = int(source_time)
    cir.destTime = int(dest_time)

    if cir.mapComponent(src_sr_id, dst_sr_id):
        return cir

    return None

labelUniqueCycles

In [8]:
from collections import deque
from functools import cmp_to_key
def labelUniqueCycles(segs, onlyReturnOuterCycle=False):
    '''
    will do interior walks around cycles.  

    Each cycle will get a unique label.

    Label numbers will not necessarily start at 2, and may skip numbers

    Will also remove sticks!!!  a full service solution!

    Cycles will have unique labels, but holes in particular will have labelling 
    flipped.  

    Use: def switchLabelsForCorrectCycleLabelling( hsegs ): to finalize labels

    This function is used to create well-formed regions from possibly 
    non-wellformed input.

    **input**: 

    segs: a list of segs.

    onlyReturnOuterCycle:  if you want a simple region, set this to true!

    **output**: a hseg list representing a well formed region. Agian, call def 
    switchLabelsForCorrectCycleLabelling( hsegs ): to finalize labels!!! 
     Everything is labeled as an outercycle after this call.
    '''
    # remove dups from segs
    nonDupSegs = []
    for s in segs:
        if s[0] < s[1]:
            nonDupSegs.append(s)
        else:
            nonDupSegs.append((s[1], s[0]))
    seenSegSet = set(nonDupSegs)
    segs = list(seenSegSet)
    # convert segs to hsegs
    hsegs = []
    for s in segs:
        hsegs.append(((s[0], s[1]), -1, -1))
        hsegs.append(((s[1], s[0]), -1, -1))
    hsegs.sort(key=cmp_to_key(hsegCompForSorted))
    # create a hash table for the seg portions mapped to their index
    indexLookup = dict()
    for i, h in enumerate(hsegs):
        indexLookup[h[0]] = i

    # visit unprocessed segs in hseg order.  Each unprocessed will start a new cycle
    currLabel = 1
    nestedLabel = 2
    for i, h in enumerate(hsegs):
        if (h[1] == -1 and h[2] == -1) and isLeft(h):  # unprocessed, left
            #nestedLabel = currLabel+1
            # interior walk the cycle
            visitedPoiSet = set()
            seenSegSet = set()
            visitedHsegStack = deque()
            completeVisitedHistoryStack = deque()
            currIndex = i
            startIndex = i
            firstTimeThrough = True
            currLabel = nestedLabel
            nestedLabel += 1
            labellingAbove = True
            startHseg = hsegs[startIndex]
            while True:
                currHseg = hsegs[currIndex]
                # check if this is a stick seg.  If so, just label it as invalid and move on
                if (currHseg[0][1], currHseg[0][0]) in seenSegSet:
                    # we have walked its brother.  this is a stick. label it as such
                    hsegs[currIndex] = (currHseg[0], 0, 0)
                    brotherIndex = indexLookup[(
                        currHseg[0][1], currHseg[0][0])]
                    brother = hsegs[brotherIndex]
                    brother = (brother[0], 0, 0)
                    hsegs[brotherIndex] = brother
                    # If this happens to be the brother of the startHseg, we 
                    # need to unvisit the walked cycle. (since we walked the 
                    # exterior and may have walked mutliple stick-connected cycles)
                    if brother[0] == hsegs[startIndex][0]:
                        for itemHseg in completeVisitedHistoryStack:
                            if itemHseg[1] != 0:
                                itemIndex = indexLookup[itemHseg[0]]
                                brotherIndex = indexLookup[(
                                    itemHseg[0][1], itemHseg[0][0])]
                                hsegs[itemIndex] = (itemHseg[0], -1, -1)
                                hsegs[brotherIndex] = (
                                    (itemHseg[0][1], itemHseg[0][0]), -1, -1)
                        break
                # check if we have reached a processed seg (happpnes if we started on a stick)
                # we have to process first seg twice
                elif (currHseg[1] > 0 or currHseg[2] > 0) and currIndex != startIndex:
                    # we must have started on a stick, and followed it to get here.
                    # mark everything we followed as such
                    visitedHsegStack.appendleft(startHseg)
                    for itemHseg in visitedHsegStack:
                        itemIndex = indexLookup[itemHseg[0]]
                        brotherIndex = indexLookup[(
                            itemHseg[0][1], itemHseg[0][0])]
                        hsegs[itemIndex] = (itemHseg[0], 0, 0)
                        hsegs[brotherIndex] = (
                            (itemHseg[0][1], itemHseg[0][0]), 0, 0)
                    break
                # final case we have an unprocessed, non-stick seg.
                # label it appropriately
                else:
                    # label the current hseg and its brother
                    la = currLabel
                    lb = -1
                    if not labellingAbove:
                        lb = currLabel
                        la = -1

                    hsegs[currIndex] = (currHseg[0], la, lb)
                    brotherIndex = indexLookup[(
                        currHseg[0][1], currHseg[0][0])]
                    brother = hsegs[brotherIndex]
                    brother = (brother[0], la, lb)
                    hsegs[brotherIndex] = brother

                    # if we have closed a loop, pop and fix
                    if currHseg[0][0] in visitedPoiSet:
                        stopPoi = currHseg[0][0]

                        while True:
                            # debug statment.  We are about to pop a stack.  if it is empty, this prog
                            # will crash.  So, preemptively printout the current region. current seg,

                            # end of debug statement
                            # pop it, updateLabels
                            # print currHseg
                            poppedHseg = visitedHsegStack.popleft()

                            poppedIndex = indexLookup[poppedHseg[0]]
                            poppedIndexBrother = indexLookup[(
                                poppedHseg[0][1], poppedHseg[0][0])]
                            if hsegs[poppedIndex][1] > 0:
                                hsegs[poppedIndex] = (
                                    poppedHseg[0], nestedLabel, -1)
                                hsegs[poppedIndexBrother] = (
                                    (poppedHseg[0][1], poppedHseg[0][0]), nestedLabel, -1)
                            elif hsegs[poppedIndex][2] > 0:
                                hsegs[poppedIndex] = (
                                    poppedHseg[0], -1, nestedLabel)
                                hsegs[poppedIndexBrother] = (
                                    (poppedHseg[0][1], poppedHseg[0][0]), -1, nestedLabel)
                            if poppedHseg[0][0] == stopPoi:
                                break
                        nestedLabel += 1

                # update visited stacks and get the next seg
                if firstTimeThrough:
                    firstTimeThrough = False
                else:
                    # if not first time through, record point and seg, check stopping condition
                    visitedPoiSet |= set([currHseg[0][0]])
                    visitedHsegStack.appendleft(currHseg)
                    completeVisitedHistoryStack.appendleft(currHseg)
                    if currIndex == startIndex:
                        # if we get here, we finished a cycle
                        # if we only want the outer cycle, we can jsut return it here
                        if onlyReturnOuterCycle:
                            hsegs = [h for h in hsegs if h[1] != h[2]]
                            return hsegs
                        break

                seenSegSet |= set([(currHseg[0])])

                # get next hseg
                prevIndex = currIndex
                currIndex = getNextInnerCycleWalkIndex(
                    currHseg, hsegs, indexLookup)
                # check if we are switching label above
                if isLeft(hsegs[prevIndex]) != isLeft(hsegs[currIndex]):
                    labellingAbove = not labellingAbove

    hsegs = [h for h in hsegs if h[1] != h[2]]
    return hsegs

get_region_t

In [9]:
def get_region_t(cir, t):
    """
    Returns the structural region defined at the specified time.

    Input: A component interval region and a time.
    Output: The structural region defined at the specified time.
    """

    # Exctract the structural region defined by cir at time t.
    return cir.getStructuralRegionAtTime(t)

getStructuralRegionAtTime

In [10]:
def getStructuralRegionAtTime(self, time):
    '''
    Exctract the structural region defined by this component interval region 
    at time ``time``.

    Call the interpolator, get the triangles, extract the region from those 
    triangles

    **Input**:
    time:
        The time at which to extract the structural regions

    **Returns**:
    None:
        If the ``time`` is outside the bounds of this interval regions

    structural region:
        otherwise
    '''
    # print('1')
    if time < self.sourceTime or time > self.destTime:
        return None
    # print('2')
    if time == self.sourceTime:
        return self.sourceSR

    if time == self.destTime:
        return self.destSR
    # print('3')
    print("time ok", end="\r")
    # find out the percentage of how far along the time interval **time** lies
    multiplier = float(time-self.sourceTime) / \
        (self.destTime - self.sourceTime)

    # step 1:  get the motion triangles, but maintain the IDs of the
    # compnonets involved so we can rebuild the mappings
    componentTris = self.getMotionTriangles()
    # step 2:  create a structural region to hold the result
    sr = structureRegion.structuralRegion()
    # print('componentTris', componentTris)
    # step 3: go through the triangles for each moving component, extract
    # the segs at time **time**, and add the structures
    for component in componentTris:
        segs = []
        for tri in component[2]:
            s1 = ((tri[0][0], tri[0][1]), (tri[2][0], tri[2][1]))
            s2 = ((tri[1][0], tri[1][1]), (tri[2][0], tri[2][1]))
            if tri[2][2] < tri[0][2]:
                s1 = (s1[1], s1[0])
                s2 = (s2[1], s2[0])
            p1x = (multiplier * (s1[1][0] - s1[0][0])) + s1[0][0]
            p1y = (multiplier * (s1[1][1] - s1[0][1])) + s1[0][1]
            p2x = (multiplier * (s2[1][0] - s2[0][0])) + s2[0][0]
            p2y = (multiplier * (s2[1][1] - s2[0][1])) + s2[0][1]
            segs.append(((p1x, p1y), (p2x, p2y)))
        # now we have the segs.  place them into the return SR
        # note, there will never be a line or point in the middle of an interval
        if self.sourceSR.getComponentType(component[0]) != 'H':
            sr.addFace(segs, component[0])
        elif self.sourceSR.getComponentType(component[0]) == 'H':
            sr.addHole(segs, self.sourceSR.getIDsForHolesInFace(
                component[0]), component[0])
        # print('segs: ', segs)
    return sr

points_list_to_intermediate_wkt

In [11]:
def points_list_to_intermediate_wkt(points):
    """
	Returns a string representation of the given list of points in the form:
		(x1 y1, ..., xn yn, x1 y1)
	
	Input: A list of points: [(x0, y0), ..., (xn, yn)]
	Output: (x1 y1, ..., xn yn, x1 y1)
	
	Notes:
	Used as an intermediate step in the computation of a wkt of a geometry.

    Source: https://github.com/most-ieeta/SPT-DataLab/blob/master/pyspatiotemporalgeom/mckenney_final.py
    """
    
    if points != None:
        _wkt = '('
        for point in points:
            _wkt += str(point[0]) + ' ' + str(point[1]) + ', '
        # Close the cycle.
        _wkt += str(points[0][0]) + ' ' + str(points[0][1]) + ')'
        return _wkt

    return None

wkt_to_segment_list

In [12]:
def wkt_to_segment_list(_wkt):
    """
        Parses a wkt string and returns a list of tuples with line segments 
        representing a geometry.
        
        Notes:
        Currently the function does not handle holes!
        Currently the function handles only POLYGON type geometries!
        
        Input: A wkt string.
        Output: A segment list.
        
        WKT Examples:
        
        POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))
        POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))
        
        MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))
        MULTIPOLYGON (((40 40, 20 45, 45 30, 40 40)), ((20 35, 10 30, 10 10, 30 5, 45 20, 20 35), (30 20, 20 15, 20 25, 30 20)))
        
        Source: https://github.com/most-ieeta/SPT-DataLab/blob/master/pyspatiotemporalgeom/mckenney_final.py
    """
    segment_list = []
    geom_json = wkt.loads(_wkt)
    geometry_type = geom_json['type']
    if geometry_type != "Polygon":
         print('Only Polygon is supported.')
         return []

    coordinates = geom_json['coordinates']

    for i in range(0, len(coordinates)):
        values = coordinates[i]
        # Face.
        if i == 0:
            for j in range(0, len(values) - 1):
                # Close the segment list.
                if (j == len(values) - 2):
                    segment_list.append((tuple(values[j]), tuple(values[0])))
                else:
                    segment_list.append((tuple(values[j]), tuple(values[j+1])))
        # Holes.
        else:
            """
            for j in range(0, len(values) - 1):
                # Close the segment list.
                if (j == len(values) - 2):
                    segment_list.append((tuple(values[j]), tuple(values[0])))
                else:
                    segment_list.append((tuple(values[j]), tuple(values[j+1])))
            """
    # Validate the wkt parsing process output.
    if segment_list == []:
        print("Error parsing wkt.")

    return segment_list

structural_region_to_wkt

In [13]:
def structural_region_to_wkt(sregion):
    """
        Gets the wkt of a given structural region.
    """

    # Currently, only POLYGON is being considered.
    # In the future, MULTIPOLYGON will also be considered.
    geom_wkt = ''
    counter = 0

    # Iterate through the faces.
    for face_id in sregion.F:
        face = sregion.F[face_id]

        # Labeled segs for this face.
        hsegs = hsegLibrary.labelUniqueCycles(face, True)

        # List of points, in cyclic order, that define the boundary of the outer cycle of this face.
        face_outer_cycle_points = hsegLibrary.getOuterWalkPointSequence(hsegs)

        if counter > 0:
            geom_wkt += ', '

        # Add the intermediate wkt representation of the face with no holes.
        geom_wkt += '(' + points_list_to_intermediate_wkt(face_outer_cycle_points)
        counter = counter + 1

        # Face has holes?
        if sregion.F2H != None:
            if face_id in sregion.F2H:
                #print("Has holes.")
                for j in sregion.F2H[face_id]:
                    hsegs = hsegLibrary.labelUniqueCycles(sregion.H[j])
                    hole_outer_cycle_points = hsegLibrary.getOuterWalkPointSequence(hsegs)
                    # Add the intermediate wkt representation of the face's hole(s).
                    geom_wkt += ', ' + points_list_to_intermediate_wkt(hole_outer_cycle_points)

        # Close the polygon wkt representation.
        geom_wkt += ')'

    # The geometry is a single polygon (1 face with >= 0 holes).
    if counter == 1:
        geom_wkt = 'POLYGON ' + geom_wkt
    # The geometry has >1 faces each with >= 0 holes.
    else:
        geom_wkt = 'MULTIPOLYGON (' + geom_wkt + ')'

    return geom_wkt

### Interpolation

In [27]:
def interpolate_polygons(multipolygons_file: List[str], n_samples: int, source_t: int, target_t: int, output_file = "output.wkt") -> None:
    """
    Interpolates polygons between consecutive pairs of multipolygons in a file.

    Parameters:
    - multipolygons_file (List[str]): A list of strings, where each string is a 
      WKT representation of a multipolygon.
    - n_samples (int): The number of intermediate polygons to interpolate 
      between each pair of consecutive polygons.
    - source_t (int): The timestamp of the first multipolygon in the file, 
      in a format recognized by PySpatioTemporalGeom.
    - target_t (int): The timestamp of the last multipolygon in the file, 
      in a format recognized by PySpatioTemporalGeom.
    - output_file (str): The name of the output file to write the interpolated

    Example usage:
    >>> multipolygons_file = ["MULTIPOLYGON (((0 0, 0 1, 1 1, 1 0, 0 0)))", 
        "MULTIPOLYGON (((1 1, 1 2, 2 2, 2 1, 1 1)))"]
    >>> interpolated_pol = interpolate_polygons(multipolygons_file, 5, 10, 20)
    >>> print(interpolated_pol)
    ['POLYGON ((0.0 0.0, 0.0 0.2, 0.2 0.2, 0.2 0.0, 0.0 0.0))', 
      'POLYGON ((0.0 0.0, 0.0 0.4, 0.4 0.4, 0.4 0.0, 0.0 0.0))', ...] 
    """
    polygons_num = len(multipolygons_file)
    with open(output_file, 'a') as f:
      
    # Iterate over all consecutive pairs of polygons
    for i in range(polygons_num-1):
          f.write(multipolygons_file[0]+ '\n')
          source_geom = wkt_to_segment_list(multipolygons_file[i])
          target_geom = wkt_to_segment_list(multipolygons_file[i+1])
          # Create the Component Interval Region
          cir = create_CIR(source_geom, target_geom, source_t, target_t)
          # Time interval and number of samples for this pair of polygons
          delta_t = target_t - source_t
          dt = delta_t / (n_samples + 1)
          
          # Interpolate n_samples equally spaced between source and target
          for j in range(1, n_samples+1):
              t = j * dt / delta_t + source_t
              # Region at query_time
              region_at = get_region_t(cir, t)
              clear_output()
              region_wkt = structural_region_to_wkt(region_at)
              #f.write(region_wkt + '\n')
    return None

In [14]:
invalid_polygons = [180,181,182,193,194,201,202]

interpolated_polygons = list()
with open(OUT_WKT_FILE, 'w') as file:
    file.write('')
idx_list = list()

for i in range(0, 180):
    print(f"Indice {i}")
    idx_list.append(i)
    interpolate_polygons([multipolygons_file[i],multipolygons_file[i+1]], 
                         n_samples = 99, source_t = 0, target_t = 1, 
                         output_file = OUT_WKT_FILE)

#     99+1+99+1+99 = 399
#  |----|----|----|
# 179  180  181  182  183
interpolate_polygons([multipolygons_file[180],multipolygons_file[183]], 
                     n_samples = 299, source_t = 0, target_t = 1, 
                     output_file = OUT_WKT_FILE)
                
# 18300 

for i in range(183, 192):
    print(f"Indice {i}")
    idx_list.append(i)
    interpolate_polygons([multipolygons_file[i],multipolygons_file[i+1]], 
                         n_samples = 99, source_t = 0, target_t = 1, 
                         output_file = OUT_WKT_FILE)

#    99+1+99+1+99 = 299
#  |----|----|----|
# 192  193  194  195 
interpolate_polygons([multipolygons_file[192], multipolygons_file[195]], 
                     n_samples = 299, source_t = 0, target_t = 1, 
                     output_file = OUT_WKT_FILE)

for i in range(195, 200):
    print(f"Indice {i}")
    idx_list.append(i)
    interpolate_polygons([multipolygons_file[i],multipolygons_file[i+1]], 
                         n_samples = 99, source_t = 0, target_t = 1, 
                         output_file = OUT_WKT_FILE)

# 20000

#    99+1+99+1+99 = 299
#  |----|----|----|
# 200  201  202  203 
interpolate_polygons([multipolygons_file[200], multipolygons_file[203]],
                     n_samples = 299, source_t = 0, target_t = 1, 
                     output_file = OUT_WKT_FILE)

#20300

for i in range(203,  len(multipolygons_file)-1):
    print(f"Indice {i}")
    idx_list.append(i)
    interpolate_polygons([multipolygons_file[i],multipolygons_file[i+1]], 
                         n_samples = 99, source_t = 0, target_t = 1, 
                         output_file = OUT_WKT_FILE)

In [24]:
# Convert WKT to list of shapely polygons
with open(OUT_WKT_FILE, 'r') as wkt_file:
    interpol_file = wkt_file.readlines()

#### wkt2masc

In [19]:
def wkt2masc(wkt_file, images_path, orig_dims, out_dims, delete_files=True):
    """ 
    Converts WKT files to segmentation masks.
    Parameters:
        wkt_file {str} -- path to the WKT file
        images_path {str} -- path to the folder where the masks will be saved
        orig_dims {tuple} -- (width, height) original dimensions of the masks 
        out_dims {tuple} -- (width, height) output dimensions of the masks  
    Returns:
        Creates PNG images of the masks
    """

    os.makedirs(images_path, exist_ok=True)

    if delete_files:
        # delete files in the folder, if any
        for filename in os.listdir(images_path):
            if filename.endswith(".png"):
                os.remove(os.path.join(images_path, filename))

    # open WKT file
    wkt = open(wkt_file, 'r')
    num_lines = len(wkt.readlines())
    cnt = 0
    
    print(f"""
    {'-'*38}
    # \033[1mProperties of the resulting masks\033[0m
    # Width: {out_dims[0]}, Height: {out_dims[1]}
    # Number of masks to create: {num_lines}
    {'-'*38}
    """)
    
    # process each line of the WKT file
    with open(wkt_file) as f:
        for line in f:
            # extract numbers from the line
            points = [int(float(number)) for number in re.findall(r'\d+(?:\.\d+)?', line)]
            # create empty mask
            mask = np.zeros((orig_dims[1],orig_dims[0]), dtype=np.uint8)
            # create array with polygon points, with 2 columns (x,y)
            arr = np.array(points, dtype=np.int32).reshape((-1,2))
            # draw mask
            cv2.drawContours(image = mask,
                             contours=[arr],
                             contourIdx=-1,
                             color=(255, 255, 255),
                             thickness=-1,  # if > 0, thickness of the contour; if -1, fill object
                             lineType=cv2.LINE_AA)
            
            if out_dims != orig_dims:
                # resize frames with Lanczos interpolation
                mask = cv2.resize(mask, out_dims, interpolation=cv2.INTER_CUBIC)
            # save mask as PNG
            cv2.imwrite(os.path.join(images_path, f"frame_{cnt:06d}.png"), mask)
            cnt += 1
            # print progress
            print(f"\r\033[1m{cnt}\033[0m/{num_lines} masks created", end="\r")

#### frames2video

In [22]:
def frames2video(img_list, nome_ficheiro='video', fps_ = 25, titulo: str = "", frame_num_text  = False, font_size: int = 1) -> None:
    """ 
    Converte lista de imagens em ficheiro AVI com a mesma resolucão da primeira 
    imagem da lista.
      Parametros: - lista de imagens PNG, TIFF, JPEG, BMP, WEBP, STK, LSM ou XCF
                  - nome do ficheiro do video
      Devolve: salva vídeo no diretório de execucão
    """
    # guarda dimensões da primeira imagem
    img = cv2.imread(img_list[0])
    height, width, _ = img.shape
    size = (width, height)
    num_frames =  len(img_list)

    img_array = list()
    for i in range(len(img_list)):
        img = cv2.imread(img_list[i])
        img_array.append(img)
        print(f"1. Appending frames {i+1}/{num_frames}", end="\r")
        
    print("2. Creating video writer...", end="\r")
    video = cv2.VideoWriter(filename= nome_ficheiro + '.avi',
                            fourcc=cv2.VideoWriter_fourcc(*'mp4v'), fps = fps_,
                            frameSize=size)

    for i in range(len(img_array)):
        if frame_num_text:

            frame_number_text = f"frame_{i:06d}"
            cv2.putText(img_array[i], frame_number_text, (width-300, 50), 
                            cv2.FONT_HERSHEY_SIMPLEX,font_size, (255, 100, 100), 
                            2, cv2.LINE_AA)
        if titulo:
            cv2.putText(img_array[i], titulo, (50, 50), cv2.FONT_HERSHEY_SIMPLEX,
                        font_size, (255, 255, 255), 2, cv2.LINE_AA)
        
        video.write(img_array[i])
        print(f"3. Writing frames to file {i+1}/{num_frames}", end="\r")
    video.release()

#### PNGs and video creation 

In [20]:
wkt2masc(wkt_file = OUT_WKT_FILE, images_path = OUT_DIR_PNG, orig_dims = (1280, 720), out_dims = (1280, 720))

Number of lines in the WKT file: 22500

    --------------------------------------
    # [1mProperties of the resulting masks[0m
    # Width: 1280, Height: 720
    # Number of masks to create: 22500
    --------------------------------------
    
[1m22500[0m/22500 masks created

In [23]:
img_list = sorted(glob(os.path.join(OUT_DIR_PNG, '*.png')))
frames2video(img_list = img_list, nome_ficheiro='pyspatial_interpol', 
             fps_ = 25*10, titulo = "Pyspatialtemporalgeom interpolation (10x speed)", 
             frame_num_text = True, font_size = 1)

3. Writing frames to file 22500/22500

## Interpolation of the sampled polygons

### Data Sources

In [14]:
# WKT with sampled polygons
WKT_FILE_SAMPLED = os.path.join("E:/BurnedAreaUAV_files/Interpolation/reference_masks", "sampled_masks.txt")
# Directory to save PNG format interpolated polygons
OUT_DIR_SAMPLED_PNG = os.path.join(OUT_DIR, 'PNGs_sampled')
# create output directory
if not os.path.exists(OUT_DIR_SAMPLED_PNG):
    os.makedirs(OUT_DIR_SAMPLED_PNG)
# Directory to save WKT format interpolated polygons    
OUT_WKT_SAMPLED_FILE = os.path.join(OUT_DIR, "pyspatial_interpol_sampled.wkt")

In [15]:
# read txt file and extract polygons and their indexes
with open(WKT_FILE_SAMPLED, 'r') as f:
    polygons = f.readlines()
    # extract indexes and polygons
    indexes = [int(polygon.split(',')[0]) for polygon in polygons]
    polygons_wkt = [polygon.split(',', 1)[1][:-1] for polygon in polygons]

In [16]:
# makes sure the file is empty
open(OUT_WKT_SAMPLED_FILE, 'w').close()
for i in range(0, len(polygons_wkt)-1):
    # calculate number of samples to interpolate
    num_samples = (indexes[i+1] - indexes[i])*100 - 1
    interpolate_polygons([polygons_wkt[i], polygons_wkt[i+1]], 
                         n_samples = num_samples, source_t = 0, target_t = 1, 
                         output_file = OUT_WKT_SAMPLED_FILE)

# append last polygon to WKT file
with open(OUT_WKT_SAMPLED_FILE, 'a') as f:
    f.write(polygons_wkt[-1] + '\n')

In [17]:
with open(OUT_WKT_SAMPLED_FILE, 'a') as f:
    f.write(polygons_wkt[-1] + '\n')

#### WKT to PNG conversion

In [20]:
wkt2masc(wkt_file = OUT_WKT_SAMPLED_FILE, images_path = OUT_DIR_SAMPLED_PNG, orig_dims = (1280, 720), out_dims = (1280, 720))


    --------------------------------------
    # [1mProperties of the resulting masks[0m
    # Width: 1280, Height: 720
    # Number of masks to create: 22501
    --------------------------------------
    
[1m22501[0m/22501 masks created

#### Video Generation

In [23]:
img_list = sorted(glob(os.path.join(OUT_DIR_SAMPLED_PNG, '*.png')))
frames2video(img_list = img_list, nome_ficheiro='pyspatial_interpol_sampled', 
             fps_ = 25*10, titulo = "Pyspatialtemporalgeom interpolation (10x speed)", 
             frame_num_text = True, font_size = 1)

3. Writing frames to file 22501/22501