In [None]:
from mesa import Model, Agent
from mesa.time import RandomActivation
import random
import math


# ------------------------------------------------------------
# House object (data container, not a Mesa Agent)
# ------------------------------------------------------------

class House:
    """Plain record describing one dwelling (deliberately not a Mesa Agent)."""

    def __init__(self, house_id, energy_label, market_value, pos=None, owner_id=None):
        """
        Store the attributes of a house.

        house_id      -- identifier of the house
        energy_label  -- energy label of the house
        market_value  -- market value of the house
        pos           -- optional (x, y) position on a spatial grid
        owner_id      -- unique_id of the owning homeowner, or None if vacant
        """
        # Identity and location
        self.house_id = house_id
        self.pos = pos

        # Attributes that enter the homeowners' evaluation
        self.market_value = market_value
        self.energy_label = energy_label

        # Ownership; None marks the house as vacant
        self.owner_id = owner_id

    def is_vacant(self):
        """Return True when the house has no owner (owner_id is None)."""
        has_no_owner = self.owner_id is None
        return has_no_owner


# ------------------------------------------------------------
# Homeowner Agent (Mesa new-style: Agent(model))
# ------------------------------------------------------------

class HomeownerAgent(Agent):
    """
    Homeowner that, each step, picks one of four options for its housing:
    "stay", "stay_upgrade", "move" or "move_upgrade".

    The agent only *decides* in step(); it does not change ownership, labels
    or its own finances. The chosen option and the data needed to execute it
    are stored on the agent (decision_type, target_house_id, _stay_upgrade_info,
    _move_info, _move_upgrade_info) for the model to apply afterwards.

    The agent reads from its model: houses, vacancy_pool, energy_label_min,
    energy_label_max, market_value_min, market_value_max, cost_per_label_step
    and allow_move_upgrade.
    """

    def __init__(
        self,
        model,
        current_house_id,
        financial_capacity,
        pref_energy,
        pref_value,
        min_acceptable_label,
    ):
        """
        model                 -- owning model (see class docstring for the
                                 attributes read from it)
        current_house_id      -- key into model.houses of the occupied house
        financial_capacity    -- budget compared against house market values
                                 and upgrade costs
        pref_energy           -- weight of the normalised energy label
        pref_value            -- weight of the normalised market value
        min_acceptable_label  -- lowest energy label accepted when relocating
        """
        # IMPORTANT: for current Mesa versions, only pass `model` to super()
        super().__init__(model)

        # State variables from the formalization
        self.current_house_id = current_house_id
        self.financial_capacity = financial_capacity
        self.pref_energy = pref_energy
        self.pref_value = pref_value
        self.min_acceptable_label = min_acceptable_label

        # Indicators HasUpgraded_i(t), HasMoved_i(t)
        self.has_upgraded = 0
        self.has_moved = 0

        # Decision for the current step (computed in step())
        # decision_type ∈ {"stay", "stay_upgrade", "move", "move_upgrade"}
        self.decision_type = None
        self.target_house_id = None  # set for move / move_upgrade decisions

        # Details of each option, refreshed by step(). Initialised here so the
        # attributes always exist, even before the first step.
        self._stay_upgrade_info = (None, None)           # (new_label, cost)
        self._move_info = None                           # best house id
        self._move_upgrade_info = (None, None, None)     # (house id, new_label, cost)

    # --------------------------------------------------------
    # Convenience getters
    # --------------------------------------------------------

    @property
    def current_house(self):
        """House object currently occupied, looked up in model.houses."""
        return self.model.houses[self.current_house_id]

    # --------------------------------------------------------
    # Normalisation functions (EnergyLabel and MarketValue)
    # --------------------------------------------------------

    @staticmethod
    def _normalise(value, lower, upper):
        """Min-max scale value against [lower, upper]; 0.0 if the range is empty."""
        if upper == lower:
            return 0.0
        return (value - lower) / (upper - lower)

    def norm_energy_label(self, house, energy_label_override=None):
        """
        Normalised energy label of `house` w.r.t. the model's label range.

        If energy_label_override is given, it is normalised instead of
        house.energy_label; the house itself is never modified.
        """
        label = house.energy_label if energy_label_override is None else energy_label_override
        return self._normalise(
            label, self.model.energy_label_min, self.model.energy_label_max
        )

    def norm_market_value(self, house):
        """Normalised market value of `house` w.r.t. the model's value range."""
        return self._normalise(
            house.market_value, self.model.market_value_min, self.model.market_value_max
        )

    # --------------------------------------------------------
    # Satisfaction calculations
    # --------------------------------------------------------

    def satisfaction_for_house(self, house, energy_label_override=None):
        """
        Computes:
          PrefEnergy_i * NormEnergyLabel_h(t) +
          PrefValue_i  * NormMarketValue_h(t)

        If energy_label_override is provided, it is used instead of
        house.energy_label (to compute satisfaction AFTER upgrading).
        The house object is only read, never written: evaluating a
        hypothetical upgrade must not touch shared state.
        """
        norm_el = self.norm_energy_label(house, energy_label_override)
        norm_mv = self.norm_market_value(house)
        return self.pref_energy * norm_el + self.pref_value * norm_mv

    # --------------------------------------------------------
    # Upgrade-related helpers
    # --------------------------------------------------------

    def proposed_new_label(self, current_label):
        """
        Simple rule: try to upgrade by one label step up to the maximum.
        You can refine this later (e.g. jump to top label directly).
        """
        return min(current_label + 1, self.model.energy_label_max)

    def upgrade_cost(self, label_improvement):
        """
        UpgradeCost_i(t) = LabelImprovement_i(t) * CostPerLabelStep
        """
        return label_improvement * self.model.cost_per_label_step

    def _evaluate_upgrade(self, house):
        """
        Evaluate upgrading `house` to proposed_new_label(house.energy_label).

        Returns (satisfaction, new_label, cost), or (-inf, None, None) when no
        improvement is possible or the cost exceeds financial_capacity.
        """
        infeasible = (float("-inf"), None, None)

        current_label = house.energy_label
        new_label = self.proposed_new_label(current_label)
        label_improvement = new_label - current_label
        if label_improvement <= 0:
            # Already at (or above) the maximum label
            return infeasible

        cost = self.upgrade_cost(label_improvement)
        if cost > self.financial_capacity:
            return infeasible

        satisfaction = self.satisfaction_for_house(house, energy_label_override=new_label)
        return satisfaction, new_label, cost

    # --------------------------------------------------------
    # Option evaluation (Stay, Stay+Upgrade, Move, Move+Upgrade)
    # --------------------------------------------------------

    def evaluate_stay_option(self):
        """
        StaySatisfaction_i(t) = CurrentSatisfaction_i(t)
        """
        return self.satisfaction_for_house(self.current_house)

    def evaluate_stay_upgrade_option(self):
        """
        Stay-and-upgrade option in the current house.
        Returns (satisfaction, new_label, cost).
        If no improvement is possible or it is financially infeasible:
        returns (-inf, None, None).
        """
        return self._evaluate_upgrade(self.current_house)

    def compute_eligible_houses(self):
        """
        EligibleHouses_i(t), drawn from the model's vacancy pool:
          MarketValue_k(t) <= FinancialCapacity_i(t),
          EnergyLabel_k(t) >= MinAcceptableLabel_i,
          k != CurrentHouse_i(t)
        """
        return [
            house
            for house in self.model.vacancy_pool
            if house.house_id != self.current_house_id
            and house.market_value <= self.financial_capacity
            and house.energy_label >= self.min_acceptable_label
        ]

    def evaluate_move_option(self, eligible_houses):
        """
        MoveSatisfaction_i(t) based on the best relocation house.
        Returns (satisfaction, best_house_id) or (-inf, None) if no option.
        On equal satisfaction the earliest house in eligible_houses wins.
        """
        best_house = None
        best_satisfaction = float("-inf")

        for house in eligible_houses:
            s = self.satisfaction_for_house(house)
            if s > best_satisfaction:
                best_satisfaction = s
                best_house = house

        # Covers both an empty list and the case where no candidate scored
        # above -inf; previously the latter dereferenced None.
        if best_house is None:
            return float("-inf"), None

        return best_satisfaction, best_house.house_id

    def evaluate_move_upgrade_option(self, best_house_id):
        """
        Move-and-upgrade option for BRH_i(t).
        Returns (satisfaction, new_label, cost).
        If there is no target house, no improvement is possible, or it is
        financially infeasible: returns (-inf, None, None).
        """
        if best_house_id is None:
            return float("-inf"), None, None

        house = self.model.houses[best_house_id]

        # Financial feasibility, following the formalisation: the purchase
        # price and the upgrade cost are each checked against the capacity
        # separately (the cost check lives in _evaluate_upgrade).
        if house.market_value > self.financial_capacity:
            return float("-inf"), None, None

        return self._evaluate_upgrade(house)

    # --------------------------------------------------------
    # Step: compute decision (no global changes yet)
    # --------------------------------------------------------

    def step(self):
        """
        One decision step:
         1) Compute all four option satisfactions.
         2) Store the chosen decision_type and target house info locally.
        Global state changes (ownership, labels, vacancy pool) are done in
        the model after conflict resolution.
        """
        self.has_upgraded = 0
        self.has_moved = 0
        self.decision_type = None
        self.target_house_id = None

        # 1. Stay
        stay_satisfaction = self.evaluate_stay_option()

        # 2. Stay+Upgrade
        stay_up_satisfaction, stay_up_new_label, stay_up_cost = \
            self.evaluate_stay_upgrade_option()

        # 3. Move (without upgrade)
        eligible_houses = self.compute_eligible_houses()
        move_satisfaction, best_house_id = self.evaluate_move_option(eligible_houses)

        # 4. Move+Upgrade (can be switched off at model level)
        if self.model.allow_move_upgrade:
            move_up_satisfaction, move_up_new_label, move_up_cost = \
                self.evaluate_move_upgrade_option(best_house_id)
        else:
            move_up_satisfaction, move_up_new_label, move_up_cost = \
                float("-inf"), None, None

        # Insertion order matters: max() returns the first key among ties, so
        # ties resolve in the order stay, stay_upgrade, move, move_upgrade.
        satisfaction_dict = {
            "stay": stay_satisfaction,
            "stay_upgrade": stay_up_satisfaction,
            "move": move_satisfaction,
            "move_upgrade": move_up_satisfaction,
        }

        best_option = max(satisfaction_dict, key=satisfaction_dict.get)
        best_value = satisfaction_dict[best_option]

        # If everything is -inf, default to stay
        if math.isinf(best_value) and best_value < 0:
            best_option = "stay"

        self.decision_type = best_option
        if best_option in ("move", "move_upgrade"):
            # Both relocation options target the same best relocation house
            self.target_house_id = best_house_id

        # Store details for model to use later
        self._stay_upgrade_info = (stay_up_new_label, stay_up_cost)
        self._move_info = best_house_id
        self._move_upgrade_info = (best_house_id, move_up_new_label, move_up_cost)


# ------------------------------------------------------------
# Model
# ------------------------------------------------------------

class HousingUpgradeModel(Model):
    """
    Housing market model in which homeowners stay, upgrade and/or relocate.

    Each step runs in three phases: every homeowner computes a decision,
    competing claims on the same vacant house are resolved, and the resulting
    moves and upgrades are applied to houses and agents.
    """

    def __init__(
        self,
        n_homeowners,
        n_houses,
        energy_label_min=1,
        energy_label_max=7,
        market_value_min=100_000,
        market_value_max=600_000,
        cost_per_label_step=10_000,
        allow_move_upgrade=True,
        seed=None
    ):
        """
        n_homeowners         -- number of homeowner agents; must not exceed n_houses
        n_houses             -- number of houses to create
        energy_label_min/max -- inclusive range of energy labels
        market_value_min/max -- inclusive range of initial market values
        cost_per_label_step  -- cost of improving the energy label by one step
        allow_move_upgrade   -- whether agents may consider "move_upgrade"
        seed                 -- optional seed; when given, seeds both the
                                model's own RNG and the global `random` module
                                used by the initialisation helpers

        Raises ValueError if n_homeowners exceeds n_houses.
        """
        # Forward the seed so model.random (used by the scheduler to shuffle
        # agents) is reproducible too, not only the global `random` stream.
        super().__init__(seed=seed)
        if seed is not None:
            random.seed(seed)
            self._seed = seed

        # Parameters
        self.energy_label_min = energy_label_min
        self.energy_label_max = energy_label_max
        self.market_value_min = market_value_min
        self.market_value_max = market_value_max
        self.cost_per_label_step = cost_per_label_step
        self.allow_move_upgrade = allow_move_upgrade

        # Scheduler
        self.schedule = RandomActivation(self)

        # Houses: dict house_id -> House
        self.houses = {}
        self._create_houses(n_houses)

        # Homeowners
        self._create_homeowners(n_homeowners)

    # --------------------------------------------------------
    # Vacancy pool
    # --------------------------------------------------------

    @property
    def vacancy_pool(self):
        """List of currently vacant houses, recomputed on every access."""
        return [h for h in self.houses.values() if h.is_vacant()]

    # --------------------------------------------------------
    # Initialisation helpers
    # --------------------------------------------------------

    def _create_houses(self, n_houses):
        """
        Create n_houses vacant houses with ids 0..n_houses-1.
        Simple initialisation; plug in your own distributions/data here.
        """
        for h_id in range(n_houses):
            # Draw order (label, then value) is kept fixed so that a given
            # seed keeps producing the same initial housing stock.
            energy_label = random.randint(self.energy_label_min, self.energy_label_max)
            market_value = random.randint(self.market_value_min, self.market_value_max)
            self.houses[h_id] = House(
                house_id=h_id,
                energy_label=energy_label,
                market_value=market_value,
                owner_id=None
            )

    def _create_homeowners(self, n_homeowners):
        """
        Create n_homeowners agents, each owning a distinct random house.
        Preferences and capacities are random placeholders; replace with your data.

        Raises ValueError if there are fewer houses than homeowners: sharing a
        house would leave House.owner_id pointing at only one of its occupants
        and corrupt the vacancy bookkeeping once either of them moves.
        """
        if n_homeowners > len(self.houses):
            raise ValueError(
                f"n_homeowners ({n_homeowners}) cannot exceed the number of "
                f"houses ({len(self.houses)}): each homeowner needs its own house"
            )

        all_house_ids = list(self.houses.keys())
        random.shuffle(all_house_ids)

        for index in range(n_homeowners):
            house_id = all_house_ids[index]
            house = self.houses[house_id]

            # Preference weights sum to 1
            pref_energy = random.uniform(0.2, 0.8)
            pref_value = 1.0 - pref_energy

            financial_capacity = random.randint(
                int(0.5 * self.market_value_min),
                int(1.2 * self.market_value_max)
            )

            min_acceptable_label = random.randint(
                self.energy_label_min, self.energy_label_max
            )

            # Create agent (Mesa assigns unique_id)
            agent = HomeownerAgent(
                model=self,
                current_house_id=house_id,
                financial_capacity=financial_capacity,
                pref_energy=pref_energy,
                pref_value=pref_value,
                min_acceptable_label=min_acceptable_label,
            )

            # Assign ownership to this agent's unique_id
            house.owner_id = agent.unique_id

            self.schedule.add(agent)

    # --------------------------------------------------------
    # Conflict resolution & state updates
    # --------------------------------------------------------

    def _resolve_relocation_conflicts(self):
        """
        Implements InterestedBuyers_k(t) and WinningBuyer_k(t).

        Returns a dict house_id -> winning agent. Only agents whose decision
        is "move" or "move_upgrade" and whose target house is still vacant
        take part; the buyer with the highest financial_capacity wins.
        """
        interested_buyers = {}  # house_id -> list of agents

        for agent in self.schedule.agents:
            if agent.decision_type == "move":
                target_house_id = agent._move_info
            elif agent.decision_type == "move_upgrade":
                target_house_id = agent._move_upgrade_info[0]
            else:
                continue

            if target_house_id is None:
                continue
            if not self.houses[target_house_id].is_vacant():
                continue

            interested_buyers.setdefault(target_house_id, []).append(agent)

        # Every list holds at least one buyer by construction. On equal
        # capacity max() keeps the first buyer in schedule order.
        return {
            house_id: max(buyers, key=lambda a: a.financial_capacity)
            for house_id, buyers in interested_buyers.items()
        }

    def _apply_moves_and_upgrades(self, winning_assignment):
        """
        Apply decisions after conflicts:
          - assign houses,
          - update owner_id,
          - apply upgrades and financial updates,
          - set HasMoved_i and HasUpgraded_i.

        winning_assignment -- dict house_id -> winning agent, as returned by
                              _resolve_relocation_conflicts().
        """
        moved_agents = set()

        # Apply moves
        for house_id, winner in winning_assignment.items():
            old_house = winner.current_house
            new_house = self.houses[house_id]

            old_house.owner_id = None
            new_house.owner_id = winner.unique_id
            winner.current_house_id = house_id

            winner.has_moved = 1
            moved_agents.add(winner.unique_id)

        # Apply upgrades and financial updates
        for agent in self.schedule.agents:
            if agent.decision_type == "stay_upgrade":
                new_label, cost = agent._stay_upgrade_info
            elif agent.decision_type == "move_upgrade" and agent.unique_id in moved_agents:
                # Only buyers who actually won their target house upgrade it
                _, new_label, cost = agent._move_upgrade_info
            else:
                # "stay", plain "move", or a relocation that lost the conflict
                continue

            if new_label is None or cost is None:
                continue

            # For movers, current_house already refers to the new house here
            house = agent.current_house
            house.energy_label = new_label
            agent.financial_capacity -= cost
            agent.has_upgraded = 1

    # --------------------------------------------------------
    # One global time step
    # --------------------------------------------------------

    def step(self):
        """
        Implements your pseudocode:
          1. Each homeowner evaluates options and chooses decision.
          2. Resolve relocation conflicts.
          3. Apply moves & upgrades, update vacancy.
        """
        # Phase 1: agents compute their decisions
        self.schedule.step()

        # Phase 2: conflict resolution for popular houses
        winning_assignment = self._resolve_relocation_conflicts()

        # Phase 3: apply moves & upgrades
        self._apply_moves_and_upgrades(winning_assignment)




In [4]:
# Demo run: 50 homeowners in a stock of 60 houses, with a fixed seed.
model = HousingUpgradeModel(
    50,  # n_homeowners
    60,  # n_houses
    seed=42,
    allow_move_upgrade=True,
    cost_per_label_step=10_000,
    market_value_min=150_000,
    market_value_max=600_000,
    energy_label_min=1,
    energy_label_max=7,
)

# Print the vacancy count before each of the five steps, then advance.
for t in range(5):
    print(f"Step {t}, vacant houses: {len(model.vacancy_pool)}")
    model.step()



Step 0, vacant houses: 10
Step 1, vacant houses: 10
Step 2, vacant houses: 10
Step 3, vacant houses: 10
Step 4, vacant houses: 10


  self.schedule = RandomActivation(self)


In [5]:
# Advance five more steps and report, after each one, how many agents moved
# or upgraded in that step, the vacancy count and the mean energy label.
for t in range(5):
    model.step()

    moves = sum([agent.has_moved for agent in model.schedule.agents])
    upgrades = sum([agent.has_upgraded for agent in model.schedule.agents])

    label_total = sum([house.energy_label for house in model.houses.values()])
    avg_label = label_total / len(model.houses)

    print(
        f"Step {t}: moves={moves}, upgrades={upgrades}, "
        f"vacant={len(model.vacancy_pool)}, avg_label={avg_label:.2f}"
    )



Step 0: moves=1, upgrades=8, vacant=10, avg_label=5.92
Step 1: moves=1, upgrades=6, vacant=10, avg_label=6.02
Step 2: moves=0, upgrades=5, vacant=10, avg_label=6.10
Step 3: moves=0, upgrades=2, vacant=10, avg_label=6.13
Step 4: moves=0, upgrades=1, vacant=10, avg_label=6.15
