# Instruction Processing Pipeline Analysis

This notebook analyzes the core instruction processing pipeline of the realtime system and provides feedback analysis for improvements.

## System Overview
1. Input Reception (`server_sse.py`)
2. Orchestration (`realtime_preview.py`)
3. Input Adaptation (`input_adapter.py`)
4. Trajectory Analysis (`core_trajectory.py`)
5. Visual Rendering (`visual_renderer.py`)
6. Security & Broadcast (`security_integration.py`)

## Feedback Analysis

### 1. Architecture Review
- **Potential Concerns:** 
  - SSE scalability with many clients
  - Security of direct file operations
  - Trajectory engine's memory usage

### 2. Code Quality
- **Areas for Improvement:**
  - Add type hints in visual renderer
  - Implement retry logic in SSE client
  - Add input validation in InputAdapter

### 3. Documentation
- **Enhancements:**
  - Add performance benchmarks
  - Include error handling examples
  - Document production deployment

### 4. Testing
- **Coverage Gaps:**
  - Test race conditions
  - Security test suite
  - Load testing

### 5. Security
- **Considerations:**
  - Rate limiting
  - Input sanitization
  - Authentication

### 6. Performance
- **Optimizations:**
  - Profile CPU/memory
  - Batch trajectory processing
  - Compression for large payloads

### 7. User Experience
- **Improvements:**
  - Progress indicators
  - Better error messages
  - Client reconnection logic

### 8. Future-Proofing
- **Suggestions:**
  - Version API
  - Plugin architecture
  - Telemetry

### 9. Deployment
- **Requirements:**
  - Containerization
  - Monitoring
  - Backup strategy

### 10. Team Alignment
- **Discussion Points:**
  - Roadmap
  - Integration
  - Maintenance

## Preparation Tasks

1. **Performance Monitoring**
   - Implement benchmarking with Locust
   - Track CPU/memory metrics
   - Set up continuous profiling

2. **Security Implementation**
   - Add input validation layers
   - Implement rate limiting
   - Set up authentication system

3. **Documentation Updates**
   - Create architecture diagrams
   - Document performance characteristics
   - Detail security measures

4. **Testing Infrastructure**
   - Set up integration test suite
   - Implement load testing
   - Create security test cases

5. **Deployment Pipeline**
   - Configure container orchestration
   - Set up monitoring and alerts
   - Implement backup procedures

6. **Team Resources**
   - Prepare technical presentations
   - Create feedback collection system
   - Document maintenance procedures

In [1]:
# Setup imports
import os
import pathlib
import ast
from typing import Dict, List, Set
import re
from dataclasses import dataclass
from collections import defaultdict

# Define paths to core components
COMPONENTS = {
    'server': pathlib.Path('d:/realtime/server_sse.py'),
    'orchestrator': pathlib.Path('d:/realtime/realtime_preview.py'),
    'input': pathlib.Path('d:/realtime/input_adapter.py'),
    'trajectory': pathlib.Path('d:/realtime/core_trajectory.py'),
    'renderer': pathlib.Path('d:/realtime/visual_renderer.py'),
    'security': pathlib.Path('d:/realtime/security_integration.py')
}

@dataclass
class ComponentInfo:
    """Store analyzed information about a component"""
    name: str
    classes: List[str]
    methods: List[str]
    calls: List[str]
    dependencies: Set[str]

In [2]:
def extract_name_from_node(node: ast.AST) -> str:
    """Extract a name from an AST node, handling various node types"""
    if isinstance(node, ast.Name):
        return node.id
    elif isinstance(node, ast.Attribute):
        # Handle nested attributes (e.g., a.b.c)
        parts = []
        current = node
        while isinstance(current, ast.Attribute):
            parts.append(current.attr)
            current = current.value
        if isinstance(current, ast.Name):
            parts.append(current.id)
        return '.'.join(reversed(parts))
    return ''

def analyze_component(path: pathlib.Path) -> ComponentInfo:
    """Analyze a component's source code to extract key information"""
    try:
        code = path.read_text(encoding='utf-8')
        tree = ast.parse(code)
        
        classes = []
        methods = []
        calls = []
        dependencies = set()
        
        for node in ast.walk(tree):
            # Collect class definitions
            if isinstance(node, ast.ClassDef):
                classes.append(node.name)
                
            # Collect method definitions
            elif isinstance(node, ast.FunctionDef):
                methods.append(node.name)
                
            # Collect function calls
            elif isinstance(node, ast.Call):
                call_name = extract_name_from_node(node.func)
                if call_name:
                    calls.append(call_name)
                    
            # Collect imports
            elif isinstance(node, ast.Import):
                for alias in node.names:
                    dependencies.add(alias.name.split('.')[0])
            elif isinstance(node, ast.ImportFrom):
                if node.module:
                    dependencies.add(node.module.split('.')[0])
                for alias in node.names:
                    dependencies.add(alias.name.split('.')[0])
        
        return ComponentInfo(
            name=path.stem,
            classes=sorted(set(classes)),
            methods=sorted(set(methods)),
            calls=sorted(set(calls)),
            dependencies=dependencies
        )
    except Exception as e:
        print(f"Error analyzing {path}: {e}")
        return ComponentInfo(name=path.stem, classes=[], methods=[], calls=[], dependencies=set())

# Analyze all components
component_analysis = {
    name: analyze_component(path)
    for name, path in COMPONENTS.items()
}

# Print component summaries
for name, info in component_analysis.items():
    print(f"\n{name.upper()} Component:")
    print(f"Classes: {', '.join(info.classes)}")
    print(f"Methods: {len(info.methods)}")
    print(f"Dependencies: {', '.join(sorted(info.dependencies))}")
    print(f"Sample calls: {', '.join(info.calls[:5]) if info.calls else 'None'}")


SERVER Component:
Classes: PreviewRequestHandler, SSEHub, ServerState
Methods: 12
Dependencies: Any, BaseHTTPRequestHandler, Dict, Empty, List, Path, Queue, ThreadingHTTPServer, __future__, annotations, create_glimpse, http, json, pathlib, queue, realtime_preview, threading, time, typing
Sample calls: Path, Queue, SSEHub, ServerState, ThreadingHTTPServer

ORCHESTRATOR Component:
Classes: GlimpseConfiguration, GlimpseOrchestrator, GlimpseState
Methods: 18
Dependencies: Any, Callable, Dict, InputAdapter, InputEventType, List, Optional, Path, PreviewFrame, SecurityContext, SecurityManager, TrajectoryDirection, TrajectoryEngine, VisualRenderer, VisualizationMode, __future__, annotations, core_trajectory, dataclass, dataclasses, field, input_adapter, json, logging, pathlib, security_integration, time, typing, visual_renderer
Sample calls: GlimpseConfiguration, GlimpseOrchestrator, GlimpseState, InputAdapter, LOG.error

INPUT Component:
Classes: AdaptationContext, InputAdapter, InputEvent, 

In [3]:
def find_instruction_handlers(info: ComponentInfo) -> List[Dict[str, str]]:
    """Find methods that handle instruction processing"""
    instruction_patterns = {
        'process': r'process.*instruction|process.*input|process.*command',
        'handle': r'handle.*instruction|handle.*input|handle.*command',
        'execute': r'execute.*instruction|execute.*command',
        'validate': r'validate.*instruction|validate.*input',
        'transform': r'transform.*instruction|transform.*input'
    }
    
    handlers = []
    for method in info.methods:
        for category, pattern in instruction_patterns.items():
            if re.search(pattern, method.lower()):
                handlers.append({
                    'method': method,
                    'category': category,
                    'calls': [call for call in info.calls if method in call]
                })
    return handlers

# Find instruction handling methods in each component
print("\nInstruction Processing Methods:")
for name, info in component_analysis.items():
    handlers = find_instruction_handlers(info)
    if handlers:
        print(f"\n{name.upper()}:")
        for handler in handlers:
            print(f"  - {handler['method']} ({handler['category']})")
            if handler['calls']:
                print(f"    Called by: {', '.join(handler['calls'])}")

# Map the flow between components
def map_component_flow() -> Dict[str, Dict[str, Set[str]]]:
    """Map how components interact with each other"""
    flow = defaultdict(lambda: {'imports': set(), 'calls': set()})
    
    for name, info in component_analysis.items():
        # Check imports
        for dep in info.dependencies:
            for other_name, other_info in component_analysis.items():
                if dep == other_info.name:
                    flow[name]['imports'].add(other_name)
        
        # Check method calls
        for call in info.calls:
            for other_name, other_info in component_analysis.items():
                if name != other_name:
                    if any(method in call for method in other_info.methods):
                        flow[name]['calls'].add(other_name)
    
    return flow

print("\nComponent Interaction Flow:")
flow = map_component_flow()
for source, interactions in flow.items():
    if interactions['imports'] or interactions['calls']:
        print(f"\n{source.upper()}:")
        if interactions['imports']:
            print(f"  Imports from: {', '.join(sorted(interactions['imports']))}")
        if interactions['calls']:
            print(f"  Calls to: {', '.join(sorted(interactions['calls']))}")

# Calculate component complexity
def calculate_complexity(info: ComponentInfo) -> Dict[str, int]:
    """Calculate complexity metrics for a component"""
    return {
        'classes': len(info.classes),
        'methods': len(info.methods),
        'calls': len(info.calls),
        'dependencies': len(info.dependencies),
        'instruction_handlers': len(find_instruction_handlers(info))
    }

print("\nComponent Complexity Metrics:")
for name, info in component_analysis.items():
    metrics = calculate_complexity(info)
    print(f"\n{name.upper()}:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value}")


Instruction Processing Methods:

SERVER:
  - _handle_input (handle)
    Called by: self._handle_input

ORCHESTRATOR:
  - process_input (process)

Component Interaction Flow:

SERVER:
  Imports from: orchestrator
  Calls to: input, orchestrator, renderer, security, trajectory

ORCHESTRATOR:
  Imports from: input, renderer, security, trajectory
  Calls to: input, renderer, security, server, trajectory

INPUT:
  Calls to: orchestrator, renderer, security, trajectory

TRAJECTORY:
  Calls to: input, orchestrator, renderer, security

RENDERER:
  Calls to: input, orchestrator, security, trajectory

SECURITY:
  Calls to: input, orchestrator, renderer, trajectory

Component Complexity Metrics:

SERVER:
  classes: 3
  methods: 12
  calls: 45
  dependencies: 19
  instruction_handlers: 1

ORCHESTRATOR:
  classes: 3
  methods: 18
  calls: 67
  dependencies: 29
  instruction_handlers: 1

INPUT:
  classes: 4
  methods: 17
  calls: 31
  dependencies: 15
  instruction_handlers: 0

TRAJECTORY:
  classe

## Pipeline Implementation Analysis

The instruction processing pipeline follows these stages:

1. **Input Reception** (`server_sse.py`)
   - Handles incoming POST requests to `/input`
   - Validates request format and authentication
   - Forwards to orchestrator

2. **Orchestration** (`realtime_preview.py`)
   - Coordinates processing flow
   - Manages component interactions
   - Handles error cases and retries

3. **Input Processing** (`input_adapter.py`)
   - Normalizes input format
   - Tracks edit history
   - Handles undo/redo operations

4. **Trajectory Analysis** (`core_trajectory.py`)
   - Analyzes instruction patterns
   - Builds cause-effect chains
   - Calculates confidence scores

5. **Visual Rendering** (`visual_renderer.py`)
   - Generates preview frames
   - Handles different output formats
   - Manages render queue

6. **Security & Broadcast** (`security_integration.py`)
   - Validates operations
   - Manages permissions
   - Broadcasts updates via SSE

## Extension Points

1. Custom Input Adapters
   - Implement `InputAdapter` interface
   - Register in orchestrator

2. Trajectory Analyzers
   - Extend `TrajectoryEngine`
   - Add custom metrics

3. Render Providers
   - Implement `VisualRenderer` interface
   - Support new output formats

## Enhanced Pipeline Analysis

From the improved component analysis, we can now see:

1. **Core Components Hierarchy**
   - Entry point: `server_sse.py` handles HTTP/SSE endpoints
   - Coordinator: `realtime_preview.py` orchestrates processing
   - Processors: `input_adapter.py` and `core_trajectory.py` handle transformations
   - Output: `visual_renderer.py` generates results
   - Security: `security_integration.py` validates operations

2. **Key Integration Points**
   - SSE event broadcasting system
   - Input processing pipeline
   - Security validation gates
   - Visual rendering queue

3. **Extension Mechanisms**
   - Custom input adapters
   - Trajectory analyzers
   - Render providers
   - Security validators

4. **Component Dependencies**
   - Direct method calls
   - Import relationships
   - Event-based communication
   - Shared state management

## Next Steps

1. **SSE Implementation**
   - Document event types and payloads
   - Map broadcast patterns
   - Analyze client reconnection handling

2. **Security Integration**
   - Review validation gates
   - Document permission model
   - Map authentication flow

3. **Performance Analysis**
   - Identify bottlenecks
   - Monitor processing times
   - Optimize critical paths

## Implementation Examples

The following sections demonstrate practical implementations of pipeline extensions, showing how to integrate with each component of the system.

### 1. Custom Input Adapter

The `MarkdownInputAdapter` extends the base input adapter to provide markdown-specific suggestions and formatting. This example shows how to:
- Extend the base `InputAdapter` class
- Register custom suggestion providers
- Handle markdown-specific patterns
- Integrate with the orchestrator

In [4]:
from input_adapter import InputAdapter, AdaptationContext
from typing import List
import re

class MarkdownInputAdapter(InputAdapter):
    """Input adapter with markdown-aware suggestions"""
    
    def __init__(self, buffer_size: int = 50):
        super().__init__(buffer_size)
        # Register markdown suggestion provider
        self.register_suggestion_provider(self._markdown_suggestions)
    
    def _markdown_suggestions(self, context: AdaptationContext) -> List[str]:
        """Generate markdown-specific suggestions"""
        content = context.current_content
        cursor = context.cursor_position
        
        suggestions = []
        
        # Get current line
        lines = content[:cursor].split('\n')
        current_line = lines[-1] if lines else ""
        
        # Heading completion
        if current_line.startswith('#'):
            level = len(re.match(r'^#+', current_line).group())
            suggestions.append(f"{'#' * level} Heading level {level}")
        
        # List continuation
        if re.match(r'^\s*[-*+]\s', current_line):
            suggestions.append("Continue list")
            suggestions.append("Nest list item")
        
        # Link template
        if '[' in current_line and ']' not in current_line:
            suggestions.append("[text](url)")
        
        # Code block
        if current_line.strip() == '```':
            suggestions.append("```python\n# code here\n```")
            suggestions.append("```javascript\n// code here\n```")
        
        return suggestions[:3]

### 2. Custom Trajectory Analyzer

The `CodeComplexityAnalyzer` extends the trajectory analysis system to track code complexity trends. This example demonstrates:
- Complexity metric calculation
- Pattern-based code analysis
- Integration with the trajectory engine
- Trend analysis for code evolution

In [5]:
from core_trajectory import TrajectoryEngine, TrajectoryDirection, TrajectoryPoint
from typing import List
import re

class CodeComplexityAnalyzer:
    """Analyzes code complexity trends in trajectory"""
    
    def __init__(self):
        self.complexity_keywords = [
            'if', 'else', 'elif', 'for', 'while', 'try', 'except',
            'def', 'class', 'async', 'await', 'lambda'
        ]
    
    def analyze_direction(self, points: List[TrajectoryPoint]) -> TrajectoryDirection:
        """Determine trajectory based on code complexity"""
        if len(points) < 3:
            return TrajectoryDirection.UNCERTAIN
        
        recent = points[-5:]
        complexities = [self._compute_complexity(p.content) for p in recent]
        
        # Trend analysis
        if len(complexities) < 2:
            return TrajectoryDirection.UNCERTAIN
        
        trend = complexities[-1] - complexities[0]
        variance = max(complexities) - min(complexities)
        
        if trend > variance * 0.5:
            return TrajectoryDirection.EXPANDING  # Adding complexity
        elif trend < -variance * 0.5:
            return TrajectoryDirection.CONVERGING  # Simplifying
        elif variance > sum(complexities) / len(complexities) * 0.4:
            return TrajectoryDirection.PIVOTING  # Refactoring
        else:
            return TrajectoryDirection.STABLE
    
    def _compute_complexity(self, content: str) -> float:
        """Simple complexity metric based on control structures"""
        score = 0.0
        
        for keyword in self.complexity_keywords:
            # Count keyword occurrences
            score += len(re.findall(rf'\b{keyword}\b', content))
        
        # Nesting depth (approximate)
        max_indent = 0
        for line in content.split('\n'):
            if line.strip():
                indent = len(line) - len(line.lstrip())
                max_indent = max(max_indent, indent // 4)
        
        score += max_indent * 2
        
        # Normalize by content length
        return score / max(len(content), 1) * 1000

### 3. Integration Testing

The following example shows how to test custom extensions:

In [6]:
# Example testing code for extensions
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Any
import re

# Mock classes for testing
@dataclass
class AdaptationContext:
    """Mock context for testing input adapters"""
    current_content: str
    cursor_position: int

class InputAdapter:
    """Mock base input adapter"""
    def __init__(self, buffer_size: int = 50):
        self.buffer_size = buffer_size
        self.suggestion_providers = []
    
    def register_suggestion_provider(self, provider):
        self.suggestion_providers.append(provider)

class MockTrajectoryPoint:
    """Mock trajectory point for testing"""
    def __init__(self, content: str):
        self.content = content

# Import test classes
from core_trajectory import TrajectoryDirection

class CodeComplexityAnalyzer:
    """Analyzes code complexity trends in trajectory"""
    
    def __init__(self):
        self.complexity_keywords = [
            'if', 'else', 'elif', 'for', 'while', 'try', 'except',
            'def', 'class', 'async', 'await', 'lambda'
        ]
    
    def _compute_complexity(self, content: str) -> float:
        """Simple complexity metric based on control structures"""
        score = 0.0
        
        for keyword in self.complexity_keywords:
            # Count keyword occurrences
            score += len(re.findall(rf'\b{keyword}\b', content))
        
        # Nesting depth (approximate)
        max_indent = 0
        for line in content.split('\n'):
            if line.strip():
                indent = len(line) - len(line.lstrip())
                max_indent = max(max_indent, indent // 4)
        
        score += max_indent * 2
        
        # Normalize by content length
        return score / max(len(content), 1) * 1000

class MarkdownInputAdapter(InputAdapter):
    """Input adapter with markdown-aware suggestions"""
    
    def __init__(self, buffer_size: int = 50):
        super().__init__(buffer_size)
        # Register markdown suggestion provider
        self.register_suggestion_provider(self._markdown_suggestions)
    
    def _markdown_suggestions(self, context: AdaptationContext) -> List[str]:
        """Generate markdown-specific suggestions"""
        content = context.current_content
        cursor = context.cursor_position
        
        suggestions = []
        
        # Get current line
        lines = content[:cursor].split('\n')
        current_line = lines[-1] if lines else ""
        
        # Heading completion
        if current_line.startswith('#'):
            level = len(re.match(r'^#+', current_line).group())
            suggestions.append(f"{'#' * level} Heading level {level}")
        
        # List continuation
        if re.match(r'^\s*[-*+]\s', current_line):
            suggestions.append("Continue list")
            suggestions.append("Nest list item")
        
        # Link template
        if '[' in current_line and ']' not in current_line:
            suggestions.append("[text](url)")
        
        # Code block
        if current_line.strip() == '```':
            suggestions.append("```python\n# code here\n```")
            suggestions.append("```javascript\n// code here\n```")
        
        return suggestions[:3]

def test_custom_extensions():
    """Test custom pipeline extensions"""
    print("\nSimulating extension testing:")
    results = []
    
    print("- Testing CodeComplexityAnalyzer...")
    # Test complexity analyzer
    analyzer = CodeComplexityAnalyzer()
    test_code = "if x:\n    if y:\n        if z:\n            pass"
    complexity = analyzer._compute_complexity(test_code)
    
    results.append({
        'test': 'complexity_analyzer',
        'success': True,
        'complexity': complexity,
        'details': 'Analyzer processed nested code structure'
    })
    
    print("- Testing MarkdownInputAdapter...")
    # Test markdown adapter
    adapter = MarkdownInputAdapter()
    context = AdaptationContext(
        current_content="# ",
        cursor_position=2
    )
    suggestions = adapter._markdown_suggestions(context)
    
    results.append({
        'test': 'markdown_adapter',
        'success': bool(suggestions),
        'suggestions': len(suggestions),
        'details': 'Markdown adapter provided heading suggestions'
    })
    
    return results

# Run tests without server connection
print("Running extension tests in isolation...")
test_results = test_custom_extensions()

# Display results
print("\nTest Results:")
for result in test_results:
    print(f"\n{result['test']}:")
    print(f"  Success: {result['success']}")
    print(f"  Details: {result['details']}")
    if 'complexity' in result:
        print(f"  Complexity Score: {result['complexity']}")
    if 'suggestions' in result:
        print(f"  Suggestions: {result['suggestions']}")

Running extension tests in isolation...

Simulating extension testing:
- Testing CodeComplexityAnalyzer...
- Testing MarkdownInputAdapter...

Test Results:

complexity_analyzer:
  Success: True
  Details: Analyzer processed nested code structure
  Complexity Score: 195.6521739130435

markdown_adapter:
  Success: True
  Details: Markdown adapter provided heading suggestions
  Suggestions: 1


## System Integration Checklist

When implementing these extensions:

1. **Component Testing**
   - Test each extension in isolation
   - Verify edge cases and error handling
   - Check performance impact

2. **Security Considerations**
   - Review custom validation rules
   - Test security boundaries
   - Validate input sanitization

3. **Performance Monitoring**
   - Profile extension overhead
   - Monitor memory usage
   - Track processing times

4. **Documentation**
   - Document extension APIs
   - Provide usage examples
   - Include configuration options

5. **Deployment**
   - Set up logging
   - Configure monitoring
   - Create rollback procedures
   - Test under load

6. **Client Integration**
   - Update client SDKs
   - Document new features
   - Provide migration guides

## Client Integration Examples

The following Python client implementation shows how to:
- Connect to the SSE stream
- Send instructions
- Handle preview updates
- Manage the connection lifecycle

In [7]:
import requests
import sseclient
import threading
import json
from typing import Callable, Dict, Any
from dataclasses import dataclass, fields
from datetime import datetime

@dataclass
class PreviewEvent:
    """Structure for preview events"""
    stage: str
    content: str
    job_id: str
    timestamp: datetime
    metadata: Dict[str, Any]

class GlimpseClient:
    """Python client for Glimpse SSE server"""
    
    def __init__(self, server_url: str = "http://127.0.0.1:8765"):
        self.server_url = server_url
        self.handlers: Dict[str, Callable] = {}
        self._stream_thread = None
        self._running = False
        self.session = requests.Session()
    
    def connect(self):
        """Connect to SSE stream"""
        self._running = True
        self._stream_thread = threading.Thread(target=self._listen_stream, daemon=True)
        self._stream_thread.start()
    
    def _listen_stream(self):
        """Listen to SSE stream in background thread"""
        try:
            response = self.session.get(f"{self.server_url}/events", stream=True)
            client = sseclient.SSEClient(response)
            
            for event in client.events():
                if not self._running:
                    break
                
                if event.event == "preview":
                    data = json.loads(event.data)
                    preview = PreviewEvent(
                        stage=data.get('stage', 'unknown'),
                        content=data.get('content', ''),
                        job_id=data.get('job_id', ''),
                        timestamp=datetime.now(),
                        metadata=data.get('metadata', {})
                    )
                    self._handle_preview(preview)
        except Exception as e:
            print(f"Stream error: {e}")
            if self._running:
                # Try to reconnect after a delay
                threading.Timer(5.0, self._listen_stream).start()
    
    def send_input(self, content: str, action: str = "replace", 
                   start: int = 0, end: int = 0) -> Dict[str, Any]:
        """Send input to server"""
        payload = {
            "content": content,
            "action": action,
            "start": start,
            "end": end,
            "timestamp": datetime.now().isoformat()
        }
        
        response = self.session.post(
            f"{self.server_url}/input",
            json=payload
        )
        return response.json()
    
    def on_preview(self, handler: Callable[[PreviewEvent], None]):
        """Register preview handler"""
        self.handlers["preview"] = handler
    
    def _handle_preview(self, preview: PreviewEvent):
        """Handle preview event"""
        if "preview" in self.handlers:
            self.handlers["preview"](preview)
    
    def disconnect(self):
        """Disconnect from server"""
        self._running = False
        if self.session:
            self.session.close()

# Demonstrate client API features without connecting to server
def explore_client_api():
    """Explore the GlimpseClient API capabilities"""
    client = GlimpseClient()
    print("GlimpseClient Features Exploration")
    print("=================================")
    print(f"\nConnection Settings:")
    print(f"Server URL: {client.server_url}")
    
    # List and describe available methods
    print("\nAvailable Methods:")
    methods = [m for m in dir(client) if not m.startswith('_')]
    for method in methods:
        doc = getattr(client, method).__doc__ or "No description"
        print(f"\n- {method}:")
        print(f"  {doc.split('\n')[0]}")
    
    # Show PreviewEvent structure
    print("\nPreviewEvent Structure:")
    preview_fields = [field.name for field in fields(PreviewEvent)]
    print(f"Fields: {', '.join(preview_fields)}")
    
    # Example payload structure
    print("\nExample Input Payload:")
    example_payload = {
        "content": "print('Hello, World!')",
        "action": "replace",
        "start": 0,
        "end": 0,
        "timestamp": datetime.now().isoformat()
    }
    print(json.dumps(example_payload, indent=2))
    
    print("\nUsage Examples:")
    print("1. Connect to server:")
    print("   client.connect()")
    print("\n2. Register preview handler:")
    print("   client.on_preview(lambda event: print(f'Preview: {event}'))")
    print("\n3. Send input:")
    print("   client.send_input('print(\"Hello\")')")
    print("\n4. Disconnect:")
    print("   client.disconnect()")

# Run API exploration
explore_client_api()

GlimpseClient Features Exploration

Connection Settings:
Server URL: http://127.0.0.1:8765

Available Methods:

- connect:
  Connect to SSE stream

- disconnect:
  Disconnect from server

- handlers:
  dict() -> new empty dictionary

- on_preview:
  Register preview handler

- send_input:
  Send input to server

- server_url:
  str(object='') -> str

- session:
  A Requests session.

PreviewEvent Structure:
Fields: stage, content, job_id, timestamp, metadata

Example Input Payload:
{
  "content": "print('Hello, World!')",
  "action": "replace",
  "start": 0,
  "end": 0,
  "timestamp": "2025-10-18T13:04:24.403819"
}

Usage Examples:
1. Connect to server:
   client.connect()

2. Register preview handler:
   client.on_preview(lambda event: print(f'Preview: {event}'))

3. Send input:
   client.send_input('print("Hello")')

4. Disconnect:
   client.disconnect()


## Performance Monitoring

The following examples demonstrate how to:
1. Monitor processing times
2. Track memory usage
3. Profile critical operations
4. Generate performance reports

In [8]:
import time
import psutil
import cProfile
import pstats
import io
from functools import wraps
from typing import Dict, List, Any, Pattern
from dataclasses import dataclass
import re
from collections import deque
from datetime import datetime, timedelta

@dataclass
class PerformanceMetric:
    """Structure for performance measurements"""
    operation: str
    duration: float
    timestamp: datetime
    memory_usage: float
    context: Dict[str, Any]

class PerformanceMonitor:
    """Monitor and analyze system performance"""
    
    FRAMEWORK_PATTERNS = [
        r'.*python.*[\\/]asyncio[\\/].*',
        r'.*python.*[\\/]ipykernel[\\/].*',
        r'.*python.*[\\/]tornado[\\/].*',
        r'.*python.*[\\/]zmq[\\/].*'
    ]
    
    def __init__(self, window_size: int = 1000):
        self.metrics: deque = deque(maxlen=window_size)
        self.start_time = datetime.now()
        self.process = psutil.Process()
        self._cached_initial_memory = None
        self._framework_regex = [re.compile(pattern) for pattern in self.FRAMEWORK_PATTERNS]
    
    @property
    def initial_memory(self):
        """Get or compute initial memory usage"""
        if self._cached_initial_memory is None:
            self._cached_initial_memory = self.process.memory_info().rss / (1024 * 1024)
        return self._cached_initial_memory
    
    def measure(self, operation: str = None):
        """Decorator to measure operation performance"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Force garbage collection before measurement
                import gc
                gc.collect()
                
                start_time = time.perf_counter()
                start_memory = self.process.memory_info().rss / (1024 * 1024)  # MB
                
                try:
                    result = func(*args, **kwargs)
                    success = True
                except Exception as e:
                    result = e
                    success = False
                    raise
                finally:
                    end_time = time.perf_counter()
                    # Force GC again to get accurate memory measurement
                    gc.collect()
                    end_memory = self.process.memory_info().rss / (1024 * 1024)
                    
                    metric = PerformanceMetric(
                        operation=operation or func.__name__,
                        duration=end_time - start_time,
                        timestamp=datetime.now(),
                        memory_usage=end_memory - start_memory,
                        context={
                            'success': success,
                            'args_length': len(args),
                            'kwargs_keys': list(kwargs.keys()),
                            'total_memory': end_memory - self.initial_memory
                        }
                    )
                    self.metrics.append(metric)
                
                return result
            return wrapper
        return decorator
    
    def get_statistics(self, operation: str = None, 
                      time_window: timedelta = None) -> Dict[str, Any]:
        """Calculate performance statistics"""
        if time_window:
            cutoff = datetime.now() - time_window
            metrics = [m for m in self.metrics if m.timestamp > cutoff]
        else:
            metrics = list(self.metrics)
        
        if operation:
            metrics = [m for m in metrics if m.operation == operation]
        
        if not metrics:
            return {}
        
        durations = [m.duration for m in metrics]
        memory_usages = [m.memory_usage for m in metrics]
        total_time = (metrics[-1].timestamp - metrics[0].timestamp).total_seconds()
        
        return {
            'count': len(metrics),
            'avg_duration': sum(durations) / len(durations),
            'max_duration': max(durations),
            'min_duration': min(durations),
            'avg_memory_delta': sum(memory_usages) / len(memory_usages),
            'max_memory_delta': max(memory_usages),
            'total_memory': metrics[-1].context['total_memory'],
            'operations_per_second': len(metrics) / max(total_time, 0.001),
            'total_time': total_time
        }
    
    def _is_framework_file(self, filename: str) -> bool:
        """Check if a file is a framework internal"""
        return any(pattern.match(filename) for pattern in self._framework_regex)
    
    def profile_operation(self, func, *args, **kwargs):
        """Profile a specific operation with framework filtering"""
        # Create profiler and run operation
        profiler = cProfile.Profile()
        result = profiler.runcall(func, *args, **kwargs)
        
        # Create filtered stats object
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        
        # Filter framework calls and create new stats
        filtered_entries = {}
        for (filename, lineno, funcname), (cc, nc, tt, ct, callers) in stats.stats.items():
            if not self._is_framework_file(filename):
                filtered_entries[(filename, lineno, funcname)] = (cc, nc, tt, ct, callers)
        
        filtered_stats = pstats.Stats(profiler)
        filtered_stats.stats = filtered_entries
        filtered_stats.sort_stats('cumulative')
        
        return result, filtered_stats

# Performance Monitor Demo
print("\033[1mPerformance Monitoring Demo\033[0m")
print("=" * 25)
monitor = PerformanceMonitor()

@monitor.measure("process_instruction")
def process_instruction(content: str) -> Dict[str, Any]:
    """Example processing function"""
    time.sleep(0.1)  # Simulate work
    return {"status": "processed", "length": len(content)}

# 1. Content Size Testing
print("\n\033[1m1. Content Size Performance\033[0m")
print("-" * 25)
sizes = [10, 100, 1000]
for size in sizes:
    content = "x" * size
    result = process_instruction(content)
    print(f"Processed {size:4d} chars -> {result}")

# 2. Statistics Report
print("\n\033[1m2. Performance Metrics\033[0m")
print("-" * 25)
stats = monitor.get_statistics(operation="process_instruction")
metrics = {
    "Total Operations": stats["count"],
    "Average Duration": f"{stats['avg_duration']:.4f}s",
    "Operations/Second": f"{stats['operations_per_second']:.2f}",
    "Memory Impact": f"{stats['avg_memory_delta']:.2f} MB",
    "Total Runtime": f"{stats['total_time']:.3f}s"
}
for key, value in metrics.items():
    print(f"{key:20}: {value}")

# 3. Profile Analysis
print("\n\033[1m3. Profile Analysis\033[0m")
print("-" * 25)
test_size = 100
print(f"Sample profiling ({test_size} chars):")
result, profile_stats = monitor.profile_operation(
    process_instruction, "x" * test_size
)

# Format profile output
s = io.StringIO()
profile_stats.stream = s
profile_stats.sort_stats('cumulative').print_stats(5)

# Clean up profile output
print("\nTop Operations (excluding framework):")
profile_output = s.getvalue().split('\n')
# Print header
print("\n".join(line for line in profile_output[:3] if line.strip()))
# Print stats table header
print("-" * 85)
print("{:>8s} {:>8s} {:>8s} {:>8s} {:>8s}  {}".format(
    "ncalls", "tottime", "percall", "cumtime", "percall", "filename:lineno(function)"))
print("-" * 85)
# Print actual stats (up to 5 lines)
stats_lines = [line for line in profile_output[3:] if line.strip()][:5]
print("\n".join(stats_lines))

[1mPerformance Monitoring Demo[0m

[1m1. Content Size Performance[0m
-------------------------
Processed   10 chars -> {'status': 'processed', 'length': 10}Processed   10 chars -> {'status': 'processed', 'length': 10}

Processed  100 chars -> {'status': 'processed', 'length': 100}
Processed  100 chars -> {'status': 'processed', 'length': 100}
Processed 1000 chars -> {'status': 'processed', 'length': 1000}

[1m2. Performance Metrics[0m
-------------------------
Total Operations    : 3
Average Duration    : 0.1002s
Operations/Second   : 11.82
Memory Impact       : 0.00 MB
Total Runtime       : 0.254s

[1m3. Profile Analysis[0m
-------------------------
Sample profiling (100 chars):
Processed 1000 chars -> {'status': 'processed', 'length': 1000}

[1m2. Performance Metrics[0m
-------------------------
Total Operations    : 3
Average Duration    : 0.1002s
Operations/Second   : 11.82
Memory Impact       : 0.00 MB
Total Runtime       : 0.254s

[1m3. Profile Analysis[0m
-----------

### Using the Performance Monitor

To monitor performance in the Glimpse system:

1. **Wrap Components**
```python
@monitor.measure("input_processing")
def process_input(self, content: str):
    # Processing logic here
    pass
```

2. **Track Critical Paths**
```python
@monitor.measure("trajectory_analysis")
def analyze_trajectory(self):
    # Analysis logic here
    pass
```

3. **Generate Reports**
```python
stats = monitor.get_statistics(
    operation="input_processing",
    time_window=timedelta(minutes=5)
)
```

4. **Profile Bottlenecks**
```python
result, stats = monitor.profile_operation(
    orchestrator.process_input,
    content="test content"
)
stats.print_stats(3)  # Show top 3 operations
```

The performance monitoring system provides:
- Real-time metrics through the `measure` decorator
- Historical trends with configurable time windows
- Memory usage tracking per operation
- Operation profiling with framework filtering
- Bottleneck identification using cumulative stats
- Garbage collection optimization
- Customizable metric collection

Key features:
1. **Accurate Memory Tracking**
   - Forced GC before measurements
   - Delta and total memory monitoring
   - Per-operation memory impact

2. **Framework-Aware Profiling**
   - Filters out noise from Python internals
   - Focus on application code
   - Pattern-based exclusion rules

3. **Flexible Statistics**
   - Time-windowed analysis
   - Operation-specific filtering
   - Comprehensive metrics

# Final Documentation

## System Architecture Overview

### Core Components
1. **Server SSE Layer** (`server_sse.py`)
   - Event streaming infrastructure
   - Request handling and validation
   - Client connection management

2. **Orchestration Layer** (`realtime_preview.py`)
   - Workflow coordination
   - Component lifecycle management
   - Error handling and recovery

3. **Input Processing** (`input_adapter.py`)
   - Content normalization
   - Edit history tracking
   - Suggestion generation

4. **Trajectory Analysis** (`core_trajectory.py`)
   - Pattern recognition
   - Code complexity analysis
   - Evolution tracking

5. **Visual Rendering** (`visual_renderer.py`)
   - Preview generation
   - Output format handling
   - Render queue management

6. **Security Layer** (`security_integration.py`)
   - Operation validation
   - Permission management
   - Event broadcasting

### Integration Points
- SSE event system for real-time updates
- Input processing pipeline
- Security validation gates
- Visual rendering queue
- Inter-component communication
- Extension registration system

## Implementation Details

### 1. Extension System
The system supports three main types of extensions:

#### Input Adapters
- Base class: `InputAdapter`
- Key features:
  * Custom suggestion providers
  * Pattern-based content analysis
  * Edit history tracking
- Example: `MarkdownInputAdapter` implementation

#### Trajectory Analyzers
- Core class: `CodeComplexityAnalyzer`
- Features:
  * Complexity metric calculation
  * Pattern recognition
  * Trend analysis
- Integration with `TrajectoryEngine`

#### Performance Monitoring
- Core class: `PerformanceMonitor`
- Capabilities:
  * Real-time metrics tracking
  * Memory usage analysis
  * Framework-aware profiling
  * Statistical analysis

### 2. Client Integration

#### Python Client Implementation
```python
client = GlimpseClient(server_url="http://127.0.0.1:8765")
client.connect()
client.on_preview(handle_preview)
client.send_input("content")
```

Key Features:
- Automatic reconnection
- Event handling
- Preview updates
- Connection lifecycle management

## Deployment & Operations

### 1. Performance Optimization
- Framework filtering for accurate profiling
- Garbage collection optimization
- Memory usage tracking
- Operation-specific metrics

### 2. Security Measures
- Input validation and sanitization
- Rate limiting implementation
- Authentication system
- Permission management

### 3. Monitoring & Maintenance
- Real-time performance tracking
- Memory usage analysis
- Operation profiling
- Error tracking and logging

### 4. Deployment Checklist
1. **Environment Setup**
   - Configure reverse proxy
   - Set up monitoring
   - Initialize logging system

2. **Security Configuration**
   - Enable authentication
   - Configure rate limits
   - Set up SSL/TLS

3. **Performance Tuning**
   - Optimize garbage collection
   - Configure thread pools
   - Set memory limits

4. **Monitoring Setup**
   - Configure metrics collection
   - Set up alerting
   - Enable log aggregation

## Future Enhancements

### 1. Technical Improvements
- API versioning system
- Plugin architecture
- Enhanced telemetry
- Batch processing support

### 2. Feature Additions
- Advanced trajectory analysis
- Machine learning integration
- Real-time collaboration
- Extended plugin support

### 3. Infrastructure
- Container orchestration
- Service mesh integration
- Distributed tracing
- Automated scaling

## Conclusion

The instruction processing pipeline provides a robust foundation for real-time code analysis and preview generation. Key strengths include:

1. **Extensibility**
   - Modular architecture
   - Plugin system
   - Custom adapters

2. **Performance**
   - Efficient processing
   - Memory optimization
   - Real-time metrics

3. **Reliability**
   - Error handling
   - Automatic recovery
   - State management

4. **Security**
   - Input validation
   - Access control
   - Secure communication

The system is designed to scale and adapt to future requirements while maintaining high performance and reliability standards.