# Ahead of time:
- Go over [this link](https://learn.microsoft.com/en-us/azure/ai-services/openai/gpt-v-quickstart) and modify the `llmclient.py` class to work with GPT-4 Turbo with Vision and GPT-4
- Make sure the `doom.wad` file is in the same directory as this notebook
- Make sure you have installed cydoomgeneric
- Make a log directory and modify `LOG_PATH` below to point to it

In [1]:
import time
from llmclient import LLMClient
import json
import subprocess
import os
import random

import sys
from typing import Optional, Tuple
import datetime

import cydoomgeneric as cdg
import matplotlib.pyplot as plt
import numpy as np

from doom_prompts import build_vision_prompt, get_plan_prompt, play_prompt_naive, play_prompt_with_plan, play_prompt_with_klevels, play_prompt_with_walkthrough

mock = False # For debugging without calling the models

keymap = {
    "left": cdg.Keys.LEFTARROW,
    "right": cdg.Keys.RIGHTARROW,
    "up": cdg.Keys.UPARROW,
    "down": cdg.Keys.DOWNARROW,
    ',': cdg.Keys.STRAFE_L,
    '.': cdg.Keys.STRAFE_R,
    "control": cdg.Keys.FIRE,
    " ": cdg.Keys.USE,
    "shift": cdg.Keys.RSHIFT,
    "enter": cdg.Keys.ENTER,
    "escape": cdg.Keys.ESCAPE,
}

# TODO -- we really only need one keymap
GPT4_KEYMAP = {
    "UP": "up",
    "DOWN": "down",
    "LEFT": "left",
    "RIGHT": "right",
    "STRAFE LEFT": ",",
    "STRAFE RIGHT": ".",
    "FIRE": "control",
    "USE": " ",
    "SPEED": "shift",
    "1": "1",
    "2": "2",
    "3": "3",
    "4": "4",
    "5": "5",
    "6": "6",
    "7": "7"
}

MAX_FPS = 2
# The frames we need to skip to do our selection of difficulty
# Note that this was tuned with MAX_FPS = 2
SKIP_FRAME_COUNT = 50
MAX_FRAMES_TO_PLAY = 8000 # Set this to None to let the model play as long as it can. For walkthrough, it gets usually killed around 500.
FRAMES_IN_EXECUTION = 10 # This one is for doing a 45-deg turn radius

LOG_PATH = "logs_run_14/"
DEBUG = True

DO_PLAN = True # Call the planner on your prompt every PLAN_INTERVAL frames
PLAN_INTERVAL = 20
KLEVEL_INTERVAL = 60 # Must be a multiple of PLAN_INTERVAL so we get a plan for klevels
PROMPT_TYPE = "klevels" #"walkthrough", "plan", "klevels" -- anything else will be interpreted as "naive"

# When the model outputs "GAME OVER" this forces the endgame (i.e., quit).
# It also crashes the kernel of the notebook.
END_SEQUENCE = {0: [("escape", 1), ("escape", 0)],
                4: [("up", 1), ("up", 0)],
                6: [("down", 1), ("down", 0)],
                8: [("up", 1), ("up", 0)],
                10:[("up", 1), ("up", 0)],
                12:[("down", 1), ("down", 0)],
                14:[("enter", 1), ("enter", 0)],
                16:[("y", 1)]}

# This selects the first level ("knee deep in the dead") and sets it as "hurt me plenty" difficulty
SKIP_CODES = {4: [(" ", 1), (" ", 0)],
              18:[("enter", 1), ("enter", 0)],
              20:[("enter", 1), ("enter", 0)],
              #30:[("up", 1), ("up", 0)], for "inferno"
              38:[("enter", 1), ("enter", 0)],
              48:[("enter", 1), ("enter", 0)],
             }

# Walkthrough for that level.
WALTHROUGH_E1_M1 = "1. You start out the game in a room with a blue pool.\n2. Go across the pool.\n3. Keep going straight into the hallway.\n4.Go straight down the hall, and open the door. Enter the computer room.\n5. Kill the two guys there. Go around the room with blue walls, and to the back right of the computer room, where there is an entrance to the bridge room (if you face the grey door, you need to turn around).\n6. Make a right, kill the two guys, and cross the bridge. Don't fall into the green acid pool.\n7. Go straight and open the door at the end of the bridge room. Kill the imp behind the door. Enter the loot room.\n8. Go straight the loot room and open the next door. Enter the switch room.\n9. Inside the switch room there is a switch on the right wall. Hit the switch after getting the items, if you need them."#, then return to the bigger room.Head back up the stairs, toward the beginning, then run into that opening and onto the ledge that dropped. When it rises, get the bonuses in the winding hallway. When you've had your fill, return to the "exit" room, and enter the exit door. 


In [2]:
agent_params = {"temperature": 0.9,
                "max_tokens": 25,
                "top_p": 1
                }

planner_params = {"temperature": 0.1,
                  "top_p": 1, 
                  "max_tokens": 150
                 }

vision_params = {"temperature": 0.1,
                 "top_p": 1,
                 "max_tokens": 2880
                }
    

expert_params = {"expert1": {"temperature": 0.9, "max_tokens": 25, "top_p": 1},
                 "expert2": {"temperature": 0.9, "max_tokens": 25, "top_p": 1},
                 "expert3": {"temperature": 0.9, "max_tokens": 25, "top_p": 1}
                }

# Agent (and planner) - GPT-4
gpt4 = LLMClient(agent_params, "gpt-4-32k")	
# Vision - GPT-4V
gpt4v = LLMClient(vision_params, "gpt-4-vision-preview")

In [8]:
def step_call(im_path, history, plan_history=None, log_metadata = None, walkthrough=False,
              plan=False, klevels=False, do_plan_override=False, do_klevel_override=False, 
              debug=True, mock=False):
    """
    Main step call for our models. By default call the naive prompt.

    Parameters:
        im_path (str): Path to the image.
        history (str): The history of actions up until this point.
        plan_history (str, None): the history of plans up until this point
        log_metadata (dic, None): A dictionary with a LogPath (str) entry and a Frame entry
        debug (bool, True): Output stuff to stdout if log_metadata is None, else to file 
        walkthrough (bool, False): Call the walkthrough prompt
        plan (bool, False): Use the planner (hierarchical planning; also does walkthrough)
        klevels (bool, False): Do K-level planning (also does hierarchical planning)
        do_plan_override (bool, False): call the planner regardless of prompt used
        do_klevel_override (bool, False): call the experts regardless of prompt used
        mock (bool, False): Skip all model calls and return random action and empty history.
    Returns:
        str, str, str: The agent's action; the history; the planner's response
    """
    if mock:
        return random.choice([k for k in GPT4_KEYMAP.keys()]), ""

    def get_gpt4_response(model, assembled_prompt, is_vision=False):
        # This is a way to avoid throttling and too much repeated code
        resp = None
        while resp is None or "choices" not in resp or "error" in resp:
            try:
                resp = model.send_request(assembled_prompt)
            except:
                continue
            if resp is not None and "choices" in resp:
                break
        return resp["choices"][0]["text"]

    def clean_gpt_response(act_resp):
        # Because sometimes (too often) the model does whatever it wants.
        if act_resp.split("\n")[0].strip() not in GPT4_KEYMAP:
            act_resp = "WAIT"
            r_act_resp = "WAIT\n|Explanation|: Waiting for another action."
        else:
            r_act_resp = act_resp.strip()
        return act_resp, r_act_resp

    def log_to_file(model_response, model_name):
        if log_metadata is not None:
            with open(log_metadata["LogPath"] + "model_call_log.txt", "a", encoding="utf-8") as f:
                f.write("\t".join([str(log_metadata["Frame"]), model_name, model_response]) + "\n")
        else:
            print(model_response)
    
    # 1 - Vision call
    resp =  {"error": None}
    while "error" in resp or "choices" not in resp:
        try:
            resp = gpt4v.send_request(build_vision_prompt(im_path))
        except:
            continue
        if "choices" in resp:
            break
    raw_vision_resp = resp["choices"][0]["message"]["content"].strip()
    if debug:
        log_to_file(raw_vision_resp, "Vision")

    # 1.5 - Plan (if applicable)
    plan_resp = plan_history
    if do_plan_override:
        gpt4.update_params(planner_params)
        plan_resp = get_gpt4_response(gpt4, get_plan_prompt(raw_vision_resp, history))
        if ":" in plan_resp.strip():
            plan_resp = "\n\n".join(plan_resp.strip().split("\n\n")[1:])
            plan_resp = plan_resp.strip()
        gpt4.update_params(agent_params)        
        if debug:
            log_to_file(plan_resp, "Planner")

    # 1.5 - K-levels (if applicable)
    klevels_resps = []
    if klevels and do_klevel_override:
        # Hardcoded temps
        for i in range(3):
            #if do_plan_override:
            gpt4.update_params(expert_params[f"expert{i+1}"])
            act_resp = get_gpt4_response(gpt4, play_prompt_with_plan(raw_vision_resp,
                                                                     history, plan=plan_resp))
            gpt4.update_params(agent_params)
            act_resp, r_act_resp = clean_gpt_response(act_resp)
            proc_act_res = []
            for r in r_act_resp.split("\n"):
                if "|step|" in r:
                    continue
                proc_act_res.append(r.replace("|Explanation|", ", because"))
            klevels_resps.append("".join(proc_act_res).strip())
            if debug:
                log_to_file(klevels_resps[i], f"Expert{i+1}")

    # 2 - Agent call    
    if walkthrough or (klevels and not do_plan_override):
        act_resp = get_gpt4_response(gpt4, play_prompt_with_walkthrough(raw_vision_resp, history))
    elif plan:
        act_resp = get_gpt4_response(gpt4, play_prompt_with_plan(raw_vision_resp, history, plan=plan_resp))
    elif klevels:
        act_resp = get_gpt4_response(gpt4, play_prompt_with_klevels(raw_vision_resp, history, plan=plan_resp, klevels=klevels_resps))
    else:
        act_resp = get_gpt4_response(gpt4, play_prompt_naive(raw_vision_resp, history))

    if debug:
        log_to_file(act_resp, "Agent")

    act_resp, r_act_resp = clean_gpt_response(act_resp)
    history += f"\nState:\n{raw_vision_resp.strip()}\n|Action| {r_act_resp.strip()}"

    return act_resp, history, plan_resp


In [9]:
class PyPlotDoomGPT4:
    """
    PyPlot DOOM-playing class, modified to work with GPT-4V and GPT-4.
    Original class/code can be found (here)[https://github.com/wojciech-graj/cydoomgeneric]
    """
    def init(self, debug=DEBUG) -> None:
        self.keyevent_queue = []
        self.fig = plt.figure()
        self.ax = self.fig.add_subplot(1,1,1)
        self.fig.canvas.mpl_connect('key_press_event', self.on_press)
        self.fig.canvas.mpl_connect('key_release_event', self.on_release)
        self.fig.canvas.mpl_connect('close_event', sys.exit)
        self.fig.show()

        # State tracking
        self._frame = 0
        self._history = ""
        self._debug = debug
        self._is_in_end_state = False
        self._is_executing = False
        self._frames_on_execution = 0
        self._last_action = "UP"
        self._last_frame = 0
        self._do_plan_override = DO_PLAN
        self._plan = WALTHROUGH_E1_M1

        # Configuration
        self._prompt_type = PROMPT_TYPE
        self._log_root_path = LOG_PATH
        self._log_metadata = {"LogPath": self._log_root_path, "Frame": self._frame}

        # Create logfiles
        if self._debug:
            with open(f"{self._log_root_path}/log.txt", "w", encoding="utf-8") as f:
                f.write("\t".join(["Frame", "Key", "Pressed", "EventName", "IsInEndState"]) + "\n")
            with open(f"{self._log_root_path}/model_call_log.txt", "w", encoding="utf-8") as f:
                f.write("\t".join(["Frame", "Model", "Response"]) + "\n")
    
    def clip_history(self) -> None:
        """
        Heuristic to not go over the token limit.
        """
        # Bad heuristic -- we should use tiktoken.
        max_toks = 16_000
        if not self._prompt_type == "klevels":
            max_toks = 13_000
        if len(self._history.split(" ")) > max_toks:
            self._history = "State:".join(self._history.split("State:")[2:])
            self._history = "State:\n" + self._history
            self._history.strip()
    
    def draw_frame(self, pixels: np.ndarray) -> None:
        """
        Main logic loop. Take in pixels, call the model, hijack the keystroke queue and add your own actions.
        In our case, this will contain the logic to call the model depending on prompt, and update/assign 
        actions and history.
        
        Parameters:
            pixels (np.ndarray): current thing to render on canvas
        """

        self.ax.clear()
        self.ax.imshow(pixels[:,:,[2,1,0]])
        self.fig.canvas.draw()
        self._frame += 1
        self._log_metadata["Frame"] = self._frame

        # Until I figure out how to fix ValueError: ndarray is not C-contiguous, this is the only way
        event_name = str(self._frame) + "_" + str(datetime.datetime.now()).replace(":", ".")
        plt.imsave(f"{self._log_root_path}/{event_name}.png", pixels[:,:,[2,1,0]])

        if MAX_FRAMES_TO_PLAY is not None and self._frame > MAX_FRAMES_TO_PLAY:
            self._is_in_end_state = True

        # Termination conditions -- either max frames or LLM
        if self._is_in_end_state:
            if self._last_frame in END_SEQUENCE:
                for e in END_SEQUENCE[self._last_frame]:
                    self.keyevent_queue.append(e)
            self._last_frame += 1
        # Bulk of the call
        elif self._frame > SKIP_FRAME_COUNT:
            if not self._is_executing:
                if self._frame == 0 or self._frame % MAX_FPS == 0:
                    needs_planning = False
                    if self._do_plan_override:
                        needs_planning = self._frame % PLAN_INTERVAL == 0
                    action, self._history, self.__plan = step_call(f"{self._log_root_path}/{event_name}.png",
                                                                  self._history,
                                                                  plan_history = self._plan,
                                                                  log_metadata = self._log_metadata,
                                                                  debug = self._debug,
                                                                  walkthrough = self._prompt_type == "walkthrough",
                                                                  plan = self._prompt_type == "plan",
                                                                  klevels = self._prompt_type == "klevels",
                                                                  do_plan_override = needs_planning,
                                                                  do_klevel_override = self._prompt_type == "klevels" and self._frame % KLEVEL_INTERVAL == 0,
                                                                  mock = mock)
                    self._plan = self.__plan if self.__plan is not None else self._plan
                    action = action.split("\n")[0].strip()
    
                    if action in GPT4_KEYMAP:
                        # Two frame skip = press and depress.
                        # Other actions have "sensitivity" calibrated by logging my own keystrokes.
                        self._is_executing = False
                        self._Frames_on_execution = 0
                        if action in ["LEFT", "RIGHT", "UP", "DOWN", "STRAFE LEFT", "STRAFE RIGHT"]:
                            self._is_executing = True
                            self._frames_on_execution = MAX_FRAMES_TO_PLAY
                        if action != self._last_action:
                            self.keyevent_queue.append((GPT4_KEYMAP[self._last_action], 0))
                        self._last_action = action
                        self.keyevent_queue.append((GPT4_KEYMAP[action], 1))

                    elif action == "GAME OVER":
                        # For a graceful exit -- it doesn't work unless there are exemplars
                        self._is_in_end_state = True
                    self.clip_history()
                else:
                    if self._frames_on_execution > 0:
                        self._frames_on_execution -= 1
                        self.keyevent_queue.append((GPT4_KEYMAP[self._last_action], 1))
                    else:
                        self._is_executing = False
                        self.keyevent_queue.append((GPT4_KEYMAP[self._last_action], 1))
                        self.keyevent_queue.append((GPT4_KEYMAP[self._last_action], 0))
        # Skip call for loading the first game
        else:
            if self._frame in SKIP_CODES:
                for e in SKIP_CODES[self._frame]:
                    self.keyevent_queue.append(e)

        self.fig.canvas.flush_events()


    def get_key(self) -> Optional[Tuple[int, int]]:
        """
        Get currently pressed key (and log)
        """
        
        if len(self.keyevent_queue) == 0:
            self._is_executing = False
            return None
        (key, pressed) = self.keyevent_queue.pop(0)
        if self._debug:
            with open(f"{self._log_root_path}/log.txt", "a", encoding="utf-8") as f:
                event_name = str(self._frame) + "_" + str(datetime.datetime.now()).replace(":", ".")
                f.write("\t".join([str(self._frame), key, str(pressed), event_name, str(self._is_in_end_state)]) + "\n")
            print(self._frame, key, pressed, self.keyevent_queue)
        if key in keymap:
            return (pressed, keymap[key])
        elif len(key) == 1:
            return (pressed, ord(key.lower()))
        return self.get_key()

    def on_press(self, event) -> None:
        self.keyevent_queue.append((event.key, 1))

    def on_release(self, event) -> None:
        self.keyevent_queue.append((event.key, 0))

    def set_window_title(self, t: str) -> None:
        self.fig.suptitle(t)

# Play Doom!

- Instantiate the class
- Init the engine
- Call the `main()` method. This needs to be done fast so that `SKIP_CODES` kicks in. Otherwise, restart and tune `SKIP_FRAME_COUNT`
- *Note*: this class doesn't exit well (because it is meant to be a UI). So the only way to exit is to restart the kernel or wait for it to crash (via `END_SEQUENCE`).

In [None]:
g = PyPlotDoomGPT4()
cdg.init(640,
    400,
    g.draw_frame,
    g.get_key,
    init=g.init,
    set_window_title=g.set_window_title)
# This needs to be called fast. If you see something like "4 1 [(' ', 0)]" below (or any frame below 20), 
# you should be good. Otherwise restart and tune SKIP_FRAME_COUNT
cdg.main()

  self.fig.show()


4   1 [(' ', 0)]
4   0 []
18 enter 1 [('enter', 0)]
18 enter 0 []
20 enter 1 [('enter', 0)]
20 enter 0 []
38 enter 1 [('enter', 0)]
38 enter 0 []
52 enter 1 [('enter', 0), ('up', 1), ('up', 0), ('up', 1)]
52 enter 0 [('up', 1), ('up', 0), ('up', 1)]
52 up 1 [('up', 0), ('up', 1)]
52 up 0 [('up', 1)]
52 up 1 []
53 up 1 []
54 up 1 []
55 up 1 []
56 up 0 [('right', 1)]
56 right 1 []
57 right 1 []
58 right 0 [('up', 1)]
58 up 1 []
59 up 1 []
60 up 1 []
61 up 1 []
62 up 0 [('left', 1)]
62 left 1 []
63 left 1 []
64 left 0 [('right', 1)]
64 right 1 []
65 right 1 []
66 right 0 [('up', 1)]
66 up 1 []
67 up 1 []
68 up 1 []
69 up 1 []
70 up 0 [('right', 1)]
70 right 1 []
71 right 1 []
72 right 0 [('up', 1)]
72 up 1 []
73 up 1 []
74 up 1 []
75 up 1 []
76 up 0 [('right', 1)]
76 right 1 []
77 right 1 []
78 right 0 [('up', 1)]
78 up 1 []
79 up 1 []
80 up 1 []
81 up 1 []
82 up 1 []
83 up 1 []
84 up 0 [(' ', 1)]
84   1 []
85   1 []
86   0 [('up', 1)]
86 up 1 []
87 up 1 []
88 up 1 []
89 up 1 []
90 up 0 [