Skip to content

BRIC-18: Multiplex content stream with typed events (thinking/content/message)#17

Merged
Kaiohz merged 6 commits into
mainfrom
BRIC-18/multiplexed-content-stream
May 5, 2026
Merged

BRIC-18: Multiplex content stream with typed events (thinking/content/message)#17
Kaiohz merged 6 commits into
mainfrom
BRIC-18/multiplexed-content-stream

Conversation

@Kaiohz
Copy link
Copy Markdown
Contributor

@Kaiohz Kaiohz commented Apr 29, 2026

Jira

BRIC-18

Changes

  • Protocol: SSE/WebSocket now emit typed StreamEvent JSON (type: thinking|content|message) instead of raw strings
  • Detection: _classify_chunk() inspects additional_kwargs.reasoning_content (OpenAI) and additional_kwargs.type==\thinking`` (Anthropic) to separate thinking from content
  • Persistence: Message.thinking field + DB migration 004_add_thinking_column stores accumulated reasoning text
  • API: Updated AgentRunner ABC, StreamMessageUseCase, SSE/WS routes, and Postgres mapper

Tests

  • Unit tests updated and passing (241/241)
  • Ruff linter clean

@Kaiohz
Copy link
Copy Markdown
Contributor Author

Kaiohz commented Apr 29, 2026

🔍 Review BRIC-18: Multiplexed Content Stream

Score: 8/10


✅ Points positifs

  1. Architecture propre - Nouvelle entité StreamEvent bien placée dans le domaine, architecture hexagonale respectée
  2. Migration DB - Alembic propre avec upgrade/downgrade corrects
  3. Multi-provider - Détection thinking pour OpenAI (reasoning_content) et Anthropic (type=thinking)
  4. Refactoring propre - Code plus concis, suppression du boilerplate inutile
  5. Tests mis à jour - Bon coverage sur les tests existants

⚠️ Points à améliorer

1. Docstrings ABC supprimées

Les méthodes de AgentRunner avaient des docstrings utiles qui ont été retirées :

# Avant
async def stream(self, thread_id: str, message: str) -> AsyncIterator[str]:
    """Envoie un message et streame la reponse par chunks."""
    ...

# Après
async def stream(self, thread_id: str, message: str) -> AsyncIterator[StreamEvent]:
    ...

Suggestion : Remettre les docstrings pour la documentation de l'API publique.

2. Pas de tests pour _classify_chunk()

La logique de classification est critique :

  • Détecte reasoning_content (OpenAI)
  • Détecte type=thinking (Anthropic)
  • Fallback vers content

Suggestion : Ajouter des tests unitaires dédiés dans test_deep_agent_runner_stream_with_message.py :

def test_classify_chunk_openai_reasoning():
    # Test OpenAI reasoning_content detection
    
def test_classify_chunk_anthropic_thinking():
    # Test Anthropic type=thinking detection
    
def test_classify_chunk_fallback_to_content():
    # Test normal content fallback

3. Helper _safe_str() dans postgres adapter

def _safe_str(val: object) -> str | None:
    return val if isinstance(val, str) else None

Question : Y a-t-il un cas où msg.thinking n'est pas déjà str | None ? Si non, ce helper est inutile.

4. Sémantique StreamEvent.data

Le champ data contient :

  • Du JSON sérialisé pour MESSAGE
  • Du texte brut pour THINKING et CONTENT

Suggestion : Considérer deux types distincts ou documenter clairement cette différence.


📝 Suggestions additionnelles

  1. Statut DRAFT - La PR est marquée draft. Préciser ce qu'il manque pour être prête.

  2. Validation Pydantic - StreamEvent pourrait bénéficier de validators pour MESSAGE data (vérifier que c'est du JSON valide).


Verdict

Bonne PR avec un refactoring propre. Principalement besoin de :

  • Tests pour la logique de classification
  • Clarifier le statut DRAFT
  • Remettre les docstrings ABC

👍 Prêt à merger après ces ajustements mineurs.

@Kaiohz Kaiohz force-pushed the BRIC-18/multiplexed-content-stream branch from 3c7b3dc to 30902e4 Compare April 29, 2026 12:11
- Add StreamEvent entity (THINKING, CONTENT, MESSAGE)
- Add thinking field to Message entity & DB migration
- Update AgentRunner ABC to yield StreamEvent
- Classify AIMessageChunk by additional_kwargs.reasoning_content and type=thinking
- Update SSE/WS routes to emit JSON StreamEvent
- Update postgres_thread adapter to map thinking column
- Update all tests for new protocol
@Kaiohz Kaiohz force-pushed the BRIC-18/multiplexed-content-stream branch from 30902e4 to 213e228 Compare April 29, 2026 12:18
@Kaiohz Kaiohz marked this pull request as ready for review April 29, 2026 12:20
Copy link
Copy Markdown
Contributor Author

@Kaiohz Kaiohz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📋 Review de la PR: BRIC-18 - Multiplex content stream with typed events

🎯 Résumé

Cette PR introduit un système d'événements typés pour le streaming, remplaçant le système précédent qui mélangeait str et Message. Le nouveau design est plus propre et mieux typé.

✅ Points Positifs

Architecture & Design

  • Nouvelle entité StreamEvent - Design élégant avec StreamEventType (THINKING, CONTENT, MESSAGE)
  • 🧊 Entité immuable (frozen=True) - Bonne pratique pour les DTOs
  • 🗃️ Migration DB propre - Ajout de la colonne thinking dans messages
  • 📉 Réduction de complexité - Le stream handler est simplifié, plus de isinstance checks

Code Quality

  • 🧪 Tests unitaires à jour - Tous les tests ont été mis à jour pour le nouveau format
  • 📝 Code plus lisible - Suppression de la logique complexe de parsing dans les routes
  • 🎯 Séparation claire des types - THINKING, CONTENT, MESSAGE bien distincts

Implementation

  • La méthode _classify_chunk() dans DeepAgentRunner est bien pensée
  • Support des "reasoning tokens" pour les modèles extended-thinking (Claude, etc.)
  • Gestion des additional_kwargs pour les différents formats de chunk

⚠️ Points à Améliorer

Tests

  • 🔶 Manque de tests pour les events THINKING - Les tests actuels ne couvrent que CONTENT et MESSAGE. Ajouter des tests pour:

    # Test case à ajouter
    async def test_stream_with_message_yields_thinking_then_content():
        chunks = [
            (StreamEventType.THINKING, "Let me think..."),
            (StreamEventType.CONTENT, "Hello "),
            (StreamEventType.CONTENT, "world!")
        ]
        # ... assert que thinking est bien yieldé
  • 🔶 Edge cases non testés - Que se passe-t-il si un chunk est vide? Si additional_kwargs est manquant?

Code Quality

  • 🔶 Docstrings supprimées - Les docstrings de AgentRunner ont été retirées. Pour une interface publique, c'est mieux de les garder:

    # Avant (mieux)
    async def invoke(self, thread_id: str, message: str) -> Message:
        """Envoie un message et retourne la reponse complete."""
        ...
    
    # Après (moins documenté)
    async def invoke(self, thread_id: str, message: str) -> Message:
        ...
  • 🔶 Validation JSON - Message.model_validate_json(event.data) pourrait throw si le JSON est malformé. Envisager un try/except:

    elif event.type == StreamEventType.MESSAGE:
        try:
            final_message = Message.model_validate_json(event.data)
        except ValidationError:
            logger.exception("[thread=%s] Invalid MESSAGE event data", thread_id)
            raise AgentError("Invalid message format")

Minor nitpicks

  • 💡 La méthode _is_nonblank_str pourrait être remplacée par une simple condition dans _classify_chunk pour réduire l'indirection
  • 💡 Le type hint val: object est correct mais str | None serait plus idiomatique

📊 Score de Qualité: 8/10

Justification:

  • +3 Architecture propre et bien pensée (StreamEvent, enum, frozen)
  • +2 Réduction de complexité significative
  • +2 Tests unitaires mis à jour
  • +1 Migration DB propre et réversible

Déductions:

  • -1 Manque de tests pour les events THINKING
  • -1 Docstrings retirées de l'interface publique

🚀 Recommandations

  1. Ajouter des tests THINKING - Au minimum un test case pour valider le flux thinking → content → message
  2. Restaurer les docstrings - Garder la documentation de l'interface AgentRunner
  3. Ajouter validation - try/except autour de model_validate_json pour la robustesse

🟢 Verdict: APPROVED (avec suggestions)

La PR est mergable en l'état. Les suggestions ci-dessus sont des améliorations qui peuvent être faites dans des PRs suivantes si on veut itérer rapidement.


Review générée par SoluBot 🤖

Copy link
Copy Markdown
Contributor Author

@Kaiohz Kaiohz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review: BRIC-18 Multiplexed Content Stream

Score: 8.5/10

Cette PR implémente une refonte propre du streaming pour supporter les événements typés (thinking/content/message). Voici mon analyse détaillée.


Points forts

Architecture (excellent)

  • Nouvelle entité domaine StreamEvent avec enum StreamEventType - respecte parfaitement l'architecture hexagonale
  • Classification propre dans _classify_chunk() avec détection OpenAI (reasoning_content) et Anthropic (type=thinking)
  • Separation of concerns claire: le domaine ne dépend pas des détails d'implementation

Persistance (très bon)

  • Migration 004_add_thinking_column propre avec upgrade/downgrade
  • Accumulation intelligente des thinking tokens dans thinking_parts avant persistence
  • Le champ Message.thinking est nullable - backward compatible

Tests (excellent)

  • Tous les tests adaptés au nouveau format StreamEvent
  • Coverage correcte des cas: content seul, thinking+content, tool_calls, structured_response, HITL interrupt
  • Mock _make_streaming_graph bien refactorisé

Documentation (bon)

  • README mis à jour avec le nouveau format SSE/WebSocket
  • Table explicite des types avec colonne Persisted?
  • Exemples de code client mis à jour

Points à améliorer

1. Logs de debug manquants dans _classify_chunk

La méthode _classify_chunk fait des choix importants (thinking vs content) mais ne loggue rien. Ajouter des logs de debug faciliterait le troubleshooting:

@staticmethod
def _classify_chunk(chunk) -> tuple[StreamEventType, str] | None:
    if chunk.type != "AIMessageChunk":
        return None
    additional = getattr(chunk, "additional_kwargs", {})
    reasoning = additional.get("reasoning_content")
    if DeepAgentRunner._is_nonblank_str(reasoning):
        logger.debug("[classify] Detected OpenAI reasoning_content (%d chars)", len(reasoning))
        return (StreamEventType.THINKING, reasoning)
    # ...

2. Helper _is_nonblank_str à partager

Cette méthode utilitaire pourrait être dans un module src/domain/utils/strings.py pour être réutilisée ailleurs.

3. Documentation format thinking

Le README montre data: {"type":"thinking","data":"..."} mais ne précise pas:

  • Est-ce que data contient le texte brut ou du JSON?
  • Comment accumuler les thinking events côté client?

Un petit paragraphe example aurait été utile.

4. Variables unused _tool_call_id

Les paramètres approve_hitl(thread_id, _tool_call_id) et reject_hitl utilisent l'underscore pour ignorer le paramètre. C'est un pattern valide, mais un commentaire aurait clarifié.


Suggestions mineures

  • Type alias: StreamIterator = AsyncIterator[StreamEvent] pourrait éviter la répétition
  • Error handling: Le raise RuntimeError dans stream_message.py est bien, mais pourrait être un AgentError custom pour consistency

Verdict

Ready to merge avec des améliorations optionnelles.

L'architecture est solide, le code est propre, les tests passent. Les points ci-dessus sont des améliorations de qualité, pas des blockers.

✅ Approval recommandée.


Review postée par SoluBot - Assistant SoluDevTech

Copy link
Copy Markdown
Contributor Author

@Kaiohz Kaiohz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review: BRIC-18 Multiplexed Content Stream

Score: 8.5/10

Cette PR implémente une refonte propre du streaming pour supporter les événements typés (thinking/content/message). Voici mon analyse détaillée.


Points forts

Architecture (excellent)

  • Nouvelle entité domaine StreamEvent avec enum StreamEventType - respecte parfaitement l'architecture hexagonale
  • Classification propre dans _classify_chunk() avec détection OpenAI (reasoning_content) et Anthropic (type=thinking)
  • Separation of concerns claire: le domaine ne dépend pas des détails d'implémentation

Persistance (très bon)

  • Migration 004_add_thinking_column propre avec upgrade/downgrade
  • Accumulation intelligente des thinking tokens dans thinking_parts avant persistence
  • Champ Message.thinking nullable - backward compatible

Tests (excellent)

  • Tous les tests adaptés au nouveau format StreamEvent
  • Coverage correcte des cas: content seul, thinking+content, tool_calls, structured_response, HITL interrupt
  • Mock _make_streaming_graph bien refactorisé

Documentation (bon)

  • README mis à jour avec le nouveau format SSE/WebSocket
  • Table explicite des types avec colonne "Persisted?"
  • Exemples de code client mis à jour

Points à améliorer

1. Logs de debug manquants dans _classify_chunk

La méthode _classify_chunk() fait des choix importants (thinking vs content) mais ne loggue rien. Ajouter des logs de debug faciliterait le troubleshooting:

@staticmethod
def _classify_chunk(chunk) -> tuple[StreamEventType, str] | None:
    if chunk.type != "AIMessageChunk":
        return None
    additional = getattr(chunk, "additional_kwargs", {})
    reasoning = additional.get("reasoning_content")
    if DeepAgentRunner._is_nonblank_str(reasoning):
        logger.debug("[classify] Detected OpenAI reasoning_content (%d chars)", len(reasoning))
        return (StreamEventType.THINKING, reasoning)
    # ...

2. Helper _is_nonblank_str à partager

Cette méthode utilitaire pourrait être dans un module src/domain/utils/strings.py pour être réutilisée ailleurs.

3. Documentation format thinking côté client

Le README montre data: {"type":"thinking","data":"..."} mais ne précise pas:

  • Est-ce que data contient le texte brut ou du JSON?
  • Comment accumuler les thinking events côté client?

Un petit paragraphe example aurait été utile.

4. Variables unused _tool_call_id

Les paramètres approve_hitl(thread_id, _tool_call_id) et reject_hitl utilisent l'underscore pour ignorer le paramètre. C'est un pattern valide, mais un commentaire aurait clarifié.


Suggestions mineures

  • Type alias: StreamIterator = AsyncIterator[StreamEvent] pourrait éviter la répétition
  • Error handling: Le raise RuntimeError dans stream_message.py est bien, mais pourrait être un AgentError custom pour consistency

Verdict

Approval recommandée.

L'architecture est solide, le code est propre, les tests passent. Les points ci-dessus sont des améliorations de qualité, pas des blockers.

Ready to merge!


Review postée par SoluBot - Assistant SoluDevTech

Add `STRUCTURED` stream event type and update test to handle
structured events when locating the message event in stream output.
Simplify root logging configuration in `main.py`.
Copy link
Copy Markdown
Contributor Author

@Kaiohz Kaiohz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: BRIC-18 - Multiplexed Content Stream

📊 Score: 8.5/10


✅ Points forts

1. Architecture propre

  • La nouvelle entité StreamEvent avec StreamEventType enum apporte de la clarté et de la sécurité de type
  • Séparation claire entre THINKING, CONTENT, MESSAGE, STRUCTURED, ERROR
  • Le protocole SSE/WebSocket devient cohérent et facile à consommer côté client

2. Support thinking natif

  • _classify_chunk() détecte correctement les tokens de raisonnement OpenAI (reasoning_content) et Anthropic (type: thinking)
  • Accumulation des thinking tokens pendant le streaming
  • Persistance en DB via la colonne Message.thinking

3. Logger standardization

  • Migration de logging.getLogger("composable-agents") vers logging.getLogger(__name__) — bonne pratique Python
  • Logging simplifié dans main.py

4. Error handling amélioré

  • StreamEvent(type=ERROR, data=str(exc)) pour des erreurs structurées
  • Proper asyncio.CancelledError handling dans les routes

5. Documentation à jour

  • README mis à jour avec table clair des types d'événements
  • Explication de la persistance par type

6. Tests à jour

  • Tests unitaires refactorés pour utiliser StreamEvent
  • Coverage des cas thinking + content

🔧 Suggestions d'amélioration

1. Type hints sur _classify_chunk

# Actuel
def _classify_chunk(chunk) -> tuple[StreamEventType, str] | None:

# Suggestion: typer l'argument chunk
from langchain_core.messages import AIMessageChunk

def _classify_chunk(chunk: AIMessageChunk) -> tuple[StreamEventType, str] | None:

Cela documenterait mieux le type attendu et permettrait à mypy/pyright de valider.

2. Edge case: thinking vide
Dans stream_with_message(), on accumule les thinking tokens:

thinking = "".join(thinking_parts) if thinking_parts else None

Mais si tous les thinking tokens sont des chaînes vides (whitespace), on pourrait persister une chaîne vide. Suggestion:

thinking = ("".join(thinking_parts)).strip() or None

3. Structured event emission
Dans StreamMessageUseCase.execute(), on yield un STRUCTURED event APRÈS le MESSAGE event:

if final_message and final_message.structured_response is not None:
    yield StreamEvent(type=StreamEventType.STRUCTURED, data=json.dumps(final_message.structured_response))
yield event  # MESSAGE event

L'ordre [MESSAGE, STRUCTURED] pourrait surprendre les clients. Est-ce intentionnel? Alternative:

# D'abord le message, puis le structured
yield event  # MESSAGE
if final_message.structured_response:
    yield StreamEvent(type=StreamEventType.STRUCTURED, ...)

4. Documentation API client
Une petite section dans le README montrant comment un client JS/Python doit parser les événements serait utile:

// Exemple client
for await (const event of stream) {
  switch (event.type) {
    case 'thinking': showReasoningPanel(event.data); break;
    case 'content': appendToChatBubble(event.data); break;
    case 'message': finalizeMessage(JSON.parse(event.data)); break;
  }
}

🧪 Tests

  • 241/241 passing ✓
  • Ruff linter clean ✓

📝 Conclusion

Excellente PR qui standardise le protocole de streaming et ajoute le support natif des modèles "extended-thinking". Le code est propre, bien testé, et suit les bonnes pratiques.

Le score de 8.5/10 reflète:

  • Architecture solide et cohérente
  • Support complet thinking (OpenAI + Anthropic)
  • Documentation et tests à jour
  • Quelques mineurs améliorations possibles sur les edge cases et la documentation client

Recommandation: APPROVE avec les suggestions optionnelles ci-dessus. Les améliorations peuvent être adressées dans une PR suivante si priorité.

@Kaiohz Kaiohz merged commit 285359e into main May 5, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant