In [None]:
# Cell 1: Setup environment and imports
import logging
import os
import pprint
import time
from datetime import datetime
from dataclasses import dataclass, field
from typing import Tuple, List
import json
import pygame
from llama_cpp import Llama
import kagglehub
import sys
import contextlib
import pyperclip
import openai
from openai import OpenAI

COLORS = ["white", "black"]
FIGURE_TYPES = ["king", "queen", "rook", "bishop", "knight", "pawn"]
DIRECTION_MAP = {
    "north": (0, 1),
    "south": (0, -1),
    "east": (1, 0),
    "west": (-1, 0),
    "northeast": (1, 1),
    "northwest": (-1, 1),
    "southeast": (1, -1),
    "southwest": (-1, -1)
}

openai.api_key = os.getenv("OPENAI_API_KEY")

class TimestampedLogger:
    def __init__(self, log_dir='logs', log_file='simulation.log'):
        os.makedirs(log_dir, exist_ok=True)
        self.log_path = os.path.join(log_dir, log_file)
        logging.basicConfig(filename=self.log_path, level=logging.INFO, filemode='w')
        self.start_time = time.time()
        self.last_time = self.start_time
        self.log("Logger initialized.")

    def _now(self):
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def _duration(self):
        current_time = time.time()
        duration = current_time - self.last_time
        self.last_time = current_time
        return f"{duration:.3f}s"

    def log(self, message):
        timestamp = self._now()
        duration = self._duration()
        log_message = f"[{timestamp}] (+{duration}) {message}"
        print(log_message)
        logging.info(log_message)
LOGGER = TimestampedLogger()

def load_config(config_path: str = "config.json") -> dict:
    """Loads the configuration file."""
    LOGGER.log(f"Load Config: {config_path}")
    with open(config_path, "r") as f:
        return json.load(f)
CONFIG = load_config("config.json")

@contextlib.contextmanager
def suppress_output():
    with open(os.devnull, "w") as devnull:
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        try:
            sys.stdout = devnull
            sys.stderr = devnull
            yield
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr

def direction_from_vector(vector: Tuple[int, int]) -> str:
    """Returns the direction string based on a vector."""
    for direction, vec in DIRECTION_MAP.items():
        if vec == vector:
            return direction
    return str(vector)

class _Figure:
    """Represents a figure on the board."""
    def __init__(self, position: Tuple[int, int], color: str, figure_type: str):
        self.position = position
        self.color = color
        self.figure_type = figure_type

        self.target_positions = []  # List of target positions this figure can attack or defend

    def calculate_figure_targets(self, board: List[List['_Tile']]):
        """Generates a list of target positions based on the figure's movement rules."""
        # Calculate movement rules based on figure type
        directions = []
        max_board_size = max(CONFIG["board"]["width"], CONFIG["board"]["height"])
        max_range = max_board_size if self.figure_type in ["queen", "rook", "bishop"] else 1
        if self.figure_type == "king" or self.figure_type == "queen":
            directions = [(1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (-1, -1), (1, -1), (-1, 1)]
        elif self.figure_type == "rook":
            directions = [(1, 0), (-1, 0), (0, 1), (0, -1)]
        elif self.figure_type == "bishop":
            directions = [(1, 1), (-1, -1), (1, -1), (-1, 1)]
        elif self.figure_type == "knight":
            directions = [(2, 1), (2, -1), (-2, 1), (-2, -1), (1, 2), (1, -2), (-1, 2), (-1, -2)]
        elif self.figure_type == "pawn":
            # Pawns can only capture diagonally forward
            if self.color == "white":
                directions = [(1, 1), (-1, 1)]
            else:
                directions = [(1, -1), (-1, -1)]

        # Generate valid target positions based on its movement rules
        for direction in directions:
            for dr in range(1, max_range + 1):
                dx = direction[0] * dr
                dy = direction[1] * dr
                target_position = (self.position[0] + dx, self.position[1] + dy)
                if      0 <= target_position[0] < CONFIG["board"]["width"] \
                    and 0 <= target_position[1] < CONFIG["board"]["height"]:
                    self.target_positions.append(target_position)
                    if board[target_position[0]][target_position[1]].figure is not None:
                        # Stop if we hit another figure
                        break

class _Drone:
    """Represents a drone in the simulation."""
    def __init__(self, id: int, position: Tuple[int, int], model_path: str, rules: str, color: str = "white"):
        self.id = id
        self.position = position
        self.color = color
        self.model_path = model_path
        self.rules = rules\
            .replace("DRONE_ID", str(self.id))\
            .replace("NUMBER_OF_DRONES", str(CONFIG["simulation"]["num_drones"]))\
            .replace("NUMBER_OF_ROUNDS", str(CONFIG["simulation"]["max_rounds"]))
        LOGGER.log(f"Drone {self.id} initialized at position {self.position} with color {self.color}.")
        self.memory = ""  # Memory of past actions or observations
        self.rx_buffer = ""  # Buffer for received broadcasts

    def _move(self, direction: str):
        """Moves the drone in the specified direction."""
        if direction in DIRECTION_MAP:
            dx, dy = DIRECTION_MAP[direction]
            new_position = (self.position[0] + dx, self.position[1] + dy)
            if 0 <= new_position[0] < CONFIG["board"]["width"] and 0 <= new_position[1] < CONFIG["board"]["height"]:
                # Remove drone from old tile
                old_tile = SIM.board[self.position[0]][self.position[1]]
                old_tile.remove_drone(self)
                # Move drone to new position
                self.position = new_position
                # Add drone to new tile
                new_tile = SIM.board[self.position[0]][self.position[1]]
                new_tile.add_drone(self)
                LOGGER.log(f"Drone {self.id} moved to {self.position}.")
            else:
                LOGGER.log(f"Drone {self.id} attempted to move out of bounds to {new_position}. Action ignored.")
        else:
            LOGGER.log(f"Drone {self.id} attempted to move in an invalid direction '{direction}'. Action ignored.")

    def _generate_single_model_response(self, prompt_info: str, prompt_request: str, model: str = "qwen-3", temperature: float = 1.0, max_tokens: int = 256) -> str:
        """
        Generates a response from the LLM based on the prompt information and request.
        """
        prompt = f"{prompt_info}\n{prompt_request}\n"
        with suppress_output():
            if CONFIG["simulation"]["use_manual_model"]:
                pyperclip.copy(prompt)
                llm_response = input("Model prompt is on clipboard, paste model's response: ")
            elif CONFIG["simulation"]["use_gpt4_model"]:
                llm_response = OpenAI().chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "user", "content": prompt}
                    ],
                    temperature=temperature,
                    max_tokens=max_tokens,
                ).choices[0].message.content
            else: # Use llama_cpp qwen-3 for other models
                cpu_count = os.cpu_count() or 4
                llm = Llama(
                    model_path=self.model_path,
                    n_ctx=2048, # 40960
                    temperature=temperature,
                    n_threads=max(1, cpu_count // 2)
                )
                llm_response = llm(prompt=prompt, max_tokens=max_tokens, stop=["}"], echo=False)

        if isinstance(llm_response, dict) and "choices" in llm_response:
            # Standard OpenAI-like response
            return llm_response["choices"][0]["text"].strip()
        elif isinstance(llm_response, str):
            # Newer llama-cpp versions return plain text
            return llm_response.strip()
        else:
            LOGGER.log(f"Unexpected LLM response format: {type(llm_response)}")
            return ""

    def _generate_full_model_response(self) -> dict:
        """Determines the action for the drone based on its rules."""

        # Identify co-located drones
        other_drones_at_position = ""
        for d in SIM.board[self.position[0]][self.position[1]].drones:
            if d.id != self.id:
                other_drones_at_position += f"Drone {d.id}, "
        other_drones_at_position = other_drones_at_position.strip(", ")
        # Identify co-located figure
        figure_at_position = "None"
        if SIM.board[self.position[0]][self.position[1]].figure:
            figure_at_position = SIM.board[self.position[0]][self.position[1]].figure.figure_type
        neighboring_figure_colors = ""
        # Identify neighboring figure colors
        for dx in [-1, 0, 1]:
            for dy in [-1, 0, 1]:
                if dx == 0 and dy == 0:
                    continue
                neighbor_x = self.position[0] + dx
                neighbor_y = self.position[1] + dy
                if 0 <= neighbor_x < CONFIG["board"]["width"] and 0 <= neighbor_y < CONFIG["board"]["height"]:
                    neighbor_tile = SIM.board[neighbor_x][neighbor_y]
                    if neighbor_tile.figure:
                        neighboring_figure_colors += f"{direction_from_vector((dx, dy))}: {neighbor_tile.figure.color}, "
        neighboring_figure_colors = neighboring_figure_colors.strip(", ")

        LOGGER.log(f"Drone {self.id} sees drones {other_drones_at_position} and figure '{figure_at_position}' at its position.")
        LOGGER.log(f"Neighboring figures: {neighboring_figure_colors}")

        # Generate a rationale
        prompt_info = f"Rules: {self.rules}\n"
        prompt_info += f"Current round number: {SIM.round}\n"
        prompt_info += f"Current position: {self.position}\n"
        prompt_info += f"Visible drones at position: {other_drones_at_position}\n"
        prompt_info += f"Visible figure at position: {figure_at_position}\n"
        prompt_info += f"Visible neighboring figures: {neighboring_figure_colors}\n"
        prompt_info += f"Memory: {self.memory}\n"
        prompt_info += f"Broadcast Rx Buffer: {self.rx_buffer}\n"
        self.rx_buffer = ""  # Clear the buffer after processing


        rationale = self._generate_single_model_response(
            prompt_info=prompt_info,
            prompt_request=CONFIG["prompt_requests"]["rationale"],
            model=self.model_path,
            max_tokens=CONFIG["simulation"]["max_tokens_for_rationale"])
        prompt_info += f"Rationale: '{rationale}'\n"
        
        # Generate an action (move, wait, broadcast)
        action = self._generate_single_model_response(
            prompt_info=prompt_info,
            prompt_request=CONFIG["prompt_requests"]["action"],
            model=self.model_path,
            max_tokens=CONFIG["simulation"]["max_tokens_for_action"]).lower().replace("'", "").replace('"', "''").strip()
        for word in action.split():
            if word in ["move", "wait", "broadcast"]:
                action = word
                break
        if action not in ["move", "wait", "broadcast"]:
            LOGGER.log(f"Invalid action: '{action}'. Defaulting to 'wait'.")
            action = "wait"
        prompt_info += f"Action: '{action}'\n"

        # Generate content based on the action
        content = "" # default for wait action
        if action == "move":
            content = self._generate_single_model_response(
                prompt_info=prompt_info,
                prompt_request=CONFIG['prompt_requests']['action_move'],
                model=self.model_path,
                max_tokens=CONFIG["simulation"]["max_tokens_for_action_move"]).lower().strip()
            for word in content.split():
                if word in DIRECTION_MAP:
                    content = word
                    break
            if content not in DIRECTION_MAP:
                LOGGER.log(f"Invalid move direction '{content}'. Defaulting to 'wait'.")
                action = "wait"
                content = ""
        if action == "broadcast":
            content = self._generate_single_model_response(
                prompt_info=prompt_info,
                prompt_request=CONFIG["prompt_requests"]["action_broadcast"],
                model=self.model_path,
                max_tokens=CONFIG["simulation"]["max_tokens_for_action_broadcast"])
        prompt_info += f"Content: '{content}'\n"

        # Update memory
        self.memory = self._generate_single_model_response(
            prompt_info=prompt_info,
            prompt_request=CONFIG["prompt_requests"]["memory"],
            model=self.model_path,
            max_tokens=CONFIG["simulation"]["max_tokens_for_memory"])

        # Compile the full response
        response = {
            "rationale": rationale,
            "action": action,
            "content": content,
            "memory": self.memory
        }
        return response
    
    def take_turn(self):
        LOGGER.log(f"Drone {self.id} at position {self.position} is taking action.")
        response = self._generate_full_model_response()
        LOGGER.log(f"Drone {self.id} response:\n{pprint.pformat(response, indent=4)}")

        if response["action"] == "wait":
            # Drone waits, no action taken
            LOGGER.log(f"Drone {self.id} is waiting.")
        if response["action"] == "move":
            # Move the drone to a new position based on the content
            direction = response["content"]
            self._move(direction)
        if response["action"] == "broadcast":
            # Broadcast the content to all drones on the same tile
            tile = SIM.board[self.position[0]][self.position[1]]
            for drone in tile.drones:
                if drone.id != self.id:
                    drone.rx_buffer += f"Drone {self.id} broadcasted: {response['content']}\n"

class _Tile:
    """Represents a tile on the board."""
    def __init__(self, x: int, y: int):
        self.x = x
        self.y = y
        self.targeted_by = {"white": 0, "black": 0}
        self.figure = None
        self.drones = []

    def set_figure(self, figure: _Figure):
        """Sets a piece on the tile."""
        self.figure = figure

    def add_drone(self, drone: _Drone):
        """Adds a drone to the tile."""
        assert drone not in self.drones
        self.drones.append(drone)

    def remove_drone(self, drone: _Drone):
        """Removes a drone from the tile."""
        if drone in self.drones:
            self.drones.remove(drone)

    def reset_targeted_by_amounts(self):
        """Resets the threat and defend levels."""
        self.targeted_by = {"white": 0, "black": 0}

    def add_targeted_by_amount(self, color: str, amount: int = 1):
        """Adds to the targeted by amount for a color."""
        self.targeted_by[color] += amount

class _SimulationGUI:
    def __init__(self):
        """Initializes the GUI for the simulation."""
        self.grid_size = (CONFIG["board"]["width"], CONFIG["board"]["height"])
        
        # Initialize Pygame
        pygame.init()
        gui_config = CONFIG["gui"]
        self.screen = pygame.display.set_mode(
            (self.grid_size[0] * (gui_config["cell_size"] + gui_config["margin"]) + gui_config["margin"],
             self.grid_size[1] * (gui_config["cell_size"] + gui_config["margin"]) + gui_config["margin"])
        )
        pygame.display.set_caption(f'Simulation - Round {1}.1/{CONFIG["simulation"]["max_rounds"]}.{CONFIG["simulation"]["num_drones"]}')
        self.clock = pygame.time.Clock()
        
    def draw_field(self):
        """Draws the board, king, and drones using the current state."""
        gui_config = CONFIG["gui"]
        cell_size = gui_config["cell_size"]
        margin = gui_config["margin"]
        grid_width, grid_height = self.grid_size

        self.screen.fill(gui_config["background_color"])
        font = pygame.font.SysFont(None, 16)

        for x in range(grid_width):
            for y in range(grid_height):
                rect = pygame.Rect(
                    x * (cell_size + margin) + margin,
                    y * (cell_size + margin) + margin,
                    cell_size,
                    cell_size
                )
                pygame.draw.rect(self.screen, gui_config["grid_color"], rect)

                tile = SIM.board[x][y]

                # Draw King
                if tile.figure and tile.figure.figure_type == "king":
                    pygame.draw.circle(self.screen, gui_config["king_color"], rect.center, cell_size // 3)

                # Draw drones (multiple per tile supported)
                total = len(tile.drones)
                if total > 0:
                    angle_step = 360 / total if total > 1 else 0
                    radius = cell_size // 6

                    for idx in range(len(tile.drones)):
                        angle = angle_step * idx
                        offset = pygame.math.Vector2(0, 0)
                        if total > 1:
                            offset = pygame.math.Vector2(1, 0).rotate(angle) * (cell_size // 4)
                        center = (rect.centerx + int(offset.x), rect.centery + int(offset.y))

                        pygame.draw.circle(self.screen, gui_config["drone_color"], center, radius)
                        text = font.render(str(idx + 1), True, gui_config["text_color"])
                        text_rect = text.get_rect(center=center)
                        self.screen.blit(text, text_rect)
                        
        pygame.display.flip()

class Simulation:
    """Tracks the state of the simulation."""
    def __init__(self):
        # Initialize game state
        self.turn = 1 # Which drone's turn it is
        self.round = 1 # Which round of the game it is
        self.rules = ""        
        with open(CONFIG["simulation"]["rules_path"], "r") as f:
            self.rules = f.read().replace("NUMBER_OF_DRONES", str(CONFIG["simulation"]["num_drones"]))
        self.grid_size = (CONFIG["board"]["width"], CONFIG["board"]["height"])
        self.max_rounds = CONFIG["simulation"]["max_rounds"]
        self.num_drones = CONFIG["simulation"]["num_drones"]
        self.model_path = 'Manual'
        if CONFIG["simulation"]["use_gpt4_model"]:
            self.model_path = 'gpt-4' # Use OpenAI's online GPT-4 model
        if CONFIG["simulation"]["use_qwen3_model"]:
            self.model_path = os.path.join(kagglehub.model_download(CONFIG["simulation"]["qwen3_model"]), CONFIG["simulation"]["qwen3_model_name"])
        LOGGER.log(f"Using model: {self.model_path}")

        # Create the board
        self.board = [[_Tile(x, y) for y in range(self.grid_size[1])] for x in range(self.grid_size[0])]
        self.figures = []  # List of all figures on the board
        self.drones = []  # List of drones in the simulation

        # Create, place and calculate figures
        self._create_figures()

        # Create and place drones
        self.drone_base = self.figures[0].position # Assuming the first figure is the drone base (white king)
        self._create_drones()

        # Initialize the GUI
        if CONFIG["simulation"]["use_gui"]:
            self.gui = _SimulationGUI()

    def _create_figures(self):
        """Generates and places all figures based on CONFIG["figures"]."""
        # Create figures based on the configuration
        LOGGER.log("Creating figures based on configuration.")
        self.figures = []
        for color in COLORS:
            for figure_type in FIGURE_TYPES:
                for position in CONFIG["figures"][color][figure_type]:
                    self.figures.append(_Figure(position, color, figure_type))

        # Place figures on the board
        for figure in self.figures:
            self.board[figure.position[0]][figure.position[1]].set_figure(figure)
        
        # Calculate targets for each figure
        for figure in self.figures:
            figure.calculate_figure_targets(self.board)

        # Add targeted by amounts to the tiles based on figure targets
        for figure in self.figures:
            for target_position in figure.target_positions:
                target_tile = self.board[target_position[0]][target_position[1]]
                target_tile.add_targeted_by_amount(figure.color, 1)

    def _create_drones(self):
        """Generate drones."""
        LOGGER.log(f"Creating {self.num_drones} drones.")
        for i in range(self.num_drones):
            drone = _Drone(id=i+1, position=self.drone_base, model_path=self.model_path, rules=self.rules)
            self.drones.append(drone)
            
        # Place drones on the board
        for drone in self.drones:
            tile = self.board[drone.position[0]][drone.position[1]]
            tile.add_drone(drone)
SIM = Simulation()

In [None]:
def run_simulation():
    """Runs the simulation."""

    max_rounds = CONFIG["simulation"].get("max_rounds", 10)
    delay = CONFIG["simulation"].get("delay", 300)
    use_gui = CONFIG["simulation"].get("use_gui", True)

    running = True
    if use_gui:
        pygame.display.set_caption(f"Simulation - Round {1}.1/{max_rounds}.{SIM.num_drones}")

    for sim_round in range(max_rounds):
        SIM.round = sim_round + 1

        if use_gui:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                    break
                elif event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE:
                    running = False
            if not running:
                break

        for drone in SIM.drones:
            SIM.turn = drone.id
            caption = f"Simulation - Round {sim_round + 1}.{drone.id}/{max_rounds}.{SIM.num_drones}"
            LOGGER.log('#' * 50)
            LOGGER.log(caption)

            if use_gui:
                pygame.display.set_caption(caption)

            try:
                drone.take_turn()
            except Exception as e:
                LOGGER.log(f"Error during Drone {drone.id}'s turn: {e}")

            if use_gui:
                SIM.gui.draw_field()
                pygame.time.delay(delay)

            LOGGER.log(f"Round {SIM.round}.{SIM.turn} completed.")

    if use_gui:
        pygame.quit()


In [None]:
# Cell 3: Launcher for simulation

if __name__ == "__main__":
    LOGGER.log("Launching simulation.")
    run_simulation()
    LOGGER.log("Simulation ended.")
