In [None]:
import pandas as pd
import geopandas as gpd
import numpy as np
import shapely
from scipy.spatial.distance import pdist
from scipy.spatial import cKDTree
from sklearn.neighbors import BallTree
from pyproj import Geod 
from geopy.distance import geodesic

from collections import Counter
from tqdm import tqdm
from shapely import wkt
from shapely import wkb

# Shared WGS84 ellipsoid model; used below for geodesic perimeter measurements.
geod = Geod(ellps="WGS84")

In [None]:
# Constants
# NOTE(review): EARTH_RADIUS is not referenced anywhere else in the visible part of this notebook.
EARTH_RADIUS = 6371  # Earth radius in km

In [None]:
# Load the building footprints; the "geometry" column arrives WKB-encoded.
df_K = pd.read_parquet(".parquet")
# Decode every WKB value in one vectorized call instead of a per-row Python apply.
df_K["geometry"] = gpd.GeoSeries.from_wkb(df_K["geometry"])
df_K = gpd.GeoDataFrame(df_K, geometry='geometry')

# Replace whatever index the file carried with a clean 0..n-1 index.
df_K = df_K.reset_index(drop=True)



### Perimeter to area ratio

In [None]:
# Preview the first rows to sanity-check the load.
df_K.head()


In [None]:

def _geodesic_perimeter(polygon):
    """Return the perimeter reported by ``geod.geometry_area_perimeter`` for ``polygon``."""
    # The call returns an (area, perimeter) pair; only the perimeter is needed here.
    _area, perimeter = geod.geometry_area_perimeter(polygon)
    return perimeter


df_K['building_perimeter_in_meters_new'] = df_K["geometry"].apply(_geodesic_perimeter)

In [None]:
# Perimeter divided by footprint area, row by row.
df_K['perimeter_to_area_ratio'] = df_K['building_perimeter_in_meters_new'].div(df_K['area_in_meters'])
df_K

In [None]:
# Cap the ratio at 6.5 (larger values are replaced by 6.5), then print summary statistics.
df_K['perimeter_to_area_ratio'] = df_K['perimeter_to_area_ratio'].clip(upper=6.5)
print(df_K['perimeter_to_area_ratio'].describe())

In [None]:
# Maximum of the ratio after the clipping step above.
df_K['perimeter_to_area_ratio'].max()

In [None]:
# Divide the (clipped) ratio by its column maximum.
df_K['normalized_perimeter_to_area_ratio'] = df_K['perimeter_to_area_ratio'] / df_K['perimeter_to_area_ratio'].max()

### Radius calculation

# Keep the geometry column as loaded, before the CRS assignment and reprojections below.
df_K['geo_original']=df_K['geometry']

In [None]:
# Declare the stored coordinates as WGS84 lon/lat, overriding any CRS already attached.
df_K = df_K.set_crs("EPSG:4326", allow_override=True)
print("Current CRS:", df_K.crs)

# Reproject to a metric CRS that covers the data so planar lengths come out in metres.
# The previously hard-coded EPSG:7767 is "WGS 84 / Maharashtra" (an Indian state grid); it
# distorts lengths for data located elsewhere, and the roads input of this notebook is
# 'Kenya.parquet'. Let geopandas pick the UTM zone matching the data's extent instead.
df_K = df_K.to_crs(df_K.estimate_utm_crs())

df_K["centroid"] = df_K.geometry.centroid

print(df_K[["geometry", "centroid"]].head())


In [None]:

def calculate_radius(geometry):
    """Return the mean distance from a footprint's centroid to its exterior-ring vertices.

    Parameters
    ----------
    geometry : shapely geometry or None
        A Polygon is measured directly; any other geometry type (for example a
        MultiPolygon) is replaced by its convex hull first.

    Returns
    -------
    float
        Mean centroid-to-vertex distance in the units of the geometry's
        coordinates, or NaN when ``geometry`` is None or empty.
    """
    if geometry is None or geometry.is_empty:
        return np.nan

    # `geom_type` is the supported attribute; `.type` is deprecated in Shapely 2.x.
    outline = geometry if geometry.geom_type == 'Polygon' else geometry.convex_hull
    centre = outline.centroid
    # Keep only x/y so rings carrying a z coordinate do not break the subtraction.
    ring = np.asarray(outline.exterior.coords)[:, :2]
    offsets = ring - np.array([centre.x, centre.y])
    return np.mean(np.linalg.norm(offsets, axis=1))

# Compute radius in meters
df_K["radius_m"] = df_K["geometry"].apply(calculate_radius)

# Convert back to WGS84 (if needed)
# NOTE(review): this reprojects the active geometry column; the separate "centroid" column
# presumably keeps its projected coordinates — confirm that is intended.
df_K = df_K.to_crs("EPSG:4326")

print(df_K[["centroid", "radius_m"]].head())


In [None]:
def count_exterior_vertices(geometry):
    """Count exterior-ring coordinates of a Polygon, or their sum over all parts otherwise."""
    # `geom_type` replaces the `.type` attribute that is deprecated in Shapely 2.x.
    if geometry.geom_type == 'Polygon':
        return len(geometry.exterior.coords)
    return sum(len(part.exterior.coords) for part in geometry.geoms)


df_K["num_vertices"] = df_K["geometry"].apply(count_exterior_vertices)
df_K[["num_vertices"]].describe()

### Roads calculation

In [None]:
#file from extract_roads
final_gdf = gpd.read_parquet('Kenya.parquet')
final_gdf = final_gdf.set_crs("EPSG:4326")
final_gdf['road_index'] = [i for i in range(len(final_gdf))]

CHECKS

In [None]:
# Display the loaded road table.
final_gdf

In [None]:
# Number of columns in the road table.
len(list(final_gdf.columns))

In [None]:
# Breakdown of the geometry types present in the road table.
print(final_gdf.geometry.type.value_counts())

# Number of rows whose geometry is empty.
print("Empty geometries count:", final_gdf.geometry.is_empty.sum())

# Distinct values of the 'highway' column.
print("Unique highway values:", final_gdf['highway'].unique())

CALCULATIONS

In [None]:
# Keep only the attributes used later. Iterating over the frame's own columns (rather than over
# the set) gives a stable column order, because set iteration order can differ between runs.
# `.copy()` detaches the result so columns can be added to it later without touching a slice.
required_columns = {"highway", "geometry", "id",'width','oneway','junction','lanes','maxspeed','motorcar', 'road_index'}
final_gdf = final_gdf[[col for col in final_gdf.columns if col in required_columns]].copy()

final_gdf

In [None]:
# Rows that duplicate another row; keep=False flags every member of a duplicate group.
duplic=final_gdf[final_gdf.duplicated(keep=False)]
print(duplic.shape)

In [None]:
# Simplification tolerance, expressed in the units of the geometry's coordinates.
tolerance = 0.00001

# NOTE(review): 'geometry_simplified' is not read again in the visible part of this notebook.
final_gdf['geometry_simplified'] = final_gdf['geometry'].simplify(tolerance)
final_gdf

In [None]:
# Road category -> list of 'highway' values belonging to it.
# The keys are reused below in the 'distance_to_<key>' / 'nearest_road_type_<key>' column names.
roads_categories = {
    1: ['motorway', 'trunk_link', 'motorway_link', 'trunk', 'primary', 'primary_link'],
    2: ['secondary', 'secondary_link',],
    3: ['tertiary', 'tertiary_link', ],
    4: ['residential', 'footway', 'service', 'unclassified','living_street','steps','path','track','pedestrian','cycleway','raceway','bridleway','construction','services','bus_stop','road','rest_area','yes','emergency_access_point','corridor','junction','proposed','minor']
    }

In [None]:
def explode_multilinestrings(gdf):
    """Split multi-part geometries into single parts and keep only the LineString rows."""
    single_parts = gdf.explode(ignore_index=True)
    is_linestring = single_parts.geometry.type == 'LineString'
    return single_parts[is_linestring]

# Drop rows without a geometry, then split multi-part lines into single LineStrings.
final_gdf = final_gdf[final_gdf.geometry.notnull()]
final_gdf = explode_multilinestrings(final_gdf)
final_gdf

In [None]:
# One CRS shared by roads and buildings so coordinates of both tables are directly comparable.
projected_crs = "EPSG:3857"
final_gdf = final_gdf.to_crs(projected_crs)
df_K = df_K.to_crs(projected_crs)


In [None]:
# Compute all centroids once (vectorized) and read x / y from the same result, instead of
# building each row's centroid twice inside Python lambdas.
building_centroids = df_K.geometry.centroid
df_K['centroid_x'] = building_centroids.x
df_K['centroid_y'] = building_centroids.y


In [None]:
def geodesic_distance(house_row, road_row):
    """Geodesic distance in metres between a building and a road point.

    ``house_row`` must provide 'latitude' and 'longitude' by key; ``road_row`` must expose a
    ``geometry_centroid`` point whose y / x are used as latitude / longitude.
    """
    road_point = road_row.geometry_centroid
    origin = (house_row['latitude'], house_row['longitude'])
    destination = (road_point.y, road_point.x)
    return geodesic(origin, destination).meters

In [None]:
def explode_road_geometry(df):
    """Expand each road line into one row per vertex.

    Parameters
    ----------
    df : DataFrame-like
        Must have a unique index and a 'geometry' column of line geometries.

    Returns
    -------
    pandas.DataFrame
        One row per vertex, numbered 0..n-1. Each row carries every column of
        its source row plus:

        - 'geometry_centroid': the vertex as a shapely Point,
        - 'row_idx': index label of the source row in ``df``,
        - 'centroid_x' / 'centroid_y': centroid of the whole source line
          (not of the vertex).

        When the input yields no vertices, an empty frame with these columns
        is returned so callers can detect the situation themselves.
    """
    extra_columns = ['geometry_centroid', 'row_idx', 'centroid_x', 'centroid_y']

    road_rows = []
    for row_idx, row in df.to_dict(orient='index').items():
        line = row['geometry']
        vertices = list(line.coords)
        if not vertices:
            continue

        # The line centroid is the same for every vertex of a road: compute it once per road.
        line_centroid = line.centroid

        for x, y, *_ in vertices:  # tolerate an optional z coordinate
            current_row = row.copy()
            current_row['geometry_centroid'] = shapely.Point(x, y)
            current_row['row_idx'] = row_idx
            current_row['centroid_x'] = line_centroid.x
            current_row['centroid_y'] = line_centroid.y
            road_rows.append(current_row)

    if not road_rows:
        return pd.DataFrame(columns=list(dict.fromkeys([*df.columns, *extra_columns])))

    # A frame built from a list of records already gets a 0..n-1 index.
    return pd.DataFrame(road_rows)

In [None]:
# Road category -> upper limit later applied to that category's 'distance_to_<category>' column.
# NOTE(review): presumably metres (the distances are computed in EPSG:3857 units) — confirm.
category_bbox_size = {
    1: 5_000,
    2: 4_000, 
    3: 3_000,
    4: 2_000
}

In [None]:
# Order buildings from smallest to largest footprint and renumber them 0..n-1.
df_K = df_K.sort_values(by='area_in_meters', ascending=True, ignore_index=True)
df_K

In [None]:
# Building centroids do not change between categories, so build the query array once.
house_coords = np.column_stack([df_K['centroid_x'].to_numpy(), df_K['centroid_y'].to_numpy()])

for category, road_types in roads_categories.items():

    print(f'Processing road_types: {road_types}')

    filtered_roads_df = final_gdf[final_gdf['highway'].isin(road_types)]
    print(f'Unexploded road geometries amount: {len(filtered_roads_df)}')

    # Fail with a clear message before trying to explode an empty selection.
    if filtered_roads_df.empty:
        raise ValueError("No valid road centroids found. Check road geometries!")

    # One row per road vertex; each vertex is a candidate "nearest road point".
    filtered_roads_df = explode_road_geometry(filtered_roads_df)
    print(f'Exploded road geometries amount: {len(filtered_roads_df)}')

    # Drop empty vertex points from the table itself (not only from the coordinate array) so
    # that positions returned by the KD-tree always refer to the matching table row.
    is_valid_vertex = np.array(
        [not point.is_empty for point in filtered_roads_df['geometry_centroid']], dtype=bool
    )
    filtered_roads_df = filtered_roads_df[is_valid_vertex]
    road_centroids = filtered_roads_df['geometry_centroid']
    road_coords = np.array([(point.x, point.y) for point in road_centroids])

    if len(road_coords) == 0:
        raise ValueError("No valid road centroids found. Check road geometries!")

    road_tree = cKDTree(road_coords)

    # Distance to (and position of) the nearest road vertex for every building.
    distances, indices = road_tree.query(house_coords, k=1)

    distance_col_name = f'distance_to_{category}'
    road_type_col_name = f'nearest_road_type_{category}'

    # Whole-column assignment from arrays is positional: it replaces the per-row `.loc` writes
    # and no longer depends on df_K being indexed 0..n-1.
    df_K[road_type_col_name] = filtered_roads_df['highway'].to_numpy()[indices]
    df_K[distance_col_name] = distances

CHECKS

In [None]:
# Summary statistics of the nearest-road distances, one column per road category.
distance_columns = [f'distance_to_{category}' for category in (1, 2, 3, 4)]

df_K[distance_columns].describe()

In [None]:
# Clamp each distance column to the range [0, limit] configured for its road category.
for cat, limit in category_bbox_size.items():
    column = f'distance_to_{cat}'
    df_K[column] = df_K[column].clip(lower=0, upper=limit)

In [None]:
# Summary statistics of the same distance columns, now after clipping.
distance_columns = [f'distance_to_{category}' for category in (1, 2, 3, 4)]

df_K[distance_columns].describe()

In [None]:
# List the columns accumulated on the building table so far.
df_K.columns

In [None]:
# Road category -> fixed radius.
# NOTE(review): this name is assigned again further down (with an extra key 5) before the
# density loop reads it, so this first definition appears to be unused.
fixed_radius_by_category = {
    1: 500,
    2: 400,
    3: 300,
    4: 200,
}

### Density of roads

In [None]:
def explode_road_geometry_without_index(df):
    """Expand each road line into one row per vertex.

    Kept as a separate name for existing callers; the expansion itself is done by
    ``explode_road_geometry`` (defined earlier in this notebook), which this function
    previously duplicated line for line.

    Returns a pandas DataFrame with one row per vertex, numbered 0..n-1, carrying the
    source row's columns plus 'geometry_centroid' (the vertex as a Point), 'row_idx'
    (index label of the source row) and 'centroid_x' / 'centroid_y'.
    """
    return explode_road_geometry(df)

In [None]:
# Vertex table for the whole road table (no category filter applied here).
filtered_roads = explode_road_geometry_without_index(final_gdf)

In [None]:
# Spatial index over the road vertices. Tree positions follow the row order of
# `filtered_roads` as long as no vertex is dropped by the is_empty filter.
road_centroids = filtered_roads['geometry_centroid'] 
road_coords = np.array([(point.x, point.y) for point in road_centroids if not point.is_empty])
   
road_tree = cKDTree(road_coords)

In [None]:
# NOTE(review): this silences every warning category for the rest of the session,
# including deprecation notices; consider narrowing it to the specific warning meant.
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Category -> fixed search radius passed to compute_road_density in the density loop below.
fixed_radius_by_category = {
    1: 500,
    2: 400,
    3: 300,
    4: 200,
    5: 100,
}

In [None]:
# Categories 4 and 5 cover the same 'highway' values; write the list once and give each
# key its own copy.
_density_road_types = [
    'residential', 'footway', 'service', 'unclassified', 'living_street', 'steps',
    'path', 'track', 'pedestrian', 'cycleway', 'raceway', 'bridleway',
    'construction', 'services', 'bus_stop', 'road', 'rest_area', 'yes',
    'emergency_access_point', 'corridor', 'junction', 'proposed', 'minor',
]
roads_categories_ = {
    4: list(_density_road_types),
    5: list(_density_road_types),
}

In [None]:
def compute_road_density(road_tree, building_x, building_y, radius, category_roads_df):
    """Road length per unit area inside a circle around one building.

    Candidate roads are those with at least one vertex within ``radius`` of the
    building, looked up through ``road_tree`` and the notebook-level
    ``filtered_roads`` vertex table. A road that crosses the circle without having
    a vertex inside it is therefore not counted.

    Parameters
    ----------
    road_tree : KD-tree
        Built over the vertices of ``filtered_roads``; positions it returns are
        used as row labels of that table.
    building_x, building_y : float
        Building centroid, in the same coordinate system as the roads.
    radius : float
        Circle radius, in the same units as the coordinates.
    category_roads_df : GeoDataFrame
        Roads to measure; must contain a 'road_index' column.

    Returns
    -------
    float
        ``(clipped road length / (pi * radius**2)) * 1e6``. With coordinates in
        metres this is metres of road per square kilometre. 0.0 when no road
        vertex lies within the radius.
    """
    # Get nearby road vertices
    nearby_indices = road_tree.query_ball_point((building_x, building_y), radius)
    if not nearby_indices:
        return 0.0

    # Translate vertex rows to road ids through the 'road_index' column itself. 'row_idx'
    # holds index labels of the exploded road table, which no longer equal 'road_index'
    # once rows were dropped or multi-part geometries were split.
    road_idxs = set(filtered_roads.loc[nearby_indices, 'road_index'])
    candidate_roads = category_roads_df[category_roads_df.road_index.isin(road_idxs)]

    # Clip the candidate roads to the circle and add up what remains inside it.
    building_radius_polygon = shapely.Point(building_x, building_y).buffer(radius)
    total_road_length = candidate_roads.geometry.intersection(building_radius_polygon).length.sum()

    # Circle area formula: pi * r^2
    buffer_area = np.pi * (radius ** 2)

    # length / area is per square unit; the 1e6 factor rescales m^-2 to km^-2.
    return (total_road_length / buffer_area) * 1e6

# (x, y) centroid of every building, in row order.
building_coords = np.array(list(zip(df_K.centroid_x, df_K.centroid_y)))

for category, road_types in roads_categories_.items():

    fixed_radius = fixed_radius_by_category[category]
    
    category_roads_df = final_gdf[final_gdf.highway.isin(road_types)]
    # One compute_road_density call per building (a Python-level loop with a progress bar);
    # the results are stored in a per-category column.
    df_K[f"road_density_for_{category}_fixed"] = [compute_road_density(road_tree, x, y, fixed_radius, category_roads_df) for x, y in tqdm(building_coords, total=len(building_coords), desc=f'Counting for fixed radius: {fixed_radius}')]

In [None]:
# Summary statistics of the two road-density columns produced by the loop above.
density_columns = ['road_density_for_4_fixed', 'road_density_for_5_fixed']

df_K[density_columns].describe()

SQN + NUMBER OF FACES

In [None]:

# SQN: 4 * sqrt(area) / perimeter, which equals 1 for a perfect square.
df_K["SQN"] = 4 * np.sqrt(df_K["area_in_meters"]) / df_K["building_perimeter_in_meters_new"]

# Number of faces: vertex count minus one, capped at 20.
df_K["faces"] = (df_K["num_vertices"] - 1).clip(upper=20)


SAVE

In [None]:
# NOTE(review): the output path is the same ".parquet" string the buildings were loaded from
# at the top of the notebook, so saving overwrites the input — confirm that is intended.
df_K.to_parquet(".parquet")