In [4]:
%pip install openai pydantic jinja2 python_dotenv ipywidgets

Note: you may need to restart the kernel to use updated packages.


In [5]:
import dis
import os
import re
import tomllib
import functools

from jinja2 import Template
from pydantic import BaseModel
from openai import OpenAI, AzureOpenAI
from dotenv import load_dotenv

import ipywidgets as widgets
from IPython.display import display, clear_output

from typing import Literal, Union

load_dotenv()

with open("data/scary_ship_with_items.toml", "rb") as f:
    config = tomllib.load(f)

message_pattern = re.compile(r"^([\w_ *]+)\:[ \s]*(.+)*$")
class Message(BaseModel):
    # Message from chatbot
    role: Literal["system", "assistant", "user"]
    content: str   

    # parse the content into dictionary
    def dict(self) -> dict:
        current_key = None
        parsed = {}
        lines = self.content.splitlines()
        # parse the content line by line
        for line in lines:
            res = message_pattern.search(line)
            if res is not None:
                current_key = res.group(1)
                if res.group(2) is not None:
                    parsed[current_key] = res.group(2)
                else:
                    parsed[current_key] = ""
            elif current_key is not None:
                parsed[current_key] += "\n" + line
        return parsed

class Chapter(BaseModel):
    id: str
    next: str|None = None
    max_turns: int
    backgrounds: list[str]
    rules: list[str]
    initial: str
    
    def __init__(self, **kargs):
        super().__init__(**kargs)
    
    def get_messages(self) -> list[Message]:
        rendered_rules = render_rules(self.rules)
        system_content = Template(system).render({"rules": rendered_rules, "backgrounds":self.backgrounds, "character": character})
        initial_content = Template(self.initial).render({"rules": self.rules, "character": character})
        return [
            Message(role="system", content=system_content),
            Message(role="assistant", content=initial_content)
        ]

version = config["version"]
system = config["system"]
character = config["character"]
chapters = config["chapters"]
inventory = character["inventory"]
messages:list[Message] =[]

tags:list[str] = ['r1']

print(f"Version: {version}, Chapters: {len(chapters)}")

def getChapterById(id:str) -> Chapter:
    for r in chapters:
        if r["id"] == id:
            return Chapter(**r) 
    return None

def choose_next_chapter():
    global current_chapter
    if current_chapter.next is not None:
        next = Template(current_chapter.next).render({"inventory": inventory, "tags": tags})
        print("Next round: ", next)
        next_chapter = getChapterById(next)
        if next_chapter is not None:
            current_chapter = next_chapter
            messages.clear()
            do_chat(None, lambda x: print(x, end=""))
        else:
            print(f"Chapter '{next}' not found.")
    else:
        print("No next chapter found.")


def count_messages(messages:list[Message], role:str) -> int:
    return len([msg for msg in messages if msg.role == role])

def chat_with_azure(messages:list[Message], on_message: Union[callable, None] = None) -> Message:
    client = AzureOpenAI(api_key=os.environ.get("AZURE_OPENAI_API_KEY"), 
                         azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT"),
                         api_version=os.environ.get("AZURE_OPENAI_API_VERSION")
                         )
    stream = client.chat.completions.create(
        model=os.environ.get("AZURE_OPENAI_MODEL_NAME"),
        messages=messages,
        stream=True,
        max_tokens=4000,
        temperature=float(os.environ.get("OPENAI_TEMPERATURE"))
    )

    generated = ""
    for chunk in stream:
        if len(chunk.choices) > 0 and chunk.choices[0].delta.content is not None:
            delta = chunk.choices[0].delta.content
            if (on_message is not None):
                on_message(chunk.choices[0].delta.content)
            generated += delta
    return Message(role="assistant", content=generated)
    
def chat(messages:list[Message], on_message: Union[callable, None] = None) -> Message:
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
    stream = client.chat.completions.create(
        model=os.environ.get("OPENAI_MODEL_NAME"),
        messages=messages,
        stream=True,
        max_tokens=4000,
        temperature=float(os.environ.get("OPENAI_TEMPERATURE"))
    )

    generated = ""
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            delta = chunk.choices[0].delta.content
            if (on_message is not None):
                on_message(chunk.choices[0].delta.content)
            generated += delta
    return Message(role="assistant", content=generated)

def render_rules(rules:list[str]) -> str:
    lists = []
    for rule in rules:
        res = Template(rule).render({"character": character, "tags":tags, "inventory": inventory})
        if res is not None and res != "":
            lists.append(res)
    return lists

def do_chat(content:str|None, on_message: Union[callable, None] = None):
    if content is not None:
        turn = count_messages(messages, 'user') + 1
        if turn >= current_chapter.max_turns:
            content += f"\nwhisper: MAX_TURNS is reached."
        messages.append(Message(role="user", content=content))
        print(content)

    # change it to chat_with_azure to use Azure OpenAI
    concat = current_chapter.get_messages() + messages
    # for msg in concat:
    #     print(f"{msg.role}: {msg.content}")

    msg = chat_with_azure(concat, on_message)

    # print all the messages:
    show_controls(msg)
    messages.append(msg)
        
# handle player's action
def on_action(b, index:int):
    user_msg = f"I select '{index}'."
    do_chat(user_msg, lambda x: print(x, end=""))

# handle player's skill check
def on_skill(b, skill:str, difficulty:str):
    user_msg = f"I roll the dice for '{skill.upper()}' and SUCCESS."
    do_chat(user_msg, lambda x: print(x, end=""))

def on_continue(b):
    user_msg = f"Continue..."
    do_chat(user_msg, lambda x: print(x, end=""))

def on_next(b):
    choose_next_chapter()
        
def on_retry(b):
    # delete last assistant message
    messages.pop()
    do_chat(None, lambda x: print(x, end=""))

action_pattern = re.compile(r"(\d+). *(.+)")
def show_controls(msg:Message):
    dict = msg.dict()
    buttons = []

    if "choices" in dict.keys():
        matches = action_pattern.finditer(dict["choices"])
        for match in matches:
            btn = widgets.Button(description=match.group(0), layout=widgets.Layout(width="fit-content"))
            btn.on_click(functools.partial(on_action, index=int(match.group(1))))
            buttons.append(btn)
    elif "skill" in dict.keys() and "difficulty" in dict.keys():
        btn = widgets.Button(description=f"{dict['skill']} - {dict['difficulty']}", layout=widgets.Layout(width="fit-content"))
        btn.on_click(functools.partial(on_skill, skill=dict['skill'], difficulty=dict['difficulty']))
        buttons.append(btn)
    elif "ending" in dict.keys():
        btn = widgets.Button(description=f"NEXT", layout=widgets.Layout(width="fit-content"))
        btn.on_click(on_next)
        buttons.append(btn)
    elif "game_over" in dict.keys():
        print("\nGame Over - Hit 'Run All' button to restart again.")
    else:
        btn = widgets.Button(description="Continue", layout=widgets.Layout(width="fit-content"))
        btn.on_click(on_continue)
        buttons.append(btn)
    
    if "add_tag" in dict.keys() and dict["add_tag"] not in tags:
        tags.append(dict["add_tag"])

    if "remove_tag" in dict.keys() and dict["remove_tag"] in tags:
        tags.remove(dict["remove_tag"])
    
    if "add_item" in dict.keys() and dict["add_item"] not in inventory:
        inventory.append(dict["add_item"])
    
    if "remove_item" in dict.keys() and dict["remove_item"] in inventory:
        inventory.remove(dict["remove_item"])
    
    controls = []
    if len(messages) > 0:
        retry_btn = widgets.Button(description="Retry", layout=widgets.Layout(width="fit-content"))
        retry_btn.on_click(on_retry)
        controls.append(retry_btn)

    rules_btn = widgets.Button(description="Rules", layout=widgets.Layout(width="fit-content"))
    rules_btn.on_click(lambda x: print(render_rules(current_chapter.rules)))
    controls.append(rules_btn)

    if len(tags) > 0:
        tags_lbl = widgets.Label(value=f"Tags: {', '.join(tags)}")
        controls.append(tags_lbl)

    if len(inventory) > 0:
        inventory_lbl = widgets.Label(value=f"inventory: {', '.join(inventory)}")
        controls.append(inventory_lbl)
    
    display(widgets.VBox(buttons))

    if (len(controls) > 0):
        display(widgets.HBox(controls))

current_chapter = getChapterById("r1")

# locals_dict = {}
# s = "'r1' in tags"
# source = f"def f(tags): return {s}"
# exec(source, {}, locals_dict)
# print(locals_dict['f'](['r2']))

Version: 0.0.3, Chapters: 3


In [6]:
initials = current_chapter.get_messages()
print(initials[1].content)
show_controls(initials[1])

background: ship_own_room
narration: 
You are Jack, a journalist investigating a mysterious luxury cruise ship that is related to multiple disappearances.

You have made your way to the designated port and successfully boarded the luxury ocean liner along with the passengers from various cultures. As you are relaxing in the suite in which you entered not so long ago, the captain invites everyone on the ship to attend the voyage welcome party at the grand ballroom. It seems your investigation for the strange disappearances has just begun.
choices:
1. Attend the party at the grand ballroom
2. throw a knife out into the sea
3. Explore other parts of the ship while everyone is in the ballroom


VBox(children=(Button(description='1. Attend the party at the grand ballroom', layout=Layout(width='fit-conten…

HBox(children=(Button(description='Rules', layout=Layout(width='fit-content'), style=ButtonStyle()), Label(val…