
# 🌊 LLM Stream Processor


A callback-driven, prefix-safe, lazy LLM stream sanitization library.

Real-time filtering, redaction, and control of streaming LLM outputs with microsecond-level per-token overhead.


## ✨ Features

- 🔒 **Prefix-Safe Pattern Matching** – uses an Aho-Corasick automaton to ensure no partial sensitive content leaks before a full match is confirmed
- ⚡ **Ultra-Low Latency** – targets <5 μs per-token overhead, designed for real-time streaming
- 🔄 **Sync & Async Support** – works seamlessly with both synchronous and asynchronous LLM SDKs
- 🎯 **Flexible Actions** – PASS, DROP, REPLACE, HALT, or CONTINUE_DROP/PASS based on pattern matches
- 📊 **History Tracking** – optional input/output/action history for debugging and analytics
- 🔌 **Runtime Updates** – dynamically register/deregister patterns without restarting streams

## 📦 Installation

```bash
pip install llm-stream-processor
```

For development:

```bash
git clone https://github.com/DevOpRohan/llm_stream_processor.git
cd llm_stream_processor
pip install -e .
```

## 🚀 Quickstart

```python
from stream_processor import KeywordRegistry, llm_stream_processor, replace, halt

# Create a registry and register pattern callbacks
reg = KeywordRegistry()
reg.register("secret", lambda ctx: replace("[REDACTED]"))
reg.register("STOP", halt)  # Halt the stream on this keyword

@llm_stream_processor(reg, yield_mode="token")
def generate_response():
    yield "The secret password is hidden. "
    yield "Do not STOP here."
    yield "This won't be seen."

# Consume the filtered stream
for token in generate_response():
    print(token, end="", flush=True)
# Output: The [REDACTED] password is hidden. Do not 
```

## 📖 API Reference

### Core Classes

| Class | Description |
| --- | --- |
| `KeywordRegistry` | Registers/deregisters keywords and their callbacks; compiles to an Aho-Corasick automaton |
| `StreamProcessor` | Low-level processor for character-by-character filtering |
| `ActionContext` | Context passed to callbacks with keyword, buffer, position, and history |
| `StreamHistory` | Tracks input/output/actions for debugging |

### Decorator

```python
@llm_stream_processor(registry, yield_mode='token', record_history=True)
```

| Parameter | Options | Description |
| --- | --- | --- |
| `registry` | `KeywordRegistry` | Registry with registered patterns |
| `yield_mode` | `'char'`, `'token'`, `'chunk:N'` | Output mode: per-character, per-token, or N-char chunks |
| `record_history` | `True`/`False` | Enable/disable history tracking |
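`yield_mode` only changes how the already-filtered character stream is re-packed before it reaches the consumer. A rough stdlib-only sketch of what a `'chunk:N'` mode plausibly does, assuming it means fixed N-character output chunks (an assumption, not the package's code):

```python
# Hypothetical re-packer sketch: groups a filtered character stream into
# fixed-size chunks, as a "chunk:N" yield mode might.

def repack_chunks(chars, n):
    buf = []
    for ch in chars:
        buf.append(ch)
        if len(buf) == n:
            yield "".join(buf)      # emit a full N-character chunk
            buf = []
    if buf:                         # emit the short final chunk, if any
        yield "".join(buf)

chunks = list(repack_chunks("hello world", 4))
# chunks == ["hell", "o wo", "rld"]
```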

### Action Helpers

| Function | Description |
| --- | --- |
| `drop()` | Remove the matched keyword from output |
| `replace(text)` | Replace the matched keyword with custom text |
| `halt()` | Immediately abort the stream |
| `passthrough()` | Leave the matched keyword unchanged (no-op) |
| `continuous_drop()` | Start dropping all content until `continuous_pass` |
| `continuous_pass()` | Resume normal output after `continuous_drop` |
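`continuous_drop`/`continuous_pass` behave like a toggle on the output gate. A simplified, library-independent sketch of that gating logic, which suppresses both trigger keywords for clarity (whether the library emits the trigger itself depends on the registered callback's action):

```python
# Simplified sketch of continuous_drop / continuous_pass gating over a
# token stream. Both trigger tokens are consumed rather than emitted here.

def gate(tokens, start_drop, stop_drop):
    dropping = False
    for tok in tokens:
        if tok == start_drop:
            dropping = True         # begin suppressing output
        elif tok == stop_drop:
            dropping = False        # resume normal output
        elif not dropping:
            yield tok

out = "".join(gate(["Hi ", "<think>", "hmm ", "</think>", "bye"],
                   "<think>", "</think>"))
# out == "Hi bye"
```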

## 🎯 Use Cases

### PII Redaction

```python
from stream_processor import KeywordRegistry, llm_stream_processor, replace

reg = KeywordRegistry()

# Redact email-like patterns (register common domains)
for domain in ["@gmail.com", "@yahoo.com", "@outlook.com"]:
    reg.register(domain, lambda ctx: replace("@[REDACTED]"))

# Redact SSN patterns
reg.register("SSN:", lambda ctx: replace("SSN: [REDACTED]"))
```
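Fixed-keyword matching only covers the domains you register. For arbitrary email addresses, a post-hoc regex pass over emitted text can serve as a complementary safety net; this sketch is independent of the library:

```python
import re

# Complementary regex pass (independent of this library): redacts any
# email-shaped string in an already-emitted span of text.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")

def redact_emails(text):
    return EMAIL_RE.sub("[EMAIL REDACTED]", text)

print(redact_emails("Contact alice@example.org for details."))
# Contact [EMAIL REDACTED] for details.
```

Note that a regex applied per chunk can miss matches split across chunk boundaries; handling that requires buffering, which is precisely what the prefix-safe automaton does for fixed patterns.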

### Content Moderation (Drop Segments)

```python
from stream_processor import KeywordRegistry, llm_stream_processor, continuous_drop, continuous_pass

reg = KeywordRegistry()

# Drop internal "thinking" blocks
reg.register("<think>", continuous_drop)
reg.register("</think>", continuous_pass)

@llm_stream_processor(reg, yield_mode="token")
def llm_stream():
    yield "Hello! <think>internal reasoning here</think>Here's my response."

print("".join(llm_stream()))
# Output: Hello! </think>Here's my response.
```

### Async Streaming (OpenAI Pattern)

```python
import asyncio
from stream_processor import KeywordRegistry, llm_stream_processor, replace

reg = KeywordRegistry()
reg.register("API_KEY", lambda ctx: replace("[HIDDEN]"))

@llm_stream_processor(reg, yield_mode="token")
async def stream_chat():
    # Simulate async LLM response chunks
    chunks = ["Your ", "API_KEY ", "is safe."]
    for chunk in chunks:
        yield chunk
        await asyncio.sleep(0.1)

async def main():
    async for token in stream_chat():
        print(token, end="", flush=True)

asyncio.run(main())
```
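The async case differs from the sync one only in the iteration protocol, so the same filtering idea carries over unchanged. A library-independent sketch of wrapping an async token source (the naive per-token `replace` shown here misses keywords split across tokens, which is the gap the buffered, prefix-safe approach closes):

```python
import asyncio

# Library-independent sketch: keyword replacement applied over an async
# token stream by wrapping the async generator. Per-token replace is
# naive: it cannot catch a keyword split across two tokens.

async def redact(token_stream, keyword, replacement):
    async for token in token_stream:
        yield token.replace(keyword, replacement)

async def fake_llm():
    for chunk in ["Your ", "API_KEY ", "is safe."]:
        yield chunk

async def main():
    parts = [t async for t in redact(fake_llm(), "API_KEY", "[HIDDEN]")]
    return "".join(parts)

result = asyncio.run(main())
# result == "Your [HIDDEN] is safe."
```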

πŸ—οΈ Architecture

Token Generator (sync/async)
         β”‚
         β–Ό
@llm_stream_processor    ← Decorator API
         β”‚
         β–Ό
   StreamProcessor       ← Character-level engine
   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
   β”‚ Aho-Corasick β”‚
   β”‚  Automaton   β”‚
   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
  Lazy Buffer + Callbacks
         β”‚
         β–Ό
   Re-packer (char/token/chunk)
         β”‚
         β–Ό
      Consumer

For detailed architecture, see docs/ARCHITECTURE.md.
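The stages in the diagram compose naturally as Python generators. A toy end-to-end sketch of the same shape, with a trivial single-character filter standing in for the automaton (not the package's internals):

```python
# Toy pipeline mirroring the diagram: token generator -> character-level
# filter -> re-packer -> consumer. A single-character drop stands in for
# the Aho-Corasick stage.

def to_chars(token_gen):
    for token in token_gen:
        yield from token            # flatten tokens into characters

def drop_char(chars, banned):
    for ch in chars:
        if ch != banned:            # trivial stand-in for the automaton
            yield ch

def repack(chars, n):
    buf = ""
    for ch in chars:
        buf += ch
        if len(buf) == n:
            yield buf
            buf = ""
    if buf:
        yield buf

source = iter(["ab!", "cd!", "ef"])
result = list(repack(drop_char(to_chars(source), "!"), 2))
# result == ["ab", "cd", "ef"]
```

Because every stage is lazy, nothing is materialized: each character flows through the whole chain on demand, which is what keeps per-token overhead small.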

## 🧪 Development

```bash
# Install in editable mode
pip install -e .

# Run tests
python -m pytest tests/ -v

# Run the example
python -m examples.example
```

## 📚 Documentation

## 🤝 Contributing

Contributions are welcome! Please read our Contributing Guide for details on:

- Code of Conduct
- Development setup
- Submitting pull requests

## 📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

## 🙏 Acknowledgments

- Aho-Corasick algorithm for efficient multi-pattern matching
- Inspired by the need for real-time LLM output sanitization in production systems
