In [1]:
import os

from idu_clients import UrbanAPI

# Base URL of the Urban API service. It can be overridden with the
# URBAN_API_URL environment variable so the notebook is not tied to a single
# internal host; the default keeps the previously hard-coded address.
URBAN_API_URL = os.environ.get('URBAN_API_URL', 'http://10.32.1.107:5300')

urban_api = UrbanAPI(URBAN_API_URL)

In [2]:
# Fetch the regions table from the Urban API client (top-level `await` inside a notebook cell).
regions = await urban_api.get_regions()

In [3]:
# Preview the first rows of the regions table.
regions.head()

Unnamed: 0_level_0,geometry,territory_type,parent_id,name,level,properties,admin_center,okato_code,oktmo_code,created_at,updated_at
territory_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
1,"MULTIPOLYGON (((34.32572 59.19331, 34.32541 59...","{'territory_type_id': 1, 'name': 'Субъект Феде...",12639,Ленинградская область,2,"{'Малые города': 15, 'Крупные города': 0, 'Вуз...",,41000000000,,2024-06-16T21:35:40.801621Z,2024-06-16T21:35:40.801621Z
3138,"MULTIPOLYGON (((29.42576 60.19074, 29.42719 60...","{'territory_type_id': 17, 'name': 'Город федер...",12639,Санкт-Петербург,2,"{'Малые города': 10, 'Крупные города': 0, 'Чис...",,40000000000,,2024-06-16T21:35:40.801621Z,2024-06-16T21:35:40.801621Z
3268,"MULTIPOLYGON (((36.80310 55.44083, 36.81136 55...","{'territory_type_id': 1, 'name': 'Субъект Феде...",12639,Москва,2,"{'Малые города': 16, 'Крупные города': 1, 'Чис...",,45000000000,,2024-06-16T21:35:40.801621Z,2024-06-16T21:35:40.801621Z
3427,"MULTIPOLYGON (((41.16810 50.77451, 41.17850 50...","{'territory_type_id': 1, 'name': 'Субъект Феде...",12639,Волгоградская область,2,"{'Малые города': 32, 'Крупные города': 1, 'Вуз...",,18000000000,,2024-06-16T21:35:40.801621Z,2024-06-16T21:35:40.801621Z
3902,"POLYGON ((35.89856 53.85018, 35.90015 53.84679...","{'territory_type_id': 1, 'name': 'Субъект Феде...",12639,Тульская область,2,"{'Малые города': 20, 'Крупные города': 1, 'Вуз...",,70000000000,,2024-06-16T21:35:40.801621Z,2024-06-16T21:35:40.801621Z


In [4]:
# Keep only the row whose index (territory_id) equals 1 - Leningrad Oblast in the
# preview output of this notebook. The boolean mask keeps the result a table, not a single row object.
region = regions[regions.index == 1]

In [None]:
# Fetch the territories of the region with id 1 from the Urban API client.
gdfs = await urban_api.get_region_territories(1)
# NOTE(review): 5 is a magic key - presumably it selects the layer that holds towns
# (e.g. a territory level); confirm against the UrbanAPI client.
towns_db = gdfs[5]

In [36]:
import pandas as pd
import geopandas as gpd

# Presumably the precomputed accessibility matrix (TODO confirm contents).
# NOTE(review): unpickling can execute arbitrary code - only load this file
# when it comes from a trusted source.
acc_mx = pd.read_pickle('acc_mx.pickle')

# Load the towns layer, keeping only the geometry and the rounded population,
# which is exposed under the shorter name `population`.
towns = gpd.read_file('towns.geojson')[['geometry', 'rounded_population']].rename(
    columns={'rounded_population': 'population'}
)
# Vectorised integer cast instead of a per-row `apply(int)`; bracket assignment
# avoids the ambiguity of attribute-style column writes.
towns['population'] = towns['population'].astype(int)

towns.head()

Unnamed: 0,geometry,population
0,"POLYGON ((33.79410 59.36206, 33.79334 59.35856...",10
1,"POLYGON ((33.82129 59.47496, 33.82053 59.47146...",68
2,"POLYGON ((33.82765 59.47334, 33.82689 59.46985...",1734
3,"POLYGON ((33.81098 59.44228, 33.81022 59.43878...",10
4,"POLYGON ((33.71243 59.32801, 33.71168 59.32451...",10


## Код

In [42]:
from pydantic import BaseModel, Field
from enum import Enum

class AvailabilityType(Enum):
    """Unit in which an accessibility threshold is expressed.

    The values are the Russian unit abbreviations: 'м' (meters) and 'мин' (minutes).
    """

    METERS = 'м'
    MINUTES = 'мин'

class SupplyType(Enum):
    """Unit in which a supply norm is expressed.

    The values are Russian labels: 'шт. на 1000 человек' (pieces per 1000 people)
    and 'ед. на 1000 человек' (units per 1000 people).
    """

    SERVICES_PER_1000 = 'шт. на 1000 человек'
    CAPACITY_PER_1000 = 'ед. на 1000 человек'

class Normative(BaseModel):
    """Normative of a service: an accessibility threshold plus a supply norm.

    Both numeric values are validated to be non-negative (`ge=0`).
    """

    # Unit of `availability_value` (meters or minutes).
    availability_type: AvailabilityType
    # Accessibility threshold expressed in `availability_type` units.
    availability_value: float = Field(ge=0)
    # Unit of `supply_value` (services or capacity per 1000 people).
    supply_type: SupplyType
    # Supply norm expressed in `supply_type` units.
    supply_value: float = Field(ge=0)

# Sample instance displayed as the cell output; the integer inputs are shown
# as floats (123.0) in the rendered repr below.
Normative(
  availability_type=AvailabilityType.MINUTES,
  availability_value=123, 
  supply_type=SupplyType.CAPACITY_PER_1000, 
  supply_value=123
)

Normative(availability_type=<AvailabilityType.MINUTES: 'мин'>, availability_value=123.0, supply_type=<SupplyType.CAPACITY_PER_1000: 'ед. на 1000 человек'>, supply_value=123.0)

In [None]:
from pydantic import BaseModel
from loguru import logger
import geopandas as gpd
import pandas as pd
from pulp import PULP_CBC_CMD, LpMaximize, LpProblem, LpVariable, lpSum, LpInteger
from enum import Enum

# Name of the provision column. `ProvisionModel.calculate` writes a column with this same name.
# NOTE(review): the constant itself is not referenced by the code visible in this cell,
# which uses the string literal directly.
PROVISION_COLUMN = "provision"

class ProvisionModel(BaseModel):

    def _get_blocks_gdf(self, service_type: ServiceType, update_df: pd.DataFrame | None = None) -> gpd.GeoDataFrame:
        """
        Builds the working table of blocks used by the provision assessment.

        Parameters
        ----------
        service_type : ServiceType
            Service type being assessed; its name selects the capacity column and
            its ``calculate_in_need`` converts population into demand.
        update_df : pandas.DataFrame, optional
            Optional additions to population (column ``population``) and/or capacity
            (column named after the service type), by default None.

        Returns
        -------
        geopandas.GeoDataFrame
            Blocks with ``geometry``, ``demand``, ``capacity`` and the bookkeeping columns
            ``capacity_left``, ``demand_left``, ``demand_within`` and ``demand_without``.
        """
        source_column = f"capacity_{service_type.name}"
        blocks = self.city_model.get_blocks_gdf()
        blocks = blocks[["geometry", "population", source_column]].fillna(0)
        blocks = blocks.rename(columns={source_column: "capacity"})

        if update_df is not None:
            # (column in update_df, column in blocks) pairs, applied in this order.
            additions = (("population", "population"), (service_type.name, "capacity"))
            for update_column, target_column in additions:
                if update_column in update_df.columns:
                    delta = update_df[update_column].fillna(0)
                    blocks[target_column] = blocks[target_column].add(delta, fill_value=0)

        # Population is converted to demand in place, then the column is renamed.
        blocks["population"] = blocks["population"].apply(service_type.calculate_in_need)
        blocks = blocks.rename(columns={"population": "demand"})

        # Nothing is distributed yet: everything is "left", nothing is met.
        blocks["capacity_left"] = blocks["capacity"]
        blocks["demand_left"] = blocks["demand"]
        for counter_column in ("demand_within", "demand_without"):
            blocks[counter_column] = 0
        return blocks
    
    @classmethod
    def total(cls, gdf: gpd.GeoDataFrame) -> float:
        """
        Computes the overall provision of an assessed set of blocks.

        Parameters
        ----------
        gdf : geopandas.GeoDataFrame
            Table with the columns 'demand_within' (met demand) and 'demand'
            (total demand) for each block.

        Returns
        -------
        float
            Sum of 'demand_within' divided by the sum of 'demand'.
        """
        met_demand = gdf["demand_within"].sum()
        overall_demand = gdf["demand"].sum()
        return met_demand / overall_demand

    def calculate(
        self,
        service_type: ServiceType | str,
        update_df: pd.DataFrame | None = None,
        self_supply: bool = False,
    ) -> gpd.GeoDataFrame:
        """
        Runs the provision assessment for a service type.

        Parameters
        ----------
        service_type : ServiceType | str
            Service type (or the key used to look it up in the city model).
        update_df : pandas.DataFrame, optional
            Optional additions to population and/or capacity passed on to
            ``_get_blocks_gdf``, by default None.
        self_supply : bool, optional
            If True, every block first serves its own demand from its own capacity
            before the LP distribution is solved, by default False.

        Returns
        -------
        geopandas.GeoDataFrame
            Blocks table with the distribution results and a 'provision' column
            equal to 'demand_within' / 'demand'.
        """
        service_type: ServiceType = self.city_model[service_type]
        gdf = self._get_blocks_gdf(service_type, update_df)

        if self_supply:
            # Element-wise minimum of demand and capacity, computed column-wise
            # instead of with a Python-level row loop.
            supply: pd.Series = gdf[["demand", "capacity"]].min(axis=1)
            gdf["demand_within"] += supply
            gdf["demand_left"] -= supply
            gdf["capacity_left"] -= supply

        # `_lp_provision` takes no `method` argument; passing the (undefined) name
        # raised NameError and would otherwise have been bound to `selection_range`.
        gdf = self._lp_provision(gdf, service_type)

        gdf["provision"] = gdf["demand_within"] / gdf["demand"]

        if self.verbose:
            logger.success("Provision assessment finished")

        return gdf

    def _lp_provision(
        self,
        gdf: gpd.GeoDataFrame,
        service_type: ServiceType,
        selection_range: float | None = None,
    ) -> gpd.GeoDataFrame:
        """
        Solves the provision problem using a Linear Programming (LP) solver.

        Demand is routed to capacity between blocks that are no further apart than
        ``selection_range``. While both unmet demand and free capacity remain and a wider
        range can still connect new pairs, the method calls itself with a doubled range.

        Parameters
        ----------
        gdf : geopandas.GeoDataFrame
            GeoDataFrame containing blocks with demand and capacity. It is updated in place.
        service_type : ServiceType
            The type of service for which provision is being calculated.
        selection_range : float | None
            The accessibility range defining maximum distance between living blocks and service blocks
            in the current problem loop, default None (the service type accessibility is used).

        Returns
        -------
        geopandas.GeoDataFrame
            Updated GeoDataFrame with provision results, including updates to
            'demand_within', 'demand_without', 'demand_left', and 'capacity_left'.
        """

        if selection_range is None:
            selection_range = service_type.accessibility

        accessibility_matrix = self.city_model.accessibility_matrix

        def _get_distance(id1: int, id2: int):
            # Distances are clamped to at least 1 so the inverse-square weight stays bounded.
            distance = accessibility_matrix.loc[id1, id2]
            return distance if distance > 1 else 1

        demand = gdf.loc[gdf["demand_left"] > 0]
        capacity = gdf.loc[gdf["capacity_left"] > 0]

        if self.verbose:
            logger.info(f"Setting an LP problem for accessibility = {selection_range} : {len(demand)}x{len(capacity)}")

        prob = LpProblem("Provision", LpMaximize)

        # Look every distance up once. Pairs within range become LP routes; a finite
        # out-of-range pair means a wider range could still connect something new.
        distances = {}
        can_expand = False
        for i in demand.index:
            for j in capacity.index:
                distance = _get_distance(i, j)
                if distance <= selection_range:
                    distances[i, j] = distance
                elif distance < float("inf"):
                    can_expand = True
        products = list(distances)

        # Create the decision variable dictionary
        x = LpVariable.dicts("Route", products, 0, None, cat=LpInteger)

        # Objective Function: inverse-square distance weights favour closer pairs
        prob += lpSum(x[n, m] / (distances[n, m] * distances[n, m]) for n, m in products)

        # Constraint dictionaries
        demand_constraints = {n: [] for n in demand.index}
        capacity_constraints = {m: [] for m in capacity.index}

        for n, m in products:
            demand_constraints[n].append(x[n, m])
            capacity_constraints[m].append(x[n, m])

        # Add Demand Constraints
        for n in demand.index:
            prob += lpSum(demand_constraints[n]) <= demand.loc[n, "demand_left"]

        # Add Capacity Constraints
        for m in capacity.index:
            prob += lpSum(capacity_constraints[m]) <= capacity.loc[m, "capacity_left"]

        if self.verbose:
            logger.info("Solving the problem")
        prob.solve(PULP_CBC_CMD(msg=False))

        if self.verbose:
            logger.info("Restoring values from variables")

        # Read the results straight from the variable dictionary instead of parsing
        # variable names, so index labels do not have to survive a string round trip.
        for (a, b), var in x.items():
            value = var.value()
            if value is None or value <= 0:
                continue
            if distances[a, b] <= service_type.accessibility:
                gdf.loc[a, "demand_within"] += value
            else:
                gdf.loc[a, "demand_without"] += value
            gdf.loc[a, "demand_left"] -= value
            gdf.loc[b, "capacity_left"] -= value

        has_leftovers = gdf["demand_left"].sum() > 0 and gdf["capacity_left"].sum() > 0
        # Widen the range only when that can connect new pairs; otherwise the next round
        # would solve the same problem again and never terminate. The lower bound of 1
        # keeps a non-positive range from being doubled forever.
        if has_leftovers and can_expand:
            return self._lp_provision(gdf, service_type, max(selection_range * 2, 1))
        return gdf