In [3]:
# Imports
import logging

import matplotlib.pyplot as plt
import rasterio
from rasterio.windows import from_bounds
import pathlib
from pathlib import Path
from PIL import Image
import numpy as np
from pyproj import Transformer
import re

from typing import Dict, List

In [4]:
DATASET_DIRECTORY = "/Users/ksafiullin/src/geospatial-data-processing/data/orthophotos/nw"

In [5]:
log = logging.getLogger(__name__)
logging.basicConfig(format="[%(asctime)s] [%(levelname)s] %(message)s", level=logging.DEBUG)

In [6]:
def validate_input(latitude: float, longitude: float, radius: float = 100) -> None:
    logging.info(
        f"Incoming latitude '{latitude}' and longitude '{longitude}' with set radius '{radius}'."
    )

    if not -90 < latitude < 90 and not -180 < longitude < 180:
        error_message = (
            "Programm works with `EPSG:25832`. Looks like provided input data is in"
            "Geographic CRS (`EPSG:4326`). If it's not the case, ignore the warning!"
        )
        log.error(error_message)
        raise ValueError(error_message)

    if radius < 0 or radius > 100:
        log.warning(
            f"Radius expected to be from 0 meters till 100 meters. Received {radius} meters."
        )

In [7]:
def parse_tile_metadata(image_filepath: pathlib.Path) -> Dict[str,int]:
    """Extract the tile's bounding box in EPSG:25832 from provided dataset sample of .jp2 files."""
    pattern = r"dop10rgbi_32_(\d+)_(\d+)_1_nw_\d{4}\.jp2"
    match = re.match(pattern, image_filepath.name)
    if not match:
        raise ValueError(
            f"Incorrect filepath. Got {image_filepath},"
            "expected 'dop10rgbi_32_<easting>_<northing>_1_nw_<year>.jp2'"
        )
    
    x_km = int(match.group(1))
    y_km = int(match.group(2))
    
    x_min = x_km * 1000
    x_max = x_min + 999
    y_min = y_km * 1000
    y_max = y_min + 999

    return {
        "x_min": x_min,
        "x_max": x_max,
        "y_min": y_min,
        "y_max": y_max
    }

In [95]:
# [2025-01-12 18:48:43,402] [DEBUG] tile_bbox (468000, 5772000, 468999, 5772999)
# [2025-01-12 18:48:43,388] [DEBUG] query bbox: (5772399.999999996, 468399.99999999336, 5772599.999999996, 468599.99999999336)

def is_bbox_intersects(tile_bbox, query_bbox) -> bool:
    """Return boolean if tile image and final generated image overlap or not."""
    (title_bbox_left, title_bbox_bottom, title_box_right, title_box_top) = tile_bbox
    (query_bbox_left, query_bbox_bottom, query_bbox_right, query_bbox_top) = query_bbox
    
    log.debug(f"tile_bbox {tile_bbox}")
    if title_box_right < query_bbox_left or title_bbox_left > query_bbox_right:
        return False
    if title_box_top < query_bbox_bottom or title_bbox_bottom > query_bbox_top:
        return False
    
    return True

# is_bbox_intersects(
#     tile_bbox=(468000, 5772000, 468999, 5772999),
#     query_bbox=(5772399.999999996, 468399.99999999336, 5772599.999999996, 468599.99999999336)
# )

[2025-01-12 18:52:47,182] [DEBUG] tile_bbox (468000, 5772000, 468999, 5772999)


False

In [None]:
def get_paths_of_required_images(
    x_left: float,
    y_bottom: float,
    x_right: float,
    y_top: float,
    dataset_directory_full_path: pathlib.Path,
) -> List[pathlib.Path]:
    query_bbox = (x_left, y_bottom, x_right, y_top)
    log.debug(f"query bbox: {query_bbox}")
    relevant_tiles = []
    for jp2_file in dataset_directory_full_path.glob("*.jp2"):
        meta = parse_tile_metadata(jp2_file)
        if not meta:
            error_message = "Something goes wrong with parsing image"
            log.error(error_message)
            raise Exception(error_message)

        tile_bbox = (meta["x_min"], meta["y_min"], meta["x_max"], meta["y_max"])
        log.debug(f"tile_bbox {tile_bbox}")
        if is_bbox_intersects(tile_bbox, query_bbox):
            relevant_tiles.append(meta["file_path"])
        break

    logging.info(f"Found {len(relevant_tiles)} tile(s) intersecting the query area.")
    return relevant_tiles

In [None]:
wgs84_to_utm32 = Transformer.from_crs("EPSG:4326", "EPSG:25832", always_xy=True)

def get_image(latitude: float, longitude: float, radius: float = 100, dataset_directory: str = "") -> Image.Image:
    """Generate a 256x256 pixels Image for the input (latitude, longitude) in EPSG:4326,
    with a given radius in meters.

    - 'latitude' and 'longitude' must be in 'EPSG:4326' format.
    - 'radius' value expected to be '0' till '100' meters.
    - 'dataset_directory' - orthophotos directory full path e.g. ''

    Steps:
      1) Verify incoming data is on EPSG:4326 format.
      2) Compute bounding box: (left, bottom, right, top).
      3) Find all relevant tiles that intersect with this bounding box.
      4) Partially read from each tile.
      5) Concatenate the partial reads into one final image.
      6) Crop/resize to 256x256.
    """
    validate_input(latitude=latitude, longitude=longitude, radius=radius)

    log.info(
        "Converting latitude and longitude from 'EPSG:4326' to 'EPSG:25832' format."
    )
    x_meters, y_meters = wgs84_to_utm32.transform(xx=latitude, yy=longitude)
    log.debug(
        f"Converting latitude {latitude} -> to x_meters {x_meters}. Longitude {longitude}' degree -> to ty_meters {y_meters} "
    )
    left = x_meters - radius
    bottom = y_meters - radius
    right = x_meters + radius
    top = y_meters + radius
    dataset_directory = Path(DATASET_DIRECTORY) if not dataset_directory else Path(dataset_directory)

    log.debug(f"dataset_directory {dataset_directory}, left {left}, bottom {bottom}, right {right},top {top} ")
    image_tile_paths = get_paths_of_required_images(left, bottom, right, top, dataset_directory)
    #TODO: consider case where it's end of dataset and I need to add black images.

    print(image_tile_paths)
    # for image_tile in image_tile_paths:
    #     ...
        

In [105]:
# There is no Munster in sample dataset. Most probably compaby will test it on their dataset. Well lets try another data then.
# get_image(51.962291, 7.626426, 100) 

In [122]:
# Lets covert back EPSG:25832 to EPSG:4326 to test my code =)
# I will pick `dop10rgbi_32_468_5772_1_nw_2022.jp2` middle aka 5772500, 468500

t = Transformer.from_crs("EPSG:25832", "EPSG:4326", always_xy=True)
latitude, longitude,   = t.transform(xx=468500, yy=5772500)

log.debug(
    f"Convert EPSG:25832 to EPSG:4326 \n Easting X meters -> to  latitude X {latitude} degree \n"
    f" Northing Y  meters -> longitude Y {longitude} degrees to test on the sample"
)


[2025-01-12 23:20:32,170] [DEBUG] Convert EPSG:25832 to EPSG:4326 
 Easting X meters -> to  latitude X 8.54010563577907 degree 
 Northing Y  meters -> longitude Y 52.10215462837978 degrees to test on the sample


In [120]:
# get_image(longitude=longitude, latitude=latitude, radius=100)
get_image(latitude=51.96, longitude=7.62, radius=100)

[2025-01-12 19:06:58,236] [INFO] Incoming latitude '51.96' and longitude '7.62' with set radius '100'.
[2025-01-12 19:06:58,237] [INFO] Converting latitude and longitude from 'EPSG:4326' to 'EPSG:25832' format.
[2025-01-12 19:06:58,238] [DEBUG] Converting latitude 51.96 -> to x_meters 405180.02992657083. Longitude 7.62' degree -> to ty_meters 5757488.725332315 
[2025-01-12 19:06:58,238] [DEBUG] dataset_directory /Users/ksafiullin/src/geospatial-data-processing/data/orthophotos/nw, left 405080.02992657083, bottom 5757388.725332315, right 405280.02992657083,top 5757588.725332315 
[2025-01-12 19:06:58,238] [DEBUG] query bbox: (405080.02992657083, 5757388.725332315, 405280.02992657083, 5757588.725332315)
[2025-01-12 19:06:58,239] [DEBUG] tile_bbox (463000, 5773000, 463999, 5773999)
[2025-01-12 19:06:58,240] [DEBUG] tile_bbox (462000, 5772000, 462999, 5772999)
[2025-01-12 19:06:58,240] [DEBUG] tile_bbox (467000, 5768000, 467999, 5768999)
[2025-01-12 19:06:58,240] [DEBUG] tile_bbox (466000, 

[]
