In [None]:
import json
import warnings
from datetime import date, datetime, timedelta
from glob import glob
from typing import Callable, OrderedDict

import geojson
import numpy as np
import pyproj
import rasterio
import rasterio.merge
import requests
from affine import Affine
from numba import njit, prange
from numba.core.errors import NumbaDeprecationWarning, NumbaPendingDeprecationWarning
from numpy.typing import NDArray
from rasterio.crs import CRS
from rasterio.io import BufferedDatasetWriter, DatasetReader, DatasetWriter, MemoryFile
from rasterio.mask import mask
from rasterio.plot import reshape_as_image, show
from rasterio.warp import calculate_default_transform, reproject
from requests import Response

# https://sentinelsat.readthedocs.io/en/stable/api_overview.html
from sentinelsat import SentinelAPI, make_path_filter
from shapely.geometry import (
    GeometryCollection,
    LinearRing,
    LineString,
    MultiLineString,
    MultiPoint,
    MultiPolygon,
    Point,
    Polygon,
    mapping,
    shape,
)
from shapely.ops import transform

warnings.simplefilter("ignore", category=NumbaDeprecationWarning)
warnings.simplefilter("ignore", category=NumbaPendingDeprecationWarning)

import os
import shutil

import gdal2tiles
from PIL import Image
from tqdm.autonotebook import tqdm


In [None]:
# Prepare username and password.
# The `credentials` file is read as text: the first line is parsed as
# `username=<value>` and the second line as `password=<value>`.
username: str = ""
password: str = ""

with open("credentials") as file:
    string: str = file.read()

_credential_lines: list[str] = string.split("\n")
username = _credential_lines[0].split("username=")[-1]
password = _credential_lines[1].split("password=")[-1]


In [None]:
def get_polygon(
    osm_id: int,
) -> (
    Point
    | MultiPoint
    | LineString
    | MultiLineString
    | Polygon
    | MultiPolygon
    | LinearRing
    | GeometryCollection
):
    """
    Fetches the geometry that belongs to an OpenStreetMap relation id.

    Nominatim is asked first. If it returns no features, the geometry
    is fetched from polygons.openstreetmap.fr instead.

    Parameters
    ---
        osm_id : int
            The OpenStreetMap relation id to get geometry features from.

    Returns
    ---
        feature : Point | MultiPoint | LineString | MultiLineString
                    | Polygon | MultiPolygon | LinearRing | GeometryCollection
            The geometry feature that is tied to the id.

    """
    lookup: dict = requests.get(
        url="https://nominatim.openstreetmap.org/lookup",
        params={
            "osm_ids": f"R{osm_id}",
            "format": "geojson",
            "polygon_geojson": 1,
        },
    ).json()
    features = lookup["features"]

    if len(features) != 0:
        return shape(features[0]["geometry"])

    # Nominatim returned nothing, fall back to polygons.openstreetmap.fr.
    # The response of the first request is discarded; presumably it makes
    # the service prepare the polygon before the GeoJSON is fetched.
    requests.get("https://polygons.openstreetmap.fr/", params={"id": osm_id})
    fallback: Response = requests.get(
        "https://polygons.openstreetmap.fr/get_geojson.py", params={"id": osm_id}
    )
    return shape(fallback.json())


In [None]:
def look_for_polygon_fix(city_name: str) -> tuple[int | None, tuple[float, ...] | None]:
    """
    Looks for a hard coded fix for the city, if there is one.
    These fixes were made as the query to the OpenStreetMap
    services didn't yield the wanted polygon/relation id.

    Parameters
    ---
        city_name : str
            The name of the city, with state.
            Example format: "Los Angeles, California"

    Returns
    ---
        osm_relation : int | None
            The corrected id, if there is one.

        bbox : tuple[float, ...] | None
            The bounding box for the corrected polygon,
            if there is one.

    """
    osm_id: int | None = None
    bbox: tuple[float, ...] | None = None
    # Some cities needed hard coding at the time
    # to get the correct polygons
    if city_name == "Albuquerque, New Mexico":
        osm_id = 171262
    elif city_name == "Cleveland, Ohio":
        osm_id = 182130
    elif city_name == "Columbia, Maryland":
        osm_id = 133606
    elif city_name == "Columbia, South Carolina":
        osm_id = 194000
    elif city_name == "Mesa, Arizona":
        osm_id = 110815
    elif city_name == "Miami, Florida":
        osm_id = 1216769
    elif city_name == "Pearl City, Hawaii":
        osm_id = 119264
    elif city_name == "San Bernardino, California":
        osm_id = 253639

    if osm_id != None:
        geo: (
            Point
            | MultiPoint
            | LineString
            | MultiLineString
            | Polygon
            | MultiPolygon
            | LinearRing
            | GeometryCollection
        ) = get_polygon(osm_id)
        bbox = geo.bounds

    return osm_id, bbox


In [None]:
def get_polygon_osm_id_and_bbox(
    city_name: str,
) -> tuple[int | None, tuple[float, ...] | None]:
    """
    Looks up the OpenStreetMap polygon/relation id and the
    bounding box of the city polygon.

    A hard coded fix from `look_for_polygon_fix` is preferred. Without
    one, Nominatim is searched and the first result that is an
    administrative boundary is used.

    Parameters
    ---
        city_name : str
            The name of the city to query for.

    Returns
    ---
        osm_id : int | None
            The OpenStreetMap relation id for the
            city polygon if one is found.

        bbox : tuple[float, ...] | None
            The bounding box for the city polygon, if there is one,
            reordered to (bbox[0], bbox[2], bbox[3], bbox[1]) of the
            source bounding box.

    """
    relation_id: int | None
    raw_bbox: tuple[float, ...] | None
    relation_id, raw_bbox = look_for_polygon_fix(city_name)

    if raw_bbox is None:
        search: Response = requests.get(
            "https://nominatim.openstreetmap.org/search",
            params={"q": city_name, "format": "geojson"},
        )
        for feature in search.json()["features"]:
            properties = feature["properties"]
            if properties["type"] != "administrative":
                continue
            if properties["category"] != "boundary":
                continue
            relation_id = properties["osm_id"]
            raw_bbox = feature["bbox"]
            break

    if raw_bbox is None:
        return relation_id, None

    return relation_id, (raw_bbox[0], raw_bbox[2], raw_bbox[3], raw_bbox[1])


In [None]:
def get_pin_location_osm_id(city_name: str) -> int | None:
    """
    Searches Nominatim for the city and returns the OpenStreetMap id
    of the first result that is an administrative `admin_centre`,
    i.e. the pin location for the city, if there is one.

    Parameters
    ---
        city_name : str
            The name of the city to query for.

    Returns
    ---
        osm_id : int | None
            The OpenStreetMap relation id for the
            city pin location if one is found.

    """
    search: Response = requests.get(
        "https://nominatim.openstreetmap.org/search",
        params={"q": city_name, "format": "geojson"},
    )
    matching_ids = (
        feature["properties"]["osm_id"]
        for feature in search.json()["features"]
        if feature["properties"]["type"] == "administrative"
        and feature["properties"]["category"] == "admin_centre"
    )
    # Only the first match is of interest, None when nothing matched.
    return next(matching_ids, None)


In [None]:
def get_products(
    bbox: tuple[float, ...],
    date: str | tuple[str | datetime | date, ...],
    area_relation: str = "Contains",
    limit: int | None = None,
) -> OrderedDict | None:
    """
    Queries the Sentinel API for Sentinel-2 Level-2A products
    that match the bounding box and date.

    Parameters
    ---
        bbox : tuple[float, ...]
            The bounding box area to query within. It is inserted
            as is into an `ENVELOPE(...)` query string.

        date : str | tuple[str | datetime | date, ...]
            The date or date range to use when querying.

        area_relation : {'Intersects', 'Contains', 'IsWithin'},
            What relation to use for testing the AOI.

        limit : int | None
            Maximum number of products to get.

    Returns
    ---
        products : OrderedDict | None
            A dictionary that contains the info for the
            found products, or None when nothing is found.

    """
    area_of_interest: str = f"ENVELOPE{bbox}"
    found: OrderedDict = SentinelAPI(username, password).query(
        area_of_interest,
        date=date,
        area_relation=area_relation,
        limit=limit,
        platformname="Sentinel-2",
        processinglevel="Level-2A",
    )

    return found if len(found) != 0 else None


In [None]:
def download_all_images(products: list[OrderedDict]) -> None:
    """
    Downloads the images of every listed product into `images/src`.
    Only files matching `*SCL_20m*.jp2` are fetched from each product.

    Parameters
    ---
        products : list[OrderedDict]
            The list of products to download.

    """
    scl_files_only = make_path_filter("*SCL_20m*.jp2")
    SentinelAPI(username, password).download_all(
        products,
        directory_path="images/src",
        nodefilter=scl_files_only,
    )


In [None]:
def get_image_paths(products: OrderedDict) -> list[str]:
    """
    Finds the downloaded `*SCL_20m*.jp2` image of each product.

    Each product is looked for in `images/src/<title>.SAFE`. The first
    matching file is used, so an IndexError is raised when a product
    has no matching file on disk.

    Parameters
    ---
        products : OrderedDict
            The product to get image paths for.

    Returns
    ---
        paths : list[str]
            The paths for the images.
    """
    pattern: str = "images/src/{:s}/**/**/**/**/*SCL_20m*.jp2"
    titles = SentinelAPI(username, password).to_dataframe(products)["title"]
    return [glob(pattern.format(title + ".SAFE"))[0] for title in titles]


In [None]:
def merge_images(
    image_paths: list[str],
) -> DatasetReader | DatasetWriter | BufferedDatasetWriter:
    """
    Merges images into one, useful if one image doesn't cover
    the wanted area.

    The merged image gets the crs of the first image in the list
    and is stored in an in-memory file.

    Parameters
    ---
        image_paths : list[str]
            The paths of the images to merge.

    Returns
    ---
        image : DatasetReader | DatasetWriter | BufferedDatasetWriter
            The merged image, still open.

    """
    # Only the crs of the first image is needed, so the dataset is
    # closed again right away instead of being left open.
    with rasterio.open(image_paths[0]) as first_image:
        crs: str | dict | CRS = first_image.profile["crs"]

    image: np.ndarray
    affine: Affine
    image, affine = rasterio.merge.merge(image_paths)

    # The memory file is deliberately not closed here, the returned
    # dataset is backed by it.
    memfile: MemoryFile = MemoryFile()
    image_file: DatasetReader | DatasetWriter | BufferedDatasetWriter = memfile.open(
        driver="JP2OpenJPEG",
        height=image.shape[1],
        width=image.shape[2],
        count=image.shape[0],
        crs=crs,
        transform=affine,
        dtype=image.dtype,
    )
    image_file.write(image)
    return image_file


In [None]:
def get_transformed_geo(
    geo: (
        Point
        | MultiPoint
        | LineString
        | MultiLineString
        | Polygon
        | MultiPolygon
        | LinearRing
        | GeometryCollection
    ),
    crs: str | dict | CRS,
) -> list[
    (
        Point
        | MultiPoint
        | LineString
        | MultiLineString
        | Polygon
        | MultiPolygon
        | LinearRing
        | GeometryCollection
    )
]:
    """
    Transforms the geometry from EPSG:4326 to a new crs/projection.
    We also need to flip the coordinates first.

    Multi-part geometries are split into their parts, any other
    geometry is handled as a single part.

    Parameters
    ---
        geo : Point | MultiPoint | LineString | MultiLineString
               | Polygon | MultiPolygon | LinearRing | GeometryCollection
            The geometry to transform/reproject.

        crs : str | dict | CRS
            The target crs/projection.

    Returns
    ---
        reproj_geo : list[Point | MultiPoint | LineString | MultiLineString
                          | Polygon | MultiPolygon | LinearRing | GeometryCollection]
            The reprojected parts of the geometry, one list item per part.

    """
    # Only these geometry types are split into parts via `.geoms`.
    # Single-part geometries (not only polygons) are used as they are.
    multipart_types: set[str] = {
        "MultiPoint",
        "MultiLineString",
        "MultiPolygon",
        "GeometryCollection",
    }
    parts: list = list(geo.geoms) if geo.geom_type in multipart_types else [geo]

    def flip_coords(x, y):
        # Swap the two coordinates. Presumably needed because the
        # geometry holds (lon, lat) while the EPSG:4326 transformer
        # below expects (lat, lon) - TODO confirm.
        return y, x

    flipped_geo: list = [transform(flip_coords, part) for part in parts]

    projection: pyproj.Transformer = pyproj.Transformer.from_proj(
        pyproj.Proj("EPSG:4326"), crs
    )
    return [transform(projection.transform, part) for part in flipped_geo]


In [None]:
@njit
def calc_highlight_fraction(
    image: np.ndarray,
    to_highlight: list[int],
    compare_to: list[int],
) -> float:
    """
    Calculates the fraction of the image which has a value
    that is highlighted compared to the total of highlighted
    and chosen comparison values.

    `fraction = to_highlight / (to_highlight + compare_to)`

    Only the first band of the image (`image[0]`) is counted. A pixel
    whose value is in both lists counts as highlighted only.

    Parameters
    ---
        image : np.ndarray
            The image to calculate from, indexed as [band, y, x].

        to_highlight : list[int]
            The bands/values to find the fraction for.
            The numerator of the fraction.

        compare_to : list[int]
            The bands/values to use in the denominator
            together with `to_highlight`.

    Returns
    ---
        fraction : float
            The fraction of the image with
            value in `to_highlight` compared to
            the sum of `compare_to` and `to_highlight`.
            0.0 when no pixel matches either list.

    """
    # NOTE(review): `@njit` is used without `parallel=True`, so `prange`
    # presumably behaves like a plain `range` here - confirm.
    highlighted: int = 0
    comparison: int = 0
    for y in prange(image.shape[1]):
        for x in prange(image.shape[2]):
            pixel: int = image[0, y, x]
            if pixel in to_highlight:
                highlighted += 1
            elif pixel in compare_to:
                comparison += 1

    total: int = highlighted + comparison
    if total == 0:
        # Nothing to compare with (e.g. an image without any matching
        # pixels), report no coverage instead of dividing by zero.
        return 0.0
    return highlighted / total


In [None]:
def highlight(
    image: np.ndarray,
    to_highlight: int,
    with_alpha: bool = True,
) -> np.ndarray:
    """
    Creates a highlighted version of the image, with only the highlighted value
    as white/255 and the rest black/0. The rest can be made transparent with
    the `with_alpha` parameter.

    Parameters
    ---
        image : np.ndarray
            The image to highlight, shaped (bands, height, width).

        to_highlight : int
            The band/value to highlight in the image.

        with_alpha : bool
            Whether to add an alpha layer with the same value
            as the highlighted layer. This makes the parts
            of the image that aren't highlighted transparent
            instead of black/0.

    Returns
    ---
        highlighted : np.ndarray
            The highlighted image. With `with_alpha` it has one
            band more than `image`.

    """
    highlighted: np.ndarray = np.where(image == to_highlight, 255, 0)
    if not with_alpha:
        return highlighted

    # The alpha layer is a copy of the first highlighted band, appended
    # as the last band. For single-band input this is the same result as
    # stacking the band twice, and it also works for more bands.
    return np.concatenate((highlighted, highlighted[:1]), axis=0)
            

In [None]:
def prepare_images(
    download: bool = True,
) -> tuple[dict[str, list[str]], set[str], set[str]]:
    """
    Prepares and finds the image products for the cities in `city_data.json`.
    If `product_map.json` exists and has as many entries as there are
    cities, it is returned as is and nothing is queried or downloaded.
    The images can be downloaded if `download` is true.

    The download is done before the image paths are looked up, since
    the paths are found by searching the downloaded files on disk.

    Parameters
    ---
        download : bool
            Whether to download the images.

    Returns
    ---
        image_paths : dict[str, list[str]]
            A dictionary of the cities with paths to their source images.

        needs_attention_image : set[str]
            The cities that have no source images.

        needs_attention_osm : set[str]
            The cities that have a missing OpenStreetMap relation id
            or bounding box.

    """
    all_products: list[str] = []
    products_by_city: dict[str, OrderedDict] = {}
    needs_attention_image: set[str] = set()
    needs_attention_osm: set[str] = set()
    image_paths: dict[str, list[str]] = {}

    with open("city_data.json") as city_file:
        city_data: list[dict[str, str]] = json.load(city_file)

    if os.path.exists("product_map.json"):
        with open("product_map.json") as map_file:
            image_paths = json.load(map_file)
        if len(image_paths) == len(city_data):
            return image_paths, needs_attention_image, needs_attention_osm

    for data in tqdm(city_data, desc="Preparing products"):
        city_name: str = data["city"]
        start_date: date = date.fromisoformat(data["date"])
        date_span: tuple[date, date] = (start_date, start_date + timedelta(days=1))

        osm_id: int | None
        bbox: tuple[float, ...] | None
        osm_id, bbox = get_polygon_osm_id_and_bbox(city_name)
        if osm_id is None or bbox is None:
            needs_attention_osm.add(city_name)
            continue

        # Prefer a single product that contains the whole city, fall back
        # to every product that intersects it.
        products: OrderedDict | None = get_products(
            bbox, date_span, area_relation="Contains", limit=1
        )
        if products is None:
            products = get_products(bbox, date_span, area_relation="Intersects")
        if products is None:
            needs_attention_image.add(city_name)
            continue

        products_by_city[city_name] = products
        # Iterating the products yields their keys.
        all_products.extend(products)

    print(
        "Ready: {:n}\t Attention image: {:n}\t Attention OSM: {:n}".format(
            len(set(all_products)), len(needs_attention_image), len(needs_attention_osm)
        )
    )

    if download:
        download_all_images(all_products)

    # The paths are resolved from files on disk, so this has to happen
    # after the download. An IndexError from `get_image_paths` here means
    # that the images of a product are missing locally.
    for city_name, products in products_by_city.items():
        image_paths[city_name] = get_image_paths(products)

    with open("product_map.json", mode="w") as file:
        file.write(geojson.dumps(image_paths, indent=4))

    return image_paths, needs_attention_image, needs_attention_osm


In [None]:
# Looks up the relation id and bounding box for a single city. The
# result is not stored; presumably a manual check whose value is shown
# as the notebook cell output.
get_polygon_osm_id_and_bbox("Plano, Texas")


In [None]:
def mask_image(
    image: DatasetReader | DatasetWriter | BufferedDatasetWriter,
    geometry: list[
        (
            Point
            | MultiPoint
            | LineString
            | MultiLineString
            | Polygon
            | MultiPolygon
            | LinearRing
            | GeometryCollection
        )
    ],
    crop=True,
) -> DatasetReader | DatasetWriter | BufferedDatasetWriter:
    """
    Masks the image with the geometry and stores the result
    in a new in-memory image.

    Parameters
    ---
        image : DatasetReader | DatasetWriter | BufferedDatasetWriter
            The image to mask.

        geometry : list[Point | MultiPoint | LineString | MultiLineString
                         | Polygon | MultiPolygon | LinearRing | GeometryCollection]
            The geometries to mask the image with.

        crop : bool
            Passed on to the masking as its `crop` argument.

    Returns
    ---
        masked_image : DatasetReader | DatasetWriter | BufferedDatasetWriter
            The masked image, with the same crs as `image`.

    """
    with rasterio.Env():
        pixels: np.ndarray
        masked_transform: Affine
        pixels, masked_transform = mask(image, geometry, crop=crop)

        # The new image takes its size and type from the masked pixels.
        profile: dict = {
            "driver": "JP2OpenJPEG",
            "count": pixels.shape[0],
            "height": pixels.shape[1],
            "width": pixels.shape[2],
            "dtype": pixels.dtype,
            "crs": image.crs,
            "transform": masked_transform,
        }
        memfile = MemoryFile()
        masked_image: DatasetReader | DatasetWriter | BufferedDatasetWriter = (
            memfile.open(**profile)
        )
        masked_image.write(pixels)
        return masked_image


In [None]:
def reproject_image(
    image: DatasetReader | DatasetWriter | BufferedDatasetWriter,
    crs="EPSG:3857",
) -> tuple[
    DatasetReader | DatasetWriter | BufferedDatasetWriter,
    tuple[list[float], ...],
]:
    """
    Reprojects the image to the `crs`, usually the flat map EPSG:3857
    projection (meters), and its bounding box to the
    ellipsoid/spheric EPSG:4326 (degrees).

    Only the first band of the image is reprojected.

    Parameters
    ---
        image : DatasetReader | DatasetWriter | BufferedDatasetWriter
            The image to reproject.

        crs : str
            The crs/projection to project to.

    Returns
    ---
        reproj_image : DatasetReader | DatasetWriter | BufferedDatasetWriter
            The reprojected image.

        bbox : tuple[list[float], ...]
            The bounding box for the reprojected image in EPSG:4326 (degrees),
            as the two lists `(sn, we)`.

    """
    with rasterio.Env():
        # `calculate_default_transform` returns the width before the height.
        dst_transform, dst_width, dst_height = calculate_default_transform(
            src_crs=image.crs,
            dst_crs=crs,
            height=image.height,
            width=image.width,
            left=image.bounds.left,
            bottom=image.bounds.bottom,
            right=image.bounds.right,
            top=image.bounds.top,
        )

        memfile = MemoryFile()
        image_file: DatasetReader | DatasetWriter | BufferedDatasetWriter = (
            memfile.open(
                driver="JP2OpenJPEG",
                count=1,
                height=dst_height,
                width=dst_width,
                # The data type of the first band, without having to
                # read the whole image just to find it.
                dtype=image.dtypes[0],
                crs=crs,
                transform=dst_transform,
            )
        )

        reproject(
            source=rasterio.band(image, 1),
            destination=rasterio.band(image_file, 1),
            src_crs=image.crs,
            dst_crs=crs,
            src_transform=image.transform,
            dst_transform=dst_transform,
        )

        transformer: pyproj.Transformer = pyproj.Transformer.from_crs(
            crs_from=crs, crs_to="EPSG:4326"
        )
        # The corners are passed as [left, right] and [bottom, top].
        sn: list[float]
        we: list[float]
        sn, we = transformer.transform(
            [image_file.bounds[0], image_file.bounds[2]],
            [image_file.bounds[1], image_file.bounds[3]],
        )

        return image_file, (sn, we)


In [None]:
# The different values ("bands") that are available in the Sentinel-2
# classified images (the `*SCL_20m*` files), mapped to a readable name.
# The names are used for the output folders and as keys in the
# coverage results.
BANDS: dict[int, str] = {
    0: "no_data",
    1: "saturated_or_defective",
    2: "dark_area_pixels",
    3: "cloud_shadows",
    4: "vegetation",
    5: "not_vegetated",
    6: "water",
    7: "unclassified",
    8: "cloud_medium_probability",
    9: "cloud_high_probability",
    10: "thin_cirrus",
    11: "snow",
}


In [None]:
def make_folder(path: str) -> None:
    """
    Creates the folder in path if it doesn't exist, together with
    any missing parent folders. Works for both relative and
    absolute paths.

    Parameters
    ---
        path : str
            The folder to create.

    """
    os.makedirs(path, exist_ok=True)


In [None]:
def process_city(
    city_name: str,
    paths: list[str],
    highlight_bands=[4, 5, 6],
) -> tuple[dict[str, float], list[float]] | None:
    """
    Processes the city, i.e. creates masked image files
    and calculates the coverage fractions for the
    bands/values to highlight.

    The following files are written:
    - `export/polygons/<city>.json`, the city polygon.
    - `images/masked/<city>.png`, the masked and reprojected image.
    - `images/processed/<band>/<city>.png` and
      `export/masks/<band>/<city>.png`, one highlighted image
      per band/value.

    Parameters
    ---
        city_name : str
            The city to process.

        paths : list[str]
            The paths to the images for the city.

        highlight_bands : list[int]
            The bands/values to highlight in the images
            and calculate fractions for. Needs a `copy` method.
            The default list is never modified here.

    Returns
    ---
        coverages : dict[str, float]
            A dictionary with the coverage in percent
            for each band/value in highlight_bands.

        bbox : list[float]
            The bounding box for the city, the second list of the
            reprojected bounds followed by the first one.

        None is returned when no OpenStreetMap id is found for the city.

    """
    image: DatasetReader | DatasetWriter | BufferedDatasetWriter = merge_images(paths)
    osm_id: int | None
    osm_id, _ = get_polygon_osm_id_and_bbox(city_name)
    if osm_id is None:
        return None

    geo: (
        Point
        | MultiPoint
        | LineString
        | MultiLineString
        | Polygon
        | MultiPolygon
        | LinearRing
        | GeometryCollection
    ) = get_polygon(osm_id)

    polygon_dir: str = "export/polygons"
    make_folder(polygon_dir)
    with open("{:s}/{:s}.json".format(polygon_dir, city_name), mode="w") as file:
        file.write(geojson.dumps(mapping(geo), indent=4))

    usable_geo: list = get_transformed_geo(geo, image.profile["crs"])
    masked_image: DatasetReader | DatasetWriter | BufferedDatasetWriter = mask_image(
        image, usable_geo, crop=True
    )
    reprojected: DatasetReader | DatasetWriter | BufferedDatasetWriter
    bounds: tuple[list[float], ...]
    reprojected, bounds = reproject_image(masked_image)

    coverages: dict[str, float] = {}

    with rasterio.Env():
        # Read the pixels once, they are reused for every band below.
        pixels: np.ndarray = reprojected.read()
        reprojected_crs = reprojected.profile["crs"]

        masked_dir: str = "images/masked"
        make_folder(masked_dir)
        with rasterio.open(
            "{:s}/{:s}.png".format(masked_dir, city_name),
            mode="w",
            driver="PNG",
            count=1,
            height=reprojected.shape[0],
            width=reprojected.shape[1],
            crs=reprojected_crs,
            transform=reprojected.transform,
            dtype=np.uint8,
            compression="lzw",
        ) as file:
            file.write(pixels)

        # Every band is compared to all of the bands. The band itself is
        # part of the copy too, but is counted as highlighted first.
        other_bands = highlight_bands.copy()

        for band in highlight_bands:
            band_name: str = BANDS[band]
            coverages[band_name] = (
                calc_highlight_fraction(
                    pixels, to_highlight=[band], compare_to=other_bands
                )
                * 100
            )
            highlighted: np.ndarray = highlight(pixels, band)

            processed_dir: str = "images/processed/{:s}".format(band_name)
            make_folder(processed_dir)
            with rasterio.open(
                "{:s}/{:s}.png".format(processed_dir, city_name),
                mode="w",
                driver="PNG",
                count=highlighted.shape[0],
                height=highlighted.shape[1],
                width=highlighted.shape[2],
                crs=reprojected_crs,
                transform=reprojected.transform,
                dtype=np.uint8,
                compression="lzw",
            ) as file:
                file.write(highlighted)

            mask_dir: str = "export/masks/{:s}".format(band_name)
            make_folder(mask_dir)
            Image.fromarray(
                np.array(reshape_as_image(highlighted), dtype=np.uint8)
            ).save("{:s}/{:s}.png".format(mask_dir, city_name))

    return coverages, [*bounds[1], *bounds[0]]


In [None]:
def run() -> None:
    """
    Calling this function will run the process for all the
    cities found in `city_data.json`.

    The results are saved in the `export` folder. Nothing is processed
    if any city has image or OpenStreetMap problems, they are only
    printed.

    The progress is saved in the `tmp` folder after each city, so that
    an interrupted run continues where it stopped. The folder is
    removed again when all cities are done.

    """
    image_paths: dict[str, list[str]]
    needs_attention_image: set[str]
    needs_attention_osm: set[str]
    image_paths, needs_attention_image, needs_attention_osm = prepare_images(
        download=True
    )

    if needs_attention_image or needs_attention_osm:
        if needs_attention_image:
            print(f"These cities has image problems:\n{needs_attention_image}")

        if needs_attention_osm:
            print(f"These cities has OSM problems:\n{needs_attention_osm}")

        return

    processed: dict[str, dict[str, float]] = {}
    bboxes: dict[str, list[float]] = {}

    bands: np.ndarray = np.arange(1, 11 + 1)

    os.makedirs("tmp", exist_ok=True)
    # The final results are written here even if no city gets processed.
    os.makedirs("export", exist_ok=True)

    tmp_coverage_file: str = f"tmp/coverage_percent_bands_{bands[0]}-{bands[-1]}.json"
    tmp_bbox_file: str = "tmp/bbox.json"

    coverage_file: str = f"export/coverage_percent_bands_{bands[0]}-{bands[-1]}.json"
    bbox_file: str = "export/bbox.json"

    # Pick up the results of an earlier, interrupted run.
    if os.path.exists(tmp_coverage_file):
        with open(tmp_coverage_file) as file:
            processed = geojson.load(file)

    if os.path.exists(tmp_bbox_file):
        with open(tmp_bbox_file) as file:
            bboxes = geojson.load(file)

    with tqdm(total=len(image_paths), desc="Processing") as progressbar:
        for city_name, paths in image_paths.items():
            progressbar.set_postfix(city=city_name, refresh=True)
            if city_name in processed and city_name in bboxes:
                progressbar.update(1)
                continue

            processed_city: tuple[dict[str, float], list[float]] | None = (
                process_city(city_name, paths, highlight_bands=bands)
            )
            if processed_city is not None:
                processed[city_name], bboxes[city_name] = processed_city

            with open(tmp_coverage_file, mode="w") as file:
                file.write(geojson.dumps(processed, indent=4))

            with open(tmp_bbox_file, mode="w") as file:
                file.write(json.dumps(bboxes, indent=4))

            progressbar.update(1)

    with open(coverage_file, mode="w") as file:
        file.write(geojson.dumps(processed, indent=4))

    with open(bbox_file, mode="w") as file:
        file.write(json.dumps(bboxes, indent=4))

    # The temporary files only exist if at least one city was handled
    # in this or an earlier run.
    for tmp_file in (tmp_coverage_file, tmp_bbox_file):
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
    os.rmdir("tmp")


In [None]:
# Runs the whole process for all the cities in `city_data.json`.
run()


In [None]:
@njit
def stack_image_arrays(
    image_1: NDArray[np.uint8],
    image_2: NDArray[np.uint8],
) -> NDArray[np.uint8]:
    """
    Combines two images by keeping the highest of the two pixel
    values at every position and band.

    The result has the shape and data type of `image_1`.

    Parameters
    ---
        image_1 : np.ndarray[np.uint8]
            The first image to stack, indexed as [y, x, band].

        image_2 : np.ndarray[np.uint8]
            The second image to stack, indexed as [y, x, band].

    Returns
    ---
        combined_image : np.ndarray[np.uint8]
            The combined stacked image.

    """
    stacked: np.ndarray = np.zeros(image_1.shape, dtype=image_1.dtype)
    height, width, band_count = stacked.shape
    for row in prange(height):
        for col in range(width):
            for band in range(band_count):
                first_value = image_1[row, col, band]
                second_value = image_2[row, col, band]
                stacked[row, col, band] = max(first_value, second_value)

    return stacked


In [None]:
def stack_image_tiles(
    image_path_1: str,
    image_path_2: str,
) -> NDArray[np.uint8]:
    """
    Loads the two images from disk and stacks them
    with `stack_image_arrays`.

    Parameters
    ---
        image_path_1 : str
            Path to the first image of the stack.

        image_path_2 : str
            Path to the second image of the stack.

    Returns
    ---
        stacked : np.ndarray[np.uint8]
            The stacked image.

    """
    first: np.ndarray
    second: np.ndarray
    first, second = (
        np.array(Image.open(image_path)) for image_path in (image_path_1, image_path_2)
    )
    return stack_image_arrays(first, second)


In [None]:
def copy_and_merge_tiles(
    tiles_path: str,
    export_path: str,
) -> None:
    """
    Copies and merges tiles for all the tiles found
    in the given path.

    Parameters
    ---
        tiles_path : str
            Where the tiles are stored.

        export_path : str
            Where to save the merged tiles.

    """
    if not os.path.exists(export_path):
        os.makedirs(export_path)

    files_in_tiles_path: list[str] = glob("*/*/*.png", root_dir=tiles_path)
    files_in_export_path: list[str] = glob("*/*/*.png", root_dir=export_path)

    for file_path in files_in_tiles_path:
        existing_path: str | None = None
        for existing_file_path in files_in_export_path:
            if file_path == existing_file_path:
                existing_path = existing_file_path

        if existing_path != None:
            stacked_image: Image.Image = Image.fromarray(
                stack_image_tiles(tiles_path + file_path, export_path + file_path)
            )
            stacked_image.save(export_path + file_path)
        else:
            os.makedirs(
                export_path
                + file_path.replace(file_path.split("/")[-1].split("\\")[-1], ""),
                exist_ok=True,
            )
            shutil.copy2(tiles_path + file_path, export_path + file_path)


In [None]:
def make_tiles(
    start_zoom=0,
    end_zoom=14,
    nb_processes=6,
    webviewer="none",
) -> None:
    """
    Makes map tiles from the processed images.
    The tiles are made separately for each city and band/class
    in `export/tiles/separate`, and are also copied/merged into
    one tile set per band/class in `export/tiles/merged`.

    Nothing is made if any city has image or OpenStreetMap problems,
    they are only printed.

    Parameters
    ---
        start_zoom : int
            The farthest zoom distance to make tiles for.

        end_zoom : int
            The closest zoom distance to make tiles for.

        nb_processes : int
            The number of processes passed on to the tile generation.

        webviewer : {'all', 'google', 'openlayers', 'none'}
            Which webviewer maps to generate preview html files for.

    """
    image_paths: dict[str, list[str]]
    needs_attention_image: set[str]
    needs_attention_osm: set[str]
    image_paths, needs_attention_image, needs_attention_osm = prepare_images(
        download=False
    )

    if needs_attention_image:
        print(f"These cities has image problems:\n{needs_attention_image}")
    if needs_attention_osm:
        print(f"These cities has OSM problems:\n{needs_attention_osm}")
    if needs_attention_image or needs_attention_osm:
        return

    cities = tqdm(image_paths.items(), desc="Processing")
    for city_index, (city_name, paths) in enumerate(cities):
        is_first_city: bool = city_index == 0

        for band in np.arange(1, 11 + 1):
            band_name: str = BANDS[band]
            source_image: str = f"images/processed/{band_name}/{city_name}.png"
            separate_dir: str = f"export/tiles/separate/{city_name}/{band_name}/"
            merged_dir: str = f"export/tiles/merged/{band_name}/"

            gdal2tiles.generate_tiles(
                source_image,
                separate_dir,
                zoom=[start_zoom, end_zoom],
                resume=True,
                title=f"SaTreeLight - {city_name} - {band_name}",
                nb_processes=nb_processes,
                webviewer=webviewer,
            )

            copy_and_merge_tiles(tiles_path=separate_dir, export_path=merged_dir)

            # Only for the first city: the tile generation is also run
            # directly on the merged folder of each band.
            if is_first_city:
                gdal2tiles.generate_tiles(
                    source_image,
                    merged_dir,
                    zoom=[start_zoom, end_zoom],
                    resume=True,
                    title=f"SaTreeLight - {band_name}",
                    nb_processes=nb_processes,
                    webviewer=webviewer,
                )


In [None]:
# Not worth further investigation, files get large/unparseable.
# This was an attempt at trying to make polygons from the raster
# images, but wasn't found to be feasible.
def polygonize_test():
    """
    Polygonizes the first band of the processed vegetation image for
    Akron, Ohio into `test/akron.geojson`. An existing output file
    is replaced.

    """
    from osgeo import gdal, ogr

    geojson_path = "test/akron.geojson"

    # The raster is kept in its own variable while its band is in use.
    raster = gdal.Open("images/processed/vegetation/Akron, Ohio.png")
    first_band = raster.GetRasterBand(1)

    os.makedirs("test", exist_ok=True)
    if os.path.exists(geojson_path):
        os.remove(geojson_path)

    data_source = ogr.GetDriverByName("GeoJson").CreateDataSource(geojson_path)
    layer = data_source.CreateLayer("vegetation", srs=None)
    layer.CreateField(ogr.FieldDefn("MYFLD", ogr.OFTInteger))

    gdal.Polygonize(first_band, None, layer, 0, [], callback=None)
