Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ export class Level2UXSitemapStructureHandler implements BuildHandler<string> {
);
}

// Process all sections concurrently
const modelProvider = OpenAIModelProvider.getInstance();

// Prepare all requests
const requests = sections.map((section) => ({
model: 'gpt-4o-mini',
Expand Down
2 changes: 1 addition & 1 deletion backend/src/common/model-provider/openai-model-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class OpenAIModelProvider implements IModelProvider {

const queue = new PQueue({
concurrency,
timeout: 30000, // 30 second timeout
timeout: 120000, // 120 second timeout
});

// Log queue events for monitoring
Expand Down
21 changes: 19 additions & 2 deletions llm-server/src/model/remote-model-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ export class RemoteOpenAIModelEngine implements ModelInstance {
baseURL: config.endpoint,
});

this.initializeQueue();
}

private initializeQueue() {
// Initialize queue with 30 RPS limit
this.queue = new PQueue({
intervalCap: config.rps ?? 30, // 30 requests
intervalCap: this.config.rps ?? 30, // 30 requests
interval: 1000, // per 1000ms (1 second)
carryoverConcurrencyCount: true, // Carry over pending tasks
timeout: 30000, // 30 second timeout
// FIXME: hack way to set up timeout
timeout: 120000, // 120 second timeout to accommodate longer streams
});

// Log queue events for monitoring
Expand Down Expand Up @@ -73,11 +78,17 @@ export class RemoteOpenAIModelEngine implements ModelInstance {
});

if (!result) {
this.logger.warn('Queue is closed, reinitializing queue');
this.initializeQueue();
throw new Error('Queue is closed');
}

return result;
} catch (error) {
if (error.message === 'Queue is closed') {
this.logger.warn('Reinitializing queue due to closure');
this.initializeQueue();
}
const modelError = this.createModelError(error);
this.logger.error('Error in chat:', modelError);
throw modelError;
Expand All @@ -100,6 +111,8 @@ export class RemoteOpenAIModelEngine implements ModelInstance {
);

if (!stream) {
this.logger.warn('Queue is closed, reinitializing queue');
this.initializeQueue();
throw new Error('Queue is closed');
}

Expand All @@ -108,6 +121,10 @@ export class RemoteOpenAIModelEngine implements ModelInstance {
yield chunk;
}
} catch (error) {
if (error.message === 'Queue is closed') {
this.logger.warn('Reinitializing queue due to closure');
this.initializeQueue();
}
const modelError = this.createModelError(error);
this.logger.error('Error in chatStream:', modelError);
throw modelError;
Expand Down
Loading