In [None]:
import json
import logging
import os
import sqlite3
import time
from contextlib import closing
from typing import Any

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests
from plotly.subplots import make_subplots
from IPython.display import clear_output

# Module-level handle to the Plotly figure that visualize_game_state() creates
# on its first call and then reuses on every later turn.
fig_widget = None


# Configuration
# NOTE(review): an API token is committed in source. The DATSTEAM_API_KEY
# environment variable, when set, takes precedence; the literal is kept only as
# a backward-compatible fallback and should be rotated and removed.
API_KEY = os.environ.get("DATSTEAM_API_KEY", "668d4e535bdd6668d4e535bddd")
BASE_URL = "https://games.datsteam.dev"
LOBBY_CHECK_INTERVAL = 10  # Interval to recheck lobby status in seconds
EPSILON = 1e-6  # Small value to avoid division by zero
REQUEST_TIMEOUT = 10  # Timeout for network requests in seconds

# Headers sent with every HTTP request to the game server.
HEADERS = {
    "Content-Type": "application/json",
    "X-Auth-Token": API_KEY,
}

# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Constants for vision, attack, and strength
BASE_VISION_RADIUS = 10
BASE_ATTACK_RADIUS = 8
BASE_ATTACK_STRENGTH = 40
BASE_HEALTH = 300

CELL_VISION_RADIUS = 8
CELL_ATTACK_RADIUS = 5
CELL_ATTACK_STRENGTH = 10
CELL_HEALTH = 100

STARTING_GOLD = 10
STARTING_BASE_SIZE = 4

# Zombie spawning rules
INITIAL_PROBABILITY = 0.01
MAX_PROBABILITY = 0.50
PROBABILITY_INCREASE_INTERVAL = 6
MAX_TURN = 300
# Per-type multiplier applied by calculate_zombie_weight(); unknown types use 1.
ZOMBIE_MODS = {
    "normal": 1,
    "fast": 1,
    "bomber": 3,
    "liner": 3,
    "juggernaut": 3,
    "chaos_knight": 2,
}

# Zombie characteristic growth
INITIAL_HEALTH = 5
INITIAL_ATTACK = 5
INITIAL_SPEED = 1

# Game phases
EARLY_GAME_END_TURN = 50
MID_GAME_END_TURN = 150

# Database setup
DB_NAME = "cache.db"

def init_db() -> None:
    """Create the ``cache`` table in the SQLite file ``DB_NAME`` if it is missing.

    Failures are logged together with their traceback and are not re-raised.
    """
    try:
        db = sqlite3.connect(DB_NAME)
        # closing() releases the connection; the connection's own context
        # manager commits on success and rolls back on error.
        with closing(db), db:
            db.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    id INTEGER PRIMARY KEY,
                    url TEXT,
                    method TEXT,
                    request_body TEXT,
                    response_body TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
    except Exception as e:
        logging.exception(f"Failed to initialize database: {e}")
    else:
        logging.info("Database initialized successfully.")

def cache_request(url: str, method: str, request_body: dict[str, Any], response_body: dict[str, Any]) -> None:
    """Append one request/response pair to the ``cache`` table.

    Both bodies are stored as JSON text. Any failure (database or
    serialization) is logged with its traceback and not re-raised.
    """
    try:
        db = sqlite3.connect(DB_NAME)
        with closing(db), db:
            row_values = (url, method, json.dumps(request_body), json.dumps(response_body))
            db.execute("""
                INSERT INTO cache (url, method, request_body, response_body)
                VALUES (?, ?, ?, ?)
            """, row_values)
    except Exception as e:
        logging.exception(f"Failed to cache request: {e}")
    else:
        logging.info(f"Request cached successfully: {url} {method}")

def get_cached_response(url: str, method: str, request_body: dict[str, Any]) -> dict[str, Any] | None:
    """Return the newest cached response for an identical request, if any.

    A row matches when its URL, HTTP method and JSON-serialized request body
    all equal the arguments.

    Returns:
        The decoded response body, or ``None`` on a cache miss or on any
        error (errors are logged with their traceback).
    """
    try:
        with closing(sqlite3.connect(DB_NAME)) as conn:
            # CURRENT_TIMESTAMP only has one-second resolution, so several rows
            # can share a timestamp; the autoincrementing id breaks the tie in
            # favour of the row inserted last.
            row = conn.execute("""
                SELECT response_body FROM cache
                WHERE url = ? AND method = ? AND request_body = ?
                ORDER BY timestamp DESC, id DESC LIMIT 1
            """, (url, method, json.dumps(request_body))).fetchone()

        if row is None:
            logging.info(f"Cache miss for {url} {method}")
            return None

        logging.info(f"Cache hit for {url} {method}")
        return json.loads(row[0])
    except Exception as e:
        logging.exception(f"Failed to retrieve cached response: {e}")
        return None

# Helper Functions
def register_for_game() -> dict[str, Any]:
    """Send the participation request (HTTP PUT) and return the decoded reply.

    The reply is also written to the local cache.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` when the server
            answers with an error status.
    """
    endpoint = f"{BASE_URL}/play/zombidef/participate"
    logging.info(f"Registering for game at {endpoint}")

    reply = requests.put(endpoint, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    if reply.status_code != 200:
        logging.error(f"Failed to register for game: {reply.status_code} - {reply.text}")
        reply.raise_for_status()

    payload = reply.json()
    cache_request(endpoint, "PUT", {}, payload)
    logging.info("Successfully registered for game")
    return payload

def get_game_state() -> dict[str, Any]:
    """Fetch the current units/game state (HTTP GET) and return the decoded reply.

    The reply is also written to the local cache.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` when the server
            answers with an error status.
    """
    endpoint = f"{BASE_URL}/play/zombidef/units"
    logging.info(f"Fetching game state from {endpoint}")

    reply = requests.get(endpoint, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    if reply.status_code != 200:
        logging.error(f"Failed to get game state: {reply.status_code} - {reply.text}")
        reply.raise_for_status()

    payload = reply.json()
    cache_request(endpoint, "GET", {}, payload)
    logging.info("Game state fetched successfully")
    return payload

def send_commands(commands: dict[str, Any]) -> dict[str, Any]:
    """Post the turn's commands as JSON (HTTP POST) and return the decoded reply.

    The request body and the reply are also written to the local cache.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` when the server
            answers with an error status.
    """
    endpoint = f"{BASE_URL}/play/zombidef/command"
    logging.info(f"Sending commands to {endpoint}: {json.dumps(commands)}")

    reply = requests.post(endpoint, headers=HEADERS, json=commands, timeout=REQUEST_TIMEOUT)
    if reply.status_code != 200:
        logging.error(f"Failed to send commands: {reply.status_code} - {reply.text}")
        reply.raise_for_status()

    payload = reply.json()
    cache_request(endpoint, "POST", commands, payload)
    logging.info("Commands successfully sent")
    return payload

def calculate_distance(x1: int, y1: int, x2: int, y2: int) -> float:
    """Return the Euclidean distance between ``(x1, y1)`` and ``(x2, y2)``."""
    delta_x = x1 - x2
    delta_y = y1 - y2
    squared_length = np.square(delta_x) + np.square(delta_y)
    return np.sqrt(squared_length)

def check_participation() -> bool:
    """Report whether the game state can currently be fetched for this player.

    The check is a call to ``get_game_state()``. Two HTTP 400 replies are
    treated as "not playing yet" instead of as errors:

    * the text contains "player is not participating in this round";
    * the text contains "lobby ends": the function sleeps until shortly before
      the announced end of the lobby and then reports ``False`` so the caller
      retries.

    Returns:
        ``True`` when the game state was fetched, ``False`` in the two cases
        above.

    Raises:
        requests.HTTPError: for every other HTTP error.
    """
    logging.info("Checking participation status")
    try:
        get_game_state()
    except requests.HTTPError as e:
        response = e.response
        if response is None or response.status_code != 400:
            raise

        if "player is not participating in this round" in response.text:
            logging.info("Player is not participating in this round")
            return False

        if "lobby ends" in response.text:
            try:
                # The wait is read from the second-to-last word of the error
                # text (presumably seconds - confirm against the server docs).
                lobby_end_time = int(json.loads(response.text)["error"].split()[-2])
            except (ValueError, KeyError, IndexError, TypeError, AttributeError):
                # An unexpected message format must not kill the game loop;
                # fall back to the regular lobby polling interval.
                logging.warning(f"Could not parse lobby end time from: {response.text}")
                lobby_end_time = LOBBY_CHECK_INTERVAL
            logging.info(f"Lobby is still active. Waiting for {lobby_end_time} seconds.")
            # Wake up two seconds early. time.sleep() rejects negative values,
            # so clamp instead of catching every exception (a bare ``except``
            # here would also swallow KeyboardInterrupt).
            time.sleep(max(lobby_end_time - 2, 0))
            return False

        raise
    return True

def calculate_zombie_weight(zombie: dict[str, Any]) -> float:
    """Return a threat score for one zombie.

    The score is ``speed * attack * type_modifier / health``. The modifier
    comes from ``ZOMBIE_MODS`` and is 1 for types not listed there.
    """
    type_modifier = ZOMBIE_MODS.get(zombie["type"], 1)
    offensive_power = zombie["speed"] * zombie["attack"] * type_modifier
    return offensive_power / zombie["health"]

def calculate_danger(block: dict[str, int], zombies: list[dict[str, Any]], danger_cache: dict[tuple[int, int], float]) -> float:
    """Score how threatened the cell at ``block`` is and record the score.

    Every zombie whose Euclidean distance to the cell is at most
    ``BASE_VISION_RADIUS`` adds ``weight / (distance + EPSILON)``, where the
    weight comes from ``calculate_zombie_weight``. The result is written to
    ``danger_cache`` under the cell's ``(x, y)`` key and also returned.
    """
    cell_position = np.array([block["x"], block["y"]])
    total = 0
    for z in zombies:
        offset = cell_position - np.array([z["x"], z["y"]])
        gap = np.linalg.norm(offset)
        # Every zombie currently counts as a contact threat; a filter on the
        # zombie's direction was left disabled in the original code.
        if gap <= BASE_VISION_RADIUS:
            # EPSILON keeps the division defined when a zombie is on the cell.
            total += calculate_zombie_weight(z) / (gap + EPSILON)
    danger_cache[(block["x"], block["y"])] = total
    return total

def find_best_attack_block(base: list[dict[str, Any]], zombies: list[dict[str, Any]], danger_cache: dict[tuple[int, int], float]) -> dict[str, Any] | None:
    """Return the most endangered base block that has not attacked yet.

    Blocks whose ``"attacked"`` entry is truthy are skipped. On equal danger
    the block that comes first in ``base`` wins. Returns ``None`` when no
    block is eligible.
    """
    top_block = None
    top_danger = float("-inf")
    for candidate in (cell for cell in base if not cell.get("attacked")):
        # The danger is recomputed on every call; calculate_danger() also
        # stores the fresh value in danger_cache under the block's (x, y).
        candidate_danger = calculate_danger(candidate, zombies, danger_cache)
        if candidate_danger > top_danger:
            top_block, top_danger = candidate, candidate_danger
    return top_block

def find_top_k_safe_neighbors(base: list[dict[str, Any]], zombies: list[dict[str, Any]], k: int, danger_cache: dict[tuple[int, int], float]) -> list[dict[str, int]]:
    """Return up to ``k`` distinct free cells next to the base, safest first.

    A candidate is any cell that is orthogonally adjacent to a base block and
    not itself part of the base. Candidates are ranked by
    ``calculate_danger`` (ascending); ties keep discovery order.

    Args:
        base: Base blocks, each with ``"x"`` and ``"y"``.
        zombies: Zombies used for the danger score.
        k: Maximum number of cells to return; non-positive values yield ``[]``.
        danger_cache: Receives the danger of every evaluated cell.

    Returns:
        A list of ``{"x": ..., "y": ...}`` dicts without duplicates.
    """
    if k <= 0:
        return []

    directions = ((-1, 0), (1, 0), (0, -1), (0, 1))
    occupied = {(b["x"], b["y"]) for b in base}

    # A free cell can touch several base blocks; without this set it would be
    # listed once per adjacent block and use up several of the k slots.
    seen: set[tuple[int, int]] = set()
    scored = []

    for block in base:
        for dx, dy in directions:
            cell = (block["x"] + dx, block["y"] + dy)
            if cell in occupied or cell in seen:
                continue
            seen.add(cell)

            neighbor = {"x": cell[0], "y": cell[1]}
            danger = calculate_danger(neighbor, zombies, danger_cache)
            scored.append((danger, neighbor))

    scored.sort(key=lambda item: item[0])
    return [neighbor for _, neighbor in scored[:k]]

def build_base(base: list[dict[str, Any]], zombies: list[dict[str, Any]], gold: int, turn: int, danger_cache: dict[tuple[int, int], float]) -> dict[str, Any]:
    """Produce the build part of a command set.

    When a base exists and ``gold`` is positive, up to ``gold`` of the safest
    neighbouring cells are requested as build targets. ``turn`` is accepted
    but not used.

    Returns:
        A command dict with ``"build"`` filled in, ``"attack"`` empty and
        ``"moveBase"`` set to ``None``.
    """
    build_targets = []
    if base and gold > 0:
        build_targets = [
            {"x": spot["x"], "y": spot["y"]}
            for spot in find_top_k_safe_neighbors(base, zombies, gold, danger_cache)
        ]

    return {"build": build_targets, "attack": [], "moveBase": None}

def _refresh_danger_near(block: dict[str, Any], zombies: list[dict[str, Any]], danger_cache: dict[tuple[int, int], float]) -> None:
    """Recompute the danger of ``block`` and of cached cells close to it.

    "Close" means within ``BASE_VISION_RADIUS`` on both axes. Only cells that
    already have a cache entry are refreshed; ``block`` itself always is.
    """
    bx, by = block["x"], block["y"]
    danger_cache[(bx, by)] = calculate_danger(block, zombies, danger_cache)
    # Snapshot the keys: the cache is written to while we walk over it.
    for cx, cy in list(danger_cache):
        if abs(cx - bx) <= BASE_VISION_RADIUS and abs(cy - by) <= BASE_VISION_RADIUS:
            danger_cache[(cx, cy)] = calculate_danger({"x": cx, "y": cy}, zombies, danger_cache)


def attack_enemy_or_zombies(base: list[dict[str, Any]], zombies: list[dict[str, Any]], turn: int, danger_cache: dict[tuple[int, int], float]) -> dict[str, Any]:
    """Produce the attack part of a command set.

    Blocks are taken in order of decreasing danger (``find_best_attack_block``).
    Each block fires at most once, at the first zombie in ``zombies`` that lies
    within its attack radius (``BASE_ATTACK_RADIUS`` for a block flagged
    ``"isHead"``, ``CELL_ATTACK_RADIUS`` otherwise). A block with no zombie in
    range is skipped and the next block is tried, so one out-of-range block no
    longer stops every other block from attacking.

    Side effects:
        * a block that fires gets ``block["attacked"] = True``;
        * the targeted zombie's ``"health"`` is reduced and the zombie is
          removed from ``zombies`` once its health is zero or below;
        * ``danger_cache`` entries around the firing block are recomputed.

    ``turn`` is accepted but not used.

    Returns:
        A command dict with ``"attack"`` filled in, ``"build"`` empty and
        ``"moveBase"`` set to ``None``. At most 1000 attacks are issued.
    """
    commands = {
        "build": [],
        "attack": [],
        "moveBase": None
    }

    # Blocks that have neither fired nor been ruled out during this call.
    ready_blocks = [block for block in base if not block.get("attacked")]

    while zombies and ready_blocks and len(commands["attack"]) < 1000:
        best_block = find_best_attack_block(ready_blocks, zombies, danger_cache)
        if not best_block:
            logging.info("No best block found, breaking out of the loop.")
            break

        # Whether or not it fires, this block is not considered again.
        ready_blocks = [block for block in ready_blocks if block is not best_block]

        is_head = bool(best_block.get("isHead"))
        attack_radius = BASE_ATTACK_RADIUS if is_head else CELL_ATTACK_RADIUS
        logging.info(f"Best block found: {best_block}, attack radius: {attack_radius}")

        target = next(
            (
                zombie
                for zombie in zombies
                if calculate_distance(best_block["x"], best_block["y"], zombie["x"], zombie["y"]) <= attack_radius
            ),
            None,
        )
        if target is None:
            logging.info(f"No zombies within attack radius of block {best_block.get('id')}, trying the next block.")
            continue

        commands["attack"].append({"blockId": best_block["id"], "target": {"x": target["x"], "y": target["y"]}})
        logging.info(f"Attacking zombie at ({target['x']}, {target['y']}) from block {best_block['id']}")

        # best_block is the very dict stored in `base`, so flagging it here
        # updates the shared base state.
        best_block["attacked"] = True

        target["health"] -= BASE_ATTACK_STRENGTH if is_head else CELL_ATTACK_STRENGTH
        if target["health"] <= 0:
            logging.info(f"Zombie at ({target['x']}, {target['y']}) killed")
            zombies.remove(target)

        # The zombie just got weaker or disappeared: refresh nearby scores.
        _refresh_danger_near(best_block, zombies, danger_cache)

    return commands

def find_safest_base_location(base: list[dict[str, Any]], zombies: list[dict[str, Any]], danger_cache: dict[tuple[int, int], float]) -> tuple[int, int] | None:
    min_danger = float("inf")
    safest_location = None

    for block in base:
        x, y = block["x"], block["y"]
        for dx in range(-1, 2):
            for dy in range(-1, 2):
                new_location = (x + dx, y + dy)
                danger = danger_cache.get(new_location, calculate_danger({"x": new_location[0], "y": new_location[1]}, zombies, danger_cache))

                if danger < min_danger:
                    min_danger = danger
                    safest_location = new_location

    return safest_location

def update_zombie_characteristics(zombie: dict[str, Any], turn: int) -> None:
    """Overwrite a zombie's health, attack and speed with turn-based estimates.

    Each stat becomes its initial value plus one point for every
    ``PROBABILITY_INCREASE_INTERVAL`` completed turns. Existing values in
    ``zombie`` are replaced in place.
    """
    growth = turn // PROBABILITY_INCREASE_INTERVAL
    starting_stats = (
        ("health", INITIAL_HEALTH),
        ("attack", INITIAL_ATTACK),
        ("speed", INITIAL_SPEED),
    )
    for stat_name, starting_value in starting_stats:
        zombie[stat_name] = starting_value + growth

def move_zombie(zombie: dict[str, Any], obstacles: list[dict[str, int]], base: list[dict[str, int]]) -> tuple[int | None, int | None]:
    direction_map = {
        "up": (0, -1),
        "down": (0, 1),
        "left": (-1, 0),
        "right": (1, 0)
    }
    dx, dy = direction_map.get(zombie["direction"], (0, 0))
    new_x = zombie["x"] + dx
    new_y = zombie["y"] + dy

    if any(obstacle["x"] == new_x and obstacle["y"] == new_y for obstacle in obstacles):
        return None, None
    if any(block["x"] == new_x and block["y"] == new_y for block in base):
        return None, None

    return new_x, new_y

def handle_zombies(game_state: dict[str, Any], zombies: list[dict[str, Any]], base: list[dict[str, Any]]) -> None:
    """Advance every zombie by one simulated step, in place.

    For each zombie the stats are re-estimated from ``game_state["turn"]``,
    then the zombie is moved one cell in its direction. If the destination
    holds a base block, that block loses ``zombie["attack"]`` health and is
    removed from ``base`` once its health is zero or below. The zombie's
    ``"x"``/``"y"`` are updated to the destination either way.
    """
    blocks_by_cell = {(cell["x"], cell["y"]): cell for cell in base}

    for zombie in zombies:
        update_zombie_characteristics(zombie, game_state["turn"])
        # A skip for zombies with "wait_turn" > 1 was left disabled here.
        # Empty obstacle/base lists: collisions with the base are handled below.
        next_x, next_y = move_zombie(zombie, [], [])

        if next_x is None and next_y is None:
            continue

        destination = (next_x, next_y)
        struck = blocks_by_cell.get(destination)
        if struck is not None:
            struck["health"] -= zombie["attack"]
            if struck["health"] <= 0:
                del blocks_by_cell[destination]
                base.remove(struck)

        zombie["x"], zombie["y"] = next_x, next_y

def move_base(base: list[dict[str, Any]], zombies: list[dict[str, Any]], danger_cache: dict[tuple[int, int], float]) -> dict[str, int] | None:
    """Return the safest base location as an ``{"x", "y"}`` dict.

    Returns ``None`` when ``find_safest_base_location`` finds no location.
    """
    target = find_safest_base_location(base, zombies, danger_cache)
    if not target:
        return None
    target_x, target_y = target
    return {"x": target_x, "y": target_y}

import logging
from typing import Any

# Figure handle shared with visualize_game_state(), which creates the figure on
# first use. Re-running this assignment resets the handle, so the next call
# builds a fresh figure.
fig_widget = None

# Marker colour and symbol per zombie type. visualize_game_state() adds one
# trace per entry, in this order, after its four fixed traces.
ZOMBIE_TYPE_STYLES = {
    "normal": {"color": "red", "symbol": "x"},
    "fast": {"color": "orange", "symbol": "triangle-up"},
    "bomber": {"color": "yellow", "symbol": "diamond"},
    "liner": {"color": "green", "symbol": "star"},
    "juggernaut": {"color": "blue", "symbol": "hexagon"},
    "chaos_knight": {"color": "purple", "symbol": "cross"}
}

# Marker style for the "Walls" trace
WALL_STYLE = {"color": "gray", "symbol": "square"}

# Marker style for the "Main Base" trace
MAIN_BASE_STYLE = {"color": "black", "symbol": "circle-open-dot"}

def get_cached_game_state() -> dict[str, Any] | None:
    """Return the newest cached reply of the game-state (units) endpoint.

    Returns:
        The decoded response body, or ``None`` when nothing is cached or when
        any error occurs (errors are logged with their traceback).
    """
    try:
        with closing(sqlite3.connect(DB_NAME)) as conn:
            # CURRENT_TIMESTAMP only has one-second resolution and the game
            # state is fetched more than once per second, so the id is needed
            # to pick the row that was really inserted last.
            row = conn.execute("""
                SELECT response_body FROM cache
                WHERE url LIKE '%/play/zombidef/units%'
                ORDER BY timestamp DESC, id DESC LIMIT 1
            """).fetchone()

        if row is None:
            logging.info("Cache miss for game state")
            return None

        logging.info("Cache hit for game state")
        return json.loads(row[0])
    except Exception as e:
        logging.exception(f"Failed to retrieve cached game state: {e}")
        return None

def _xy_columns(points: list[dict[str, Any]]) -> tuple[list[Any], list[Any]]:
    """Split a list of dicts with ``"x"``/``"y"`` keys into parallel x and y lists."""
    return [point["x"] for point in points], [point["y"] for point in points]


def visualize_game_state(turn: int, base: list[dict[str, Any]], zombies: list[dict[str, Any]], walls: list[dict[str, Any]], enemies: list[dict[str, Any]]) -> None:
    """Draw the current game state as a scatter plot centred on the base.

    The figure is created once, stored in the module-level ``fig_widget`` and
    updated in place on later calls. Traces are, in order: main base, other
    base blocks, walls, enemies, then one trace per ``ZOMBIE_TYPE_STYLES``
    entry. Zombies whose type is not in ``ZOMBIE_TYPE_STYLES`` are not drawn.

    The main base is the block flagged ``"isHead"``; when no block carries
    the flag, the first entry of ``base`` is used.

    Nothing is drawn (an error is logged) when ``base`` is empty.
    """
    global fig_widget

    logging.info(f"Visualizing game state at turn {turn}")
    logging.debug(f"Base: {base}")
    logging.debug(f"Zombies: {zombies}")
    logging.debug(f"Walls: {walls}")
    logging.debug(f"Enemies: {enemies}")

    if not base:
        logging.error("No base found in the game state.")
        return

    # The view is centred on the mean position of all base blocks.
    base_xs, base_ys = _xy_columns(base)
    center_x = sum(base_xs) / len(base_xs)
    center_y = sum(base_ys) / len(base_ys)

    # Use the same "isHead" flag the attack logic relies on; list position is
    # only a fallback for states that do not carry the flag.
    main_base = next((block for block in base if block.get("isHead")), base[0])
    other_bases = [block for block in base if block is not main_base]

    if fig_widget is None:
        # Create the figure only once
        fig_widget = make_subplots()
        fig_widget.update_layout(
            title=f"Game State at Turn {turn}",
            xaxis_title="X Coordinate",
            yaxis_title="Y Coordinate",
            legend_title="Entities",
            showlegend=True,
            xaxis=dict(showgrid=True, gridwidth=1, gridcolor="LightGray", dtick=1),
            yaxis=dict(showgrid=True, gridwidth=1, gridcolor="LightGray", dtick=1)
        )
        # Add initial empty traces for each entity type
        fig_widget.add_trace(go.Scatter(mode="markers", marker=dict(symbol=MAIN_BASE_STYLE["symbol"], size=15, color=MAIN_BASE_STYLE["color"]), name="Main Base"))
        fig_widget.add_trace(go.Scatter(mode="markers", marker=dict(symbol="circle", size=10, color="blue"), name="Base"))
        fig_widget.add_trace(go.Scatter(mode="markers", marker=dict(symbol=WALL_STYLE["symbol"], size=10, color=WALL_STYLE["color"]), name="Walls"))
        fig_widget.add_trace(go.Scatter(mode="markers", marker=dict(symbol="triangle-up", size=10, color="green"), name="Enemies"))

        # Add traces for each zombie type
        for zombie_type, style in ZOMBIE_TYPE_STYLES.items():
            fig_widget.add_trace(go.Scatter(
                mode="markers",
                marker=dict(symbol=style["symbol"], size=10, color=style["color"]),
                name=zombie_type.capitalize()
            ))

    # Point groups in exactly the order the traces were added above.
    point_groups = [[main_base], other_bases, walls, enemies]
    point_groups.extend(
        [zombie for zombie in zombies if zombie.get("type") == zombie_type]
        for zombie_type in ZOMBIE_TYPE_STYLES
    )
    for trace, points in zip(fig_widget.data, point_groups):
        xs, ys = _xy_columns(points)
        trace.update(x=xs, y=ys)

    # Update the title and keep a 20x20 window around the base centre
    fig_widget.update_layout(
        title=f"Game State at Turn {turn}",
        xaxis=dict(range=[center_x - 10, center_x + 10], showgrid=True, gridwidth=1, gridcolor="LightGray", dtick=1),
        yaxis=dict(range=[center_y - 10, center_y + 10], showgrid=True, gridwidth=1, gridcolor="LightGray", dtick=1)
    )

    # Display the updated figure
    clear_output(wait=True)
    fig_widget.show()

def game_loop() -> None:
    """Play the game turn by turn until an exception stops the loop.

    Each iteration: make sure the player participates (registering if
    needed), fetch the game state, advance the zombies locally by one step,
    derive build/attack/move commands, send them, redraw the plot and sleep
    for one second. Any exception is logged with its traceback and ends the
    loop.
    """
    while True:
        try:
            logging.info("Starting game loop iteration")
            if not check_participation():
                logging.info("Not participating, registering for game")
                register_for_game()
                time.sleep(5)
                # Re-check after registering; start the iteration over if the
                # state still cannot be fetched.
                if not check_participation():
                    logging.info("Still not participating after registration")
                    continue

            game_state = get_game_state()
            logging.info(f"Game state: {json.dumps(game_state, indent=2)}")

            if game_state.get("gameEndedAt"):
                logging.info("Game round has ended. Re-registering for a new game round.")
                register_for_game()
                time.sleep(5)
                continue

            # `or []` also covers keys that are present but hold None.
            base = game_state.get("base") or []
            zombies = game_state.get("zombies") or []
            walls = game_state.get("walls") or []
            enemies = game_state.get("enemies") or []
            gold = game_state.get("player", {}).get("gold", 0)
            turn = game_state.get("turn", 0)

            logging.info(f"Handling zombies for turn {turn}")
            # A fresh danger cache is used for every turn.
            danger_cache: dict[tuple[int, int], float] = {}
            # Mutates `zombies` and `base` in place (simulated zombie step).
            handle_zombies(game_state, zombies, base)

            logging.info("Building base")
            build_commands = build_base(base, zombies, gold, turn, danger_cache)
            logging.info(f"Build commands: {build_commands}")

            logging.info("Attacking enemies or zombies")
            attack_commands = attack_enemy_or_zombies(base, zombies, turn, danger_cache)
            logging.info(f"Attack commands: {attack_commands}")

            logging.info("Moving base if necessary")
            move_command = move_base(base, zombies, danger_cache)
            logging.info(f"Move command: {move_command}")

            # Merge the three partial results into the single payload to send.
            commands = {
                "build": build_commands["build"],
                "attack": attack_commands["attack"],
                "moveBase": move_command
            }

            logging.info(f"Sending commands: {json.dumps(commands)}")
            send_commands(commands)

            logging.info("Visualizing game state")
            visualize_game_state(turn, base, zombies, walls, enemies)

            time.sleep(1)
        except Exception as e:
            # Any failure (including network errors) ends the loop.
            logging.exception(f"An error occurred during the game loop: {e}")
            break

# Entry point: make sure the cache table exists, then run the bot until the
# game loop stops.
if __name__ == "__main__":
    init_db()
    game_loop()

ERROR:root:No base found in the game state.
ERROR:root:No base found in the game state.
ERROR:root:No base found in the game state.


KeyboardInterrupt: 