Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 121 additions & 35 deletions packages/core/src/embedding-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ import type {

// Per-worker configuration read from `workerData`. `dimensions` is the target
// embedding width used when post-processing inference output.
const { modelId, dimensions, vendorModel } = workerData as WorkerInitData;

/**
 * Token ceiling used when retrying after an ONNX OOM. We don't pre-truncate
 * every request — most texts are fine at their natural length. Instead, on
 * OOM we retry with progressively halved token limits starting from this
 * value. The pipeline's built-in `truncation: true` caps at model_max_length
 * (8192 for Nomic v1.5), which is too high for ONNX to allocate safely.
 */
const OOM_RETRY_START_TOKENS = 4096;

/**
 * Maximum number of truncated retries after an ONNX OOM. This does NOT count
 * the initial full-length try, so a request gets at most
 * `OOM_MAX_RETRIES + 1` inference attempts. The token limit halves on each
 * retry: full → 4096 → 2048 → 1024. Three truncated retries covers extreme
 * cases.
 */
const OOM_MAX_RETRIES = 3;

// ---------------------------------------------------------------------------
// Model lifecycle — lazy init on first embed request
// ---------------------------------------------------------------------------
Expand All @@ -50,6 +64,10 @@ type FeatureExtractionPipeline = {
};

// Feature-extraction pipeline; stays null until ensurePipeline() has built it.
let pipe: FeatureExtractionPipeline | null = null;
// Tokenizer taken from the pipeline once it is initialized. Only the
// encode/decode surface needed for token-level truncation is typed here.
let tokenizer: {
encode(text: string, options?: Record<string, unknown>): number[];
decode(ids: number[] | bigint[], options?: Record<string, unknown>): string;
} | null = null;
let layerNormFn: ((input: unknown, normalized_shape: number[]) => {
dims: number[];
data: Float32Array;
Expand Down Expand Up @@ -97,6 +115,10 @@ async function ensurePipeline(): Promise<void> {
device: "cpu",
})) as unknown as FeatureExtractionPipeline;

// Stash a reference to the pipeline's tokenizer for token-level
// truncation during OOM retries.
tokenizer = (pipe as unknown as { tokenizer: typeof tokenizer }).tokenizer;

layerNormFn = layer_norm as typeof layerNormFn;
})().catch((err) => {
initFailed = true;
Expand Down Expand Up @@ -160,6 +182,25 @@ async function drain(): Promise<void> {
// Embed processing
// ---------------------------------------------------------------------------

/**
 * Cap each text at `maxTokens` content tokens, measured with the pipeline's
 * own tokenizer.
 *
 * - If no tokenizer is available, the input array is returned as-is.
 * - A text whose character count is already within `maxTokens` is passed
 *   through without tokenizing: the working assumption is a worst case of
 *   roughly one character per token, so it cannot exceed the limit.
 * - Otherwise the text is encoded without special tokens and, if it is over
 *   the limit, the first `maxTokens` ids are decoded back into a string.
 *
 * @param texts     Texts to check.
 * @param maxTokens Maximum number of content tokens to keep per text.
 * @returns A new array with over-long texts shortened (or `texts` itself
 *          when no tokenizer is available).
 */
function truncateTexts(texts: string[], maxTokens: number): string[] {
  const tok = tokenizer;
  if (!tok) return texts;

  const shortened: string[] = [];
  for (const text of texts) {
    if (text.length > maxTokens) {
      // Special tokens ([CLS]/[SEP]) are left out so the count reflects
      // content tokens only and the limit check is not skewed by them.
      const ids = tok.encode(text, { add_special_tokens: false });
      if (ids.length > maxTokens) {
        shortened.push(
          tok.decode(ids.slice(0, maxTokens), {
            skip_special_tokens: true,
          }),
        );
        continue;
      }
    }
    shortened.push(text);
  }
  return shortened;
}

/**
* Detect ONNX runtime out-of-memory errors. The runtime throws opaque
* numeric error codes (e.g. "287180544") for allocation failures rather
Expand All @@ -174,55 +215,100 @@ function isOomError(msg: string): boolean {
return false;
}

/**
 * Embed `texts` with one pipeline call and return one vector per input text,
 * in input order.
 *
 * Post-processing follows Nomic's recipe:
 *   1. layer normalization over the full hidden dimension,
 *   2. Matryoshka truncation to `dimensions` (only when that is smaller than
 *      the model's hidden size),
 *   3. L2 normalization.
 *
 * Errors thrown by the pipeline propagate to the caller unchanged.
 */
async function runInference(texts: string[]): Promise<Float32Array[]> {
  // Mean-pooled feature extraction. `truncation: true` caps each text at the
  // model's max length (8192 tokens for Nomic v1.5) as a last-resort safety net.
  const pooled = await pipe!(texts, { pooling: "mean", truncation: true });

  const hiddenSize = pooled.dims[pooled.dims.length - 1]; // 768 for Nomic v1.5
  const shrink = dimensions < hiddenSize;
  const width = shrink ? dimensions : hiddenSize;

  // layer_norm → (optional slice to the target width) → L2 normalize
  const layerNormed = layerNormFn!(pooled, [hiddenSize]);
  const unit: { tolist(): number[][]; data: Float32Array; dims: number[] } = shrink
    ? layerNormed.slice(null, [0, dimensions]).normalize(2, -1)
    : layerNormed.normalize(2, -1);

  // The batched tensor is flat: row i occupies [i * width, (i + 1) * width).
  // Copy each row into its own buffer.
  return Array.from({ length: texts.length }, (_, row) => {
    const offset = row * width;
    const vec = new Float32Array(width);
    vec.set(unit.data.subarray(offset, offset + width));
    return vec;
  });
}

async function processEmbed(req: EmbedRequest): Promise<void> {
try {
await ensurePipeline();

// Run feature extraction with mean pooling.
// truncation: true caps each text at the model's max length (8192 tokens
// for Nomic v1.5), preventing oversized inputs from causing OOM.
const output = await pipe!(req.texts, { pooling: "mean", truncation: true });

// Post-process following Nomic's recipe:
// 1. Layer normalization over the full hidden dimension
// 2. Matryoshka truncation to target dimensions
// 3. L2 normalization
const fullDim = output.dims[output.dims.length - 1]; // 768 for Nomic v1.5
const truncate = dimensions < fullDim;

let normalized: { tolist(): number[][]; data: Float32Array; dims: number[] };
if (truncate) {
// layer_norm → slice → L2 normalize
normalized = layerNormFn!(output, [fullDim])
.slice(null, [0, dimensions])
.normalize(2, -1);
} else {
// layer_norm → L2 normalize (no truncation)
normalized = layerNormFn!(output, [fullDim])
.normalize(2, -1);
}
// Try inference at full length first. On ONNX OOM, retry with
// progressively halved token limits using the real tokenizer.
// This preserves maximum semantic content for normal texts while
// handling dense-token content (code, CJK, base64) adaptively.
//
// attempt 0 = original texts (no truncation)
// attempt 1 = truncated to 4096 tokens
// attempt 2 = truncated to 2048 tokens
// attempt 3 = truncated to 1024 tokens
let texts = req.texts;
let lastError: Error | undefined;

// Extract per-text vectors from the batched tensor.
const numTexts = req.texts.length;
const vectors: Float32Array[] = [];
const dim = truncate ? dimensions : fullDim;
for (let attempt = 0; attempt <= OOM_MAX_RETRIES; attempt++) {
try {
const vectors = await runInference(texts);
post({ type: "result", id: req.id, vectors });
return;
} catch (err) {
const raw = err instanceof Error ? err.message : String(err);
if (!isOomError(raw) || !tokenizer) throw err;
lastError = err instanceof Error ? err : new Error(raw);

for (let i = 0; i < numTexts; i++) {
const start = i * dim;
const vec = new Float32Array(dim);
vec.set(normalized.data.subarray(start, start + dim));
vectors.push(vec);
// OOM — truncate texts and retry with fewer tokens.
// attempt 0 failed at full length → retry at 4096 tokens
// attempt 1 failed at 4096 → retry at 2048 tokens
// attempt 2 failed at 2048 → retry at 1024 tokens
// attempt 3 failed at 1024 → loop exits, throw below
if (attempt < OOM_MAX_RETRIES) {
const maxTokens = OOM_RETRY_START_TOKENS >> attempt; // 4096, 2048, 1024
texts = truncateTexts(req.texts, maxTokens);
console.warn(
`[lore] ONNX OOM on attempt ${attempt + 1}, retrying with ≤${maxTokens} tokens ` +
`(batch=${req.texts.length}, longest≈${Math.max(...req.texts.map((t) => t.length))} chars)`,
);
}
}
}

post({ type: "result", id: req.id, vectors });
// All retries exhausted — report the last error.
throw lastError ?? new Error("ONNX OOM retries exhausted");
} catch (err) {
// Don't re-post init-error — it was already sent in ensurePipeline().
if (!initFailed) {
const raw = err instanceof Error ? err.message : String(err);
const msg = isOomError(raw)
? `ONNX runtime out of memory (batch=${req.texts.length}, ` +
? `ONNX runtime out of memory after ${OOM_MAX_RETRIES} retries ` +
`(batch=${req.texts.length}, ` +
`longest≈${Math.max(...req.texts.map((t) => t.length))} chars). ` +
`Try reducing batch size. Raw: ${raw}`
`Raw: ${raw}`
: raw;
post({ type: "error", id: req.id, error: msg });
}
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/embedding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ const EMBED_TIMEOUT_MS = 10_000;
* Safe per-text character limit for local ONNX inference. The Nomic v1.5 model
* supports up to 8192 tokens, but ONNX runtime OOMs on inputs near that ceiling
* (error codes 284432024, 287180544, 144786472). Pre-truncating to ~4096 tokens
* worth of characters keeps the tensor well within safe allocation bounds.
* The worker's `truncation: true` remains as a safety net.
* worth of characters keeps the tensor well within safe allocation bounds for
* typical English text (~4 chars/token). For dense-token content (code, CJK,
* base64) where the ratio is lower, the worker retries with token-level
* truncation on OOM — see OOM_RETRY_START_TOKENS in embedding-worker.ts.
*/
const LOCAL_MAX_CHARS = 4096 * 4; // ~4096 tokens × ~4 chars/token

Expand Down
Loading