In [None]:
from pyrosm import OSM
from pyrosm import get_data
import osmnx as ox
import pandas as pd
import h3.api.numpy_int as h3 
import shapely
import matplotlib.pyplot as plt
import geopandas as gpd
from shapely.geometry import MultiPolygon
from shapely.geometry import Polygon
import contextily as cx


def flatten(lst):
    """Recursively flatten nested lists into one flat list.

    Only ``list`` instances are descended into; tuples, strings and any other
    objects are kept as single items, in their original order.
    """
    flat = []
    for entry in lst:
        if isinstance(entry, list):
            # Nested list: splice its flattened content in place.
            flat.extend(flatten(entry))
        else:
            flat.append(entry)
    return flat

def swap_xy(geom):
    """Return a copy of ``geom`` with the x and y of every coordinate exchanged.

    Empty geometries are returned unchanged (the same object). A z value, when
    present, is carried over untouched. Multi-part geometries and geometry
    collections are processed recursively, part by part.

    Raises:
        ValueError: if ``geom.geom_type`` is not one of the handled types.
    """
    if geom.is_empty:
        return geom

    if geom.has_z:
        def swap_xy_coords(coords):
            return [(y, x, z) for x, y, z in coords]
    else:
        def swap_xy_coords(coords):
            return [(y, x) for x, y in coords]

    # ``geom_type`` is used instead of ``type``: the latter is deprecated in
    # Shapely 2.0 and emits a deprecation warning on every access.
    kind = geom.geom_type

    # Process coordinates from each supported geometry type
    if kind in ('Point', 'LineString', 'LinearRing'):
        return type(geom)(swap_xy_coords(geom.coords))
    elif kind == 'Polygon':
        ring = geom.exterior
        shell = type(ring)(swap_xy_coords(ring.coords))
        holes = [type(hole)(swap_xy_coords(hole.coords)) for hole in geom.interiors]
        return type(geom)(shell, holes)
    elif kind.startswith('Multi') or kind == 'GeometryCollection':
        # Recursive call
        return type(geom)([swap_xy(part) for part in geom.geoms])
    else:
        raise ValueError('Type %r not recognized' % kind)

def h3_list_to_multi_poly(h3_list):
    """Build a shapely MultiPolygon outlining a collection of H3 cells.

    ``h3.h3_set_to_multi_polygon`` returns one entry per polygon, each entry
    being a list of loops: the outer boundary first, followed by any holes.
    The holes are passed on to shapely so that enclosed gaps in the cell set
    are preserved instead of being filled in.
    """
    h3_polygons = h3.h3_set_to_multi_polygon(h3_list)
    # NOTE(review): with the default geo_json=False the h3 docs describe the
    # loops as (lat, lng) pairs, so the shapely result has x=lat, y=lng --
    # confirm whether callers that plot this next to (lng, lat) geometries
    # want geo_json=True instead.
    # The MultiPolygon cannot be built from the raw h3 output directly, so
    # every polygon is constructed explicitly first.
    return MultiPolygon([Polygon(loops[0], loops[1:]) for loops in h3_polygons])

def LineString_to_hex(line, H3_RES):
    """Return the H3 cells traversed by a line, following every vertex.

    Each consecutive pair of vertices is traced with ``h3.h3_line`` so that
    bends in the line are respected; tracing only the first and last vertex
    would replace a curved path by a straight chord.

    Args:
        line: line geometry whose coordinates are already in (lat, lng)
            order, as produced by ``swap_xy`` in ``all_shapley_geo_to_h3``.
        H3_RES: H3 resolution of the returned cells.

    Returns:
        A list of H3 indexes in traversal order without duplicates; an empty
        list when the line has no coordinates.
    """
    vertices = list(line.coords)
    if not vertices:
        return []

    vertex_cells = [h3.geo_to_h3(v[0], v[1], H3_RES) for v in vertices]

    # A dict is used as an ordered set: neighbouring segments share their
    # junction cell and a line may revisit a cell.
    traced = dict.fromkeys(vertex_cells[:1])
    for start, end in zip(vertex_cells, vertex_cells[1:]):
        for cell in h3.h3_line(start, end):
            traced.setdefault(cell, None)
    return list(traced)

def all_shapley_geo_to_h3(obj, H3_RES):
    """Convert a shapely geometry to the H3 index(es) covering it.

    Args:
        obj: shapely geometry in the usual (x, y) order, or None.
        H3_RES: H3 resolution to index at.

    Returns:
        A single H3 index for a Point, a collection of indexes for
        (Multi)Polygon, (Multi)LineString and MultiPoint geometries, and None
        for missing, empty or unsupported geometries (unsupported types are
        additionally reported with a printed message).
    """
    # Missing or empty geometries have nothing to index; bail out before any
    # coordinate access so a single bad row cannot abort a whole DataFrame apply.
    if obj is None or obj.is_empty:
        return None

    geom_type = obj.geom_type
    # assert geom_type valid at some point

    # shapely and h3 swap x and y:
    obj = swap_xy(obj)

    if geom_type == 'MultiPolygon':
        # Iterate the parts through .geoms: iterating a multi-part geometry
        # directly is not supported by every shapely version.
        # loop through polys and flatten
        return [ind for p in obj.geoms for ind in h3.polyfill(shapely.geometry.mapping(p), H3_RES)]
    if geom_type == 'Polygon':
        return h3.polyfill(shapely.geometry.mapping(obj), H3_RES)
    if geom_type == 'MultiLineString':
        return [ind for l in obj.geoms for ind in LineString_to_hex(l, H3_RES)]
    if geom_type == 'LineString':
        return LineString_to_hex(obj, H3_RES)
    if geom_type == 'MultiPoint':
        return [h3.geo_to_h3(p.x, p.y, H3_RES) for p in obj.geoms]
    if geom_type == 'Point':
        return h3.geo_to_h3(obj.x, obj.y, H3_RES)

    print(f"unimplemented geom type: {geom_type}")
    return None
    

def plot_h3_and_geo(h3_index_list, shapely_geo):
    """Draw a shapely geometry (blue) and an H3 cell set (red) over a basemap.

    The H3 cells are first merged into a MultiPolygon with
    ``h3_list_to_multi_poly``. Both layers are drawn semi-transparent on one
    axis, a contextily basemap is added for EPSG:4326, and the figure is shown.
    """
    geo_layer = gpd.GeoDataFrame(geometry=gpd.GeoSeries(shapely_geo))
    hex_layer = gpd.GeoDataFrame(
        geometry=gpd.GeoSeries(h3_list_to_multi_poly(h3_index_list))
    )

    # One shared axis for both layers
    _, ax = plt.subplots()
    for layer, colour in ((geo_layer, 'blue'), (hex_layer, 'red')):
        layer.plot(ax=ax, color=colour, alpha=0.5)

    cx.add_basemap(ax=ax, crs="EPSG:4326")

    plt.show()


def osm_to_manual_category(tag, osm_tag_mapping):
    """Map an OSM tag value to its manual category, or return it unchanged.

    Args:
        tag: OSM tag value (may be NaN/None for rows without a POI type).
        osm_tag_mapping: dict from OSM tag value to category name.

    Returns:
        ``osm_tag_mapping[tag]`` when the tag is mapped, otherwise ``tag``.
    """
    # A direct lookup with a fallback is cheaper than searching the keys first.
    try:
        return osm_tag_mapping[tag]
    except (KeyError, TypeError):
        # KeyError: unmapped tag. TypeError: unhashable tag. Catching only
        # these keeps the best-effort fallback without also swallowing
        # KeyboardInterrupt or genuine programming errors.
        return tag

# some tags don't have headers, find manually in tags
def tag_conditions(tags, healthcare_list):
    """Derive a POI type from the raw tag text when no header column has one.

    Returns the first entry of ``healthcare_list`` (in list order) that occurs
    inside ``tags``; failing that, ``"sport"`` when that word occurs inside
    ``tags``; otherwise ``None``.
    """
    # Containment test against the tag text for every healthcare value
    matches = list(filter(lambda candidate: candidate in tags, healthcare_list))
    if matches:
        return matches[0]
    return "sport" if "sport" in tags else None

def df_manipulations(pois, H3_RES, osm_filter, category_set, osm_tag_mapping):
    '''
    Turn a raw POI frame into one row per (H3 cell, category).

    input: raw poi df direct from pyrosm
    output: df with columns h3_index and category

    The POI type is the first non-null value of the "amenity", "shop" and
    "leisure" columns, falling back to a match found in the stringified
    "tags" column (see tag_conditions). The type is then translated through
    osm_tag_mapping, and only rows whose category is in category_set are
    converted to H3.

    The input frame is left untouched. Header columns missing from the
    extract, and a filter without a "healthcare" entry, are tolerated.
    '''
    # First non-null value across the header columns present in this extract.
    poi_type = pd.Series(index=pois.index, dtype=object)
    for header in ("amenity", "shop", "leisure"):
        if header in pois.columns:
            poi_type = poi_type.fillna(pois[header])

    # some pois don't have a poi_type, find them in tags
    if "tags" in pois.columns:
        healthcare_tags = osm_filter.get("healthcare", [])
        from_tags = pois["tags"].astype(str).apply(
            lambda tags: tag_conditions(tags, healthcare_tags)
        )
        poi_type = poi_type.fillna(from_tags)

    # convert poi_type to n minute city category
    category = poi_type.apply(lambda tag: osm_to_manual_category(tag, osm_tag_mapping))

    # Drop unwanted categories before the expensive geometry -> H3 step;
    # filtering afterwards yields the same rows, only slower.
    wanted = category.isin(category_set)
    geometries = pois.loc[wanted, "geometry"]

    # convert all geometry to h3 (one scalar or collection per POI)
    h3_index = pd.Series(
        [all_shapley_geo_to_h3(geom, H3_RES) for geom in geometries],
        index=geometries.index,
        dtype=object,
    )

    h3_df = pd.DataFrame({"h3_index": h3_index, "category": category[wanted]})
    # make one h3 index on each row
    h3_df = h3_df.explode("h3_index")
    # get rid of nans (geometries that produced no cell)
    return h3_df[h3_df["h3_index"].notna()]

def get_pois_h3(pbf_path, osm_filter, H3_RES, category_set, osm_tag_mapping):
    """Read POIs from an OSM .pbf file and index them on the H3 grid.

    Args:
        pbf_path: path of the .osm.pbf extract to read.
        osm_filter: pyrosm custom filter (tag key -> accepted values).
        H3_RES: H3 resolution used for indexing.
        category_set: categories to keep in the result.
        osm_tag_mapping: dict translating OSM tag values into categories.

    Returns:
        DataFrame with columns ``h3_index`` and ``category``; empty when the
        extract holds no matching POIs.
    """
    osm = OSM(pbf_path)
    pois = osm.get_pois(custom_filter=osm_filter)
    # NOTE(review): pyrosm appears to return None (with a warning) when
    # nothing matches the filter -- confirm against the installed version.
    # Either way an empty result yields an empty frame with the usual columns.
    if pois is None or len(pois) == 0:
        return pd.DataFrame(columns=['h3_index', 'category'])
    return df_manipulations(pois, H3_RES, osm_filter, category_set, osm_tag_mapping)

In [None]:
# Pre-processed OSM extract read by default.
pbf_path = "../resources/osm/Læsø Kommune_processed.osm.pbf"

# idea: EVERYONE can use it and live a healthy life with this as their only source
# check: doctors, clinic, health_food, food
# The essential filter is the least restrictive one.
essential_filter = {
    "amenity": ["pharmacy", "dentist", "clinic", "doctors", "school", "library"],
    "shop": ["supermarket", "greengrocer", "medical_supply", "grocery", "wholesale"],
    "healthcare": ["clinic", "doctor", "pharmacy", "dentist"],
    "leisure": ["park", "fitness_centre", "fitness_station"],
    "sport": [
        "tennis", "soccer", "swimming_pool", "sports_centre", "pitch", "track",
        "golf_course", "gymnastics", "gym", "fitness_centre", "fitness_station",
    ],
}

# parameters

# H3 resolution used for all indexing
H3_RES = 12

# n minute city must satisfy the following expression based on
# the existence of certain tags within n minutes:
category_set = {"pharmacy", "park", "supermarket", "sport", "doctor", "dentist", "library"}

# easy use case which is debatably also essential
#education_filter = {"amenity":["school"]}

# OSM tag value -> category used in category_set
osm_tag_mapping = {
    "medical_supply": "pharmacy",
    "greengrocer": "supermarket",
    "wholesale": "supermarket",
    "grocery": "supermarket",
    "clinic": "doctor",
    "doctors": "doctor",
    "pitch": "sport",
    "track": "sport",
    "sports_centre": "sport",
}

# testing pyrosm bounding box

In [None]:
# Read POIs from the country-wide Denmark extract, cropped to one municipality
# through pyrosm's bounding_box argument.
pbf_path = "../resources/osm_unprocessed/denmark-latest.osm.pbf"
municipality = 'Randers Kommune'
# NOTE(review): this hand-written bbox is never used -- the tuple unpacking
# below rebinds `bbox` before OSM() reads it.
bbox = [9.7, 56.37, 10.47, 56.6]

# NOTE(review): project-local import in the middle of the notebook; consider
# moving it to the import cell at the top.
from ghsl_processing import city_boundaries_to_h3 
city_bounds_h3, bbox, bbox_pois = city_boundaries_to_h3(municipality)
osm = OSM(pbf_path, bounding_box=bbox)
pois = osm.get_pois(custom_filter=essential_filter)

In [None]:
# Read POIs from the country-wide Denmark extract, cropped to one municipality
# through pyrosm's bounding_box argument.
pbf_path = "../resources/osm_unprocessed/denmark-latest.osm.pbf"
municipality = 'Ærø Kommune'
# NOTE(review): this hand-written bbox is never used -- the tuple unpacking
# below rebinds `bbox` before OSM() reads it.
bbox = [10.2, 54.8, 10.56, 54.97]

# NOTE(review): project-local import repeated in several cells; consider
# moving it to the import cell at the top.
from ghsl_processing import city_boundaries_to_h3 
city_bounds_h3, bbox, bbox_pois = city_boundaries_to_h3(municipality)
osm = OSM(pbf_path, bounding_box=bbox)
pois = osm.get_pois(custom_filter=essential_filter)

In [None]:
# Read POIs from the pre-processed single-municipality extract. Unlike the
# country-wide cells, OSM() is given no bounding box here, so nothing is cropped.
pbf_path = "../resources/osm/Ærø Kommune_processed.osm.pbf"
municipality = 'Ærø Kommune'
# NOTE(review): dead store -- rebound by the tuple unpacking below and not
# passed to OSM() in this cell.
bbox = [10.2, 54.8, 10.56, 54.97]

# NOTE(review): city_bounds_h3, bbox and bbox_pois are computed but not used
# by the remaining statements of this cell.
from ghsl_processing import city_boundaries_to_h3 
city_bounds_h3, bbox, bbox_pois = city_boundaries_to_h3(municipality)
osm = OSM(pbf_path)
pois = osm.get_pois(custom_filter=essential_filter)

In [None]:
# Run the full POI -> H3 pipeline on whichever pbf_path was assigned last and
# show how many H3 rows each category produced.
test = get_pois_h3(pbf_path, essential_filter, H3_RES, category_set, osm_tag_mapping)
test['category'].value_counts()

In [None]:
# NOTE(review): identical to the previous cell; it re-reads the extract and
# recomputes the same counts.
test = get_pois_h3(pbf_path, essential_filter, H3_RES, category_set, osm_tag_mapping)
test['category'].value_counts()

In [None]:
# Number of H3 rows per category in the most recent `test` result.
test['category'].value_counts()

# testing pyrosm bounding box

In [None]:
# Read POIs from the country-wide Denmark extract, cropped to one municipality
# through pyrosm's bounding_box argument.
# NOTE(review): same statements as the earlier 'Randers Kommune' cell.
pbf_path = "../resources/osm_unprocessed/denmark-latest.osm.pbf"
municipality = 'Randers Kommune'
# NOTE(review): this hand-written bbox is never used -- the tuple unpacking
# below rebinds `bbox` before OSM() reads it.
bbox = [9.7, 56.37, 10.47, 56.6]

from ghsl_processing import city_boundaries_to_h3 
city_bounds_h3, bbox, bbox_pois = city_boundaries_to_h3(municipality)
osm = OSM(pbf_path, bounding_box=bbox)
pois = osm.get_pois(custom_filter=essential_filter)

In [None]:
# Read POIs from the country-wide Denmark extract, cropped to one municipality
# through pyrosm's bounding_box argument.
# NOTE(review): same statements as the earlier 'Ærø Kommune' bounding-box cell.
pbf_path = "../resources/osm_unprocessed/denmark-latest.osm.pbf"
municipality = 'Ærø Kommune'
# NOTE(review): this hand-written bbox is never used -- the tuple unpacking
# below rebinds `bbox` before OSM() reads it.
bbox = [10.2, 54.8, 10.56, 54.97]

from ghsl_processing import city_boundaries_to_h3 
city_bounds_h3, bbox, bbox_pois = city_boundaries_to_h3(municipality)
osm = OSM(pbf_path, bounding_box=bbox)
pois = osm.get_pois(custom_filter=essential_filter)

In [None]:
# Read POIs from the pre-processed single-municipality extract. OSM() is given
# no bounding box here, so nothing is cropped. The `pois` and `pbf_path` bound
# here are the ones the following cells operate on when run top to bottom.
pbf_path = "../resources/osm/Ærø Kommune_processed.osm.pbf"
municipality = 'Ærø Kommune'
# NOTE(review): dead store -- rebound by the tuple unpacking below and not
# passed to OSM() in this cell.
bbox = [10.2, 54.8, 10.56, 54.97]

from ghsl_processing import city_boundaries_to_h3 
city_bounds_h3, bbox, bbox_pois = city_boundaries_to_h3(municipality)
osm = OSM(pbf_path)
pois = osm.get_pois(custom_filter=essential_filter)

In [None]:
# The pipeline function is defined in the first cell of this notebook, so the
# module import below stays disabled.
#from pois_to_h3 import get_pois_h3
# One row per (H3 cell, category) for the extract at pbf_path.
df = get_pois_h3(pbf_path, essential_filter, H3_RES, category_set, osm_tag_mapping)

In [None]:
# Inline copy of the first steps of df_manipulations, run on the global `pois`
# so the intermediate frame can be inspected. Adds columns to `pois` in place.
# poi_type: first non-null of amenity, shop, leisure
pois["poi_type"] = pois["amenity"]
pois["poi_type"] = pois["poi_type"].fillna(pois["shop"])
pois["poi_type"] = pois["poi_type"].fillna(pois["leisure"])

# some pois don't have a poi_type, find them in tags
pois['tags'] = pois['tags'].astype(str)
pois['no_header'] = pois.apply(lambda x: tag_conditions(x.tags, essential_filter['healthcare']), axis=1)
pois["poi_type"] = pois["poi_type"].fillna(pois["no_header"])

# convert poi_type to n minute city category
pois['category'] = pois["poi_type"].apply(lambda x: osm_to_manual_category(x, osm_tag_mapping))

# Working copy holding only the columns needed downstream; it has no h3_index
# column yet. The plot shows the raw geometries.
h3_df = pois[['category','poi_type','geometry']].copy()
h3_df.plot()

In [None]:
# Convert every POI geometry to H3 and sample the result for a quick visual check.
# The indexes are attached to the frame that is actually sampled below; writing
# them onto `df` (the get_pois_h3 output) left that frame without an h3_index
# column and overwrote the exploded indexes held in `df`.
h3_index = pois.apply(lambda x: all_shapley_geo_to_h3(x.geometry, H3_RES), axis=1)
sample_df = h3_df.drop('geometry', axis=1).assign(h3_index=h3_index)
# Never ask for more rows than exist (small municipalities have < 1000 POIs);
# the fixed seed keeps the sample stable across re-runs.
sample_df = sample_df.sample(min(1000, len(sample_df)), random_state=0)
# One H3 index per row, without the geometries that produced no cell.
sample_df = sample_df['h3_index'].explode()
sample_df = sample_df[~sample_df.isna()]
# h3_to_geo returns (lat, lng), i.e. (y, x).
sample_df = pd.DataFrame(list(sample_df.apply(h3.h3_to_geo)), columns=['y','x'])
# Convert the pandas DataFrame to a GeoPandas DataFrame with a Point geometry column
gdf = gpd.GeoDataFrame(sample_df, geometry=gpd.points_from_xy(sample_df.x, sample_df.y))

In [None]:
# Scatter of the sampled H3 cell centres built in the previous cell.
gdf.plot()

In [None]:
# Inline copy of the whole df_manipulations body, run on the global `pois` so
# that `h3_df` keeps its poi_type and geometry columns for the plots below.
# Adds columns to `pois` in place.
# poi_type: first non-null of amenity, shop, leisure
pois["poi_type"] = pois["amenity"]
pois["poi_type"] = pois["poi_type"].fillna(pois["shop"])
pois["poi_type"] = pois["poi_type"].fillna(pois["leisure"])

# some pois don't have a poi_type, find them in tags
pois['tags'] = pois['tags'].astype(str)
pois['no_header'] = pois.apply(lambda x: tag_conditions(x.tags, essential_filter['healthcare']), axis=1)
pois["poi_type"] = pois["poi_type"].fillna(pois["no_header"])

# convert poi_type to n minute city category
pois['category'] = pois["poi_type"].apply(lambda x: osm_to_manual_category(x, osm_tag_mapping))

# convert all geometry to h3
h3_df = pois[['category','poi_type','geometry']].copy()
h3_df['h3_index'] = pois.apply(lambda x: all_shapley_geo_to_h3(x.geometry, H3_RES), axis=1)
# make one h3 index on each row
h3_df = h3_df.explode('h3_index')
# Categories outside category_set are still present at this point (any
# poi_type that osm_tag_mapping leaves unchanged); drop them. Doing this
# before the H3 conversion would be cheaper.
h3_df = h3_df[h3_df['category'].isin(category_set)]
# get rid of nans
h3_df = h3_df[~h3_df['h3_index'].isna()]

In [None]:
# Plot a sample of H3 cell centres coloured by the geometry type they came from.
fig, ax = plt.subplots(figsize=(10,10))
# h3_df is built with category, poi_type, geometry and h3_index only, so the
# geometry type is derived from the geometries instead of being selected as a column.
sample_df = h3_df.assign(geom_type=gpd.GeoSeries(h3_df['geometry']).geom_type)[['geom_type','h3_index']]
# Never ask for more rows than exist.
sample_df = sample_df.sample(min(1000, len(sample_df)))
# h3_to_geo returns (lat, lng), i.e. (y, x).
sample_df[['y','x']] = list(sample_df['h3_index'].apply(h3.h3_to_geo))
# Convert the pandas DataFrame to a GeoPandas DataFrame with a Point geometry column.
# points_from_xy(x, y) already yields (lng, lat) points, which is what an
# EPSG:4326 basemap expects; swapping them again would put x=lat, y=lng and
# move the points out of the viewport set below.
gdf = gpd.GeoDataFrame(sample_df, geometry=gpd.points_from_xy(sample_df.x, sample_df.y))
gdf.plot(ax=ax, column='geom_type', categorical=True, legend=True, markersize=10, alpha=1)

# denmark
# NOTE(review): fixed viewport in (lng, lat); data from other areas (e.g. the
# Ærø bbox used above, lng 10.2-10.56) falls outside it -- adjust or remove.
plt.xlim(12.391681,12.737388)
plt.ylim(55.549206, 55.759991)

cx.add_basemap(ax = ax, crs="EPSG:4326")

In [None]:
# Plot a sample of H3 cell centres coloured by category.
# Never ask for more rows than exist: sample() raises when n exceeds the frame length.
sample_df = df.sample(min(10000, len(df)))
# h3_to_geo returns (lat, lng), i.e. (y, x).
sample_df[['y','x']] = list(sample_df['h3_index'].apply(h3.h3_to_geo))
# Convert the pandas DataFrame to a GeoPandas DataFrame with a Point geometry column
gdf = gpd.GeoDataFrame(sample_df, geometry=gpd.points_from_xy(sample_df.x, sample_df.y))

fig, ax = plt.subplots(figsize=(10,10))
# Show the GeoPandas DataFrame
gdf.plot(ax=ax, column='category', categorical=True, legend=True, markersize=10, alpha=0.5)

# Optional fixed viewport (lng, lat), disabled so the plot fits the data:
#plt.xlim(12.391681,12.737388)
#plt.ylim(55.549206, 55.759991)

cx.add_basemap(ax = ax, crs="EPSG:4326")

In [None]:
# tests
polygon = pois[pois['geom_type']=='Polygon']['geometry'].values[0]
multi_poly = pois[pois['geom_type']=='MultiPolygon']['geometry'].values[0]
point = pois[pois['geom_type']=='Point']['geometry'].values[0]
line = pois[pois['geom_type']=='LineString']['geometry'].values[0]
multi_line = pois[pois['geom_type']=='MultiLineString']['geometry'].values[0]

test_poly = all_shapley_geo_to_h3(polygon, H3_RES)
test_multi_poly = all_shapley_geo_to_h3(multi_poly, H3_RES)
test_point = all_shapley_geo_to_h3(point, H3_RES)
test_line = all_shapley_geo_to_h3(line, H3_RES)
# these are all parks
# in order to test this yo uneed to plot a scatterplot of all hex cell centers bc converting to polygon fills in the holes
test_multi_line = all_shapley_geo_to_h3(multi_line, H3_RES)

In [None]:
# misc filters
# Candidate pyrosm filters kept for reference. Everything between the triple
# quotes is a bare string literal: none of these names are actually defined.
'''
check: marketplace, childcare, studio, food

work_filter = {"amenity":["coworking_space"]}

elderly_care = {"amenity":["nursing_home","retirement_home"]}

healthcare_filter = {"amenity": ["hospital","dentist","doctors","pharmacy","clinic"]}

sport_filter = {"amenity": ["climbing_wall","dojo"]}

culture_filter = {"amenity":["cinema","theatre","events_venue","arts_centre","place_of_worship","library","music_venue","community_centre","dancing_school","social_facility","gallery","social_centre","conference_centre","public_bath"]}

social_filter = {"amenity":["pub","bar","nightclub","cafe","internet_cafe"]}

food_filter = {"amenity":["restaurant","kitchen","food_court"],"shop":["supermarket","greengrocer","confectionery"]}

education_filter = {"amenity":["school","kindergarten","language_school"]}

'''

In [None]:
# By default pyrosm reads all elements having "amenity", "shop" or "tourism" tag
# Here, let's read only "amenity" and "shop" by applying a custom filter that
# overrides the default filtering mechanism
# https://wiki.openstreetmap.org/wiki/Map_features
# there is overlap, basically no need to use building tag
# NOTE(review): the broad filter on the next line is a dead store -- it is
# replaced immediately by the doctors-only filter, and the get_pois call that
# would consume either one is commented out.
custom_filter = {"amenity": True, "shop": True, "healthcare": True, "leisure": True, "sport": True, "building": True}
custom_filter = {"amenity": ["doctors"]}
#custom_filter = {"amenity":["clinic"]}

#pois = osm.get_pois(custom_filter=custom_filter)

# Gather info about POI type (combines the tag info from "amenity" and "shop")
#pois["poi_type"] = pois["amenity"]
#pois["poi_type"] = pois["poi_type"].fillna(pois["shop"])

In [None]:
#pd.DataFrame(pois['poi_type'].value_counts()).to_csv("list_of_poi_types.csv")

In [None]:
# Display the exploded, category-filtered frame.
# NOTE(review): this renders the whole frame; h3_df.head() would keep the
# notebook output small.
h3_df

In [None]:
# Plot the geometries kept in h3_df, coloured by category, with the legend
# anchored outside the axes.
ax = h3_df.plot(column='category', markersize=3, figsize=(12,12), legend=True, legend_kwds=dict(loc='upper left', ncol=5, bbox_to_anchor=(1, 1)))

In [None]:
# NOTE(review): identical to the previous cell; it draws the same category plot again.
ax = h3_df.plot(column='category', markersize=3, figsize=(12,12), legend=True, legend_kwds=dict(loc='upper left', ncol=5, bbox_to_anchor=(1, 1)))

In [None]:
# Plot the raw POI geometries coloured by poi_type. The poi_type column only
# exists after one of the inline pipeline cells above has been run on `pois`.
ax = pois.plot(column='poi_type', markersize=3, figsize=(12,12), legend=True, legend_kwds=dict(loc='upper left', ncol=5, bbox_to_anchor=(1, 1)))