In [None]:
import os
import random

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import osmnx as ox
import pandas as pd
from geometry_utils import generate_grid
from geometry_utils.gmaps_tiles import GoogleMapsTiles
from shapely.affinity import affine_transform
from shapely.geometry import Point, Polygon, box
from shapely.ops import triangulate
from tqdm.auto import tqdm

In [None]:
def generator_points_in_polygon(polygon):
    areas = []
    transforms = []
    for t in triangulate(polygon):
        areas.append(t.area)
        (x0, y0), (x1, y1), (x2, y2), _ = t.exterior.coords
        transforms.append([x1 - x0, x2 - x0, y2 - y0, y1 - y0, x0, y0])
    while True:
        transform = random.choices(transforms, weights=areas, k=1)[0]
        x, y = [random.random() for _ in range(2)]
        if x + y > 1:
            p = Point(1 - x, 1 - y)
        else:
            p = Point(x, y)
        yield affine_transform(p, transform)


def buildings_in_tile(bbox, crs=None, timeout=180) -> gpd.GeoDataFrame:
    ox.settings.log_console = False
    ox.settings.use_cache = False
    ox.settings.requests_timeout = timeout

    gdf = ox.features.features_from_bbox(bbox=bbox, tags={'building': True})
    if crs is not None:
        gdf = gdf.to_crs(crs)
    return gdf


def proj_bbox(bbox, crs_in, crs_out, buffer=0):
    b = gpd.GeoSeries([bbox.buffer(buffer)], crs=crs_in).to_crs(crs_out)
    return b.total_bounds


def compute_batch(gen, zoom, batch_size, y_threshold=10):
    batch_X, batch_y = [], []
    k = 0
    while len(batch_X) < batch_size:
        try:
            p = next(gen)
            x, y, tile = GoogleMapsTiles.download_numpy(lon=p.x, lat=p.y, zoom=zoom)
            mesh = generate_grid(*np.meshgrid(x, y), crs=GoogleMapsTiles.CRS, return_indices=True)
            mesh['cell_area'] = mesh.geometry.area

            bbox = box(x.min(), y.min(), x.max(), y.max())
            west, south, east, north = proj_bbox(bbox, crs_in=GoogleMapsTiles.CRS, crs_out=4326, buffer=50.)

            bati = buildings_in_tile(bbox=(north, south, east, west)).to_crs(GoogleMapsTiles.CRS)

            overlay = bati.overlay(mesh).dissolve(['iy', 'ix'])
            overlay['frac_bati'] = overlay.geometry.area/overlay['cell_area']
            y = (overlay.reindex(mesh.set_index(['iy', 'ix']).index)['frac_bati'].values.reshape(256, 256) > 0.33).astype(np.uint8)
            if y.sum() >= y_threshold:
                batch_X.append(tile)
                batch_y.append(np.flip(y, axis=0))
        except Exception as e:
            pass
        k += 1
    batch_X = np.array(batch_X)
    batch_y = np.array(batch_y)
    return batch_X, batch_y

In [None]:
CRS = GoogleMapsTiles.CRS

In [None]:
france = gpd.read_file("data/france/FRA_adm1.shp").simplify(0.0001).unary_union

In [None]:
gen = generator_points_in_polygon(polygon=france)

In [None]:
n_batches = 100
batch_size = 32
for zoom in (14, 15, 16):
    os.makedirs(f'data/learning_base/zoom_level_{zoom}/')
    for k in tqdm(range(n_batches)):
        batch_X, batch_y = compute_batch(gen=gen, zoom=zoom, batch_size=batch_size, y_threshold=20)
        np.savez_compressed(f"data/learning_base/zoom_level_{zoom}/batch_{hex(hash(pd.Timestamp.now())).split('0x')[1]}.npz", X=batch_X, y=batch_y)

In [None]:
i0 = random.choice(range(len(batch_X)))
plt.imshow(batch_X[i0])
plt.imshow(np.flip(np.where(batch_y[i0] > 0, 1, np.nan), axis=0))
plt.gca().set_aspect('equal')
plt.gca().axis('off')
plt.show()