In [10]:
#Imports
import ipywidgets as widgets
from IPython.display import display
import numpy as np
import wave
import speech_recognition as sr
from io import BytesIO

In [16]:
# Module-level memory placeholder, initialised to an empty string.
# NOTE(review): none of the visible cells read or write MEMORY (the agent classes
# keep their own `memory` attribute) — confirm whether it is still needed.
MEMORY = ""

In [None]:
from pathlib import Path
from google import genai
from google.genai import types
import os
import numpy as np
from typing import Literal
from abc import ABC, abstractmethod
import subprocess
import xml.etree.ElementTree as ET

class LLMJokerAgent(ABC):
    """Base class for LLM-powered joke generation agents."""

    MEMORY_SYSTEM_INSTRUCTION = "You are an advanced agent memory manager that keeps track of conversation content by maintaining a SHORT summary. You transform the old summary, with the new user message and agent response to the new summary."

    def __init__(self, prompts_dir: Path):
        self.prompts_dir = Path(prompts_dir)
        self.memory = ""

        # Load output format
        output_format_path = self.prompts_dir / "output-format.md"
        with open(output_format_path, 'r') as f:
            self.output_format = f.read()

        # Load profile prompts into a dictionary
        self.profiles = {}
        for file in self.prompts_dir.iterdir():
            if file.suffix == '.md' and 'output' not in file.name:
                with open(file, 'r') as f:
                    self.profiles[file.stem] = f.read()

        self.profile_names = list(self.profiles.keys())

    @abstractmethod
    def _call_llm(self, system_instruction: str, user_content: str) -> str | None:
        """Sub-classes implement this to either prompt with the CLI or the API"""
        pass

    def get_random_profile(self):
        name = np.random.choice(self.profile_names)
        return name, self.profiles[name]

    def generate_response(self, user_response: str = "I don't have much to say.", personality=None, update_memory=True, N=3):
        """Generate a joke with a random personality if personaltiy is not set."""
        _, profile = self.get_random_profile()
        
        if personality is not None:
            # Soft fail if personality does not exist
            profile = self.profiles.get(personality, profile)
        
        system_instruction = f"{profile}\n{self.output_format.replace('[N]', str(N))}"

        model_response = self._call_llm(system_instruction, user_response)
        
        if not model_response:
            return ["I'm done for the day"]
        
        try:
            parsed_response = ET.fromstring(model_response)
            response_jokes = list(map(lambda x: x.text, parsed_response.findall("joke")))
            if response_jokes is None or response_jokes[0] is None:
                return ["I'm not feeling so funny today."]
            if update_memory:
                self.update_memory(user_response, response_jokes[0])
        except:
            return ["XML in this case stands for X Major Loser-markup, because I cannot parse it."]

        return response_jokes

    # TODO: we might want to handle this completely differently with another agent without an LLM
    def update_memory(self, user_response: str, model_response: str) -> str:
        """Update conversation memory with a summary of the exchange."""
        prompt = f"previous summary: {self.memory}, new user msg: {user_response}, new system msg: {model_response}"

        summary = self._call_llm(self.MEMORY_SYSTEM_INSTRUCTION, prompt)

        if summary:
            self.memory = summary
            print("New memory:\n", self.memory)

        return self.memory

class LLMAPIJokerAgent(LLMJokerAgent):
    """Implementation with the gemini API.

    The API key is taken from the ``api_key`` argument or, when that is omitted,
    from the environment. It must never be committed to the notebook.
    NOTE(review): a personal key used to be hard-coded here as the default
    argument; that key has been exposed and should be revoked.
    """

    # Environment variables consulted (in order) when no api_key is passed.
    API_KEY_ENV_VARS = ("GEMINI_API_KEY", "GOOGLE_API_KEY")

    def __init__(self, prompts_dir: Path, api_key: str | None = None, model="gemini-2.5-flash"):
        """Create the Gemini client.

        Args:
            prompts_dir: directory holding the prompt markdown files.
            api_key: Gemini API key; falls back to the environment variables in
                ``API_KEY_ENV_VARS`` when not given.
            model: name of the Gemini model to query.

        Raises:
            ValueError: if no API key is passed and none is set in the environment.
        """
        super().__init__(prompts_dir)

        if not api_key:
            api_key = next(
                (os.environ[var] for var in self.API_KEY_ENV_VARS if os.environ.get(var)),
                None,
            )
        if not api_key:
            raise ValueError(
                "No Gemini API key provided: pass api_key=... or set one of "
                + ", ".join(self.API_KEY_ENV_VARS)
            )

        self.client = genai.Client(api_key=api_key)
        self.model = model

    def _call_llm(self, system_instruction: str, user_content: str) -> str | None:
        """Send one request to the configured model and return the response text."""
        response = self.client.models.generate_content(
            model=self.model,
            config=types.GenerateContentConfig(
                system_instruction=system_instruction
            ),
            contents=user_content,
        )
        return response.text

# Prerequisites: your free student Gemini Pro and an installed gemini CLI.
class LLMCLIJokerAgent(LLMJokerAgent):
    """Joker agent that prompts the model through the `gemini` command-line tool."""

    def __init__(self, prompts_dir: Path):
        super().__init__(prompts_dir)

    def _call_llm(self, system_instruction: str, user_content: str) -> str | None:
        """Run `gemini -p <prompt>` and return its stripped stdout.

        The system instruction and the user content are combined into a single
        prompt. Returns None when the CLI is missing, does not finish within
        60 seconds, exits with a non-zero code, or prints nothing.
        """
        prompt = f"{system_instruction}\n\nUser: {user_content}"
        command = ["gemini", "-p", prompt]

        try:
            completed = subprocess.run(command, capture_output=True, text=True, timeout=60)
        except subprocess.TimeoutExpired:
            print("CLI timeout")
            return None
        except FileNotFoundError:
            print("Gemini CLI not found.")
            return None

        if completed.returncode != 0:
            print(f"CLI Error: {completed.stderr}")
            return None

        answer = completed.stdout.strip()
        return answer if answer else None

In [16]:
# Smoke test: build the CLI-backed agent from ./prompts and ask for a single joke
# without touching the summary memory.
joker_agent = LLMCLIJokerAgent(Path.cwd() / "prompts")
joker_agent.generate_response(
    "I'm just testing you out if you don't mind.",
    update_memory=False,
    N=1,
)

['I had to take a test once. The only question was "What is the capital of pancakes?" I answered "The sky is made of retired librarians." The teacher marked it correct and then I woke up inside a filing cabinet.']

In [None]:
import random
import spacy

# Load the spaCy "en_core_web_sm" pipeline once at module level;
# MemoryAgent._extract_features runs every user sentence through it.
nlp = spacy.load("en_core_web_sm")

class MemoryAgent:
    """Rule-based conversation memory built from linguistic features of user messages.

    Every user message is reduced to a feature dict (subjects, objects, entities,
    verbs, adjectives, ...). Messages that share a non-pronoun reference are treated
    as belonging to the same topic. The summary consists of the topic of the latest
    message plus a few randomly chosen other ("contrast") topics.
    """

    # We use these to track when the user refers back to nouns that were said before.
    PRONOUNS = {"it", "they", "he", "she", "this", "that", "these", "those", "i", "me", "my", "we", "us", "our", "you", "your"}

    def __init__(self, max_memory_length):
        """Create an empty memory that keeps at most ``max_memory_length`` messages."""
        self.memory = []
        self.max_memory_length = max_memory_length

    def user_update(self, content: str):
        """Extract features from ``content`` and append them to the memory.

        When a subject of the new message is a pronoun, the subjects, objects and
        entities of the previous message are stored as ``resolved_refs`` so both
        messages count as the same topic. The oldest entries are dropped once the
        memory exceeds ``max_memory_length``.
        """
        features = self._extract_features(content)
        if self.memory and any(s.lower() in self.PRONOUNS for s in features["subj"]):
            prev = self.memory[-1]
            features["resolved_refs"] = prev.get("subj", []) + prev.get("obj", []) + prev.get("entities", [])
        self.memory.append(features)

        # Slice by the number of surplus entries rather than by a negative index:
        # `self.memory[-0:]` would keep everything when the limit is 0.
        excess = len(self.memory) - self.max_memory_length
        if excess > 0:
            self.memory = self.memory[excess:]

    def get_full_memory_summary(self, n_contrast: int = 1) -> str:
        """Summarise the current topic plus up to ``n_contrast`` other topics.

        Returns:
            The topic summaries joined by ``" | "`` (current topic first, empty
            summaries skipped), or ``""`` when nothing has been memorised yet.
        """
        if not self.memory:
            return ""

        current_idx = len(self.memory) - 1
        relevant_idx = self._get_relevant_indices(current_idx)
        non_relevant_idx = {i for i in range(current_idx + 1) if i not in relevant_idx}

        # Group non-relevant by topic and randomly select n
        contrast_groups = []
        used = set()
        for idx in sorted(non_relevant_idx, reverse=True):
            if idx in used:
                continue
            # Sorted so that each group is summarised in chronological order.
            group = sorted(self._get_relevant_indices(idx) & non_relevant_idx)
            if group:
                contrast_groups.append(group)
                used.update(group)

        # Clamp to [0, number of groups]; random.sample rejects a negative size.
        n_selected = max(0, min(n_contrast, len(contrast_groups)))
        selected_contrasts = random.sample(contrast_groups, n_selected)

        # Set iteration order is not chronological, so sort the indices first.
        parts = [self._summarize([self.memory[i] for i in sorted(relevant_idx)])]
        for group in selected_contrasts:
            parts.append(self._summarize([self.memory[i] for i in group]))

        return " | ".join(filter(None, parts))

    def _get_relevant_indices(self, target_idx: int) -> set[int]:
        """Return indices up to ``target_idx`` whose references overlap with the target's.

        An out-of-range index, or a target without any non-pronoun reference,
        yields an empty set.
        """
        if target_idx < 0 or target_idx >= len(self.memory):
            return set()
        target_refs = self._get_refs(self.memory[target_idx])
        return {i for i in range(target_idx + 1) if not target_refs.isdisjoint(self._get_refs(self.memory[i]))}

    def _get_refs(self, f: dict) -> set:
        """Collect the non-pronoun subjects, objects, entities and resolved refs of ``f``."""
        all_refs = f.get("subj", []) + f.get("obj", []) + f.get("entities", []) + f.get("resolved_refs", [])
        return {r for r in all_refs if r.lower() not in self.PRONOUNS}

    def _summarize(self, memory_list: list[dict]) -> str:
        """Flatten feature dicts into one string, grouped by feature kind.

        The output lists all subjects first, then all verbs, adjectives, objects
        and entities; pronouns are left out of subjects and objects.
        """
        if not memory_list:
            return ""
        tuples = []
        for f in memory_list:
            tuples.append((
                " ".join(w for w in f.get("subj", []) if w.lower() not in self.PRONOUNS),
                " ".join(f.get("verbs", [])),
                " ".join(f.get("adjectives", [])),
                " ".join(w for w in f.get("obj", []) if w.lower() not in self.PRONOUNS),
                " ".join(f.get("entities", [])),
            ))
        # We basically create a string with subjects, verbs, adjectives, ... with this zip.
        return " ".join(filter(None, (" ".join(filter(None, t)) for t in zip(*tuples))))

    def _extract_features(self, sentence: str) -> dict:
        """Run ``sentence`` through the module-level ``nlp`` pipeline and collect features.

        Subjects and objects are the tokens whose dependency label contains
        ``"subj"`` and ``"obj"`` respectively; verbs are stored as lemmas.
        """
        doc = nlp(sentence)
        return {
            "entities": [e.text for e in doc.ents],
            "subj": [t.text for t in doc if "subj" in t.dep_],
            "obj": [t.text for t in doc if "obj" in t.dep_],
            "adjectives": [t.text for t in doc if t.pos_ == "ADJ"],
            "verbs": [t.lemma_ for t in doc if t.pos_ == "VERB"],
            "past_tense": any(t.tag_ == "VBD" for t in doc),
            "negated": any(t.dep_ == "neg" for t in doc),
            "numbers": [t.text for t in doc if t.like_num],
        }

# Shared conversation memory used by the recorder callback; it keeps the
# features of at most the 10 most recent user messages.
memory = MemoryAgent(10)

In [None]:
import numpy as np
import wave
import speech_recognition as sr
from io import BytesIO
import pyttsx3

# ----------------------------
# Convert recorder audio to WAV in-memory
# ----------------------------
def get_audio_as_wav_bytes(recorder):
    """Return recorder audio as a WAV byte stream.

    Args:
        recorder: object exposing ``audiodata`` (the recorded samples) and
            ``sampleRate``.  # assumes float samples in [-1, 1] — TODO confirm

    Returns:
        A ``BytesIO`` positioned at offset 0 that holds a mono, 16-bit PCM WAV file.

    Raises:
        ValueError: if the recorder holds no audio.
    """
    data = recorder.audiodata
    srate = recorder.sampleRate

    if data is None or len(data) == 0:
        raise ValueError("No audio recorded!")

    # Convert float [-1,1] → int16. Clip first: samples outside the nominal range
    # would otherwise wrap around in the int16 cast and turn into loud clicks.
    clipped = np.clip(np.asarray(data), -1.0, 1.0)
    data_int16 = (clipped * 32767).astype(np.int16)

    # Write WAV to BytesIO
    wav_bytes = BytesIO()
    with wave.open(wav_bytes, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # int16
        wf.setframerate(srate)
        wf.writeframes(data_int16.tobytes())
    wav_bytes.seek(0)
    return wav_bytes

# ----------------------------
# Transcribe using speech_recognition
# ----------------------------
recognizer = sr.Recognizer()
def transcribe(recorder):
    """Transcribe the recorder's audio to text.

    The audio is converted to an in-memory WAV stream and sent to the Google
    recognizer of speech_recognition. Returns the recognised text, or None when
    the audio could not be understood or the API request failed.
    """
    audio_stream = get_audio_as_wav_bytes(recorder)

    with sr.AudioFile(audio_stream) as source:
        audio = recognizer.record(source)

    # Other recognizers are available as well; this one is Google's free API.
    try:
        text = recognizer.recognize_google(audio)
    except sr.UnknownValueError:
        print("Could not understand audio")
        return None
    except sr.RequestError as e:
        print(f"API error: {e}")
        return None

    print("Transcription:", text)
    return text

#Text to speech
engine = None
def SpeakText(command):
    """Speak ``command`` aloud and block until the speech has finished.

    The pyttsx3 engine is created on first use and kept in the module-level
    ``engine`` variable for later calls.
    """
    global engine
    # Lazy initialisation of the shared engine.
    engine = pyttsx3.init() if engine is None else engine

    engine.say(command)
    engine.runAndWait()

In [19]:
import matplotlib.pyplot as plt

#Plot and Save audio (optional)
def save_wav_from_recorder(recorder, filename="recording.wav"):
    """Save the recorded audio from recorder into a WAV file.

    Args:
        recorder: object exposing ``audiodata`` (the recorded samples) and
            ``sampleRate``.  # assumes float samples in [-1, 1] — TODO confirm
        filename: path of the mono, 16-bit PCM WAV file to write.

    Returns:
        ``filename`` on success, or None (after printing a notice) when the
        recorder holds no audio.
    """
    data = recorder.audiodata
    # Not named `sr`: that would shadow the speech_recognition alias used elsewhere.
    sample_rate = recorder.sampleRate

    if data is None or len(data) == 0:
        print("No audio data to save!")
        return None

    # Convert float [-1,1] → int16. Clip first: samples outside the nominal range
    # would otherwise wrap around in the int16 cast and turn into loud clicks.
    clipped = np.clip(np.asarray(data), -1.0, 1.0)
    data_int16 = (clipped * 32767).astype(np.int16)

    with wave.open(filename, "wb") as f:
        f.setnchannels(1)
        f.setsampwidth(2)  # int16
        f.setframerate(sample_rate)
        f.writeframes(data_int16.tobytes())

    print(f"Saved: {filename}")
    return filename

def plot_audio(rec):
    """Plot the samples in ``rec.audiodata`` as a waveform."""
    # One wide, flat axes: sample index on x, amplitude on y.
    _, ax = plt.subplots(figsize=(10, 3))
    ax.plot(rec.audiodata)
    ax.set_title("Recorded Audio Waveform")
    ax.set_xlabel("Samples")
    ax.set_ylabel("Amplitude")
    plt.show()

In [None]:
# ----------------------------
# Audio Recorder Notebook
# ----------------------------

import numpy as np
import wave
import ipyaudioworklet as ipyaudio
from ipywidgets import Output, VBox
from IPython.display import display

# ----------------------------
# Create Recorder
# ----------------------------
# The recorder widget; the observers registered below react to changes of its
# `status` trait.
rec = ipyaudio.AudioRecorder()
display(rec)

# ----------------------------
# Status Output
# ----------------------------
# Bordered output area that the status and transcription callbacks print into.
status_out = Output(layout={'border': '1px solid black', 'padding': '5px'})
status_out.append_stdout("Recorder ready.\n")
display(status_out)

@status_out.capture(clear_output=True)
def status_changed(change):
    """Show the recorder's new status in the status area, replacing what was there."""
    new_status = change["new"]
    print("Status:", new_status)

# Run the handler on every change of the recorder's `status` trait.
rec.observe(status_changed, "status")

# ----------------------------
# Automatic transcription and joke reply
# ----------------------------
@status_out.capture()
def on_status_change(change):
    """Answer a finished recording with a spoken joke.

    Once the recorder status becomes "STOPPED" or "RECORDED", the audio is
    transcribed, stored in the conversation memory and sent (together with the
    memory summary) to the joker agent; the first joke is spoken aloud.
    Nothing happens for other statuses or when there is nothing to transcribe.
    """
    if change.new not in ("STOPPED", "RECORDED"):
        return

    try:
        user_input = transcribe(rec)
    except ValueError as exc:
        # get_audio_as_wav_bytes raises ValueError when the recording is empty;
        # report it instead of letting the callback fail with a traceback.
        print(f"Nothing to transcribe: {exc}")
        return

    # None means the transcription failed; an empty string gives the agent nothing to work with.
    if not user_input:
        return

    memory.user_update(user_input)

    # TODO: block all user input during this step
    # TODO: say something random and funny every couple of seconds to indicate that you're thinking about a response
    # One jokes already takes approx. 15 seconds.
    jokes = joker_agent.generate_response(f"<history>{memory.get_full_memory_summary(2)}</history>\n{user_input}", update_memory=False, N=1)
    if jokes and jokes[0]:
        SpeakText(jokes[0])

rec.observe(on_status_change, "status")

AudioRecorder()

Output(layout=Layout(border_bottom='1px solid black', border_left='1px solid black', border_right='1px solid b…

In [21]:
from IPython.display import display, clear_output
import ipywidgets as widgets

# Feedback prompt shown above the two buttons.
text_to_display = "Do you like this Answer?"
text_label = widgets.Label(value=text_to_display)

# Like / dislike buttons and the output area their click handlers print into.
like_button = widgets.Button(description=": )", button_style='success')
dislike_button = widgets.Button(description=": (", button_style='danger')
output = widgets.Output()

def on_like_clicked(b):
    """Click handler of the like button: replace the feedback output with a confirmation."""
    with output:
        clear_output(wait=False)
        print("You liked this!")

def on_dislike_clicked(b):
    """Click handler of the dislike button: speak a comeback, then confirm the dislike."""
    with output:
        clear_output(wait=False)
        # The spoken comeback comes first; the confirmation is printed afterwards.
        SpeakText("Wow, tough crowd!")
        print("You disliked this!")

# Attach the click handlers to their buttons.
like_button.on_click(on_like_clicked)
dislike_button.on_click(on_dislike_clicked)

# Render the prompt, both buttons and the feedback output area.
display(text_label, like_button, dislike_button, output)

Label(value='Do you like this Answer?')

Button(button_style='success', description=': )', style=ButtonStyle())

Button(button_style='danger', description=': (', style=ButtonStyle())

Output()