In [1]:
import math

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely import LineString, MultiPolygon, Point, Polygon
from shapely.ops import polygonize, unary_union



In [2]:
import pandas as pd

# Air attenuation coefficients, tabulated by air temperature (table columns) and
# geometric-mean frequency in Hz (table rows).
# NOTE(review): the unit of the coefficients is not stated in this notebook; they are
# multiplied by a distance in dist_to_target_db, so presumably dB per unit of distance
# -- confirm against the table's source.
_FREQUENCIES_HZ = (63, 125, 250, 500, 1000, 2000, 4000, 8000)
_COEFFICIENTS_BY_TEMPERATURE = {
    30: (0, 0.0002, 0.0009, 0.003, 0.0075, 0.014, 0.025, 0.064),
    20: (0, 0.0003, 0.0011, 0.0028, 0.0052, 0.0096, 0.025, 0.083),
    10: (0, 0.0004, 0.001, 0.002, 0.0039, 0.01, 0.035, 0.125),
    0: (0, 0.0004, 0.0008, 0.0017, 0.0049, 0.017, 0.058, 0.156),
}

# Same nested mapping as before: {temperature: {frequency: coefficient}}.
data = {
    temperature: dict(zip(_FREQUENCIES_HZ, coefficients))
    for temperature, coefficients in _COEFFICIENTS_BY_TEMPERATURE.items()
}

# Rows are indexed by frequency, columns by temperature.
air_resist_ratio = pd.DataFrame(data)


def get_nearest_values(array, value):
    """Return the table entries needed to look up or interpolate ``value``.

    Args:
        array: Iterable of mutually comparable values (e.g. a table axis).
            It does not need to be sorted.
        value: The value to locate.

    Returns:
        A list with one element when no interpolation is needed:
        ``[value]`` if ``value`` is present in ``array``, or the largest /
        smallest entry if ``value`` lies above / below the whole range
        (clamping). Otherwise a two-element list ``[lower, upper]`` with the
        neighbouring entries that bracket ``value``.

        A value that compares false against everything (e.g. NaN) falls
        through to the last (up to) two entries, so that downstream
        interpolation yields NaN instead of failing here.

    Raises:
        ValueError: If ``array`` is empty.
    """
    ordered = sorted(array)
    if not ordered:
        raise ValueError("array must contain at least one value")

    if value in ordered:
        return [value]
    # The list is sorted, so its ends are the extremes; no need to rescan it.
    if value > ordered[-1]:
        return [ordered[-1]]
    if value < ordered[0]:
        return [ordered[0]]

    # value is strictly inside the range: find the first entry above it.
    for lower, upper in zip(ordered, ordered[1:]):
        if value < upper:
            return [lower, upper]
    # Only reachable for unordered values such as NaN.
    return ordered[-2:]


def get_air_resist_ratio(temp, freq):
    """Look up the air attenuation coefficient for a temperature and frequency.

    The value is read from the ``air_resist_ratio`` table (rows: frequency,
    columns: temperature). When ``temp`` and/or ``freq`` fall between two
    tabulated entries the coefficient is linearly interpolated along that
    axis (bilinear when both do). Values outside the tabulated range are
    clamped to the nearest entry by ``get_nearest_values``.

    Args:
        temp: Air temperature, in the units of the table columns.
        freq: Frequency, in the units of the table index.

    Returns:
        The looked-up or interpolated coefficient.
    """
    def _lerp(lower, upper, position, at_lower, at_upper):
        # Weighted mean of the two tabulated values; the weights sum to 1.
        weight_lower = (upper - position) / (upper - lower)
        weight_upper = (position - lower) / (upper - lower)
        return at_lower * weight_lower + at_upper * weight_upper

    temps = get_nearest_values(air_resist_ratio.columns, temp)
    freqs = get_nearest_values(air_resist_ratio.index, freq)

    def _at_frequency(table_freq):
        # Coefficient on one table row, interpolated over temperature if needed.
        if len(temps) == 1:
            return air_resist_ratio.loc[table_freq, temps[0]]
        temp_lower, temp_upper = temps
        return _lerp(
            temp_lower,
            temp_upper,
            temp,
            air_resist_ratio.loc[table_freq, temp_lower],
            air_resist_ratio.loc[table_freq, temp_upper],
        )

    if len(freqs) == 1:
        return _at_frequency(freqs[0])

    # Interpolate over temperature on both rows first, then over frequency.
    freq_lower, freq_upper = freqs
    return _lerp(freq_lower, freq_upper, freq, _at_frequency(freq_lower), _at_frequency(freq_upper))


In [3]:
import numpy as np
from scipy.optimize import fsolve


def dist_to_target_db(init_noise_db, target_noise_db, geometric_mean_freq_hz, air_temperature):
    """Distance at which a noise level has decayed to a target level.

    Numerically solves, for ``r``::

        target_noise_db - init_noise_db + 20 * log10(r) + k * r = 0

    where ``k`` is the coefficient returned by
    ``get_air_resist_ratio(air_temperature, geometric_mean_freq_hz)``.
    The root is searched with ``scipy.optimize.fsolve`` starting from ``r = 1``.

    Args:
        init_noise_db: Noise level at the source, dB.
        target_noise_db: Noise level to decay to, dB.
        geometric_mean_freq_hz: Geometric-mean frequency of the noise, Hz.
        air_temperature: Air temperature used for the coefficient lookup.

    Returns:
        The first component of the solution found by ``fsolve``.
        NOTE(review): the distance unit follows the coefficient table's unit,
        presumably metres -- confirm.
    """
    attenuation = get_air_resist_ratio(air_temperature, geometric_mean_freq_hz)

    def level_balance(distance):
        # Zero when the level reaches the target: 20*log10 spreading term plus
        # the air attenuation term, which is linear in distance.
        return target_noise_db - init_noise_db + 20 * np.log10(distance) + attenuation * distance

    roots = fsolve(level_balance, 1)
    return roots[0]


# Example: distance over which a 100 dB noise with a 1000 Hz geometric-mean
# frequency decays to 40 dB at an air temperature of 20.
dist_to_target_db(100, 40, 1000, 20)

669.6974695039747

In [22]:
%reload_ext autoreload
%autoreload 2

from shapely import Point
import geopandas as gpd
from objectnat import get_visibility_accurate


def noise_from_point_task(task) -> tuple[gpd.GeoDataFrame, bool]:
    """Compute the visibility polygons reachable from one noise source point.

    The visibility polygon of ``point_from`` is computed first. Every vertex
    of its simplified outline that touches an obstacle is then used as a
    secondary source with a reduced remaining distance, and its own
    visibility polygon is computed against the obstacles plus the first
    polygon.

    The previous body read ``obstacles_union``, ``absorb_ratio_column`` and
    ``vision_res`` without defining them (they only exist as locals of
    ``simulate_noise``) and returned nothing despite the annotation. They are
    now defined locally, mirroring ``simulate_noise``.
    NOTE(review): the ``'absorb_ratio'`` column name, its 0.03 fallback and
    the ``False`` flag in the return value are assumptions taken from
    ``simulate_noise`` and from how ``parallel_split_queue`` reads results --
    confirm the intended contract.

    Args:
        task: A 5-tuple ``(point_from, obstacles, passed_dist, dist_db_zip,
            max_dist)``. ``obstacles`` is a GeoDataFrame whose CRS is used
            for all produced geometries. ``dist_db_zip`` is unpacked but not
            used yet.

    Returns:
        ``(polygons, False)`` where ``polygons`` is a GeoDataFrame holding
        the visibility polygon of ``point_from`` followed by one polygon per
        secondary source.
    """
    point_from, obstacles, passed_dist, dist_db_zip, max_dist = task
    local_crs = obstacles.crs

    absorb_ratio_column = 'absorb_ratio'
    if absorb_ratio_column not in obstacles.columns:
        # Copy before adding the column so the caller's frame is not mutated.
        obstacles = obstacles.copy()
        obstacles[absorb_ratio_column] = 0.03
    obstacles_union = obstacles.unary_union

    dist = max_dist - passed_dist
    vis_poly = get_visibility_accurate(point_from, obstacles, dist)
    vision_res = [vis_poly]

    vis_poly_points = gpd.GeoDataFrame(geometry=[Point(coords) for coords in vis_poly.simplify(0.5).exterior.coords],
                                       crs=local_crs)

    # Remember each vertex, then use a small buffer around it to test which
    # obstacle (if any) the vertex touches.
    vis_poly_points['point'] = vis_poly_points.geometry
    vis_poly_points.geometry = vis_poly_points.geometry.buffer(1, resolution=1)
    vis_poly_points = vis_poly_points.sjoin(obstacles, predicate='intersects')
    # Keep only vertices whose buffer still has area outside both the visible
    # region and the obstacles.
    vis_poly_points.geometry = vis_poly_points.difference(vis_poly).difference(obstacles_union)
    vis_poly_points = vis_poly_points[~vis_poly_points.is_empty]
    vis_poly_points = vis_poly_points[vis_poly_points.area >= 0.1]
    vis_poly_points.geometry = vis_poly_points['point']
    vis_poly_points['dist'] = vis_poly_points.distance(point_from)

    # The already visible region acts as an obstacle for the secondary sources.
    new_obs = pd.concat([obstacles, gpd.GeoDataFrame(geometry=[vis_poly], crs=local_crs)])

    for _, loc in vis_poly_points.iterrows():
        # Remaining distance after reaching the vertex, reduced by the
        # obstacle's absorption ratio.
        dist_last = (1 - loc[absorb_ratio_column]) * (max_dist - loc.dist)
        cur_vis_poly = get_visibility_accurate(loc.geometry,
                                               new_obs.difference(loc.geometry.buffer(2, resolution=1)).to_frame(),
                                               dist_last)
        vision_res.append(cur_vis_poly)

    return gpd.GeoDataFrame(geometry=vision_res, crs=local_crs), False


def simulate_noise(point_from: Point, obstacles: gpd.GeoDataFrame, dB_step=5, **kwargs):
    """Simulate how far noise from a point source spreads among obstacles.

    Noise levels are stepped down from ``init_noise_db`` in ``dB_step``
    decrements while they stay at or above ``target_noise_db``; the distance
    of the lowest level reached is used as the simulation radius. The
    visibility polygon of the source is computed within that radius, and each
    vertex of its simplified outline that touches an obstacle becomes a
    secondary source with a reduced remaining distance.

    Args:
        point_from: Source location. It is used as-is (only ``obstacles`` is
            reprojected), so it must already be expressed in the UTM CRS
            estimated from ``obstacles``.
        obstacles: Obstacle geometries. The caller's frame is not modified.
        dB_step: Positive decrement between successive noise levels, dB.
        **kwargs: Required keys ``init_noise_db``, ``target_noise_db``,
            ``geometric_mean_freq_hz`` and ``air_temperature``. Optional key
            ``absorb_ratio_column`` names the column of ``obstacles`` holding
            the absorption ratio; when absent, a column ``'absorb_ratio'``
            filled with 0.03 is used.

    Returns:
        A list of geometries: the source's visibility polygon followed by one
        visibility polygon per secondary source.

    Raises:
        KeyError: If a required keyword argument is missing.
        ValueError: If ``dB_step`` is not positive, or if
            ``init_noise_db - dB_step`` is already below ``target_noise_db``
            so that no noise level can be evaluated.

    Notes:
        The loop used to stop only when the level hit ``target_noise_db``
        exactly, so it never terminated when the dB range was not a multiple
        of ``dB_step``. It now stops at the last level not below the target.
        The ring ("donut") GeoDataFrame that followed the ``return`` was
        unreachable and has been removed together with the work feeding it.
    """
    if dB_step <= 0:
        raise ValueError("dB_step must be positive")

    init_noise_db = kwargs['init_noise_db']
    target_noise_db = kwargs['target_noise_db']
    geometric_mean_freq_hz = kwargs['geometric_mean_freq_hz']
    air_temperature = kwargs['air_temperature']

    local_crs = obstacles.estimate_utm_crs()
    # to_crs returns a new frame, so the caller's obstacles stay untouched.
    obstacles = obstacles.to_crs(local_crs)

    absorb_ratio_column = kwargs.get('absorb_ratio_column', None)
    if absorb_ratio_column is None:
        absorb_ratio_column = 'absorb_ratio'
        obstacles[absorb_ratio_column] = 0.03

    values = []
    dists = []
    cur_dB = init_noise_db - dB_step
    while cur_dB >= target_noise_db:
        dists.append(dist_to_target_db(init_noise_db, cur_dB, geometric_mean_freq_hz, air_temperature))
        values.append(cur_dB)
        cur_dB -= dB_step
    if not dists:
        raise ValueError(
            "init_noise_db - dB_step must be >= target_noise_db so that at least one noise level exists"
        )
    # The last level evaluated is the lowest one; its distance bounds the simulation.
    max_dist = dists[-1]
    print(max(zip(values, dists), key=lambda pair: pair[1]))

    # Only obstacles within reach of the source matter.
    in_range = obstacles.intersects(point_from.buffer(max_dist))
    obstacles = obstacles[in_range]
    obstacles_union = obstacles.unary_union

    vis_poly = get_visibility_accurate(point_from, obstacles, max_dist)
    vis_poly_points = gpd.GeoDataFrame(geometry=[Point(coords) for coords in vis_poly.simplify(0.5).exterior.coords],
                                       crs=local_crs)
    vis_poly_points['dist'] = vis_poly_points.distance(point_from)
    # Remember each vertex, then use a small buffer around it to test which
    # obstacle (if any) the vertex touches.
    vis_poly_points['point'] = vis_poly_points.geometry
    vis_poly_points.geometry = vis_poly_points.geometry.buffer(1, resolution=1)
    vis_poly_points = vis_poly_points.sjoin(obstacles, predicate='intersects')

    # Keep only vertices whose buffer still has area outside both the visible
    # region and the obstacles.
    vis_poly_points.geometry = vis_poly_points.difference(vis_poly).difference(obstacles_union)
    vis_poly_points = vis_poly_points[~vis_poly_points.is_empty]
    vis_poly_points = vis_poly_points[vis_poly_points.area >= 0.1]
    vis_poly_points.geometry = vis_poly_points['point']

    # The already visible region acts as an obstacle for the secondary sources.
    new_obs = pd.concat([obstacles, gpd.GeoDataFrame(geometry=[vis_poly], crs=local_crs)])
    vision_res = [vis_poly]
    for _, loc in vis_poly_points.iterrows():
        # Remaining distance after reaching the vertex, reduced by the
        # obstacle's absorption ratio.
        dist_last = (1 - loc[absorb_ratio_column]) * (max_dist - loc.dist)
        cur_vis_poly = get_visibility_accurate(loc.geometry,
                                               new_obs.difference(loc.geometry.buffer(2, resolution=1)).to_frame(),
                                               dist_last)
        vision_res.append(cur_vis_poly)

    return vision_res


# Noise source location.
# NOTE(review): coordinates are presumably in EPSG:32636, like the obstacles below -- confirm.
start_p = Point(348065, 6647674)
obstacles = gpd.read_parquet('buildings.parquet').to_crs(epsg=32636)

# Parameters of the simulated noise source, passed through as keyword arguments.
noise_source_params = dict(
    init_noise_db=90,
    target_noise_db=30,
    geometric_mean_freq_hz=2000,
    air_temperature=20,
)
initial_sim = simulate_noise(start_p, obstacles, **noise_source_params)

(30, 546.5704781477021)


In [16]:
# Draw the obstacles first, then overlay the simulated polygons on the same map.
m1 = obstacles.explore(color='pink', tiles='CartoDB positron')
initial_sim_layer = gpd.GeoDataFrame(geometry=initial_sim, crs=32636)
initial_sim_layer.explore(m=m1)

In [None]:
import concurrent.futures
import multiprocessing
import queue
import time

from shapely.geometry import LineString, MultiPolygon, Point, Polygon

# NOTE(review): scratch invocation that cannot run as written in the visible notebook:
#   * `initial_func`, `args`, `kwargs` and `self` are not defined in any visible cell;
#   * `parallel_split_queue` is defined further down in this cell, after this call;
#   * it is declared with a single positional parameter but is called with two
#     positional arguments, and the list it returns is unpacked into exactly two names.
# Seed the queue with one (function, task, kwargs) triple and process it.
task_queue = multiprocessing.Queue()
task_queue.put((initial_func, args, kwargs))
res, roads = parallel_split_queue(task_queue, self.local_crs)


def parallel_split_queue(task_queue: multiprocessing.Queue, *, max_workers=None):
    """Run queued tasks on a thread pool, following up on tasks they spawn.

    Each queue item is a ``(func, task, kwargs)`` triple executed as
    ``func(task, **kwargs)``. Every call must return a pair
    ``(result, create_tasks)``:

    * ``create_tasks`` truthy: ``result`` is an iterable of further
      ``(func, task, kwargs)`` triples, which are scheduled as well;
    * otherwise ``result`` is a final value and is collected.

    The queue keeps being polled while work is in flight, so items added to
    it by running tasks are picked up too.

    Args:
        task_queue: Queue of pending triples. Any object providing
            ``get_nowait()`` and ``get(timeout=...)`` that raise
            ``queue.Empty`` works (``multiprocessing.Queue``, ``queue.Queue``).
        max_workers: Optional thread-pool size; ``None`` keeps the
            ``ThreadPoolExecutor`` default.

    Returns:
        A list of the final results, in completion order.

    Raises:
        Exception: Whatever a task raised. Tasks that have not started yet
            are cancelled before the exception propagates.
    """
    splitted = []
    pending = set()

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        try:
            while True:
                # Drain whatever is queued right now. get_nowait() + queue.Empty
                # avoids the check-then-act race of empty() followed by get.
                while True:
                    try:
                        func, task, kwargs = task_queue.get_nowait()
                    except queue.Empty:
                        break
                    pending.add(executor.submit(func, task, **kwargs))

                if not pending:
                    # A multiprocessing.Queue may report empty for a moment right
                    # after put(); allow a short grace period before finishing.
                    try:
                        func, task, kwargs = task_queue.get(timeout=0.01)
                    except queue.Empty:
                        break
                    pending.add(executor.submit(func, task, **kwargs))
                    continue

                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for future in done:
                    result, create_tasks = future.result()
                    if create_tasks:
                        # Schedule follow-up work directly; no queue round trip needed.
                        for func, new_task, kwargs in result:
                            pending.add(executor.submit(func, new_task, **kwargs))
                    else:
                        splitted.append(result)
        except BaseException:
            # Do not let not-yet-started work delay the error report.
            for future in pending:
                future.cancel()
            raise
    return splitted


In [None]:
# NOTE(review): scratch cell that does not match the visible state of the notebook:
#   * `initial_sim` is whatever simulate_noise returned, which is a plain list in the
#     visible code, so `.explore` would not be available on it -- confirm the intended
#     return shape;
#   * `vis_poly` is only assigned inside functions in the visible cells, not at notebook scope.
# Obstacles as the base layer, then the simulation result and the visibility polygon on top.
m1 = obstacles.explore(color='pink', tiles='CartoDB positron')
initial_sim.explore(color='purple', m=m1)
gpd.GeoDataFrame(geometry=[vis_poly], crs=32636).explore(m=m1)
m1

In [None]:
%reload_ext autoreload
%autoreload 2

# Scratch experiment: visibility from a single vertex of `res`.
# NOTE(review): `res` is used here as one polygon-like geometry (`.simplify(...).exterior`);
# the only visible assignment to `res` is the unpacking of parallel_split_queue's result in
# an earlier cell -- verify where this value is supposed to come from.
new_obs = pd.concat([obstacles, gpd.GeoDataFrame(geometry=[res], crs=32636)])
# Viewpoint: the vertex at index 14 of the simplified outline.
p = [Point(coords) for coords in res.simplify(1).exterior.coords][14]
# Cut a small hole around the viewpoint out of every obstacle geometry
# (presumably so the viewpoint does not sit on an obstacle -- confirm).
new_obs.geometry = new_obs.geometry.difference(p.buffer(1, quad_segs=1))
vis_poly2 = get_visibility_accurate(p, new_obs, 500)
# m1=new_obs.explore(color='red')
# m1= gpd.GeoDataFrame(geometry=[vis_poly],crs=32636).explore()
# gpd.GeoDataFrame(geometry=[Point((347810.97481683653,6647261.617306325))],crs=32636).explore(m=m1)

In [None]:
# NOTE(review): scratch visualisation cell relying on names that no visible cell assigns at
# notebook scope: `vis_poly` and `result2`. `res` is used here as a GeoDataFrame with a
# 'noise_dB' column, unlike its use as a single geometry elsewhere -- verify.
# The result of this first expression is neither stored nor displayed.
gpd.GeoDataFrame(geometry=vis_poly, crs=32636)
# Noise levels coloured by the 'noise_dB' column, with the obstacles on top.
m1 = res.explore(column='noise_dB', cmap='RdYlGn_r', vmin=10, tiles='CartoDB positron')
obstacles.explore(m=m1, color='pink')
m1
# Second map: `m1` is rebound, starting again from the obstacles.
m1 = obstacles.explore(color='pink')
gpd.GeoDataFrame(geometry=[result2], crs=32636).explore(m=m1)
# Every vertex of the outline of `result2`, in red.
gpd.GeoDataFrame(geometry=[Point(coords) for coords in result2.exterior.coords], crs=32636).explore(m=m1, color='red')

# Vertices remaining after simplifying the outline with tolerance 1, in purple.
points = [Point(coords) for coords in result2.simplify(1).exterior.coords]
print(len(points))
gpd.GeoDataFrame(geometry=points, crs=32636).explore(m=m1, color='purple')