# Parcel Complementarity Model (PCM) — Student Workbook

## How to use this notebook
- Run cells **top → bottom** click on run icon at the top left of the cell or (Shift+Enter) after a click inside the cell.
- **Do not edit code.
- Inputs must come from an **Excel file** with the required sheets:
  - `parcels`
  - `complementarity_weights`
  - `visit_frequency_weights`
  - `distance_weights`

## What you will get
- Parcel and area-level PCI outputs.
- A suggested intervention set.
- Exported result files at the end.

**Recovery tip:** If anything breaks, restart the runtime and run again from the top.
**Note:** Closing the tab or browser will reset the runtime. Results and progress are only preserved within a single continuous session.


## Cell 1 — Configuration
**Goal:** Point the notebook to your Excel file and define the analysis radius.

**Action:**  Only run the cell by clicking on the top left area of the cell itself, not the screen window.

In [None]:
# Cell 1: Configuration "click here"

# Choose where you run
NOTEBOOK_MODE = "COLAB"   # "COLAB" or "LOCAL"

# If LOCAL, set your path here (ignored in COLAB)
LOCAL_EXCEL_PATH = "Tres Cantos for optimization Residential.xlsx"

# Expected sheet names (must match exactly)
EXPECTED_SHEETS = [
    "parcels",
    "complementarity_weights",
    "visit_frequency_weights",
    "distance_weights"
]

# Inter-parcel search radius (units MUST match your coordinates: you confirmed they are in meters)
MAX_DISTANCE = 1200

# Distance bins (meters) used in distance_weights sheet
DISTANCE_BINS = [(0, 400), (400, 800), (800, 1200)]

# Reproducibility (GA uses randomness)
RANDOM_SEED = 42

print("✅ Cell 1 OK")
print("Mode:", NOTEBOOK_MODE)
print("Max distance:", MAX_DISTANCE)
print("Expected sheets:", EXPECTED_SHEETS)


## Cell 2 — Imports and setup
**Goal:** Load required Python packages and prepare the runtime.

**Action:** Just run it. If an installation step appears, let it finish and then continue.


In [None]:
# Cell 2: Imports + installs  "click here"

import os
import math
import random
import time
from typing import Dict, List, Tuple, Any

import numpy as np
import pandas as pd

# KD-tree
from scipy.spatial import cKDTree

# Plotting (for results later)
import matplotlib.pyplot as plt

# Colab-only utilities (upload/download)
if NOTEBOOK_MODE == "COLAB":
    try:
        from google.colab import files
    except Exception as e:
        raise RuntimeError("You set NOTEBOOK_MODE='COLAB' but google.colab is not available.") from e

# Seed everything
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

print("✅ Cell 2 OK: imports loaded and seeds set.")


## Cell 3 — Provide the Excel file
**Goal:** Load your dataset into the notebook.

**Action:** upload the Excel file when prompted.

In [None]:
# Cell 3: Input Excel / Upload dataset file "click here"

if NOTEBOOK_MODE == "COLAB":
    uploaded = files.upload()
    if len(uploaded) == 0:
        raise ValueError("❌ No file uploaded. Please upload the Excel file.")
    EXCEL_PATH = list(uploaded.keys())[0]
    print("✅ Uploaded dataset file:", EXCEL_PATH)

elif NOTEBOOK_MODE == "LOCAL":
    EXCEL_PATH = LOCAL_EXCEL_PATH
    if not os.path.exists(EXCEL_PATH):
        raise FileNotFoundError(
            f"❌ Excel file not found: {EXCEL_PATH}\n"
            "Fix LOCAL_EXCEL_PATH in Cell 1 (use full path if needed)."
        )
    print("✅ Using local file:", EXCEL_PATH)

else:
    raise ValueError("❌ NOTEBOOK_MODE must be either 'COLAB' or 'LOCAL'.")


## Cell 4 — Load sheets and show a quick summary
**Goal:** Read the Excel tabs and show a sanity-check preview.

Check that:
- all required sheets are found (no “sheet not found” errors)
- the `parcels` table has expected columns and **non-zero** areas
- function columns are numeric (not text)
**Action:** Just run it.

In [None]:
# Cell 4: Load required sheets + show summary "click here"

xls = pd.ExcelFile(EXCEL_PATH)
found_sheets = xls.sheet_names

print("✅ Excel loaded")
print("Sheets found:", found_sheets)

missing = [s for s in EXPECTED_SHEETS if s not in found_sheets]
if missing:
    raise ValueError(
        "❌ Missing required sheet(s): " + ", ".join(missing) +
        "\nYour Excel must include these sheets exactly:\n" +
        "\n".join(EXPECTED_SHEETS)
    )

# Load the required sheets
parcels_df = pd.read_excel(EXCEL_PATH, sheet_name="parcels")
complementarity_weights_df = pd.read_excel(EXCEL_PATH, sheet_name="complementarity_weights")
visit_frequency_weights_df = pd.read_excel(EXCEL_PATH, sheet_name="visit_frequency_weights")
distance_weights_df = pd.read_excel(EXCEL_PATH, sheet_name="distance_weights")

def summarize_df(df: pd.DataFrame, name: str, preview_rows: int = 3):
    print("\n" + "=" * 80)
    print(f"Sheet: {name}")
    print(f"Shape: {df.shape}")
    print("Columns:")
    for c in df.columns:
        print("  -", c)
    print("\nPreview:")
    display(df.head(preview_rows))

summarize_df(parcels_df, "parcels")
summarize_df(complementarity_weights_df, "complementarity_weights")
summarize_df(visit_frequency_weights_df, "visit_frequency_weights")
summarize_df(distance_weights_df, "distance_weights")

print("\n✅ Cell 4 OK: all required sheets loaded successfully.")


## Cell 5 — Validate inputs
**Goal:** Catch common data problems early (missing columns, empty tables, wrong data types).

**Action:** Just run it.

If this cell raises an error:
1) fix the **Excel file** (not the code)
2) rerun starting from Cell 3


In [None]:
# Cell 5: Validate inputs  "click here"

import pandas as pd
from typing import List

def require_columns(df: pd.DataFrame, required: List[str], sheet_name: str):
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"❌ Sheet '{sheet_name}' is missing required columns: {missing}")

# ---- parcels sheet checks ----
PARCEL_REQUIRED = ["parcel_id", "longitude", "latitude", "parcel_total_functions_area"]
require_columns(parcels_df, PARCEL_REQUIRED, "parcels")

# Enforce "functions start at col 6" stability assumption
if parcels_df.shape[1] <= 6:
    raise ValueError("❌ 'parcels' sheet must have more than 6 columns (functions start at column index 6).")

BASE_COLS = list(parcels_df.columns[:6])
FUNCTION_COLS = list(parcels_df.columns[6:])

print("Base columns (first 6):", BASE_COLS)
print("Function columns (from index 6):", FUNCTION_COLS)

# Key field checks
if parcels_df["parcel_id"].isna().any():
    raise ValueError("❌ 'parcels.parcel_id' contains missing values.")
if parcels_df["parcel_id"].duplicated().any():
    dups = parcels_df.loc[parcels_df["parcel_id"].duplicated(), "parcel_id"].head(10).tolist()
    raise ValueError(f"❌ Duplicate parcel_id found (showing up to 10): {dups}")

for col in ["longitude", "latitude", "parcel_total_functions_area"]:
    if parcels_df[col].isna().any():
        raise ValueError(f"❌ 'parcels.{col}' contains missing values.")
    if not pd.api.types.is_numeric_dtype(parcels_df[col]):
        raise ValueError(f"❌ 'parcels.{col}' must be numeric. Current dtype: {parcels_df[col].dtype}")

# Function columns numeric + non-negative
bad_func_cols = []
for c in FUNCTION_COLS:
    if not pd.api.types.is_numeric_dtype(parcels_df[c]):
        bad_func_cols.append((c, "non-numeric"))
    else:
        if (parcels_df[c] < 0).any():
            bad_func_cols.append((c, "negative values"))

if bad_func_cols:
    raise ValueError(f"❌ Function columns issue(s): {bad_func_cols}")

# ---- complementarity_weights sheet checks ----
CW_REQUIRED = ["function_type_1", "function_type_2", "complementarity_weight"]
require_columns(complementarity_weights_df, CW_REQUIRED, "complementarity_weights")

if complementarity_weights_df[CW_REQUIRED].isna().any().any():
    raise ValueError("❌ 'complementarity_weights' has missing values in required columns.")

# ---- visit_frequency_weights sheet checks ----
# Standardise possible column-name variants into:
#   function_type, visit_frequency_weight
VF_REQUIRED = ["function_type", "visit_frequency_weight"]
VF_VARIANTS = [
    ["function_type", "visit_frequency_weight"],   # preferred
    ["function_type", "visit_frequency_weights"],  # common plural
    ["function_type", "visit_frequency"],          # short
    ["function_type", "weight"],                   # generic
]

# Confirm df exists
if "visit_frequency_weights_df" not in globals():
    raise ValueError("❌ visit_frequency_weights_df is not defined. Ensure you loaded sheet 'visit_frequency_weights'.")

# Detect & rename
matched = None
for cols in VF_VARIANTS:
    if all(c in visit_frequency_weights_df.columns for c in cols):
        matched = cols
        break

if matched is None:
    raise ValueError(
        "❌ Sheet 'visit_frequency_weights' must contain columns like:\n"
        "  - ['function_type', 'visit_frequency_weight'] (preferred)\n"
        "  - or one of these variants: "
        f"{VF_VARIANTS}"
    )

# Rename second column to visit_frequency_weight if needed
if matched[1] != "visit_frequency_weight":
    visit_frequency_weights_df = visit_frequency_weights_df.rename(
        columns={matched[1]: "visit_frequency_weight"}
    )
    print(f"ℹ️ Renamed '{matched[1]}' -> 'visit_frequency_weight' for internal consistency.")

# Final required columns check
require_columns(visit_frequency_weights_df, VF_REQUIRED, "visit_frequency_weights")

if visit_frequency_weights_df[VF_REQUIRED].isna().any().any():
    raise ValueError("❌ 'visit_frequency_weights' has missing values in required columns.")

# ---- distance_weights sheet checks ----
DW_REQUIRED = ["min_distance", "max_distance", "weight"]
require_columns(distance_weights_df, DW_REQUIRED, "distance_weights")

if distance_weights_df[DW_REQUIRED].isna().any().any():
    raise ValueError("❌ 'distance_weights' has missing values in required columns.")

# Check distance ranges are sensible
if not (distance_weights_df["min_distance"] < distance_weights_df["max_distance"]).all():
    raise ValueError("❌ 'distance_weights' has rows where min_distance >= max_distance.")

# Check max distance coverage includes MAX_DISTANCE
if distance_weights_df["max_distance"].max() < MAX_DISTANCE:
    raise ValueError(
        f"❌ 'distance_weights' does not cover MAX_DISTANCE={MAX_DISTANCE}. "
        f"Max max_distance in sheet is {distance_weights_df['max_distance'].max()}."
    )

print("\n✅ Cell 5 OK: Validation passed.")


## Cell 6 — Build fast lookup structures
**Goal:** Convert tables into efficient lookups so scoring is fast.

**Action:** Just run it.


In [None]:
# Cell 6: Build fast lookup structures "click here"

# --- parcels core arrays ---
parcels_df["parcel_id"] = parcels_df["parcel_id"].astype(str)

parcel_ids = parcels_df["parcel_id"].tolist()

# Coordinates are confirmed to be in meters
coordinates = parcels_df[["longitude", "latitude"]].to_numpy(dtype=float)

# Functions start at column index 6 (stable structure)
function_cols = list(parcels_df.columns[6:])
functions_matrix = parcels_df[function_cols].to_numpy(dtype=float)

# Total functions area (used in your normalisation)
parcel_total_functions_area = parcels_df["parcel_total_functions_area"].to_numpy(dtype=float)

N = len(parcel_ids)
M = len(function_cols)

print(f"Parcels: {N}, Functions: {M}")
print("First 5 function columns:", function_cols[:5])

# --- weights dicts ---
# Visit frequency (kept as B-only logic later, as you required)
visit_frequency_weights: Dict[str, float] = (
    visit_frequency_weights_df.set_index("function_type")["visit_frequency_weight"].to_dict()
)

# Complementarity weights (directed pair key)
complementarity_weights: Dict[Tuple[str, str], float] = (
    complementarity_weights_df.set_index(["function_type_1", "function_type_2"])["complementarity_weight"].to_dict()
)

# Distance weights table -> numpy array for fast scanning: [min, max, weight]
distance_weights = distance_weights_df[["min_distance", "max_distance", "weight"]].to_numpy(dtype=float)

# Quick diagnostics
print("\n✅ Weights loaded:")
print(" - visit_frequency_weights:", len(visit_frequency_weights))
print(" - complementarity_weights:", len(complementarity_weights))
print(" - distance_weights rows:", distance_weights.shape[0])

# Optional: check if all function names exist in visit_frequency_weights
missing_vf = [f for f in function_cols if f not in visit_frequency_weights]
if missing_vf:
    print("\n⚠️ Warning: These functions are missing in visit_frequency_weights (default=1 will be used later):")
    print(missing_vf)

print("\n✅ Cell 6 OK")


## Cell 7 — Spatial neighbourhood search (within MAX_DISTANCE)
**Goal:** Build “who is near who” relationships using parcel coordinates.
**Action:** Just run it.

Important:
- Coordinates are assumed to be in **meters** (projected CRS).  
  If your coordinates are degrees (lat/lon), distances will be wrong.
- Parcels beyond `MAX_DISTANCE` should effectively have **0 interaction** in later steps.


In [None]:
# Cell 7: KD-tree spatial query "click here"

kd_tree = cKDTree(coordinates)

# Build a fast distance-weight lookup function (uses your distance_weights sheet)
def get_distance_weight(dist: float, distance_weights_arr: np.ndarray) -> float:
    # distance_weights_arr rows: [min_distance, max_distance, weight]
    # returns 0.0 if out of range
    for mn, mx, w in distance_weights_arr:
        if mn <= dist < mx:
            return float(w)
    return 0.0

neighbors_idx: List[np.ndarray] = []
neighbors_dist_w: List[np.ndarray] = []

t0 = time.time()

for i in range(N):
    # Get all neighbors within radius (includes itself)
    idxs = kd_tree.query_ball_point(coordinates[i], r=MAX_DISTANCE)

    # Remove itself
    if i in idxs:
        idxs.remove(i)

    idxs_arr = np.array(idxs, dtype=int)

    if idxs_arr.size == 0:
        neighbors_idx.append(idxs_arr)
        neighbors_dist_w.append(np.array([], dtype=float))
        continue

    # Compute distances to each neighbor
    dists = np.linalg.norm(coordinates[idxs_arr] - coordinates[i], axis=1)

    # Convert distances to distance-weights
    w_arr = np.array([get_distance_weight(d, distance_weights) for d in dists], dtype=float)

    # Keep only those with non-zero weight (extra safety)
    keep = w_arr > 0
    neighbors_idx.append(idxs_arr[keep])
    neighbors_dist_w.append(w_arr[keep])

t1 = time.time()

# Diagnostics: how many neighbours per parcel (basic stats)
counts = np.array([len(x) for x in neighbors_idx], dtype=int)
print("✅ KD-tree neighbour lists built")
print(f"Time: {t1 - t0:.2f} sec")
print(f"Neighbours per parcel: min={counts.min()}, mean={counts.mean():.1f}, max={counts.max()}")

# Show a quick sample for the first parcel
if N > 0:
    sample_i = 0
    print("\nSample parcel:", parcel_ids[sample_i])
    print("Neighbour count:", len(neighbors_idx[sample_i]))
    if len(neighbors_idx[sample_i]) > 0:
        j = neighbors_idx[sample_i][0]
        print("First neighbour parcel_id:", parcel_ids[j])
        print("First neighbour distance weight:", neighbors_dist_w[sample_i][0])

print("\n✅ Cell 7 OK")


## Cell 8 — Parcel rules editor (READ CAREFULLY)
**Goal:** Define *what kinds of land-use changes are allowed* before the model evaluates or optimises anything.

Think of this as the **policy layer** of the notebook:
- It controls which functions can be proposed/added/modified.
- It can block unrealistic edits (e.g., changing everything everywhere).
- It keeps outputs consistent with how you want “feasible interventions” to behave.

### The workflow for this cell
1) **Run the cell as-is** (default rules are already set for the PCM workflow).  
2) Follow the interface instruction to add/delete targeted parcels for optimisaion*.
3) You may Add limits of Min/Max prefferd floor areas to certain functions, however it can be kept as OFF*.
4) You may Add FAR to certain functions, however it can be kept as OFF*.
5) You may Add as much as possible targeted vacant parcels to the optimisation process*.
6) You may Add as much as possible targeted built parcels to the optimisation process, however the process allow only to densify already present functions by add on floor areas, or add total new functions*.
7) You may NOT remove or deduct already allocated functions in built parcels*.



### Hidden / common pitfalls this cell protects you from
- **Accidentally allowing every function everywhere**, which creates “too-good-to-be-true” solutions.
- **Letting the optimiser add land uses that are not meaningful** for your study objective.
- **Allowing the model to treat “Undeveloped land” as a functional category** that competes with real uses.

### Mandatory rule for this study (Undeveloped land - READ CAREFULLY)
- Treat **Undeveloped land** as *non-programmatic* ground-level capacity.
- **Always exclude it from complementarity evaluation and optimisation decisions**  
  **unless** you have a clear, stated intent to keep a specific area empty at ground level (e.g., protected open space, plaza, buffer).
- If there is **no explicit intent** to preserve emptiness, assume undeveloped area is **available for intervention** and should **not** be counted as a land-use function.

> In short: Undeveloped land is a *constraint or reserve*, not a “use” — keep it out by selecting undeveloped-land as excluded for each targeted parcel for optimisation.


In [None]:
# Cell 8 : Parcel rules editor "click here"

import numpy as np
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, clear_output

# ---- Vacant parcels detection (same rule) ----
vacant_mask = np.isclose(functions_matrix, 0.0).all(axis=1)
parcel_index = {pid: i for i, pid in enumerate(parcel_ids)}

# ---- Explanations (shown to students) ----
EXPLAIN = widgets.HTML("""
<div style="border:1px solid #ddd; padding:10px; border-radius:8px;">
  <h3 style="margin:0 0 8px 0;">What these options mean</h3>
  <ul style="margin:0;">
    <li><b>Exclude functions</b>: functions that are <b>not allowed</b> to be assigned to this parcel during optimisation.</li>
    <li><b>Limits</b>: optional min/max area bounds for specific functions in this parcel. If Limits = YES, the optimiser must keep areas within your bounds.</li>
    <li><b>FAR</b>: feasibility rule using <code>parcel_ground_area</code>. If FAR = YES, then:
      <br><b>(total allocated floor area)</b> / <b>(parcel_ground_area)</b> ≤ <b>FAR_max</b>.
    </li>
  </ul>
</div>
""")

# ---- Global toggle ----
VACANT_ONLY_widget = widgets.Checkbox(value=True, description="Optimise vacant parcels only (all function areas = 0)")

# ---- Add parcel by typing ID ----
parcel_id_input = widgets.Text(
    value="",
    description="Parcel ID",
    placeholder="Type parcel_id then click Add",
    layout=widgets.Layout(width="420px")
)
add_btn = widgets.Button(description="Add parcel", button_style="success")

# ---- Batch actions ----
clear_all_btn = widgets.Button(description="Remove all parcels", button_style="danger")

# ---- Output areas ----
rows_box = widgets.VBox([])
status_out = widgets.Output()
rules_out = widgets.Output()

# ---- Helper: checkbox panel for excludes (with search) ----
def make_exclude_panel(initial_selected=None):
    if initial_selected is None:
        initial_selected = set()

    search = widgets.Text(value="", placeholder="Search functions...", description="Search", layout=widgets.Layout(width="420px"))

    checks_box = widgets.VBox([])
    scroll = widgets.Box([checks_box], layout=widgets.Layout(
        border="1px solid #ddd",
        height="180px",
        overflow_y="scroll",
        padding="6px",
        width="560px"
    ))

    all_checks = {f: widgets.Checkbox(value=(f in initial_selected), description=f, indent=False) for f in function_cols}

    def refresh():
        q = search.value.strip().lower()
        visible = []
        for f in function_cols:
            if q == "" or q in f.lower():
                visible.append(all_checks[f])
        checks_box.children = tuple(visible)

    search.observe(lambda ch: refresh(), names="value")
    refresh()

    def get_selected():
        return [f for f, cb in all_checks.items() if cb.value]

    return search, scroll, get_selected

# ---- One parcel row ----
def make_rule_row(pid: str):
    i = parcel_index[pid]
    is_vacant = bool(vacant_mask[i])

    title = widgets.HTML(f"<b>Parcel:</b> {pid} &nbsp; | &nbsp; <b>Vacant:</b> {is_vacant}")

    # Exclude checkboxes
    exclude_search, exclude_scroll, get_excluded = make_exclude_panel(initial_selected=set())

    # Limits
    limits_active = widgets.ToggleButtons(options=["NO", "YES"], value="NO", description="Limits?")

    limit_func = widgets.Dropdown(options=function_cols, description="Function", layout=widgets.Layout(width="360px"))
    limit_min = widgets.FloatText(value=0.0, description="Min", layout=widgets.Layout(width="180px"))
    limit_max = widgets.FloatText(value=0.0, description="Max", layout=widgets.Layout(width="180px"))
    add_limit_btn = widgets.Button(description="Add/Update limit", button_style="info")
    clear_limits_btn = widgets.Button(description="Clear limits", button_style="warning")
    limits_out = widgets.Output()

    limits_store = {}  # {func: {"min": x, "max": y}}

    def render_limits():
        with limits_out:
            clear_output()
            if not limits_store:
                print("No limits set.")
            else:
                df = pd.DataFrame(
                    [{"function": k, "min": v["min"], "max": v["max"]} for k, v in limits_store.items()]
                ).sort_values("function")
                display(df)

    def on_add_limit(_):
        f = limit_func.value
        limits_store[f] = {"min": float(limit_min.value), "max": float(limit_max.value)}
        render_limits()

    def on_clear_limits(_):
        limits_store.clear()
        render_limits()

    add_limit_btn.on_click(on_add_limit)
    clear_limits_btn.on_click(on_clear_limits)

    limits_box = widgets.VBox([
        widgets.HBox([limit_func, limit_min, limit_max, add_limit_btn, clear_limits_btn]),
        limits_out
    ])

    def toggle_limits_visibility(_=None):
        limits_box.layout.display = "none" if limits_active.value == "NO" else "block"

    limits_active.observe(lambda ch: toggle_limits_visibility(), names="value")
    toggle_limits_visibility()
    render_limits()

    # FAR
    far_active = widgets.ToggleButtons(options=["NO", "YES"], value="NO", description="FAR?")
    far_max = widgets.FloatText(value=1.0, description="FAR_max", layout=widgets.Layout(width="220px"))

    remove_btn = widgets.Button(description="Remove parcel", button_style="danger")

    row = widgets.VBox([
        widgets.HBox([title, limits_active, far_active, far_max, remove_btn]),
        widgets.HTML("<b>Exclude functions</b> (tick functions that are NOT allowed in this parcel):"),
        exclude_search,
        exclude_scroll,
        widgets.HTML("<b>Limits</b> (only used if Limits? = YES):"),
        limits_box
    ])

    # Attach accessors
    row._pid = pid
    row._is_vacant = is_vacant
    row._limits_active = limits_active
    row._limits_store = limits_store
    row._get_excluded = get_excluded
    row._far_active = far_active
    row._far_max = far_max
    row._remove_btn = remove_btn

    return row

# ---- Add parcel logic ----
def add_parcel(_=None):
    pid = parcel_id_input.value.strip()
    with status_out:
        clear_output()

        if pid == "":
            print("Type a parcel ID first.")
            return

        if pid not in parcel_index:
            print(f"❌ Parcel ID not found in parcels sheet: {pid}")
            return

        existing = [r._pid for r in rows_box.children] if rows_box.children else []
        if pid in existing:
            print(f"Parcel {pid} already added.")
            return

        r = make_rule_row(pid)

        def remove_this(_btn):
            current = list(rows_box.children)
            if r in current:
                current.remove(r)
                rows_box.children = tuple(current)

        r._remove_btn.on_click(remove_this)
        rows_box.children = tuple(list(rows_box.children) + [r])

        if VACANT_ONLY_widget.value and (not r._is_vacant):
            print(f"⚠️ Warning: Parcel {pid} is NOT vacant, but VACANT_ONLY=True. It will be ignored later unless you change VACANT_ONLY.")
        else:
            print(f"✅ Added parcel: {pid}")

add_btn.on_click(add_parcel)

# ---- Clear all ----
def clear_all(_):
    rows_box.children = tuple([])
    with status_out:
        clear_output()
        print("Cleared all parcels.")

clear_all_btn.on_click(clear_all)

# ---- Build rules + validation ----
build_btn = widgets.Button(description="Build rules", button_style="primary")

def build_rules(_=None):
    PARCEL_RULES_local = {}
    warnings = []

    vacant_only = bool(VACANT_ONLY_widget.value)

    for r in rows_box.children:
        pid = r._pid

        if vacant_only and (not r._is_vacant):
            warnings.append(f"Parcel {pid} is NOT vacant but VACANT_ONLY=True. It will be ignored later.")

        # Limits validation
        if r._limits_active.value == "YES":
            for f, lim in r._limits_store.items():
                if float(lim["max"]) < float(lim["min"]):
                    warnings.append(f"Parcel {pid} limit invalid for '{f}': max < min.")

        # FAR validation (only if active)
        if r._far_active.value == "YES":
            if r._far_max.value <= 0:
                warnings.append(f"Parcel {pid} FAR_max must be > 0.")

        PARCEL_RULES_local[pid] = {
            "exclude_functions": r._get_excluded(),
            "limits_active": r._limits_active.value,
            "limits": dict(r._limits_store),
            "far_active": r._far_active.value,
            "far_max": float(r._far_max.value),
        }

    with rules_out:
        clear_output()
        if warnings:
            print("⚠️ Validation warnings:")
            for w in warnings:
                print("-", w)
            print()
        print("✅ Rules built.")
        print("Parcels with specific rules:", len(PARCEL_RULES_local))
        display(PARCEL_RULES_local)

    # Export to globals used later
    global VACANT_ONLY, PARCEL_RULES
    VACANT_ONLY = vacant_only
    PARCEL_RULES = PARCEL_RULES_local

build_btn.on_click(build_rules)

# ---- Layout ----
ui = widgets.VBox([
    widgets.HTML("<h3>Parcel Rules Editor</h3>"),
    EXPLAIN,
    VACANT_ONLY_widget,
    widgets.HTML("<b>Add parcel by ID</b> (type parcel_id and click Add):"),
    widgets.HBox([parcel_id_input, add_btn, clear_all_btn]),
    widgets.HTML("<hr>"),
    widgets.HTML("<b>Selected parcels</b> (set rules below):"),
    rows_box,
    widgets.HBox([build_btn]),
    status_out,
    widgets.HTML("<hr>"),
    rules_out
])

display(ui)
print("✅ Cell 8 UI ready. Type parcel IDs, set options, then click 'Build rules'.")


## Cell 9 — Constraints and feasibility checks
**Goal:** Enforce feasibility constraints so solutions stay realistic.

Just run it.


In [None]:
# Cell 9: Constraint builder "click here"

# =========================
# 0) Global optimisation rule
# =========================
# Optimisation is allowed for ALL parcels, but it is ADDs-ONLY:
# You can add areas, but you cannot reduce what already exists.
ADD_ONLY = True

# ---- Required column for FAR ----
if "parcel_ground_area" not in parcels_df.columns:
    raise ValueError("❌ parcels sheet must include column 'parcel_ground_area' to use FAR logic.")

parcel_ground_area = parcels_df["parcel_ground_area"].to_numpy(dtype=float)

# ---- Baseline allocation (what exists already; cannot be reduced if ADD_ONLY=True) ----
# Uses your stability rule: function columns start at index 6
BASE_MATRIX = functions_matrix.copy()  # shape (N, M)

# =========================
# 1) Eligibility
# =========================
eligible_indices = np.arange(N, dtype=int)
eligible_parcel_ids = parcel_ids.copy()

print(f"✅ Eligible parcels: {len(eligible_parcel_ids)} / {N} (ALL parcels)")
print("Sample eligible parcel IDs:", eligible_parcel_ids[:10])

# =========================
# 2) Rule access helpers
# =========================
def get_rule(parcel_id: str) -> dict:
    """
    If parcel has no specific rule, return defaults (free optimisation).
    """
    r = PARCEL_RULES.get(parcel_id, None)
    if r is None:
        return {
            "exclude_functions": [],
            "limits_active": "NO",
            "limits": {},
            "far_active": "NO",
            "far_max": None
        }
    return r

def get_base_alloc(parcel_id: str) -> Dict[str, float]:
    """
    Returns baseline (existing) function areas for a parcel as dict.
    """
    i = parcel_index[parcel_id]
    base_row = BASE_MATRIX[i]  # length M
    return {function_cols[j]: float(base_row[j]) for j in range(M)}

# =========================
# 3) Feasibility checks
# =========================
def check_feasible(parcel_id: str, alloc: Dict[str, float]) -> Tuple[bool, List[str]]:
    """
    alloc: dict {function_name: area} for THIS parcel (candidate solution)
    Returns (is_ok, messages)
    """
    msgs = []
    rule = get_rule(parcel_id)

    # Build full vector-like dict (missing -> 0), ensure numeric
    a = {}
    for f in function_cols:
        v = alloc.get(f, 0.0)
        try:
            v = float(v) if v is not None else 0.0
        except Exception:
            msgs.append(f"Function '{f}' has non-numeric area: {alloc.get(f)}")
            v = 0.0
        a[f] = v

    # Non-negativity
    for f, v in a.items():
        if np.isnan(v):
            msgs.append(f"Function '{f}' has NaN area.")
        elif v < 0:
            msgs.append(f"Function '{f}' has negative area: {v}")

    # ADDs-ONLY: cannot reduce baseline
    if ADD_ONLY:
        base = get_base_alloc(parcel_id)
        for f in function_cols:
            if a[f] + 1e-9 < base[f]:
                msgs.append(f"ADD-ONLY violated for '{f}': new {a[f]} < base {base[f]}")

    # Exclusions: excluded functions cannot be increased (must be exactly baseline if ADD_ONLY)
    excluded = set(rule.get("exclude_functions", []))
    if excluded:
        base = get_base_alloc(parcel_id) if ADD_ONLY else None
        for f in excluded:
            if f not in function_cols:
                msgs.append(f"Excluded function '{f}' is not a known function column.")
                continue
            if ADD_ONLY:
                # Must equal baseline (since we cannot remove, but also cannot add)
                if abs(a[f] - base[f]) > 1e-9:
                    msgs.append(f"Excluded function '{f}' must remain at baseline {base[f]} (current {a[f]}).")
            else:
                if a[f] > 0:
                    msgs.append(f"Excluded function '{f}' has area > 0.")

    # Limits: apply to final value (baseline already included if ADD_ONLY)
    if rule.get("limits_active") == "YES":
        limits = rule.get("limits", {})
        for f, lim in limits.items():
            if f not in function_cols:
                msgs.append(f"Limit references unknown function '{f}'.")
                continue
            v = float(a.get(f, 0.0))
            mn = float(lim.get("min", -np.inf))
            mx = float(lim.get("max", np.inf))
            if v < mn - 1e-9:
                msgs.append(f"Limit violated for '{f}': {v} < min {mn}")
            if v > mx + 1e-9:
                msgs.append(f"Limit violated for '{f}': {v} > max {mx}")

    # FAR: total_allocated / parcel_ground_area <= FAR_max
    if rule.get("far_active") == "YES":
        far_max = rule.get("far_max", None)
        if far_max is None or far_max <= 0:
            msgs.append("FAR is active but FAR_max is missing or <= 0.")
        else:
            i = parcel_index[parcel_id]
            ga = float(parcel_ground_area[i])
            if ga <= 0:
                msgs.append("parcel_ground_area must be > 0 for FAR.")
            else:
                total_alloc = float(sum(max(0.0, v) for v in a.values()))
                far_val = total_alloc / ga
                if far_val > float(far_max) + 1e-9:
                    msgs.append(f"FAR violated: {far_val:.4f} > FAR_max {far_max}")

    return (len(msgs) == 0), msgs

# =========================
# 4) Repair utilities
# =========================
def repair_to_feasible(parcel_id: str, alloc: Dict[str, float]) -> Dict[str, float]:
    """
    Repairs alloc conservatively:
    1) Ensure numeric, non-negative
    2) ADD-ONLY: new >= baseline
    3) Exclusions: excluded functions fixed at baseline (ADD-ONLY) or set to 0 (non add-only)
    4) Clamp limits (if active)
    5) Enforce FAR by proportional scaling of ONLY the "added part" (so baseline is not reduced)
    """
    rule = get_rule(parcel_id)

    # Start with clean numeric dict for all function cols (missing -> 0)
    a = {f: float(alloc.get(f, 0.0) or 0.0) for f in function_cols}

    # 1) Non-negativity
    for f in function_cols:
        if np.isnan(a[f]) or a[f] < 0:
            a[f] = 0.0

    # Baseline
    i = parcel_index[parcel_id]
    base_row = BASE_MATRIX[i]
    base = {function_cols[j]: float(base_row[j]) for j in range(M)}

    # 2) ADD-ONLY enforcement
    if ADD_ONLY:
        for f in function_cols:
            if a[f] < base[f]:
                a[f] = base[f]

    # 3) Exclusions
    excluded = set(rule.get("exclude_functions", []))
    for f in excluded:
        if f not in a:
            continue
        if ADD_ONLY:
            a[f] = base[f]  # cannot add to excluded, cannot remove baseline
        else:
            a[f] = 0.0

    # 4) Limits clamp (applies to final value)
    if rule.get("limits_active") == "YES":
        limits = rule.get("limits", {})
        for f, lim in limits.items():
            if f not in a:
                continue
            mn = float(lim.get("min", -np.inf))
            mx = float(lim.get("max", np.inf))
            if a[f] < mn:
                a[f] = mn
            if a[f] > mx:
                a[f] = mx
            # If ADD-ONLY, never go below baseline
            if ADD_ONLY and a[f] < base[f]:
                a[f] = base[f]

    # 5) FAR enforcement by proportional scaling
    # Important: in ADD-ONLY mode we scale ONLY the added part, not the baseline.
    if rule.get("far_active") == "YES":
        far_max = rule.get("far_max", None)
        if far_max is not None and far_max > 0:
            ga = float(parcel_ground_area[i])
            if ga > 0:
                total_alloc = float(sum(a.values()))
                max_total = float(far_max) * ga

                if total_alloc > max_total + 1e-9:
                    if ADD_ONLY:
                        # Compute added part
                        added = {f: max(0.0, a[f] - base[f]) for f in function_cols}
                        total_added = float(sum(added.values()))
                        # If there's no added part but FAR is still violated, we cannot fix add-only.
                        if total_added <= 0:
                            pass
                        else:
                            # Reduce added so that base + scaled_added meets max_total
                            allowed_added = max_total - float(sum(base.values()))
                            if allowed_added < 0:
                                # Baseline itself exceeds FAR_max; cannot fix in add-only mode
                                pass
                            else:
                                scale = allowed_added / total_added
                                scale = max(0.0, min(1.0, scale))
                                for f in function_cols:
                                    a[f] = base[f] + added[f] * scale
                    else:
                        # Non add-only: scale everything proportionally
                        scale = max_total / total_alloc if total_alloc > 0 else 1.0
                        for f in function_cols:
                            a[f] *= scale

    # Remove tiny numerical noise
    for f in function_cols:
        if abs(a[f]) < 1e-9:
            a[f] = 0.0

    return a

# =========================
# 5) Baseline FAR warnings (optional but useful)
# =========================
# If a parcel has FAR active, but baseline already violates it, add-only cannot fix it.
baseline_warnings = []
for pid, rule in PARCEL_RULES.items():
    if rule.get("far_active") == "YES":
        far_max = rule.get("far_max", None)
        if far_max is None or far_max <= 0:
            continue
        i = parcel_index[pid]
        ga = float(parcel_ground_area[i])
        if ga <= 0:
            baseline_warnings.append(f"Parcel {pid}: parcel_ground_area <= 0 (FAR cannot be computed).")
            continue
        base_total = float(BASE_MATRIX[i].sum())
        far_val = base_total / ga
        if far_val > float(far_max) + 1e-9:
            baseline_warnings.append(
                f"Parcel {pid}: baseline FAR {far_val:.4f} already > FAR_max {far_max}. "
                "ADD-ONLY cannot fix this."
            )

if baseline_warnings:
    print("\n⚠️ Baseline FAR warnings:")
    for w in baseline_warnings[:15]:
        print("-", w)
    if len(baseline_warnings) > 15:
        print(f"... and {len(baseline_warnings) - 15} more")

# =========================
# 6) Quick self-test
# =========================
if len(PARCEL_RULES) > 0:
    test_pid = list(PARCEL_RULES.keys())[0]
    print("\nSelf-test parcel:", test_pid)

    # Deliberately try to reduce baseline -> should be repaired back up
    base = get_base_alloc(test_pid)
    some_func = function_cols[0]
    test_alloc = {some_func: max(0.0, base[some_func] - 1000)}  # invalid in ADD-ONLY if base>0
    repaired = repair_to_feasible(test_pid, test_alloc)

    ok, msgs = check_feasible(test_pid, repaired)
    print("Feasible after repair:", ok)
    if not ok:
        print("Messages (first 10):", msgs[:10])
else:
    print("\nℹ️ No parcel-specific rules provided yet; Cell 9 utilities are ready.")

print("\n✅ Cell 9 OK (ADD-ONLY + FAR)")


## Cell 10 — Genetic algorithm (GA) settings
**Goal:** Control population size, generations, mutation rate, etc.

Students: run as-is.  
(Changing GA settings changes runtime and behaviour, so only adjust if your instructor requests it.)


In [None]:
# Cell 10: GA configuration  "click here"
# Fixed configuration (do not change)
POP_SIZE = 200
N_GENERATIONS = 200

TOURNAMENT_K = 5
CROSSOVER_RATE = 0.9
MUTATION_RATE = 0.25

# Mutation magnitude (area perturbation, m²)
MUTATION_SIGMA = 30.0

# Safety clamps (pre-repair only; repair enforces feasibility)
MAX_ADDED_AREA_PER_FUNCTION = 5000.0

# Progress printing
PRINT_EVERY = 5

print("✅ Cell 10 OK (GA configuration locked)")
print("POP_SIZE:", POP_SIZE, "| Generations:", N_GENERATIONS)
print("Tournament K:", TOURNAMENT_K)
print("Crossover:", CROSSOVER_RATE, "| Mutation:", MUTATION_RATE, "| Sigma:", MUTATION_SIGMA)


## Cell 11 — Genome + initial population
**Goal:** Create the first set of candidate solutions for the GA.

Just run it.


In [None]:
# Cell 11: Genome representation + init population  "click here"

# Target parcels = those user added in PARCEL_RULES keys
TARGET_PARCEL_IDS = list(PARCEL_RULES.keys())

if len(TARGET_PARCEL_IDS) == 0:
    raise ValueError("❌ No target parcels defined. Go to Cell 8 UI, add parcels, then click 'Build rules'.")

print("✅ Target parcels for optimisation:", len(TARGET_PARCEL_IDS))
print("Sample target parcel IDs:", TARGET_PARCEL_IDS[:10])

# Precompute baseline dict for each target parcel (speed)
BASE_ALLOC_DICT = {pid: get_base_alloc(pid) for pid in TARGET_PARCEL_IDS}

# Helper: make a random added-allocation dict for one parcel
def random_added_alloc_for_parcel(parcel_id: str) -> Dict[str, float]:
    rule = get_rule(parcel_id)
    excluded = set(rule.get("exclude_functions", []))

    added = {}
    for f in function_cols:
        if f in excluded:
            added[f] = 0.0
            continue
        # random added area for function
        # Use positive random with cap; repair will enforce FRA/limits.
        added[f] = float(np.random.uniform(0.0, MAX_ADDED_AREA_PER_FUNCTION))
    return added

# Build one individual
def make_individual() -> Dict[str, Dict[str, float]]:
    """
    Individual structure:
      { parcel_id: {function: added_area} }
    """
    ind = {}
    for pid in TARGET_PARCEL_IDS:
        ind[pid] = random_added_alloc_for_parcel(pid)
    return ind

# Repair an individual (convert added -> actual -> repair -> convert back to added)
def repair_individual(ind: Dict[str, Dict[str, float]]) -> Dict[str, Dict[str, float]]:
    repaired = {}
    for pid in TARGET_PARCEL_IDS:
        base = BASE_ALLOC_DICT[pid]
        # build actual alloc
        actual = {f: base[f] + float(ind[pid].get(f, 0.0)) for f in function_cols}
        actual = repair_to_feasible(pid, actual)
        # convert back to added (ADD-ONLY safe)
        repaired[pid] = {f: max(0.0, actual[f] - base[f]) for f in function_cols}
    return repaired

# Initialise population
population = []
for _ in range(POP_SIZE):
    ind = make_individual()
    ind = repair_individual(ind)
    population.append(ind)

print("✅ Cell 11 OK: initial population created")
print("Population size:", len(population))


## Cell 12 — Objective function (Inter-Parcel complementarity)
**Goal:** Define how each candidate solution is scored.

Just run it. (This is the core scoring logic.)


In [None]:
# Cell 12: Fitness / objective function (Inter-Parcel) + proper normalisation "click here"

import numpy as np
from typing import Dict

# --- fast helpers ---
func_idx = {f: j for j, f in enumerate(function_cols)}

# visit frequency vector aligned to function_cols (default=1 if missing)
visit_freq_vec = np.array([float(visit_frequency_weights.get(f, 1.0)) for f in function_cols], dtype=float)

def build_actual_matrix(ind: Dict[str, Dict[str, float]]) -> np.ndarray:
    """
    Builds full actual function matrix for ALL parcels:
    - start from BASE_MATRIX (baseline areas)
    - for target parcels: baseline + added -> repair_to_feasible -> write back
    """
    actual = BASE_MATRIX.copy()

    for pid in TARGET_PARCEL_IDS:
        i = parcel_index[pid]
        base_row = BASE_MATRIX[i].copy()

        # Construct proposed actual row = base + added
        proposed = base_row.copy()
        added_dict = ind[pid]

        for f, add_val in added_dict.items():
            j = func_idx[f]
            proposed[j] = base_row[j] + float(add_val)

        # Repair via dict interface (keeps all your constraints consistent: ADD-ONLY + excludes + limits + FAR)
        proposed_dict = {function_cols[j]: float(proposed[j]) for j in range(M)}
        repaired_dict = repair_to_feasible(pid, proposed_dict)

        # Put back into matrix
        actual[i] = np.array([float(repaired_dict[f]) for f in function_cols], dtype=float)

    return actual

def parcel_inter_score(i_a: int, actual: np.ndarray, total_area: np.ndarray) -> float:
    """
    Inter-parcel score for parcel A (index i_a), using your scientific logic:

      interaction = (area_a + area_b) * complementarity_weight * distance_weight * visit_frequency_b

    Normalisation:
      divide by total_interacting_area = sum(total_area of interacting neighbour parcels B)

    IMPORTANT:
      total_area is DYNAMIC from 'actual' (sum of floor areas), consistent with FAR and "add floor area".
    """
    areas_a = actual[i_a]
    nz_a = np.where(areas_a > 0)[0]
    if nz_a.size == 0:
        return 0.0

    inter_score = 0.0
    total_interacting_area = 0.0

    idxs = neighbors_idx[i_a]
    wds = neighbors_dist_w[i_a]

    if idxs.size == 0:
        return 0.0

    for k in range(idxs.size):
        i_b = int(idxs[k])
        w_dist = float(wds[k])
        if w_dist <= 0:
            continue

        areas_b = actual[i_b]
        nz_b = np.where(areas_b > 0)[0]
        if nz_b.size == 0:
            continue

        # Normalisation term (dynamic floor area of parcel B)
        tb = float(total_area[i_b])
        if tb > 0:
            total_interacting_area += tb
        else:
            continue

        # Only non-zero functions
        for ja in nz_a:
            fa = function_cols[ja]
            aa = float(areas_a[ja])

            for jb in nz_b:
                fb = function_cols[jb]
                ab = float(areas_b[jb])

                cw = complementarity_weights.get((fa, fb), None)
                if cw is None:
                    continue

                # Scientific rule: visit_frequency of B only
                vf_b = float(visit_freq_vec[jb])

                inter_score += (aa + ab) * float(cw) * w_dist * vf_b

    if total_interacting_area > 0:
        return float(inter_score / total_interacting_area)
    return 0.0

def fitness(ind: Dict[str, Dict[str, float]]) -> float:
    """
    GA objective:
    - evaluate inter-parcel score for each TARGET parcel
    - return mean score across target parcels (stable scale)
    """
    actual = build_actual_matrix(ind)

    # Dynamic total floor area per parcel (consistent with FAR and add-only floor-area allocation)
    total_area = actual.sum(axis=1)

    scores = []
    for pid in TARGET_PARCEL_IDS:
        i = parcel_index[pid]
        scores.append(parcel_inter_score(i, actual, total_area))

    if len(scores) == 0:
        return 0.0
    return float(np.mean(scores))

# --- quick smoke test ---
test_fit = fitness(population[0])
print("✅ Cell 12 OK: fitness function ready (dynamic normalisation by neighbour total floor area)")
print("Fitness(population[0]) =", test_fit)


## Cell 13 — GA operators
**Goal:** Crossover, mutation, selection, and repair logic.

Just run it.


In [None]:
# Cell 13: GA operators "click here"

import copy

# --- Fitness cache (big speed win) ---
# We cache by a deterministic hash of the individual's added areas (rounded).
def individual_key(ind: Dict[str, Dict[str, float]], ndigits: int = 3) -> tuple:
    # returns nested tuples for hashing
    items = []
    for pid in TARGET_PARCEL_IDS:
        row = ind[pid]
        # sort by function order (stable)
        items.append((pid, tuple(round(float(row[f]), ndigits) for f in function_cols)))
    return tuple(items)

_fitness_cache = {}

def cached_fitness(ind) -> float:
    k = individual_key(ind)
    if k in _fitness_cache:
        return _fitness_cache[k]
    v = fitness(ind)
    _fitness_cache[k] = v
    return v

# --- Tournament selection ---
def tournament_select(pop: List[Dict[str, Dict[str, float]]], k: int) -> Dict[str, Dict[str, float]]:
    # sample k individuals and return the best (max fitness)
    idxs = np.random.randint(0, len(pop), size=k)
    best = None
    best_fit = -1e18
    for ix in idxs:
        ind = pop[int(ix)]
        f = cached_fitness(ind)
        if f > best_fit:
            best_fit = f
            best = ind
    return copy.deepcopy(best)

# --- Crossover (uniform on parcel blocks) ---
def crossover(parent1, parent2):
    """
    Uniform crossover at parcel level:
    For each parcel_id, child gets that parcel's added dict from either parent.
    """
    child1 = {}
    child2 = {}
    for pid in TARGET_PARCEL_IDS:
        if np.random.rand() < 0.5:
            child1[pid] = copy.deepcopy(parent1[pid])
            child2[pid] = copy.deepcopy(parent2[pid])
        else:
            child1[pid] = copy.deepcopy(parent2[pid])
            child2[pid] = copy.deepcopy(parent1[pid])
    return child1, child2

# --- Mutation (Gaussian noise on added areas, ADD-ONLY safe) ---
def mutate(ind):
    """
    For each target parcel and function, with probability MUTATION_RATE,
    add Gaussian noise (sigma), then clamp to [0, MAX_ADDED_AREA_PER_FUNCTION].
    """
    for pid in TARGET_PARCEL_IDS:
        rule = get_rule(pid)
        excluded = set(rule.get("exclude_functions", []))

        for f in function_cols:
            if f in excluded:
                ind[pid][f] = 0.0
                continue

            if np.random.rand() < MUTATION_RATE:
                v = float(ind[pid][f])
                v = v + float(np.random.normal(0.0, MUTATION_SIGMA))
                if v < 0.0:
                    v = 0.0
                if v > MAX_ADDED_AREA_PER_FUNCTION:
                    v = MAX_ADDED_AREA_PER_FUNCTION
                ind[pid][f] = v
    return ind

# --- Make offspring helper ---
def make_offspring(pop):
    p1 = tournament_select(pop, TOURNAMENT_K)
    p2 = tournament_select(pop, TOURNAMENT_K)

    if np.random.rand() < CROSSOVER_RATE:
        c1, c2 = crossover(p1, p2)
    else:
        c1, c2 = copy.deepcopy(p1), copy.deepcopy(p2)

    c1 = mutate(c1)
    c2 = mutate(c2)

    # Always repair to satisfy ADD-ONLY + exclusions + limits + FRA
    c1 = repair_individual(c1)
    c2 = repair_individual(c2)

    return c1, c2

print("✅ Cell 13 OK: GA operators ready")
print("Fitness cache size (currently):", len(_fitness_cache))


## Cell 14 — Run the AI optimisation loop
**Goal:** Execute the GA (Run cell 15 only **AFTER** the optimisation reach GEN 200).
**Output:** optimisation complete



In [None]:
# Cell 14: Run GA loop "click here"

best_history = []
mean_history = []

# Evaluate initial population
fits = np.array([cached_fitness(ind) for ind in population], dtype=float)

best_idx = int(np.argmax(fits))
best_ind = copy.deepcopy(population[best_idx])
best_fit = float(fits[best_idx])

print("✅ Initial best fitness:", best_fit)

for gen in range(1, N_GENERATIONS + 1):
    new_pop = []

    # Elitism: carry best individual
    new_pop.append(copy.deepcopy(best_ind))

    # Create the rest
    while len(new_pop) < POP_SIZE:
        c1, c2 = make_offspring(population)
        new_pop.append(c1)
        if len(new_pop) < POP_SIZE:
            new_pop.append(c2)

    population = new_pop

    # Evaluate
    fits = np.array([cached_fitness(ind) for ind in population], dtype=float)

    # Track best
    gen_best_idx = int(np.argmax(fits))
    gen_best_fit = float(fits[gen_best_idx])

    if gen_best_fit > best_fit:
        best_fit = gen_best_fit
        best_ind = copy.deepcopy(population[gen_best_idx])

    best_history.append(best_fit)
    mean_history.append(float(np.mean(fits)))

    if gen % PRINT_EVERY == 0 or gen == 1 or gen == N_GENERATIONS:
        print(f"Gen {gen:3d} | best={best_fit:.6f} | mean={mean_history[-1]:.6f} | cache={len(_fitness_cache)}")

print("\n✅ Cell 14 OK: optimisation complete")
print("Best fitness:", best_fit)


## Cell 15 — Results summary
**Goal:** Read the best solution, score changes, and diagnostics.

Focus on:
- which parcels changed
- what functions were proposed
- how scores moved (parcel-level and area-level)


In [None]:
# Cell 15: Results summary + change log + plots "click here"

import matplotlib.pyplot as plt

# --- Build best solution actual matrix ---
best_actual = build_actual_matrix(best_ind)  # full (N, M)
base_actual = BASE_MATRIX.copy()

# --- Extract changes for target parcels ---
changes = []
for pid in TARGET_PARCEL_IDS:
    i = parcel_index[pid]
    for j, f in enumerate(function_cols):
        base_v = float(base_actual[i, j])
        new_v = float(best_actual[i, j])
        add_v = new_v - base_v
        if add_v > 1e-9:
            changes.append([pid, f, base_v, new_v, add_v])

changes_df = pd.DataFrame(changes, columns=["parcel_id", "function", "base_area", "new_area", "added_area"])
changes_df = changes_df.sort_values(["parcel_id", "added_area"], ascending=[True, False]).reset_index(drop=True)

print("✅ Changes log created")
print("Number of changes (rows):", len(changes_df))
display(changes_df.head(20))

# --- Before/After compact view per parcel (only changed functions) ---
summary_rows = []
for pid in TARGET_PARCEL_IDS:
    sub = changes_df[changes_df["parcel_id"] == pid]
    if len(sub) == 0:
        summary_rows.append([pid, "(no changes)", "", ""])
        continue
    # list top 8 changes for readability
    top = sub.head(8)
    funcs = ", ".join(top["function"].tolist())
    added = ", ".join([f"{v:.1f}" for v in top["added_area"].tolist()])
    newa = ", ".join([f"{v:.1f}" for v in top["new_area"].tolist()])
    summary_rows.append([pid, funcs, added, newa])

before_after_df = pd.DataFrame(
    summary_rows,
    columns=["parcel_id", "changed_functions (top8)", "added_area (top8)", "new_area (top8)"]
)

print("\n✅ Before/After summary (top changes per parcel)")
display(before_after_df)

# --- Plot fitness history ---
plt.figure()
plt.plot(best_history, label="Best fitness")
plt.plot(mean_history, label="Mean fitness")
plt.xlabel("Generation")
plt.ylabel("Fitness")
plt.title("GA Fitness over Generations")
plt.legend()
plt.show()

print("\n✅ Cell 15 OK")


## Cell 16 — Results export
**Goal:** Read the best solution, score changes, and diagnostics.

Focus on:
- which parcels changed
- what functions were proposed
- how scores moved (parcel-level and area-level)


In [None]:
# Cell 16: Export outputs "click here"

import os
from datetime import datetime
import numpy as np
import pandas as pd

# Styling (Excel)
from openpyxl.styles import PatternFill
from openpyxl.utils import get_column_letter

# 1) Build updated parcels dataframe (same order/columns as original parcels_df)
parcels_updated_df = parcels_df.copy()

# Overwrite function columns using best_actual matrix
for j, f in enumerate(function_cols):
    parcels_updated_df[f] = best_actual[:, j]

# Update parcel_total_functions_area (sum of function floor areas)
if "parcel_total_functions_area" in parcels_updated_df.columns:
    parcels_updated_df["parcel_total_functions_area"] = parcels_updated_df[function_cols].sum(axis=1)

# Add FAR column for ALL parcels: total_floor_area / parcel_ground_area
if "parcel_ground_area" not in parcels_updated_df.columns:
    raise ValueError("❌ Cannot compute FAR: 'parcel_ground_area' is missing in parcels sheet.")

ga = parcels_updated_df["parcel_ground_area"].to_numpy(dtype=float)
tfa = parcels_updated_df[function_cols].sum(axis=1).to_numpy(dtype=float)

far = np.where(ga > 0, tfa / ga, np.nan)
parcels_updated_df["FAR"] = far

# Identify optimised parcels: any function increased vs baseline
delta = best_actual - BASE_MATRIX
optimised_mask = (delta > 1e-9).any(axis=1)
parcels_updated_df["optimised_flag"] = optimised_mask

# 2) GA history dataframe
ga_history_df = pd.DataFrame({
    "generation": np.arange(1, len(best_history) + 1),
    "best_fitness": best_history,
    "mean_fitness": mean_history
})

# 3) Output filename
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out_xlsx = f"Optimization_Output_{ts}.xlsx"

# 4) Write Excel + apply formatting
with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
    parcels_updated_df.to_excel(writer, sheet_name="parcels_updated", index=False)
    changes_df.to_excel(writer, sheet_name="changes_log", index=False)
    before_after_df.to_excel(writer, sheet_name="before_after_summary", index=False)
    ga_history_df.to_excel(writer, sheet_name="ga_history", index=False)

    # Copy through original sheets if they exist (stability + audit)
    if "complementarity_weights_df" in globals():
        complementarity_weights_df.to_excel(writer, sheet_name="complementarity_weights", index=False)
    if "visit_frequency_weights_df" in globals():
        visit_frequency_weights_df.to_excel(writer, sheet_name="visit_frequency_weights", index=False)
    if "distance_weights_df" in globals():
        distance_weights_df.to_excel(writer, sheet_name="distance_weights", index=False)

    # --- Apply styles to parcels_updated ---
    wb = writer.book
    ws = wb["parcels_updated"]

    headers = [cell.value for cell in ws[1]]

    # Green highlight for optimised parcels
    green_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")

    if "optimised_flag" not in headers:
        raise ValueError("❌ 'optimised_flag' column not found in export sheet headers.")
    opt_col = headers.index("optimised_flag") + 1

    for r in range(2, ws.max_row + 1):
        val = ws.cell(row=r, column=opt_col).value
        if val is True or (isinstance(val, (int, float)) and val == 1):
            for c in range(1, ws.max_column + 1):
                ws.cell(row=r, column=c).fill = green_fill

    # ---- Number formatting (display) ----
    # Show areas as integers (no decimals), FAR with 2 decimals.
    int_cols = ["parcel_ground_area", "parcel_total_functions_area"] + list(function_cols)
    far_cols = ["FAR"]

    def set_number_format(col_name: str, fmt: str):
        if col_name in headers:
            col_idx = headers.index(col_name) + 1
            col_letter = get_column_letter(col_idx)
            for r in range(2, ws.max_row + 1):
                ws[f"{col_letter}{r}"].number_format = fmt

    for c in int_cols:
        set_number_format(c, "0")

    for c in far_cols:
        set_number_format(c, "0.00")

print("✅ Export created:", out_xlsx)
print("File size (bytes):", os.path.getsize(out_xlsx))

# 5) Download in Colab (no Drive)
try:
    from google.colab import files
    files.download(out_xlsx)
    print("✅ Colab download triggered.")
except Exception:
    print("ℹ️ Not in Colab. File saved locally in:", os.path.abspath(out_xlsx))
