In [46]:
import sys
import osmnx as ox
import geopandas as gp
from typing import Optional, Union, Tuple, List, Dict, Any, Iterable, Type
import shapely
from shapely.geometry.base import BaseGeometry
from shapely.geometry import (
    Point,
    MultiPoint,
    LineString,
    MultiLineString,
    Polygon,
    GeometryCollection,
    box,
)
import numpy as np

ox.__version__

'2.0.0b3'

In [16]:
def compute_perimeter(
        coordinates: Tuple[float, float],
        dist: float, 
        crs: str = "EPSG:4326"
    ):
    """
    Create two bounding boxes of side length `dist` around `coordinates`.
    The coordiantes are assumed to be in EPSG:4326 format by default.

    The first bbox has projected linear coordinates and can be used later when processing the geometry.
    The second bbox has coordinates according to `crs` and is used to query the OSM api.
    The perimeter is just the second bounding box as a GeoDataFrame to process it together with other geometry.
    """
    # The EPSG projections use long/lat pairs instead of the traditional lat/long.
    lat = coordinates[0]
    long = coordinates[1]

    # Convert the lat/long coordinates to linear coordinates by projecting according to EPSG:4326
    origin = ox.projection.project_gdf(gp.GeoDataFrame(geometry=[Point(long, lat)], crs=crs))

    x = origin.geometry[0].x
    y = origin.geometry[0].y
    # Define a bounding box on linear coodinates and unproject it again to get a bounding box in lat/long coordinates.
    dist2 = dist/2
    bbox_proj = box(x - dist2, y - dist2, x + dist2, y + dist2)
    perimeter = gp.GeoDataFrame(geometry=[bbox_proj], crs=origin.crs).to_crs(crs)
    bbox = box(*perimeter.geometry[0].bounds)
    return bbox_proj, bbox, perimeter

In [17]:
from dataclasses import dataclass
@dataclass
class Map:
    """
    A collection of GeoDataFrames with a bounding box.
    In the end we intersect all geometry with the bounding box in order to fit into a square picture.
    """
    gdfs: dict[str, gp.GeoDataFrame]
    bbox_proj: Polygon

In [18]:
GRAPH_LAYERS = ["streets", "railway"]

def osm_get_gdf(
        bbox: Polygon,
        layer: str,
        **kwargs
    ) -> Union[gp.GeoDataFrame, None] :
    """
    Get all features/graphs inside an area defined by the bounding box `bbox`.
    Any query parameters for osmnx are directly passed through in `**kwargs`.
    """
    print(f"Get gdf for layer {layer}")
    try:
        if layer in GRAPH_LAYERS:
            G = ox.graph_from_polygon(bbox, **kwargs)
            return ox.graph_to_gdfs(G, nodes=False, edges=True)
        else:
            return ox.features_from_polygon(bbox, **kwargs)
    # Exception thrown on an empty OSM response, e.g. if no feature with this tag exists at that location.
    except:
        return None
    
def osm_get_map(
        layers: Dict[str, dict],
        coordinates: Tuple[float, float],
        dist: float
    ) -> Map:
    """
    Create a Map containing a GeoDataFrame with feature/graph geometry for each layer in `layers`.
    The geometry for each layer will be centered on `coordinates` and fall within a bounding box of side length `dist`.
    """
    bbox_proj, bbox, perimeter = compute_perimeter(coordinates, dist)

    gdfs = {}
    for k, v in layers.items():
        gdf = osm_get_gdf(bbox, k, **v)
        if gdf is not None:
            gdfs[k] = gdf
    gdfs["perimeter"] = perimeter

    return Map(gdfs, bbox_proj)


In [47]:
def dilate_graph(
        gdf: gp.GeoDataFrame,
        width: Union[Dict[str, float], float]
    ) -> gp.GeoDataFrame:
    """
    Given a GeoDataFrame containing a graph (street/railway network),
    turn all LineStrings representing streets/railways into polygon strips by dilating by 'width'.
    """

    # Annotate GeoDataFrame with the width for each graph type
    def get_width(v):
        if isinstance(v, list):
            for x in v:
                if w := width.get(x):
                    return w
            return 1
        else:
            return width.get(v, 1)
                
    # Only the streets layer supports different widths.
    # TODO We also query the railway layer by different values (tram/subway/etc.) but these values are not preserved in the geodataframe.
    if isinstance(width, dict) and hasattr(gdf, "highway"):
        gdf["width"] = gdf.highway.map(get_width)
    else:
        gdf["width"] = width

    def dilate(row):
        if type(row.geometry) [LineString, MultiLineString, Point, MultiPoint]:
            return row.geometry.buffer(row.width, cap_style="flat")
        else:
            return row.geometry
    gdf.geometry = gdf.apply(dilate, axis=1)

    return gdf

def scale_geometry(
        geometry: BaseGeometry,
        bbox_src: Polygon,
        bbox_dst: Polygon,
    ) -> BaseGeometry:
    """
    Project a geometry from one frame of reference defined by a bounding box `bbox_src` to another defined by `bbox_dst`.
    """
    minx1, miny1, maxx1, maxy1 = bbox_src.bounds
    minx2, miny2, maxx2, maxy2 = bbox_dst.bounds
    
    # 1. Move everything to the origin.
    # 2. Scale from the size of `bbox_src` to the size of `bbox_dst`.
    # 3. Move everything into `bbox_dst`.
    g = shapely.affinity.translate(geometry, -minx1, -miny1)
    g = shapely.affinity.scale(g, (maxx2 - minx2) / (maxx1 - minx1), (maxy2 - miny2) / (maxy1 - miny1), origin=(0, 0))
    g = shapely.affinity.translate(g, minx2, miny2)
    return g

def classify_shapes(
        geometry: BaseGeometry
    ) -> Dict[Type, List[Union[Polygon, LineString, Point]]]:
    """
    Flatten a hierarchy of geometries (i.e. GeometryCollection, MultiPolygon, MultiLineString)
    into a dictionary of simple shapes (Polygon, LineString, Point).
    
    TODO: since we dilate all points and linestrings into polygons we should actually have only polygons left. 
    But I'm not sure if the geometry operations afterwards could result in new lines/points being created so leave it in for now.
    """
    # Stack of geometries that we still need to process.
    stack: List[BaseGeometry] = [geometry]
    shapes = {
        Polygon: [],
        LineString: [],
        Point: []
    }

    while len(stack) > 0:
        shape = stack.pop()
        # case for nested geometries like GeometryCollection, MultiPolygon, MultiLineString etc.
        if hasattr(shape, 'geoms'):
            stack.extend(shape.geoms)
        elif type(shape) in shapes:
            shapes[type(shape)].append(shape)
        else:
            print(f"Unknown shape: {shape}", file=sys.stderr)
    return shapes

def shapely_of_gdf(
    gdf: gp.GeoDataFrame,
    bbox_src: Polygon,
    size: int,
    width: Union[Dict[str, float], float],
    overdraw: bool = False,
) -> BaseGeometry:
    """
    Convert a single GeoDataFrame `gdf` to a shapely geometry and scale it to fit into a bounding box of side length `size` and anchored at the origin.
    Any line & point geometries will be dilated into polygon stripes by using `width`. 
    By default, the geometry will be intersected with the destination bounding box to only keep geometry that is inside.
    """

    gdf = ox.projection.project_gdf(gdf)
    gdf = dilate_graph(gdf, width=width)
    
    bbox_dst = box(0, 0, size, size)

    geometry = GeometryCollection(geoms=gdf.geometry.tolist())
    geometry = shapely.ops.unary_union(geometry)
    geometry = scale_geometry(geometry, bbox_src, bbox_dst)
    geometry = shapely.ops.unary_union(geometry)

    if not overdraw:
        geometry = bbox_dst.intersection(geometry)

    return geometry

In [48]:
import svg

def svg_of_poly(
        poly: Polygon,
        palette: Optional[List[str]] = None,
        **kwargs
    ) -> List[svg.Element]:
    """
    Create an SVG polygon from a shapely polygon.
    For a polygon without interior rings (i.e. holes) we can directly use the svg polygon constructor.
    If there are holes we define svg paths from the linear rings. The "evenodd" fill rule ensures that 
    the inside of a hole is not filled in since there are two paths between the hole and the outside
    area of the polygon.
    """
    if len(poly.interiors) > 0:
        # start with a "moveto" and then "lineto" for all further coordinates.
        def path_of_coords(r):
            return [svg.M(*r[0])] + [svg.L(*c) for c in r[1:]]

        ds = path_of_coords(poly.exterior.coords)
        for interior in poly.interiors:
            ds += path_of_coords(interior.coords)

        return [
            svg.Path(
                d=ds,
                fill_rule="evenodd",
                stroke=kwargs["ec"] if "ec" in kwargs else "black",
                fill=(
                    kwargs["fc"]
                    if "fc" in kwargs
                    else np.random.choice(palette) if palette else "black"
                ),
                stroke_width=kwargs["lw"] if "lw" in kwargs else 1,
            )
        ]
    else:
        return [
            svg.Polygon(
                points=[*poly.boundary.coords],
                stroke=kwargs["ec"] if "ec" in kwargs else "black",
                fill=(
                    kwargs["fc"]
                    if "fc" in kwargs
                    else np.random.choice(palette) if palette else "black"
                ),
                stroke_width=kwargs["lw"] if "lw" in kwargs else 1,
            )
        ]

def svg_of_shapely(
    geometry: BaseGeometry,
    palette: Optional[List[str]] = None,
    **kwargs,
) -> List[svg.Element]:
    """
    Translate shapely geometries into their equivalent SVG element and add styling.
    """
    shapes = classify_shapes(geometry) 
    elements = []
    for poly in shapes[Polygon]:
        elements += svg_of_poly(poly, palette=palette, **kwargs)
    for line in shapes[LineString]:
        print("WARN: LineString remains in output geometry which should have been dilated into a Polygon", file=sys.stderr)
        elements.append(
            svg.Polyline(
                points=[*line.coords],
                stroke=kwargs["ec"] if "ec" in kwargs else "black",
                stroke_width=kwargs["lw"] if "lw" in kwargs else 1,
            )
        )
    for point in shapes[Point]:
        elements.append(
            svg.Circle(
                cx=point.x, 
                cy=point.y,
                r=1,
                fill=(
                    kwargs["fc"]
                    if "fc" in kwargs
                    else np.random.choice(palette) if palette else "black"
                ),
                stroke=kwargs["ec"] if "ec" in kwargs else "black",
                stroke_width=kwargs["lw"] if "lw" in kwargs else 1,
            )
        )

    return elements
    
def svg_of_gdf(
    layer: str,
    map: Map,
    size: int,
    palette: Optional[List[str]] = None,
    width: Union[Union[dict, float], float] = 1,
    **kwargs,
) -> List[svg.Element]:
    """
    Create SVG elements from the GeoDataFrame of a single Map layer.
    First extract the shapely geometries from the gdf and then map them 
    to the closest equivalent in SVG.
    """
    geometry = shapely_of_gdf(map.gdfs[layer], map.bbox_proj, width=width, size=size)
    # SVG y axis grows downwards so mirror y axis at center.
    geometry = shapely.ops.transform(lambda x, y: (x, size - y), geometry)

    if (palette is None) and ("fc" in kwargs) and (type(kwargs["fc"]) != str):
        palette = kwargs.pop("fc")

    elements = svg_of_shapely(geometry, palette, **kwargs)
    return elements

In [43]:
def generate_svg(
        map: Map,
        filepath: str,
        size: int,
        styles: Dict[str, dict],
        css_prefix = "mapviz"
    ):
    """
    Generate an SVG image from a `map` object and save it to the given `filepath`.
    The internal coordinate system will go from 0 to `size`.
    Each layer is styled according to `styles[layer]`.
    All SVG elements will be tagged with a CSS class with prefix `css_prefix`.
    """
    elements = []
    # Go through the layers by zorder since later elements in an SVG are drawn on top of earlier elements.
    for layer in sorted(styles, key=lambda style: styles[style].get("zorder", 0)):
        if layer in map.gdfs:
            layer_elements = svg_of_gdf(layer, map, size=size, **styles[layer])

            # We tag all elements in a layer with a CSS class to identify them later.
            for e in layer_elements:
                e.class_ = f"{css_prefix}-{layer}"
            elements += layer_elements

    img = svg.SVG(
            # completely fill the parent HTML element
            width="100%",
            height="100%",
            # defining the internal viewport as `size` so that the whole image is scaled to fit into the SVG element's viewport.
            viewBox=f"0 0 {size} {size}",
            class_=f"{css_prefix}-map",
            elements=elements)
    with open(filepath, "w") as f:
        f.write(str(img))

In [24]:
street_types = [
    'motorway', 'motorway_link', 'trunk', 'trunk_link', 'primary', 'primary_link', 
    'secondary', 'secondary_link', 'tertiary', 'tertiary_link', 'residential', 'service',
    'unclassified', 'living_street'
]
street_widths = {
    "motorway": 5,
    "motorway_link": 5,
    "trunk": 5,
    "trunk_link": 5,
    "primary": 4.5,
    "primary_link": 4.5,
    "secondary": 4,
    "secondary_link": 4,
    "tertiary": 3.5,
    "tertiary_link": 3.5,
    "residential": 3,
    "service": 2,
    "unclassified": 1,
    "living_street": 1,
}

rail_types = ["rail", "light_rail", "subway", "funicular", "monorail", "narrow_gauge", "tram"]

layers = {
    "streets": {
        "custom_filter": f'["highway"~"{"|".join(street_types)}"]',
        "retain_all": True, 
        "truncate_by_edge": True
    },
    "railway": {
        "custom_filter": f'["railway"~"{"|".join(rail_types)}"]',
        "retain_all": True, 
        "simplify": False, 
        "truncate_by_edge": True, 
    },
    "building": {
        "tags": {    
            "building": True,
            "landuse": "construction",
            # TODO move that and temples to a special category
            "historic": "monument",
        },
    },
    "water": {
        "tags": {    
            "natural": [
                "water",
                "bay"
            ]
        }
    },
    "forest": {
        "tags": {    
            "landuse": "forest"
        }
    },
    "green": {
        "tags": {    
            "landuse": [
                "grass",
                "orchard",
            ],
            "natural": [
                "island",
                "scrub",
                "wood",
            ],
            "leisure": [
                "park",
                "pitch",
            ],
            "amenity": "grave_yard",
        }
    },
    "beach": {
        "tags": {    
            "natural": "beach"
        }
    },
    "parking": {
        "tags": {    
            "amenity": "parking",
            "highway": "pedestrian",
            "man_made": "pier",
            "place": "square",
        }
    }
}

In [25]:

styles = {
    'perimeter': {'fc': '#091833', 'ec': '#dadbc1', 'zorder': -1},
    'parking': {'fc': '#ea00d9', 'ec': '#2F3737', 'lw': 0.1, 'zorder': 1},
    'green': {'fc': '#089a00', 'ec': '#2F3737', 'lw': 1, 'zorder': 0},
    'forest': {'fc': '#089a00', 'ec': '#2F3737', 'lw': 1, 'zorder': 0},
    'water': {'fc': '#00b9ff', 'ec': '#711c91', 'lw': 1, 'zorder': 2},
    'streets': {'fc': '#8bf4f7', 'ec': '#0fcdf6', 'lw': 0, 'zorder': 3, "width": street_widths},
    'railway': {'fc': '#f7f132', 'ec': '#f7f132', 'lw': 0, 'zorder': 2.5, "width": 2},
    'building': {'palette': ['#2900c1', '#FF5E5B'], 'ec': '#2F3737', 'lw': .1, 'zorder': 4},
}


In [44]:
# get the place boundaries
places = {
    "Tokyo": (35.67694, 139.76384),
    "Sendai": (38.266224, 140.871838),
    "Napoli": (40.848923, 14.250147),
    "Berlin": (52.51702, 13.40178),
}

# Customize this
placename = "Napoli"
dist = 400

coordinates = places[placename]

In [45]:
import os.path
MAP_DIR="../maps/"
SIZE=2048

map = osm_get_map(layers, coordinates, dist)
generate_svg(map, os.path.join(MAP_DIR, f"{placename}-{dist}.svg"), SIZE, styles)

Get gdf for layer streets
Get gdf for layer railway
Get gdf for layer building
Get gdf for layer water
Get gdf for layer forest
Get gdf for layer green
Get gdf for layer beach
Get gdf for layer parking
