In [None]:
#Preprocessing
#conda create -n algeria_economic_monitoring python=3.11
#conda activate algeria_economic_monitoring
#conda install -c conda-forge geopandas shapely pyproj mercantile tqdm fastparquet pandas


In [1]:
import os
import glob
import json
import re
from itertools import product

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, box, shape
import mercantile
from tqdm import tqdm

# CONFIG
PATH_DATA = "./ookle_algeria/"  # Ookla parquet root
OUTPUT_DIR = "./processed_data/"
os.makedirs(OUTPUT_DIR, exist_ok=True)

NET_TYPE = "mobile"  # Filter for mobile data only when necessary
ISO_CODE = "DZA"  # Country specification
YEAR_MIN, YEAR_MAX = 2019, 2025

# Boundaries
PATH_ADM0 = "./boundaries/geoBoundaries-DZA-ADM0.geojson"
PATH_ADM1 = "./boundaries/geoBoundaries-DZA-ADM1.geojson"
PATH_ADM2 = "./boundaries/geoBoundaries-DZA-ADM2.geojson"
PATH_ADM3 = "./boundaries/geoBoundaries-DZA-ADM3.geojson"

print("ALGERIA OOKLA DATA PREPROCESSING PIPELINE")

# UTILITIES

def tile_to_quadkey(x: int, y: int, z: int) -> str:
    qk = ""
    for i in range(z, 0, -1):
        digit = 0
        mask = 1 << (i - 1)
        if (x & mask) != 0:
            digit += 1
        if (y & mask) != 0:
            digit += 2
        qk += str(digit)
    return qk

def quadkey_to_point(quadkey: str):
    t = mercantile.quadkey_to_tile(quadkey)
    b = mercantile.bounds(t)
    return Point((b.west + b.east) / 2, (b.south + b.north) / 2)

def quadkey_to_polygon(quadkey: str):
    t = mercantile.quadkey_to_tile(quadkey)
    b = mercantile.bounds(t)
    return box(b.west, b.south, b.east, b.north)

def get_country_quadkeys_at_zoom(boundary_gdf: gpd.GeoDataFrame, zoom_level: int) -> list:
    """Return quadkeys whose TILE CENTROIDS fall inside the country (centroid-within)."""
    minx, miny, maxx, maxy = boundary_gdf.total_bounds
    tiles = list(mercantile.tiles(minx, miny, maxx, maxy, zoom_level))

    quadkeys, pts = [], []
    for t in tqdm(tiles, desc=f"  • Processing zoom {zoom_level} tiles"):
        qk = tile_to_quadkey(t.x, t.y, t.z)
        quadkeys.append(qk)
        pts.append(quadkey_to_point(qk))   # centroid

    gdf = gpd.GeoDataFrame({"quadkey": quadkeys, "geometry": pts}, crs="EPSG:4326")
    gdf_f = gpd.sjoin(gdf, boundary_gdf[["geometry"]], how="inner", predicate="within")
    return gdf_f["quadkey"].astype(str).tolist()

def get_subquadkeys(parent_quadkey: str, target_zoom: int) -> list:
    current_zoom = len(parent_quadkey)
    delta = target_zoom - current_zoom
    if delta < 0:
        raise ValueError("Target zoom must be >= parent zoom")
    if delta == 0:
        return [parent_quadkey]
    out = []
    for suffix in product("0123", repeat=delta):
        out.append(parent_quadkey + "".join(suffix))
    return out

def ensure_tile_polygon_gdf(df: pd.DataFrame) -> gpd.GeoDataFrame:
    if "quadkey" in df.columns:
        df = df.copy()
        df["geometry"] = df["quadkey"].astype(str).apply(quadkey_to_polygon)
    elif {"lat", "lon"}.issubset(df.columns):
        df = df.copy()
        df["geometry"] = [Point(xy) for xy in zip(df["lon"], df["lat"])]
    else:
        raise ValueError("No geometry source (quadkey or lat/lon) found in dataframe.")
    return gpd.GeoDataFrame(df, crs="EPSG:4326")

def _flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
    new_cols = []
    for col in df.columns:
        if isinstance(col, tuple):
            a, b = col
            new_cols.append(f"{a}_{b}" if b else a)
        else:
            new_cols.append(col)
    df.columns = new_cols
    return df

def validate_ookla_data(df: pd.DataFrame, year: int, quarter: int) -> bool:
    """Validate Ookla parquet data before processing"""
    required_cols = ["avg_d_kbps", "avg_u_kbps"]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"{year}-Q{quarter}: Missing required columns: {missing}")

    if len(df) == 0:
        print(f"  ⚠ Warning: {year}-Q{quarter} has no rows")
        return False

    neg_down = (df["avg_d_kbps"] < 0).sum()
    neg_up = (df["avg_u_kbps"] < 0).sum()
    if neg_down > 0 or neg_up > 0:
        print(f"  ⚠ Warning: {year}-Q{quarter} has {neg_down} negative download and {neg_up} negative upload values")

    extreme_down = (df["avg_d_kbps"] > 1_000_000).sum()
    extreme_up = (df["avg_u_kbps"] > 1_000_000).sum()
    if extreme_down > 0 or extreme_up > 0:
        print(f"  ⚠ Warning: {year}-Q{quarter} has {extreme_down} extreme download and {extreme_up} extreme upload values (>1 Gbps)")

    if "avg_lat_ms" in df.columns:
        neg_lat = (df["avg_lat_ms"] < 0).sum()
        extreme_lat = (df["avg_lat_ms"] > 1000).sum()
        if neg_lat > 0:
            print(f"  ⚠ Warning: {year}-Q{quarter} has {neg_lat} negative latency values")
        if extreme_lat > 0:
            print(f"  ⚠ Warning: {year}-Q{quarter} has {extreme_lat} extreme latency values (>1000ms)")

    return True

def validate_boundaries(boundary_gdf: gpd.GeoDataFrame, level: str, required_cols: list) -> bool:
    """Validate boundary GeoDataFrame"""
    missing = [c for c in required_cols if c not in boundary_gdf.columns]
    if missing:
        raise ValueError(f"{level}: Missing required columns: {missing}")

    invalid = (~boundary_gdf.geometry.is_valid).sum()
    if invalid > 0:
        print(f"  ⚠ Warning: {level} has {invalid} invalid geometries (will be fixed with buffer(0))")

    return True

def read_geojson_no_gdal(path):
    with open(path, "r", encoding="utf-8") as f:
        gj = json.load(f)
    recs = []
    for feat in gj["features"]:
        props = feat.get("properties", {})
        geom = shape(feat["geometry"])
        recs.append({**props, "geometry": geom})
    return gpd.GeoDataFrame(recs, crs="EPSG:4326")

def aggregate_by_admin(
    gdf_meas: gpd.GeoDataFrame,
    admin_gdf: gpd.GeoDataFrame,
    id_col: str,
    name_col: str,
    admin_level_tag: str,
) -> pd.DataFrame:
    """
    Spatially join measurements to admin polygons (intersects) and aggregate.
    Keeps both the boundary 'code' (id_col) and UTF-8 name (name_col).
    """
    joined = gpd.sjoin(
        gdf_meas,
        admin_gdf[[id_col, name_col, "geometry"]],
        how="left",
        predicate="intersects",
    )

    has_tests = "tests" in joined.columns
    has_devices = "devices" in joined.columns
    has_latency = "avg_lat_ms" in joined.columns

    agg_dict = {
        "avg_d_kbps": ["mean", "median", "count"],
        "avg_u_kbps": ["mean", "median"],
    }
    if has_latency:
        agg_dict["avg_lat_ms"] = ["mean", "median"]
    if has_tests:
        agg_dict["tests"] = "sum"
    if has_devices:
        agg_dict["devices"] = "sum"

    out = joined.groupby([id_col, name_col]).agg(agg_dict).reset_index()
    out = _flatten_columns(out)

    rename_map = {
        "avg_d_kbps_mean": "avg_download_kbps",
        "avg_d_kbps_median": "median_download_kbps",
        "avg_d_kbps_count": "num_tiles",
        "avg_u_kbps_mean": "avg_upload_kbps",
        "avg_u_kbps_median": "median_upload_kbps",
        "tests_sum": "num_tests",
        "devices_sum": "num_devices",
        "avg_lat_ms_mean": "avg_latency_ms",
        "avg_lat_ms_median": "median_latency_ms",
    }
    out = out.rename(columns=rename_map)

    # Final tidy
    out["admin_level"] = admin_level_tag
    out["admin_code"] = out[id_col].astype(str)
    out["admin_name"] = out[name_col].astype(str)

    # Mbps conversions (download/upload only)
    if "avg_download_kbps" in out.columns:
        out["avg_download_mbps"] = out["avg_download_kbps"] / 1000.0
    if "avg_upload_kbps" in out.columns:
        out["avg_upload_mbps"] = out["avg_upload_kbps"] / 1000.0
    if "median_download_kbps" in out.columns:
        out["median_download_mbps"] = out["median_download_kbps"] / 1000.0
    if "median_upload_kbps" in out.columns:
        out["median_upload_mbps"] = out["median_upload_kbps"] / 1000.0

    keep = [
        "admin_level", "admin_code", "admin_name",
        "avg_download_mbps", "avg_upload_mbps",
        "median_download_mbps", "median_upload_mbps",
        "num_tiles",
    ]
    if "avg_latency_ms" in out.columns:
        keep += ["avg_latency_ms", "median_latency_ms"]
    if "num_tests" in out.columns:
        keep.append("num_tests")
    if "num_devices" in out.columns:
        keep.append("num_devices")

    return out[keep]

# STEP 1: LOAD BOUNDARIES

# Validate paths exist
if not os.path.exists(PATH_ADM0):
    raise FileNotFoundError(f"ADM0 boundary file not found: {PATH_ADM0}")
if not os.path.exists(PATH_ADM1):
    raise FileNotFoundError(f"ADM1 boundary file not found: {PATH_ADM1}")
if not os.path.exists(PATH_ADM2):
    raise FileNotFoundError(f"ADM2 boundary file not found: {PATH_ADM2}")
if not os.path.exists(PATH_ADM3):
    raise FileNotFoundError(f"ADM3 boundary file not found: {PATH_ADM3}")

boundary_national = read_geojson_no_gdal(PATH_ADM0).to_crs("EPSG:4326")
print("  Loaded national boundary (ADM0)")

boundary_adm1 = read_geojson_no_gdal(PATH_ADM1)[["shapeISO", "shapeName", "geometry"]].to_crs("EPSG:4326")
boundary_adm1["shapeName"] = boundary_adm1["shapeName"].astype(str)
validate_boundaries(boundary_adm1, "ADM1", ["shapeISO", "shapeName", "geometry"])
print(f"  Loaded ADM1: {len(boundary_adm1)} wilayas")

boundary_adm2 = read_geojson_no_gdal(PATH_ADM2)[["shapeISO", "shapeName", "geometry"]].to_crs("EPSG:4326")
boundary_adm2["shapeName"] = boundary_adm2["shapeName"].astype(str)
validate_boundaries(boundary_adm2, "ADM2", ["shapeISO", "shapeName", "geometry"])
print(f"  Loaded ADM2: {len(boundary_adm2)} districts")

boundary_adm3 = read_geojson_no_gdal(PATH_ADM3)[["shapeISO", "shapeName", "geometry"]].to_crs("EPSG:4326")
boundary_adm3["shapeName"] = boundary_adm3["shapeName"].astype(str)
validate_boundaries(boundary_adm3, "ADM3", ["shapeISO", "shapeName", "geometry"])
print(f"  Loaded ADM3: {len(boundary_adm3)} communes")

# Fix invalid geometries
for gdf_fix in [boundary_national, boundary_adm1, boundary_adm2, boundary_adm3]:
    if isinstance(gdf_fix, gpd.GeoDataFrame):
        gdf_fix.geometry = gdf_fix.buffer(0)

ALGERIA_BOUNDS = boundary_national.total_bounds

# STEP 2: QUADKEY FILTERS & Z12 GRID
print("\n[STEP 2] Generating quadkey filter (z10 → expand to z16) and z12 grid...")

parent_qk_z10 = get_country_quadkeys_at_zoom(boundary_national, zoom_level=10)

print("  • Expanding to zoom 16 quadkeys (Ookla native resolution)...")
all_qk_z16 = []
for pqk in tqdm(parent_qk_z10, desc="  • Expanding quadkeys"):
    all_qk_z16.extend(get_subquadkeys(pqk, 16))
print(f"  ✓ Generated {len(all_qk_z16):,} z16 quadkeys for Algeria")

pd.DataFrame({"quadkey": all_qk_z16, "country": ISO_CODE}).to_csv(
    os.path.join(OUTPUT_DIR, "algeria_quadkeys_z16.csv"), index=False, encoding="utf-8"
)
print(f"  ✓ Saved: {OUTPUT_DIR}algeria_quadkeys_z16.csv")

QK16_SET = set(all_qk_z16)

print("\n[STEP 2B] Creating z12 grid (polygons) for mapping...")
parent_qk_z12 = get_country_quadkeys_at_zoom(boundary_national, zoom_level=12)
gdf_grid_z12 = gpd.GeoDataFrame(
    {"quadkey": parent_qk_z12, "geometry": [quadkey_to_polygon(qk) for qk in parent_qk_z12]},
    crs="EPSG:4326",
)
gdf_grid_z12["quadkey"] = gdf_grid_z12["quadkey"].astype(str)
print(f"  ✓ Created {len(gdf_grid_z12)} z12 tiles")
# Save as CSV with WKT geometry (avoids pyarrow requirement)
_gz12 = gdf_grid_z12.copy()
_gz12["wkt"] = _gz12.geometry.apply(lambda g: g.wkt)
_gz12.drop(columns="geometry").to_csv(os.path.join(OUTPUT_DIR, "algeria_grid_z12_wkt.csv"), index=False, encoding="utf-8")
print(f"  ✓ Saved grid: {OUTPUT_DIR}algeria_grid_z12_wkt.csv")

print("\n[STEP 2C] Building tile→admin (ADM1, ADM2 & ADM3) lookup...")
# Use CENTROID-WITHIN for admin assignment to avoid border overlaps
gdf_grid_z12_centroids = gdf_grid_z12.copy()
gdf_grid_z12_centroids["geometry"] = gdf_grid_z12_centroids.geometry.centroid

adm1_ren = boundary_adm1.rename(columns={"shapeISO": "adm1_code", "shapeName": "adm1_name"})
adm2_ren = boundary_adm2.rename(columns={"shapeISO": "adm2_code", "shapeName": "adm2_name"})
adm3_ren = boundary_adm3.rename(columns={"shapeISO": "adm3_code", "shapeName": "adm3_name"})

lk = gpd.sjoin(
    gdf_grid_z12_centroids[["quadkey", "geometry"]],
    adm1_ren[["adm1_code", "adm1_name", "geometry"]],
    how="left",
    predicate="within"
).drop(columns="index_right")
lk = gpd.sjoin(
    lk,
    adm2_ren[["adm2_code", "adm2_name", "geometry"]],
    how="left",
    predicate="within"
).drop(columns="index_right")
lk = gpd.sjoin(
    lk,
    adm3_ren[["adm3_code", "adm3_name", "geometry"]],
    how="left",
    predicate="within"
).drop(columns=["index_right", "geometry"])

lk["quadkey_z12"] = lk["quadkey"].astype(str)
lookup = lk[["quadkey_z12", "adm1_code", "adm1_name", "adm2_code", "adm2_name", "adm3_code", "adm3_name"]]
lookup.to_csv(os.path.join(OUTPUT_DIR, "tile_admin_lookup_z12.csv"), index=False, encoding="utf-8")
print(f"  ✓ Saved: {OUTPUT_DIR}tile_admin_lookup_z12.csv")

# STEP 3: DISCOVER PARQUET FILES (2019–2025)

available = []
if not os.path.isdir(PATH_DATA):
    raise FileNotFoundError(f"Data directory not found: {PATH_DATA}")

# iterate only through directories and parse year with regex
for year_folder in sorted(d for d in os.listdir(PATH_DATA) if os.path.isdir(os.path.join(PATH_DATA, d))):
    if not year_folder.startswith("year="):
        continue
    m_year = re.search(r'year\s*=\s*(\d+)', year_folder)
    if not m_year:
        continue
    year = int(m_year.group(1))
    if year < YEAR_MIN or year > YEAR_MAX:
        continue

    year_path = os.path.join(PATH_DATA, year_folder)
    # iterate only through directories and parse quarter with regex
    for quarter_folder in sorted(d for d in os.listdir(year_path) if os.path.isdir(os.path.join(year_path, d))):
        if not quarter_folder.startswith("quarter="):
            continue
        m_quarter = re.search(r'quarter\s*=\s*(\d+)', quarter_folder)
        if not m_quarter:
            continue
        quarter = int(m_quarter.group(1))
        quarter_path = os.path.join(year_path, quarter_folder)
        files = sorted(glob.glob(os.path.join(quarter_path, "*.parquet")))
        if files:
            for fp in files:  # include all shards for accuracy
                available.append((year, quarter, fp))

print(f"  ✓ Found {len(available)} parquet file(s) within {YEAR_MIN}-{YEAR_MAX}")
if len(available) == 0:
    raise FileNotFoundError("No Ookla parquet files found for 2019–2025.")

# STEP 4: PROCESS & AGGREGATE

all_national = []
all_subnat = []
all_grid_long = []
all_grid_long_z16 = []

# Group files by period
from collections import defaultdict
by_period = defaultdict(list)
for year, quarter, fp in available:
    by_period[(year, quarter)].append(fp)

for (year, quarter), file_paths in sorted(by_period.items()):
    print("\n" + "-" * 72)
    print(f"Processing {year}-Q{quarter} ({len(file_paths)} file(s))")
    try:
        dfs = []
        for file_path in file_paths:
            dfi = pd.read_parquet(file_path, engine="fastparquet")
            dfs.append(dfi)
        df = pd.concat(dfs, ignore_index=True)
        print(f"  • Loaded: {len(df):,} rows; columns: {list(df.columns)}")

        # Validate data
        if not validate_ookla_data(df, year, quarter):
            continue

        # Mobile network filter (REQUIRED if column exists; otherwise assume mobile-only source)
        if "network" in df.columns:
            before = len(df)
            df = df[df["network"].astype(str).str.lower() == NET_TYPE.lower()].copy()
            print(f"  • Filtered by NET_TYPE='{NET_TYPE}': {before:,} → {len(df):,}")
            if len(df) == 0:
                print(f"  ⚠ No {NET_TYPE} data in this period; skipping.")
                continue
        else:
            print(f"  • 'network' column not found; proceeding (source assumed {NET_TYPE}).")

        # Algeria filter
        if "quadkey" in df.columns and len(QK16_SET) > 0:
            df["quadkey"] = df["quadkey"].astype(str)
            df_dza = df[df["quadkey"].isin(QK16_SET)].copy()
            print(f"  • Quadkey filter kept: {len(df_dza):,}")
        elif {"lat", "lon"}.issubset(df.columns):
            xmin, ymin, xmax, ymax = ALGERIA_BOUNDS
            df_dza = df[(df["lat"] >= ymin) & (df["lat"] <= ymax) &
                        (df["lon"] >= xmin) & (df["lon"] <= xmax)].copy()
            print(f"  • BBox filter kept: {len(df_dza):,}")
        else:
            print("  ✗ No 'quadkey' or 'lat/lon' columns; skipping period.")
            continue

        if len(df_dza) == 0:
            print("  ⚠ No Algeria records in this period; skipping.")
            continue

        # NATIONAL STATS
        nat = {
            "year": year,
            "quarter": quarter,
            "date": f"{year}-Q{quarter}",
            "avg_download_mbps": (df_dza["avg_d_kbps"].mean() / 1000.0),
            "avg_upload_mbps": (df_dza["avg_u_kbps"].mean() / 1000.0),
            "median_download_mbps": (df_dza["avg_d_kbps"].median() / 1000.0),
            "median_upload_mbps": (df_dza["avg_u_kbps"].median() / 1000.0),
            "avg_latency_ms": (df_dza["avg_lat_ms"].mean() if "avg_lat_ms" in df_dza.columns else None),
            "median_latency_ms": (df_dza["avg_lat_ms"].median() if "avg_lat_ms" in df_dza.columns else None),
            "num_tiles": len(df_dza),
            "num_tests": int(df_dza["tests"].sum()) if "tests" in df_dza.columns else None,
            "num_devices": int(df_dza["devices"].sum()) if "devices" in df_dza.columns else None,
        }
        if "tests" in df_dza.columns:
            w = df_dza["tests"].clip(lower=0)
            if w.sum() > 0:
                nat["wavg_download_mbps"] = (df_dza["avg_d_kbps"] / 1000.0 * w).sum() / w.sum()
                nat["wavg_upload_mbps"] = (df_dza["avg_u_kbps"] / 1000.0 * w).sum() / w.sum()
                if "avg_lat_ms" in df_dza.columns:
                    nat["wavg_latency_ms"] = (df_dza["avg_lat_ms"] * w).sum() / w.sum()
        all_national.append(nat)
        print(f"  ✓ National mean: {nat['avg_download_mbps']:.2f}↓ / {nat['avg_upload_mbps']:.2f}↑ Mbps")

        # SUBNATIONAL
        gdf_poly = ensure_tile_polygon_gdf(df_dza)

        adm1_stats = aggregate_by_admin(gdf_poly, boundary_adm1, "shapeISO", "shapeName", "ADM1")
        adm1_stats["year"] = year; adm1_stats["quarter"] = quarter; adm1_stats["date"] = f"{year}-Q{quarter}"
        all_subnat.append(adm1_stats); print(f"  ✓ ADM1 aggregated: {len(adm1_stats)} rows")

        adm2_stats = aggregate_by_admin(gdf_poly, boundary_adm2, "shapeISO", "shapeName", "ADM2")
        adm2_stats["year"] = year; adm2_stats["quarter"] = quarter; adm2_stats["date"] = f"{year}-Q{quarter}"
        all_subnat.append(adm2_stats); print(f"  ✓ ADM2 aggregated: {len(adm2_stats)} rows")

        adm3_stats = aggregate_by_admin(gdf_poly, boundary_adm3, "shapeISO", "shapeName", "ADM3")
        adm3_stats["year"] = year; adm3_stats["quarter"] = quarter; adm3_stats["date"] = f"{year}-Q{quarter}"
        all_subnat.append(adm3_stats); print(f"  ✓ ADM3 aggregated: {len(adm3_stats)} rows")

        # GRID z12
        if "quadkey" in df_dza.columns:
            df_dza["quadkey_z12"] = df_dza["quadkey"].astype(str).str[:12]
            agg_cols = {
                "avg_d_kbps": "mean",
                "avg_u_kbps": "mean",
            }
            if "avg_lat_ms" in df_dza.columns:
                agg_cols["avg_lat_ms"] = "mean"
            if "tests" in df_dza.columns:
                agg_cols["tests"] = "sum"

            grid_stats = df_dza.groupby("quadkey_z12").agg(agg_cols).reset_index()
            grid_stats["year"] = year
            grid_stats["quarter"] = quarter
            grid_stats["date"] = f"{year}-Q{quarter}"
            grid_stats["avg_download_mbps"] = grid_stats["avg_d_kbps"] / 1000.0
            grid_stats["avg_upload_mbps"] = grid_stats["avg_u_kbps"] / 1000.0

            keep_cols = ["quadkey_z12", "year", "quarter", "date",
                         "avg_download_mbps", "avg_upload_mbps"]
            if "avg_lat_ms" in grid_stats.columns:
                keep_cols.append("avg_lat_ms")
            if "tests" in df_dza.columns:
                keep_cols.append("tests")

            all_grid_long.append(grid_stats[keep_cols])
            print(f"  ✓ Grid z12 long rows added: {len(grid_stats)}")

        # GRID z16
        if "quadkey" in df_dza.columns:
            agg_cols16 = {"avg_d_kbps": "mean", "avg_u_kbps": "mean"}
            if "avg_lat_ms" in df_dza.columns:
                agg_cols16["avg_lat_ms"] = "mean"
            if "tests" in df_dza.columns:
                agg_cols16["tests"] = "sum"

            grid_stats_z16 = (
                df_dza.groupby(df_dza["quadkey"].astype(str))
                .agg(agg_cols16)
                .reset_index()
                .rename(columns={"quadkey": "quadkey_z16"})
            )
            grid_stats_z16["year"] = year
            grid_stats_z16["quarter"] = quarter
            grid_stats_z16["date"] = f"{year}-Q{quarter}"
            grid_stats_z16["avg_download_mbps"] = grid_stats_z16["avg_d_kbps"] / 1000.0
            grid_stats_z16["avg_upload_mbps"] = grid_stats_z16["avg_u_kbps"] / 1000.0

            keep_z16 = ["quadkey_z16", "year", "quarter", "date",
                        "avg_download_mbps", "avg_upload_mbps"]
            if "avg_lat_ms" in grid_stats_z16.columns:
                keep_z16.append("avg_lat_ms")
            if "tests" in df_dza.columns:
                keep_z16.append("tests")

            all_grid_long_z16.append(grid_stats_z16[keep_z16])
            print(f"  ✓ Grid z16 long rows added: {len(grid_stats_z16)}")

    except Exception as e:
        print(f"  ✗ Error processing {year}-Q{quarter}: {e}")
        import traceback; traceback.print_exc()
        continue

# STEP 5: SAVE OUTPUTS (UTF-8)

# National trends
if not all_national:
    raise ValueError("No national data processed; aborting save.")
df_national = pd.DataFrame(all_national).sort_values(["year", "quarter"]).reset_index(drop=True)
df_national.to_csv(os.path.join(OUTPUT_DIR, "algeria_national_trends_mobile.csv"), index=False, encoding="utf-8")
print(f"  ✓ National trends: {len(df_national)} periods → {OUTPUT_DIR}algeria_national_trends_mobile.csv")

# Subnational trends
if all_subnat:
    df_sub = pd.concat(all_subnat, ignore_index=True)
    df_sub = df_sub.sort_values(["admin_level", "admin_name", "year", "quarter"]).reset_index(drop=True)
    df_sub.to_csv(os.path.join(OUTPUT_DIR, "algeria_subnational_trends_mobile.csv"), index=False, encoding="utf-8")
    print(f"  ✓ Subnational trends: {len(df_sub)} rows → {OUTPUT_DIR}algeria_subnational_trends_mobile.csv")
else:
    df_sub = pd.DataFrame()
    print("  ⚠ No subnational aggregates to save.")

# Grid long (z12)
if all_grid_long:
    df_grid_long = pd.concat(all_grid_long, ignore_index=True)
    df_grid_long.to_csv(os.path.join(OUTPUT_DIR, "algeria_grid_data_long_z12_mobile.csv"), index=False, encoding="utf-8")
    print(f"  ✓ Grid long z12: {len(df_grid_long)} rows → {OUTPUT_DIR}algeria_grid_data_long_z12_mobile.csv")

    # z12 wide CSV with WKT geometry
    print("\n[STEP 5B] Building z12 wide GeoParquet for mapping...")
    dl_wide = df_grid_long.pivot(index="quadkey_z12", columns="date", values="avg_download_mbps")
    ul_wide = df_grid_long.pivot(index="quadkey_z12", columns="date", values="avg_upload_mbps")
    dl_wide.columns = [f"download_{c}" for c in dl_wide.columns]
    ul_wide.columns = [f"upload_{c}" for c in ul_wide.columns]

    parts = [dl_wide, ul_wide]
    if "avg_lat_ms" in df_grid_long.columns:
        lat_wide = df_grid_long.pivot(index="quadkey_z12", columns="date", values="avg_lat_ms")
        lat_wide.columns = [f"latency_{c}" for c in lat_wide.columns]
        parts.append(lat_wide)

    grid_wide = pd.concat(parts, axis=1)
    grid_wide["geometry"] = grid_wide.index.map(quadkey_to_polygon)
    gdf_grid_wide = gpd.GeoDataFrame(grid_wide, crs="EPSG:4326").reset_index(names="quadkey_z12")
    _gw = gdf_grid_wide.copy()
    _gw["wkt"] = _gw.geometry.apply(lambda g: g.wkt)
    _gw.drop(columns="geometry").to_csv(os.path.join(OUTPUT_DIR, "algeria_grid_timeseries_mobile_wkt.csv"),
                                        index=False, encoding="utf-8")
    print(f"  ✓ z12 time-series GeoParquet → {OUTPUT_DIR}algeria_grid_timeseries_mobile_wkt.csv")
else:
    print("  ⚠ No grid-long z12 data; skipped z12 wide/GeoPackage build.")

# Grid long (z16)
if all_grid_long_z16:
    df_grid_long_z16 = pd.concat(all_grid_long_z16, ignore_index=True)
    df_grid_long_z16.to_csv(os.path.join(OUTPUT_DIR, "algeria_grid_data_long_z16_mobile.csv"),
                            index=False, encoding="utf-8")
    print(f"  ✓ Grid long z16: {len(df_grid_long_z16)} rows → {OUTPUT_DIR}algeria_grid_data_long_z16_mobile.csv")

print("PREPROCESSING COMPLETE")


ALGERIA OOKLA DATA PREPROCESSING PIPELINE
  Loaded national boundary (ADM0)
  Loaded ADM1: 48 wilayas
  Loaded ADM2: 48 districts
  Loaded ADM3: 1540 communes

[STEP 2] Generating quadkey filter (z10 → expand to z16) and z12 grid...


  • Processing zoom 10 tiles: 100%|██████████| 3600/3600 [00:00<00:00, 21936.39it/s]


  • Expanding to zoom 16 quadkeys (Ookla native resolution)...


  • Expanding quadkeys: 100%|██████████| 1963/1963 [00:02<00:00, 695.99it/s]


  ✓ Generated 8,040,448 z16 quadkeys for Algeria
  ✓ Saved: ./processed_data/algeria_quadkeys_z16.csv

[STEP 2B] Creating z12 grid (polygons) for mapping...


  • Processing zoom 12 tiles: 100%|██████████| 55696/55696 [00:00<00:00, 66016.90it/s]


  ✓ Created 31390 z12 tiles
  ✓ Saved grid: ./processed_data/algeria_grid_z12_wkt.csv

[STEP 2C] Building tile→admin (ADM1, ADM2 & ADM3) lookup...



  gdf_grid_z12_centroids["geometry"] = gdf_grid_z12_centroids.geometry.centroid


  ✓ Saved: ./processed_data/tile_admin_lookup_z12.csv
  ✓ Found 26 parquet file(s) within 2019-2025

------------------------------------------------------------------------
Processing 2019-Q1 (1 file(s))
  • Loaded: 3,231,245 rows; columns: ['quadkey', 'tile', 'avg_d_kbps', 'avg_u_kbps', 'avg_lat_ms', 'tests', 'devices']
  • 'network' column not found; proceeding (source assumed mobile).
  • Quadkey filter kept: 12,204
  ✓ National mean: 6.27↓ / 4.81↑ Mbps
  ✓ ADM1 aggregated: 48 rows
  ✓ ADM2 aggregated: 47 rows
  ✓ ADM3 aggregated: 1135 rows
  ✓ Grid z12 long rows added: 1529
  ✓ Grid z16 long rows added: 12204

------------------------------------------------------------------------
Processing 2019-Q2 (1 file(s))
  • Loaded: 3,340,189 rows; columns: ['quadkey', 'tile', 'avg_d_kbps', 'avg_u_kbps', 'avg_lat_ms', 'tests', 'devices']
  • 'network' column not found; proceeding (source assumed mobile).
  • Quadkey filter kept: 11,255
  ✓ National mean: 6.96↓ / 5.51↑ Mbps
  ✓ ADM1 aggrega