In [19]:
import gzip
import hashlib
import os.path
import re
import shutil
import tempfile
import urllib.parse
from dataclasses import dataclass
from enum import UNIQUE, StrEnum, auto, verify

import geopandas as gpd
import requests

import config

In [20]:
# Run the entry point of the project-local ``config`` module before the rest of the notebook.
# NOTE(review): its effect is not visible from this notebook; presumably it loads the project settings — confirm.
config.config()

In [21]:
def validate_io_job(filename: str, overwrite: bool) -> None:
    """Guard a pending write so that an existing file is never replaced by accident.

    Args:
        filename: Path of the file that is about to be written.
        overwrite: Whether replacing an existing file is allowed.

    Raises:
        ValueError: If ``filename`` names an existing regular file and ``overwrite`` is false.
    """
    if overwrite:
        # Nothing to protect; skip the filesystem probe altogether.
        return
    if os.path.isfile(filename):
        raise ValueError(
            f"File {filename!r} already exists; pass overwrite=True to replace it."
        )


@verify(UNIQUE)
class BAG3DLevelOfDetail(StrEnum):
    LoD12 = auto()
    LoD13 = auto()
    LoD22 = auto()


def read_tile(
    tile_id: str, lod: BAG3DLevelOfDetail, to_2d: bool = True
) -> gpd.GeoDataFrame:
    """Load one level-of-detail layer of a tile from its GeoPackage on disk.

    Args:
        tile_id: Stem of the GeoPackage to open; the file ``<tile_id>.gpkg`` is read.
        lod: Level of detail; selects the layer named ``<lod>_2d``.
        to_2d: Forwarded to ``geopandas.read_file`` as ``force_2d``.

    Returns:
        The requested layer.
    """
    package_path = f"{tile_id}.gpkg"
    layer_name = f"{lod}_2d"
    return gpd.read_file(package_path, layer=layer_name, force_2d=to_2d)


@dataclass
class BAG3DTileAssetInfo:
    """IDs and download URLs of the asset sheets matched to a 3DBAG tile."""

    # Distinct IDs of the matched asset sheets (the ``tid`` column of the asset index).
    tid: list[str]
    # Distinct download URLs of the matched asset sheets (the ``url`` column of the asset index).
    url: list[str]


@dataclass
class BAG3DTileAssetManifest:
    """Image and point cloud assets associated with a single 3DBAG tile."""

    # Matched image assets.
    img: BAG3DTileAssetInfo
    # Matched point cloud assets.
    lidr: BAG3DTileAssetInfo

    def save(self, filename: str, overwrite: bool = False) -> None:
        """Write the manifest to disk (not implemented yet).

        Args:
            filename: Destination path.
            overwrite: Whether an existing file at ``filename`` may be replaced.

        Raises:
            ValueError: If ``filename`` already exists and ``overwrite`` is false.
            NotImplementedError: Always, once the overwrite check has passed.
        """
        validate_io_job(filename, overwrite)
        raise NotImplementedError


@verify(UNIQUE)
class BAG3DDataFormat(StrEnum):
    """File formats in which a 3DBAG tile can be requested.

    Each value is the file extension that ``BAG3DTileStore.download_tile`` appends to its default output filename.
    """

    CityJSON = "city.json"
    GeoPackage = "gpkg"
    WavefrontObjectFile = "obj"


class BAG3DTileStore:
    """Wrapper around the 3DBAG sheet index, allowing relevant data downloads and lookups."""

    _BASE_URL = "https://data.3dbag.nl/"
    # Name of the sheet index on the server; also the default name of its local copy.
    _INDEX_NAME = "tile_index.fgb"
    # TODO: Narrow this down
    _VALID_TILE = re.compile(r"^\d{1,2}/\d{3,4}/\d{2,4}$")

    def __init__(self, version: str = "v2024.02.28") -> None:
        """Initialize the store.

        This process connects to the 3DBAG data servers and mounts its sheet index to memory as a network file.

        Args:
            version: The 3DBAG version to look up. Defaults to the current RoofSense version if not explicitly specified.
        """
        self._ver = version
        self._init_version()

    @property
    def index(self) -> gpd.GeoDataFrame:
        """The sheet index currently mounted to memory."""
        return self._index

    @property
    def version(self) -> str:
        """The currently indexed version of the 3DBAG."""
        return self._ver

    @version.setter
    def version(self, version: str) -> None:
        """Switch to another 3DBAG version and mount its sheet index.

        If the new index cannot be loaded, the previous version is restored so that
        ``version`` and ``index`` never disagree; the error is re-raised.
        """
        previous = self._ver
        self._ver = version
        try:
            self._init_version()
        except Exception:
            self._ver = previous
            raise

    def download_index(
        self, filename: str | None = None, overwrite: bool = False, **kwargs
    ) -> None:
        """Save the mounted sheet index to disk.

        Args:
            filename: Destination path. Defaults to ``tile_index.fgb`` (the name of the
                index on the server), relative to the working directory.
            overwrite: Whether an existing file at ``filename`` may be replaced.
            **kwargs: Forwarded to ``GeoDataFrame.to_file``.

        Raises:
            ValueError: If ``filename`` already exists and ``overwrite`` is false.
        """
        if filename is None:
            # The declared default used to reach os.path.isfile(None) and fail with a TypeError.
            filename = self._INDEX_NAME
        validate_io_job(filename, overwrite)
        self._index.to_file(filename, **kwargs)

    # TODO: Make downloads optional if possible.
    def download_tile(
        self,
        tile_id: str,
        filename: str | None = None,
        format: BAG3DDataFormat = BAG3DDataFormat.GeoPackage,
        checksum: bool = False,
        overwrite: bool = False,
        **kwargs,
    ) -> None:
        """Download a tile and store its decompressed contents on disk.

        Args:
            tile_id: The tile ID as listed in the sheet index, e.g. ``"9/284/556"``.
            filename: Destination path. Defaults to the tile ID with ``/`` replaced by
                ``-``, followed by the extension of ``format``.
            format: The data format to download.
            checksum: Whether to compare the SHA-256 digest of the downloaded (still
                compressed) payload against the digest listed in the sheet index.
            overwrite: Whether an existing file at ``filename`` may be replaced.
            **kwargs: Forwarded to ``requests.get``. ``stream`` defaults to ``True`` so
                the payload is written to disk as it arrives.

        Raises:
            ValueError: If the tile ID is malformed or unknown, or if ``filename``
                already exists and ``overwrite`` is false.
            requests.HTTPError: If the server answers with an error status.
            RuntimeError: If ``checksum`` is requested and the digests differ.
        """
        if filename is None:
            filename = f"{tile_id.replace('/', '-')}.{format}"

        # Handle CityJSON formats: the index columns use the initials of the extension
        # parts ("city.json" -> "cj") instead of the extension itself.
        if format == BAG3DDataFormat.CityJSON:
            column_prefix = "".join(part[0] for part in format.split("."))
        else:
            column_prefix = format

        self._validate_tile_id(tile_id)
        validate_io_job(filename, overwrite)

        # TODO: Check that this works.
        match = self._index.loc[self._index.tile_id == tile_id]

        url: str = match[f"{column_prefix}_download"].iat[0]
        expected_sha: str | None = None
        digest = None
        if checksum:
            expected_sha = match[f"{column_prefix}_sha256"].iat[0]
            digest = hashlib.sha256()

        # Without streaming, requests buffers the whole payload in memory before the
        # first chunk is handed out. An explicit caller choice is respected.
        kwargs.setdefault("stream", True)

        temp_path: str | None = None
        try:
            with requests.get(url=url, **kwargs) as r:
                r.raise_for_status()
                with tempfile.NamedTemporaryFile(
                    mode="wb",
                    # https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file
                    delete=False,
                ) as temp:
                    temp_path = temp.name
                    # Write the file as it arrives.
                    for chunk in r.iter_content(chunk_size=None):
                        temp.write(chunk)
                        if digest is not None:
                            digest.update(chunk)

            if digest is not None:
                actual_sha = digest.hexdigest()
                if actual_sha != expected_sha:
                    raise RuntimeError(
                        f"SHA-256 mismatch for tile {tile_id!r}: "
                        f"expected {expected_sha}, got {actual_sha}."
                    )

            # NOTE(review): the payload is always gunzipped — confirm that the download
            # of every supported format is a gzip archive.
            with gzip.open(temp_path, mode="rb") as src, open(
                filename, mode="wb"
            ) as dst:
                shutil.copyfileobj(src, dst)
        finally:
            # The temporary file is created with delete=False, so it must be removed
            # explicitly — also when the download, checksum or decompression fails.
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)

    def sample_tile(self) -> gpd.GeoSeries:
        """Sample a tile from the index (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
        # seed_pt = self._seeds.sample(random_state=self._rng)[
        #     config.var("DEFAULT_GM_FIELD_NAME")
        # ]
        # tile_pt = gpd.GeoDataFrame(
        #     {
        #         config.var("DEFAULT_ID_FIELD_NAME"): [0],
        #         config.var("DEFAULT_GM_FIELD_NAME"): [
        #             self._gen_random_point(seed_pt)
        #         ],
        #     },
        #     crs=config.var("CRS"),
        # )
        # # NOTE: The point can be positioned on the interface of two or more
        # #       adjacent tiles.
        # tile_ids = self._index.overlay(tile_pt, keep_geom_type=False)["tid"]

    # TODO: This can be its own function.
    def tile_assets(
        self, tile_id: str, image_index: gpd.GeoDataFrame, lidar_index: gpd.GeoDataFrame
    ) -> BAG3DTileAssetManifest:
        """Find the image and point cloud assets of a tile.

        The tile is read from ``<tile_id>.gpkg``. When no such file exists, the name
        under which ``download_tile`` stores a tile by default (``/`` replaced by ``-``)
        is used instead.

        Args:
            tile_id: The tile ID.
            image_index: The image sheet index.
            lidar_index: The point cloud sheet index.

        Warnings:
            The image and point cloud indices must contain at least a tile ID and a corresponding download URL column, named ``tid`` and ``url``, respectively.

        Returns: The corresponding tile asset manifest.

        """
        # Honour a file that matches the ID verbatim; otherwise fall back to the
        # default filename convention of download_tile.
        stem = tile_id
        if not os.path.isfile(f"{stem}.gpkg"):
            stem = tile_id.replace("/", "-")

        tile = read_tile(
            stem,
            # NOTE: LoD2.2 is used because the other levels include some sort of ground
            # layer, so the gaps of buildings with large atriums are not excluded. The
            # same holds for all 3D layers, which leaves lod22_2d as the only viable
            # option. Tile 9-752-48 showcases this.
            lod=BAG3DLevelOfDetail.LoD22,
        )

        image_matches = image_index.overlay(tile)
        lidar_matches = lidar_index.overlay(tile)

        return BAG3DTileAssetManifest(
            img=BAG3DTileAssetInfo(
                tid=self._distinct(image_matches, "tid"),
                url=self._distinct(image_matches, "url"),
            ),
            lidr=BAG3DTileAssetInfo(
                tid=self._distinct(lidar_matches, "tid"),
                url=self._distinct(lidar_matches, "url"),
            ),
        )

    @staticmethod
    def _distinct(frame: gpd.GeoDataFrame, column: str) -> list:
        """Return the distinct values of a column, in order of first appearance, as a plain list."""
        return frame[column].drop_duplicates().tolist()

    def _init_version(self) -> None:
        """Mount the sheet index of the current version to memory."""
        # The server directory is the version string without its dots.
        url = urllib.parse.urljoin(
            self._BASE_URL, f"{self._ver.replace('.', '')}/{self._INDEX_NAME}"
        )
        self._index = gpd.read_file(url)

    def _validate_tile_id(self, tile_id: str) -> None:
        """Check that a tile ID is well-formed and listed in the mounted index.

        Raises:
            ValueError: If the ID does not match the expected pattern or is not listed.
        """
        if not self._VALID_TILE.fullmatch(tile_id):
            raise ValueError(
                f"Malformed tile ID {tile_id!r}; expected three '/'-separated "
                "numbers such as '9/284/556'."
            )
        if tile_id not in self._index.tile_id.values:
            raise ValueError(
                f"Tile {tile_id!r} is not listed in the 3DBAG {self._ver} sheet index."
            )

In [22]:
# Smoke test: mount the default-version index and download tile 9/284/556 to
# 9-284-556.gpkg in the working directory (network access required).
# overwrite=True keeps the cell re-runnable: without it, a second run raises
# ValueError because the file written by the first run already exists.
BAG3DTileStore().download_tile("9/284/556", overwrite=True)

https://data.3dbag.nl/v20240228/tile_index.fgb
https://data.3dbag.nl/v20240228/tiles/9/284/556/9-284-556.gpkg.gz
