In [None]:
import arcpy, random, math, itertools
from collections import Counter, defaultdict
from deap import base, creator, tools, algorithms
import os, copy

In [None]:
arcpy.env.workspace = r"YourProject\YourProject.gdb"

In [None]:
'''
    Lane4RoadPoints is a Point Feature class that is created using RoadCenterPoints_NoNearIntersections (this file is created in the preprocessing step). 
    This file is the filtered version of RoadCenterPoints_NoNearIntersections as it only contains the points of roads with 4 Lanes; as I am assuming that a 
            bus stop can be alloted in a 4 Lane road only. This filtering process is done in ArcGIS Pro itslef.
    
    You may change this based on your needs :))
'''

# INDIVIDUAL GENERATION
def generate_individual(n_stops, candidates_fc="Lane4RoadPoints", min_spacing_m=None):
    """
    Returns a Python list of lists:
        [[Bus_ID, X_Point, Y_Point], ...]  length = n_stops
    """
    if not arcpy.Exists(candidates_fc):
        raise RuntimeError(f"Candidates not found: {candidates_fc}")

    # Pull candidate XYs
    candidates = [(row[0], row[1]) for row in arcpy.da.SearchCursor(candidates_fc, ["X_POINT", "Y_POINT"])]
    if not candidates:
        raise RuntimeError("No candidate points found in Lane4RoadPoints.")

    # If enforcing spacing, ensure projected units
    if min_spacing_m is not None:
        sr = arcpy.Describe(candidates_fc).spatialReference
        if (sr.linearUnitName or "").lower() not in ("meter", "metre"):
            raise ValueError(f"min_spacing_m requires projected units (meters). Found: {sr.linearUnitName}")

    # Choose K points
    if min_spacing_m is None:
        if n_stops > len(candidates):
            raise ValueError(f"Requested {n_stops} stops but only {len(candidates)} candidates available.")
        chosen = random.sample(candidates, n_stops)
    else:
        pool = candidates[:]
        random.shuffle(pool)
        chosen = []
        for x, y in pool:
            if len(chosen) >= n_stops:
                break
            if all(math.hypot(x - cx, y - cy) >= float(min_spacing_m) for cx, cy in chosen):
                chosen.append((x, y))
        if len(chosen) < n_stops:
            raise RuntimeError(
                f"Could only place {len(chosen)} of {n_stops} with spacing {min_spacing_m} m."
            )

    # Build the individual list
    individual = []
    for i, (x, y) in enumerate(chosen, start=1):
        individual.append([i, x, y])

    # return individual
    return creator.Individual(individual)

In [None]:
# --- fuzzy helpers to calculate walking distances  ---
'''
    "Given a walking distance, what is the likely walking time?"
    
    Instead of using a fixed walking speed, we:
        > Accept that walking speed varies
        > Accept that people perceive distance non-linearly
        > Convert distance → linguistic categories → time → minutes

    So both Distance and Time are fuzzy (not exact)

    | Distance (m) | Fuzzy meaning |
    | ------------ | ------------- |
    | 0–150        | VeryNear      |
    | ~200         | Near          |
    | ~500         | Moderate      |
    | ~1000        | Far           |
    | >2000        | VeryFar       |

    
    | Time label | Approx minutes |
    | ---------- | -------------- |
    | VeryShort  | 0–4            |
    | Short      | 3–8            |
    | Medium     | 7–14           |
    | Long       | 12–22          |
    | VeryLong   | 20–40          |


    The speed if not hardcoded, but embedded in the shapes:
    | Assumption                                  | Effect          |
    | ------------------------------------------- | --------------- |
    | Short distances → faster perceived speed    | Small times     |
    | Long distances → fatigue, crossings, delays | Larger times    |
    | Very far → slowing down                     | Upper time tail |

    

    The reason why this logic was used instead of the typical The Traveling Salesman Problem (TSP) 
    was because my road centerline file was not fully connected, thus there were issues with finding shortest route ;-;
'''

def tri(x, a, b, c):
    if x <= a or x >= c: return 0.0
    if x == b: return 1.0
    return (x - a) / (b - a) if x < b else (c - x) / (c - b)

def trap(x, a, b, c, d):
    if x <= a or x >= d: return 0.0
    if b <= x <= c: return 1.0
    return (x - a) / (b - a) if a < x < b else (d - x) / (d - c)

# Distance membership functions (meters)
DIST_MFS = {
    "VeryNear":  lambda x: trap(x,   0,   0,  80, 150),
    "Near":      lambda x: tri (x,  80, 200, 350),
    "Moderate":  lambda x: tri (x, 300, 500, 800),
    "Far":       lambda x: tri (x, 700,1000,1500),
    "VeryFar":   lambda x: trap(x,1300,2000,3000,5000),
}

# Time (minutes) membership functions for 0–40 min
TIME_MIN, TIME_MAX, TIME_STEP = 0.0, 40.0, 0.1
TIME_POINTS = [TIME_MIN + i*TIME_STEP for i in range(int((TIME_MAX-TIME_MIN)/TIME_STEP)+1)]
TIME_MFS = {
    "VeryShort": lambda t: trap(t, 0, 0, 2, 4),
    "Short":     lambda t: tri (t, 3, 5, 8),
    "Medium":    lambda t: tri (t, 7,10,14),
    "Long":      lambda t: tri (t,12,16,22),
    "VeryLong":  lambda t: trap(t,20,28,40,40),
}
RULES = {
    "VeryNear": "VeryShort",
    "Near":     "Short",
    "Moderate": "Medium",
    "Far":      "Long",
    "VeryFar":  "VeryLong",
}

def estimate_walk_time_minutes(distance_m: float) -> float:
    if distance_m is None or (isinstance(distance_m, float) and math.isnan(distance_m)) or distance_m < 0:
        return float('nan')
    # 1) fuzzify distance
    dist_memberships = {k: mf(distance_m) for k, mf in DIST_MFS.items()}
    
    # 2) rule firing
    time_activation = {lbl: 0.0 for lbl in TIME_MFS.keys()}
    for d_lbl, mu in dist_memberships.items():
        if mu <= 0: continue
        t_lbl = RULES[d_lbl]
        time_activation[t_lbl] = max(time_activation[t_lbl], mu)
        
    # 3) aggregate & 4) centroid defuzzification
    agg = []
    for t in TIME_POINTS:
        vals = [min(alpha, TIME_MFS[t_lbl](t)) for t_lbl, alpha in time_activation.items() if alpha > 0]
        agg.append(max(vals) if vals else 0.0)
    num = sum(t * mu for t, mu in zip(TIME_POINTS, agg))
    den = sum(agg)
    if den == 0:
        return (distance_m / 1.2) / 60.0  # fallback ~1.2 m/s
    return num / den

# --- updated assign function with fuzzy time included ---
def assign_lots_to_nearest_with_attrs(individual, lots_fc="Nearest_RoadPoints_From_Lots", population_field="POPULATION", landuse_field="LANDUSE", decimals=2):
    """
    Returns (same order as 'individual'):
      [
        [Bus_ID, X_POINT, Y_POINT,
         [(LOT_OID, DIST_M, POP_VALUE, LANDUSE, WALK_MIN), ...],
         {"total_population": float,
          "landuse_counts": {<landuse>: count, ...},
          "total_walk_min": float,
          "avg_walk_min": float}
        ], ...
      ]
    """
    # Ensure projected units (meters)
    sr = arcpy.Describe(lots_fc).spatialReference
    if (sr.linearUnitName or "").lower() not in ("meter", "metre"):
        raise ValueError("Lots must be in a projected CRS (meters) to use Euclidean distances.")

    # Prep result container
    busstop_results = {
        int(bus_id): [int(bus_id), (float(x)), (float(y)), [],
                      {"total_population": 0.0, "landuse_counts": {}, "total_walk_min": 0.0, "avg_walk_min": 0.0}]
        for bus_id, x, y in individual
    }

    # Cursor fields
    fields = ["OID@", "SHAPE@XY"]
    pop_idx = land_idx = None
    if population_field:
        fields.append(population_field); pop_idx = len(fields) - 1
    if landuse_field:
        fields.append(landuse_field);   land_idx = len(fields) - 1

    # Assign each lot to its nearest bus stop, compute fuzzy walking time
    with arcpy.da.SearchCursor(lots_fc, fields) as cur:
        for row in cur:
            lot_oid = int(row[0])
            lx, ly = row[1]

            nearest_bus = None
            nearest_dist = float("inf")
            for bus_id, bx, by in individual:
                d = math.hypot(lx - float(bx), ly - float(by))
                if d < nearest_dist:
                    nearest_dist = d
                    nearest_bus  = int(bus_id)

            pop_val  = float(row[pop_idx]) if (pop_idx is not None and row[pop_idx] is not None) else 0.0
            land_val = row[land_idx] if (land_idx is not None) else None
            walk_min = estimate_walk_time_minutes(nearest_dist)

            rec = busstop_results[nearest_bus]
            rec[3].append((lot_oid, round(nearest_dist, decimals), round(pop_val, decimals), land_val, round(walk_min, decimals)))

    # Aggregate per stop
    for bus_id, rec in busstop_results.items():
        lots_list = rec[3]
        total_pop  = sum(l[2] for l in lots_list)
        total_time = sum(l[4] for l in lots_list)
        lu_counts  = Counter(l[3] for l in lots_list if l[3] is not None)
        n          = len(lots_list)

        rec[4]["total_population"] = float(round(total_pop, decimals))
        rec[4]["total_walk_min"]   = float(round(total_time, decimals))
        rec[4]["avg_walk_min"]     = float(round(total_time / n, decimals)) if n else 0.0
        rec[4]["landuse_counts"]   = dict(lu_counts)

    # Preserve input order
    ordered = [busstop_results[int(bus_id)] for bus_id, _, _ in individual]
    return ordered

In [None]:
# CHILDREN CREATION

def mate_individuals(
    ind1, ind2,
    candidates_fc="Lane4RoadPoints",
    decimals=2,
    seed=None,
    diversity_rate=0.15,      # ~15% of stops re-sampled if child unchanged
    min_spacing_m=250,        # spacing target (meters)
    max_resample_tries=2000,  # total tries before relaxing / fallback
    relax_steps=5,            # how many relax phases
    relax_factor=0.85         # each phase: spacing *= relax_factor
):
    """
    Half–half crossover with:
      - no duplicate XY within a child (after rounding)
      - spacing constraint with relaxation backoff
      - diversity injection if child == parent
      - farthest-point fallback to avoid hard failure
    Returns: creator.Individual(child1), creator.Individual(child2)
    """
    if len(ind1) != len(ind2):
        raise ValueError("Both individuals must have the same number of bus stops")

    if seed is not None:
        random.seed(seed)

    if not arcpy.Exists(candidates_fc):
        raise RuntimeError(f"Candidates not found: {candidates_fc}")

    # read/clean candidates once (round for stable uniqueness)
    raw = [(row[0], row[1]) for row in arcpy.da.SearchCursor(candidates_fc, ["X_POINT", "Y_POINT"])]
    if not raw:
        raise RuntimeError(f"No candidate points in {candidates_fc}")

    candidate_xy = list({(round(float(x), decimals), round(float(y), decimals)) for x, y in raw})

    # (optional) spacing needs projected units
    if min_spacing_m is not None:
        sr = arcpy.Describe(candidates_fc).spatialReference
        if (sr.linearUnitName or "").lower() not in ("meter", "metre"):
            raise ValueError("min_spacing_m requires projected CRS (meters)")

    K = len(ind1)
    cut = K // 2

    def xy_of(row):
        # row is [Bus_ID, X, Y]
        return (round(float(row[1]), decimals), round(float(row[2]), decimals))

    def ok_spacing_thr(pt, used, thr):
        if thr is None or thr <= 0 or not used:
            return True
        px, py = pt
        for (ux, uy) in used:
            if math.hypot(px - ux, py - uy) < thr:
                return False
        return True

    def sample_with_relax(used, base_thr):
        """
        Try random samples under a spacing threshold; if no luck, relax multiple times.
        If still nothing, pick the farthest unique candidate.
        """
        # 1) try with relax backoff
        total_tries = max_resample_tries
        tries_per_step = max(1, total_tries // max(1, relax_steps))
        thr = float(base_thr) if (base_thr is not None) else 0.0

        for step in range(relax_steps):
            for _ in range(tries_per_step):
                cand = random.choice(candidate_xy)
                if cand in used:
                    continue
                if ok_spacing_thr(cand, used, thr):
                    return cand
            thr *= float(relax_factor)  # relax spacing

        # 2) fallback: farthest unique candidate from used
        best = None
        best_d = -1.0
        for cand in candidate_xy:
            if cand in used:
                continue
            if not used:
                return cand  # first placement case
            # min distance to used set
            dmin = min(math.hypot(cand[0] - ux, cand[1] - uy) for (ux, uy) in used)
            if dmin > best_d:
                best_d = dmin
                best = cand
        return best  # may be None if everything was used

    def build_child(srcA, srcB):
        child = []
        used = set()
        for i in range(K):
            bus_id = i + 1
            pref = xy_of(srcA[i] if i < cut else srcB[i])
            alt  = xy_of(srcB[i] if i < cut else srcA[i])

            chosen = None
            # 1) try preferred
            if (pref not in used) and ok_spacing_thr(pref, used, min_spacing_m):
                chosen = pref
            # 2) try alternate
            elif (alt not in used) and ok_spacing_thr(alt, used, min_spacing_m):
                chosen = alt
            else:
                # 3) sample with relaxation & fallback
                chosen = sample_with_relax(used, min_spacing_m)
                if chosen is None:
                    # As a last resort, drop spacing and only enforce uniqueness
                    for cand in candidate_xy:
                        if cand not in used:
                            chosen = cand
                            break
                if chosen is None:
                    raise RuntimeError("Could not find a non-duplicate XY for child")

            used.add(chosen)
            child.append([bus_id, chosen[0], chosen[1]])
        return child

    child1 = build_child(ind1, ind2)
    child2 = build_child(ind2, ind1)

    # --- diversity injection if unchanged ---
    def inject_diversity(child, parent_ref):
        if child == parent_ref and diversity_rate > 0:
            n = max(1, int(round(len(child) * diversity_rate)))
            idxs = random.sample(range(len(child)), n)
            used = {(row[1], row[2]) for row in child}
            for idx in idxs:
                fresh = sample_with_relax(used, min_spacing_m)
                if fresh is not None:
                    child[idx][1], child[idx][2] = fresh
                    used.add(fresh)
        return child

    child1 = inject_diversity(child1, ind1)
    child2 = inject_diversity(child2, ind2)

    return creator.Individual(child1), creator.Individual(child2)

In [None]:
# MUTATION OF INDIVIDUAL

def mutate_individual(individual, candidates_fc="Lane4RoadPoints"):
    """
    Mutates an individual by replacing X,Y of randomly chosen bus stop(s)
    with new coordinates from Lane4RoadPoints.

    Args:
        individual   : [[Bus_ID, X, Y], ...]
        candidates_fc: feature class with candidate points (must have X_POINT, Y_POINT fields)
    
    Returns:
        (creator.Individual,) tuple
    """
    if not arcpy.Exists(candidates_fc):
        raise RuntimeError(f"Candidates not found: {candidates_fc}")

    # Get all candidate XYs
    candidates = [(row[0], row[1]) 
                  for row in arcpy.da.SearchCursor(candidates_fc, ["X_POINT", "Y_POINT"])]
    if not candidates:
        raise RuntimeError(f"No candidate points found in {candidates_fc}")

    # Copy so we don’t overwrite the input
    new_individual = [row[:] for row in individual]

    # Keep a set of already used coordinates for uniqueness check
    used_coords = {(float(x), float(y)) for _, x, y in new_individual}

    # Decide how many stops to mutate
    n_mutations = random.randint(1, len(new_individual))
    indices_to_mutate = random.sample(range(len(new_individual)), k=n_mutations)

    for idx in indices_to_mutate:
        bus_id = new_individual[idx][0]

        # Try until we find a candidate not already used
        tries = 0
        while True:
            new_x, new_y = random.choice(candidates)
            coord = (float(new_x), float(new_y))
            if coord not in used_coords:
                used_coords.add(coord)
                new_individual[idx] = [bus_id, new_x, new_y]
                break
            tries += 1
            if tries > 100:  # fail-safe to avoid infinite loop
                raise RuntimeError("Could not find unique candidate point for mutation.")

    return (creator.Individual(new_individual),)

In [None]:
# INDIVIDUAL DECODING

def decode(ind):
    """
    Evaluate each bus stop in an individual at the lot level.
    Returns:
      dict {Bus_ID: {
        "X": float,
        "Y": float,
        "lots": [
          {"LOT_OID": int, "DIST_M": float, "WALK_MIN": float,
           "POP": float, "LANDUSE": str}
        ]
      }}
    """

    result = assign_lots_to_nearest_with_attrs(ind, "Nearest_RoadPoints_From_Lots", "POPULATION", "LANDUSE", 3)

    eval_dict = {}
    for bus_id, x, y, lots, summary in result:
        lot_entries = []
        for lot_oid, dist, pop, landuse, walk_min in lots:
            lot_entries.append({
                "LOT_OID": lot_oid,
                "DIST_M": dist,
                "WALK_MIN": walk_min,
                "POP": pop,
                "LANDUSE": landuse
            })

        eval_dict[bus_id] = {
            "X": x,
            "Y": y,
            "lots": lot_entries
        }

    return eval_dict

In [None]:
""" HELPER FUNCTIONS TO EVALUATE """
# Computes how much two bus stop service areas overlap
def circle_overlap_pct(d, r=250.0):
    if d >= 2*r: return 0.0         # No overlap
    if d <= 0:   return 100.0       # Same location

    # Partial overlap
    part1 = 2 * r*r * math.acos(d / (2*r))
    part2 = 0.5 * d * math.sqrt(max(0.0, 4*r*r - d*d))
    
    return (part1 - part2) / (math.pi * r*r) * 100.0

# walking-time scorer (returns score, penalty)
def score_walk_minutes(t):
    if t is None:
        return 0, 0
    if t <= 5.0:                # Less than 5 min  (+10)
        return 10, 0
    elif 5.1 <= t <= 6.9:       # (+5)
        return 5, 0
    elif 7 <= t <= 15.9:        # (−5)
        return -5, 5
    elif t >= 16.0:             # (−20)
        return -20, 20
    else:                        
        return 0, 0

#Bus Stop duplicate checker
def score_no_duplicates(evals, penalty_per_extra=50, bonus_if_none=5):
    """
    Penalize duplicate XY among bus stops.

    - Each *extra* occurrence of an XY beyond the first incurs a penalty.
      e.g., if one XY appears 3 times → 2 extras → 2 * penalty_per_extra.
    - If there are no duplicates at all, give a small bonus.

    Returns:
        duplication_score (int)   # +bonus_if_none if no duplicates
        duplication_penalty (int) # k*penalty_per_extra if duplicates found
        dup_detail (dict)         # {(x,y): [bus_id, ...]} for duplicates
    """
    xy_to_ids = defaultdict(list)
    for bus_id, data in evals.items():
        x = float(data["X"])
        y = float(data["Y"])
        xy_to_ids[(x, y)].append(int(bus_id))

    extras = 0
    dup_detail = {}
    for xy, ids in xy_to_ids.items():
        if len(ids) > 1:
            extras += (len(ids) - 1)
            dup_detail[xy] = ids[:]  # record duplicates

    if extras == 0:
        return bonus_if_none, 0, {}
    else:
        return 0, extras * penalty_per_extra, dup_detail

def score_min_residential(evals, min_required=3, bonus_per_stop_meets=2, penalty_per_missing=5):
    """
    Ensure each bus stop serves at least `min_required` Residential lots.
    Returns (res_bonus, res_penalty, per_stop_counts)
    """
    res_bonus = 0
    res_penalty = 0
    per_stop_counts = {}

    for bus_id, data in evals.items():
        cnt = sum(1 for lot in data["lots"]
                  if str(lot.get("LANDUSE", "")).strip().lower() == "residential")
        per_stop_counts[bus_id] = cnt
        missing = max(0, min_required - cnt)
        if missing == 0:
            res_bonus += bonus_per_stop_meets
        else:
            res_penalty += missing * penalty_per_missing

    return res_bonus, res_penalty, per_stop_counts

In [None]:
def prioritize_pareto_front(best_individuals):
    """
    Prioritize individuals
    """
    def get_fitness_values(ind):
        total_penalty, bonus, pop_score, walk_score, duplication_score, res_bonus = ind.fitness.values
        return (walk_score, duplication_score, bonus, pop_score, -total_penalty)

    # Sort individuals based on diversity, compatibility, and violations
    prioritized_solutions = sorted(best_individuals, key=get_fitness_values)

    return prioritized_solutions

In [None]:
def evaluate(ind, r):
    """
    Multi-objective
    """
    # decode returns: {Bus_ID: {"X": float, "Y": float, "lots": [{"POP","WALK_MIN",...}, ...]}}
    evals = decode(ind)

    # ---------- 1) Overlap (global, all pairs) ----------
    stops = [(bus_id, data["X"], data["Y"]) for bus_id, data in evals.items()]
    n = len(stops)
    n_pairs = n * (n - 1) // 2 if n >= 2 else 0

    non_overlap_count = 0
    overlap_penalty = 0

    for (id1, x1, y1), (id2, x2, y2) in itertools.combinations(stops, 2):
        d = math.hypot(x2 - x1, y2 - y1)
        p = circle_overlap_pct(d, r=r)

        # RULES:
        if p <= 30.0:
            non_overlap_count += 1
        elif 30.0 < p <= 60.0:
            overlap_penalty += 10
        else:  # p >= 61.0
            overlap_penalty += 20

    # Normalize bonus so it doesn't explode with many stops (scale to [0..5])
    bonus = 100.0 * (non_overlap_count / n_pairs) if n_pairs else 0.0

    # ---------- 2) Population per stop ----------
    pop_score = 0
    pop_penalty = 0
    for bus_id, data in evals.items():
        pop_total = sum(lot["POP"] for lot in data["lots"])
        if pop_total <= 500:
            pop_score -= 20
            pop_penalty += 20
        elif 501 <= pop_total < 2000:
            pop_score -= 10
            pop_penalty += 10
        elif 2000 <= pop_total <= 5000:
            pop_score += 5
        elif 5001 <= pop_total <= 30000:
            pop_score += 15
        elif pop_total > 30001:
            pop_score += 20

    # ---------- 3) Walking time per lot ----------
    walk_score = 0
    walk_penalty = 0
    for bus_id, data in evals.items():
        for lot in data["lots"]:
            s, pen = score_walk_minutes(lot["WALK_MIN"])
            walk_score += s
            walk_penalty += pen

    # ---------- 4) No duplicated Bus Stops (when mutated and/or crossovered) ----------
    duplication_score, duplication_penalty, dup_detail = score_no_duplicates(evals, penalty_per_extra=57, bonus_if_none=5)


    # ------- 5) A Bus Stop must have at least 3 'Residential' lots
    res_bonus, res_penalty, res_counts = score_min_residential(evals, min_required=3, bonus_per_stop_meets=2, penalty_per_missing=5)

    
    # ---------- Combine ----------
    total_penalty = overlap_penalty + pop_penalty + walk_penalty + duplication_penalty + res_penalty
    score = bonus + pop_score + walk_score + duplication_score + res_bonus - total_penalty 

    # Minimize total_penalty
    # Maximize bonus, pop_score, walk_score, duplication_score, res_bonus
    return total_penalty, bonus, pop_score, walk_score, duplication_score, res_bonus

In [None]:
# Set up NSGA-II
numStops = 50
candidates_fc="Lane4RoadPoints"
min_spacing_m = 100

# Parameters for NSGA2
pop_size = 40
generations = 5
prob_cx = 0.6
prob_mut = 0.3

In [None]:
if hasattr(creator, "FitnessMulti"):
    delattr(creator, "FitnessMulti")
creator.create("FitnessMulti", base.Fitness, weights=(-1.0, +1.0, +1.0, +1.0, +1.0, +1.0))

if hasattr(creator, "Individual"):
    delattr(creator, "Individual")
creator.create("Individual", list, fitness=creator.FitnessMulti)

# Register the evaluation function
toolbox = base.Toolbox()
toolbox.register(
    "gen_individual",
    generate_individual,
    n_stops=numStops,
    candidates_fc=candidates_fc,
    min_spacing_m=min_spacing_m
)

# Wrap it into an Individual class
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.gen_individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# --- History setup (to see the best solution per generation) ---
history = tools.History()
toolbox.register("clone", copy.deepcopy)

# --- Normal DEAP setup ---
toolbox.register("evaluate", evaluate, r=min_spacing_m)
toolbox.register("select", tools.selNSGA2)
toolbox.register("mutate", mutate_individual, candidates_fc=candidates_fc)
toolbox.register("mate", mate_individuals)


# Decorate variation ops for history
toolbox.decorate("mate", history.decorator)
toolbox.decorate("mutate", history.decorator)

# --- Create ONE population and init history on it ---
starting_pop = toolbox.population(n=pop_size)
history.update(starting_pop)

hof = tools.HallOfFame(5)

# --- Custom eaMuPlusLambda that updates history each generation ---
def eaMuPlusLambda_with_history(population, toolbox, mu, lambda_, cxpb, mutpb, ngen,
                               halloffame=None, verbose=True):
    
    best_by_gen = []      # list of Individuals (one per gen)
    
    # Evaluate initial population
    invalid = [ind for ind in population if not ind.fitness.valid]
    fits = map(toolbox.evaluate, invalid)
    for ind, fit in zip(invalid, fits):
        ind.fitness.values = fit

    # NSGA-II needs crowding distances assigned; selNSGA2 does that.
    population = toolbox.select(population, len(population))

    best0 = prioritize_pareto_front(population)[0]
    best_by_gen.append(toolbox.clone(best0))

    if halloffame is not None:
        halloffame.update(population)

    for gen in range(1, ngen + 1):
        # Variation (offspring creation)
        offspring = algorithms.varOr(population, toolbox, lambda_, cxpb, mutpb)

        # update history AFTER variation
        history.update(offspring) 

        # Evaluate offspring
        invalid = [ind for ind in offspring if not ind.fitness.valid]
        fits = map(toolbox.evaluate, invalid)
        for ind, fit in zip(invalid, fits):
            ind.fitness.values = fit

        # Select next generation
        population = toolbox.select(population + offspring, mu)

        # best of this generation
        bestg = prioritize_pareto_front(population)[0]
        best_by_gen.append(toolbox.clone(bestg))

        if halloffame is not None:
            halloffame.update(population)

        if verbose:
            print(f"gen={gen} done")

    return population, best_by_gen

final_pop, best_by_gen = eaMuPlusLambda_with_history(
    population=starting_pop,    # Parent population
    toolbox=toolbox,
    mu=pop_size,                # Number of parents
    lambda_=(pop_size * 2),     # Number of offspring
    cxpb=prob_cx,               # Crossover probability
    mutpb=prob_mut,             # Mutation probability
    ngen=generations,           # Number of generations
    halloffame=hof,             # Hall of Fame for tracking the best individuals
    verbose=True
)

prioritized_solutions = prioritize_pareto_front(hof)
best_prioritized_individual = prioritized_solutions[0]

print(best_prioritized_individual)
print(len(best_by_gen), "generations saved")

In [None]:
# ---- SAVE AND DISPLAY BEST SOLUTION PER GENERATION ----

def write_best_gen_points(individual, out_fc, gen_idx=None, sr_like="Lane4RoadPoints"):
    """
    Writes an individual's bus stops to a POINT feature class.
    """
    sr = arcpy.Describe(sr_like).spatialReference

    # Create output feature class
    out_ws = os.path.dirname(out_fc)
    out_name = os.path.basename(out_fc)

    if arcpy.Exists(out_fc):
        arcpy.management.Delete(out_fc)

    arcpy.management.CreateFeatureclass(out_ws, out_name, "POINT", spatial_reference=sr)

    arcpy.management.AddField(out_fc, "GEN", "LONG")
    arcpy.management.AddField(out_fc, "Bus_ID", "LONG")
    arcpy.management.AddField(out_fc, "X_POINT", "DOUBLE")
    arcpy.management.AddField(out_fc, "Y_POINT", "DOUBLE")

    with arcpy.da.InsertCursor(out_fc, ["SHAPE@XY", "GEN", "Bus_ID", "X_POINT", "Y_POINT"]) as ic:
        for row in list(individual):
            bus_id, x, y = row[0], row[1], row[2]
            ic.insertRow([(float(x), float(y)), int(gen_idx or 0), int(bus_id), float(x), float(y)])

    return out_fc

out_gdb = arcpy.env.workspace
name_prefix = "BestStops_Gen"

out_fcs = []
for gen_idx, ind in enumerate(best_by_gen):
    out_fc = os.path.join(out_gdb, f"{name_prefix}_{gen_idx:03d}")
    out_fcs.append(write_best_gen_points(ind, out_fc, gen_idx=gen_idx, sr_like=candidates_fc))
    print("Wrote:", out_fc)

print("Done. Feature classes created:", len(out_fcs))

In [None]:
# ---- DISPLAY THE OPTIMAL BUS STOP LOCATIONS ON ARCGIS MAP ----

arcpy.env.overwriteOutput = True

arcpy.env.workspace = r"YourProject\YourProject.gdb"

OUT_POINTS = "GA_Best_Stops"          # name for the points FC
SR_LIKE    = "Lane4RoadPoints"        # copy spatial reference from this layer
LOTS_FC    = "Nearest_RoadPoints_From_Lots"  # optional (for spider lines)
OUT_LINES  = "GA_Best_SpiderLines"    # optional (for spider lines)

ind = list(best_prioritized_individual)

# ---- Create points FC from the individual ----
sr = arcpy.Describe(SR_LIKE).spatialReference
if arcpy.Exists(OUT_POINTS):
    arcpy.management.Delete(OUT_POINTS)

arcpy.management.CreateFeatureclass(arcpy.env.workspace, OUT_POINTS, "POINT", spatial_reference=sr)
arcpy.management.AddField(OUT_POINTS, "Bus_ID",  "LONG")
arcpy.management.AddField(OUT_POINTS, "X_POINT", "DOUBLE")
arcpy.management.AddField(OUT_POINTS, "Y_POINT", "DOUBLE")

with arcpy.da.InsertCursor(OUT_POINTS, ["SHAPE@XY", "Bus_ID", "X_POINT", "Y_POINT"]) as ic:
    for bus_id, x, y in ind:
        ic.insertRow([(float(x), float(y)), int(bus_id), float(x), float(y)])

# ---- Add the points to the current map (and label by Bus_ID) ----
try:
    aprx = arcpy.mp.ArcGISProject("CURRENT")
    m = aprx.activeMap
    lyr = m.addDataFromPath(arcpy.Describe(OUT_POINTS).catalogPath)

    # quick labeling by Bus_ID
    lyr.showLabels = True
    if hasattr(lyr, "listLabelClasses"):
        for lc in lyr.listLabelClasses():
            lc.expression = "$feature.Bus_ID"

    # optional: unique values symbology by Bus_ID
    try:
        sym = lyr.symbology
        sym.updateRenderer('UniqueValueRenderer')
        sym.renderer.fields = ['Bus_ID']
        lyr.symbology = sym
    except Exception:
        pass

    aprx.save()
except Exception as e:
    print("Layer add skipped (not in ArcGIS Pro UI?):", e)

print(f"✓ Wrote {OUT_POINTS} and added to map.")

In [None]:
# ---------------- OPTIONAL: DISPLAY THE LOT TO BUS STOP SPIDERLINE ON ARCGIS MAP ----------------

result = assign_lots_to_nearest_with_attrs(
    individual=ind,
    lots_fc=LOTS_FC,
    population_field="POPULATION",
    landuse_field="LANDUSE",
    decimals=2
)

# cache lot geometries
lot_geom = {oid: shp for oid, shp in arcpy.da.SearchCursor(LOTS_FC, ["OID@", "SHAPE@"])}

if arcpy.Exists(OUT_LINES):
    arcpy.management.Delete(OUT_LINES)
    
arcpy.management.CreateFeatureclass(arcpy.env.workspace, OUT_LINES, "POLYLINE", spatial_reference=sr)
for nm, tp in [("Bus_ID","LONG"), ("LOT_OID","LONG"), ("DIST_M","DOUBLE"), ("POP","DOUBLE"), ("LANDUSE","TEXT"), ("WALK_MIN","DOUBLE")]:
    arcpy.management.AddField(OUT_LINES, nm, tp, field_length=64 if tp=="TEXT" else None)

with arcpy.da.InsertCursor(OUT_LINES, ["SHAPE@", "Bus_ID", "LOT_OID", "DIST_M", "POP", "LANDUSE", "WALK_MIN"]) as ic:
    for bus_id, sx, sy, lot_list, _summary in result:
        stop_pt = arcpy.PointGeometry(arcpy.Point(float(sx), float(sy)), sr)
        for lot_oid, dist_m, pop, landuse, walk_min in lot_list:
            lp = lot_geom.get(int(lot_oid))
            if lp:
                line = arcpy.Polyline(arcpy.Array([stop_pt.firstPoint, lp.firstPoint]), sr)
                ic.insertRow([line, int(bus_id), int(lot_oid), float(dist_m), float(pop), landuse, float(walk_min)])

# Add to map
try:
   
    # Remove existing spiderline layers with the same name (and "(2)" variants)
    for lyr in m.listLayers():
        if lyr.name == OUT_LINES or lyr.name.startswith(f"{OUT_LINES} ("):
            m.removeLayer(lyr)
    
    lyr_lines = m.addDataFromPath(arcpy.Describe(OUT_LINES).catalogPath)
    aprx.save()
except Exception as e:
    print("Could not add lines to map:", e)
print(f"✓ Wrote {OUT_LINES} and added to map.")