In [367]:
import geopandas as gpd
import os
import pandas as pd
import osmnx as ox
import math
import pyproj
import numpy as np
from shapely.geometry import Polygon

In [368]:
# функция возвращает utm по географическим координатам (широта, долгота)
def convert_wgs_to_utm(lat: float, lon: float):
    """
    Возвращает наиболее подходящую проекцию UTM по принимаемым координатам широты и долготы.

    Принимает
    ----------
    lat: float      координата широты.
    lon: float      координата долготы.

    Возвращает
    -------
    CRS             объект класса CRS модуля pyproj.
    """
    utm_band = str((math.floor((lon + 180) / 6) % 60) + 1)
    if len(utm_band) == 1:
        utm_band = '0' + utm_band
    if lat >= 0:
        epsg_code = '326' + utm_band
    else:
        epsg_code = '327' + utm_band
    prop_crs = pyproj.CRS.from_epsg(epsg_code)
    return prop_crs

In [369]:
class GeoFrame:
    default_name = 'gdf'
    default_dir_path = '.'
    
    def __init__(self, gdf=gpd.GeoDataFrame()):
        self.gdf = gdf
    
    @property
    def lat(self, column_name='lat'):
        if column_name in self.gdf.columns:
            lat = self.gdf[column_name].mean()
        else:
            lat = self.gdf.geometry.centroid.y.mean()
        
        return lat
    
    @property
    def lon(self, column_name='lon'):
        if column_name in self.gdf.columns:
            lon = self.gdf[column_name].mean()
        else:
            lon = self.gdf.geometry.centroid.x.mean()
        
        return lon

    @classmethod
    def from_file(cls, file_path):
        gdf = gpd.read_file(file_path)
        gf_from_file = cls(gdf) 
        #gf_by_query.get_geoinfo()
        #gf_by_query.set_save_prop()
        
        return gf_from_file
    
    # метод позволяет выгружать геодатафрейм по названию
    @classmethod
    def by_query(cls, query):
        gdf = ox.geocode_to_gdf(query)
        gf_by_query = cls(gdf) 
        
        gf_by_query.get_geoinfo(query)
        
        gf_by_query.set_save_prop()
        
        return gf_by_query

    def get_geoinfo(self, query):
        lat, lon = ox.geocode(query)
        self.wgs_crs = self.gdf.crs
        if self.wgs_crs.is_geographic:
            self.utm_crs = convert_wgs_to_utm(lat, lon)
        
        return {
            'координаты': (lat, lon),
            'crs': self.wgs_crs.name,
            'utm crs': self.utm_crs.name
        }
        
    def set_save_prop(self):
        self.name = self.gdf.display_name[0].split(',')[0]
        self.key = self.gdf['class'][0]
        self.value = self.gdf['type'][0]
        self.dir_path = f'query/{self.key}/{self.value}/{self.__class__.default_dir_path}'
    
    def prepare_for_geojson(self):
        gdf = self.gdf
        for col in gdf.columns:
            non_nan_values = gdf[col].dropna()  # Исключаем NaN значения
            if not non_nan_values.empty:
                dtype = type(non_nan_values.iloc[0]).__name__
                if dtype == 'list':
                    gdf.drop(columns=col, inplace=True)
        return gdf
    
    # экспорт геодатафрейма в нужном формате
    def export(self, formate='geojson', *, name=None, dir_path=None):
        """
        Экспортирует объект класса geopandas.GeoDataFrame в файл указанного формата.

        Принимает
        ----------
        frame           объект класса geopandas.GeoDataFrame.
        name: str       (опционально) имя файла; по умолчанию gdf.
        dir_path: str   (опционально) путь, если файл нужно разместить в определённой папке; создаёт папку, если её нет.
        formate: str    (опционально) формат файла; по умолчанию csv.
        """
        
        try:
            self.name = name if name is not None else self.name
        except:
            self.name = self.__class__.default_name
            
        try:
            self.dir_path = dir_path if dir_path is not None else self.dir_path
        except:
            self.dir_path = self.__class__.default_dir_path
        
        os.makedirs(self.dir_path, exist_ok=True)
        file_path = f'{self.dir_path}/{self.name}.{formate}'
        export_gdf = self.gdf
        
        # непосредственно экспорт
        if formate == 'csv':
            export_gdf.to_csv(file_path, encoding='utf_8_sig')
        elif formate == 'geojson':
            export_gdf = self.prepare_for_geojson()
            export_gdf.to_file(file_path, 'GeoJSON')

In [370]:
class Place(GeoFrame):
    default_dir_path = 'place'
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
    @classmethod
    def by_query(cls, query):
        gf_by_query = super().by_query(query)
        gdf = ox.features_from_place(query, {gf_by_query.key: gf_by_query.value})
        gdf = gdf.loc[[(gf_by_query.gdf['osm_type'][0], gf_by_query.gdf['osm_id'][0])]]
        gf_by_query.gdf = gdf
        return gf_by_query

In [371]:
class Buildings(GeoFrame):
    default_dir_path = 'buildings'
    
    default_building_types = {
        'accommodation': [
            'apartments', 'barracks', 'bungalow', 'cabin',
            'detached', 'dormitory', 'farm', 'ger', 'hotel',
            'house', 'houseboat', 'residential', 'semidetached_house',
            'static_caravan', 'stilt_house', 'terrace', 'tree_house'
        ]
    }
    def __init__(self, *args, **kwargs):
        super().__init__(*args,**kwargs)

    # функция получения геометрии требуемых типов зданий (по умолчанию выводятся все здания)
    @classmethod
    def by_query(cls, query, types=True):
        gf_by_query = super().by_query(query)
        gdf = ox.features_from_place(query, {'building': types})
        gdf.sort_index(inplace=True)
        if 'node' in set(gdf.index.get_level_values(0).to_list()):
            gdf.drop('node', inplace=True)
        
        gf_by_query.gdf = gdf
        return gf_by_query
    
    def update_levels(self):
        # делаю параметр этажности числовым
        self.gdf['building:levels'] = pd.to_numeric(self.gdf['building:levels'], errors='coerce')
        
        # в осм не дана этажность всех зданий, поэтому как костыль берётся 
        # средняя этажность по каждому типу зданий, которая ставится вместо NaN
        # если для типа зданий этажности нет, удаляю
        for i in self.gdf['building'].unique():
            try:
                mean_level = self.gdf[self.gdf['building'] == i]['building:levels'].mean()
                mean_level = int(mean_level)
            except ValueError:
                self.gdf.drop(self.gdf[self.gdf['building'] == i].index,
                               inplace = True)
            
            self.gdf.loc[self.gdf['building'] == i, 'building:levels'] = self.gdf.loc[
                self.gdf['building'] == i, 'building:levels'].fillna(mean_level)
    
    def common_area(self):
        self.gdf['common_area'] = self.gdf.area * self.gdf['building:levels']
        return self.gdf['common_area']
    
    
    # функция подсчёта количества проживающих в зданиях людей
    # принимает словарь типов зданий, для которых считается количество людей
    # подсчёт ведётся по формуле относительно общей площади здания
    # по умолчанию считает людей во всех домах
    # если дан список, только в типах домов из списка
    # подразумевается, что считаются только жилые дома, хотя ограничений нет
    # если дома к этому шагу не сгенерированы, генерирует по заданному списку
    # если списка нет, генерирует все здания, считает все здания
    # присоединяет общую площадь и людей к self.buildings
    # выводит датасет с домами с площадью и людьми
    def count_people(self, types=True, reduce_factor=0.4, area_ratio=20):
        # выбираю нужные типы домов (жилые)
        if type(types) == list:
            buildings = self.gdf[self.gdf['building'].isin(types)]
        else:
            buildings = self.gdf
        
        # перевожу геометрию в нужную црс
        buildings = buildings.to_crs(self.utm_crs)
        
        # считаем население каждого жилого дома, по формуле
        # площадь дома * пониж. коэффициент / м2 на человека
        # в данном случае пониж. коэффициент = 0,4, 20м2 на человека
        buildings['living_area'] = self.common_area() * reduce_factor
        buildings['people'] = buildings['area'] / area_ratio
        buildings.people = buildings.people.round()

        # если в доме 0 чел. (слишком маленькая площадь), меняем на 1
        buildings.loc[buildings.people == 0, 'people'] = 1
         
        # добавляю значения в датасет класса
        self.gdf['living_area'] = buildings['living_area']
        self.gdf['people'] = buildings['people']

In [372]:
class Hex(GeoFrame):
    default_dir_path = 'hex'
    def __init__(self, *args, **kwargs):
        super().__init__(*args,**kwargs)
        
    # строит сетку. размер по умолчанию ~537 метров
    # это расстояние от центра гексагона до его стороны (радиус впис. окр-ти)
    # получается, что диаметр в районе километра
    # можно задать свой радиус при желании
    @classmethod
    def by_query(cls, query, size=np.sqrt(500000 / np.sqrt(3)), *, top='flat'):
        """size - это радиус вписаной в гексагон окружности, то есть
        расстояние от центра до стороны гексагона
        по умолчанию size стоит такое, что одна ячейка по площади равна 1 км2"""
        
        gf_by_query = super().by_query(query)

        if top not in ('flat', 'point'):
            raise ValueError("The 'top' parameter can only take the values "
                             "'flat' or 'point'. The default setting is 'flat'.")

        # переводим в метровую crs
        gdf = gf_by_query.gdf.to_crs(gf_by_query.utm_crs)

        # определяем границы сетки как экстент по городу
        xmin, ymin, xmax, ymax = gdf.bounds.iloc[0]

        # создаём точки центров гексагонов
        # для этого сначала нужно создать прямоугольную сетку точек
        if top == 'flat':
            x, y = np.meshgrid(np.arange(xmin, xmax, size * np.sqrt(3)),
                               np.arange(ymin, ymax, size))
        elif top == 'point':
            x, y = np.meshgrid(np.arange(xmin, xmax, size),
                               np.arange(ymin, ymax, size * np.sqrt(3)))
        points = np.dstack((x, y))

        # чтобы сетка стала гексагональной, удаляем точки,
        # которые не "попадают" в центры гексагонов 
        condition = np.sum(np.indices(points.shape[:2]), axis=0) % 2 == 0
        points = points[condition].tolist()

        # создаём вокруг точек точки вершин шестиугольников
        if top == 'flat':
            angles = [60 * i for i in range(6)]
        elif top == 'point':
            angles = [60 * i + 30 for i in range(6)]

        side_length = 2 * size / math.sqrt(3)
        hexagones = [
            [(point[0] + side_length * math.cos(math.radians(angle)),
              point[1] + side_length * math.sin(math.radians(angle)))
             for angle in angles] for point in points
        ]

        # создаём полигоны шестиугольников
        hexagones = [Polygon(hexagon) for hexagon in hexagones]

        # создаём геодатасекс
        gdf = gpd.GeoDataFrame(geometry=hexagones, crs=gf_by_query.utm_crs)

        # заношу сетку в класс, переводя в изначальную crs
        gf_by_query.gdf = gdf.to_crs(gf_by_query.wgs_crs)
        
        return gf_by_query

In [None]:
# подсчёт числа людей в каждой ячейке
def density_by_hex(self):     
    
    # перевод в utm, так как геометрия в градусах плохо работает
    buildings = self.buildings.to_crs(self.prop_crs)
    gdf_hex = self.hex.to_crs(self.prop_crs)
    
    #пересечение сетки и домов:
    buildings.geometry=buildings.centroid
    gdf_hex = gpd.sjoin(gdf_hex, buildings, how='left')

    # аггрегирование данных сетки, получение итогового геодатафрейма GRID
    gdf_hex=gdf_hex[['geometry', 'people']]
    gdf_hex.replace(np.nan, 0, inplace=True)
    gdf_hex.reset_index(inplace=True)
    gdf_hex = gdf_hex.groupby(['index']).agg({'geometry':'first',
                                              'people':'sum'})
    gdf_hex = gdf_hex.set_geometry('geometry')
    gdf_hex.crs = self.prop_crs
    
    # запись в класс, уже в wgs 84
    self.hex = gdf_hex.to_crs(self.scr_crs)
    
    # возврат в utm, на случай если пользователю понадобится
    return gdf_hex

In [381]:
a=Buildings.by_query('Тольятти')
a.export('geojson', name='test', dir_path='.')

In [374]:
a=Buildings.by_query('Ярустово')