## 1. Environment Setup

In [None]:
import sys
import os

REPO_DIR = "OT1-APITS"

# Check if we are running locally or need to clone
if os.path.exists("../src"):
    print("Running locally (parent directory has src).")
    REPO_ROOT = ".."
elif os.path.exists("src"):
    print("Running locally (current directory has src).")
    REPO_ROOT = "."
else:
    print(f"Local src not found. Checking for clone in {REPO_DIR}...")
    if not os.path.exists(REPO_DIR):
        print("Cloning repo...")
        !git clone https://github.com/Samsam19191/OT1-APITS.git {REPO_DIR}
    else:
        print(f"Repo already cloned. Pulling latest...")
        !cd {REPO_DIR} && git pull
    REPO_ROOT = REPO_DIR

# Add to path
if REPO_ROOT not in sys.path:
    sys.path.append(os.path.abspath(REPO_ROOT))

print(f"Repository root: {REPO_ROOT}")

: 

## 2. Import Streaming Modules

In [None]:
import asyncio
from src.events import EventType, KeystrokeEvent, StreamingSession
from src.keystroke_simulator import (
    KeystrokeSimulator,
    LoggingConsumer,
    simulate_typing,
)

print("✓ Imports successful")

## 3. Basic Typing Simulation

First, let's see how the simulator generates events for a simple query.

In [None]:
# Sample Text-to-SQL query for the demo below. Each demo coroutine creates its
# own asyncio.Queue (the interface between Lizhi's producer and Rémi's
# consumer), so no queue is created here; a module-level one would be unused.
sample_query = "Show me all patients"

print(f"Simulating typing: '{sample_query}'")
print("="*50)

In [None]:
async def run_demo(query: "str | None" = None):
    """Run the keystroke simulator (producer) and a LoggingConsumer concurrently.

    Args:
        query: Text to simulate typing. When omitted, the module-level
            ``sample_query`` is read at call time, so changing that cell and
            calling again does not require redefining this function.

    Returns:
        ``(consumer.events, simulator.session)``: the events recorded by the
        logging consumer and the simulator's streaming session.
    """
    if query is None:
        query = sample_query

    queue = asyncio.Queue()

    # Producer (Lizhi's simulator), tuned for a quick, typo-free demo.
    simulator = KeystrokeSimulator(
        queue,
        mean_delay_ms=100,      # Faster for demo
        debounce_ms=200,        # Shorter debounce for demo
        typo_rate=0.0,          # No typos for cleaner output
    )

    # Consumer (placeholder for Rémi's inference service).
    consumer = LoggingConsumer(queue, verbose=True)

    producer_task = asyncio.create_task(simulator.simulate_typing(query))
    consumer_task = asyncio.create_task(consumer.run())
    try:
        await asyncio.gather(producer_task, consumer_task)
    finally:
        # gather() does not cancel the remaining task when one of them fails;
        # cancel it so nothing keeps running in the notebook's long-lived loop.
        for task in (producer_task, consumer_task):
            if not task.done():
                task.cancel()

    return consumer.events, simulator.session

# Run the demo
events, session = await run_demo()

print("\n" + "="*50)
print(f"Total events: {len(events)}")
print(f"Final text: '{session.current_text}'")

## 4. Event Analysis

Let's analyze the event stream to understand the confirmation boundaries.

In [None]:
# Tally how many events of each type the demo stream contained.
from collections import Counter

event_counts = Counter(ev.event_type.name for ev in events)
print("Event distribution:")
for type_name, n in event_counts.items():
    print(f"  {type_name}: {n}")

# List the FLUSH events, i.e. the token confirmation points, with their timestamps.
print("\nFLUSH events (token confirmation points):")
flushes = [ev for ev in events if ev.event_type == EventType.FLUSH]
for ev in flushes:
    print(f"  [{ev.timestamp_ms:.0f}ms] confirmed: '{ev.confirmed_text}'")

## 5. Typing Simulation with Typos

Real users make typos. The simulator can model this behavior.

In [None]:
async def run_demo_with_typos():
    queue = asyncio.Queue()
    
    simulator = KeystrokeSimulator(
        queue,
        mean_delay_ms=80,
        typo_rate=0.1,  # 10% typo rate for demo
    )
    
    consumer = LoggingConsumer(queue, verbose=True)
    
    query = "Find patients with diabetes"
    print(f"Simulating with typos: '{query}'")
    print("="*50)
    
    await asyncio.gather(
        simulator.simulate_typing(query),
        consumer.run()
    )
    
    # Count typo corrections (CHAR_DELETE events)
    deletes = sum(1 for e in consumer.events if e.event_type == EventType.CHAR_DELETE)
    print(f"\nTypos corrected: {deletes}")
    print(f"Final text: '{simulator.session.current_text}'")

await run_demo_with_typos()

## 6. Multiple Queries from File

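Load sample queries from `data/sample_queries.txt` (falling back to two built-in examples if the file is missing) and benchmark simulated typing on the first few.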

In [None]:
from pathlib import Path

# Load sample queries
# Load sample queries: one per line; blank lines and '#' comment lines are skipped.
queries_file = Path(REPO_ROOT) / "data" / "sample_queries.txt"

if queries_file.exists():
    # splitlines() + strip() copes with CRLF endings, whitespace-only lines and
    # indented comments; explicit UTF-8 avoids locale-dependent decoding
    # (assumes the file is UTF-8 encoded).
    queries = [
        q
        for q in (line.strip() for line in queries_file.read_text(encoding="utf-8").splitlines())
        if q and not q.startswith('#')
    ]
    print(f"Loaded {len(queries)} sample queries:")
    for i, q in enumerate(queries[:5]):
        print(f"  {i+1}. {q}")
    if len(queries) > 5:
        print(f"  ... and {len(queries)-5} more")
else:
    print(f"Warning: {queries_file} not found")
    queries = ["Show me all patients", "List products by revenue"]

In [None]:
async def benchmark_query(query: str, *, mean_delay_ms=120, debounce_ms=250, typo_rate=0.02):
    """Measure timing statistics for simulated typing of a single query.

    Args:
        query: Text to type.
        mean_delay_ms, debounce_ms, typo_rate: Forwarded to
            ``KeystrokeSimulator``. Keyword-only with the benchmark's standard
            settings as defaults, so ``benchmark_query(q)`` keeps working.

    Returns:
        dict with keys ``query``, ``chars`` (length of the target text),
        ``total_events``, ``flush_count``, ``typing_time_ms`` (timestamp of
        the SUBMIT event, 0 if none was seen) and ``chars_per_sec`` (0 when
        the typing time is 0).
    """
    queue = asyncio.Queue()

    simulator = KeystrokeSimulator(
        queue,
        mean_delay_ms=mean_delay_ms,
        debounce_ms=debounce_ms,
        typo_rate=typo_rate,
    )
    consumer = LoggingConsumer(queue, verbose=False)

    producer_task = asyncio.create_task(simulator.simulate_typing(query))
    consumer_task = asyncio.create_task(consumer.run())
    try:
        await asyncio.gather(producer_task, consumer_task)
    finally:
        # gather() leaves the other task running if one fails; cancel it.
        for task in (producer_task, consumer_task):
            if not task.done():
                task.cancel()

    # Calculate stats
    flush_events = [e for e in consumer.events if e.event_type == EventType.FLUSH]
    # NOTE(review): treats the SUBMIT timestamp as total typing time, which
    # assumes timestamp_ms is measured from the start of typing — TODO confirm.
    submit_time = next(
        (e.timestamp_ms for e in consumer.events if e.event_type == EventType.SUBMIT),
        0
    )

    return {
        'query': query,
        'chars': len(query),
        'total_events': len(consumer.events),
        'flush_count': len(flush_events),
        'typing_time_ms': submit_time,
        'chars_per_sec': len(query) / (submit_time / 1000) if submit_time > 0 else 0,
    }

# Benchmark first 3 queries
print("Benchmarking queries...\n")
for query in queries[:3]:
    stats = await benchmark_query(query)
    print(f"Query: '{stats['query'][:40]}...'" if len(stats['query']) > 40 else f"Query: '{stats['query']}'")
    print(f"  Chars: {stats['chars']}, Events: {stats['total_events']}, Flushes: {stats['flush_count']}")
    print(f"  Typing time: {stats['typing_time_ms']:.0f}ms, Speed: {stats['chars_per_sec']:.1f} chars/sec")
    print()

## 7. Custom Consumer Template (For Rémi)

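This template shows how Rémi could structure the inference-service consumer. Cache operations are only printed ("Would crop", "Would extend"); no real KV-cache work is done yet.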

In [None]:
from src.keystroke_simulator import EventConsumer
from src.events import EventType, KeystrokeEvent

class InferenceConsumer(EventConsumer):
    """
    Template for Rémi's inference service.

    ``confirmed_text`` stands for the text whose tokens would already be in
    the KV-cache. The handlers keep it a prefix of the text they process, so
    slicing off the "new" part (FLUSH delta, SUBMIT remainder) is always valid.

    This consumer should:
    1. On CHAR_ADD: Update text buffer, maybe tokenize speculatively
    2. On CHAR_DELETE: Handle rollback (crop KV-cache if needed)
    3. On FLUSH: Run forward pass to extend KV-cache with confirmed tokens
    4. On SUBMIT: Finalize cache, start generation
    """

    def __init__(self, queue):
        super().__init__(queue)
        self.confirmed_text = ""
        self.kv_cache_len = 0  # Placeholder for actual cache tracking

    @staticmethod
    def _common_prefix(a: str, b: str) -> str:
        """Return the longest common (character-wise) prefix of ``a`` and ``b``."""
        # os.path.commonprefix compares character by character, not by path component.
        return os.path.commonprefix([a, b])

    def _rollback_to(self, target: str) -> None:
        """Pretend to crop the cache so it only covers ``target``."""
        # This is where cache.crop() would be called.
        print(f"  [ROLLBACK] Would crop cache from '{self.confirmed_text}' to '{target}'")
        self.confirmed_text = target

    async def handle_event(self, event: KeystrokeEvent):
        if event.event_type == EventType.CHAR_ADD:
            # Text buffer updated - could tokenize speculatively here
            pass

        elif event.event_type == EventType.CHAR_DELETE:
            # A deletion that reaches into confirmed text invalidates the cache tail.
            if len(event.current_text) < len(self.confirmed_text):
                # Clamp to the current text so the cache never claims text the
                # user has already deleted.
                self._rollback_to(self._common_prefix(event.confirmed_text, event.current_text))

        elif event.event_type == EventType.FLUSH:
            # New tokens confirmed - run forward pass
            new_confirmed = event.confirmed_text
            # If the new confirmed text does not extend what is cached (e.g. an
            # edit was missed), realign first so the delta below is correct.
            if not new_confirmed.startswith(self.confirmed_text):
                self._rollback_to(self._common_prefix(self.confirmed_text, new_confirmed))
            if len(new_confirmed) > len(self.confirmed_text):
                delta = new_confirmed[len(self.confirmed_text):]
                print(f"  [FORWARD] Would extend KV-cache with: '{delta}'")
                self.confirmed_text = new_confirmed

        elif event.event_type == EventType.SUBMIT:
            # Final submission - align cache and start generation
            final_text = event.current_text
            print(f"  [SUBMIT] Final text: '{final_text}'")
            if not final_text.startswith(self.confirmed_text):
                self._rollback_to(self._common_prefix(self.confirmed_text, final_text))
            print(f"  [SUBMIT] Cache already has: '{self.confirmed_text}'")
            remaining = final_text[len(self.confirmed_text):]
            if remaining:
                print(f"  [SUBMIT] Would process remaining: '{remaining}'")
            print("  [SUBMIT] Ready to generate!")

print("InferenceConsumer template defined ✓")

In [None]:
async def demo_inference_consumer():
    """Demo the inference consumer with a sample query."""
    queue = asyncio.Queue()
    
    simulator = KeystrokeSimulator(
        queue,
        mean_delay_ms=50,
        debounce_ms=150,
        typo_rate=0.0,
    )
    
    consumer = InferenceConsumer(queue)
    
    query = "Show patients"
    print(f"Demo with InferenceConsumer: '{query}'\n")
    
    await asyncio.gather(
        simulator.simulate_typing(query),
        consumer.run()
    )

await demo_inference_consumer()

## 8. Next Steps

### For Lizhi:
- [ ] Add more realistic typing patterns (burst typing, thinking pauses)
- [ ] Integrate with Riad's WebSocket frontend when ready
- [ ] Test with various query complexities

### For Rémi:
- [ ] Implement `InferenceConsumer` with actual KV-cache operations
- [ ] Use the validated patterns from `02_extension.py` and `03_cropping.py`
- [ ] Handle tokenization of confirmed text

### Interface Contract:
- Queue: `asyncio.Queue[KeystrokeEvent]`
- Events: `CHAR_ADD`, `CHAR_DELETE`, `FLUSH`, `SUBMIT`, `END`
- Key fields: `current_text`, `confirmed_text`, `timestamp_ms`