<a href="https://colab.research.google.com/github/Masum06/TinyConvAgent/blob/main/TinyConvAgent_Gemini.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Chat thread: https://chatgpt.com/g/g-p-683801b5c3e0819192b60f23b08c95eb-sentien/c/68b14971-9ad8-8323-b3fd-1ad91ea82956

# Code Refactoring Analysis

This notebook has been reorganized to improve modularity, readability, and maintainability. Here are the key changes:

1.  **Configuration Management (`ChatConfig`):** Settings such as model names, sampling parameters, summarization thresholds, and the System 2 guidelines now live in a dedicated `ChatConfig` dataclass. This separates configuration from the core application logic, so parameters can be tuned without touching that logic.

2.  **Decoupled Classes:** The original, monolithic `Conversation` class has been broken down into smaller, specialized classes:
    * **`PromptManager`**: Handles all logic related to building the system prompts and formatting messages for the API.
    * **`BackgroundProcessor`**: Runs summarization and retrospection on a background thread that hosts its own `asyncio` event loop, keeping these slow API calls out of the main conversation flow. Shared state is protected by a lock.
    * **`Conversation` (Refactored)**: Now acts as a high-level coordinator that manages state and delegates prompt building and background work to the other components.

3.  **Centralized Utilities:** Helper functions and constants (like emoji and flag dictionaries) are now grouped together in dedicated sections for better organization.

4.  **Improved Readability:** Docstrings and comments have been added to clarify the purpose of each component, especially the concurrency model, making the code easier to follow.

# 1. Installation and Setup

In [2]:
# @title Setup Notebook CSS
from IPython.display import HTML, display

def set_css():
  display(HTML('''
  <style>
    pre {
        white-space: pre-wrap;
    }
  </style>
  '''))
get_ipython().events.register('pre_run_cell', set_css)

In [1]:
!pip -q install emoji tiktoken openai


In [4]:
import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")

OPENAI_API_KEY: ··········


# 2. Constants and Utility Functions

In [5]:
import re
import emoji
import tiktoken
import json
import time
import asyncio
import threading
from typing import List, Tuple, Callable, Coroutine, Any
from openai import OpenAI, AsyncOpenAI
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo

# --- Dictionaries for Emoji and Flag Processing ---

REVERSE_EMOJI_DICT = {
    'HAPPY_high': ['😂', '🤣', '🥳', '🤩', '🥰'],
    'HAPPY_medium': ['😄', '😁', '😆', '😃', '🤗', '😍', '🤠', '🤓'],
    'HAPPY_low': ['🙂', '😊', '😌', '😉', '👍', '😇', '😅', '🙃', '😘'],
    'SAD_high': ['😭', '😿', '😞', '😫', '🤧'],
    'SAD_medium': ['😢', '💔', '🥺', '😥', '😓', '😣', '😖'],
    'SAD_low': ['😔', '☹️', '😕', '😟', '🥲', '🙁'],
    'SURPRISED_high': ['😲', '😵‍💫', '😯', '😮', '🤯'],
    'SURPRISED_medium': ['😳', '😦', '😧', '🙀'],
    'SURPRISED_low': ['🤭'],
    'AFRAID_high': ['😱', '😨', '👻'],
    'AFRAID_medium': ['😰'],
    'AFRAID_low': ['😵', '🙈'],
    'ANGRY_high': ['😡', '👿', '💢', '🤬', '☠'],
    'ANGRY_medium': ['😠', '😾', '😤', '🙎', '🙎‍♂️', '🙎‍♀️'],
    'ANGRY_low': ['😒', '🙄', '😑'],
    'DISGUSTED_high': ['🤮', '🤢', '😝'],
    'DISGUSTED_medium': ['😬', '🥵']
}

EMOJI_DICT = {emo: emotion for emotion, emojis in REVERSE_EMOJI_DICT.items() for emo in emojis}

FLAGS_DICT = {
    "<|quit|>": "quit",
    "<|silence|>": "silence",
    "<|offensive|>": "offensive",
    "<|profanity|>": "profanity",
    "<|offtopic|>": "offtopic",
    "<|sexual|>": "sexual",
    "<|selfharm|>": "selfharm",
    "<|violence|>": "violence",
    "<|suicide|>": "suicide",
    "<|threat|>": "threat"
}

# --- Text Processing Utility ---

def parse_bot_response(text: str) -> Tuple[str, str, List[str]]:
    """Extracts flags and emotion from a bot's response text."""
    flag_matches = re.findall(r"<\|.*?\|>", text)
    flags = [FLAGS_DICT[m] for m in flag_matches if m in FLAGS_DICT]

    emotion = "NEUTRAL"
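    # emoji.emoji_list() returns whole emoji, so multi-codepoint entries
    # (ZWJ sequences, variation selectors) can match their EMOJI_DICT keys.
    for match in emoji.emoji_list(text):
        if match["emoji"] in EMOJI_DICT:
            emotion = EMOJI_DICT[match["emoji"]].split("_")[0]
            break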

    cleaned_text = re.sub(r"<\|.*?\|>", "", text)
    cleaned_text = emoji.replace_emoji(cleaned_text, replace='')
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()

    return cleaned_text, emotion, flags
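
The flag-handling half of `parse_bot_response` can be tried in isolation. The following is a minimal sketch using only the standard library; `split_flags` and the trimmed `FLAGS` table are hypothetical names introduced here for illustration, not part of the notebook.

```python
import re

# Trimmed copy of the notebook's flag table, for illustration only.
FLAGS = {"<|quit|>": "quit", "<|silence|>": "silence", "<|profanity|>": "profanity"}
FLAG_PATTERN = re.compile(r"<\|.*?\|>")

def split_flags(text: str) -> tuple[str, list[str]]:
    """Return (text without flag tokens, list of recognized flag names)."""
    # Unknown tokens are still stripped from the text but produce no flag.
    flags = [FLAGS[m] for m in FLAG_PATTERN.findall(text) if m in FLAGS]
    cleaned = re.sub(r"\s+", " ", FLAG_PATTERN.sub("", text)).strip()
    return cleaned, flags

cleaned, flags = split_flags("Take care! <|quit|>")
print(cleaned, flags)
```

A reply that consists only of a flag comes back as an empty string, which matches the blank bot lines that appear in debug output when a message is flagged.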

# 3. Core Components

In [23]:
@dataclass
class ChatConfig:
    """Holds all configuration for the conversation agent."""
    model: str = "gpt-4.1-mini"
    temperature: float = 1.0
    max_tokens: int = 256
    summarize_after: int = 40
    compress_summary_after: int = 10
    debug: bool = False

    system2_thinking: bool = True
    system2_model: str = "gpt-4.1-mini"
    system2_rules: str = ("If the conversation goes off-topic, bring it back. "
                          "Be socially intelligent, understand user's emotions and speak appropriately.")

class Persona:
    def __init__(self, firstname, lastname="", pronoun="", ethnicity="", age="", bio=""):
        self.firstname = firstname
        self.lastname = lastname
        self.pronoun = pronoun
        self.ethnicity = ethnicity
        self.age = age
        self.bio = bio
        if not bio:
            self.bio = f"{self.firstname} {self.lastname} (Pronoun: {self.pronoun}) is a virtual human created by researchers at University of Rochester."

    def set_pronoun(self, pronoun):
        self.pronoun = pronoun

    def set_bio(self, bio):
        self.bio = bio

    def set_age(self, age):
        self.age = age

    def set_ethnicity(self, ethnicity):
        self.ethnicity = ethnicity
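
Because `ChatConfig` is a dataclass, variants of a base configuration can be derived with `dataclasses.replace` instead of mutating a shared instance. The sketch below uses a trimmed copy of the class so that it runs on its own; the field values passed to `replace` are arbitrary examples.

```python
from dataclasses import dataclass, replace

@dataclass
class ChatConfig:  # trimmed copy of the notebook's class, for illustration only
    model: str = "gpt-4.1-mini"
    temperature: float = 1.0
    summarize_after: int = 40
    debug: bool = False

base = ChatConfig()
# replace() returns a new instance; `base` is left untouched.
debug_config = replace(base, debug=True, summarize_after=8)
print(base)
print(debug_config)
```

A low `summarize_after` like this is one way to exercise the summarization path in a short test conversation.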

In [24]:
from datetime import datetime
from zoneinfo import ZoneInfo

class PromptManager:
    """Handles the creation and formatting of system prompts."""
    def __init__(self, bot: Persona, user: Persona, config: ChatConfig):
        self.bot = bot
        self.user = user
        self.config = config

    def build_initial_system_prompt(self) -> List[dict]:
        """Constructs the initial, static system prompt for the conversation."""
        now = datetime.now(ZoneInfo("America/New_York"))
        # %Z yields EST or EDT as appropriate instead of a hard-coded "EST".
        start_time_str = now.strftime("%A, %B %d, %Y %I:%M %p %Z")

        system_prompt = [
            {"role": "system", "content": "Don't say that you are an AI Language Model."},
            {"role": "system", "content": "Don't let the other speaker talk off topic."},
            {"role": "system", "content": f"You are located in the US Eastern time zone. Conversation start time: {start_time_str}"},
            {"role": "system", "content": "This conversation is happening over a video call. When everyone says goodbye and the conversation ends naturally, say <|quit|> to end it."},
            {"role": "system", "content": f"To express {self.bot.firstname}'s emotions, use at most one relevant emoji at the end of your response."},
            {"role": "system", "content": f"If the user's statement contains sensitive topics, print one of these flags: {', '.join(list(FLAGS_DICT.keys())[2:])}"},
            {"role": "system", "content": "To stay silent or skip a turn, say <|silence|>."},
            {"role": "system", "content": "Do not say <|quit|> until the user has said goodbye."},
        ]

        # Add persona details
        if self.bot.firstname: system_prompt.append({"role": "system", "content": f"Your first name: {self.bot.firstname}."})
        if self.bot.pronoun: system_prompt.append({"role": "system", "content": f"Your pronoun: {self.bot.pronoun}."})
        if self.bot.bio: system_prompt.append({"role": "system", "content": f"Your bio: {self.bot.bio}"})
        if self.bot.age: system_prompt.append({"role": "system", "content": f"Your age: {self.bot.age}"})
        if self.user.firstname != "User": system_prompt.append({"role": "system", "content": f"You are speaking with: {self.user.firstname} {self.user.lastname}."})
        if self.user.pronoun: system_prompt.append({"role": "system", "content": f"User's pronoun: {self.user.pronoun}."})
        if self.user.bio: system_prompt.append({"role": "system", "content": f"User's bio: {self.user.bio}"})

        # Add System 2 (Retrospection) rules if enabled
        if self.config.system2_thinking:
            system_prompt.append({"role": "system", "content": "Instruction: Use the retrospect thinking to improve your next message."})
            system_prompt.append({"role": "system", "content": f"[system2] Guidelines:\n{self.config.system2_rules}"})

        return system_prompt
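
The sensitive-topic instruction in the prompt lists the flags by slicing off the first two keys of `FLAGS_DICT`, which depends on `<|quit|>` and `<|silence|>` staying first in the dictionary. One possible alternative, sketched here under that reading, filters by name instead. `CONTROL_FLAGS` and `sensitive_flags` are hypothetical names, and the dictionary is a trimmed copy.

```python
# Trimmed copy of the notebook's FLAGS_DICT, for illustration only.
FLAGS_DICT = {
    "<|quit|>": "quit",
    "<|silence|>": "silence",
    "<|offensive|>": "offensive",
    "<|threat|>": "threat",
}

# Hypothetical name: flags that steer the dialogue rather than mark sensitive content.
CONTROL_FLAGS = {"<|quit|>", "<|silence|>"}

def sensitive_flags(flags_dict: dict) -> list:
    """Return the flag tokens that mark sensitive content, in dictionary order."""
    return [token for token in flags_dict if token not in CONTROL_FLAGS]

print(", ".join(sensitive_flags(FLAGS_DICT)))
```

With the current key order this produces the same list as the slice, but it keeps working if the dictionary is later reordered or extended.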

In [25]:
import asyncio
import threading
from openai import AsyncOpenAI

class BackgroundProcessor:
    """Manages generic background tasks in a separate thread."""
    def __init__(self, conversation: 'Conversation'):
        self.conv = conversation
        self.client = AsyncOpenAI()
        self._running = False
        self._loop = asyncio.new_event_loop()
        self._main_thread = None
        self._state_lock = conversation._state_lock

        # Wakeup events for different tasks
        self.events = {
            "summary": threading.Event(),
            "compression": threading.Event(),
            "system2": threading.Event()
        }

    def start(self):
        """Starts the background event loop and worker threads."""
        if self._running: return
        self._running = True
        self._main_thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self._main_thread.start()

    def stop(self):
        """Signals the background thread to stop gracefully."""
        if not self._running: return

        # Signal all tasks to stop their loops
        self._running = False
        for event in self.events.values():
            event.set()

        # Schedule the loop to stop from within the loop's thread
        if self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)

        # Wait for the thread to terminate
        if self._main_thread and self._main_thread.is_alive():
            self._main_thread.join(timeout=2)

    def _run_event_loop(self):
        """The main entry point for the background thread."""
        asyncio.set_event_loop(self._loop)
        try:
            # Create the main task that manages all sub-tasks
            main_task = self._loop.create_task(self._worker_manager())
            # Run the loop forever until stop() is called
            self._loop.run_forever()

            # Once stopped, cancel the main task and clean up
            main_task.cancel()
            self._loop.run_until_complete(main_task)
        except asyncio.CancelledError:
            pass
        finally:
            self._loop.close()

    async def _worker_manager(self):
        """Creates and manages a set of generic task runners."""
        tasks_to_run = [
            self._task_runner(
                name="summarize",
                wakeup_event=self.events["summary"],
                condition=lambda: len(self.conv.messages) >= self.conv.config.summarize_after,
                action=self._summarize_messages
            ),
            self._task_runner(
                name="compress",
                wakeup_event=self.events["compression"],
                condition=lambda: len(self.conv.summary) >= self.conv.config.compress_summary_after,
                action=self._compress_summary
            ),
            self._task_runner(
                name="system2",
                wakeup_event=self.events["system2"],
                condition=lambda: self.conv.config.system2_thinking,
                action=self._retrospect
            )
        ]
        await asyncio.gather(*tasks_to_run)

    async def _task_runner(self, name: str, wakeup_event: threading.Event, condition: Callable[[], bool], action: Callable[[], Coroutine]):
        """A generic worker that waits for an event, checks a condition, and performs an action."""
        while self._running:
            # Wait for the event without blocking the event loop
            await asyncio.to_thread(wakeup_event.wait)
            if not self._running: break
            wakeup_event.clear()

            if condition():
                try:
                    await action()
                except Exception as e:
                    print(f"Error in background task '{name}': {e}")

    # --- Action Coroutines ---

    async def _summarize_messages(self):
        with self._state_lock:
            chunk_size = max(1, self.conv.config.summarize_after // 2)
            to_summarize = self.conv.messages[:-chunk_size]
        if not to_summarize:
            return

        resp = await self.client.chat.completions.create(
            model=self.conv.config.model,
            messages=[{"role":"system", "content": "Summarize this conversation."}] + to_summarize,
            max_tokens=1024
        )
        summary_text = resp.choices[0].message.content
        with self._state_lock:
            self.conv.summary.append({"role": "assistant", "content": f"Summary: {summary_text}"})
            # Drop only the summarized prefix so messages added during the API call are kept.
            self.conv.messages = self.conv.messages[len(to_summarize):]
            if len(self.conv.summary) >= self.conv.config.compress_summary_after:
                self.events["compression"].set()

    async def _compress_summary(self):
        with self._state_lock:
            k = max(1, self.conv.config.compress_summary_after // 2)
            head = self.conv.summary[:-k]
            text_to_compress = "\n".join([s["content"] for s in head])
        if not head:
            return

        resp = await self.client.chat.completions.create(
            model=self.conv.config.model,
            messages=[{"role":"system", "content": "Merge these summaries."}, {"role":"user", "content": text_to_compress}],
            max_tokens=1024
        )
        merged = resp.choices[0].message.content
        with self._state_lock:
            # Keep every summary after the merged prefix, including any appended during the API call.
            self.conv.summary = [{"role": "assistant", "content": f"Condensed summary: {merged}"}] + self.conv.summary[len(head):]

    async def _retrospect(self):
        with self._state_lock:
            recent_history = self.conv.history[-12:]
        resp = await self.client.chat.completions.create(
            model=self.conv.config.system2_model,
            messages=[
                {"role": "system", "content": f"You are a reflective planner for {self.conv.bot.firstname}. Given the exchange, produce a 1-sentence plan to improve the next response. Start with 'Plan ahead: '."},
                {"role": "system", "content": f"Guidelines:\n{self.conv.config.system2_rules}"}
            ] + recent_history,
            temperature=0.3, max_tokens=160
        )
        suggestion = (resp.choices[0].message.content or "").strip()
        if suggestion:
            with self._state_lock:
                self.conv.retrospection = suggestion
                if self.conv.config.debug: print(f"[system2] {suggestion}")
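
The wake-up mechanism in `_task_runner` can be reduced to a few lines: a thread runs its own `asyncio` event loop, and each worker waits on a `threading.Event` through `asyncio.to_thread` so the loop is never blocked. The sketch below is a stripped-down stand-in, not the notebook's class. `Worker`, `done`, and `results` are hypothetical names, and `asyncio.sleep` stands in for the API call.

```python
import asyncio
import threading

class Worker:
    """Hypothetical, stripped-down stand-in for BackgroundProcessor."""

    def __init__(self):
        self.wakeup = threading.Event()
        self.done = threading.Event()    # lets the demo wait for one result
        self.results = []
        self._running = True
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._running = False
        self.wakeup.set()                # unblock the waiter so it can exit
        self._thread.join(timeout=2)

    def _run(self):
        asyncio.run(self._main())        # private event loop in this thread

    async def _main(self):
        while self._running:
            # Event.wait() blocks, so run it in a helper thread to keep the loop free.
            await asyncio.to_thread(self.wakeup.wait)
            if not self._running:
                break
            self.wakeup.clear()
            await asyncio.sleep(0.01)    # stands in for an API call
            self.results.append("processed")
            self.done.set()

worker = Worker()
worker.start()
worker.wakeup.set()                      # what add_message() does after an assistant turn
worker.done.wait(timeout=2)
worker.stop()
print(worker.results)
```

Setting the event inside `stop()` matters: without it, the helper thread would stay parked in `Event.wait()` and the background thread could not exit.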

# 4. Main Conversation Class

In [26]:
import time
import json
import tiktoken
from openai import OpenAI

class Conversation:
  """Manages the conversation state and orchestrates interactions."""
  def __init__(self, user: Persona, bot: Persona, config: "ChatConfig | None" = None):
    self.user = user
    self.bot = bot
    # Build a fresh config per conversation; a default instance in the signature would be shared.
    self.config = config if config is not None else ChatConfig()
    self.client = OpenAI()
    self.turn_no = 0

    # State variables
    self.history = []      # Complete, append-only history
    self.messages = []     # Rolling buffer for the current context window
    self.summary = []      # List of summaries of older parts of the conversation
    self.retrospection = "" # Guidance from the last System 2 reflection

    # Initialize helper components
    self.prompt_manager = PromptManager(bot, user, self.config)
    self.system = self.prompt_manager.build_initial_system_prompt()
    self._state_lock = threading.RLock() # Lock for thread-safe state access
    self.bg_processor = BackgroundProcessor(self)
    self.bg_processor.start()

  def add_message(self, role: str, content: str):
      """Adds a message to the conversation and signals background workers."""
      with self._state_lock:
          message = {"role": role, "content": content}
          self.messages.append(message)
          self.history.append(message)
          if role == 'user': self.turn_no += 1

          # Wake the summary and System 2 workers after each assistant turn
          if role == "assistant":
              self.bg_processor.events["summary"].set()
              self.bg_processor.events["system2"].set()

  def call(self, prompt: str = "", cache: bool = True) -> str:
    """Generates a response from the language model."""
    with self._state_lock:
      # Create a temporary message list for this specific call
      temp_messages = list(self.messages)
      if prompt: temp_messages.append({"role": "user", "content": prompt})

      input_messages = list(self.system)
      if self.config.system2_thinking and self.retrospection:
          input_messages.append({"role": "system", "content": self.retrospection})
      input_messages.extend(self.summary)
      input_messages.extend(temp_messages)

    response = self.client.chat.completions.create(
        model=self.config.model,
        messages=input_messages,
        temperature=self.config.temperature,
        max_tokens=self.config.max_tokens
    )
    reply = response.choices[0].message.content or ""  # content can be None
    reply = reply.replace(f"{self.bot.firstname}: ", "").strip()

    if cache:
        if prompt: self.add_message("user", prompt)
        self.add_message("assistant", reply)

    return reply

  def respond(self, user_utterance: str) -> Tuple[str, str, List[str], float]:
    """Handles a user's turn, gets a response, and parses it."""
    start_time = time.time()
    raw_reply = self.call(user_utterance)
    response_time = time.time() - start_time

    text, emotion, flags = parse_bot_response(raw_reply)
    return text, emotion, flags, response_time

  def chat(self, reset: bool = False):
    """Starts an interactive command-line chat session."""
    if reset: self.reset()
    print(f"Starting chat with {self.bot.firstname}. Type your message and press Enter.")
    while True:
        try:
            user_utterance = input(f"{self.turn_no}. {self.user.firstname}: ")
            if user_utterance.lower() in ["exit", "quit"]:
                print("Exiting chat.")
                break

            response, emo, flags, response_time = self.respond(user_utterance)

            if self.config.debug:
                print(f"{self.bot.firstname}: {response} ({emo}) {flags} {response_time:.2f}s")
            else:
                print(f"{self.bot.firstname}: {response}")

            if "quit" in flags:
                print("Bot ended the conversation.")
                break

            if self.config.debug:
                with self._state_lock:
                    print(f"--- Diagnostics ---\nHistory: {len(self.history)}, Messages: {len(self.messages)}, Summary: {len(self.summary)}\n-----")
        except (KeyboardInterrupt, EOFError):
            print("\nExiting chat.")
            break
    self.stop()

  def stop(self):
      """Gracefully stops background processes."""
      print("Shutting down background tasks...")
      self.bg_processor.stop()
      print("Shutdown complete.")

  def reset(self):
      """Resets the conversation state."""
      with self._state_lock:
          self.messages = []
          self.summary = []
          self.history = []
          self.turn_no = 0
          self.retrospection = ""  # drop stale System 2 guidance as well
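
The order in which `call()` assembles a request is easier to see as a pure function: static system prompt first, then the latest retrospection (if any), then the summaries of older turns, then the recent messages, and finally the new user prompt. The following is a minimal sketch. `build_context` is a hypothetical helper, and the sample messages are made up.

```python
def build_context(system, retrospection, summary, messages, prompt=""):
    """Assemble the request in the order used by Conversation.call()."""
    context = list(system)                      # static instructions first
    if retrospection:                           # System 2 guidance, when present
        context.append({"role": "system", "content": retrospection})
    context.extend(summary)                     # condensed older turns
    context.extend(messages)                    # recent turns, verbatim
    if prompt:
        context.append({"role": "user", "content": prompt})
    return context

system = [{"role": "system", "content": "Stay on topic."}]
summary = [{"role": "assistant", "content": "Summary: they greeted each other."}]
messages = [{"role": "assistant", "content": "How can I help?"}]

context = build_context(system, "Plan ahead: ask an open question.",
                        summary, messages, "I need advice.")
for m in context:
    print(m["role"], "|", m["content"])
```

In the notebook the retrospection is included only when `system2_thinking` is enabled. The sketch folds that check into the truthiness test on `retrospection`.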

# 5. Conversation Example

In [33]:
# 1. Define the participants
user = Persona("Masum", "Hasan", bio="User")
bot = Persona("Ada", "Brown", bio="You are a social worker.")

# 2. Configure the chat settings
config = ChatConfig(
    model="gpt-4.1",
    system2_thinking=True,
    debug=True
)

# 3. Initialize the conversation
conversation = Conversation(user, bot, config)


In [34]:
# 4. Start the chat
# The functionality remains identical to the original notebook.
conversation.chat()

Starting chat with Ada. Type your message and press Enter.
0. Masum: hi
Ada: Hi Masum! How are you doing today? (HAPPY) [] 0.63s
--- Diagnostics ---
History: 2, Messages: 2, Summary: 0
-----
[system2] Plan ahead: Greet the user warmly and ask an open-ended question to engage them in conversation.
1. Masum: fuck yuou
Ada:  (NEUTRAL) ['profanity'] 0.60s
--- Diagnostics ---
History: 4, Messages: 4, Summary: 0
-----
[system2] Plan ahead: Remain calm and polite, acknowledge the user's message without escalating, and gently steer the conversation toward a more positive or constructive topic.
2. Masum: hi
Ada: Hi Masum! How can I support you this evening? (HAPPY) [] 0.70s
--- Diagnostics ---
History: 6, Messages: 6, Summary: 0
-----
[system2] Plan ahead: Respond warmly and neutrally to the user's greeting to maintain a positive tone and encourage constructive conversation.
3. Masum: I will kill you
Ada:  (NEUTRAL) ['threat'] 0.80s
--- Diagnostics ---
History: 8, Messages: 8, Summary: 0
-----
