In [12]:
import os
import requests
import dataclasses
import uuid
from objects import Coordinate, Shapes
import json
from mapping import map_to_usage_type, map_to_highway_type, map_to_building_type
import geopy.distance

In [13]:
city_name = "Bremen"
city_name = "Balgstädt"
city_name = "Freyburg (Unstrut)"
city_name = "Berlin"
city_name = "Potsdam"

In [14]:
def query_land_use(city_name):
    # Define the Overpass API query to get land use data for the specified city
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
        [out:json];
        area[name="{city_name}"]->.searchArea;
        (
            way(area.searchArea)["landuse"];
            relation(area.searchArea)["landuse"];
        );
        out center;
    """

    # Send the request to Overpass API
    response = requests.post(overpass_url, data=overpass_query)

    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print("Error occurred while fetching data.")
        return None

In [15]:
def query_buildings(city_name):
    # Define the Overpass API query to get land use data for the specified city
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
        [out:json];
        area[name="{city_name}"]->.searchArea;
        (
          way["building"](area.searchArea);
        );
        out body;
        >;
        out skel qt;
    """

    # Send the request to Overpass API
    response = requests.post(overpass_url, data=overpass_query)

    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print("Error occurred while fetching data.")
        return None

In [16]:
def query_highways(city_name):
    # Define the Overpass API query to get land use data for the specified city
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
        [out:json];
        area[name="{city_name}"]->.searchArea;
        (
          way(area.searchArea)["highway"];
        );
        out center;
    """

    # Send the request to Overpass API
    response = requests.post(overpass_url, data=overpass_query)

    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print("Error occurred while fetching data.")
        return None

In [17]:
def get_nodes(node_ids):
    # Convert the list of node IDs into a string for the Overpass API query
    ids_str = ",".join(str(node_id) for node_id in node_ids)
    
    # Define the Overpass API query to get nodes based on their IDs
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
        [out:json];
        node(id:{ids_str});
        out;
    """

    # Send the request to Overpass API
    response = requests.post(overpass_url, data=overpass_query)

    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print("Error occurred while fetching data.")
        return None

In [18]:
def get_relation_geom(relation):
    # Define the Overpass API query to get nodes based on their IDs
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
        [out:json];
        relation({relation});
        out geom;
    """

    # Send the request to Overpass API
    response = requests.post(overpass_url, data=overpass_query)

    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print("Error occurred while fetching data.")
        return None

In [19]:
areas = query_land_use(city_name)
buildings = query_buildings(city_name)
highways = query_highways(city_name)

# Preprocess the data

In [20]:
all_nodes = {}

## Areas

In [21]:
elements = areas.get("elements", [])
coordinates_tmp = {}
for element in elements:
    type = element.get("type")
    is_relation = type == "relation"
    if not is_relation:
        nodes = element.get("nodes", [])
        for node in nodes:
            coordinates_tmp[node] = Coordinate(lat=0, lon=0)
            all_nodes[node] = Coordinate(lat=0, lon=0)

coordinate_list = list(coordinates_tmp.keys())
coordinates_list = get_nodes(coordinate_list)
coordinate_ids_map = {}
for coordinate in coordinates_list.get("elements", []):
    node_id = coordinate.get("id")
    lat = coordinate.get("lat")
    lon = coordinate.get("lon")
    coordinates_tmp[node_id] = Coordinate(lat=lat, lon=lon)
    all_nodes[node_id] = Coordinate(lat=lat, lon=lon)
coordinates = coordinates_tmp

In [22]:
area_types = set()
area_objects = {}
elements = areas.get("elements", [])
for element in elements:
    type = element.get("type")
    is_relation = type == "relation"
    if not is_relation:
        landuse = element.get("tags", {}).get("landuse", "unknown")
        nodes = element.get("nodes", [])
        geometry = [coordinates[node] for node in nodes]
        landuse_id = uuid.uuid4()
        type = map_to_usage_type(landuse)
        area_types.add(type)
        landuse_obj = Shapes(id=landuse_id, type=type, geometry=geometry, open=False)
        area_objects[landuse_id] = landuse_obj
    else:
        relation_id = element.get("id")
        relation_data = get_relation_geom(relation_id)
        rel_element = relation_data.get("elements", [])[0]
        members = rel_element.get("members", [])
        outer_members = [member for member in members if member.get("role") == "outer"]
        member_geometries = []
        for member in outer_members:
            member_geometry = []
            geometry = member.get("geometry", [])
            for node in geometry:
                lat = node.get("lat")
                lon = node.get("lon")
                member_geometry.append(Coordinate(lat=lat, lon=lon))
            member_geometries.append(member_geometry)
            
        if len(member_geometries) == 0:
            continue
        
        # merge the geometry, so that we have a single geometry, they should be merged by the following:
        # 1. take the first member
        # 2. take the last node of the first member
        # 3. take all first and last nodes of the other members
        # 5. find the closest node to the last node of the first member
        
        def calculate_distance(node1, node2):
            lat1 = node1.lat
            lon1 = node1.lon
            lat2 = node2.lat
            lon2 = node2.lon
            return geopy.distance.distance((lat1, lon1), (lat2, lon2)).m
        final = member_geometries[0]
        while len(member_geometries) > 1:
            first_member = member_geometries[0]
            ref_node = first_member[-1]
            member_idx = None
            closest_distance = float("inf")
            direction = None
            for idx, member in enumerate(member_geometries[1:]):
                first_node = member[0]
                last_node = member[-1]
                distance_first = calculate_distance(ref_node, first_node)
                distance_last = calculate_distance(ref_node, last_node)
                if distance_first < closest_distance:
                    closest_distance = distance_first
                    member_idx = idx + 1
                    direction = "first"
                if distance_last < closest_distance:
                    member_idx = idx + 1
                    direction = "last"
                    
            if direction == "first":
                final.extend(member_geometries[member_idx])
            else:
                final.extend(member_geometries[member_idx][::-1])
                
            member_geometries.pop(member_idx)
                
            
        geometry = final
        type = rel_element.get("tags", {}).get("landuse", "unknown")
        landuse_id = uuid.uuid4()
        type = map_to_usage_type(type)
        area_types.add(type)
        landuse_obj = Shapes(id=landuse_id, type=type, geometry=geometry, open=False)
        area_objects[landuse_id] = landuse_obj

# get highways

In [23]:
elements = highways.get("elements", [])
coordinates_tmp = {}
for element in elements:
    type = element.get("type")
    is_relation = type == "relation"
    if not is_relation:
        nodes = element.get("nodes", [])
        for node in nodes:
            coordinates_tmp[node] = Coordinate(lat=0, lon=0)
            all_nodes[node] = Coordinate(lat=0, lon=0)

coordinate_list = list(coordinates_tmp.keys())
coordinates_list = get_nodes(coordinate_list)
coordinate_ids_map = {}
for coordinate in coordinates_list.get("elements", []):
    node_id = coordinate.get("id")
    lat = coordinate.get("lat")
    lon = coordinate.get("lon")
    coordinates_tmp[node_id] = Coordinate(lat=lat, lon=lon)
    all_nodes[node_id] = Coordinate(lat=lat, lon=lon)
coordinates = coordinates_tmp

In [24]:
highways = {}
highway_types = set()
for element in elements:
    street = element.get("tags", {}).get("highway", "unknown")
    nodes = element.get("nodes", [])
    geometry = [coordinates[node] for node in nodes]
    highway_id = uuid.uuid4()
    type = map_to_highway_type(street)
    highway_types.add(type)
    highway_obj = Shapes(id=highway_id, type=type, geometry=geometry, open=True)
    highways[highway_id] = highway_obj
    all_nodes.update({node: coordinates[node] for node in nodes})

## Buildings

In [25]:
elements = buildings.get("elements", [])
coordinates_tmp = {}
for element in elements:
    type = element.get("type")
    nodes = element.get("nodes", [])
    for node in nodes:
        coordinates_tmp[node] = Coordinate(lat=0, lon=0)
        all_nodes[node] = Coordinate(lat=0, lon=0)
        
        
coordinate_list = list(coordinates_tmp.keys())
coordinates_list = get_nodes(coordinate_list)
coordinate_ids_map = {}
for coordinate in coordinates_list.get("elements", []):
    node_id = coordinate.get("id")
    lat = coordinate.get("lat")
    lon = coordinate.get("lon")
    coordinates_tmp[node_id] = Coordinate(lat=lat, lon=lon)
    all_nodes[node_id] = Coordinate(lat=lat, lon=lon)
coordinates = coordinates_tmp

In [27]:
def parse_float(value, default):
    try:
        return float(value)
    except:
        return default

In [28]:
building_types = set()
building_meta = {}
building_objects = {}
elements = buildings.get("elements", [])
for element in elements:
    type = element.get("type")
    is_relation = type == "relation"
    if not is_relation:
        buildingType = element.get("tags", {}).get("building", "unknown")
        levels = element.get("tags", {}).get("building:levels", 3)
        roof_levels = element.get("tags", {}).get("roof:levels", 0)
        total_levels = parse_float(levels, 3) + parse_float(roof_levels, 0)
        nodes = element.get("nodes", [])
        geometry = [coordinates[node] for node in nodes]
        building_id = uuid.uuid4()
        buildingType = map_to_building_type(buildingType)
        building_types.add(buildingType)
        building_obj = Shapes(id=building_id, type=buildingType, geometry=geometry, open=False)
        building_objects[building_id] = building_obj
        building_meta[building_id] = {
            "levels": total_levels
        }
    else:
        print("Relation")

## Output

In [29]:
highway_types

{'other', 'road'}

In [30]:
area_types

{'construction',
 'farm',
 'free_field',
 'industrial',
 'other',
 'residential',
 'retail',
 'transport',
 'water'}

In [31]:
building_types

{'commercial',
 'construction',
 'farm',
 'high_density_residential',
 'industrial',
 'other',
 'residential',
 'retail',
 'transport'}

In [33]:
from mapping import FREE_FIELD, ROAD, get_useable_areas

highway_json = {}
for id, obj in highways.items():
    if obj.type != ROAD:
        continue
    highway_json[str(id)] = obj.serialize()
    
area_json = {}
serialized_area_types = set()
for id, obj in area_objects.items():
    if obj.type not in get_useable_areas():
        continue
    obj.type = FREE_FIELD
    serialized = obj.serialize()
    serialized_area_types.add(obj.type)
    serialized["base_area"] = obj.get_area()
    area_json[str(id)] = serialized
print(serialized_area_types)

{'free_field'}


In [36]:
from mapping import OTHER, get_used_building_types

total_area = 0
building_json = {}
for id, obj in building_objects.items():
    if obj.type not in get_used_building_types():
        continue
    serialized = obj.serialize()
    if len(obj.geometry) == 0:
        continue
    try:
        serialized["base_area"] = obj.get_area()
    except:
        print(obj.geometry)
        continue
        
    serialized["levels"] = building_meta[id]["levels"]
    serialized["area"] = serialized["base_area"] * serialized["levels"]
    total_area += serialized["area"]
    building_json[str(id)] = serialized
total_area

22515335.780989096

In [46]:
7.964138*total_area*0.1*0.1 / (1000*1000)

1.7931524127613496

In [43]:
total = {
    "highways": highway_json,
    "areas": area_json,
    "buildings": building_json
}

In [44]:
with open(f"./data/total_{city_name}.json", "w") as file:
    file.write(json.dumps(total, indent=1))

In [45]:
len(list(all_nodes.keys()))

322546