Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ local_cache/
*.db-wal
*.db-shm

# TypeScript declaration artifacts (generated by tsc/build)
packages/*/src/**/*.d.ts
packages/*/src/**/*.d.ts.map


# Local plans and agent config
.plans/
Expand Down
3 changes: 3 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions packages/core/src/distillation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ function splitSegments(
const totalTokens = messages.reduce((s, m) => s + m.tokens, 0);
if (totalTokens <= maxTokens) return [messages];

// Cannot subdivide a single message — yield as-is (oversized but indivisible).
// Prevents infinite recursion when one message exceeds maxTokens (e.g., a 50KB+
// tool output where Math.ceil(content.length / 3) > 16384).
if (messages.length <= 1) return [messages];

// Find the split point: prefer the largest time gap if it's significant
const splitIdx = findSplitIndex(messages, maxTokens);

Expand Down
30 changes: 30 additions & 0 deletions packages/core/test/distillation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,36 @@ describe("detectSegments", () => {
expect(result[0]).toHaveLength(10);
expect(result[1]).toHaveLength(10);
});

test("single oversized message yields one segment without infinite recursion", () => {
  // A lone 20000-token message is over the 16384-token budget but cannot be
  // subdivided. This input used to end in
  // "RangeError: Maximum call stack size exceeded".
  const segments = detectSegments(msgs(1, 20000), 16384);

  // Expect exactly one segment holding the single, untouched message.
  expect(segments).toHaveLength(1);
  const onlySegment = segments[0];
  expect(onlySegment).toHaveLength(1);
  expect(onlySegment[0].tokens).toBe(20000);
});

test("oversized message among normal messages splits without crashing", () => {
  // Token layout: 100 + 20000 + 100 = 20200 in total, against maxTokens = 16384.
  const head = msgs(1, 100);
  const huge = msgs(1, 20000).map((msg) => {
    return { ...msg, id: "seg-msg-oversized", created_at: T + 5000 };
  });
  const tail = msgs(1, 100).map((msg) => {
    return { ...msg, id: "seg-msg-trailing", created_at: T + 10000 };
  });

  const segments = detectSegments([...head, ...huge, ...tail], 16384);

  // Getting here means the call did not crash; every input message must
  // still be present somewhere in the output.
  expect(segments.flat()).toHaveLength(3);
});
});

// ─── r_compression / c_norm DB columns ──────────────────────────────────────
Expand Down
4 changes: 3 additions & 1 deletion packages/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
"build:binary": "bun run script/build.ts --binary",
"start": "bun run src/index.ts"
},
"dependencies": {},
"dependencies": {
"p-limit": "7"
},
"files": [
"dist/bin.cjs",
"dist/embedding-worker.cjs",
Expand Down
152 changes: 152 additions & 0 deletions packages/gateway/src/background-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/**
* Global concurrency limiter for background LLM work.
*
* Wraps fire-and-forget background LLM calls (idle distillation,
* curation, pipeline-triggered incremental distillation) through a
* single p-limit(2) so at most 2 background LLM operations run
* concurrently across all sessions.
*
* Note: auto-import extraction (`import-auto.ts`) is NOT wrapped here —
* it creates its own LLM client and runs sequentially per-process.
* The circuit breaker still provides protection for import because
* `tripCircuitBreaker` is called from `llm-adapter.ts` on any
* non-urgent 429, and import uses the same adapter.
*
* Also provides a circuit breaker that trips on upstream 429 responses,
* pausing all background work for the Retry-After period. This prevents
* cascading retries from consuming the rate limit budget that
* conversation turns need.
*/

import pLimit from "p-limit";
import { log } from "@loreai/core";

/** Global concurrency cap for background (non-urgent) LLM work. */
const BACKGROUND_CONCURRENCY = 2;

/**
 * Module-level p-limit instance. Every `runBackground()` call in this module
 * submits its task through this single limiter, which is what makes the cap
 * apply across all callers rather than per call site.
 */
const limiter = pLimit(BACKGROUND_CONCURRENCY);

// ---------------------------------------------------------------------------
// Circuit breaker
// ---------------------------------------------------------------------------

/**
 * Timestamp (ms) when the circuit breaker expires. 0 = not tripped.
 *
 * Compared against `Date.now()`; written by `tripCircuitBreaker()`, and reset
 * to 0 by `isBackgroundPaused()` (once expired) and `resetBackgroundLimiter()`.
 */
let circuitOpenUntil = 0;

/** Default pause duration when Retry-After header is absent (seconds). */
const DEFAULT_PAUSE_SECONDS = 60;

/**
 * Report whether the circuit breaker is currently open.
 *
 * Callers doing background work are expected to consult this before handing
 * a task to the limiter. As a side effect, a breaker whose deadline has
 * passed is cleared (deadline set back to 0) on the first call that
 * notices the expiry.
 *
 * @returns `true` while the pause deadline lies in the future, else `false`.
 */
export function isBackgroundPaused(): boolean {
  if (circuitOpenUntil !== 0) {
    const expired = Date.now() >= circuitOpenUntil;
    if (!expired) return true;
    // Deadline reached: auto-reset so later calls take the fast path.
    circuitOpenUntil = 0;
  }
  return false;
}

/**
 * Trip the circuit breaker. Called when any background LLM call
 * receives a 429. Pauses all background work for `pauseSeconds`.
 *
 * An active pause is only ever extended, never shortened: if the new
 * deadline is not later than the current one, the call is a no-op (and
 * nothing is logged).
 *
 * @param pauseSeconds Duration to pause, in seconds. If omitted, zero,
 *   negative, NaN, or non-finite, the default (60s) is used. Rejecting
 *   non-finite values matters because `Infinity` would otherwise push the
 *   deadline to `Infinity` and leave the breaker open until a manual reset.
 */
export function tripCircuitBreaker(pauseSeconds?: number): void {
  const duration =
    pauseSeconds !== undefined &&
    Number.isFinite(pauseSeconds) &&
    pauseSeconds > 0
      ? pauseSeconds
      : DEFAULT_PAUSE_SECONDS;
  const until = Date.now() + duration * 1000;
  // Only extend, never shorten an active pause
  if (until > circuitOpenUntil) {
    circuitOpenUntil = until;
    log.warn(
      `background circuit breaker tripped: pausing all background work for ${duration}s`,
    );
  }
}

/**
 * Get remaining pause time in seconds (for diagnostics/logging).
 *
 * The clock is read twice: once inside `isBackgroundPaused()` and once for
 * the subtraction below. If the deadline passes between those two reads the
 * raw difference is zero or negative, so the result is clamped to keep the
 * documented "0 if not paused" contract (and to avoid returning `-0`).
 *
 * @returns Whole seconds remaining, rounded up; 0 if not paused.
 */
export function remainingPauseSeconds(): number {
  // Also lets an expired breaker auto-reset before we report on it.
  if (!isBackgroundPaused()) return 0;
  return Math.max(0, Math.ceil((circuitOpenUntil - Date.now()) / 1000));
}

// ---------------------------------------------------------------------------
// Limiter
// ---------------------------------------------------------------------------

/**
 * Submit a background task to the shared concurrency limiter.
 *
 * The circuit breaker is consulted twice:
 *  1. at submission, so a paused system rejects the task immediately
 *     without occupying a queue slot;
 *  2. when the limiter actually starts the task, because the breaker may
 *     have tripped while the task sat in the queue.
 *
 * In both cases a skipped task yields `undefined` and `fn` is never called.
 * A skip is logged only when a `label` was supplied.
 *
 * @param fn Async function to execute
 * @param label Human-readable label for logging (e.g., "idle session=abc")
 * @returns The function's return value, or undefined if skipped
 */
export async function runBackground<T>(
  fn: () => Promise<T>,
  label?: string,
): Promise<T | undefined> {
  if (!isBackgroundPaused()) {
    // Wrapper the limiter invokes once a slot frees up.
    const gatedTask = async (): Promise<T | undefined> => {
      if (!isBackgroundPaused()) return fn();
      // Breaker tripped while this task was waiting behind in-flight work.
      if (label) {
        log.info(
          `background work skipped at execution (circuit breaker, ${remainingPauseSeconds()}s remaining): ${label}`,
        );
      }
      return undefined;
    };
    return limiter(gatedTask);
  }

  // Fast rejection: breaker already open at submission time.
  if (label) {
    log.info(
      `background work skipped (circuit breaker, ${remainingPauseSeconds()}s remaining): ${label}`,
    );
  }
  return undefined;
}

/**
 * Snapshot of the limiter and circuit-breaker state (for diagnostics).
 *
 * @returns `activeCount` / `pendingCount` as reported by the shared limiter,
 *   whether the breaker is currently open, and the remaining pause in seconds.
 */
export function backgroundLimiterStats(): {
  activeCount: number;
  pendingCount: number;
  paused: boolean;
  pauseRemainingSeconds: number;
} {
  const { activeCount, pendingCount } = limiter;
  const paused = isBackgroundPaused();
  const pauseRemainingSeconds = remainingPauseSeconds();
  return { activeCount, pendingCount, paused, pauseRemainingSeconds };
}

/**
 * Reset all state (for tests): closes the circuit breaker and empties the
 * limiter's pending queue.
 *
 * Note: `clearQueue()` only removes pending (queued) tasks — up to 2
 * in-flight tasks will continue to completion.
 *
 * NOTE(review): p-limit documents `clearQueue()` as discarding pending
 * promises. By default those promises appear to be left unsettled rather
 * than resolved with `undefined`, so callers chaining `.then`/`.finally` on
 * a cleared task may never run — confirm against the installed p-limit
 * version.
 */
export function resetBackgroundLimiter(): void {
  circuitOpenUntil = 0;
  limiter.clearQueue();
}
6 changes: 5 additions & 1 deletion packages/gateway/src/idle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
MIN_TURNS_FOR_WARMING,
} from "./cache-warmer";
import * as Sentry from "@sentry/bun";
import { runBackground } from "./background-limiter";
import { emitWarmupMetric, emitSessionCostMetrics, emitCurationMetrics } from "./sentry";
import { getSessionCosts, totalWorkerCost } from "./cost-tracker";

Expand Down Expand Up @@ -80,7 +81,10 @@ export function startIdleScheduler(
if (now - state.lastRequestTime < timeoutMs) continue;

inProgress.add(sessionID);
doIdleWork(sessionID, state)
runBackground(
() => doIdleWork(sessionID, state),
`idle session=${sessionID.slice(0, 16)}`,
)
.catch((e) => log.error(`idle work failed for session ${sessionID}:`, e))
.finally(() => inProgress.delete(sessionID));
}
Expand Down
11 changes: 11 additions & 0 deletions packages/gateway/src/llm-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { log } from "@loreai/core";
import * as Sentry from "@sentry/bun";
import type { AuthCredential } from "./auth";
import { authHeaders } from "./auth";
import { tripCircuitBreaker } from "./background-limiter";
import { buildBillingBlock, signBody } from "./cch";
import {
setGenAiUsageAttributes,
Expand Down Expand Up @@ -446,6 +447,16 @@ export function createGatewayLLMClient(
}

// Transient error — retry if attempts remain
// Trip the global circuit breaker on non-urgent 429s so other
// background work pauses instead of piling on more requests.
if (response.status === 429 && !urgent) {
const cbRetryAfter = parseRetryAfter(response);
const pauseSec = cbRetryAfter
? Math.ceil(cbRetryAfter / 1000)
: undefined;
tripCircuitBreaker(pauseSec);
}

const maxRetries = maxRetriesFor(response.status, urgent);
if (attempt < maxRetries) {
const retryAfter = parseRetryAfter(response);
Expand Down
19 changes: 13 additions & 6 deletions packages/gateway/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ import {
} from "./temporal-adapter";
import { createGatewayLLMClient } from "./llm-adapter";
import { createBatchLLMClient } from "./batch-queue";
import { runBackground, resetBackgroundLimiter } from "./background-limiter";
import {
extractAuth,
authFingerprint,
Expand Down Expand Up @@ -220,6 +221,7 @@ export async function resetPipelineState(): Promise<void> {
}
lastSeenSessionModel = null;
resetWorkerModelState();
resetBackgroundLimiter();
}

/** Per-session state tracked across requests. */
Expand Down Expand Up @@ -2025,9 +2027,10 @@ function scheduleBackgroundWork(
log.info(
`incremental distillation: ${pendingTokens} undistilled tokens in ${sessionID.slice(0, 16)}`,
);
distillation
.run({ llm, projectPath, sessionID, model, skipMeta: true, callType: batchQueueEnabled ? "batch" : "direct" })
.catch((e) => log.error("background distillation failed:", e));
runBackground(
() => distillation.run({ llm, projectPath, sessionID, model, skipMeta: true, callType: batchQueueEnabled ? "batch" : "direct" }),
`incremental-distill session=${sessionID.slice(0, 16)}`,
).catch((e) => log.error("background distillation failed:", e));
}
}

Expand All @@ -2046,11 +2049,15 @@ function scheduleBackgroundWork(
cfg.curator.onIdle &&
sessionState.turnsSinceCuration >= effectiveAfterTurns
) {
Sentry.startSpan(
{ name: "lore.curator", op: "lore.curation", attributes: { trigger: "in-flight" } },
() => curator.run({ llm, projectPath, sessionID, model }),
runBackground(
() => Sentry.startSpan(
{ name: "lore.curator", op: "lore.curation", attributes: { trigger: "in-flight" } },
() => curator.run({ llm, projectPath, sessionID, model }),
),
`in-flight-curation session=${sessionID.slice(0, 16)}`,
)
.then((result) => {
if (!result) return; // skipped by circuit breaker
sessionState.turnsSinceCuration = 0;
saveSessionTracking(sessionID, { turnsSinceCuration: 0 });
if (result.created > 0 || result.updated > 0 || result.deleted > 0) {
Expand Down
Loading
Loading