This notebook is prepared to run within the **Dymaxion Labs Platform**. It uses pre-trained weights to run a prediction and then post-processes the results.

## Preparation

In [None]:
# Task: Cloud Storage locations the input images are downloaded from and the
# results are uploaded to.
INPUT_ARTIFACTS_URL = "gs://dym-temp/immap-flood-areas/test-task/input"
OUTPUT_ARTIFACTS_URL = "gs://dym-temp/immap-flood-areas/test-task/output"

# Cloud Storage location of the pre-trained model weights.
WEIGHTS_URL = (
    "gs://dym-ml-models/dymaxionlabs/immap-flood-areas/v0.1.0/weights.h5"
)

# Extract chips: values passed to satproc_extract_chips as --size and
# --step-size. The step is set equal to the size.
SIZE = 160
STEP_SIZE = SIZE

# Post-process
# Minimum area a polygon must have to be kept by filter_by_min_area. The area
# is measured after reprojecting to a UTM CRS (presumably square meters).
MIN_AREA = 750000
# Probability cut-off; it is multiplied by 255 before being compared with the
# prediction raster values.
THRESHOLD = 0.2

In [None]:
import os

from labfunctions.shortcuts import settings

In [None]:
# Predict
WEIGHTS_PATH = os.path.join(settings.BASE_PATH, "data", "weights.h5")
IMAGES_DIR = os.path.join(settings.BASE_PATH, "data/images")
PREDICT_CHIPS_DIR = os.path.join(settings.BASE_PATH, "data/chips")
PREDICT_RESULTS_DIR = os.path.join(settings.BASE_PATH, "data/predict")
PREDICT_RASTER_PATH = os.path.join(settings.BASE_PATH, "data/predict/prob.tif")

# Post-process
POST_BIN_RASTER_PATH = os.path.join(settings.BASE_PATH, "data/post/bin.tif")
POST_BIN_VECTOR_PATH = os.path.join(settings.BASE_PATH, "data/post/bin.gpkg")
POST_FILTERED_PATH = os.path.join(settings.BASE_PATH, "data/post/filtered_bin.gpkg")

# Output
OUTPUT_DIR = os.path.join(settings.BASE_PATH, "data/results/")
OUTPUT_RASTER_PATH = os.path.join(settings.BASE_PATH, "data/results/prob.tif")
OUTPUT_MASK_PATH = os.path.join(settings.BASE_PATH, "data/results/prob.gpkg")

### Configure Google Application credentials

In [None]:
import json
import base64

from labfunctions.shortcuts import secrets

In [None]:
GOOGLE_APPLICATION_CREDENTIALS_PATH = os.path.join(settings.BASE_PATH, ".google_aplication_credentials.json")

# The secret holds the service-account key encoded as base64. The decoded key
# is sensitive, so the file is created readable and writable by the owner only
# instead of relying on the process umask.
with open(
    GOOGLE_APPLICATION_CREDENTIALS_PATH,
    "w",
    opener=lambda path, flags: os.open(path, flags, 0o600),
) as f:
    f.write(base64.b64decode(secrets["GOOGLE_APPLICATION_CREDENTIALS"]).decode())

# The mode given to os.open only applies when the file is created; tighten the
# permissions of a file left over from a previous run as well.
os.chmod(GOOGLE_APPLICATION_CREDENTIALS_PATH, 0o600)

In [None]:
# Authenticate gcloud as the service account whose key file was written to
# GOOGLE_APPLICATION_CREDENTIALS_PATH.
!gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS_PATH

### Download pre-trained weights

In [None]:
# Make sure the directory that will hold the weights file exists.
os.makedirs(os.path.dirname(WEIGHTS_PATH), exist_ok=True)

In [None]:
# Copy the pre-trained weights from Cloud Storage. -n (no-clobber) skips the
# copy when the destination file already exists.
!gsutil cp -n $WEIGHTS_URL $WEIGHTS_PATH

### Download input images

In [None]:
# Make sure the local images directory exists before downloading into it.
os.makedirs(IMAGES_DIR, exist_ok=True)

In [None]:
# Recursively copy everything under the input artifacts URL into IMAGES_DIR
# (-m runs the copy in parallel).
!gsutil -m cp -r $INPUT_ARTIFACTS_URL/* $IMAGES_DIR

## Prepare prediction dataset

In [None]:
# Extract chips from every .tif in IMAGES_DIR into PREDICT_CHIPS_DIR, using
# the configured chip size and step. Values are rescaled in "percentiles" mode
# with lower/upper cuts of 0 and 100.
!satproc_extract_chips $IMAGES_DIR/*.tif \
    -o $PREDICT_CHIPS_DIR \
    --size $SIZE \
    --step-size $STEP_SIZE \
    --rescale \
    --rescale-mode percentiles --lower-cut 0 --upper-cut 100

## Predict

In [None]:
from unetseg.predict import PredictConfig, predict
from unetseg.evaluate import plot_data_results
import subprocess
import glob

In [None]:
# Prediction settings. Height and width are taken from SIZE (the value given
# to satproc_extract_chips as --size) instead of repeating the literal, so the
# model input keeps matching the chips if SIZE is changed.
cfg = PredictConfig(
    images_path=PREDICT_CHIPS_DIR,
    results_path=PREDICT_RESULTS_DIR,
    batch_size=4,
    model_path=WEIGHTS_PATH,
    height=SIZE,
    width=SIZE,
    n_channels=6,
    n_classes=1,
    class_weights=[1],
)

In [None]:
# Run the prediction with the settings above. Presumably this writes the
# results under PREDICT_RESULTS_DIR, since the next step collects *.tif files
# from there - confirm against unetseg.
predict(cfg)

In [None]:
def build_prediction_raster(*, input_dir, output_path):
    """Merge every ``*.tif`` file in ``input_dir`` into a single GeoTIFF.

    The files are first combined into a virtual raster (VRT) with
    ``gdalbuildvrt`` and the VRT is then converted to a DEFLATE-compressed
    GeoTIFF with ``gdal_translate``.

    Args:
        input_dir: Directory containing the ``*.tif`` files to merge.
        output_path: Path of the GeoTIFF to write. Its parent directory is
            created when missing.

    Raises:
        FileNotFoundError: If ``input_dir`` contains no ``*.tif`` file, or if
            a GDAL executable cannot be found.
        subprocess.CalledProcessError: If a GDAL command exits with a
            non-zero status.
    """
    import tempfile

    # Absolute paths keep the VRT valid regardless of where it is stored;
    # sorting makes the order of the sources deterministic.
    filenames = sorted(
        os.path.abspath(path)
        for path in glob.glob(os.path.join(input_dir, "*.tif"))
    )
    if not filenames:
        # Fail early with a clear message instead of letting GDAL run on an
        # empty file list.
        raise FileNotFoundError(f"No .tif files found in {input_dir}")

    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # A private temporary directory avoids clashes between concurrent runs and
    # guarantees that a VRT left over from a previous run is never reused.
    with tempfile.TemporaryDirectory() as tmp_dir:
        vrt_path = os.path.join(tmp_dir, "predict.vrt")
        list_path = os.path.join(tmp_dir, "list.txt")
        with open(list_path, "w") as f:
            f.writelines(f"{name}\n" for name in filenames)

        # Commands are passed as argument lists (no shell) so that paths
        # containing spaces or shell metacharacters are handled correctly.

        # Build virtual raster of all chip tifs into a single VRT
        subprocess.run(
            ["gdalbuildvrt", "-input_file_list", list_path, vrt_path],
            check=True,
        )

        # Convert VRT to GeoTiff with DEFLATE compression enabled
        subprocess.run(
            [
                "gdal_translate",
                "-of", "GTiff",
                "-co", "COMPRESS=DEFLATE",
                "-co", "PREDICTOR=2",
                "-co", "ZLEVEL=9",
                vrt_path,
                output_path,
            ],
            check=True,
        )

In [None]:
# Merge the per-chip results in PREDICT_RESULTS_DIR into the single
# probability raster used by the post-processing steps.
build_prediction_raster(
    input_dir=PREDICT_RESULTS_DIR,
    output_path=PREDICT_RASTER_PATH,
)

## Post-process

### Binarize image with threshold to create mask

In [None]:
# Expression that selects the pixels whose value is above the threshold.
# The threshold is converted to the value range of the prediction raster
# (which goes from 0 to 255).
# NOTE(review): selected pixels get the value 199 and every other pixel gets 0,
# which is also declared as NoData below; the reason for 199 is not visible
# here - confirm.
exp = f"((A > {int(THRESHOLD * 255)})*199)"

os.makedirs(os.path.dirname(POST_BIN_RASTER_PATH), exist_ok=True)
subprocess.run(
    (
        "gdal_calc.py --quiet --overwrite "
        f"-A {PREDICT_RASTER_PATH} "
        "--A_band=1 "
        f"--outfile {POST_BIN_RASTER_PATH} "
        f'--calc="{exp}" '
        "--NoDataValue=0"
    ),
    shell=True,
    check=True,
)

### Polygonize binary mask

In [None]:
# Polygonize the areas selected with a probability above the threshold.
os.makedirs(os.path.dirname(POST_BIN_VECTOR_PATH), exist_ok=True)
# Remove the vector file left by a previous run before writing a new one.
if os.path.exists(POST_BIN_VECTOR_PATH):
    os.unlink(POST_BIN_VECTOR_PATH)
subprocess.run(f'gdal_polygonize.py {POST_BIN_RASTER_PATH} {POST_BIN_VECTOR_PATH}', shell=True, check=True)

### Filter by min. area

In [None]:
import math

import fiona
from satproc.utils import reproject_shape
from shapely.geometry import shape
from tqdm import tqdm


def filter_by_min_area(src_file, dst_file, min_area=500, utm_code=None):
    """Write to ``dst_file`` the features of ``src_file`` that are large enough.

    Each geometry is reprojected from the source CRS to ``utm_code`` and its
    area is compared with ``min_area``. Features whose reprojected area is
    greater than or equal to ``min_area`` are copied unchanged (original
    geometry and CRS) to a GeoPackage at ``dst_file``.

    Args:
        src_file: Path of the vector file to read.
        dst_file: Path of the GeoPackage to write.
        min_area: Minimum area, in the units of the ``utm_code`` CRS
            (presumably square meters for a UTM CRS).
        utm_code: CRS used for measuring areas. When falsy, it is derived
            from ``src_file`` with ``get_epsg_utm_from``.
    """
    if not utm_code:
        utm_code = get_epsg_utm_from(src_file)
        print(f"Using projected CRS {utm_code} for filtering by meters")

    progress_label = f"Filtering polygons by area (>={min_area}m)"

    # The destination reuses the CRS and schema of the source file.
    with fiona.open(src_file) as source, fiona.open(
        dst_file, "w", driver="GPKG", crs=source.crs, schema=source.schema
    ) as sink:
        for feat in tqdm(source, desc=progress_label, ascii=True):
            projected = reproject_shape(
                shape(feat["geometry"]), source.crs, utm_code
            )
            if projected.area >= min_area:
                sink.write(feat)


def get_epsg_utm_from(vector_path):
    """Return the EPSG code of the UTM zone for the first feature of a file.

    The geometry of the first feature in ``vector_path`` is reprojected to
    ``epsg:4326`` when the file's CRS is a different one, and the UTM zone is
    then derived from it with ``get_epsg_utm_from_wgs_geom``.

    Raises:
        ValueError: If the file has no features.
    """
    with fiona.open(vector_path) as source:
        first_feature = next(iter(source), None)
        if not first_feature:
            raise ValueError(f"{vector_path} has no features")

        geom = shape(first_feature["geometry"])
        source_crs = source.crs["init"]
        if source_crs != "epsg:4326":
            geom = reproject_shape(geom, source_crs, "epsg:4326")
        return get_epsg_utm_from_wgs_geom(geom)


def get_epsg_utm_from_wgs_geom(geom):
    """
    Calculate UTM zone from a geometry in WGS84 geographic coordinates and
    get corresponding EPSG code.

    The zone is computed from the longitude of the geometry's centroid; the
    hemisphere (EPSG prefix 326 for north, 327 for south) from its latitude.

    Args:
        geom: Geometry exposing a ``centroid`` with ``x`` (longitude) and
            ``y`` (latitude) attributes.

    Returns:
        A string such as ``"epsg:32721"``.

    """
    centroid = geom.centroid
    lon, lat = centroid.x, centroid.y
    zone = (math.floor((lon + 180) / 6) % 60) + 1
    prefix = "326" if lat >= 0 else "327"
    # The zone must always take two digits: zone 5 north is EPSG:32605, not
    # "epsg:3265".
    return f"epsg:{prefix}{zone:02d}"

In [None]:
# Keep only the polygons whose area is at least MIN_AREA.
filter_by_min_area(POST_BIN_VECTOR_PATH, POST_FILTERED_PATH, min_area=MIN_AREA)

In [None]:
import geopandas as gpd

# Try to repair invalid geometries, if any, by applying a zero-width buffer.
gdf = gpd.read_file(POST_FILTERED_PATH)
gdf.geometry = gdf.geometry.buffer(0)
# Name the driver explicitly: the file is a GeoPackage (it was written with
# driver="GPKG" by filter_by_min_area) and geopandas versions that do not
# infer the driver from the extension would write an ESRI Shapefile instead.
gdf.to_file(POST_FILTERED_PATH, driver="GPKG")

### Clip raster with filtered vector mask

In [None]:
# -cutline names the vector file used to clip the original prediction raster
# (PREDICT_RASTER_PATH); the clipped result is written to OUTPUT_RASTER_PATH.
os.makedirs(os.path.dirname(OUTPUT_RASTER_PATH), exist_ok=True)
subprocess.run(
    (
        f"gdalwarp -overwrite "
        f"-cutline {POST_FILTERED_PATH} "
        "-crop_to_cutline "
        # NOTE(review): -dstalpha appears to be a plain flag, with the path
        # after it being the source raster - confirm against gdalwarp docs.
        f"-dstalpha {PREDICT_RASTER_PATH} "
        f"{OUTPUT_RASTER_PATH}"
    ),
    shell=True,
    check=True,
)

In [None]:
import shutil

# Place the filtered polygons next to the clipped raster so that both are
# exported as results.
shutil.copyfile(POST_FILTERED_PATH, OUTPUT_MASK_PATH)

## Export results as artifacts

In [None]:
# Upload everything in OUTPUT_DIR to the output artifacts URL.
!gsutil -m cp -r $OUTPUT_DIR/* $OUTPUT_ARTIFACTS_URL/