In [None]:
from geopy.distance import geodesic
import osmium
import os
import math
from scipy.spatial import KDTree
import numpy as np
import random

# Path handed to osmium.SimpleWriter for the demonstration output
# (only used when SAVE_MODIFIED_WAYS is True).
out_file = "../osm/buildings.osm"
# Text file that receives the sampled "<from> <to>" pairs, one pair per line.
out_points = "../osm/points.txt"
# OSM extract that OSMHandler reads.
input_file = '../osm/export.osm'
# Number of iterations of the sampling loop in save_start_end_predict;
# each iteration writes one weighted pair (plus, possibly, one transit pair).
point_count = 1000
# Percentage of the total residential/commercial weight that is additionally
# spread over trunk nodes as through-city traffic.
through_city_percent = 5

# For every block of 100 sampling iterations, the first `transit_percent`
# iterations also write an extra transit pair.
transit_percent = 10
# Endpoints used for the transit pairs; a pair is "<transit_from> <transit_to>".
# Presumably OSM node ids from the extract above -- TODO confirm.
transit_to = ["1167757927"]
transit_from = ["659850844"]
# When True, the last cell also writes the demonstration OSM file `out_file`.
SAVE_MODIFIED_WAYS=False

# Per-building base capacities; the sampling weight of a building is
# capacity * number of levels.
# building=apartments
apartment_cap = 5
# assigned in the residential `case` branch of save_start_end_predict
all_residential_cap = 2
# building types listed as commercial (office, retail, school, ...)
comertial_cap = 3
# any other building type that carries a non-empty `name` tag
comertial_naming_cap = 5
# any other building type without a name (treated as residential)
untyping_residential_cap = 2

In [None]:
class OSMHandler(osmium.SimpleHandler):
    """Collect highway nodes and building outlines while reading an OSM file.

    After the file has been applied the handler exposes:

    * ``nods_way``   -- ``(ref, location, 0, 0)`` for every node of every way
      tagged ``highway`` (the two zeros are the residential and commercial
      capacity slots filled in later),
    * ``nods_trunk`` -- the same tuples, restricted to ways whose ``highway``
      value is ``motorway``, ``trunk`` or ``primary``,
    * ``nods_build`` -- ``(ref, location)`` for every node of every way tagged
      ``building``,
    * ``builsings``  -- ``(way id, tags dict, node list, lat, lon)`` per
      building way, where lat/lon is the mean of the way's node coordinates.
    """

    def __init__(self):
        super().__init__()
        self.node_locs = osmium.index.create_map("sparse_mem_array")
        self.loc_handler = osmium.NodeLocationsForWays(self.node_locs)
        self.ways = []
        self.builsings = []
        self.nods_way = []
        self.nods_trunk = []
        self.nods_build = []

    def node(self, n: osmium.Node):
        """Remember the location of every node in the location index."""
        self.node_locs.set(n.id, n.location)

    def way(self, w: osmium.Way):
        """Record the nodes of highway ways and the outline of building ways.

        A way tagged both ``highway`` and ``building`` is treated as a highway.
        """
        if 'highway' in w.tags:
            # The `lanes` tag used to be parsed here although the value was
            # never used; values such as "2;3" made float() raise and aborted
            # the whole import, so the dead parsing was removed.
            trunk = w.tags.get("highway", "") in ["motorway", "trunk", "primary"]
            for n in w.nodes:
                entry = (n.ref, n.location, 0, 0)
                self.nods_way.append(entry)
                if trunk:
                    self.nods_trunk.append(entry)

        elif 'building' in w.tags:
            outline = list(w.nodes)
            if not outline:
                # A way without nodes has no centre; skipping it avoids a
                # division by zero below.
                return
            lat = 0
            lon = 0
            for n in outline:
                self.nods_build.append((n.ref, n.location))
                lat = lat + n.lat
                lon = lon + n.lon
            lat = lat/len(outline)
            lon = lon/len(outline)
            self.builsings.append((w.id, dict(w.tags), outline, lat, lon))


In [None]:
def random_point_around(x0, y0, r = 5):
    """Return a random point on the circle of radius ``r`` centred at (x0, y0).

    The angle is drawn uniformly from [0, 2*pi]. An earlier variant that
    favoured two fixed angular sectors is disabled.

    Note that the result is returned with the axes swapped, i.e. as
    ``(y, x)`` and not ``(x, y)``.
    """
    angle = random.uniform(0, 2 * math.pi)
    offset_x = r * math.cos(angle)
    offset_y = r * math.sin(angle)
    return y0 + offset_y, x0 + offset_x



In [None]:
def save_start_end_predict(buildings, nodes_way, nodes_trunk):
    # Создаем массив координат нод для KD-Tree
    node_coords = [(loc.lat, loc.lon) for (ref, loc, r_cap, c_cap) in nodes_way]
   
    node_tree = KDTree(node_coords)
    
    points_from_dict = {}
    points_to_dict = {}
    all_commertial_val = 0
    all_residential_val = 0

    all_count = 0
    middle_lat = 0
    middle_lon = 0

   

    for id, tags, nodes, lat, lon in buildings:
        building_type = tags.get('building', "yes")
        building_coord = (lat, lon)
        building_level = int(float(tags.get("building:levels", "1"))//1)
        capacity = 5
    
        distance, index = node_tree.query(building_coord)
        
        if building_type in ['residential', 'house', 'apartments', 'dormitory', 'terrace', 'detached', 'bungalow']:
            match building_type:
                case 'apartments':
                    middle_lat = lat
                    middle_lon = lon
                    capacity = apartment_cap
                case 'residential','residential', 'house','dormitory', 'terrace', 'detached', 'bungalow':
                    capacity = all_residential_cap    
            ref, loc, r_cap, c_cap = nodes_way[index]
            nodes_way[index] = (ref, loc, r_cap + capacity, c_cap)
            points_from_dict[ref] = points_from_dict.get(ref, 0) + capacity*building_level
            all_residential_val += capacity*building_level
        elif building_type in ['commercial', 'office', 'industrial', "retail", "store", "school",
                                "hospital", 'garages', 'warehouse', 'service', 'kindergarten']:
            capacity = comertial_cap
            ref, loc, r_cap, c_cap = nodes_way[index]
            nodes_way[index] = (ref, loc, r_cap, c_cap + capacity)
            points_to_dict[ref] = points_to_dict.get(ref, 0) + capacity*building_level
            all_commertial_val += capacity*building_level
        else: 
            if (tags.get("name","") != ""):
                capacity = comertial_naming_cap
                ref, loc, r_cap, c_cap = nodes_way[index]
                nodes_way[index] = (ref, loc, r_cap, c_cap + capacity)
                points_to_dict[ref] = points_to_dict.get(ref, 0) + capacity*building_level
                all_commertial_val += capacity*building_level
            else:
                capacity = untyping_residential_cap
                ref, loc, r_cap, c_cap = nodes_way[index]
                nodes_way[index] = (ref, loc, r_cap + capacity, c_cap)
                points_from_dict[ref] = points_from_dict.get(ref, 0) + capacity*building_level
                all_residential_val += capacity*building_level

    node_coords = [(loc.lat, loc.lon) for (ref, loc, r_cap, c_cap) in nodes_trunk]
    node_tree = KDTree(node_coords)

    split = 100
    for y in range(split):
        point_coord = random_point_around(middle_lat, middle_lon)
        distance, index = node_tree.query((point_coord[1], point_coord[0]))
        ref, loc, r_cap, c_cap = nodes_trunk[index]
        nodes_trunk[index] = (ref, loc, 
                            r_cap + all_residential_val/split *through_city_percent/100,
                            c_cap + all_commertial_val/split *through_city_percent/100)
        points_to_dict[ref] = points_to_dict.get(ref, 0) + (all_commertial_val/split) *(through_city_percent/100)
        points_from_dict[ref] = points_from_dict.get(ref, 0) + (all_residential_val/split) *(through_city_percent/100)

        for node in nodes_way:
            if node[0] == ref: 
                node = (ref, loc, 
                            r_cap + all_residential_val/split *through_city_percent/100,
                            c_cap + all_commertial_val/split *through_city_percent/100)        

    points_from = list(points_from_dict.keys())
    weights_from = list(points_from_dict.values())

    points_to = list(points_to_dict.keys())
    weights_to = list(points_to_dict.values())
    print("Point go")
    with open(out_points, 'w') as file:
        for i in range(point_count):
            selected_point_from = random.choices(points_from, weights=weights_from, k=1)[0]
            selected_point_to = random.choices(points_to, weights=weights_to, k=1)[0]
            file.write(f"{selected_point_from} {selected_point_to}\n")
            if (i*10%point_count == 0):
                print(f"points: {i}")
            if (i%100 < transit_percent):
                selected_point_from = random.choices(transit_from)[0]
                selected_point_to = random.choices(transit_to)[0]
                file.write(f"{selected_point_from} {selected_point_to}\n")
                

    print("Point end")
    return (middle_lat, middle_lon)


In [None]:
def save_modified_ways(building, nodes_building, nodes_way, nodes_trunk, writer, loc):
    """Write a demonstration OSM file with capacities, buildings and sample points.

    Written to ``writer``, in this order:

    1. every highway node whose total residential or commercial capacity is
       non-zero, tagged ``r_cap`` / ``c_cap``,
    2. every building node,
    3. 1000 random nodes tagged ``FURE=true`` on a circle around ``loc``,
    4. every building way.

    Args:
        building: iterable of ``(way id, tags, nodes, lat, lon)``.
        nodes_building: iterable of ``(id, location)``.
        nodes_way: iterable of ``(id, location, r_cap, c_cap)``.
        nodes_trunk: iterable of ``(id, location, r_cap, c_cap)``; its
            capacities are added to the highway node with the same id.
        writer: object providing ``add_node`` and ``add_way``.
        loc: centre of the sample circle, either a ``(lat, lon)`` tuple or an
            object with ``lat`` / ``lon`` attributes.
    """
    # Read the centre first and never reuse the name `loc` for loop variables:
    # previously the loops below overwrote it, so the centre was ignored.
    if hasattr(loc, "lat"):
        middle_lat, middle_lon = loc.lat, loc.lon
    else:
        middle_lat, middle_lon = loc

    # A node appears once per way that contains it. Sum the capacities per id
    # so that a zero-capacity duplicate cannot hide the entry that was updated.
    trunk_caps = {}
    for node_id, _node_loc, r_cap, c_cap in nodes_trunk:
        prev_r, prev_c = trunk_caps.get(node_id, (0, 0))
        trunk_caps[node_id] = (prev_r + r_cap, prev_c + c_cap)

    way_caps = {}
    for node_id, node_loc, r_cap, c_cap in nodes_way:
        _first_loc, prev_r, prev_c = way_caps.get(node_id, (node_loc, 0, 0))
        way_caps[node_id] = (_first_loc, prev_r + r_cap, prev_c + c_cap)

    written_ids = set()
    for node_id, (node_loc, r_cap, c_cap) in way_caps.items():
        t_r_cap, t_c_cap = trunk_caps.get(node_id, (0, 0))
        r_cap += t_r_cap
        c_cap += t_c_cap

        tags = {}
        if (r_cap != 0):
            tags["r_cap"] = str(r_cap)
        if (c_cap != 0):
            tags["c_cap"] = str(c_cap)
        if tags:
            node = osmium.osm.mutable.Node()
            node.id = node_id
            node.location = node_loc
            node.tags = tags
            writer.add_node(node)
            written_ids.add(node_id)

    building_ids = set()
    for node_id, node_loc in nodes_building:
        building_ids.add(node_id)
        if node_id in written_ids:
            # Closed outlines repeat their first node; write each id once.
            continue
        node = osmium.osm.mutable.Node()
        node.id = node_id
        node.location = node_loc
        writer.add_node(node)
        written_ids.add(node_id)

    print(middle_lat, middle_lon)

    # Number the sample nodes above every id seen in the input so they cannot
    # collide with a real node (and so this works when the lists are empty).
    known_ids = set(way_caps) | set(trunk_caps) | building_ids
    next_id = max(known_ids, default=0) + 1
    for offset in range(1000):
        node = osmium.osm.mutable.Node()
        node.id = next_id + offset
        # random_point_around returns (lon, lat) for a (lat, lon) centre,
        # which is the order a location tuple is expected in.
        node.location = random_point_around(middle_lat, middle_lon)
        node.tags = {"FURE": "true"}
        writer.add_node(node)

    for way_id, tags, nods, _lat, _lon in building:
        new_way = osmium.osm.mutable.Way()
        new_way.id = way_id
        new_way.tags = tags
        new_way.nodes = nods
        writer.add_way(new_way)

In [None]:
# Read the extract, sample the origin/destination pairs and, optionally,
# write the demonstration OSM file.
h = OSMHandler()
h.apply_file(input_file, locations=True)
loc = save_start_end_predict(h.builsings, h.nods_way, h.nods_trunk)
if SAVE_MODIFIED_WAYS:
    # Demonstration output of the buildings
    if os.path.exists(out_file):
        os.remove(out_file)
    # Create the writer before entering `try`: if the constructor fails there
    # is nothing to close, and the original error must not be masked by a
    # NameError raised from the `finally` clause.
    writer = osmium.SimpleWriter(out_file)
    try:
        save_modified_ways(h.builsings, h.nods_build, h.nods_way, h.nods_trunk, writer, loc)
    finally:
        writer.close()