In [3]:
import warnings
warnings.filterwarnings("ignore")

import os, json
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping
import rasterio
from rasterio.mask import mask
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import BallTree


def auto_utm_crs(gdf):
    c = gdf.unary_union.centroid
    zone = int((c.x + 180)//6) + 1
    return f"EPSG:{32600 + zone if c.y >= 0 else 32700 + zone}"

def ensure_crs(gdf, epsg):
    if gdf.crs is None or gdf.crs.to_string()!=epsg:
        return gdf.to_crs(epsg)
    return gdf

def inv_dist_km(d_m):
    return 1.0 / (d_m/1000.0 + 1.0)

def to_0_100(arr):
    mn, mx = np.nanmin(arr), np.nanmax(arr)
    return np.zeros_like(arr) if np.isclose(mn, mx) else (arr-mn)/(mx-mn)*100.0

# --------- Education ---------
COUNTRY = "Ghana"
FAC      = "education"
ADM2     = "/Users/ruoxin/Downloads/gadm41_GHA_shp/gadm41_GHA_2.shp"
POI      = "/Users/ruoxin/Downloads/dissertation/qgis/Ghana/edu_reprojected.gpkg"
POP_TIF  = "/Users/ruoxin/Downloads/gha_ppp_2020_UNadj_constrained.tif"
OUT_DIR  = "./edu_outputs"

# 1) Loading CRS（UTM）
adm = gpd.read_file(ADM2)
for c in adm.columns:
    if c.lower() in ("name_2","name"):
        adm = adm.rename(columns={c:"NAME_2"})
        break
target_crs = "EPSG:32630"  # Ghana UTM 30N；如想自动则用：auto_utm_crs(adm)
adm = ensure_crs(adm, target_crs)
adm["area_km2"] = adm.geometry.area/1_000_000.0

poi = gpd.read_file(POI)
if poi.crs is None: poi = poi.set_crs(epsg=4326)
poi = ensure_crs(poi, target_crs)
poi = poi[poi.geometry.notna() & ~poi.geometry.is_empty].copy()

# 2) infrastructure count & density of area
joined = gpd.sjoin(poi, adm, how="left", predicate="within")
counts = joined.groupby("index_right").size()
adm[f"{FAC}_count"] = 0
adm.loc[counts.index, f"{FAC}_count"] = counts.values
adm[f"{FAC}_density_area"] = adm[f"{FAC}_count"]/adm["area_km2"]

# 3) The number of infrastructure per million person
pop_sum = []
with rasterio.open(POP_TIF) as src:
    pop_crs = src.crs
    for g in adm.geometry:
        g_pop = gpd.GeoSeries([g], crs=adm.crs).to_crs(pop_crs).iloc[0]
        out, _ = mask(src, [mapping(g_pop)], crop=True)
        arr = out[0]
        if src.nodata is not None:
            arr = np.where(arr==src.nodata, 0, arr)
        pop_sum.append(float(np.nansum(arr)))
adm["pop_sum"] = pop_sum
adm[f"{FAC}_density_pop"] = adm.apply(
    lambda r: r[f"{FAC}_count"]/(r["pop_sum"]/10000.0) if r["pop_sum"] and r["pop_sum"]>0 else 0, axis=1
)

# 4) Distance & Number
cent = adm.geometry.centroid
cent_xy = np.vstack([cent.x.values, cent.y.values]).T
if len(poi)==0:
    adm[f"nearest_{FAC}_dist"] = np.nan
    for R in (5000,10000,20000): adm[f"{FAC}_count_{R//1000}km"] = 0
    adm[f"{FAC}_avg_dist_top3"] = np.nan
else:
    poi_xy = np.vstack([[p.x,p.y] for p in poi.geometry])
    tree = BallTree(poi_xy, metric="euclidean")
    dists, _ = tree.query(cent_xy, k=min(10, len(poi_xy)))
    adm[f"nearest_{FAC}_dist"] = dists[:,0]
    for R in (5000,10000,20000):
        adm[f"{FAC}_count_{R//1000}km"] = (dists<=R).sum(axis=1)
    k3 = np.minimum(3, dists.shape[1])
    adm[f"{FAC}_avg_dist_top3"] = dists[:, :k3].mean(axis=1)

# 5) PCA(1) → education_index（0–100）
adm[f"{FAC}_inv_nearest"] = inv_dist_km(adm[f"nearest_{FAC}_dist"].values)
features = [
    f"{FAC}_density_area",
    f"{FAC}_density_pop",
    f"{FAC}_count_5km",
    f"{FAC}_count_10km",
    f"{FAC}_inv_nearest"
]
X = adm[features].fillna(0.0).values
Xs = StandardScaler().fit_transform(X)
pca = PCA(n_components=1, random_state=42)
scores = pca.fit_transform(Xs).flatten()
adm[f"{FAC}_index"] = to_0_100(scores)

# 6) Save
os.makedirs(OUT_DIR, exist_ok=True)
base = os.path.join(OUT_DIR, f"{COUNTRY.lower()}_{FAC}")
keep = ["NAME_2","area_km2","pop_sum",
        f"{FAC}_count", f"{FAC}_density_area", f"{FAC}_density_pop",
        f"nearest_{FAC}_dist", f"{FAC}_count_5km", f"{FAC}_count_10km",
        f"{FAC}_count_20km", f"{FAC}_avg_dist_top3", f"{FAC}_inv_nearest",
        f"{FAC}_index", "geometry"]
adm[keep].to_file(base + ".shp")
adm[keep[:-1]].to_csv(base + ".csv", index=False)  # 无 geometry
with open(base + "_pca.json","w",encoding="utf-8") as f:
    json.dump({
        "country": COUNTRY,
        "facility": FAC,
        "features": features,
        "explained_variance_ratio": float(pca.explained_variance_ratio_[0]),
        "components": dict(zip(features, pca.components_[0].tolist()))
    }, f, ensure_ascii=False, indent=2)

print("✓ Done:",
      "\n  ", base + ".shp",
      "\n  ", base + ".csv",
      "\n  ", base + "_pca.json")


✓ Done: 
   ./edu_outputs/ghana_education.shp 
   ./edu_outputs/ghana_education.csv 
   ./edu_outputs/ghana_education_pca.json


In [4]:
# run_ghana_health_index.py
import warnings
warnings.filterwarnings("ignore")

import os, json
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping
import rasterio
from rasterio.mask import mask
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import BallTree

def auto_utm_crs(gdf):
    c = gdf.unary_union.centroid
    zone = int((c.x + 180)//6) + 1
    return f"EPSG:{32600 + zone if c.y >= 0 else 32700 + zone}"

def ensure_crs(gdf, epsg):
    if gdf.crs is None or gdf.crs.to_string()!=epsg:
        return gdf.to_crs(epsg)
    return gdf

def inv_dist_km(d_m):
    return 1.0 / (d_m/1000.0 + 1.0)

def to_0_100(arr):
    mn, mx = np.nanmin(arr), np.nanmax(arr)
    return np.zeros_like(arr) if np.isclose(mn, mx) else (arr-mn)/(mx-mn)*100.0

# --------- healthcare ---------
COUNTRY = "Ghana"
FAC      = "health"
ADM2     = "/Users/ruoxin/Downloads/gadm41_GHA_shp/gadm41_GHA_2.shp"
POI      = "/Users/ruoxin/Downloads/dissertation/qgis/Ghana/health_reproject.gpkg"
POP_TIF  = "/Users/ruoxin/Downloads/gha_ppp_2020_UNadj_constrained.tif"
OUT_DIR  = "./health_outputs"

# 1) Loading CRS（UTM）
adm = gpd.read_file(ADM2)
for c in adm.columns:
    if c.lower() in ("name_2","name"):
        adm = adm.rename(columns={c:"NAME_2"})
        break
target_crs = "EPSG:32630"  # Ghana UTM 30N；如想自动则用：auto_utm_crs(adm)
adm = ensure_crs(adm, target_crs)
adm["area_km2"] = adm.geometry.area/1_000_000.0

poi = gpd.read_file(POI)
if poi.crs is None: poi = poi.set_crs(epsg=4326)
poi = ensure_crs(poi, target_crs)
poi = poi[poi.geometry.notna() & ~poi.geometry.is_empty].copy()

# 2) infrastructure count & density of area
joined = gpd.sjoin(poi, adm, how="left", predicate="within")
counts = joined.groupby("index_right").size()
adm[f"{FAC}_count"] = 0
adm.loc[counts.index, f"{FAC}_count"] = counts.values
adm[f"{FAC}_density_area"] = adm[f"{FAC}_count"]/adm["area_km2"]

# 3) The number of infrastructure per million person
pop_sum = []
with rasterio.open(POP_TIF) as src:
    pop_crs = src.crs
    for g in adm.geometry:
        g_pop = gpd.GeoSeries([g], crs=adm.crs).to_crs(pop_crs).iloc[0]
        out, _ = mask(src, [mapping(g_pop)], crop=True)
        arr = out[0]
        if src.nodata is not None:
            arr = np.where(arr==src.nodata, 0, arr)
        pop_sum.append(float(np.nansum(arr)))
adm["pop_sum"] = pop_sum
adm[f"{FAC}_density_pop"] = adm.apply(
    lambda r: r[f"{FAC}_count"]/(r["pop_sum"]/10000.0) if r["pop_sum"] and r["pop_sum"]>0 else 0, axis=1
)

# 4) Distance & Number
cent = adm.geometry.centroid
cent_xy = np.vstack([cent.x.values, cent.y.values]).T
if len(poi)==0:
    adm[f"nearest_{FAC}_dist"] = np.nan
    for R in (5000,10000,20000): adm[f"{FAC}_count_{R//1000}km"] = 0
    adm[f"{FAC}_avg_dist_top3"] = np.nan
else:
    poi_xy = np.vstack([[p.x,p.y] for p in poi.geometry])
    tree = BallTree(poi_xy, metric="euclidean")
    dists, _ = tree.query(cent_xy, k=min(10, len(poi_xy)))
    adm[f"nearest_{FAC}_dist"] = dists[:,0]
    for R in (5000,10000,20000):
        adm[f"{FAC}_count_{R//1000}km"] = (dists<=R).sum(axis=1)
    k3 = np.minimum(3, dists.shape[1])
    adm[f"{FAC}_avg_dist_top3"] = dists[:, :k3].mean(axis=1)

# 5) PCA(1) → health_index（0–100）
adm[f"{FAC}_inv_nearest"] = inv_dist_km(adm[f"nearest_{FAC}_dist"].values)
features = [
    f"{FAC}_density_area",
    f"{FAC}_density_pop",
    f"{FAC}_count_5km",
    f"{FAC}_count_10km",
    f"{FAC}_inv_nearest"
]
X = adm[features].fillna(0.0).values
Xs = StandardScaler().fit_transform(X)
pca = PCA(n_components=1, random_state=42)
scores = pca.fit_transform(Xs).flatten()
adm[f"{FAC}_index"] = to_0_100(scores)

# 6) Save
os.makedirs(OUT_DIR, exist_ok=True)
base = os.path.join(OUT_DIR, f"{COUNTRY.lower()}_{FAC}")
keep = ["NAME_2","area_km2","pop_sum",
        f"{FAC}_count", f"{FAC}_density_area", f"{FAC}_density_pop",
        f"nearest_{FAC}_dist", f"{FAC}_count_5km", f"{FAC}_count_10km",
        f"{FAC}_count_20km", f"{FAC}_avg_dist_top3", f"{FAC}_inv_nearest",
        f"{FAC}_index", "geometry"]
adm[keep].to_file(base + ".shp")
adm[keep[:-1]].to_csv(base + ".csv", index=False)  # 无 geometry
with open(base + "_pca.json","w",encoding="utf-8") as f:
    json.dump({
        "country": COUNTRY,
        "facility": FAC,
        "features": features,
        "explained_variance_ratio": float(pca.explained_variance_ratio_[0]),
        "components": dict(zip(features, pca.components_[0].tolist()))
    }, f, ensure_ascii=False, indent=2)

print("✓ Done:",
      "\n  ", base + ".shp",
      "\n  ", base + ".csv",
      "\n  ", base + "_pca.json")


✓ Done: 
   ./health_outputs/ghana_health.shp 
   ./health_outputs/ghana_health.csv 
   ./health_outputs/ghana_health_pca.json


In [5]:
# run_ghana_shop_index.py
import warnings
warnings.filterwarnings("ignore")

import os, json
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping
import rasterio
from rasterio.mask import mask
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import BallTree

def auto_utm_crs(gdf):
    c = gdf.unary_union.centroid
    zone = int((c.x + 180)//6) + 1
    return f"EPSG:{32600 + zone if c.y >= 0 else 32700 + zone}"

def ensure_crs(gdf, epsg):
    if gdf.crs is None or gdf.crs.to_string()!=epsg:
        return gdf.to_crs(epsg)
    return gdf

def inv_dist_km(d_m):
    return 1.0 / (d_m/1000.0 + 1.0)

def to_0_100(arr):
    mn, mx = np.nanmin(arr), np.nanmax(arr)
    return np.zeros_like(arr) if np.isclose(mn, mx) else (arr-mn)/(mx-mn)*100.0

# --------- Commerce ---------
COUNTRY = "Ghana"
FAC      = "shop"
ADM2     = "/Users/ruoxin/Downloads/gadm41_GHA_shp/gadm41_GHA_2.shp"
POI      = "/Users/ruoxin/Downloads/dissertation/qgis/Ghana/shop_reprojected.gpkg"
POP_TIF  = "/Users/ruoxin/Downloads/gha_ppp_2020_UNadj_constrained.tif"
OUT_DIR  = "./shop_outputs"

# 1) Loading CRS（UTM）
adm = gpd.read_file(ADM2)
for c in adm.columns:
    if c.lower() in ("name_2","name"):
        adm = adm.rename(columns={c:"NAME_2"})
        break
target_crs = "EPSG:32630"  # Ghana UTM 30N；如想自动则用：auto_utm_crs(adm)
adm = ensure_crs(adm, target_crs)
adm["area_km2"] = adm.geometry.area/1_000_000.0

poi = gpd.read_file(POI)
if poi.crs is None: poi = poi.set_crs(epsg=4326)
poi = ensure_crs(poi, target_crs)
poi = poi[poi.geometry.notna() & ~poi.geometry.is_empty].copy()

# 2) infrastructure count & density of area
joined = gpd.sjoin(poi, adm, how="left", predicate="within")
counts = joined.groupby("index_right").size()
adm[f"{FAC}_count"] = 0
adm.loc[counts.index, f"{FAC}_count"] = counts.values
adm[f"{FAC}_density_area"] = adm[f"{FAC}_count"]/adm["area_km2"]

# 3) The number of infrastructure per million person
pop_sum = []
with rasterio.open(POP_TIF) as src:
    pop_crs = src.crs
    for g in adm.geometry:
        g_pop = gpd.GeoSeries([g], crs=adm.crs).to_crs(pop_crs).iloc[0]
        out, _ = mask(src, [mapping(g_pop)], crop=True)
        arr = out[0]
        if src.nodata is not None:
            arr = np.where(arr==src.nodata, 0, arr)
        pop_sum.append(float(np.nansum(arr)))
adm["pop_sum"] = pop_sum
adm[f"{FAC}_density_pop"] = adm.apply(
    lambda r: r[f"{FAC}_count"]/(r["pop_sum"]/10000.0) if r["pop_sum"] and r["pop_sum"]>0 else 0, axis=1
)

# 4) Distance & Number
cent = adm.geometry.centroid
cent_xy = np.vstack([cent.x.values, cent.y.values]).T
if len(poi)==0:
    adm[f"nearest_{FAC}_dist"] = np.nan
    for R in (5000,10000,20000): adm[f"{FAC}_count_{R//1000}km"] = 0
    adm[f"{FAC}_avg_dist_top3"] = np.nan
else:
    poi_xy = np.vstack([[p.x,p.y] for p in poi.geometry])
    tree = BallTree(poi_xy, metric="euclidean")
    dists, _ = tree.query(cent_xy, k=min(10, len(poi_xy)))
    adm[f"nearest_{FAC}_dist"] = dists[:,0]
    for R in (5000,10000,20000):
        adm[f"{FAC}_count_{R//1000}km"] = (dists<=R).sum(axis=1)
    k3 = np.minimum(3, dists.shape[1])
    adm[f"{FAC}_avg_dist_top3"] = dists[:, :k3].mean(axis=1)

# 5) PCA(1) → shop_index（0–100）
adm[f"{FAC}_inv_nearest"] = inv_dist_km(adm[f"nearest_{FAC}_dist"].values)
features = [
    f"{FAC}_density_area",
    f"{FAC}_density_pop",
    f"{FAC}_count_5km",
    f"{FAC}_count_10km",
    f"{FAC}_inv_nearest"
]
X = adm[features].fillna(0.0).values
Xs = StandardScaler().fit_transform(X)
pca = PCA(n_components=1, random_state=42)
scores = pca.fit_transform(Xs).flatten()
adm[f"{FAC}_index"] = to_0_100(scores)

# 6) Save
os.makedirs(OUT_DIR, exist_ok=True)
base = os.path.join(OUT_DIR, f"{COUNTRY.lower()}_{FAC}")
keep = ["NAME_2","area_km2","pop_sum",
        f"{FAC}_count", f"{FAC}_density_area", f"{FAC}_density_pop",
        f"nearest_{FAC}_dist", f"{FAC}_count_5km", f"{FAC}_count_10km",
        f"{FAC}_count_20km", f"{FAC}_avg_dist_top3", f"{FAC}_inv_nearest",
        f"{FAC}_index", "geometry"]
adm[keep].to_file(base + ".shp")
adm[keep[:-1]].to_csv(base + ".csv", index=False)  # 无 geometry
with open(base + "_pca.json","w",encoding="utf-8") as f:
    json.dump({
        "country": COUNTRY,
        "facility": FAC,
        "features": features,
        "explained_variance_ratio": float(pca.explained_variance_ratio_[0]),
        "components": dict(zip(features, pca.components_[0].tolist()))
    }, f, ensure_ascii=False, indent=2)

print("✓ Done:",
      "\n  ", base + ".shp",
      "\n  ", base + ".csv",
      "\n  ", base + "_pca.json")


✓ Done: 
   ./shop_outputs/ghana_shop.shp 
   ./shop_outputs/ghana_shop.csv 
   ./shop_outputs/ghana_shop_pca.json


In [6]:
# run_ghana_publictransport_index.py
import warnings
warnings.filterwarnings("ignore")

import os, json
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping
import rasterio
from rasterio.mask import mask
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import BallTree


def auto_utm_crs(gdf):
    c = gdf.unary_union.centroid
    zone = int((c.x + 180)//6) + 1
    return f"EPSG:{32600 + zone if c.y >= 0 else 32700 + zone}"

def ensure_crs(gdf, epsg):
    if gdf.crs is None or gdf.crs.to_string()!=epsg:
        return gdf.to_crs(epsg)
    return gdf

def inv_dist_km(d_m):
    return 1.0 / (d_m/1000.0 + 1.0)

def to_0_100(arr):
    mn, mx = np.nanmin(arr), np.nanmax(arr)
    return np.zeros_like(arr) if np.isclose(mn, mx) else (arr-mn)/(mx-mn)*100.0

# --------- public transport ---------
COUNTRY = "Ghana"
FAC      = "publictransport"
ADM2     = "/Users/ruoxin/Downloads/gadm41_GHA_shp/gadm41_GHA_2.shp"
POI      = "/Users/ruoxin/Downloads/dissertation/qgis/Ghana/pt_reprojected.gpkg"
POP_TIF  = "/Users/ruoxin/Downloads/gha_ppp_2020_UNadj_constrained.tif"
OUT_DIR  = "./pt_outputs"

# 1) loading CRS（UTM）
adm = gpd.read_file(ADM2)
for c in adm.columns:
    if c.lower() in ("name_2","name"):
        adm = adm.rename(columns={c:"NAME_2"})
        break
target_crs = "EPSG:32630"  # Ghana UTM 30N；如想自动则用：auto_utm_crs(adm)
adm = ensure_crs(adm, target_crs)
adm["area_km2"] = adm.geometry.area/1_000_000.0

poi = gpd.read_file(POI)
if poi.crs is None: poi = poi.set_crs(epsg=4326)
poi = ensure_crs(poi, target_crs)
poi = poi[poi.geometry.notna() & ~poi.geometry.is_empty].copy()

# 2) infrastructure count & density of area
joined = gpd.sjoin(poi, adm, how="left", predicate="within")
counts = joined.groupby("index_right").size()
adm[f"{FAC}_count"] = 0
adm.loc[counts.index, f"{FAC}_count"] = counts.values
adm[f"{FAC}_density_area"] = adm[f"{FAC}_count"]/adm["area_km2"]

# 3) The number of infrastructure per million person
pop_sum = []
with rasterio.open(POP_TIF) as src:
    pop_crs = src.crs
    for g in adm.geometry:
        g_pop = gpd.GeoSeries([g], crs=adm.crs).to_crs(pop_crs).iloc[0]
        out, _ = mask(src, [mapping(g_pop)], crop=True)
        arr = out[0]
        if src.nodata is not None:
            arr = np.where(arr==src.nodata, 0, arr)
        pop_sum.append(float(np.nansum(arr)))
adm["pop_sum"] = pop_sum
adm[f"{FAC}_density_pop"] = adm.apply(
    lambda r: r[f"{FAC}_count"]/(r["pop_sum"]/10000.0) if r["pop_sum"] and r["pop_sum"]>0 else 0, axis=1
)

# 4) Distance & Number
cent = adm.geometry.centroid
cent_xy = np.vstack([cent.x.values, cent.y.values]).T
if len(poi)==0:
    adm[f"nearest_{FAC}_dist"] = np.nan
    for R in (5000,10000,20000): adm[f"{FAC}_count_{R//1000}km"] = 0
    adm[f"{FAC}_avg_dist_top3"] = np.nan
else:
    poi_xy = np.vstack([[p.x,p.y] for p in poi.geometry])
    tree = BallTree(poi_xy, metric="euclidean")
    dists, _ = tree.query(cent_xy, k=min(10, len(poi_xy)))
    adm[f"nearest_{FAC}_dist"] = dists[:,0]
    for R in (5000,10000,20000):
        adm[f"{FAC}_count_{R//1000}km"] = (dists<=R).sum(axis=1)
    k3 = np.minimum(3, dists.shape[1])
    adm[f"{FAC}_avg_dist_top3"] = dists[:, :k3].mean(axis=1)

# 5) PCA(1) → publictransport_index（0–100）
adm[f"{FAC}_inv_nearest"] = inv_dist_km(adm[f"nearest_{FAC}_dist"].values)
features = [
    f"{FAC}_density_area",
    f"{FAC}_density_pop",
    f"{FAC}_count_5km",
    f"{FAC}_count_10km",
    f"{FAC}_inv_nearest"
]
X = adm[features].fillna(0.0).values
Xs = StandardScaler().fit_transform(X)
pca = PCA(n_components=1, random_state=42)
scores = pca.fit_transform(Xs).flatten()
adm[f"{FAC}_index"] = to_0_100(scores)

# 6) save
os.makedirs(OUT_DIR, exist_ok=True)
base = os.path.join(OUT_DIR, f"{COUNTRY.lower()}_{FAC}")
keep = ["NAME_2","area_km2","pop_sum",
        f"{FAC}_count", f"{FAC}_density_area", f"{FAC}_density_pop",
        f"nearest_{FAC}_dist", f"{FAC}_count_5km", f"{FAC}_count_10km",
        f"{FAC}_count_20km", f"{FAC}_avg_dist_top3", f"{FAC}_inv_nearest",
        f"{FAC}_index", "geometry"]
adm[keep].to_file(base + ".shp")
adm[keep[:-1]].to_csv(base + ".csv", index=False)  # 无 geometry
with open(base + "_pca.json","w",encoding="utf-8") as f:
    json.dump({
        "country": COUNTRY,
        "facility": FAC,
        "features": features,
        "explained_variance_ratio": float(pca.explained_variance_ratio_[0]),
        "components": dict(zip(features, pca.components_[0].tolist()))
    }, f, ensure_ascii=False, indent=2)

print("✓ Done:",
      "\n  ", base + ".shp",
      "\n  ", base + ".csv",
      "\n  ", base + "_pca.json")


✓ Done: 
   ./pt_outputs/ghana_publictransport.shp 
   ./pt_outputs/ghana_publictransport.csv 
   ./pt_outputs/ghana_publictransport_pca.json


In [7]:
# run_ghana_green_index.py
import warnings
warnings.filterwarnings("ignore")

import os, json
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping
import rasterio
from rasterio.mask import mask
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import BallTree


def auto_utm_crs(gdf):
    c = gdf.unary_union.centroid
    zone = int((c.x + 180)//6) + 1
    return f"EPSG:{32600 + zone if c.y >= 0 else 32700 + zone}"

def ensure_crs(gdf, epsg):
    if gdf.crs is None or gdf.crs.to_string()!=epsg:
        return gdf.to_crs(epsg)
    return gdf

def inv_dist_km(d_m):
    return 1.0 / (d_m/1000.0 + 1.0)

def to_0_100(arr):
    mn, mx = np.nanmin(arr), np.nanmax(arr)
    return np.zeros_like(arr) if np.isclose(mn, mx) else (arr-mn)/(mx-mn)*100.0

# --------- Green Space ---------
COUNTRY = "Ghana"
FAC      = "green"
ADM2     = "/Users/ruoxin/Downloads/gadm41_GHA_shp/gadm41_GHA_2.shp"
POI      = "/Users/ruoxin/Downloads/dissertation/qgis/Ghana/green_reprojected.gpkg"
POP_TIF  = "/Users/ruoxin/Downloads/gha_ppp_2020_UNadj_constrained.tif"
OUT_DIR  = "./green_outputs"

# 1) loading CRS（UTM）
adm = gpd.read_file(ADM2)
for c in adm.columns:
    if c.lower() in ("name_2","name"):
        adm = adm.rename(columns={c:"NAME_2"})
        break
target_crs = "EPSG:32630"  # Ghana UTM 30N；如想自动则用：auto_utm_crs(adm)
adm = ensure_crs(adm, target_crs)
adm["area_km2"] = adm.geometry.area/1_000_000.0

poi = gpd.read_file(POI)
if poi.crs is None: poi = poi.set_crs(epsg=4326)
poi = ensure_crs(poi, target_crs)
poi = poi[poi.geometry.notna() & ~poi.geometry.is_empty].copy()

# 2) infrastructure count & density of area
joined = gpd.sjoin(poi, adm, how="left", predicate="within")
counts = joined.groupby("index_right").size()
adm[f"{FAC}_count"] = 0
adm.loc[counts.index, f"{FAC}_count"] = counts.values
adm[f"{FAC}_density_area"] = adm[f"{FAC}_count"]/adm["area_km2"]

# 3) The number of infrastructure per million person
pop_sum = []
with rasterio.open(POP_TIF) as src:
    pop_crs = src.crs
    for g in adm.geometry:
        g_pop = gpd.GeoSeries([g], crs=adm.crs).to_crs(pop_crs).iloc[0]
        out, _ = mask(src, [mapping(g_pop)], crop=True)
        arr = out[0]
        if src.nodata is not None:
            arr = np.where(arr==src.nodata, 0, arr)
        pop_sum.append(float(np.nansum(arr)))
adm["pop_sum"] = pop_sum
adm[f"{FAC}_density_pop"] = adm.apply(
    lambda r: r[f"{FAC}_count"]/(r["pop_sum"]/10000.0) if r["pop_sum"] and r["pop_sum"]>0 else 0, axis=1
)

# 4) Distance & Number
cent = adm.geometry.centroid
cent_xy = np.vstack([cent.x.values, cent.y.values]).T
if len(poi)==0:
    adm[f"nearest_{FAC}_dist"] = np.nan
    for R in (5000,10000,20000): adm[f"{FAC}_count_{R//1000}km"] = 0
    adm[f"{FAC}_avg_dist_top3"] = np.nan
else:
    poi_xy = np.vstack([[p.x,p.y] for p in poi.geometry])
    tree = BallTree(poi_xy, metric="euclidean")
    dists, _ = tree.query(cent_xy, k=min(10, len(poi_xy)))
    adm[f"nearest_{FAC}_dist"] = dists[:,0]
    for R in (5000,10000,20000):
        adm[f"{FAC}_count_{R//1000}km"] = (dists<=R).sum(axis=1)
    k3 = np.minimum(3, dists.shape[1])
    adm[f"{FAC}_avg_dist_top3"] = dists[:, :k3].mean(axis=1)

# 5) PCA(1) → green_index（0–100）
adm[f"{FAC}_inv_nearest"] = inv_dist_km(adm[f"nearest_{FAC}_dist"].values)
features = [
    f"{FAC}_density_area",
    f"{FAC}_density_pop",
    f"{FAC}_count_5km",
    f"{FAC}_count_10km",
    f"{FAC}_inv_nearest"
]
X = adm[features].fillna(0.0).values
Xs = StandardScaler().fit_transform(X)
pca = PCA(n_components=1, random_state=42)
scores = pca.fit_transform(Xs).flatten()
adm[f"{FAC}_index"] = to_0_100(scores)

# 6) Save
os.makedirs(OUT_DIR, exist_ok=True)
base = os.path.join(OUT_DIR, f"{COUNTRY.lower()}_{FAC}")
keep = ["NAME_2","area_km2","pop_sum",
        f"{FAC}_count", f"{FAC}_density_area", f"{FAC}_density_pop",
        f"nearest_{FAC}_dist", f"{FAC}_count_5km", f"{FAC}_count_10km",
        f"{FAC}_count_20km", f"{FAC}_avg_dist_top3", f"{FAC}_inv_nearest",
        f"{FAC}_index", "geometry"]
adm[keep].to_file(base + ".shp")
adm[keep[:-1]].to_csv(base + ".csv", index=False)  # 无 geometry
with open(base + "_pca.json","w",encoding="utf-8") as f:
    json.dump({
        "country": COUNTRY,
        "facility": FAC,
        "features": features,
        "explained_variance_ratio": float(pca.explained_variance_ratio_[0]),
        "components": dict(zip(features, pca.components_[0].tolist()))
    }, f, ensure_ascii=False, indent=2)

print("✓ Done:",
      "\n  ", base + ".shp",
      "\n  ", base + ".csv",
      "\n  ", base + "_pca.json")


✓ Done: 
   ./green_outputs/ghana_green.shp 
   ./green_outputs/ghana_green.csv 
   ./green_outputs/ghana_green_pca.json
