In [0]:
!pip install mesa==2.4.0  

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import mesa
#from mesa.visualization import SolaraViz, Slider
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.colors import ListedColormap
import numpy as np
import time
import mesa
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import numpy as np
from IPython.display import display, clear_output
import ipywidgets as widgets
import asyncio
import threading  
import nest_asyncio
import warnings
from matplotlib.patches import Patch

warnings.filterwarnings('ignore')
nest_asyncio.apply()

In [0]:
# Keep your Forest and ForestFire classes
class Forest(mesa.Agent):
    def __init__(self, unique_id, model, position):
        super().__init__(unique_id, model)
        self.pos = position
        self.status = "Fine"
    def step(self):
        if self.status == "Burning":
            for neighbor in self.model.grid.iter_neighbors(self.pos, True):
                if neighbor.status == "Fine":
                    if self.model.random.random() > 0.25:
                        neighbor.status = "Burning"
                self.status = "Burned"


class ForestFire(mesa.Model):
    def __init__(
        self,
        width=100,
        height=100,
        density=0.65,
        regeneration_time=10,
        regeneration_rate=0.1
    ):
        super().__init__()
        self.width = width
        self.height = height
        self.density = density
        self.schedule = mesa.time.RandomActivation(self)
        self.regeneration_time = regeneration_time
        self.regeneration_rate = regeneration_rate
        
        self.grid = mesa.space.SingleGrid(self.width, self.height, torus=False)
        
        self.datacollector = mesa.DataCollector(
            model_reporters={
                "Fine": lambda model: self.count_type(model, "Fine"),
                "Burning": lambda model: self.count_type(model, "Burning"),
                "Burned": lambda model: self.count_type(model, "Burned"),
            }
        )
        
        self._initialize_forest()
        self.datacollector.collect(self)
        
    def _initialize_forest(self):
        for x in range(self.width):
            for y in range(self.height):
                pos = (x, y)
                forest = Forest(self.next_id(), self, pos)
                if self.random.random() < self.density:
                    if x == 0:
                        forest.status = "Burning"
                    else:
                        forest.status = "Fine"
                else:
                    forest.status = "Burned"
                self.grid.place_agent(forest, pos)
                self.schedule.add(forest)
            
            
    def step(self):
        self.schedule.step()
        self.datacollector.collect(self)
        
    @staticmethod
    def count_type(model, status):
        count = 0
        for forest in model.schedule.agents:
            if forest.status == status:
                count += 1
        return count
    


In [0]:
class InteractiveForestFire:
    def __init__(self):
        self.model = None
        self.running = False
        self.step_count = 0

        # Runners & guards
        self._task = None              # asyncio.Task (if using asyncio)
        self._thread = None            # threading.Thread (fallback)
        self._stop_event = threading.Event()
        self._rendering = False        # prevent overlapping redraws

        # Create widgets
        self.play_button  = widgets.Button(description="▶️ Play",  button_style='success')
        self.step_button  = widgets.Button(description="⏭️ Step")
        self.reset_button = widgets.Button(description="🔄 Reset", button_style='danger')
        self.stop_button  = widgets.Button(description="⏹️ Stop",  button_style='info')

        self.width_slider  = widgets.IntSlider(value=100, min=20, max=150, step=10, description='Width:')
        self.height_slider = widgets.IntSlider(value=100, min=20, max=150, step=10, description='Height:')
        self.density_slider = widgets.FloatSlider(value=0.65, min=0.1, max=1.0, step=0.05, description='Density:')
        # self.regen_time_slider = widgets.IntSlider(value=10, min=1, max=50, step=1, description='Regen Time:')
        # self.regen_rate_slider = widgets.FloatSlider(value=0.1, min=0.01, max=0.5, step=0.01, description='Regen Rate:')

        self.output = widgets.Output()

        # Connect buttons
        self.play_button.on_click(self.toggle_play)
        self.step_button.on_click(self.do_step)
        self.reset_button.on_click(self.reset_simulation)
        self.stop_button.on_click(self.stop_simulation)

        # Initialize model
        self.reset_simulation()

    # ---------- UI helpers ----------
    def _set_play_ui(self, playing: bool):
        if playing:
            self.play_button.description = "⏸️ Pause"
            self.play_button.button_style = 'warning'
        else:
            self.play_button.description = "▶️ Play"
            self.play_button.button_style = 'success'

    # ---------- Start/Stop/reset ----------
    def toggle_play(self, button):
        self.running = not self.running
        if self.running:
            self._set_play_ui(True)
            self.run_simulation()
        else:
            self.stop_simulation()

    def stop_simulation(self, button=None):
        # Stop flag
        self.running = False
        self._stop_event.set()

        # Cancel asyncio task if any
        if self._task is not None:
            t = self._task
            self._task = None
            if not t.done():
                t.cancel()

        # Join thread if any
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=1.0)
        self._thread = None

        self._set_play_ui(False)
        # Optional clean redraw
        self.update_display()

    def reset_simulation(self, button=None):
        # Fully stop any runner
        self.stop_simulation()

        # Reset state
        self.step_count = 0

        # Rebuild model
        self.model = ForestFire(
            width=self.width_slider.value,
            height=self.height_slider.value,
            density=self.density_slider.value,
            # regeneration_time=self.regen_time_slider.value,
            # regeneration_rate=self.regen_rate_slider.value
        )
        self.update_display()

    # ---------- Runner selection ----------
    def run_simulation(self):
        if not self.running:
            return

        # If something is already running, don’t start another
        if (self._task is not None and not self._task.done()) or \
           (self._thread is not None and self._thread.is_alive()):
            return

        # Clear stop signal for new run
        self._stop_event.clear()

        # Prefer asyncio in environments where a loop is running
        try:
            loop = asyncio.get_running_loop()
            # If we got here without raising, schedule the async loop
            self._task = loop.create_task(self._async_loop())
        except RuntimeError:
            # No running loop (classic notebook/Colab older setups) -> use thread
            self._thread = threading.Thread(target=self._thread_loop, daemon=True)
            self._thread.start()

    async def _async_loop(self):
        try:
            while self.running:
                self.do_step()
                await asyncio.sleep(0.1)  # yield to UI
        except asyncio.CancelledError:
            pass

    def _thread_loop(self):
        while self.running and not self._stop_event.is_set():
            self.do_step()
            time.sleep(0.1)

    # ---------- One step + render ----------
    def do_step(self, button=None):
        if not self.model:
            return
        if self._rendering:
            return  # skip if a draw is underway
        self._rendering = True
        try:
            self.model.step()
            self.step_count += 1
            self.update_display()
        finally:
            self._rendering = False

    # ---------- Drawing ----------
    def update_display(self):
        if self.model is None:
            return

        with self.output:
            clear_output(wait=True)

            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

            # Grid visualization
            W, H = self.model.grid.width, self.model.grid.height
            grid = np.zeros((W, H))
            status_to_num = {"Burned": 0, "Fine": 1, "Burning": 2}

            for agent in self.model.agents:
                if hasattr(agent, 'pos') and hasattr(agent, 'status'):
                    x, y = agent.pos
                    if 0 <= x < W and 0 <= y < H:
                        grid[x][y] = status_to_num.get(agent.status, 0)

            cmap = ListedColormap(["#3D2B1F", "#00AA00", "#FF0000"])
            ax1.imshow(grid.T, cmap=cmap, origin="lower", vmin=0, vmax=2)
            ax1.set_title(f"Forest Grid - Step {self.step_count}", fontsize=14, fontweight='bold')
            ax1.set_xlabel("X")
            ax1.set_ylabel("Y")
            ax1.set_xticks([])
            ax1.set_yticks([])
            legend_elements = [
                Patch(facecolor='#3D2B1F', label='Burned'),
                Patch(facecolor='#00AA00', label='Fine'),
                Patch(facecolor='#FF0000', label='Burning')
            ]
            ax1.legend(handles=legend_elements, loc='upper right')

            # Chart visualization
            df = self.model.datacollector.get_model_vars_dataframe()
            if df is not None and not df.empty:
                ax2.plot(df.index, df["Fine"],    label="Fine Trees", linewidth=2)
                ax2.plot(df.index, df["Burning"], label="Burning",    linewidth=2)
                ax2.plot(df.index, df["Burned"],  label="Burned",     linewidth=2)

                ax2.set_xlabel("Step", fontsize=12)
                ax2.set_ylabel("Count", fontsize=12)
                ax2.set_title("Forest Status Over Time", fontsize=14, fontweight='bold')
                ax2.legend(fontsize=10)
                ax2.grid(True, alpha=0.3)
                ax2.set_ylim(0, W * H)

            plt.tight_layout()
            plt.show()

    # ---------- UI layout ----------
    def display(self):
        controls = widgets.HBox([self.play_button, self.step_button, self.reset_button, self.stop_button])
        sliders = widgets.VBox([
            self.width_slider,
            self.height_slider,
            self.density_slider,
            # self.regen_time_slider,
            # self.regen_rate_slider
        ])

        ui = widgets.VBox([
            widgets.HTML("<h1>🔥 Forest Fire Simulation with Regeneration</h1>"),
            controls,
            sliders,
            self.output
        ])

        display(ui)

# Run the simulation
sim = InteractiveForestFire()
sim.display()

VBox(children=(HTML(value='<h1>🔥 Forest Fire Simulation with Regeneration</h1>'), HBox(children=(Button(button…