In [None]:
# Устанавливаем все, что будет необходимо для работы: библиотеки

!pip install ObjectNat
!pip install folium matplotlib mapclassify

In [None]:
# Устанавливаем все, что будет необходимо для работы: все остальное

import geopandas as gpd
import pandas as pd
import numpy as np
import osmnx as ox
import os
import pandas as pd
import warnings
import iduedu

from iduedu import get_boundary
from shapely import MultiPolygon, Polygon

from objectnat import config

logger = config.logger
warnings.filterwarnings("ignore") # отключаем все предупреждения

In [None]:
# data_path = '/content/data' # определяем путь к файлм

data_path = 'F:/Резерв/Учеба/НИР-3/Чат gpt/Выгрузка зданий и расселение'


In [None]:
def eval_is_living(row: gpd.GeoSeries):
    """
    Determine if a building is used for residential purposes based on its attributes.

    Parameters
    ----------
    row : gpd.GeoSeries
        A GeoSeries representing a row in a GeoDataFrame, containing building attributes.

    Returns
    -------
    bool
        A boolean indicating whether the building is used for residential purposes.

    Examples
    --------
    >>> buildings = download_buildings(osm_territory_id=421007)
    >>> buildings['is_living'] = buildings.apply(eval_is_living, axis=1)
    """
    return row["building"] in (
        "apartments",
        "house",
        "residential",
        "detached",
        "dormitory",
        "semidetached_house",
        "bungalow",
        "cabin",
        "farm",
    )


In [None]:
def eval_population(source: gpd.GeoDataFrame, population_column: str, area_per_person: float = 33):
    """
    Estimate the population of buildings in a GeoDataFrame based on their attributes.

    Parameters
    ----------
    source : gpd.GeoDataFrame
        A GeoDataFrame containing building geometries and attributes.
    population_column : str
        The name of the column where the estimated population will be stored.
    area_per_person : float
        The standart living space per person im m², (default is 33)
    Returns
    -------
    gpd.GeoDataFrame
        A GeoDataFrame with an added column for estimated population.

    Raises
    ------
    RuntimeError
        If the 'building:levels' column is not present in the provided GeoDataFrame.

    Examples
    --------
    >>> source = gpd.read_file('buildings.shp')
    >>> source['is_living'] = source.apply(eval_is_living, axis=1)
    >>> population_df = eval_population(source, 'approximate_pop')
    """
    if "building:levels" not in source.columns:
        raise RuntimeError("No 'building:levels' column in provided GeoDataFrame")
    df = source.copy()
    local_crs = source.estimate_utm_crs()
    df["area"] = df.to_crs(local_crs).geometry.area.astype(float)
    df["building:levels_is_real"] = df["building:levels"].apply(lambda x: False if pd.isna(x) else True)
    df["building:levels"] = df["building:levels"].fillna(1)
    df["building:levels"] = pd.to_numeric(df["building:levels"], errors="coerce")
    df = df.dropna(subset=["building:levels"])
    df["building:levels"] = df["building:levels"].astype(int)
    df[population_column] = np.nan
    df.loc[df["is_living"] == 1, population_column] = (
        df[df["is_living"] == 1]
        .apply(
            lambda row: (
                3
                if ((row["area"] <= 400) & (row["building:levels"] <= 2))
                else (row["building:levels"] * row["area"] * 0.8 / area_per_person)
            ),
            axis=1,
        )
        .round(0)
        .astype(int)
    )
    return df

In [None]:
def download_buildings(
    osm_territory_id: int | None = None,
    osm_territory_name: str | None = None,
    terr_polygon: Polygon | MultiPolygon | None = None,
    is_living_column: str = "is_living",
    population_column: str = "approximate_pop",
    area_per_person: float = 33,
) -> gpd.GeoDataFrame | None:
    """
    Download building geometries and evaluate 'is_living' and 'population'
     attributes for a specified territory from OpenStreetMap.

    Parameters
    ----------
    osm_territory_id : int, optional
        The OpenStreetMap ID of the territory to download buildings for.
    osm_territory_name : str, optional
        The name of the territory to download buildings for.
    terr_polygon : Polygon or MultiPolygon, optional
        A Polygon or MultiPolygon geometry defining the territory to download buildings for.
    is_living_column : str, optional
        The name of the column indicating whether a building is residential (default is "is_living").
    population_column : str, optional
        The name of the column for storing estimated population (default is "approximate_pop").
    area_per_person : float
        The standart living space per person im m², (default is 33)
    Returns
    -------
    gpd.GeoDataFrame or None
        A GeoDataFrame containing building geometries and attributes, or None if no buildings are found or an error occurs.

    Examples
    --------
    >>> buildings_df = download_buildings(osm_territory_name="Saint-Petersburg, Russia")
    >>> buildings_df.head()
    """
    polygon = get_boundary(osm_territory_id, osm_territory_name, terr_polygon)

    logger.debug("Downloading buildings from OpenStreetMap and counting population...")
    buildings = ox.features_from_polygon(polygon, tags={"building": True})
    if not buildings.empty:
        buildings = buildings.loc[
            (buildings["geometry"].geom_type == "Polygon") | (buildings["geometry"].geom_type == "MultiPolygon")
        ]
    if buildings.empty:
        logger.warning("There are no buildings in the specified territory. Output GeoDataFrame is empty.")
        return buildings

    buildings[is_living_column] = buildings.apply(eval_is_living, axis=1)
    buildings = eval_population(buildings, population_column, area_per_person)
    buildings.reset_index(drop=True, inplace=True)
    logger.debug("Done!")
    return buildings[
        [
            "building",
            "addr:street",
            "addr:housenumber",
            "amenity",
            "area",
            "name",
            "building:levels",
            "leisure",
            #"design:year",
            is_living_column,
            "building:levels_is_real",
            population_column,
            "geometry",
        ]
    ]

In [None]:
buildings = download_buildings(osm_territory_id = 1115367, area_per_person = 33) # id = 1115367 - это Приморский район

In [None]:
buildings.explore(column='approximate_pop',tiles='CartoDB positron')

In [None]:
# Сброс индекса, чтобы превратить индексы в обычные столбцы
buildings = buildings.reset_index(drop=True)

In [None]:
# Добавляем столбец с названием района

# НЕ ЗАБЫТЬ ПОМЕНЯТЬ НАЗВАНИЕ

buildings['District'] = 'Приморский'

# Добавляем столбец с количеством населения в той форме, в которой это надо ObjectNat
buildings['storeys_count'] = buildings['building:levels']

# Проверяем, что правильно выводится
buildings.head()

In [None]:
# Проверяем общее количество скачаных домов

print(buildings.count().max())

In [None]:
# Оставляем только жилые и проверяем, сколько осталось

buildings = buildings[buildings['is_living']]
print(buildings.count().max())

In [None]:
# Еще раз сбрасываем индекс, чтобы удалить пропущенные номера строк
buildings = buildings.reset_index(drop=True)

buildings

In [None]:
# Это расселение от ObjectNat
# ЗАПУСКАТЬ ДВА РАЗА
# !!!!! НЕ ЗАБЫТЬ ПОМЕНЯТЬ НАСЕЛЕНИЕ !!!!!

from objectnat import get_balanced_buildings
import geopandas as gpd

# Importing the necessary libraries:
# - 'get_balanced_buildings' from 'objectnat' to balance the population across buildings
# - 'geopandas' for handling geospatial data

#buildings = gpd.read_parquet("examples_data/buildings.parquet")
# Loading building data from a .parquet file.

living_building = buildings[buildings['is_living']]
# Filtering the buildings to include only residential buildings, based on the 'is_living' attribute.

buildings.to_crs(32643, inplace=True)
# Reprojecting the buildings' geometries to the local coordinate system EPSG:32643 (UTM zone)
# to ensure accurate calculation of areas for balancing the population.

balanced_buildings = get_balanced_buildings(living_buildings=living_building, population = 5451) # ВОТ ЗДЕСЬ ПОМЕНЯТЬ
# Balancing the population distribution across the residential buildings.
# - 'living_buildings': a GeoDataFrame of only the residential buildings.
# - 'population': the total population (36000) to distribute across the buildings.
# The result is a GeoDataFrame where the population is redistributed based on building attributes (like floor count or living area).

balanced_buildings

In [None]:
# Это расселение, которое я сделал сам
# !!!!! НЕ ЗАБЫТЬ ПОМЕНЯТЬ НАСЕЛЕНИЕ !!!!!

df = balanced_buildings.copy() # Копируем датафрейм

total_population = 5451 # Определяем общую численность населения ВОТ ЗДЕСЬ ПОМЕНЯТЬ
total_area = df['living_area'].sum() # Определяем общую жилую площадь

# Распределяем население по домам
df['new_population'] = (df['living_area'] / total_area * total_population).round().astype(int)

# Корректируем распределение, если сумма не совпадает с общим населением
difference = total_population - df['new_population'].sum()

if difference > 0:
    # Находим индексы для добавления единиц населению
    idx_to_add = np.random.choice(df.index, size=difference, replace=True)
    for idx in idx_to_add:
        df.at[idx, 'new_population'] += 1

elif difference < 0:
    # Находим индексы для уменьшения населения
    population_to_remove = -difference
    idx_to_remove = np.random.choice(df.index, size=population_to_remove, replace=True)
    for idx in idx_to_remove:
        # Уменьшаем на 1 только если это возможно
        if df.at[idx, 'new_population'] > 0:
            df.at[idx, 'new_population'] -= 1

# Результат
df.head()

In [None]:
# Cравнение результатов расселения

objectnat = 'population'
my_code = 'new_population'

# Вычисление минимального, максимального значения и суммы
min_value = df[objectnat].min()
max_value = df[objectnat].max()
sum_value = df[objectnat].sum()

min_value_1 = df[my_code].min()
max_value_1 = df[my_code].max()
sum_value_1 = df[my_code].sum()

# Вывод результатов
print(f"Минимальное значение: {min_value}")
print(f"Минимальное значение: {min_value_1}")
print(f"Максимальное значение: {max_value}")
print(f"Максимальное значение: {max_value_1}")
print(f"Сумма всех значений: {sum_value}")
print(f"Сумма всех значений: {sum_value_1}")

In [None]:
# Удаляем ненужные столбцы

df.drop(['amenity', 'name', 'leisure', 'approximate_pop', 'building:levels', 'building:levels_is_real', 'is_living'], axis=1, inplace=True)

In [None]:
# Оставшиеся столбцы расставляем в нужном порядке и проверяем, что все сработало

df = df.reindex(columns=['building', 'District', 'addr:street', 'addr:housenumber', 'area', 'storeys_count', 'living_area', 'population', 'new_population', 'geometry'])
df

In [None]:
# Смотрим, какая красота получилась

df.explore(column='new_population',tiles='CartoDB positron')

In [None]:
# И сохраняем ее в файл
# Не забыть поменять название файла !!!!

df.to_file(os.path.join(data_path,"Здания Приморский.geojson"))

Это более оптимизированная версия кода. А может и нет. Прошлую я писал три месяца назад и не помню, что там происходит, так что пришлось переделать

In [None]:
buildings = gpd.read_file(os.path.join(data_path, 'buildings.geojson'))
buildings['unique_id'] = range(1, len(buildings) + 1)
buildings.explore(column='new_population',tiles='CartoDB positron')

In [None]:
# Создание датафрейма с данными по населению

district_demand_data = {
    'name_of_district': ['Адмиралтейский', 'Василеостровский', 'Выборгский', 'Калининский', 'Кировский', 'Колпинский', 
 'Красногвардейский', 'Красносельский', 'Кронштадтский', 'Курортный', 'Московский', 'Невский', 
 'Петроградский', 'Петродворцовый', 'Приморский', 'Пушкинский', 'Фрунзенский', 'Центральный', 
 'Кудрово', 'Янино-1', 'Новое Девяткино', 'Бугры', 'Аннино', 'Мурино', 'Сертолово'],
    'actual_population': [1194, 1590, 4208, 4109, 2561, 1456, 
                          2829, 3366, 345, 648, 2579, 4249, 
                          893, 1031, 5451, 2092, 3197, 1537, 
                          502, 161, 193, 254, 78, 852, 542]
}

district_demand_data = pd.DataFrame(district_demand_data)

district_demand_data

In [None]:
name_of_district = 'Сертолово'
actual_population = 542

In [None]:
name_of_district = district_demand_data['name_of_district']
actual_population = district_demand_data['actual_population']

In [None]:
filter_for_buildings = buildings[buildings['District'] == name_of_district]

total_population = actual_population # Определяем общую численность населения ВОТ ЗДЕСЬ ПОМЕНЯТЬ
total_area = filter_for_buildings['living_area'].sum() # Определяем общую жилую площадь

# Распределяем население по домам
filter_for_buildings['demand'] = (filter_for_buildings['living_area'] / total_area * total_population).round().astype(int)

# Корректируем распределение, если сумма не совпадает с общим населением
difference = total_population - filter_for_buildings['demand'].sum()

if difference > 0:
    # Находим индексы для добавления единиц населению
    idx_to_add = np.random.choice(filter_for_buildings.index, size=difference, replace=True)
    for idx in idx_to_add:
        filter_for_buildings.at[idx, 'demand'] += 1

elif difference < 0:
    # Находим индексы для уменьшения населения
    population_to_remove = -difference
    idx_to_remove = np.random.choice(filter_for_buildings.index, size=population_to_remove, replace=True)
    for idx in idx_to_remove:
        # Уменьшаем на 1 только если это возможно
        if filter_for_buildings.at[idx, 'demand'] > 0:
            filter_for_buildings.at[idx, 'demand'] -= 1

# Результат filter_for_buildings.head()

In [None]:
# Добавляем новые значения населения в копию исходного датафрейма

buildings_with_demand = buildings.merge(filter_for_buildings[['unique_id', 'demand']], on='unique_id', how='left')
buildings_with_demand.info()

In [None]:
# Добавляем новые значения населения в копию исходного датафрейма

buildings_with_demand = buildings_with_demand.merge(filter_for_buildings[['unique_id', 'demand']], on='unique_id', how='left')
buildings_with_demand.info()

In [None]:
# Объединение двух столбцов в один
buildings_with_demand['demand'] = buildings_with_demand['demand_x'].combine_first(buildings_with_demand['demand_y'])
buildings_with_demand = buildings_with_demand.drop(columns=['demand_x', 'demand_y'])
buildings_with_demand.info()

new_buildings = buildings_with_demand
new_buildings['demand'] = new_buildings['demand'].astype('int64')
new_buildings = new_buildings.drop(columns=['population', 'new_population'])
new_buildings.to_file(os.path.join(data_path,"buildings.geojson"))
new_buildings.info()

In [None]:
actual = 'demand'

# Вычисление минимального, максимального значения и суммы
min_value = new_buildings[actual].min()
max_value = new_buildings[actual].max()
sum_value = new_buildings[actual].sum()

# Вывод результатов
print(f"Минимальное значение: {min_value}")
print(f"Максимальное значение: {max_value}")
print(f"Сумма всех значений: {sum_value}")
