Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 173 additions & 5 deletions openkb/agent/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""
from __future__ import annotations

import asyncio
import os
import re
import sys
Expand All @@ -15,8 +16,10 @@
from typing import Any

from prompt_toolkit import PromptSession
from prompt_toolkit.completion import Completer, Completion, PathCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.shortcuts import print_formatted_text
from prompt_toolkit.shortcuts import CompleteStyle, print_formatted_text
from prompt_toolkit.styles import Style

from openkb.agent.chat_session import ChatSession
Expand All @@ -39,13 +42,23 @@
"resume.turn": "#5fa0e0",
"resume.user": "bold",
"resume.assistant": "#8a8a8a",
# Completion menu — lightweight, no heavy background
"completion-menu": "bg:default #8a8a8a",
"completion-menu.completion": "bg:default #d0d0d0",
"completion-menu.completion.current": "bg:#3a3a3a #ffffff bold",
"completion-menu.meta.completion": "bg:default #6a6a6a",
"completion-menu.meta.completion.current": "bg:#3a3a3a #8a8a8a",
}

_HELP_TEXT = (
"Commands:\n"
" /exit Exit (Ctrl-D also works)\n"
" /clear Start a fresh session (current one is kept on disk)\n"
" /save [name] Export transcript to wiki/explorations/\n"
" /status Show knowledge base status\n"
" /list List all documents in the knowledge base\n"
" /lint Lint the knowledge base\n"
" /add <path> Add a document or directory to the knowledge base\n"
" /help Show this"
)

Expand Down Expand Up @@ -181,10 +194,101 @@ def _bottom_toolbar(session: ChatSession) -> FormattedText:
)


def _make_prompt_session(session: ChatSession, style: Style, use_color: bool) -> PromptSession:
_SLASH_COMMANDS: list[tuple[str, str]] = [
("/exit", "Exit (Ctrl-D also works)"),
("/quit", "Exit (alias)"),
("/help", "Show available commands"),
("/clear", "Start a fresh session"),
("/save", "Export transcript to wiki/explorations/"),
("/status", "Show knowledge base status"),
("/list", "List all documents"),
("/lint", "Lint the knowledge base"),
("/add", "Add a document or directory"),
]


class _ChatCompleter(Completer):
"""Complete slash commands and file paths after /add."""

def __init__(self) -> None:
self._path_completer = PathCompleter(expanduser=True)

def get_completions(self, document: Document, complete_event: Any) -> Any:
text = document.text_before_cursor

# After "/add ", complete file paths (skip dotfiles)
if text.lstrip().lower().startswith("/add "):
path_text = text.lstrip()[5:]
# Strip leading quote so PathCompleter resolves the real path
quote_char = ""
if path_text and path_text[0] in ("'", '"'):
quote_char = path_text[0]
path_text = path_text[1:]
path_doc = Document(path_text, len(path_text))
for c in self._path_completer.get_completions(path_doc, complete_event):
# Hide dotfiles unless the user explicitly typed a dot
basename = c.text.lstrip("/")
if basename.startswith(".") and not path_text.rpartition("/")[2].startswith("."):
continue
# Append closing quote for files; skip for directories so
# the user can keep navigating into subdirectories.
if quote_char and not c.text.endswith("/"):
comp_text = c.text + quote_char
else:
comp_text = c.text
yield Completion(
comp_text,
start_position=c.start_position,
display=c.display,
display_meta=c.display_meta,
)
return

# Complete slash commands with descriptions
if text.startswith("/"):
for cmd, desc in _SLASH_COMMANDS:
if cmd.startswith(text.lower()):
yield Completion(cmd, start_position=-len(text), display_meta=desc)


def _make_prompt_session(session: ChatSession, style: Style, use_color: bool, kb_dir: Path) -> PromptSession:
from prompt_toolkit.filters import has_completions
from prompt_toolkit.history import FileHistory
from prompt_toolkit.key_binding import KeyBindings

kb = KeyBindings()

@kb.add("tab", filter=has_completions)
def _accept_completion(event: Any) -> None:
"""Tab accepts the current completion (like zsh), not cycle."""
buf = event.current_buffer
state = buf.complete_state
if not state:
return
# Only one candidate or already selected — accept immediately
if state.current_completion:
buf.apply_completion(state.current_completion)
elif len(state.completions) == 1:
buf.apply_completion(state.completions[0])
else:
# Multiple candidates, nothing selected — highlight first
buf.go_to_completion(0)

@kb.add("tab", filter=~has_completions)
def _trigger_completion(event: Any) -> None:
"""Tab triggers completion when menu is not open."""
buf = event.current_buffer
buf.start_completion()

history_path = kb_dir / ".openkb" / "chat_history"
return PromptSession(
message=FormattedText([("class:prompt", ">>> ")]),
style=style,
completer=_ChatCompleter(),
complete_style=CompleteStyle.MULTI_COLUMN,
complete_while_typing=False,
key_bindings=kb,
history=FileHistory(str(history_path)),
bottom_toolbar=(lambda: _bottom_toolbar(session)) if use_color else None,
)

Expand Down Expand Up @@ -271,6 +375,39 @@ def _save_transcript(kb_dir: Path, session: ChatSession, name: str | None) -> Pa
return path


async def _run_add(arg: str, kb_dir: Path, style: Style) -> None:
"""Add a document or directory to the knowledge base from the chat REPL."""
from openkb.cli import add_single_file, SUPPORTED_EXTENSIONS

target = Path(arg).expanduser()
if not target.is_absolute():
target = Path.cwd() / target
target = target.resolve()

if not target.exists():
_fmt(style, ("class:error", f"Path does not exist: {arg}\n"))
return

if target.is_dir():
files = [
f for f in sorted(target.rglob("*"))
if f.is_file() and f.suffix.lower() in SUPPORTED_EXTENSIONS
]
if not files:
_fmt(style, ("class:error", f"No supported files found in {arg}.\n"))
return
total = len(files)
_fmt(style, ("class:slash.help", f"Found {total} supported file(s) in {arg}.\n"))
for i, f in enumerate(files, 1):
_fmt(style, ("class:slash.help", f"\n[{i}/{total}] "))
await asyncio.to_thread(add_single_file, f, kb_dir)
else:
if target.suffix.lower() not in SUPPORTED_EXTENSIONS:
_fmt(style, ("class:error", f"Unsupported file type: {target.suffix}\n"))
return
await asyncio.to_thread(add_single_file, target, kb_dir)


async def _handle_slash(
cmd: str,
kb_dir: Path,
Expand All @@ -282,6 +419,11 @@ async def _handle_slash(
parts = cmd.split(maxsplit=1)
head = parts[0].lower()
arg = parts[1].strip() if len(parts) > 1 else ""
# Strip surrounding quotes (user may type /add '/path/to file')
if len(arg) >= 2 and arg[0] == arg[-1] and arg[0] in ("'", '"'):
arg = arg[1:-1]
elif arg and arg[0] in ("'", '"'):
arg = arg[1:]

if head in ("/exit", "/quit"):
_fmt(style, ("class:header", "Bye. Thanks for using OpenKB.\n\n"))
Expand All @@ -307,6 +449,28 @@ async def _handle_slash(
_fmt(style, ("class:slash.ok", f"Saved to {path}\n"))
return None

if head == "/status":
from openkb.cli import print_status
print_status(kb_dir)
return None

if head == "/list":
from openkb.cli import print_list
print_list(kb_dir)
return None

if head == "/lint":
from openkb.cli import run_lint
await run_lint(kb_dir)
return None

if head == "/add":
if not arg:
_fmt(style, ("class:error", "Usage: /add <path>\n"))
return None
await _run_add(arg, kb_dir, style)
return None

_fmt(
style,
("class:error", f"Unknown command: {head}. Try /help.\n"),
Expand Down Expand Up @@ -335,7 +499,7 @@ async def run_chat(
if session.turn_count > 0:
_print_resume_view(session, style)

prompt_session = _make_prompt_session(session, style, use_color)
prompt_session = _make_prompt_session(session, style, use_color, kb_dir)

last_sigint = 0.0

Expand All @@ -360,13 +524,17 @@ async def run_chat(
continue

if user_input.startswith("/"):
action = await _handle_slash(user_input, kb_dir, session, style)
try:
action = await _handle_slash(user_input, kb_dir, session, style)
except KeyboardInterrupt:
_fmt(style, ("class:error", "\n[aborted]\n"))
continue
if action == "exit":
return
if action == "new_session":
session = ChatSession.new(kb_dir, session.model, session.language)
agent = build_query_agent(wiki_root, session.model, language=language)
prompt_session = _make_prompt_session(session, style, use_color)
prompt_session = _make_prompt_session(session, style, use_color, kb_dir)
continue

append_log(kb_dir / "wiki", "query", user_input)
Expand Down
Loading