-
-
Couldn't load subscription status.
- Fork 6
first attempt at making a smooth stream of model output -- need to re… #329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -314,11 +314,16 @@ async function submit(formData?: FormData, skip?: boolean) { | |||||||||||||||||||||
| const streamText = createStreamableValue<string>() | ||||||||||||||||||||||
| uiStream.update(<Spinner />) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /* removal 1: | ||||||||||||||||||||||
| while ( | ||||||||||||||||||||||
| useSpecificAPI | ||||||||||||||||||||||
| ? answer.length === 0 | ||||||||||||||||||||||
| : answer.length === 0 && !errorOccurred | ||||||||||||||||||||||
| ) { | ||||||||||||||||||||||
| useSpecificAPI | ||||||||||||||||||||||
| ? answer.length === 0 | ||||||||||||||||||||||
| : answer.length === 0 && !errorOccurred | ||||||||||||||||||||||
| ) { | ||||||||||||||||||||||
| */ | ||||||||||||||||||||||
| // Split and stage producers | ||||||||||||||||||||||
| const abortController = new AbortController() | ||||||||||||||||||||||
| const stage1 = (async () => { | ||||||||||||||||||||||
| const { fullResponse, hasError, toolResponses } = await researcher( | ||||||||||||||||||||||
| currentSystemPrompt, | ||||||||||||||||||||||
| uiStream, | ||||||||||||||||||||||
|
|
@@ -329,7 +334,9 @@ async function submit(formData?: FormData, skip?: boolean) { | |||||||||||||||||||||
| answer = fullResponse | ||||||||||||||||||||||
| toolOutputs = toolResponses | ||||||||||||||||||||||
| errorOccurred = hasError | ||||||||||||||||||||||
| })() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /* removal 1: | ||||||||||||||||||||||
| if (toolOutputs.length > 0) { | ||||||||||||||||||||||
| toolOutputs.map(output => { | ||||||||||||||||||||||
| aiState.update({ | ||||||||||||||||||||||
|
|
@@ -348,6 +355,16 @@ async function submit(formData?: FormData, skip?: boolean) { | |||||||||||||||||||||
| }) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| */ | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Stage 2: cheap producer - related suggestions | ||||||||||||||||||||||
| const stage2 = (async () => { | ||||||||||||||||||||||
| try { | ||||||||||||||||||||||
| await querySuggestor(uiStream, messages) | ||||||||||||||||||||||
| } catch {} | ||||||||||||||||||||||
| })() | ||||||||||||||||||||||
|
Comment on lines
+362
to
+365
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Don’t swallow stage2 errors silently. Log at least a warning so failures aren’t invisible. - try {
- await querySuggestor(uiStream, messages)
- } catch {}
+ try {
+ await querySuggestor(uiStream, messages)
+ } catch (err) {
+ console.warn('querySuggestor failed:', err)
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
Comment on lines
+361
to
+366
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Don’t run querySuggestor twice; capture stage2 result and reuse to avoid duplicate “Related” UI and extra API calls. Stage2 already runs querySuggestor, but it’s called again later. Capture the stage2 result and only call again if stage2 failed. - const stage2 = (async () => {
- try {
- await querySuggestor(uiStream, messages)
- } catch {}
- })()
+ let relatedFromStage2: RelatedQueries | null = null
+ const stage2 = (async () => {
+ try {
+ relatedFromStage2 = await querySuggestor(uiStream, messages)
+ } catch (err) {
+ console.warn('querySuggestor failed (stage2):', err)
+ }
+ })()
await Promise.allSettled([stage1, stage2])
- if (!errorOccurred) {
- const relatedQueries = await querySuggestor(uiStream, messages)
+ if (!errorOccurred) {
+ // If stage2 already produced related queries (and UI), reuse them; otherwise fetch once.
+ const relatedQueries = relatedFromStage2 ?? await querySuggestor(uiStream, messages)
uiStream.append(
<Section title="Follow-up">
<FollowupPanel />
</Section>
)
@@
{
id: groupeId,
role: 'assistant',
content: JSON.stringify(relatedQueries),
type: 'related'
},Also applies to: 393-427 🤖 Prompt for AI Agents |
||||||||||||||||||||||
| await Promise.allSettled([stage1, stage2]) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if (useSpecificAPI && answer.length === 0) { | ||||||||||||||||||||||
| const modifiedMessages = aiState | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -80,8 +80,10 @@ Analysis & Planning | |||||||||||||||||||
| }) | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Remove the spinner | ||||||||||||||||||||
| uiStream.update(null) | ||||||||||||||||||||
| // removal 1: // Remove the spinner | ||||||||||||||||||||
| // removal 1: // uiStream.update(null) | ||||||||||||||||||||
| // Append the answer section immediately to avoid gating on first token | ||||||||||||||||||||
| uiStream.append(answerSection) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
Comment on lines
+83
to
87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clear the spinner before appending the answer section to prevent a stuck spinner. The spinner is set in app/actions.tsx via uiStream.update(). Call update(null) here to remove it once streaming begins. - // Append the answer section immediately to avoid gating on first token
- uiStream.append(answerSection)
+ // Replace spinner with the answer section once streaming starts
+ uiStream.update(null)
+ uiStream.append(answerSection)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
| // Process the response | ||||||||||||||||||||
| const toolCalls: ToolCallPart[] = [] | ||||||||||||||||||||
|
|
@@ -90,12 +92,13 @@ Analysis & Planning | |||||||||||||||||||
| switch (delta.type) { | ||||||||||||||||||||
| case 'text-delta': | ||||||||||||||||||||
| if (delta.textDelta) { | ||||||||||||||||||||
| /* removal 1: | ||||||||||||||||||||
| // If the first text delta is available, add a UI section | ||||||||||||||||||||
| if (fullResponse.length === 0 && delta.textDelta.length > 0) { | ||||||||||||||||||||
| // Update the UI | ||||||||||||||||||||
| uiStream.update(answerSection) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| */ | ||||||||||||||||||||
| fullResponse += delta.textDelta | ||||||||||||||||||||
| streamText.update(fullResponse) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -104,10 +107,13 @@ Analysis & Planning | |||||||||||||||||||
| toolCalls.push(delta) | ||||||||||||||||||||
| break | ||||||||||||||||||||
| case 'tool-result': | ||||||||||||||||||||
| /* removal 1: | ||||||||||||||||||||
| // Append the answer section if the specific model is not used | ||||||||||||||||||||
| if (!useSpecificModel && toolResponses.length === 0 && delta.result) { | ||||||||||||||||||||
| uiStream.append(answerSection) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| // Keep answer section already appended; just collect tool outputs | ||||||||||||||||||||
| if (!delta.result) { | ||||||||||||||||||||
| hasError = true | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -129,5 +135,7 @@ Analysis & Planning | |||||||||||||||||||
| messages.push({ role: 'tool', content: toolResponses }) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Mark the answer stream as done | ||||||||||||||||||||
| streamText.done() | ||||||||||||||||||||
| return { result, fullResponse, hasError, toolResponses } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,63 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function runWithConcurrencyLimit<T>(limit: number, tasks: Array<() => Promise<T>>): Promise<PromiseSettledResult<T>[]> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (limit <= 0) limit = 1; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const results: PromiseSettledResult<T>[] = []; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let index = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function worker() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while (index < tasks.length) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const current = index++; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const task = tasks[current]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const value = await task(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| results[current] = { status: 'fulfilled', value } as PromiseFulfilledResult<T>; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (reason) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| results[current] = { status: 'rejected', reason } as PromiseRejectedResult; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const workers = Array.from({ length: Math.min(limit, tasks.length) }, () => worker()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await Promise.all(workers); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return results; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function withBudget<T>(options: { maxMs: number; signal?: AbortSignal }, task: () => Promise<T>): Promise<T> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { maxMs, signal } = options; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let timeoutId: ReturnType<typeof setTimeout> | null = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const timeoutPromise = new Promise<never>((_, reject) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timeoutId = setTimeout(() => reject(new Error('budget_exceeded')), maxMs); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (signal) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (signal.aborted) throw new Error('aborted'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| signal.addEventListener('abort', () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (timeoutId) clearTimeout(timeoutId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, { once: true }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return await Promise.race([task(), timeoutPromise]); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (timeoutId) clearTimeout(timeoutId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+32
to
+44
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. withBudget can hang on AbortSignal; reject on abort and clear timeout. Currently abort clears the timeout but doesn’t reject, potentially hanging forever. Include an abortPromise in the race. if (signal) {
- if (signal.aborted) throw new Error('aborted');
- signal.addEventListener('abort', () => {
- if (timeoutId) clearTimeout(timeoutId);
- }, { once: true });
+ if (signal.aborted) throw new DOMException('Aborted', 'AbortError');
+ // Ensure we reject promptly on abort and clear timeout
+ // Note: listeners are one-shot
+ var abortPromise = new Promise<never>((_, reject) => {
+ signal.addEventListener(
+ 'abort',
+ () => {
+ if (timeoutId) clearTimeout(timeoutId);
+ reject(new DOMException('Aborted', 'AbortError'));
+ },
+ { once: true }
+ )
+ })
}
try {
- return await Promise.race([task(), timeoutPromise]);
+ // Include abortPromise in the race if provided
+ return await Promise.race(
+ typeof abortPromise !== 'undefined'
+ ? [task(), timeoutPromise, abortPromise]
+ : [task(), timeoutPromise]
+ );
} finally {
if (timeoutId) clearTimeout(timeoutId);
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (start() >= threshold) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await fn(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Poll lightly until threshold reached | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await new Promise<void>((resolve) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const id = setInterval(async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (start() >= threshold) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| clearInterval(id); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| resolve(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, 50); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await fn(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+46
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Optional: tokenGate can wait indefinitely; consider max wait or external signal. To avoid unbounded waits, support a max wait or AbortSignal. -export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>): Promise<void> {
+export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>, options?: { maxMs?: number; signal?: AbortSignal }): Promise<void> {
if (start() >= threshold) {
await fn();
return;
}
// Poll lightly until threshold reached
- await new Promise<void>((resolve) => {
+ await new Promise<void>((resolve, reject) => {
const id = setInterval(async () => {
if (start() >= threshold) {
clearInterval(id);
resolve();
}
- }, 50);
+ }, 50);
+ if (options?.signal) {
+ options.signal.addEventListener('abort', () => { clearInterval(id); reject(new DOMException('Aborted', 'AbortError')) }, { once: true })
+ }
+ if (options?.maxMs) {
+ setTimeout(() => { clearInterval(id); reject(new Error('token_gate_timeout')) }, options.maxMs)
+ }
});
await fn();
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Remove or wire AbortController.
Declared but unused. Either pass its signal into a budget/timeout helper or remove it.
📝 Committable suggestion
🤖 Prompt for AI Agents