In [1]:
pip install mesa --quiet


Note: you may need to restart the kernel to use updated packages.


In [1]:
from flask import Flask, jsonify, request
from flask_cors import CORS
import threading
from mesa import Agent, Model
from mesa.time import SimultaneousActivation
from mesa.space import MultiGrid

# Definición de los agentes
class CellAgent(Agent):
    def __init__(self, unique_id, model, row, col, walls, is_entrance=False):
        super().__init__(unique_id, model)
        self.row = row
        self.col = col
        self.walls = walls  # String de 4 dígitos 'arriba, izquierda, abajo, derecha'
        self.is_entrance = is_entrance  # Indica si la celda es una entrada

    def step(self):
        pass

class MarkerAgent(Agent):
    def __init__(self, unique_id, model, row, col, marker_type):
        super().__init__(unique_id, model)
        self.row = row
        self.col = col
        self.marker_type = marker_type  # 'v' para víctima, 'f' para falsa alarma

    def step(self):
        pass

class FireMarkerAgent(Agent):
    def __init__(self, unique_id, model, row, col):
        super().__init__(unique_id, model)
        self.row = row
        self.col = col

    def step(self):
        pass

class DoorAgent(Agent):
    def __init__(self, unique_id, model, row, col, connected_cell):
        super().__init__(unique_id, model)
        self.row = row
        self.col = col
        self.connected_cell = connected_cell  # (col, row) de la celda conectada

    def step(self):
        pass

# Definición del modelo
class FlashPointModel(Model):
    def __init__(self, filename):
        super().__init__()
        self.num_rows = 6
        self.num_cols = 8
        self.grid = MultiGrid(self.num_cols, self.num_rows, torus=False)
        self.schedule = SimultaneousActivation(self)
        self.parse_grid(filename)

    def parse_grid(self, filename):
        with open(filename, 'r') as file:
            # Parsear las primeras 6 líneas para el grid
            for row in range(self.num_rows):
                line = file.readline().strip()
                cells = line.split()
                for col in range(self.num_cols):
                    walls = cells[col]
                    cell_agent = CellAgent(
                        unique_id=f"Cell_{row}_{col}",
                        model=self,
                        row=row,
                        col=col,
                        walls=walls
                    )
                    self.grid.place_agent(cell_agent, (col, self.num_rows - row - 1))
                    self.schedule.add(cell_agent)

            # Parsear los marcadores (víctimas/falsas alarmas)
            for _ in range(3):
                row, col, marker_type = file.readline().strip().split()
                marker_agent = MarkerAgent(
                    unique_id=f"Marker_{row}_{col}",
                    model=self,
                    row=int(row) - 1,
                    col=int(col) - 1,
                    marker_type=marker_type
                )
                self.grid.place_agent(marker_agent, (int(col) - 1, self.num_rows - int(row)))
                self.schedule.add(marker_agent)

            # Parsear los marcadores de fuego
            for _ in range(10):
                row, col = file.readline().strip().split()
                fire_agent = FireMarkerAgent(
                    unique_id=f"Fire_{row}_{col}",
                    model=self,
                    row=int(row) - 1,
                    col=int(col) - 1
                )
                self.grid.place_agent(fire_agent, (int(col) - 1, self.num_rows - int(row)))
                self.schedule.add(fire_agent)

            # Parsear las puertas
            for _ in range(8):
                row1, col1, row2, col2 = file.readline().strip().split()
                door_agent1 = DoorAgent(
                    unique_id=f"Door_{row1}_{col1}_{row2}_{col2}_1",
                    model=self,
                    row=int(row1) - 1,
                    col=int(col1) - 1,
                    connected_cell=(int(col2) - 1, self.num_rows - int(row2))
                )
                self.grid.place_agent(door_agent1, (int(col1) - 1, self.num_rows - int(row1)))
                self.schedule.add(door_agent1)

                door_agent2 = DoorAgent(
                    unique_id=f"Door_{row1}_{col1}_{row2}_{col2}_2",
                    model=self,
                    row=int(row2) - 1,
                    col=int(col2) - 1,
                    connected_cell=(int(col1) - 1, self.num_rows - int(row1))
                )
                self.grid.place_agent(door_agent2, (int(col2) - 1, self.num_rows - int(row2)))
                self.schedule.add(door_agent2)

            # Parsear las entradas
            for _ in range(4):
                row, col = file.readline().strip().split()
                entrance_cell = next(
                    agent for agent in self.grid.get_cell_list_contents([(int(col) - 1, self.num_rows - int(row))])
                    if isinstance(agent, CellAgent)
                )
                entrance_cell.is_entrance = True  # Marcar la celda como entrada

    def step(self):
        self.schedule.step()

    def get_state(self):
        state = []
        for cell in self.grid.coord_iter():
            cell_content = []
            cell_agents = self.grid.get_cell_list_contents([cell[1]])
            for agent in cell_agents:
                agent_data = {
                    'type': type(agent).__name__,
                    'unique_id': agent.unique_id,
                }
                if isinstance(agent, CellAgent):
                    agent_data['position'] = [agent.col, agent.row]
                    agent_data['walls'] = agent.walls
                    agent_data['is_entrance'] = agent.is_entrance  # Agregar si la celda es entrada
                elif isinstance(agent, MarkerAgent):
                    agent_data['position'] = [agent.col, agent.row]
                    agent_data['marker_type'] = agent.marker_type
                elif isinstance(agent, FireMarkerAgent):
                    agent_data['position'] = [agent.col, agent.row]
                elif isinstance(agent, DoorAgent):
                    agent_data['position'] = [agent.col, agent.row]
                    agent_data['connected_cell'] = list(agent.connected_cell)
                cell_content.append(agent_data)
            if cell_content:
                state.append({
                    'cell_position': [cell[1][0], cell[1][1]],
                    'agents': cell_content
                })
        return state

# Configurar la aplicación Flask
app = Flask(__name__)
CORS(app)  # Habilitar CORS para la aplicación Flask

# Crear el modelo
filename = 'final.txt'
model = FlashPointModel(filename)

# Lock para asegurar la seguridad en hilos
model_lock = threading.Lock()

@app.route('/', methods=['GET'])
def index():
    return "Servidor de Simulación Flash Point está en funcionamiento. Usa /state para obtener el estado actual."

@app.route('/state', methods=['GET'])
def get_state():
    with model_lock:
        state = model.get_state()
    return jsonify(state)

@app.route('/step', methods=['POST'])
def step():
    with model_lock:
        model.step()
        state = model.get_state()
    return jsonify(state)

if __name__ == '__main__':
    app.run(port=5000)


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [26/Nov/2024 18:08:59] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:08:59] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:10:25] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:12:29] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:12:29] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:29:02] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:29:02] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:30:37] "GET /state HTTP/1.1" 200 -
127.0.0.1 - - [26/Nov/2024 18:30:37] "GET /state HTTP/1.1" 200 -
