In [None]:
#| notest
#| eval: false
#| hide

import sys

sys.path.insert(0, '../unreal_llm_sandbox/')
sys.path.insert(0, '../../../')

import big_project_helper as bph

bph.display_project_contents('unreal_llm_sandbox')

Dialog Name: unreal/unreal-llm-sandbox/nbs/main

../unreal_llm_sandbox/cells.py
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/cells.ipynb.

# %% auto 0
__all__ = ['up_arrow_ic', 'down_arrow_ic', 'close_ic', 'swap_ic', 'view_ic', 'clean_ic', 'minimize_ic', 'play_ic', 'stop_ic',
           'edit_ic', 'tools_ic', 'label_css', 'cell_button_format', 'interrupt_button', 'BaseCell', 'PromptCell',
           'MarkdownCell', 'CodeCell']

# %% ../nbs/cells.ipynb 3
import json
import uuid
import re
import mistune
from fasthtml.common import * 
from .app_config import PROMPT_SPLIT, AGENT_CODE_SPLIT


# %% ../nbs/cells.ipynb 4
up_arrow_ic = NotStr("&#11014")
down_arrow_ic = NotStr("&#11015")
close_ic = NotStr("&#x274C")
swap_ic = NotStr("&#128257")
view_ic =NotStr("&#x1F50D")
clean_ic =NotStr("&#x1F9F9")
minimize_ic = NotStr("&#x25BC");
play_ic = NotStr("&#9654")
stop_ic = NotStr("&#9209")
edit_ic = NotStr("&#x1F4DD")
tools_ic = NotStr("&#x1F6E0")

label_css = "text-xs text-gray-400 px-2 py-1 

In [None]:
#| default_exp main


# Main

In [None]:
#| export
import json
import asyncio
import requests
import lisette
import time

from fasthtml.common import *
from fasthtml.jupyter import JupyUvi
from starlette.staticfiles import StaticFiles

from unreal_llm_sandbox.app_config import MODEL, KERNEL_URL, NOTEBOOK_SYS_PROMPT
from unreal_llm_sandbox.cells import MarkdownCell, CodeCell, PromptCell #, AgentCell
from unreal_llm_sandbox.streaming import SSEStream, active_streams
#from unreal_llm_sandbox.kernel import execute_unreal_code, convert_to_accumulated
from unreal_llm_sandbox.notebook_io import reconstruct_cells_from_history, prepare_chat_history, reconstruct_ipynb_cell
from unreal_llm_sandbox.llm import RemoteToolLLM, send_llm_request
#from unreal_llm_sandbox.agent import AgentTools, SYS_PROMPT

from fasthtml.common import *
import importlib.resources
import unreal_llm_sandbox

# 1. Helper function to read the text content of your files
def get_static(fname, icon=False):
    """Read static file content from package resources.
    
    Args:
        fname: Filename to read from unreal_llm_sandbox/static/.
        
    Returns:
        String content of the file.
    """
    ref = importlib.resources.files(unreal_llm_sandbox) / 'static' / fname
    if icon:
        icon_bytes = (importlib.resources.files(unreal_llm_sandbox) / 'static' / 'Icon128.png').read_bytes()
        return base64.b64encode(icon_bytes).decode()
    else:
        return ref.read_text(encoding='utf-8')


# Then in headers:
daisy_hdrs =[
Link(rel="icon", href=f"data:image/png;base64,{get_static('Icon32.png',icon=True)}"),
Link(href='https://cdn.jsdelivr.net/npm/daisyui@5', rel='stylesheet', type='text/css'),
Script(src='https://cdn.jsdelivr.net/npm/@tailwindcss/browser@4'),
Link(rel="stylesheet", href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism-okaidia.min.css"),
Link(rel="stylesheet", href="https://cdnjs.cloudflare.com/ajax/libs/github-markdown-css/5.5.1/github-markdown.min.css"),
Script (src ='https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js'),
Script (src ='https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-python.min.js'),
Script(src='https://cdn.jsdelivr.net/npm/marked/marked.min.js'),
Script(get_static('cells.js')),
Script(src="https://cdn.jsdelivr.net/npm/ansi_up@5/ansi_up.min.js"),
Script(src="https://cdn.jsdelivr.net/npm/monaco-editor@latest/min/vs/loader.js"),
Script("""
require.config({ paths: { 'vs': 'https://cdn.jsdelivr.net/npm/monaco-editor@latest/min/vs' }});
"""),
Script(src="https://unpkg.com/htmx.org/dist/ext/sse.js")]


# FastHTML app setup + daisy_hdrs (the big Style/Script list)
app = FastHTML(hdrs=daisy_hdrs)


rt = app.route


In [None]:
#| export

@rt('/interrupt/{cell_id}', methods=['POST'])
async def interrupt(cell_id: str, request):
    """Signal abort for an active stream.
    
    Args:
        cell_id: Unique cell identifier.
    
    Returns:
        "OK" acknowledgment string.
    """
    try:
        data = await request.json()
        notebook = data.get('notebook', 'untitled')
    except:
        notebook = 'untitled'
    stream_key = f"{notebook}:{cell_id}"
    if stream_key in active_streams:
        active_streams[stream_key]['abort'] = True
    return "OK"
    

In [None]:
#| export

@rt('/execute_prompt/{cell_id}')
async def exe_prompt(cell_id: str, request): 
    """Execute LLM prompt with notebook context via SSE.
    
    Args:
        cell_id: Unique cell identifier.
        request: FastHTML request with JSON body containing:
            - prompt: User's prompt text.
            - context: List of cell dicts for history.
    
    Returns:
        SSE StreamingResponse yielding LLM chunks.
    """

    data = await request.json() 

    notebook = data.get('notebook', 'untitled')
    
    stream_key = f"{notebook}:{cell_id}" 

    prompt = data['prompt']
    cell_dict_list = data.get('context', [])
    use_tools = data.get('use_tools', True) 

    cells = reconstruct_cells_from_history(cell_dict_list)
    ipynb_list = [cell.to_ipynb() for cell in cells]
    chat_history = prepare_chat_history(ipynb_list)
    
    stream = SSEStream(stream_key)
    
    def run():
        for msg in send_llm_request(prompt, history=chat_history, use_ue_tools=use_tools):
            if stream.aborted(): break
            stream.text(msg)
        stream.done()
    
    asyncio.create_task(asyncio.to_thread(run))
    return stream.response()


In [None]:
#| export

@rt('/execute_code/{cell_id}')
async def exe_code(cell_id: str, request):
    """Execute Python code in Unreal Engine via SSE.
    
    Args:
        cell_id: Unique cell identifier.
        request: FastHTML request with JSON body containing:
            - code: Python code string to execute.
    
    Returns:
        SSE StreamingResponse yielding kernel output messages.
    """

    data = await request.json()
    notebook = data.get('notebook', 'untitled')
    stream_key = f"{notebook}:{cell_id}" 

    stream = SSEStream(stream_key)
    
    def run():
        response = requests.post(f'{KERNEL_URL}/execute', json={'code': data['code']}, stream=True, timeout=(5, 60))
        for line in response.iter_lines():
            if stream.aborted(): break
            if line.startswith(b'data: '):
                stream.output(json.loads(line[6:]))  # ‚Üê output() not raw()
        stream.done()
    
    asyncio.create_task(asyncio.to_thread(run))
    return stream.response()


In [None]:
"""
@rt('/agent_tool_build/{cell_id}', methods=['POST'])
async def agent_stream(cell_id: str, request):
    ""Run agent code generation loop with tool calling via SSE.
    
    Args:
        cell_id: Unique cell identifier.
        request: FastHTML request with JSON body containing:
            - prompt: Code generation request.
            - existing_code: Optional code to modify.
            - context: List of cell dicts for history.
    
    Returns:
        SSE StreamingResponse yielding agent progress (tags, text, outputs).
    ""
    data = await request.json()
    notebook = data.get('notebook', 'untitled')
    stream_key = f"{notebook}:{cell_id}" 

    stream = SSEStream(stream_key)
    def run_chat():
        PROMPT = data['prompt']

        existing_code = data.get('existing_code')
        cell_dict_list = data.get('context', [])  # ‚Üê Add this

        # Convert to chat history like prompt cells do
        cells = reconstruct_cells_from_history(cell_dict_list)
        ipynb_list = [cell.to_ipynb() for cell in cells]
        chat_history = prepare_chat_history(ipynb_list)

        if existing_code:
            CODE = existing_code  # Initialize CODE with existing
            PROMPT = f"Modify this code: {existing_code}\n\nRequest: {data['prompt']}"
        else:
            CODE = ""
            PROMPT = data['prompt']
        

        CHAT = lisette.Chat(MODEL, SYS_PROMPT)
        CHAT.hist += chat_history
        CHAT.hist.append( {"role":"assistant", "content":PROMPT})

        a_tools = AgentTools(stream, CHAT, PROMPT,cell_id, code=CODE, print_updates=False)
        tools = a_tools.get_tools()
        
        chat = lisette.Chat(MODEL, SYS_PROMPT, tools=tools)
        chat.hist += chat_history


        gen = chat(PROMPT, max_steps=15)
        for _ in gen:
            if stream.aborted():
                break
        print("Chat loop finished!") 
        time.sleep(0.1) 
        stream.done()
    
    asyncio.create_task(asyncio.to_thread(run_chat))
    return stream.response()
"""

'\n@rt(\'/agent_tool_build/{cell_id}\', methods=[\'POST\'])\nasync def agent_stream(cell_id: str, request):\n    ""Run agent code generation loop with tool calling via SSE.\n\n    Args:\n        cell_id: Unique cell identifier.\n        request: FastHTML request with JSON body containing:\n            - prompt: Code generation request.\n            - existing_code: Optional code to modify.\n            - context: List of cell dicts for history.\n\n    Returns:\n        SSE StreamingResponse yielding agent progress (tags, text, outputs).\n    ""\n    data = await request.json()\n    notebook = data.get(\'notebook\', \'untitled\')\n    stream_key = f"{notebook}:{cell_id}" \n\n    stream = SSEStream(stream_key)\n    def run_chat():\n        PROMPT = data[\'prompt\']\n\n        existing_code = data.get(\'existing_code\')\n        cell_dict_list = data.get(\'context\', [])  # ‚Üê Add this\n\n        # Convert to chat history like prompt cells do\n        cells = reconstruct_cells_from_histo

In [None]:
#| export

def Toolbar(title):
    """Build the notebook toolbar with cell creation buttons.
    
    Args:
        title: Notebook name to display in the editable input.
        
    Returns:
        FastHTML Div containing toolbar elements.
    """
    return Div(

        Div(
            Input(value=title, cls="text-xl font-bold text-white notebook-name bg-transparent border-none outline-none focus:outline-none flex-1"),
            Script("""
                document.querySelector('.notebook-name').addEventListener('blur', (e) => {
                    const name = e.target.value || 'untitled';
                    history.replaceState(null, '', `/notebook/${name}.ipynb`);
                });
            """),
            cls="flex flex-1 items-center"
        ),
        Div(
            Button("‚ûï Markdown", 
                   hx_post="/add_cell/markdown",
                   hx_target="#notebook-container",
                   hx_swap="beforeend",
                   cls="btn btn-sm bg-gray-700 hover:bg-gray-600 text-white border-gray-600 shadow-none"), 
            Button("‚ûï Code", 
                   hx_post="/add_cell/code",
                   hx_target="#notebook-container",
                   hx_swap="beforeend",
                   cls="btn btn-sm bg-gray-700 hover:bg-gray-600 text-white border-gray-600 shadow-none"),  
            Button("‚ûï Prompt", 
                   hx_post="/add_cell/prompt",
                   hx_target="#notebook-container",
                   hx_swap="beforeend",
                   cls="btn btn-sm bg-gray-700 hover:bg-gray-600 text-white border-gray-600 shadow-none"),  
            #Button("‚ûï Agent", 
            #       hx_post="/add_cell/agent",
            #       hx_target="#notebook-container",
            #       hx_swap="beforeend",
            #       cls="btn btn-sm bg-gray-700 hover:bg-gray-600 text-white border-gray-600 shadow-none"), 
            cls="flex gap-2"
        ),
        cls="flex justify-between p-4 bg-[#0d0d0d]"  # ‚Üê Changed from #0d0d0d to pure black
    )

@rt('/add_cell/{cell_type}')
def add_cell(cell_type: str):
    """Create and return a new cell of the specified type.
    
    Args:
        cell_type: One of 'markdown', 'code', 'prompt', or 'agent'.
        
    Returns:
        Rendered FastHTML cell component.
    """
    if cell_type == 'markdown':
        new_cell = MarkdownCell("")
    elif cell_type == 'code':
        new_cell = CodeCell("")
    elif cell_type == 'prompt':
        new_cell = PromptCell("")
    #elif cell_type == 'agent':
    #    new_cell = AgentCell("")
    else:
        return "Invalid cell type"
    
    return new_cell.render()


In [None]:
#| export

@rt('/notebook/{notebook_file}')
def load_notebook(notebook_file:str): 
    """Load a Jupyter Notebook file and render its cells.
    
    Args:
        notebook_file: Path to .ipynb file to load.
        
    Returns:
        Tuple of Title and Body elements for the page.
    """
    if not os.path.exists(notebook_file):
        rendered_cells = []
        print ('Notebook Not Found:',notebook_file)

    else:
        cells = []
        with open(notebook_file, 'r', encoding='utf-8') as f:
            notebook = json.load(f)
            cells = notebook['cells']
        
        cell_objects = [reconstruct_ipynb_cell(cell) for cell in cells]
        rendered_cells = [cell.render() for cell in cell_objects]


    return Title("Unreal LLM Sandbox"),Body(
        Toolbar(notebook_file.split('.ipynb')[0]),
        Style(get_static('styles.css')),
        Div(  *rendered_cells,
            cls='px-5',  
            id='notebook-container' 
        )
    )

@rt('/save_notebook/{notebook_file}', methods=['POST'])
async def save_notebook(notebook_file: str, request):
    """Save notebook cells to a Jupyter .ipynb file.
    
    Args:
        notebook_file: Filename to save to.
        request: Request with JSON body containing 'cells' list.
    
    Returns:
        JSON with status message.
    """
    data = await request.json()
    cell_dict_list = data.get('cells', [])
    
    # Reconstruct cell objects and convert to ipynb format
    cells = reconstruct_cells_from_history(cell_dict_list)
    ipynb_cells = [cell.to_ipynb() for cell in cells]
    
    # Build notebook structure
    notebook = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {
            "kernelspec": {
                "display_name": "Python 3",
                "language": "python",
                "name": "python3"
            }
        },
        "cells": ipynb_cells
    }
    
    with open(notebook_file, 'w', encoding='utf-8') as f:
        json.dump(notebook, f, indent=2, ensure_ascii=False)
    
    return {"status": "saved", "file": notebook_file}
    

In [None]:
#| export
import uvicorn

def start_server():
    uvicorn.run(app,
                 host='0.0.0.0',
                 port=5001, 
                 timeout_graceful_shutdown=1)

In [None]:
#| hide
##| eval: false
#from fasthtml.jupyter import JupyUvi
#server = JupyUvi(app)

In [None]:
#| export
#| eval: false
if __name__ == "__main__":
    start_server()

RuntimeError: asyncio.run() cannot be called from a running event loop