Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 130 additions & 60 deletions slack-mcp/server/llm.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
/**
* LLM Module - AI Agent Integration for Slack MCP
*
* The Slack webhook path lives OUTSIDE the @decocms/runtime request context
* (no JWT, no per-request bindings resolution), so the AgentOf() binding
* proxy cannot be picked up from `env.MESH_REQUEST_CONTEXT.state.AGENT`.
* The Slack webhook path runs OUTSIDE @decocms/runtime's request context
* (no JWT, no per-request binding resolution), so the AgentOf() proxy
* cannot be picked up from `env.MESH_REQUEST_CONTEXT.state.AGENT`. We
* also cannot use the runtime's `streamAgent`, which targets
* `/decopilot/runtime/stream` — that endpoint is the "resume a task"
* path and requires a pre-existing `taskId`.
*
* Instead we construct an agent client by hand, using the persisted
* `meshApiKey` + `organizationSlug` + `agentId` from Supabase, and call
* `streamAgent` from @decocms/runtime/decopilot — which is the exact same
* machinery the binding proxy uses internally (POST to
* `/api/<org>/decopilot/runtime/stream` with AI SDK UIMessage chunks).
*
* This makes the slack-mcp behave identically to a runtime-resolved binding.
* Instead we call the user-facing chat endpoint `/decopilot/stream`
* directly, using the persisted `meshApiKey` for auth, and parse the
* custom SSE stream the endpoint emits (data: { type, text/delta, ... }).
* We deliberately omit `thread_id`: the endpoint rejects any id that
* was not minted by studio, and we already rebuild conversation context
* from Slack history on every webhook (`buildContextMessages` → 1 system
* message), so the agent stays coherent without depending on
* decopilot-side thread memory.
*/
import {
streamAgent,
type AgentBindingConfig,
type AgentStreamParams,
type ResolvedAgentClient,
} from "@decocms/runtime/decopilot";

import { getCachedConnectionConfig } from "./lib/config-cache.ts";

// ============================================================================
Expand All @@ -39,54 +36,130 @@ export interface SlackChatMessage {
images?: MessageImage[];
}

interface UIMessagePart {
type: string;
text?: string;
url?: string;
filename?: string;
mediaType?: string;
}

interface UIMessageLike {
role: string;
parts: UIMessagePart[];
}

interface AgentClient {
STREAM: (params: {
messages: UIMessageLike[];
toolApprovalLevel?: "auto" | "readonly" | "plan";
}) => Promise<
AsyncIterable<{ parts: Array<{ type: string; text?: string }> }>
>;
}

// ============================================================================
// Agent client
// ============================================================================

/**
* Build a ResolvedAgentClient for a given connection.
*
* Mirrors what the runtime's `createAgentProxy` builds for resolved bindings,
* but constructed manually because the Slack webhook path has no per-request
* runtime context. Uses the persisted `meshApiKey` so the call is always
* authenticated without depending on a session token.
*/
async function getAgentClient(
connectionId: string,
): Promise<ResolvedAgentClient | null> {
): Promise<AgentClient | null> {
const config = await getCachedConnectionConfig(connectionId);
const token = config?.meshApiKey ?? config?.meshToken;
const orgSlug = config?.organizationSlug ?? config?.organizationId;
if (!token || !orgSlug || !config?.meshUrl || !config?.agentId) {
return null;
}

const streamUrl = `${config.meshUrl}/api/${orgSlug}/decopilot/runtime/stream`;
const agentConfig: AgentBindingConfig = {
__type: "@deco/agent",
id: config.agentId,
};
const { meshUrl, agentId } = config;
const url = `${meshUrl}/api/${orgSlug}/decopilot/stream`;

return {
STREAM: async (params, opts) => {
// We deliberately strip `thread_id` from the request: the decopilot
// endpoint does not auto-create threads — passing an unknown id (the
// user's name, a temp `<name>-<ts>`, anything not minted by studio)
// results in 500 "Thread not found". Without a thread_id, the
// decopilot allocates a fresh thread per call. Slack-side context
// is already rebuilt every webhook via `buildContextMessages`
// (channel/thread history → 1 system message), so the agent stays
// coherent within a conversation without depending on decopilot's
// own thread memory. Per-person decopilot memory would require us
// to call a thread-create API and cache the returned id — out of
// scope here.
const { thread_id: _ignored, ...rest } = params;
console.log(`[LLM] streamAgent ${streamUrl} (no thread_id)`);
return await streamAgent(streamUrl, token, agentConfig, rest, opts);
STREAM: async (params) => {
console.log(`[LLM] POST ${url}`);
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${token}`,
Accept: "application/json, text/event-stream",
},
body: JSON.stringify({
messages: params.messages,
agent: { id: agentId },
stream: true,
toolApprovalLevel: params.toolApprovalLevel ?? "auto",
}),
});

if (!response.ok) {
const errorText = await response.text();
throw new Error(
`decopilot stream failed (${response.status}): ${errorText}`,
);
}

return sseResponseToAsyncIterable(response);
},
};
}

/**
* Parse the decopilot's custom SSE stream into the same shape the
* AI SDK `readUIMessageStream` would produce: one yield carrying a
* `parts: [{ type: "text", text }]` array with the accumulated text.
* Tool-call events reset the buffer so we never surface them to Slack.
*/
async function* sseResponseToAsyncIterable(
response: Response,
): AsyncGenerator<{ parts: Array<{ type: string; text?: string }> }> {
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
let textContent = "";

try {
while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";

for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || !trimmed.startsWith("data:")) continue;
const data = trimmed.slice("data:".length).trim();
if (!data || data === "[DONE]") continue;

try {
const event = JSON.parse(data);
if (event.type === "text-delta" && event.delta) {
textContent += event.delta;
} else if (event.type === "text" && event.text) {
textContent += event.text;
} else if (
event.type === "tool-call" ||
event.type === "tool-input-start"
) {
textContent = "";
} else if (event.type === "finish") {
break;
}
} catch {
// ignore parse errors
}
}
}
} finally {
reader.releaseLock();
}

yield { parts: [{ type: "text", text: textContent }] };
}

// ============================================================================
// Public API
// ============================================================================
Expand All @@ -111,12 +184,10 @@ export async function isAgentAvailableAsync(
}

/**
* Convert SlackChatMessage[] to the UIMessage format expected by
* @decocms/runtime/decopilot's `streamAgent`.
* Convert SlackChatMessage[] to the UIMessage-like shape the decopilot
* `/stream` endpoint accepts.
*/
function toUIMessages(
messages: SlackChatMessage[],
): AgentStreamParams["messages"] {
function toUIMessages(messages: SlackChatMessage[]): UIMessageLike[] {
return messages.map((m) => ({
role: m.role,
parts: [
Expand All @@ -132,13 +203,15 @@ function toUIMessages(
}

/**
* Stream an agent response via the runtime's decopilot streamAgent —
* the same path the AgentOf() binding would use.
* Stream an agent response.
*
* `threadId` is accepted for caller-side bookkeeping/logs but is NOT sent
* to the decopilot — see the file-level note on thread_id handling.
*/
export async function streamAgentResponse(
connectionId: string,
messages: SlackChatMessage[],
threadId?: string,
_threadId?: string,
) {
const client = await getAgentClient(connectionId);
if (!client) {
Expand All @@ -154,17 +227,14 @@ export async function streamAgentResponse(
return client.STREAM({
messages: toUIMessages(messages),
toolApprovalLevel: "auto",
...(threadId ? { thread_id: threadId } : {}),
});
}

/**
* Collect the final text from a UIMessage stream.
* Collect the final text from an agent stream.
*
* `streamAgent` yields a UIMessage that's progressively rebuilt as chunks
* arrive; each yield carries the cumulative `parts` array. We take the last
* text part of the last yield, which equals the final assistant message.
* Tool-call parts are ignored (we don't surface them to Slack).
* Each yield carries the cumulative `parts` array; we keep the last text
* part of the last yield, which equals the final assistant message.
*/
export async function collectStreamText(
stream: AsyncIterable<{ parts: Array<{ type: string; text?: string }> }>,
Expand Down
Loading