ComsIndeed/llm-json-stream-typescript
LLM JSON Stream

The streaming JSON parser for AI applications


Parse JSON reactively as LLM responses stream in. Subscribe to properties and receive values chunk-by-chunk as they're generated, with no waiting for the complete response.

Live Demo · API Docs · GitHub

The Problem

LLM APIs stream responses token-by-token. When the response is JSON, you get incomplete fragments:

{"title": "My Bl
{"title": "My Blog Po
{"title": "My Blog Post", "content": "This is

JSON.parse() fails on partial JSON. Your options aren't great:

| Approach | Problem |
| --- | --- |
| Wait for the complete response | High latency; defeats the point of streaming |
| Display raw chunks | Broken JSON in your UI |
| Build a custom parser | Complex, error-prone, weeks of work |

The Solution

LLM JSON Stream parses JSON character-by-character as it arrives, allowing you to subscribe to specific properties and react to their values the moment they're available.

Instead of waiting for the entire JSON response to complete, you can:

  • Display text fields progressively as they stream in
  • Add list items to your UI the instant they begin parsing
  • Await complete values for properties that need them (like IDs or flags)

Quick Start

npm install llm-json-stream
import { JsonStreamParser } from 'llm-json-stream';

// Works with any AsyncIterable<string>
// Compatible with: Node.js, Deno, Bun, browsers, Cloudflare Workers, etc.
const parser = new JsonStreamParser(llmResponseStream);

// Stream text as it types using async iteration
for await (const chunk of parser.getStringProperty('message')) {
  displayText += chunk;  // Update UI character-by-character
}

// Or get the complete value
const title = await parser.getStringProperty('title').promise;

// Clean up when done
await parser.dispose();

✨ Cross-Platform Compatibility

This library uses only async iterables (AsyncIterable<string>), making it 100% platform-agnostic:

  • ✅ Node.js - All versions with async-iterator support
  • ✅ Deno - Native compatibility
  • ✅ Bun - Native compatibility
  • ✅ Browsers - Works with native Web Streams via adapters
  • ✅ Cloudflare Workers - Full support
  • ✅ Edge runtimes - Compatible with all edge computing platforms

No polyfills required! This library uses standard AsyncIterable, which is natively supported in all modern JavaScript runtimes. Unlike Node.js stream libraries that break in the browser, it works seamlessly across all of the platforms above.
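For browser or fetch-based streams, a byte-oriented ReadableStream can be adapted into the AsyncIterable<string> the parser consumes. Here is a minimal sketch of such an adapter; the function name webStreamToAsyncIterable is our own, and the library does not ship this helper itself:

```typescript
// Hypothetical adapter (not shipped by the library): convert a Web
// ReadableStream of bytes, e.g. fetch(...).body, into the
// AsyncIterable<string> that JsonStreamParser consumes.
async function* webStreamToAsyncIterable(
  stream: ReadableStream<Uint8Array>
): AsyncIterable<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte UTF-8 sequences intact across chunks
      yield decoder.decode(value, { stream: true });
    }
    const tail = decoder.decode(); // flush any buffered partial character
    if (tail) yield tail;
  } finally {
    reader.releaseLock();
  }
}
```

Since response.body from fetch() is a ReadableStream<Uint8Array>, wiring the two together is then just `new JsonStreamParser(webStreamToAsyncIterable(response.body))`.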


How It Works

Two APIs for Every Property

Every property gives you both an async iterator (incremental updates) and a promise (complete value):

const title = parser.getStringProperty('title');

// Async iterator - each chunk as it arrives
for await (const chunk of title) {
  console.log(chunk);
}

// Promise - the final value
const complete = await title.promise;
| Use case | API |
| --- | --- |
| Typing effect, live updates | `for await...of` |
| Atomic values (IDs, flags, counts) | `.promise` |

Path Syntax

Navigate JSON with dot notation and array indices:

parser.getStringProperty('title')                    // Root property
parser.getStringProperty('user.name')                // Nested object
parser.getStringProperty('items[0].title')           // Array element
parser.getNumberProperty('data.users[2].age')        // Deep nesting

Feature Highlights

🔀 Streaming Strings

Display text as the LLM generates it, creating a smooth typing effect:

for await (const chunk of parser.getStringProperty('response')) {
  displayText += chunk;
  updateUI();
}

📋 Reactive Lists

Add items to your UI the instant parsing begins, even before their content arrives:

const listStream = parser.getArrayProperty('articles');

listStream.onElement(async (article, index) => {
  // Fires IMMEDIATELY when "[{" is detected
  addArticlePlaceholder(index);
  
  // Fill in content as it streams (cast to access nested properties)
  const mapStream = article as ObjectPropertyStream;
  for await (const chunk of mapStream.getStringProperty('title')) {
    updateArticleTitle(index, chunk);
  }
});

Traditional parsers wait for complete objects → jarring UI jumps.
This approach → smooth loading states that populate progressively.

πŸ—ΊοΈ Reactive Maps

Maps support an onProperty callback that fires when each property starts parsing:

const mapStream = parser.getObjectProperty('user');

mapStream.onProperty((property, key) => {
  // Fires IMMEDIATELY when a property key is discovered
  console.log(`Property "${key}" started parsing`);
  
  // Subscribe to the property value as it streams
  if (property instanceof StringPropertyStream) {
    (async () => {
      for await (const chunk of property) {
        userFields[key] = (userFields[key] || '') + chunk;
      }
    })();
  }
});

🎯 All JSON Types

parser.getStringProperty('name')      // String → streams chunks
parser.getNumberProperty('age')       // Number → complete numeric value
parser.getBooleanProperty('active')   // Boolean
parser.getNullProperty('deleted')     // Null
parser.getObjectProperty('config')    // Object → Record<string, any>
parser.getArrayProperty('tags')       // Array → any[]

⛓️ Flexible API

Navigate complex structures with chained access:

// Chain getters together
const user = parser.getObjectProperty('user');
const name = await user.getStringProperty('name').promise;
const email = await user.getStringProperty('email').promise;

// Or go deep in one line
const city = await parser.getStringProperty('user.address.city').promise;

🎭 Smart Casts

Handle dynamic list elements with type casts:

parser.getArrayProperty('items').onElement(async (element, index) => {
  // Cast to appropriate type to access type-specific methods
  const mapElement = element as ObjectPropertyStream;
  
  for await (const chunk of mapElement.getStringProperty('title')) {
    updateTitle(index, chunk);
  }
  
  const price = await mapElement.getNumberProperty('price').promise;
  updatePrice(index, price);
});

🔄 Buffered vs Unbuffered Streams

Property streams offer two modes to handle different subscription timing scenarios:

const items = parser.getArrayProperty('items');

// Recommended: Buffered iteration (replays values to new subscribers)
for await (const snapshot of items) {
  // Will receive the LATEST state immediately, then continue with live updates
  // Safe for late subscriptions - no race conditions!
}

// Alternative: Unbuffered iteration (live only, no replay)
for await (const snapshot of items.unbuffered()) {
  // Only receives values emitted AFTER subscription
  // Use when you explicitly want live-only behavior
}
| Stream type | Behavior | Use case |
| --- | --- | --- |
| `for await...of` | Replays latest value, then live | Recommended; prevents race conditions |
| `.unbuffered()` | Live values only, no replay | When you need live-only behavior |

Memory efficient: Maps and Lists only buffer the latest state (O(1) memory), not the full history. Strings buffer chunks for accumulation.
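The buffered "replay latest, then live" behavior can be pictured with a small sketch. This LatestValueStream class is purely illustrative, not the library's internal implementation: it buffers only the most recent value (O(1) memory), replays that value to subscribers who arrive late, and then continues with live updates.

```typescript
// Illustrative sketch of the buffered "replay latest, then live" pattern.
// NOT the library's internal code; it just demonstrates the idea of
// keeping only the newest state while still serving late subscribers.
class LatestValueStream<T> implements AsyncIterable<T> {
  private latest: { value: T } | null = null;
  private done = false;
  private waiters: Array<() => void> = [];

  private wake(): void {
    const waiters = this.waiters;
    this.waiters = [];
    for (const w of waiters) w();
  }

  push(value: T): void {
    this.latest = { value }; // overwrite: only the newest state is kept
    this.wake();
  }

  end(): void {
    this.done = true;
    this.wake();
  }

  async *[Symbol.asyncIterator]() {
    let lastSeen: { value: T } | null = null;
    while (true) {
      if (this.latest !== null && this.latest !== lastSeen) {
        lastSeen = this.latest; // replay the buffered value, then go live
        yield lastSeen.value;
      } else if (this.done) {
        return;
      } else {
        // Up to date: park until the next push() or end()
        await new Promise<void>((resolve) => this.waiters.push(() => resolve()));
      }
    }
  }
}
```

Per the note above, the library's Maps and Lists behave like this (latest state only), while Strings additionally buffer their chunks so late subscribers can still accumulate the full text.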

🛑 Yap Filter (closeOnRootComplete)

Some LLMs "yap" after the JSON, adding explanatory text that can confuse downstream processing. The closeOnRootComplete option stops parsing the moment the root JSON object/array is complete:

const parser = new JsonStreamParser(llmStream, {
  closeOnRootComplete: true  // Stop after root JSON completes (default: true)
});

// Input: '{"data": 123} Hope this helps! Let me know if you need anything else.'
// Parser stops after '}'; the trailing text is ignored

This is especially useful when:

  • Your LLM tends to add conversational text after JSON
  • You want to minimize processing overhead
  • You're building a pipeline where only the JSON matters

Complete Example

A realistic scenario: parsing a blog post with streaming title and reactive sections.

import { JsonStreamParser, StringPropertyStream, ObjectPropertyStream } from 'llm-json-stream';

async function main() {
  // Your LLM stream (OpenAI, Claude, Gemini, etc.)
  const stream = await llm.streamChat("Generate a blog post as JSON");
  
  const parser = new JsonStreamParser(stream);
  
  // Title streams character-by-character
  (async () => {
    for await (const chunk of parser.getStringProperty('title')) {
      process.stdout.write(chunk);  // "H" "e" "l" "l" "o" " " "W" "o" "r" "l" "d"
    }
    console.log();
  })();
  
  // Sections appear the moment they start
  parser.getArrayProperty('sections').onElement(async (section, index) => {
    console.log(`Section ${index} detected!`);
    
    const sectionMap = section as ObjectPropertyStream;
    
    for await (const chunk of sectionMap.getStringProperty('heading')) {
      console.log(`  Heading chunk: ${chunk}`);
    }
    
    for await (const chunk of sectionMap.getStringProperty('body')) {
      console.log(`  Body chunk: ${chunk}`);
    }
  });
  
  // Wait for completion
  const allSections = await parser.getArrayProperty('sections').promise;
  console.log(`Done! Got ${allSections.length} sections`);
  
  await parser.dispose();
}

API Reference

Property Methods

| Method | Returns | Description |
| --- | --- | --- |
| `.getStringProperty(path)` | `StringPropertyStream` | Streams string chunks |
| `.getNumberProperty(path)` | `NumberPropertyStream` | Complete number value |
| `.getBooleanProperty(path)` | `BooleanPropertyStream` | Boolean value |
| `.getNullProperty(path)` | `NullPropertyStream` | Null value |
| `.getObjectProperty(path)` | `ObjectPropertyStream` | Object with nested access |
| `.getArrayProperty(path)` | `ArrayPropertyStream` | Array with element callbacks |

PropertyStream Interface

// All property streams implement AsyncIterable
for await (const value of propertyStream) { ... }     // Buffered iteration
for await (const value of propertyStream.unbuffered()) { ... }  // Unbuffered

// Promise for complete value
const complete = await propertyStream.promise;

ArrayPropertyStream

listStream.onElement((element, index) => {
  // Callback when element parsing starts
});

ObjectPropertyStream

mapStream.onProperty((property, key) => {
  // Callback when property parsing starts
});

Cleanup

Always dispose the parser when you're done:

await parser.dispose();

Constructor Options

new JsonStreamParser(stream: AsyncIterable<string>, {
  closeOnRootComplete?: boolean  // Stop parsing after root JSON completes (default: true)
});

Robustness

Battle-tested with comprehensive test coverage. Handles real-world edge cases:

| Category | What's covered |
| --- | --- |
| Escape sequences | `\"`, `\\`, `\n`, `\t`, `\r`, `\uXXXX` |
| Unicode | Emoji 🎉, CJK characters, RTL text |
| Numbers | Scientific notation (`1.5e10`), negatives, decimals |
| Whitespace | Multiline JSON, arbitrary formatting |
| Nesting | 5+ levels deep |
| Scale | 10,000+ element arrays |
| Chunk boundaries | Any size, splitting any token |
| LLM quirks | Trailing commas, markdown wrappers (auto-stripped) |
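Chunk-boundary behavior is easy to exercise yourself with a helper that replays a canned response in fixed-size pieces. The following streamTextInChunks function is a from-scratch sketch; the package lists a stream_text_in_chunks test utility, but its exact signature isn't documented here:

```typescript
// Sketch: replay a canned LLM response as an AsyncIterable<string>,
// split into fixed-size chunks to exercise arbitrary chunk boundaries.
async function* streamTextInChunks(
  text: string,
  chunkSize = 4
): AsyncIterable<string> {
  for (let i = 0; i < text.length; i += chunkSize) {
    // Yield control to the event loop between chunks, like a real network stream
    await new Promise((resolve) => setTimeout(resolve, 0));
    yield text.slice(i, i + chunkSize);
  }
}
```

Feeding `new JsonStreamParser(streamTextInChunks(json, 3))` with a range of chunk sizes is a quick way to confirm that tokens split across boundaries still parse.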

LLM Provider Setup

OpenAI
import OpenAI from 'openai';
import { JsonStreamParser } from 'llm-json-stream';

const openai = new OpenAI();

const response = await openai.chat.completions.create({
  model: 'gpt-4',
  messages: [{ role: 'user', content: 'Generate a JSON blog post' }],
  stream: true,
});

// Create an async generator that yields text chunks
async function* openaiStream() {
  for await (const chunk of response) {
    const content = chunk.choices[0]?.delta?.content || '';
    if (content) yield content;
  }
}

const parser = new JsonStreamParser(openaiStream());
Anthropic Claude
import Anthropic from '@anthropic-ai/sdk';
import { JsonStreamParser } from 'llm-json-stream';

const anthropic = new Anthropic();

const stream = await anthropic.messages.stream({
  model: 'claude-3-opus-20240229',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Generate a JSON blog post' }],
});

// Create an async generator from Claude's event emitter
async function* claudeStream() {
  for await (const chunk of stream) {
    if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
      yield chunk.delta.text;
    }
  }
}

const parser = new JsonStreamParser(claudeStream());
Google Gemini
import { GoogleGenerativeAI } from '@google/generative-ai';
import { JsonStreamParser } from 'llm-json-stream';

const genAI = new GoogleGenerativeAI(process.env.GOOGLE_API_KEY!);
const model = genAI.getGenerativeModel({ model: 'gemini-pro' });

const response = await model.generateContentStream('Generate a JSON blog post');

// Create an async generator that yields text chunks
async function* geminiStream() {
  for await (const chunk of response.stream) {
    const text = chunk.text();
    if (text) yield text;
  }
}

const parser = new JsonStreamParser(geminiStream());

Architecture

This package implements a character-by-character JSON state machine with a reactive, streaming API designed specifically for handling LLM streaming responses.

Core Components

1. Parser Core

  • JsonStreamParser - Main parser class that consumes input streams
  • JsonStreamParserController - Internal coordinator for parsing operations

2. Property Streams (Public API)

  • StringPropertyStream - Streams string content chunk-by-chunk
  • NumberPropertyStream - Emits complete number values
  • BooleanPropertyStream - Emits boolean values
  • NullPropertyStream - Emits null values
  • ObjectPropertyStream - Provides access to object properties
  • ArrayPropertyStream - Provides reactive array handling with onElement callbacks

3. Property Delegates (Internal State Machine)

Delegates handle character-by-character parsing for each JSON type:

  • StringPropertyDelegate - Handles strings with escape sequences
  • NumberPropertyDelegate - Handles number parsing
  • BooleanPropertyDelegate - Handles true/false
  • NullPropertyDelegate - Handles null
  • MapPropertyDelegate - Handles object parsing
  • ListPropertyDelegate - Handles array parsing

Design Patterns

  • State Machine: Character-by-character parsing with delegates
  • Async Iterators: Modern streaming via for await...of
  • Promise-based Futures: Async access to complete values
  • Factory Pattern: Delegate creation based on first character
  • Controller Pattern: Separation of public API from internal logic
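The factory step can be illustrated in a few lines. This sketch is ours, not the package's actual code: in JSON, the first non-whitespace character of a value uniquely determines its type, which is what lets a parser pick the right delegate immediately.

```typescript
// Illustrative sketch of first-character delegate dispatch (not the
// library's actual implementation). JSON's grammar makes the value type
// unambiguous from its first character.
type DelegateKind = 'string' | 'number' | 'boolean' | 'null' | 'map' | 'list';

function delegateForFirstChar(ch: string): DelegateKind {
  if (ch === '"') return 'string';
  if (ch === '{') return 'map';
  if (ch === '[') return 'list';
  if (ch === 't' || ch === 'f') return 'boolean'; // true / false
  if (ch === 'n') return 'null';
  if (ch === '-' || (ch >= '0' && ch <= '9')) return 'number';
  throw new Error(`Unexpected character: ${ch}`);
}
```

Because dispatch needs only one character, delegates can be created before the value's content has arrived, which is what makes callbacks like onElement fire the instant an element begins.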

Project Structure

src/
├── classes/
│   ├── json_stream_parser.ts           # Main parser
│   ├── property_stream.ts              # Public API property streams
│   ├── property_stream_controller.ts   # Internal controllers
│   ├── mixins.ts                       # Factory functions
│   └── property_delegates/             # State machine workers
│       ├── property_delegate.ts
│       ├── string_property_delegate.ts
│       ├── number_property_delegate.ts
│       ├── boolean_property_delegate.ts
│       ├── null_property_delegate.ts
│       ├── map_property_delegate.ts
│       └── list_property_delegate.ts
├── utilities/
│   └── stream_text_in_chunks.ts        # Test utility
└── index.ts                            # Public exports

test/
├── properties/                          # Property-type specific tests
│   ├── string_property.test.ts
│   ├── number_property.test.ts
│   ├── boolean_property.test.ts
│   ├── null_property.test.ts
│   ├── map_property.test.ts
│   └── list_property.test.ts
└── [integration tests]                  # Comprehensive test suites

Development

# Install dependencies
npm install

# Build
npm run build

# Run tests
npm test

# Watch mode
npm run test:watch

Contributing

Contributions welcome!

  1. Check open issues
  2. Open an issue before major changes
  3. Run npm test before submitting
  4. Match existing code style

License

MIT (see LICENSE)


Made for TypeScript developers building the next generation of AI-powered apps

⭐ Star · 📦 npm · 🐛 Issues

Credits

This is a TypeScript port of the Dart llm_json_stream package.
