diff --git a/apps/server/src/persistence/Layers/SmeConversations.ts b/apps/server/src/persistence/Layers/SmeConversations.ts index 0fd5c5eb4..661d37853 100644 --- a/apps/server/src/persistence/Layers/SmeConversations.ts +++ b/apps/server/src/persistence/Layers/SmeConversations.ts @@ -28,16 +28,18 @@ const makeSmeConversationRepository = Effect.gen(function* () { execute: (row) => sql` INSERT INTO sme_conversations ( - conversation_id, project_id, title, model, + conversation_id, project_id, title, provider, auth_method, model, created_at, updated_at, deleted_at ) VALUES ( - ${row.conversationId}, ${row.projectId}, ${row.title}, ${row.model}, + ${row.conversationId}, ${row.projectId}, ${row.title}, ${row.provider}, ${row.authMethod}, ${row.model}, ${row.createdAt}, ${row.updatedAt}, ${row.deletedAt} ) ON CONFLICT (conversation_id) DO UPDATE SET title = excluded.title, + provider = excluded.provider, + auth_method = excluded.auth_method, model = excluded.model, updated_at = excluded.updated_at, deleted_at = excluded.deleted_at @@ -53,6 +55,8 @@ const makeSmeConversationRepository = Effect.gen(function* () { conversation_id AS "conversationId", project_id AS "projectId", title, + provider, + auth_method AS "authMethod", model, created_at AS "createdAt", updated_at AS "updatedAt", @@ -71,6 +75,8 @@ const makeSmeConversationRepository = Effect.gen(function* () { conversation_id AS "conversationId", project_id AS "projectId", title, + provider, + auth_method AS "authMethod", model, created_at AS "createdAt", updated_at AS "updatedAt", diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index ddc301790..7f244932f 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -31,6 +31,7 @@ import Migration0016 from "./Migrations/016_ProjectionThreadsInteractionModeChat import Migration0017 from "./Migrations/017_EnvironmentVariables.ts"; import Migration0018 from "./Migrations/018_ProjectionThreadsGithubRef.ts"; import Migration0019 from "./Migrations/019_SmeKnowledgeBase.ts"; +import Migration0020 from "./Migrations/020_SmeConversationProviderAuth.ts"; import { Effect } from "effect"; /** @@ -63,6 +64,7 @@ const loader = Migrator.fromRecord({ "17_EnvironmentVariables": Migration0017, "18_ProjectionThreadsGithubRef": Migration0018, "19_SmeKnowledgeBase": Migration0019, + "20_SmeConversationProviderAuth": Migration0020, }); /** diff --git a/apps/server/src/persistence/Migrations/020_SmeConversationProviderAuth.ts b/apps/server/src/persistence/Migrations/020_SmeConversationProviderAuth.ts new file mode 100644 index 000000000..192705fe5 --- /dev/null +++ b/apps/server/src/persistence/Migrations/020_SmeConversationProviderAuth.ts @@ -0,0 +1,35 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + ALTER TABLE sme_conversations + ADD COLUMN provider TEXT NOT NULL DEFAULT 'claudeAgent' + `.pipe(Effect.catch(() => Effect.void)); + + yield* sql` + ALTER TABLE sme_conversations + ADD COLUMN auth_method TEXT NOT NULL DEFAULT 'auto' + `.pipe(Effect.catch(() => Effect.void)); + + yield* sql` + UPDATE sme_conversations + SET provider = CASE + WHEN lower(model) LIKE 'claude-%' THEN 'claudeAgent' + WHEN lower(model) LIKE 'gpt-%' THEN 'codex' + WHEN lower(model) LIKE 'openclaw/%' OR lower(model) = 'default' THEN 'openclaw' + ELSE 'claudeAgent' + END + WHERE provider IS NULL + OR provider = '' + OR provider = 'claudeAgent' + `; + + yield* sql` + UPDATE sme_conversations + SET auth_method = 'auto' + WHERE auth_method IS NULL OR auth_method = '' + `; +}); diff --git a/apps/server/src/persistence/Services/SmeConversations.ts b/apps/server/src/persistence/Services/SmeConversations.ts index 7608c721f..84591b4de 100644 --- a/apps/server/src/persistence/Services/SmeConversations.ts +++ b/apps/server/src/persistence/Services/SmeConversations.ts @@ -5,7 +5,13 @@ * * @module SmeConversationRepository */ -import { IsoDateTime, ProjectId, SmeConversationId } from "@okcode/contracts"; +import { + IsoDateTime, + ProjectId, + ProviderKind, + SmeAuthMethod, + SmeConversationId, +} from "@okcode/contracts"; import { Option, Schema, ServiceMap } from "effect"; import type { Effect } from "effect"; @@ -15,6 +21,8 @@ export const SmeConversationRow = Schema.Struct({ conversationId: SmeConversationId, projectId: ProjectId, title: Schema.String, + provider: ProviderKind, + authMethod: SmeAuthMethod, model: Schema.String, createdAt: IsoDateTime, updatedAt: IsoDateTime, diff --git a/apps/server/src/sme/Layers/SmeChatServiceLive.test.ts b/apps/server/src/sme/Layers/SmeChatServiceLive.test.ts index a3d724886..c45980ef2 100644 --- a/apps/server/src/sme/Layers/SmeChatServiceLive.test.ts +++ b/apps/server/src/sme/Layers/SmeChatServiceLive.test.ts @@ -1,5 +1,5 @@ import { ProjectId, SmeConversationId, type EnvironmentVariableEntry } from "@okcode/contracts"; -import { Effect, Layer, Option } from "effect"; +import { Effect, Layer, Option, Stream } from "effect"; import { afterEach, describe, expect, it, vi } from "vitest"; import { @@ -21,6 +21,10 @@ import { type SmeMessageRepositoryShape, type SmeMessageRow, } from "../../persistence/Services/SmeMessages.ts"; +import { + ProviderService, + type ProviderServiceShape, +} from "../../provider/Services/ProviderService.ts"; import { SmeChatService } from "../Services/SmeChatService.ts"; import { makeSmeChatServiceLive } from "./SmeChatServiceLive.ts"; @@ -154,6 +158,21 @@ function makeMessageRepository() { return { repository, rowsByConversation }; } +function makeProviderService(): ProviderServiceShape { + return { + startSession: () => Effect.die("unexpected provider startSession"), + sendTurn: () => Effect.die("unexpected provider sendTurn"), + interruptTurn: () => Effect.void, + respondToRequest: () => Effect.void, + respondToUserInput: () => Effect.void, + stopSession: () => Effect.void, + listSessions: () => Effect.succeed([]), + getCapabilities: () => Effect.die("unexpected provider getCapabilities"), + rollbackConversation: () => Effect.void, + streamEvents: Stream.empty, + }; +} + describe("SmeChatServiceLive", () => { it("uses persisted Anthropic credentials for a successful send and stores the final reply", async () => { setAnthropicEnv({ @@ -168,6 +187,8 @@ describe("SmeChatServiceLive", () => { conversationId, projectId, title: "Architecture Q&A", + provider: "claudeAgent", + authMethod: "apiKey", model: "claude-sonnet-4-6", createdAt: "2026-01-01T00:00:00.000Z", updatedAt: "2026-01-01T00:00:00.000Z", @@ -209,6 +230,7 @@ describe("SmeChatServiceLive", () => { Layer.succeed(SmeConversationRepository, makeConversationRepository([conversationRow])), ), Layer.provideMerge(Layer.succeed(SmeMessageRepository, messageRepo)), + Layer.provideMerge(Layer.succeed(ProviderService, makeProviderService())), ); const events: Array = []; @@ -287,6 +309,8 @@ describe("SmeChatServiceLive", () => { conversationId, projectId, title: "Docs sync", + provider: "claudeAgent", + authMethod: "apiKey", model: "claude-sonnet-4-6", createdAt: "2026-01-01T00:00:00.000Z", updatedAt: "2026-01-01T00:00:00.000Z", @@ -302,6 +326,7 @@ describe("SmeChatServiceLive", () => { Layer.succeed(SmeConversationRepository, makeConversationRepository([conversationRow])), ), Layer.provideMerge(Layer.succeed(SmeMessageRepository, messageRepo)), + Layer.provideMerge(Layer.succeed(ProviderService, makeProviderService())), ); await expect( @@ -314,11 +339,15 @@ describe("SmeChatServiceLive", () => { }); }).pipe(Effect.provide(layer)), ), - ).rejects.toThrow( - "SmeChatError in sendMessage:auth: SME Chat requires ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN.", - ); + ).rejects.toThrow("SmeChatError in sendMessage:validate: Anthropic API key is missing."); expect(createClient).not.toHaveBeenCalled(); - expect(rowsByConversation.get(conversationId) ?? []).toEqual([]); + expect(rowsByConversation.get(conversationId)).toEqual([ + expect.objectContaining({ + role: "user", + text: "Can you summarize the docs?", + isStreaming: false, + }), + ]); }); }); diff --git a/apps/server/src/sme/Layers/SmeChatServiceLive.ts b/apps/server/src/sme/Layers/SmeChatServiceLive.ts index 152946252..4e6271151 100644 --- a/apps/server/src/sme/Layers/SmeChatServiceLive.ts +++ b/apps/server/src/sme/Layers/SmeChatServiceLive.ts @@ -1,26 +1,44 @@ /** * SmeChatServiceLive - Live implementation for the SME chat service. * - * Implements document management, conversation CRUD, and message sending - * using the Anthropic Messages API for streaming completions. + * Implements document management, conversation CRUD, validation, and stateless + * provider-backed message sending for SME chat. * * @module SmeChatServiceLive */ import Anthropic from "@anthropic-ai/sdk"; -import type { SmeConversation, SmeKnowledgeDocument, SmeMessage } from "@okcode/contracts"; +import type { + SmeAuthMethod, + SmeConversation, + SmeKnowledgeDocument, + SmeMessage, +} from "@okcode/contracts"; import { + SME_MAX_CONVERSATIONS_PER_PROJECT, SME_MAX_DOCUMENT_SIZE_BYTES, SME_MAX_DOCUMENTS_PER_PROJECT, - SME_MAX_CONVERSATIONS_PER_PROJECT, } from "@okcode/contracts"; -import { compactNodeProcessEnv } from "@okcode/shared/environment"; import { DateTime, Effect, Layer, Option, Random, Ref } from "effect"; import crypto from "node:crypto"; -import { SmeKnowledgeDocumentRepository } from "../../persistence/Services/SmeKnowledgeDocuments.ts"; -import { SmeConversationRepository } from "../../persistence/Services/SmeConversations.ts"; import { EnvironmentVariables } from "../../persistence/Services/EnvironmentVariables.ts"; +import { SmeConversationRepository } from "../../persistence/Services/SmeConversations.ts"; +import { SmeKnowledgeDocumentRepository } from "../../persistence/Services/SmeKnowledgeDocuments.ts"; import { SmeMessageRepository } from "../../persistence/Services/SmeMessages.ts"; +import { ProviderService } from "../../provider/Services/ProviderService.ts"; +import { + isValidSmeAuthMethod, + validateAnthropicSetup, + validateCodexSetup, + validateOpenClawSetup, +} from "../authValidation.ts"; +import { sendSmeViaAnthropic, type ResolvedAnthropicClientOptions } from "../backends/anthropic.ts"; +import { sendSmeViaProviderRuntime } from "../backends/providerRuntime.ts"; +import { + buildSmeAnthropicMessages, + buildSmeCompiledPrompt, + buildSmeSystemPrompt, +} from "../promptBuilder.ts"; import { SmeChatError, SmeChatService, @@ -29,56 +47,70 @@ import { type AnthropicMessagesClient = Pick; -interface ResolvedAnthropicClientOptions { - readonly apiKey: string | null; - readonly authToken: string | null; - readonly baseURL?: string; -} - export interface SmeChatServiceLiveOptions { readonly createClient?: (options: ResolvedAnthropicClientOptions) => AnthropicMessagesClient; } -const SME_MISSING_ANTHROPIC_AUTH_MESSAGE = - "SME Chat requires ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN. Add one in Settings > Environment Variables (global or project), or launch OK Code with one set in the server environment."; - -function normalizeOptionalEnvValue(value: string | undefined | null): string | null { - const trimmed = value?.trim(); - return trimmed && trimmed.length > 0 ? trimmed : null; +type ActiveRequest = { + readonly interrupt: Effect.Effect; +}; + +function ensureValidConversationAuth( + provider: SmeConversation["provider"], + authMethod: SmeAuthMethod, + operation: string, +) { + return isValidSmeAuthMethod(provider, authMethod) + ? Effect.void + : Effect.fail( + new SmeChatError( + operation, + `Auth method '${authMethod}' is not valid for provider '${provider}'.`, + ), + ); } -function pickAnthropicCredential(env: Record) { - const apiKey = normalizeOptionalEnvValue(env.ANTHROPIC_API_KEY); - const authToken = normalizeOptionalEnvValue(env.ANTHROPIC_AUTH_TOKEN); - if (apiKey !== null) { - return { apiKey, authToken: null } as const; - } - if (authToken !== null) { - return { apiKey: null, authToken } as const; - } - return null; +function toConversation(row: { + readonly conversationId: string; + readonly projectId: string; + readonly title: string; + readonly provider: SmeConversation["provider"]; + readonly authMethod: SmeAuthMethod; + readonly model: string; + readonly createdAt: string; + readonly updatedAt: string; + readonly deletedAt: string | null; +}): SmeConversation { + return { + conversationId: row.conversationId as never, + projectId: row.projectId as never, + title: row.title, + provider: row.provider, + authMethod: row.authMethod, + model: row.model, + createdAt: row.createdAt as never, + updatedAt: row.updatedAt as never, + deletedAt: row.deletedAt as never, + }; } -export function resolveAnthropicClientOptions(input: { - readonly persistedEnv: Record; - readonly processEnv?: NodeJS.ProcessEnv; -}): ResolvedAnthropicClientOptions | null { - const persistedCredential = pickAnthropicCredential(input.persistedEnv); - const processEnvRecord = compactNodeProcessEnv(input.processEnv ?? process.env); - const processCredential = pickAnthropicCredential(processEnvRecord); - const baseURL = - normalizeOptionalEnvValue(input.persistedEnv.ANTHROPIC_BASE_URL) ?? - normalizeOptionalEnvValue(processEnvRecord.ANTHROPIC_BASE_URL) ?? - undefined; - - const credential = persistedCredential ?? processCredential; - if (credential === null) { - return null; - } - +function toMessage(message: { + readonly messageId: string; + readonly conversationId: string; + readonly role: SmeMessage["role"]; + readonly text: string; + readonly isStreaming: boolean; + readonly createdAt: string; + readonly updatedAt: string; +}): SmeMessage { return { - ...credential, - ...(baseURL ? { baseURL } : {}), + messageId: message.messageId as never, + conversationId: message.conversationId as never, + role: message.role, + text: message.text, + isStreaming: message.isStreaming, + createdAt: message.createdAt as never, + updatedAt: message.updatedAt as never, }; } @@ -88,13 +120,13 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => const conversationRepo = yield* SmeConversationRepository; const messageRepo = yield* SmeMessageRepository; const environmentVariables = yield* EnvironmentVariables; + const providerService = yield* ProviderService; const createClient = options.createClient ?? ((clientOptions: ResolvedAnthropicClientOptions): AnthropicMessagesClient => new Anthropic(clientOptions)); - // Track active streaming fibers per conversation for interruption - const activeStreams = yield* Ref.make(new Map()); + const activeRequests = yield* Ref.make(new Map()); const generateId = () => Effect.map( @@ -104,11 +136,75 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => const now = () => Effect.map(DateTime.now, (dt) => DateTime.formatIso(dt)); - // ── Document Operations ───────────────────────────────────────────── + const setInterrupt = (conversationId: string, interrupt: Effect.Effect) => + Ref.update(activeRequests, (map) => { + const next = new Map(map); + next.set(conversationId, { interrupt }); + return next; + }); + + const clearInterrupt = (conversationId: string) => + Ref.update(activeRequests, (map) => { + const next = new Map(map); + next.delete(conversationId); + return next; + }); + + const validateSetupForConversation = ( + conversation: Pick, + providerOptions?: Parameters[0]["providerOptions"], + ) => + Effect.gen(function* () { + yield* ensureValidConversationAuth( + conversation.provider, + conversation.authMethod, + "validateSetup", + ); + + switch (conversation.provider) { + case "claudeAgent": { + const persistedEnv = yield* environmentVariables + .resolveEnvironment({ + projectId: conversation.projectId, + }) + .pipe(Effect.mapError((e) => new SmeChatError("validateSetup", e.message))); + return validateAnthropicSetup({ + authMethod: conversation.authMethod as Extract< + SmeAuthMethod, + "auto" | "apiKey" | "authToken" + >, + persistedEnv, + processEnv: process.env, + }); + } + + case "codex": + return yield* Effect.tryPromise({ + try: () => + validateCodexSetup({ + authMethod: conversation.authMethod as Extract< + SmeAuthMethod, + "auto" | "apiKey" | "chatgpt" | "customProvider" + >, + providerOptions, + }), + catch: (cause) => + new SmeChatError("validateSetup", "Failed to validate Codex setup.", cause), + }); + + case "openclaw": + return validateOpenClawSetup({ + authMethod: conversation.authMethod as Extract< + SmeAuthMethod, + "auto" | "password" | "none" + >, + providerOptions, + }); + } + }); const uploadDocument: SmeChatServiceShape["uploadDocument"] = (input) => Effect.gen(function* () { - // Check document count limit const existing = yield* documentRepo .listByProjectId({ projectId: input.projectId }) .pipe(Effect.mapError((e) => new SmeChatError("uploadDocument", e.message))); @@ -121,7 +217,6 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => ); } - // Decode base64 content const contentBuffer = Buffer.from(input.contentBase64, "base64"); if (contentBuffer.byteLength > SME_MAX_DOCUMENT_SIZE_BYTES) { return yield* Effect.fail( @@ -134,40 +229,37 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => const contentText = contentBuffer.toString("utf-8"); const contentHash = crypto.createHash("sha256").update(contentText).digest("hex"); - const documentId = yield* generateId(); const timestamp = yield* now(); - const row = { - documentId, - projectId: input.projectId, - title: input.title, - fileName: input.fileName, - mimeType: input.mimeType, - sizeBytes: contentBuffer.byteLength, - contentText, - contentHash, - createdAt: timestamp, - updatedAt: timestamp, - deletedAt: null, - }; - yield* documentRepo - .upsert(row as any) + .upsert({ + documentId: documentId as never, + projectId: input.projectId, + title: input.title, + fileName: input.fileName, + mimeType: input.mimeType, + sizeBytes: contentBuffer.byteLength, + contentText, + contentHash, + createdAt: timestamp as never, + updatedAt: timestamp as never, + deletedAt: null, + } as any) .pipe(Effect.mapError((e) => new SmeChatError("uploadDocument", e.message))); return { - documentId, + documentId: documentId as never, projectId: input.projectId, title: input.title, fileName: input.fileName, mimeType: input.mimeType, sizeBytes: contentBuffer.byteLength, contentHash, - createdAt: timestamp, - updatedAt: timestamp, + createdAt: timestamp as never, + updatedAt: timestamp as never, deletedAt: null, - } as SmeKnowledgeDocument; + } satisfies SmeKnowledgeDocument; }); const deleteDocument: SmeChatServiceShape["deleteDocument"] = (input) => @@ -179,28 +271,25 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => documentRepo.listByProjectId({ projectId: input.projectId }).pipe( Effect.mapError((e) => new SmeChatError("listDocuments", e.message)), Effect.map((rows) => - rows.map( - (r) => - ({ - documentId: r.documentId, - projectId: r.projectId, - title: r.title, - fileName: r.fileName, - mimeType: r.mimeType, - sizeBytes: r.sizeBytes, - contentHash: r.contentHash, - createdAt: r.createdAt, - updatedAt: r.updatedAt, - deletedAt: r.deletedAt, - }) as SmeKnowledgeDocument, - ), + rows.map((row) => ({ + documentId: row.documentId, + projectId: row.projectId, + title: row.title, + fileName: row.fileName, + mimeType: row.mimeType, + sizeBytes: row.sizeBytes, + contentHash: row.contentHash, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + deletedAt: row.deletedAt, + })), ), ); - // ── Conversation Operations ─────────────────────────────────────── - const createConversation: SmeChatServiceShape["createConversation"] = (input) => Effect.gen(function* () { + yield* ensureValidConversationAuth(input.provider, input.authMethod, "createConversation"); + const existing = yield* conversationRepo .listByProjectId({ projectId: input.projectId }) .pipe(Effect.mapError((e) => new SmeChatError("createConversation", e.message))); @@ -215,14 +304,15 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => const conversationId = yield* generateId(); const timestamp = yield* now(); - const row = { - conversationId, + conversationId: conversationId as never, projectId: input.projectId, title: input.title, + provider: input.provider, + authMethod: input.authMethod, model: input.model, - createdAt: timestamp, - updatedAt: timestamp, + createdAt: timestamp as never, + updatedAt: timestamp as never, deletedAt: null, }; @@ -230,7 +320,37 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => .upsert(row as any) .pipe(Effect.mapError((e) => new SmeChatError("createConversation", e.message))); - return row as SmeConversation; + return row satisfies SmeConversation; + }); + + const updateConversation: SmeChatServiceShape["updateConversation"] = (input) => + Effect.gen(function* () { + yield* ensureValidConversationAuth(input.provider, input.authMethod, "updateConversation"); + const existing = yield* conversationRepo + .getById({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("updateConversation", e.message))); + if (Option.isNone(existing)) { + return yield* Effect.fail( + new SmeChatError("updateConversation", "Conversation not found"), + ); + } + + const row = existing.value; + const timestamp = yield* now(); + const updated = { + ...row, + title: input.title, + provider: input.provider, + authMethod: input.authMethod, + model: input.model, + updatedAt: timestamp as never, + }; + + yield* conversationRepo + .upsert(updated as any) + .pipe(Effect.mapError((e) => new SmeChatError("updateConversation", e.message))); + + return toConversation(updated); }); const deleteConversation: SmeChatServiceShape["deleteConversation"] = (input) => @@ -241,209 +361,205 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => yield* conversationRepo .deleteById({ conversationId: input.conversationId }) .pipe(Effect.mapError((e) => new SmeChatError("deleteConversation", e.message))); + yield* clearInterrupt(input.conversationId); }); const listConversations: SmeChatServiceShape["listConversations"] = (input) => conversationRepo.listByProjectId({ projectId: input.projectId }).pipe( Effect.mapError((e) => new SmeChatError("listConversations", e.message)), - Effect.map((rows) => - rows.map( - (r) => - ({ - conversationId: r.conversationId, - projectId: r.projectId, - title: r.title, - model: r.model, - createdAt: r.createdAt, - updatedAt: r.updatedAt, - deletedAt: r.deletedAt, - }) as SmeConversation, - ), - ), + Effect.map((rows) => rows.map(toConversation)), ); const getConversation: SmeChatServiceShape["getConversation"] = (input) => Effect.gen(function* () { - const optConv = yield* conversationRepo + const conversation = yield* conversationRepo .getById({ conversationId: input.conversationId }) .pipe(Effect.mapError((e) => new SmeChatError("getConversation", e.message))); - - if (Option.isNone(optConv)) return null; - const conv = optConv.value; + if (Option.isNone(conversation)) { + return null; + } const messages = yield* messageRepo .listByConversationId({ conversationId: input.conversationId }) .pipe(Effect.mapError((e) => new SmeChatError("getConversation", e.message))); return { - conversation: { - conversationId: conv.conversationId, - projectId: conv.projectId, - title: conv.title, - model: conv.model, - createdAt: conv.createdAt, - updatedAt: conv.updatedAt, - deletedAt: conv.deletedAt, - } as SmeConversation, - messages: messages.map( - (m) => - ({ - messageId: m.messageId, - conversationId: m.conversationId, - role: m.role, - text: m.text, - isStreaming: m.isStreaming, - createdAt: m.createdAt, - updatedAt: m.updatedAt, - }) as SmeMessage, - ), + conversation: toConversation(conversation.value), + messages: messages.map((message) => toMessage(message as any)), }; }); - // ── Message Sending ─────────────────────────────────────────────── + const validateSetup: SmeChatServiceShape["validateSetup"] = (input) => + Effect.gen(function* () { + const conversation = yield* conversationRepo + .getById({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("validateSetup", e.message))); + if (Option.isNone(conversation)) { + return yield* Effect.fail(new SmeChatError("validateSetup", "Conversation not found")); + } + return yield* validateSetupForConversation(conversation.value, input.providerOptions); + }); const sendMessage: SmeChatServiceShape["sendMessage"] = (input, onEvent) => Effect.gen(function* () { - // 1. Resolve conversation - const optConv = yield* conversationRepo + const conversation = yield* conversationRepo .getById({ conversationId: input.conversationId }) .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - if (Option.isNone(optConv)) { + if (Option.isNone(conversation)) { return yield* Effect.fail(new SmeChatError("sendMessage", "Conversation not found")); } - const conv = optConv.value; - - // 2. Resolve Anthropic auth up front so auth failures do not persist - // speculative user/assistant messages. - const persistedEnv = yield* environmentVariables - .resolveEnvironment({ projectId: conv.projectId }) - .pipe(Effect.mapError((e) => new SmeChatError("sendMessage:env", e.message))); - const anthropicOptions = resolveAnthropicClientOptions({ - persistedEnv, - processEnv: process.env, - }); - if (anthropicOptions === null) { - return yield* Effect.fail( - new SmeChatError("sendMessage:auth", SME_MISSING_ANTHROPIC_AUTH_MESSAGE), - ); - } - const anthropic = createClient(anthropicOptions); + const conv = conversation.value; - // 3. Load knowledge documents const docs = yield* documentRepo .listByProjectId({ projectId: conv.projectId }) .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - - // 4. Load conversation history const existingMessages = yield* messageRepo .listByConversationId({ conversationId: input.conversationId }) .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - // 5. Persist user message - const userMessageId = yield* generateId(); const timestamp = yield* now(); + const userMessageId = yield* generateId(); + const assistantMessageId = yield* generateId(); + yield* messageRepo .upsert({ - messageId: userMessageId, + messageId: userMessageId as never, conversationId: input.conversationId, role: "user", text: input.text, isStreaming: false, - createdAt: timestamp, - updatedAt: timestamp, + createdAt: timestamp as never, + updatedAt: timestamp as never, } as any) .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - // 6. Reserve an assistant message ID for streaming updates. We only - // persist the assistant message after a successful completion so failed - // requests do not leave behind blank "streaming" rows. - const assistantMessageId = yield* generateId(); - - // 7. Build messages array for the API - const systemPrompt = buildSystemPrompt(docs); - const apiMessages: Array<{ role: "user" | "assistant"; content: string }> = []; - for (const msg of existingMessages) { - if (msg.role === "user" || msg.role === "assistant") { - apiMessages.push({ role: msg.role as "user" | "assistant", content: msg.text }); - } + const validation = yield* validateSetupForConversation(conv, input.providerOptions); + if (!validation.ok) { + onEvent?.({ + type: "sme.message.error", + conversationId: input.conversationId, + messageId: assistantMessageId as never, + error: validation.message, + }); + return yield* Effect.fail(new SmeChatError("sendMessage:validate", validation.message)); } - apiMessages.push({ role: "user", content: input.text }); - - // 8. Stream completion via Anthropic Messages API - const abortController = new AbortController(); - yield* Ref.update(activeStreams, (map) => { - const newMap = new Map(map); - newMap.set(input.conversationId, abortController); - return newMap; + + const systemPrompt = buildSmeSystemPrompt(docs); + const promptHistory = existingMessages.map((message) => ({ + role: message.role, + text: message.text, + })); + const anthropicMessages = buildSmeAnthropicMessages({ + history: promptHistory, + userText: input.text, + }); + const compiledPrompt = buildSmeCompiledPrompt({ + docs, + history: promptHistory, + userText: input.text, }); - const fullText = yield* Effect.tryPromise({ - try: async () => { - let result = ""; - const stream = anthropic.messages.stream( - { - model: conv.model, - max_tokens: 8192, - system: systemPrompt, - messages: apiMessages, - }, - { signal: abortController.signal }, - ); - - for await (const event of stream) { - if (event.type === "content_block_delta" && event.delta.type === "text_delta") { - result += event.delta.text; - onEvent?.({ - type: "sme.message.delta", + const sendEffect = + conv.provider === "claudeAgent" + ? Effect.gen(function* () { + const persistedEnv = yield* environmentVariables + .resolveEnvironment({ + projectId: conv.projectId, + }) + .pipe(Effect.mapError((e) => new SmeChatError("sendMessage:env", e.message))); + const anthropicSetup = validateAnthropicSetup({ + authMethod: conv.authMethod as Extract< + SmeAuthMethod, + "auto" | "apiKey" | "authToken" + >, + persistedEnv, + processEnv: process.env, + }); + if (!anthropicSetup.ok || !anthropicSetup.clientOptions) { + return yield* Effect.fail( + new SmeChatError("sendMessage:validate", anthropicSetup.message), + ); + } + + const controller = new AbortController(); + yield* setInterrupt( + input.conversationId, + Effect.sync(() => { + controller.abort(); + }), + ); + return yield* sendSmeViaAnthropic({ + client: createClient(anthropicSetup.clientOptions), conversationId: input.conversationId, - messageId: assistantMessageId, - text: event.delta.text, - } as any); - } - } - return result; - }, - catch: (err) => new SmeChatError("sendMessage:stream", String(err), err), - }).pipe( - Effect.ensuring( - Ref.update(activeStreams, (map) => { - const newMap = new Map(map); - newMap.delete(input.conversationId); - return newMap; + assistantMessageId, + model: conv.model, + systemPrompt, + messages: anthropicMessages, + ...(onEvent ? { onEvent } : {}), + abortSignal: controller.signal, + }).pipe(Effect.ensuring(clearInterrupt(input.conversationId))); + }) + : sendSmeViaProviderRuntime({ + providerService, + provider: conv.provider, + conversationId: input.conversationId, + assistantMessageId, + model: conv.model, + compiledPrompt, + ...(input.providerOptions ? { providerOptions: input.providerOptions } : {}), + ...(onEvent ? { onEvent } : {}), + setInterruptEffect: (interrupt) => setInterrupt(input.conversationId, interrupt), + clearInterruptEffect: clearInterrupt(input.conversationId), + }); + + const responseText = yield* sendEffect.pipe( + Effect.mapError((cause) => + cause instanceof SmeChatError + ? cause + : new SmeChatError("sendMessage", String(cause), cause), + ), + Effect.tapError((error) => + Effect.sync(() => { + onEvent?.({ + type: "sme.message.error", + conversationId: input.conversationId, + messageId: assistantMessageId as never, + error: error.detail, + }); }), ), ); - // 9. Finalize assistant message const finalTimestamp = yield* now(); yield* messageRepo .upsert({ - messageId: assistantMessageId, + messageId: assistantMessageId as never, conversationId: input.conversationId, role: "assistant", - text: fullText, + text: responseText, isStreaming: false, - createdAt: timestamp, - updatedAt: finalTimestamp, + createdAt: timestamp as never, + updatedAt: finalTimestamp as never, } as any) .pipe(Effect.mapError((e) => new SmeChatError("sendMessage:finalize", e.message))); - // 10. Emit completion event onEvent?.({ type: "sme.message.complete", conversationId: input.conversationId, - messageId: assistantMessageId, - text: fullText, - } as any); + messageId: assistantMessageId as never, + text: responseText, + }); }); const interruptMessage: SmeChatServiceShape["interruptMessage"] = (input) => Effect.gen(function* () { - const streams = yield* Ref.get(activeStreams); - const controller = streams.get(input.conversationId); - if (controller) { - controller.abort(); + const active = yield* Ref.get(activeRequests); + const request = active.get(input.conversationId); + if (!request) { + return; } + yield* request.interrupt; + yield* clearInterrupt(input.conversationId); }); return { @@ -451,38 +567,16 @@ const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => deleteDocument, listDocuments, createConversation, + updateConversation, deleteConversation, listConversations, getConversation, + validateSetup, sendMessage, interruptMessage, } satisfies SmeChatServiceShape; }); -// ── Helpers ───────────────────────────────────────────────────────────── - -function buildSystemPrompt( - docs: ReadonlyArray<{ title: string; fileName: string; contentText: string }>, -): string { - const parts = [ - "You are a knowledgeable subject matter expert assistant. Your role is to provide clear, accurate, and helpful answers based on the reference documents provided and your general knowledge.", - "Focus on explanation, analysis, and guidance. Be conversational and thorough.", - ]; - - if (docs.length > 0) { - parts.push( - "\nThe following reference documents have been provided for this project. Use them to inform your answers when relevant:\n", - ); - for (const doc of docs) { - parts.push(``); - parts.push(doc.contentText); - parts.push("\n"); - } - } - - return parts.join("\n"); -} - export const makeSmeChatServiceLive = (options: SmeChatServiceLiveOptions = {}) => Layer.effect(SmeChatService, makeSmeChatService(options)); diff --git a/apps/server/src/sme/Services/SmeChatService.ts b/apps/server/src/sme/Services/SmeChatService.ts index d33f83802..1503ec159 100644 --- a/apps/server/src/sme/Services/SmeChatService.ts +++ b/apps/server/src/sme/Services/SmeChatService.ts @@ -19,7 +19,10 @@ import type { SmeMessage, SmeMessageEvent, SmeSendMessageInput, + SmeUpdateConversationInput, SmeUploadDocumentInput, + SmeValidateSetupInput, + SmeValidateSetupResult, } from "@okcode/contracts"; import { ServiceMap } from "effect"; import type { Effect } from "effect"; @@ -50,6 +53,10 @@ export interface SmeChatServiceShape { input: SmeCreateConversationInput, ) => Effect.Effect; + readonly updateConversation: ( + input: SmeUpdateConversationInput, + ) => Effect.Effect; + readonly deleteConversation: ( input: SmeDeleteConversationInput, ) => Effect.Effect; @@ -65,6 +72,10 @@ export interface SmeChatServiceShape { SmeChatError >; + readonly validateSetup: ( + input: SmeValidateSetupInput, + ) => Effect.Effect; + readonly sendMessage: ( input: SmeSendMessageInput, onEvent?: (event: SmeMessageEvent) => void, diff --git a/apps/server/src/sme/authValidation.ts b/apps/server/src/sme/authValidation.ts new file mode 100644 index 000000000..235cdaa5d --- /dev/null +++ b/apps/server/src/sme/authValidation.ts @@ -0,0 +1,406 @@ +import { + type SmeAuthMethod, + type SmeValidateSetupResult, + type ProviderKind, +} from "@okcode/contracts"; +import { compactNodeProcessEnv } from "@okcode/shared/environment"; +import { homedir } from "node:os"; +import { join } from "node:path"; +import { createInterface } from "node:readline"; +import { spawn } from "node:child_process"; +import { readFile } from "node:fs/promises"; + +import { + buildCodexInitializeParams, + readCodexAccountSnapshot, + type CodexAppServerStartSessionInput, +} from "../codexAppServerManager.ts"; +import type { ResolvedAnthropicClientOptions } from "./backends/anthropic.ts"; + +const OPENAI_MODEL_PROVIDERS = new Set(["openai"]); + +function normalizeOptionalValue(value: string | undefined | null): string | null { + const trimmed = value?.trim(); + return trimmed && trimmed.length > 0 ? trimmed : null; +} + +function pickAnthropicCredential( + env: Record, + authMethod: Extract, +): { + apiKey: string | null; + authToken: string | null; + resolvedAuthMethod: "apiKey" | "authToken"; +} | null { + const apiKey = normalizeOptionalValue(env.ANTHROPIC_API_KEY); + const authToken = normalizeOptionalValue(env.ANTHROPIC_AUTH_TOKEN); + + if (authMethod === "apiKey") { + return apiKey ? { apiKey, authToken: null, resolvedAuthMethod: "apiKey" } : null; + } + if (authMethod === "authToken") { + return authToken ? { apiKey: null, authToken, resolvedAuthMethod: "authToken" } : null; + } + if (apiKey) { + return { apiKey, authToken: null, resolvedAuthMethod: "apiKey" }; + } + if (authToken) { + return { apiKey: null, authToken, resolvedAuthMethod: "authToken" }; + } + return null; +} + +function anthropicBaseUrl(persistedEnv: Record, processEnv?: NodeJS.ProcessEnv) { + const processEnvRecord = compactNodeProcessEnv(processEnv ?? process.env); + return ( + normalizeOptionalValue(persistedEnv.ANTHROPIC_BASE_URL) ?? + normalizeOptionalValue(processEnvRecord.ANTHROPIC_BASE_URL) ?? + undefined + ); +} + +export function getAllowedSmeAuthMethods(provider: ProviderKind): readonly SmeAuthMethod[] { + switch (provider) { + case "claudeAgent": + return ["auto", "apiKey", "authToken"]; + case "codex": + return ["auto", "chatgpt", "apiKey", "customProvider"]; + case "openclaw": + return ["auto", "password", "none"]; + } +} + +export function getDefaultSmeAuthMethod(provider: ProviderKind): SmeAuthMethod { + switch (provider) { + case "claudeAgent": + return "apiKey"; + case "codex": + return "chatgpt"; + case "openclaw": + return "password"; + } +} + +export function isValidSmeAuthMethod(provider: ProviderKind, authMethod: SmeAuthMethod): boolean { + return getAllowedSmeAuthMethods(provider).includes(authMethod); +} + +export function validateAnthropicSetup(input: { + readonly authMethod: Extract; + readonly persistedEnv: Record; + readonly processEnv?: NodeJS.ProcessEnv; +}): SmeValidateSetupResult & { readonly clientOptions?: ResolvedAnthropicClientOptions } { + const processEnvRecord = compactNodeProcessEnv(input.processEnv ?? process.env); + const merged = { ...processEnvRecord, ...input.persistedEnv }; + const credential = pickAnthropicCredential(merged, input.authMethod); + if (!credential) { + if (input.authMethod === "authToken") { + return { + ok: false, + severity: "error", + message: + "Anthropic auth token is missing. Set ANTHROPIC_AUTH_TOKEN in project or global environment variables.", + resolvedAuthMethod: "authToken", + }; + } + if (input.authMethod === "apiKey") { + return { + ok: false, + severity: "error", + message: + "Anthropic API key is missing. Set ANTHROPIC_API_KEY in project or global environment variables.", + resolvedAuthMethod: "apiKey", + }; + } + return { + ok: false, + severity: "error", + message: + "SME Chat requires ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN. Add one in Settings > Environment Variables.", + resolvedAuthMethod: "auto", + }; + } + + return { + ok: true, + severity: "ready", + message: + credential.resolvedAuthMethod === "apiKey" + ? "Anthropic API key is configured." + : "Anthropic auth token is configured.", + resolvedAuthMethod: credential.resolvedAuthMethod, + clientOptions: (() => { + const baseURL = anthropicBaseUrl(input.persistedEnv, input.processEnv); + return { + apiKey: credential.apiKey, + authToken: credential.authToken, + ...(baseURL ? { baseURL } : {}), + }; + })(), + }; +} + +async function readCodexConfigModelProvider( + providerOptions?: CodexAppServerStartSessionInput["providerOptions"], +): Promise { + const homePath = + providerOptions?.codex?.homePath?.trim() || process.env.CODEX_HOME || join(homedir(), ".codex"); + try { + const content = await readFile(join(homePath, "config.toml"), "utf-8"); + let inTopLevel = true; + for (const line of content.split("\n")) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + if (trimmed.startsWith("[")) { + inTopLevel = false; + continue; + } + if (!inTopLevel) continue; + const match = trimmed.match(/^model_provider\s*=\s*["']([^"']+)["']/); + if (match) return match[1]; + } + return undefined; + } catch { + return undefined; + } +} + +async function readCodexAccountType( + providerOptions?: CodexAppServerStartSessionInput["providerOptions"], +): Promise<"apiKey" | "chatgpt" | "unknown"> { + const binaryPath = providerOptions?.codex?.binaryPath?.trim() || "codex"; + const env = { + ...process.env, + ...(providerOptions?.codex?.homePath?.trim() + ? { CODEX_HOME: providerOptions.codex.homePath.trim() } + : {}), + }; + + const child = spawn(binaryPath, ["app-server"], { + env, + stdio: ["pipe", "pipe", "pipe"], + }); + + return await new Promise((resolve, reject) => { + let settled = false; + let nextId = 1; + const pending = new Map< + number, + { resolve: (value: unknown) => void; reject: (error: Error) => void } + >(); + const stdout = createInterface({ input: child.stdout }); + const stderrLines: string[] = []; + const timeout = setTimeout(() => { + cleanup(); + reject(new Error("Timed out while reading Codex account state.")); + }, 4_000); + + const cleanup = () => { + if (settled) return; + settled = true; + clearTimeout(timeout); + stdout.close(); + child.kill(); + }; + + child.stderr.on("data", (chunk) => { + stderrLines.push(chunk.toString("utf-8")); + }); + + child.on("error", (error) => { + cleanup(); + reject(error); + }); + child.on("exit", (code) => { + if (settled) return; + cleanup(); + reject( + new Error( + `Codex app-server exited before account/read completed (code ${code ?? "unknown"}). ${stderrLines.join("").trim()}`, + ), + ); + }); + + stdout.on("line", (line) => { + let parsed: { id?: number; result?: unknown; error?: { message?: string } }; + try { + parsed = JSON.parse(line); + } catch { + return; + } + if (typeof parsed.id !== "number") { + return; + } + const request = pending.get(parsed.id); + if (!request) { + return; + } + pending.delete(parsed.id); + if (parsed.error) { + request.reject(new Error(parsed.error.message ?? "JSON-RPC request failed.")); + return; + } + request.resolve(parsed.result); + }); + + const sendRequest = (method: string, params?: unknown) => + new Promise((resolveRequest, rejectRequest) => { + const id = nextId++; + pending.set(id, { resolve: resolveRequest, reject: rejectRequest }); + child.stdin.write( + `${JSON.stringify({ jsonrpc: "2.0", id, method, ...(params !== undefined ? { params } : {}) })}\n`, + ); + }); + + const sendNotification = (method: string, params?: unknown) => { + child.stdin.write( + `${JSON.stringify({ jsonrpc: "2.0", method, ...(params !== undefined ? { params } : {}) })}\n`, + ); + }; + + void (async () => { + try { + await sendRequest("initialize", buildCodexInitializeParams()); + sendNotification("initialized"); + const account = await sendRequest("account/read", {}); + const snapshot = readCodexAccountSnapshot(account); + cleanup(); + resolve(snapshot.type); + } catch (error) { + cleanup(); + reject(error); + } + })(); + }); +} + +export async function validateCodexSetup(input: { + readonly authMethod: Extract; + readonly providerOptions?: CodexAppServerStartSessionInput["providerOptions"]; +}): Promise { + const modelProvider = await readCodexConfigModelProvider(input.providerOptions); + const customProviderConfigured = + modelProvider !== undefined && !OPENAI_MODEL_PROVIDERS.has(modelProvider); + + if (input.authMethod === "customProvider") { + if (!customProviderConfigured) { + return { + ok: false, + severity: "error", + message: + "Codex custom provider mode requires a non-OpenAI `model_provider` in the Codex config.", + resolvedAuthMethod: "customProvider", + }; + } + return { + ok: true, + severity: "ready", + message: `Codex is configured to use custom model provider '${modelProvider}'.`, + resolvedAuthMethod: "customProvider", + resolvedAccountType: "unknown", + }; + } + + if (input.authMethod === "auto" && customProviderConfigured) { + return { + ok: true, + severity: "ready", + message: `Codex auto mode resolved to custom provider '${modelProvider}'.`, + resolvedAuthMethod: "customProvider", + resolvedAccountType: "unknown", + }; + } + + const accountType = await readCodexAccountType(input.providerOptions).catch( + () => "unknown" as const, + ); + const desiredAuthMethod = + input.authMethod === "auto" + ? accountType === "chatgpt" + ? "chatgpt" + : accountType === "apiKey" + ? "apiKey" + : "auto" + : input.authMethod; + + if (input.authMethod === "auto" && accountType === "unknown") { + return { + ok: false, + severity: "error", + message: + "Codex account state could not be verified. Check the Codex CLI installation and login state.", + resolvedAuthMethod: "auto", + resolvedAccountType: "unknown", + }; + } + + if (desiredAuthMethod === "chatgpt" && accountType !== "chatgpt") { + return { + ok: false, + severity: "error", + message: "Codex is not authenticated with a ChatGPT account.", + resolvedAuthMethod: "chatgpt", + resolvedAccountType: accountType, + }; + } + + if (desiredAuthMethod === "apiKey" && accountType !== "apiKey") { + return { + ok: false, + severity: "error", + message: "Codex is not configured to use API key authentication.", + resolvedAuthMethod: "apiKey", + resolvedAccountType: accountType, + }; + } + + return { + ok: true, + severity: "ready", + message: + desiredAuthMethod === "chatgpt" + ? "Codex ChatGPT authentication is configured." + : "Codex API key authentication is configured.", + resolvedAuthMethod: desiredAuthMethod === "auto" ? input.authMethod : desiredAuthMethod, + resolvedAccountType: accountType, + }; +} + +export function validateOpenClawSetup(input: { + readonly authMethod: Extract; + readonly providerOptions?: CodexAppServerStartSessionInput["providerOptions"]; +}): SmeValidateSetupResult { + const gatewayUrl = normalizeOptionalValue(input.providerOptions?.openclaw?.gatewayUrl); + const password = normalizeOptionalValue(input.providerOptions?.openclaw?.password); + + if (!gatewayUrl) { + return { + ok: false, + severity: "error", + message: "OpenClaw gateway URL is missing. Add it in Settings.", + resolvedAuthMethod: input.authMethod, + }; + } + + const resolvedAuthMethod = + input.authMethod === "auto" ? (password ? "password" : "none") : input.authMethod; + + if (resolvedAuthMethod === "password" && !password) { + return { + ok: false, + severity: "error", + message: "OpenClaw password auth is selected, but no gateway password is configured.", + resolvedAuthMethod, + }; + } + + return { + ok: true, + severity: "ready", + message: + resolvedAuthMethod === "password" + ? "OpenClaw gateway URL and password are configured." + : "OpenClaw gateway URL is configured.", + resolvedAuthMethod, + }; +} diff --git a/apps/server/src/sme/backends/anthropic.ts b/apps/server/src/sme/backends/anthropic.ts new file mode 100644 index 000000000..419e851f4 --- /dev/null +++ b/apps/server/src/sme/backends/anthropic.ts @@ -0,0 +1,56 @@ +import Anthropic from "@anthropic-ai/sdk"; +import type { SmeMessageEvent } from "@okcode/contracts"; +import { Effect } from "effect"; + +import { SmeChatError } from "../Services/SmeChatService.ts"; + +type AnthropicMessagesClient = Pick; + +export interface ResolvedAnthropicClientOptions { + readonly apiKey: string | null; + readonly authToken: string | null; + readonly baseURL?: string; +} + +export interface SendSmeViaAnthropicInput { + readonly client: AnthropicMessagesClient; + readonly conversationId: string; + readonly assistantMessageId: string; + readonly model: string; + readonly systemPrompt: string; + readonly messages: Array<{ role: "user" | "assistant"; content: string }>; + readonly onEvent?: ((event: SmeMessageEvent) => void) | undefined; + readonly abortSignal?: AbortSignal | undefined; +} + +export function sendSmeViaAnthropic(input: SendSmeViaAnthropicInput) { + return Effect.tryPromise({ + try: async () => { + let result = ""; + const stream = input.client.messages.stream( + { + model: input.model, + max_tokens: 8192, + system: input.systemPrompt, + messages: input.messages, + }, + input.abortSignal ? { signal: input.abortSignal } : undefined, + ); + + for await (const event of stream) { + if (event.type === "content_block_delta" && event.delta.type === "text_delta") { + result += event.delta.text; + input.onEvent?.({ + type: "sme.message.delta", + conversationId: input.conversationId as never, + messageId: input.assistantMessageId as never, + text: event.delta.text, + }); + } + } + + return result; + }, + catch: (cause) => new SmeChatError("sendMessage:anthropic", String(cause), cause), + }); +} diff --git a/apps/server/src/sme/backends/providerRuntime.ts b/apps/server/src/sme/backends/providerRuntime.ts new file mode 100644 index 000000000..eb5f3708c --- /dev/null +++ b/apps/server/src/sme/backends/providerRuntime.ts @@ -0,0 +1,201 @@ +import { + type ProviderKind, + type ProviderRuntimeEvent, + type ProviderStartOptions, + type SmeMessageEvent, + ThreadId, +} from "@okcode/contracts"; +import { Effect, Fiber, Ref, Stream } from "effect"; +import { randomUUID } from "node:crypto"; + +import type { ProviderServiceShape } from "../../provider/Services/ProviderService.ts"; +import { SmeChatError } from "../Services/SmeChatService.ts"; + +export const SME_CHAT_ONLY_PROVIDER_WORKFLOW_ERROR = + "Selected SME provider attempted an interactive/tool workflow; SME chat only supports direct conversational replies."; + +function isToolWorkflowItem(event: ProviderRuntimeEvent): boolean { + if ( + (event.type === "item.started" || + event.type === "item.updated" || + event.type === "item.completed") && + event.payload.itemType !== "assistant_message" && + event.payload.itemType !== "reasoning" && + event.payload.itemType !== "plan" && + event.payload.itemType !== "user_message" && + event.payload.itemType !== "unknown" + ) { + return true; + } + return false; +} + +function toRuntimeFailure( + event: Extract, +): string { + if (event.payload.errorMessage) { + return event.payload.errorMessage; + } + return `Provider turn ended in state '${event.payload.state}'.`; +} + +export interface SendSmeViaProviderRuntimeInput { + readonly providerService: ProviderServiceShape; + readonly provider: Extract; + readonly conversationId: string; + readonly assistantMessageId: string; + readonly model: string; + readonly compiledPrompt: string; + readonly providerOptions?: ProviderStartOptions | undefined; + readonly onEvent?: ((event: SmeMessageEvent) => void) | undefined; + readonly setInterruptEffect: (interrupt: Effect.Effect) => Effect.Effect; + readonly clearInterruptEffect: Effect.Effect; +} + +export function sendSmeViaProviderRuntime(input: SendSmeViaProviderRuntimeInput) { + return Effect.gen(function* () { + const threadId = ThreadId.makeUnsafe(`sme-${input.conversationId}-${randomUUID()}`); + const responseText = yield* Ref.make(""); + const completionRef = yield* Ref.make<{ + readonly done: boolean; + readonly text?: string; + readonly error?: SmeChatError; + }>({ done: false }); + + const resolveOnce = (result: { readonly text?: string; readonly error?: SmeChatError }) => + Ref.modify(completionRef, (state) => { + if (state.done) { + return [false, state] as const; + } + return [true, { done: true, ...result }] as const; + }).pipe(Effect.asVoid); + + const safeStopSession = input.providerService + .stopSession({ threadId }) + .pipe(Effect.orElseSucceed(() => undefined)); + + const streamFiber = yield* Stream.runForEach( + input.providerService.streamEvents.pipe( + Stream.filter((event) => event.threadId === threadId), + ), + (event) => + Effect.gen(function* () { + if (event.type === "content.delta" && event.payload.streamKind === "assistant_text") { + yield* Ref.update(responseText, (current) => current + event.payload.delta); + input.onEvent?.({ + type: "sme.message.delta", + conversationId: input.conversationId as never, + messageId: input.assistantMessageId as never, + text: event.payload.delta, + }); + return; + } + + if ( + event.type === "request.opened" || + event.type === "user-input.requested" || + isToolWorkflowItem(event) + ) { + yield* resolveOnce({ + error: new SmeChatError( + "sendMessage:providerRuntime", + SME_CHAT_ONLY_PROVIDER_WORKFLOW_ERROR, + ), + }); + return; + } + + if (event.type === "runtime.error") { + yield* resolveOnce({ + error: new SmeChatError("sendMessage:providerRuntime", event.payload.message), + }); + return; + } + + if (event.type === "turn.aborted") { + yield* resolveOnce({ + error: new SmeChatError("sendMessage:providerRuntime", event.payload.reason), + }); + return; + } + + if (event.type === "turn.completed") { + if (event.payload.state !== "completed") { + yield* resolveOnce({ + error: new SmeChatError("sendMessage:providerRuntime", toRuntimeFailure(event)), + }); + return; + } + const text = yield* Ref.get(responseText); + yield* resolveOnce({ text }); + } + }), + ).pipe(Effect.forkScoped); + + const startSessionInput = { + threadId, + provider: input.provider, + model: input.model, + ...(input.providerOptions ? { providerOptions: input.providerOptions } : {}), + runtimeMode: "approval-required" as const, + }; + yield* input.providerService + .startSession(threadId, startSessionInput) + .pipe( + Effect.mapError( + (cause) => new SmeChatError("sendMessage:providerRuntime", String(cause), cause), + ), + ); + + const turn = yield* input.providerService + .sendTurn({ + threadId, + input: input.compiledPrompt, + model: input.model, + interactionMode: "chat", + }) + .pipe( + Effect.mapError( + (cause) => new SmeChatError("sendMessage:providerRuntime", String(cause), cause), + ), + ); + + yield* input.setInterruptEffect( + Effect.gen(function* () { + yield* input.providerService + .interruptTurn({ threadId, turnId: turn.turnId }) + .pipe(Effect.orElseSucceed(() => undefined)); + yield* safeStopSession; + }), + ); + + const awaitCompletion = (): Effect.Effect<{ + readonly done: boolean; + readonly text?: string; + readonly error?: SmeChatError; + }> => + Ref.get(completionRef).pipe( + Effect.flatMap((result) => + result.done + ? Effect.succeed(result) + : Effect.sleep("100 millis").pipe(Effect.flatMap(() => awaitCompletion())), + ), + ); + + const settled = yield* awaitCompletion().pipe( + Effect.ensuring( + Effect.gen(function* () { + yield* input.clearInterruptEffect; + yield* Fiber.interrupt(streamFiber); + yield* safeStopSession; + }), + ), + ); + + if (settled.error) { + return yield* Effect.fail(settled.error); + } + + return settled.text ?? ""; + }).pipe(Effect.scoped); +} diff --git a/apps/server/src/sme/promptBuilder.ts b/apps/server/src/sme/promptBuilder.ts new file mode 100644 index 000000000..320e942cd --- /dev/null +++ b/apps/server/src/sme/promptBuilder.ts @@ -0,0 +1,81 @@ +interface SmePromptDocument { + readonly title: string; + readonly fileName: string; + readonly contentText: string; +} + +const SME_BASE_INSTRUCTIONS = [ + "You are a knowledgeable subject matter expert assistant.", + "Provide direct, accurate, and helpful answers grounded in the supplied project knowledge when relevant.", + "Stay conversational, explain tradeoffs clearly, and say when the reference material is incomplete.", + "Do not request approvals, tools, or interactive workflows. Reply with plain assistant text only.", +].join(" "); + +function renderDocuments(docs: ReadonlyArray): string { + if (docs.length === 0) { + return ""; + } + + return [ + "Reference documents:", + ...docs.flatMap((doc) => [ + ``, + doc.contentText, + "", + ]), + ].join("\n"); +} + +function renderHistory( + history: ReadonlyArray<{ readonly role: string; readonly text: string }>, +): string { + if (history.length === 0) { + return "Conversation history:\n"; + } + + return [ + "Conversation history:", + ...history + .filter((message) => message.role === "user" || message.role === "assistant") + .map((message) => `${message.role.toUpperCase()}: ${message.text}`), + ].join("\n"); +} + +export function buildSmeSystemPrompt(docs: ReadonlyArray): string { + const sections = [SME_BASE_INSTRUCTIONS]; + const renderedDocs = renderDocuments(docs); + if (renderedDocs.length > 0) { + sections.push(renderedDocs); + } + return sections.join("\n\n"); +} + +export function buildSmeCompiledPrompt(input: { + readonly docs: ReadonlyArray; + readonly history: ReadonlyArray<{ readonly role: string; readonly text: string }>; + readonly userText: string; +}): string { + return [ + SME_BASE_INSTRUCTIONS, + renderDocuments(input.docs), + renderHistory(input.history), + `Latest user message:\nUSER: ${input.userText}`, + "Respond with the assistant reply only.", + ] + .filter((section) => section.length > 0) + .join("\n\n"); +} + +export function buildSmeAnthropicMessages(input: { + readonly history: ReadonlyArray<{ readonly role: string; readonly text: string }>; + readonly userText: string; +}): Array<{ role: "user" | "assistant"; content: string }> { + const apiMessages: Array<{ role: "user" | "assistant"; content: string }> = []; + for (const message of input.history) { + if (message.role === "user" || message.role === "assistant") { + apiMessages.push({ role: message.role, content: message.text }); + } + } + apiMessages.push({ role: "user", content: input.userText }); + return apiMessages; +} diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index 8697220d3..1be33c80f 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -1640,6 +1640,11 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< return yield* smeChatService.createConversation(body); } + case WS_METHODS.smeUpdateConversation: { + const body = stripRequestTag(request.body); + return yield* smeChatService.updateConversation(body); + } + case WS_METHODS.smeDeleteConversation: { const body = stripRequestTag(request.body); return yield* smeChatService.deleteConversation(body); @@ -1655,6 +1660,11 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< return yield* smeChatService.getConversation(body); } + case WS_METHODS.smeValidateSetup: { + const body = stripRequestTag(request.body); + return yield* smeChatService.validateSetup(body); + } + case WS_METHODS.smeSendMessage: { const body = stripRequestTag(request.body); return yield* smeChatService.sendMessage(body, (event) => { diff --git a/apps/web/src/components/sme/SmeChatShell.tsx b/apps/web/src/components/sme/SmeChatShell.tsx index 677446658..105daa705 100644 --- a/apps/web/src/components/sme/SmeChatShell.tsx +++ b/apps/web/src/components/sme/SmeChatShell.tsx @@ -30,6 +30,7 @@ export function SmeChatShell({ const appendStreamDelta = useSmeStore((s) => s.appendStreamDelta); const completeStream = useSmeStore((s) => s.completeStream); const clearStream = useSmeStore((s) => s.clearStream); + const setConversationError = useSmeStore((s) => s.setConversationError); // Load conversations and documents when project changes useEffect(() => { @@ -66,10 +67,14 @@ export function SmeChatShell({ appendStreamDelta(event.conversationId, event.messageId, event.text); } else if (event.type === "sme.message.complete") { completeStream(event.conversationId, event.messageId, event.text); + setConversationError(event.conversationId, undefined); + } else if (event.type === "sme.message.error") { + clearStream(); + setConversationError(event.conversationId, event.error); } }); return unsubscribe; - }, [appendStreamDelta, completeStream]); + }, [appendStreamDelta, clearStream, completeStream, setConversationError]); return (
diff --git a/apps/web/src/components/sme/SmeChatWorkspace.tsx b/apps/web/src/components/sme/SmeChatWorkspace.tsx index 2f888e4b4..2ad6614b3 100644 --- a/apps/web/src/components/sme/SmeChatWorkspace.tsx +++ b/apps/web/src/components/sme/SmeChatWorkspace.tsx @@ -1,11 +1,27 @@ -import { useCallback, useEffect, useRef, useState } from "react"; -import { BookOpenIcon, ArrowUpIcon, SparklesIcon } from "lucide-react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { useQuery } from "@tanstack/react-query"; +import { + ArrowUpIcon, + BookOpenIcon, + Settings2Icon, + SparklesIcon, + RefreshCcwIcon, +} from "lucide-react"; import type { SmeConversationId, SmeMessage, SmeMessageId } from "@okcode/contracts"; +import { useNavigate } from "@tanstack/react-router"; + +import { getProviderStartOptions, useAppSettings } from "~/appSettings"; +import { ProviderHealthBanner } from "~/components/chat/ProviderHealthBanner"; +import { Button } from "~/components/ui/button"; import { ensureNativeApi } from "~/nativeApi"; import { useSmeStore } from "~/smeStore"; +import { Alert, AlertDescription, AlertTitle } from "~/components/ui/alert"; +import { serverConfigQueryOptions } from "~/lib/serverReactQuery"; import { toastManager } from "~/components/ui/toast"; +import { SmeConversationDialog } from "./SmeConversationDialog"; import { SmeMessageBubble } from "./SmeMessageBubble"; +import { SME_PROVIDER_LABELS } from "./smeConversationConfig"; const EMPTY_MESSAGES: SmeMessage[] = []; @@ -20,26 +36,73 @@ export function SmeChatWorkspace({ onToggleKnowledge, knowledgePanelOpen, }: SmeChatWorkspaceProps) { - const messages = useSmeStore((s) => - conversationId ? (s.messagesByConversation[conversationId] ?? EMPTY_MESSAGES) : EMPTY_MESSAGES, + const navigate = useNavigate(); + const { settings } = useAppSettings(); + const providerOptions = useMemo(() => getProviderStartOptions(settings), [settings]); + const conversations = useSmeStore((state) => state.conversations); + const conversation = useMemo( + () => conversations.find((item) => item.conversationId === conversationId) ?? null, + [conversationId, conversations], + ); + const messages = useSmeStore((state) => + conversationId + ? (state.messagesByConversation[conversationId] ?? EMPTY_MESSAGES) + : EMPTY_MESSAGES, + ); + const conversationError = useSmeStore((state) => + conversationId ? state.errorsByConversation[conversationId] : undefined, ); - const streamingConversationId = useSmeStore((s) => s.streamingConversationId); - const streamingMessageId = useSmeStore((s) => s.streamingMessageId); - const streamingText = useSmeStore((s) => s.streamingText); - const addUserMessage = useSmeStore((s) => s.addUserMessage); - const clearStream = useSmeStore((s) => s.clearStream); - const setMessages = useSmeStore((s) => s.setMessages); + const streamingConversationId = useSmeStore((state) => state.streamingConversationId); + const streamingMessageId = useSmeStore((state) => state.streamingMessageId); + const streamingText = useSmeStore((state) => state.streamingText); + const addUserMessage = useSmeStore((state) => state.addUserMessage); + const clearStream = useSmeStore((state) => state.clearStream); + const setMessages = useSmeStore((state) => state.setMessages); + const setConversationError = useSmeStore((state) => state.setConversationError); const [inputText, setInputText] = useState(""); const [sending, setSending] = useState(false); + const [dialogOpen, setDialogOpen] = useState(false); const messagesEndRef = useRef(null); const textareaRef = useRef(null); + const serverConfigQuery = useQuery(serverConfigQueryOptions()); + const validationQuery = useQuery({ + queryKey: [ + "sme", + "validateSetup", + conversation?.conversationId ?? null, + conversation?.provider ?? null, + conversation?.authMethod ?? null, + conversation?.model ?? null, + providerOptions ?? null, + ], + enabled: conversation !== null, + queryFn: async () => { + const api = ensureNativeApi(); + return api.sme.validateSetup({ + conversationId: conversation!.conversationId, + providerOptions, + }); + }, + }); + + const providerStatus = useMemo( + () => + conversation + ? (serverConfigQuery.data?.providers.find( + (status) => status.provider === conversation.provider, + ) ?? null) + : null, + [conversation, serverConfigQuery.data?.providers], + ); + const sendDisabled = + sending || + validationQuery.isLoading || + (validationQuery.data ? !validationQuery.data.ok : false); - // Auto-scroll to bottom when messages change useEffect(() => { messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); }, [messages, streamingText]); - // Auto-resize textarea useEffect(() => { const textarea = textareaRef.current; if (!textarea) return; @@ -48,14 +111,16 @@ export function SmeChatWorkspace({ }, [inputText]); const handleSend = useCallback(async () => { - if (!conversationId || !inputText.trim() || sending) return; + if (!conversationId || !conversation || !inputText.trim() || sendDisabled) { + return; + } const text = inputText.trim(); setInputText(""); setSending(true); + setConversationError(conversationId, undefined); const previousMessages = messages; - // Optimistically add user message addUserMessage(conversationId, { messageId: `temp-${Date.now()}` as SmeMessageId, conversationId: conversationId as SmeConversationId, @@ -68,12 +133,16 @@ export function SmeChatWorkspace({ try { const api = ensureNativeApi(); - await api.sme.sendMessage({ conversationId: conversationId as SmeConversationId, text }); + await api.sme.sendMessage({ + conversationId: conversationId as SmeConversationId, + text, + providerOptions, + }); const result = await api.sme.getConversation({ conversationId: conversationId as SmeConversationId, }); if (result) { - setMessages(conversationId, result.messages as any[]); + setMessages(conversationId, result.messages as SmeMessage[]); } } catch (error) { clearStream(); @@ -84,7 +153,7 @@ export function SmeChatWorkspace({ conversationId: conversationId as SmeConversationId, }); if (result) { - setMessages(conversationId, result.messages as any[]); + setMessages(conversationId, result.messages as SmeMessage[]); } else { setMessages(conversationId, previousMessages); } @@ -92,27 +161,40 @@ export function SmeChatWorkspace({ setMessages(conversationId, previousMessages); } + const description = error instanceof Error ? error.message : "Unknown SME Chat error."; + setConversationError(conversationId, description); toastManager.add({ type: "error", title: "SME Chat send failed", - description: error instanceof Error ? error.message : "Unknown SME Chat error.", + description, }); } finally { setSending(false); } - }, [conversationId, inputText, sending, messages, addUserMessage, clearStream, setMessages]); + }, [ + addUserMessage, + clearStream, + conversation, + conversationId, + inputText, + messages, + providerOptions, + sendDisabled, + setConversationError, + setMessages, + ]); const handleKeyDown = useCallback( - (e: React.KeyboardEvent) => { - if (e.key === "Enter" && !e.shiftKey) { - e.preventDefault(); + (event: React.KeyboardEvent) => { + if (event.key === "Enter" && !event.shiftKey) { + event.preventDefault(); void handleSend(); } }, [handleSend], ); - if (!conversationId) { + if (!conversationId || !conversation) { return (
@@ -131,27 +213,90 @@ export function SmeChatWorkspace({ return (
- {/* Minimal Header */} -
- +
+
+

{conversation.title}

+

+ {SME_PROVIDER_LABELS[conversation.provider]} · {conversation.authMethod} ·{" "} + {conversation.model} +

+
+
+ + +
+
+ +
+ + {validationQuery.data ? ( +
+ + + {validationQuery.data.ok ? "Provider ready" : "Provider setup required"} + + + {validationQuery.data.message} + + + {!validationQuery.data.ok ? ( + + ) : null} + + + +
+ ) : null} + {conversationError ? ( +
+ + Latest send failed + {conversationError} + +
+ ) : null}
- {/* Messages */}
- {messages.map((msg) => ( - + {messages.map((message) => ( + ))} {streamingConversationId === conversationId && streamingText ? (
- {/* Modern Composer */}