diff --git a/package.json b/package.json index 4ad72847..8281080a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openacp/cli", - "version": "2026.411.1", + "version": "2026.413.1", "private": true, "license": "MIT", "type": "module", diff --git a/packages/plugin-sdk/package.json b/packages/plugin-sdk/package.json index 46e8e390..2cb23fe8 100644 --- a/packages/plugin-sdk/package.json +++ b/packages/plugin-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@openacp/plugin-sdk", - "version": "2026.411.1", + "version": "2026.413.1", "description": "SDK for building OpenACP plugins — types, base classes, and testing utilities", "type": "module", "exports": { diff --git a/packages/plugin-sdk/src/testing/test-context.ts b/packages/plugin-sdk/src/testing/test-context.ts index 3fd2db42..8bd2cc28 100644 --- a/packages/plugin-sdk/src/testing/test-context.ts +++ b/packages/plugin-sdk/src/testing/test-context.ts @@ -154,6 +154,7 @@ export function createTestContext(opts: TestContextOpts): TestPluginContext { async sendMessage(sessionId: string, content: OutgoingMessage): Promise { sentMessages.push({ sessionId, content }) }, + notify(_target: any, _message: any, _options?: any): void {}, defineHook(_name: string): void {}, async emitHook>(_name: string, payload: T): Promise { return payload diff --git a/src/core/channel.ts b/src/core/channel.ts index 03ca32d0..454e6baf 100644 --- a/src/core/channel.ts +++ b/src/core/channel.ts @@ -63,6 +63,19 @@ export interface IChannelAdapter { // Agent switch cleanup — optional, called when switching agents to clear adapter-side per-session state cleanupSessionState?(sessionId: string): Promise + + // --- User-targeted notifications (optional) --- + /** Send a notification directly to a user by platform ID. Best-effort delivery. */ + sendUserNotification?( + platformId: string, + message: NotificationMessage, + options?: { + via?: 'dm' | 'thread' | 'topic' + topicId?: string + sessionId?: string + platformMention?: { platformUsername?: string; platformId: string } + } + ): Promise } /** @@ -100,4 +113,5 @@ export abstract class ChannelAdapter implements IChannelAdapter async cleanupSkillCommands(_sessionId: string): Promise {} async cleanupSessionState(_sessionId: string): Promise {} async archiveSessionTopic(_sessionId: string): Promise {} + async sendUserNotification(_platformId: string, _message: NotificationMessage, _options?: any): Promise {} } diff --git a/src/core/event-bus.ts b/src/core/event-bus.ts index dd60f076..d7407694 100644 --- a/src/core/event-bus.ts +++ b/src/core/event-bus.ts @@ -13,6 +13,7 @@ export interface EventBusEvents { sessionId: string; agent: string; status: SessionStatus; + userId?: string; }) => void; "session:updated": (data: { sessionId: string; @@ -94,6 +95,15 @@ export interface EventBusEvents { resumed?: boolean; error?: string; }) => void; + + // Identity lifecycle (emitted by @openacp/identity built-in plugin) + "identity:created": (data: { userId: string; identityId: string; source: string; displayName: string }) => void; + "identity:updated": (data: { userId: string; changes: string[] }) => void; + "identity:linked": (data: { userId: string; identityId: string; linkedFrom?: string }) => void; + "identity:unlinked": (data: { userId: string; identityId: string; newUserId: string }) => void; + "identity:userMerged": (data: { keptUserId: string; mergedUserId: string; movedIdentities: string[] }) => void; + "identity:roleChanged": (data: { userId: string; oldRole: string; newRole: string; changedBy?: string }) => void; + "identity:seen": (data: { userId: string; identityId: string; sessionId: string }) => void; } /** diff --git a/src/core/events.ts b/src/core/events.ts index 3eb656ba..b0dc9b94 100644 --- a/src/core/events.ts +++ b/src/core/events.ts @@ -158,6 +158,22 @@ export const BusEvent = { // --- Usage --- /** Fired when a token usage record is captured (consumed by usage plugin). */ USAGE_RECORDED: 'usage:recorded', + + // --- Identity lifecycle --- + /** Fired when a new user+identity record is created. */ + IDENTITY_CREATED: 'identity:created', + /** Fired when user profile fields change. */ + IDENTITY_UPDATED: 'identity:updated', + /** Fired when two identities are linked (same person). */ + IDENTITY_LINKED: 'identity:linked', + /** Fired when an identity is unlinked into a new user. */ + IDENTITY_UNLINKED: 'identity:unlinked', + /** Fired when two user records are merged during a link operation. */ + IDENTITY_USER_MERGED: 'identity:userMerged', + /** Fired when a user's role changes. */ + IDENTITY_ROLE_CHANGED: 'identity:roleChanged', + /** Fired when a user is seen (throttled). */ + IDENTITY_SEEN: 'identity:seen', } as const satisfies Record; export type BusEventName = typeof BusEvent[keyof typeof BusEvent]; diff --git a/src/core/plugin/plugin-context.ts b/src/core/plugin/plugin-context.ts index 3d2a13cc..1c7a9fe3 100644 --- a/src/core/plugin/plugin-context.ts +++ b/src/core/plugin/plugin-context.ts @@ -199,6 +199,19 @@ export function createPluginContext(opts: CreatePluginContextOpts): PluginContex } }, + notify( + target: { identityId: string } | { userId: string } | { channelId: string; platformId: string }, + message: { type: 'text'; text: string }, + options?: any, + ): void { + requirePermission(permissions, 'notifications:send', 'notify()') + const svc = serviceRegistry.get<{ notifyUser(t: any, m: any, o: any): Promise }>('notifications') + if (svc?.notifyUser) { + // Fire-and-forget — don't await, swallow errors + svc.notifyUser(target, message, options).catch(() => {}) + } + }, + defineHook(_name: string): void { // MiddlewareChain creates hook entries lazily on first add(), so no pre-registration is needed. // This method exists to let plugins declare intent — documentation/discoverability only. diff --git a/src/core/plugin/types.ts b/src/core/plugin/types.ts index 87c840fb..91561489 100644 --- a/src/core/plugin/types.ts +++ b/src/core/plugin/types.ts @@ -44,6 +44,14 @@ export type PluginPermission = | 'kernel:access' /** Read-only session metadata without kernel:access */ | 'sessions:read' + /** Read identity data (users, identities, search) */ + | 'identity:read' + /** Write identity data (create, update, link, unlink, roles) */ + | 'identity:write' + /** Register an identity source (adapters register their platform name) */ + | 'identity:register-source' + /** Send push notifications to users */ + | 'notifications:send' /** * The runtime plugin instance — the object a plugin module default-exports. @@ -425,6 +433,23 @@ export interface PluginContext { */ sendMessage(sessionId: string, content: OutgoingMessage): Promise + /** + * Send a user-targeted notification. Fire-and-forget, best-effort. + * Resolves target via identity system — one call delivers across all linked platforms. + * Requires 'notifications:send' permission. + */ + notify( + target: { identityId: string } | { userId: string } | { channelId: string; platformId: string }, + message: { type: 'text'; text: string }, + options?: { + via?: 'dm' | 'thread' | 'topic' + topicId?: string + sessionId?: string + onlyPlatforms?: string[] + excludePlatforms?: string[] + } + ): void + /** * Define a custom hook that other plugins can register middleware on. * The hook name is automatically prefixed with `plugin:{pluginName}:`. @@ -707,6 +732,18 @@ export interface FileServiceInterface { export interface NotificationService { notify(channelId: string, notification: NotificationMessage): Promise notifyAll(notification: NotificationMessage): Promise + /** Send a notification to a user across all their linked platforms. Fire-and-forget. */ + notifyUser?( + target: { identityId: string } | { userId: string } | { channelId: string; platformId: string }, + message: { type: 'text'; text: string }, + options?: { + via?: 'dm' | 'thread' | 'topic' + topicId?: string + sessionId?: string + onlyPlatforms?: string[] + excludePlatforms?: string[] + } + ): Promise } export interface UsageService { diff --git a/src/core/sessions/session-factory.ts b/src/core/sessions/session-factory.ts index 8b7e1daa..03161c58 100644 --- a/src/core/sessions/session-factory.ts +++ b/src/core/sessions/session-factory.ts @@ -27,6 +27,8 @@ export interface SessionCreateParams { existingSessionId?: string; initialName?: string; isAssistant?: boolean; + /** User ID from identity system — who is creating this session. */ + userId?: string; } export interface SideEffectDeps { @@ -89,7 +91,7 @@ export class SessionFactory { const payload = { agentName: params.agentName, workingDir: params.workingDirectory, - userId: '', // userId is not part of SessionCreateParams — resolved upstream + userId: params.userId ?? '', channelId: params.channelId, threadId: '', // threadId is assigned after session creation }; @@ -219,6 +221,7 @@ export class SessionFactory { sessionId: session.id, agent: session.agentName, status: session.status, + userId: createParams.userId, }); } diff --git a/src/core/types.ts b/src/core/types.ts index bf29b9c9..0220d972 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -362,6 +362,10 @@ export interface SessionRecord

> { firstAgent?: string; currentPromptCount?: number; agentSwitchHistory?: AgentSwitchEntry[]; + /** userId of the user who created this session (from identity system). */ + createdBy?: string; + /** userId[] of all users who have sent messages in this session. */ + participants?: string[]; // ACP state (cached — overridden by agent response on resume) acpState?: { // Primary fields (used on load) diff --git a/src/plugins/api-server/auth/token-store.ts b/src/plugins/api-server/auth/token-store.ts index ac8c05c1..0f8a88b8 100644 --- a/src/plugins/api-server/auth/token-store.ts +++ b/src/plugins/api-server/auth/token-store.ts @@ -182,6 +182,20 @@ export class TokenStore { } } + /** Associate a user ID with a token. Called by identity plugin after /identity/setup. */ + setUserId(tokenId: string, userId: string): void { + const token = this.tokens.get(tokenId); + if (token) { + token.userId = userId; + this.scheduleSave(); + } + } + + /** Get the user ID associated with a token. */ + getUserId(tokenId: string): string | undefined { + return this.tokens.get(tokenId)?.userId; + } + /** * Generates a one-time authorization code that can be exchanged for a JWT. * diff --git a/src/plugins/api-server/auth/types.ts b/src/plugins/api-server/auth/types.ts index 24f7adf2..4573c2a8 100644 --- a/src/plugins/api-server/auth/types.ts +++ b/src/plugins/api-server/auth/types.ts @@ -10,6 +10,8 @@ export interface StoredToken { refreshDeadline: string; lastUsedAt?: string; revoked: boolean; + /** User ID from identity system. Null until user completes /identity/setup. */ + userId?: string; } /** Claims embedded in a signed JWT. `rfd` (refresh deadline) is a Unix timestamp (seconds). */ diff --git a/src/plugins/api-server/index.ts b/src/plugins/api-server/index.ts index d21f6b14..8b2cd09f 100644 --- a/src/plugins/api-server/index.ts +++ b/src/plugins/api-server/index.ts @@ -222,6 +222,10 @@ function createApiServerPlugin(): OpenACPPlugin { await tokenStore.load() tokenStoreRef = tokenStore + // Expose tokenStore as a service so other plugins (e.g. identity) can associate + // their user IDs with tokens without a direct import dependency. + ctx.registerService('token-store', tokenStore) + // Lazy import to avoid loading Fastify unless needed const { createApiServer } = await import('./server.js') const { SSEManager } = await import('./sse-manager.js') @@ -289,7 +293,12 @@ function createApiServerPlugin(): OpenACPPlugin { server.registerPlugin('/api/v1/tunnel', async (app) => tunnelRoutes(app, deps)) server.registerPlugin('/api/v1/notify', async (app) => notifyRoutes(app, deps)) server.registerPlugin('/api/v1/commands', async (app) => commandRoutes(app, deps)) - server.registerPlugin('/api/v1/auth', async (app) => authRoutes(app, { tokenStore, getJwtSecret: () => jwtSecret })) + server.registerPlugin('/api/v1/auth', async (app) => authRoutes(app, { + tokenStore, + getJwtSecret: () => jwtSecret, + // Lazy resolver: identity plugin may not be loaded, so we fetch it on demand + getIdentityService: () => ctx.getService<{ getUser(userId: string): Promise<{ displayName: string } | undefined> }>('identity') ?? undefined, + })) server.registerPlugin('/api/v1/plugins', async (app) => pluginRoutes(app, deps)) const appConfig = core.configManager.get() const workspaceName = (appConfig as Record).instanceName as string ?? 'Main' diff --git a/src/plugins/api-server/routes/auth.ts b/src/plugins/api-server/routes/auth.ts index 4df827de..3a9393b5 100644 --- a/src/plugins/api-server/routes/auth.ts +++ b/src/plugins/api-server/routes/auth.ts @@ -10,6 +10,11 @@ export interface AuthRouteDeps { tokenStore: TokenStore; /** Returns the current JWT signing secret. Fetched lazily to support future rotation. */ getJwtSecret: () => string; + /** + * Optional resolver for the identity service. Provided lazily so the auth plugin + * does not hard-depend on identity — if identity is not loaded, this returns undefined. + */ + getIdentityService?: () => { getUser(userId: string): Promise<{ displayName: string } | undefined> } | undefined; } /** @@ -142,14 +147,29 @@ export async function authRoutes( }; }); - // GET /me — current auth info + // GET /me — current auth info, enriched with identity data if available. + // Identity fields are optional — callers must not assume they are always present. app.get('/me', async (request) => { const { auth } = request; + const userId = auth.tokenId ? deps.tokenStore.getUserId(auth.tokenId) : undefined; + + let displayName: string | null = null; + if (userId && deps.getIdentityService) { + const identityService = deps.getIdentityService(); + if (identityService) { + const user = await identityService.getUser(userId); + displayName = user?.displayName ?? null; + } + } + return { type: auth.type, tokenId: auth.tokenId, role: auth.role, scopes: auth.scopes, + userId: userId ?? null, + displayName, + claimed: !!userId, }; }); diff --git a/src/plugins/core-plugins.ts b/src/plugins/core-plugins.ts index 90eed841..d15fb4c2 100644 --- a/src/plugins/core-plugins.ts +++ b/src/plugins/core-plugins.ts @@ -4,6 +4,7 @@ * Adapter plugins depend on service plugins, so they boot last. */ import securityPlugin from './security/index.js' +import identityPlugin from './identity/index.js' import fileServicePlugin from './file-service/index.js' import contextPlugin from './context/index.js' import speechPlugin from './speech/index.js' @@ -30,6 +31,7 @@ import telegramPlugin from './telegram/index.js' export const corePlugins = [ // Service plugins (no adapter dependencies) securityPlugin, + identityPlugin, // Must boot after security (blocked users rejected before identity records are created) fileServicePlugin, contextPlugin, speechPlugin, diff --git a/src/plugins/identity/__tests__/auto-register.test.ts b/src/plugins/identity/__tests__/auto-register.test.ts new file mode 100644 index 00000000..784242a7 --- /dev/null +++ b/src/plugins/identity/__tests__/auto-register.test.ts @@ -0,0 +1,205 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest' +import { IdentityServiceImpl } from '../identity-service.js' +import { KvIdentityStore } from '../store/kv-identity-store.js' +import { createAutoRegisterHandler } from '../middleware/auto-register.js' +import { formatIdentityId } from '../types.js' +import type { IdentityStore } from '../store/identity-store.js' +import type { UserRecord, IdentityRecord, IdentityId } from '../types.js' + +// ─── Helpers ─── + +/** In-memory PluginStorage — matches the interface used by KvIdentityStore */ +function createMockStorage() { + const data = new Map() + return { + get: async (key: string) => (data.has(key) ? (data.get(key) as T) : undefined), + set: async (key: string, value: T) => { data.set(key, value) }, + delete: async (key: string) => { data.delete(key) }, + list: async () => [...data.keys()], + keys: async (prefix?: string) => { + const all = [...data.keys()] + return prefix ? all.filter((k) => k.startsWith(prefix)) : all + }, + clear: async () => { data.clear() }, + getDataDir: () => '/tmp/test', + forSession: () => createMockStorage(), + } +} + +function makePayload(overrides: Partial<{ + channelId: string + userId: string + threadId: string + text: string + meta: Record +}> = {}) { + return { + channelId: 'telegram', + threadId: 't1', + userId: 'user123', + text: 'hello', + meta: {} as Record, + ...overrides, + } +} + +// ─── Suite ─── + +describe('createAutoRegisterHandler', () => { + let store: KvIdentityStore + let service: IdentityServiceImpl + let emitEvent: ReturnType + let next: ReturnType + let handler: ReturnType + + beforeEach(() => { + store = new KvIdentityStore(createMockStorage()) + emitEvent = vi.fn() + service = new IdentityServiceImpl(store, emitEvent) + next = vi.fn().mockResolvedValue(undefined) + handler = createAutoRegisterHandler(service, store) + }) + + it('creates user + identity for an unknown identity and injects meta.identity', async () => { + const payload = makePayload({ + channelId: 'telegram', + userId: 'user123', + meta: { + channelUser: { channelId: 'telegram', userId: 'user123', displayName: 'Alice', username: 'alice' }, + }, + }) + + await handler(payload, next) + + expect(next).toHaveBeenCalledOnce() + + // Identity should now exist in store + const identityId = formatIdentityId('telegram', 'user123') + const identity = await store.getIdentity(identityId) + expect(identity).toBeDefined() + expect(identity?.platformDisplayName).toBe('Alice') + expect(identity?.platformUsername).toBe('alice') + + // User should exist + const user = await store.getUser(identity!.userId) + expect(user).toBeDefined() + expect(user?.displayName).toBe('Alice') + + // meta.identity injected + expect(payload.meta.identity).toMatchObject({ + identityId, + displayName: 'Alice', + username: 'alice', + }) + }) + + it('does not create a new user on subsequent messages — reuses existing identity', async () => { + const payload = makePayload({ + meta: { + channelUser: { channelId: 'telegram', userId: 'user123', displayName: 'Alice' }, + }, + }) + + // First message — creates + await handler(payload, next) + const countAfterFirst = await service.getUserCount() + + // Second message + const payload2 = makePayload({ meta: { channelUser: { channelId: 'telegram', userId: 'user123', displayName: 'Alice' } } }) + await handler(payload2, next) + + const countAfterSecond = await service.getUserCount() + expect(countAfterSecond).toBe(countAfterFirst) + expect(next).toHaveBeenCalledTimes(2) + }) + + it('injects meta.identity on subsequent messages', async () => { + const payload = makePayload({ meta: {} }) + await handler(payload, next) + + const payload2 = makePayload({ meta: {} }) + await handler(payload2, next) + + expect(payload2.meta.identity).toBeDefined() + expect((payload2.meta.identity as any).userId).toBeTruthy() + }) + + it('assigns admin role to the first user ever created', async () => { + const payload = makePayload({ meta: {} }) + await handler(payload, next) + + const identityId = formatIdentityId('telegram', 'user123') + const identity = await store.getIdentity(identityId) + const user = await store.getUser(identity!.userId) + + expect(user?.role).toBe('admin') + }) + + it('assigns member role to subsequent users', async () => { + // First user → admin + await handler(makePayload({ userId: 'user1', meta: {} }), next) + + // Second user → member + const payload2 = makePayload({ userId: 'user2', meta: {} }) + await handler(payload2, next) + + const identityId2 = formatIdentityId('telegram', 'user2') + const identity2 = await store.getIdentity(identityId2) + const user2 = await store.getUser(identity2!.userId) + + expect(user2?.role).toBe('member') + }) + + it('syncs platform displayName when it changes', async () => { + // First message — sets initial displayName + await handler( + makePayload({ meta: { channelUser: { channelId: 'telegram', userId: 'user123', displayName: 'Alice' } } }), + next, + ) + + // Second message — adapter reports new displayName + await handler( + makePayload({ meta: { channelUser: { channelId: 'telegram', userId: 'user123', displayName: 'Alice Smith' } } }), + next, + ) + + const identityId = formatIdentityId('telegram', 'user123') + const identity = await store.getIdentity(identityId) + expect(identity?.platformDisplayName).toBe('Alice Smith') + }) + + it('falls back to userId as displayName when channelUser is missing', async () => { + const payload = makePayload({ userId: 'raw_id_42', meta: {} }) + await handler(payload, next) + + const identityId = formatIdentityId('telegram', 'raw_id_42') + const identity = await store.getIdentity(identityId) + const user = await store.getUser(identity!.userId) + + expect(user?.displayName).toBe('raw_id_42') + }) + + it('throttles lastSeenAt updates — two rapid calls produce only one write', async () => { + // First call — creates user + await handler(makePayload({ meta: {} }), next) + + const putUserSpy = vi.spyOn(store, 'putUser') + + // Two rapid calls — second should not persist lastSeenAt again + await handler(makePayload({ meta: {} }), next) + await handler(makePayload({ meta: {} }), next) + + // Only one lastSeenAt update should have been written (the first seen update) + // Both rapid calls share the same throttle window, so only the first fires + const lastSeenCalls = putUserSpy.mock.calls.filter(([record]) => + typeof record === 'object' && 'lastSeenAt' in record, + ) + expect(lastSeenCalls.length).toBeLessThanOrEqual(1) + }) + + it('still calls next() even when no meta is provided', async () => { + const payload = { channelId: 'telegram', threadId: 't1', userId: 'user999', text: 'hi' } + await handler(payload, next) + expect(next).toHaveBeenCalledOnce() + }) +}) diff --git a/src/plugins/identity/__tests__/identity-service.test.ts b/src/plugins/identity/__tests__/identity-service.test.ts new file mode 100644 index 00000000..d7934858 --- /dev/null +++ b/src/plugins/identity/__tests__/identity-service.test.ts @@ -0,0 +1,540 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest' +import { IdentityServiceImpl } from '../identity-service.js' +import { formatIdentityId } from '../types.js' +import type { IdentityStore } from '../store/identity-store.js' +import type { UserRecord, IdentityRecord, IdentityId, UserRole } from '../types.js' + +// ─── In-memory store for testing ─── + +function createMemoryStore(): IdentityStore { + const users = new Map() + const identities = new Map() + const usernameIdx = new Map() + const sourceIdx = new Map() + + return { + getUser: async (id) => users.get(id), + putUser: async (record) => { users.set(record.userId, record) }, + deleteUser: async (id) => { users.delete(id) }, + listUsers: async (filter) => { + let all = [...users.values()] + if (filter?.role) all = all.filter((u) => u.role === filter.role) + if (filter?.source) { + all = all.filter((u) => u.identities.some((id) => id.startsWith(`${filter.source}:`))) + } + return all + }, + getIdentity: async (id) => identities.get(id), + putIdentity: async (record) => { identities.set(record.identityId, record) }, + deleteIdentity: async (id) => { identities.delete(id) }, + getIdentitiesForUser: async (userId) => { + const user = users.get(userId) + if (!user) return [] + return user.identities + .map((id) => identities.get(id)) + .filter((r): r is IdentityRecord => r !== undefined) + }, + getUserIdByUsername: async (username) => usernameIdx.get(username.toLowerCase()), + getIdentityIdBySource: async (source, platformId) => + sourceIdx.get(`${source}/${platformId}`), + setUsernameIndex: async (username, userId) => { usernameIdx.set(username.toLowerCase(), userId) }, + deleteUsernameIndex: async (username) => { usernameIdx.delete(username.toLowerCase()) }, + setSourceIndex: async (source, platformId, identityId) => { + sourceIdx.set(`${source}/${platformId}`, identityId) + }, + deleteSourceIndex: async (source, platformId) => { + sourceIdx.delete(`${source}/${platformId}`) + }, + getUserCount: async () => users.size, + } +} + +describe('IdentityServiceImpl', () => { + let store: IdentityStore + let emitEvent: ReturnType + let service: IdentityServiceImpl + + beforeEach(() => { + store = createMemoryStore() + emitEvent = vi.fn() + service = new IdentityServiceImpl(store, emitEvent) + }) + + // ─── createUserWithIdentity ─── + + describe('createUserWithIdentity()', () => { + it('creates user and identity with generated userId', async () => { + const { user, identity } = await service.createUserWithIdentity({ + displayName: 'Alice', + source: 'telegram', + platformId: '100', + }) + + expect(user.userId).toMatch(/^u_/) + expect(user.displayName).toBe('Alice') + expect(user.role).toBe('admin') // first user → admin + expect(user.identities).toContain(identity.identityId) + expect(identity.source).toBe('telegram') + expect(identity.platformId).toBe('100') + expect(identity.userId).toBe(user.userId) + }) + + it('first user is always admin regardless of requested role', async () => { + const { user } = await service.createUserWithIdentity({ + displayName: 'Alice', + source: 'telegram', + platformId: '1', + role: 'viewer', + }) + expect(user.role).toBe('admin') + }) + + it('subsequent users respect the provided role', async () => { + await service.createUserWithIdentity({ displayName: 'Admin', source: 'telegram', platformId: '1' }) + const { user } = await service.createUserWithIdentity({ + displayName: 'Bob', + source: 'discord', + platformId: '2', + role: 'viewer', + }) + expect(user.role).toBe('viewer') + }) + + it('subsequent users default to member when no role given', async () => { + await service.createUserWithIdentity({ displayName: 'Admin', source: 'telegram', platformId: '1' }) + const { user } = await service.createUserWithIdentity({ displayName: 'Bob', source: 'discord', platformId: '2' }) + expect(user.role).toBe('member') + }) + + it('sets username index when username provided', async () => { + const { user } = await service.createUserWithIdentity({ + displayName: 'Alice', + username: 'alice', + source: 'telegram', + platformId: '1', + }) + expect(await service.getUserByUsername('alice')).toEqual(user) + }) + + it('emits identity:created with userId, identityId, source, and displayName', async () => { + const { user, identity } = await service.createUserWithIdentity({ + displayName: 'Alice', + source: 'telegram', + platformId: '1', + }) + expect(emitEvent).toHaveBeenCalledWith('identity:created', { + userId: user.userId, + identityId: identity.identityId, + source: 'telegram', + displayName: 'Alice', + }) + }) + }) + + // ─── updateUser ─── + + describe('updateUser()', () => { + it('updates displayName', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const updated = await service.updateUser(user.userId, { displayName: 'Alicia' }) + expect(updated.displayName).toBe('Alicia') + }) + + it('bumps updatedAt', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const before = user.updatedAt + // Tiny delay to ensure timestamp differs + await new Promise((r) => setTimeout(r, 1)) + const updated = await service.updateUser(user.userId, { displayName: 'Alicia' }) + expect(updated.updatedAt).not.toBe(before) + }) + + it('throws for unknown userId', async () => { + await expect(service.updateUser('u_ghost', { displayName: 'X' })).rejects.toThrow('User not found') + }) + + it('updates username index when username changes', async () => { + const { user } = await service.createUserWithIdentity({ + displayName: 'Alice', username: 'alice', source: 'telegram', platformId: '1', + }) + await service.updateUser(user.userId, { username: 'alicia' }) + + expect(await service.getUserByUsername('alicia')).toBeDefined() + // Old index entry should be gone + expect(await service.getUserByUsername('alice')).toBeUndefined() + }) + + it('throws when username is already taken by another user', async () => { + await service.createUserWithIdentity({ displayName: 'Admin', source: 'telegram', platformId: '1' }) + const { user: bob } = await service.createUserWithIdentity({ + displayName: 'Bob', username: 'bob', source: 'discord', platformId: '2', + }) + await service.createUserWithIdentity({ + displayName: 'Carol', username: 'carol', source: 'slack', platformId: '3', + }) + + await expect(service.updateUser(bob.userId, { username: 'carol' })).rejects.toThrow('already taken') + }) + }) + + // ─── setRole ─── + + describe('setRole()', () => { + it('changes the user role', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + await service.setRole(user.userId, 'blocked') + const fetched = await service.getUser(user.userId) + expect(fetched?.role).toBe('blocked') + }) + + it('emits identity:roleChanged with old and new role', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + emitEvent.mockClear() + await service.setRole(user.userId, 'viewer') + expect(emitEvent).toHaveBeenCalledWith('identity:roleChanged', { + userId: user.userId, + oldRole: 'admin', + newRole: 'viewer', + }) + }) + + it('throws for unknown userId', async () => { + await expect(service.setRole('u_ghost', 'member')).rejects.toThrow('User not found') + }) + }) + + // ─── createIdentity ─── + + describe('createIdentity()', () => { + it('adds identity to existing user', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const identity = await service.createIdentity(user.userId, { source: 'discord', platformId: '42' }) + + expect(identity.userId).toBe(user.userId) + expect(identity.source).toBe('discord') + + const fetched = await service.getUser(user.userId) + expect(fetched?.identities).toContain(identity.identityId) + }) + + it('throws for unknown userId', async () => { + await expect(service.createIdentity('u_ghost', { source: 'discord', platformId: '1' })).rejects.toThrow('User not found') + }) + }) + + // ─── link ─── + + describe('link()', () => { + it('is a no-op when both identities already share a user', async () => { + const { user, identity } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const identity2 = await service.createIdentity(user.userId, { source: 'discord', platformId: '2' }) + emitEvent.mockClear() + + await service.link(identity.identityId, identity2.identityId) + + expect(emitEvent).not.toHaveBeenCalled() + // User still exists with both identities + const fetched = await service.getUser(user.userId) + expect(fetched?.identities).toHaveLength(2) + }) + + it('merges younger user into older user', async () => { + const { user: alice, identity: aliceId } = await service.createUserWithIdentity({ + displayName: 'Alice', source: 'telegram', platformId: '1', + }) + const { user: bob, identity: bobId } = await service.createUserWithIdentity({ + displayName: 'Bob', source: 'discord', platformId: '2', + }) + + await service.link(aliceId.identityId, bobId.identityId) + + // Alice is older (created first) — she should survive + const survivorAlice = await service.getUser(alice.userId) + expect(survivorAlice).toBeDefined() + expect(survivorAlice?.identities).toContain(aliceId.identityId) + expect(survivorAlice?.identities).toContain(bobId.identityId) + + // Bob's user record should be gone + expect(await service.getUser(bob.userId)).toBeUndefined() + }) + + it('moves identities from merged user to survivor', async () => { + const { user: alice, identity: aliceId } = await service.createUserWithIdentity({ + displayName: 'Alice', source: 'telegram', platformId: '1', + }) + const { identity: bobId } = await service.createUserWithIdentity({ + displayName: 'Bob', source: 'discord', platformId: '2', + }) + + await service.link(aliceId.identityId, bobId.identityId) + + const movedIdentity = await service.getIdentity(bobId.identityId) + expect(movedIdentity?.userId).toBe(alice.userId) + }) + + it('merges pluginData — survivor namespace wins', async () => { + const { user: alice, identity: aliceId } = await service.createUserWithIdentity({ + displayName: 'Alice', source: 'telegram', platformId: '1', + }) + const { user: bob, identity: bobId } = await service.createUserWithIdentity({ + displayName: 'Bob', source: 'discord', platformId: '2', + }) + + // Both have data in 'context' namespace; only Bob has 'usage' + await service.setPluginData(alice.userId, 'context', 'history', ['msg1']) + await service.setPluginData(bob.userId, 'context', 'history', ['msg2']) + await service.setPluginData(bob.userId, 'usage', 'tokens', 500) + + await service.link(aliceId.identityId, bobId.identityId) + + // Alice's 'context' wins + expect(await service.getPluginData(alice.userId, 'context', 'history')).toEqual(['msg1']) + // Bob's 'usage' fills in the missing namespace + expect(await service.getPluginData(alice.userId, 'usage', 'tokens')).toBe(500) + }) + + it('removes merged user username index', async () => { + const { identity: aliceId } = await service.createUserWithIdentity({ + displayName: 'Alice', source: 'telegram', platformId: '1', + }) + const { user: bob, identity: bobId } = await service.createUserWithIdentity({ + displayName: 'Bob', username: 'bob', source: 'discord', platformId: '2', + }) + + await service.link(aliceId.identityId, bobId.identityId) + + // Bob's username index should be cleaned up since his user was deleted + expect(await service.getUserByUsername('bob')).toBeUndefined() + }) + + it('emits identity:linked and identity:userMerged', async () => { + const { user: alice, identity: aliceId } = await service.createUserWithIdentity({ + displayName: 'Alice', source: 'telegram', platformId: '1', + }) + const { user: bob, identity: bobId } = await service.createUserWithIdentity({ + displayName: 'Bob', source: 'discord', platformId: '2', + }) + emitEvent.mockClear() + + await service.link(aliceId.identityId, bobId.identityId) + + // Alice is older — she survives; Bob's identity is linked into her account + expect(emitEvent).toHaveBeenCalledWith('identity:linked', { + userId: alice.userId, + identityId: bobId.identityId, + linkedFrom: bob.userId, + }) + expect(emitEvent).toHaveBeenCalledWith('identity:userMerged', { + keptUserId: alice.userId, + mergedUserId: bob.userId, + movedIdentities: [bobId.identityId], + }) + }) + + it('throws when identity not found', async () => { + const { identity } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const ghost = formatIdentityId('discord', '999') as IdentityId + await expect(service.link(identity.identityId, ghost)).rejects.toThrow('Identity not found') + }) + }) + + // ─── unlink ─── + + describe('unlink()', () => { + it('throws when trying to unlink last identity', async () => { + const { identity } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + await expect(service.unlink(identity.identityId)).rejects.toThrow('Cannot unlink the last identity') + }) + + it('separates identity into a new user account', async () => { + const { user, identity: id1 } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const id2 = await service.createIdentity(user.userId, { source: 'discord', platformId: '2' }) + + await service.unlink(id2.identityId) + + // Original user no longer has the identity + const originalUser = await service.getUser(user.userId) + expect(originalUser?.identities).not.toContain(id2.identityId) + expect(originalUser?.identities).toContain(id1.identityId) + + // The identity now points to a new user + const unlinkedIdentity = await service.getIdentity(id2.identityId) + expect(unlinkedIdentity?.userId).not.toBe(user.userId) + const newUser = await service.getUser(unlinkedIdentity!.userId) + expect(newUser).toBeDefined() + expect(newUser?.identities).toContain(id2.identityId) + }) + + it('emits identity:unlinked', async () => { + const { user, identity: id1 } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const id2 = await service.createIdentity(user.userId, { source: 'discord', platformId: '2' }) + emitEvent.mockClear() + + await service.unlink(id2.identityId) + + const call = emitEvent.mock.calls.find(([event]) => event === 'identity:unlinked') + expect(call).toBeDefined() + expect(call![0]).toBe('identity:unlinked') + expect(call![1]).toMatchObject({ + userId: user.userId, + identityId: id2.identityId, + }) + // newUserId should be a fresh user ID (not the original) + expect(call![1].newUserId).toMatch(/^u_/) + expect(call![1].newUserId).not.toBe(user.userId) + }) + + it('throws when identity not found', async () => { + const ghost = formatIdentityId('telegram', '999') as IdentityId + await expect(service.unlink(ghost)).rejects.toThrow('Identity not found') + }) + }) + + // ─── Plugin data ─── + + describe('setPluginData / getPluginData', () => { + it('stores and retrieves namespaced data', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + await service.setPluginData(user.userId, 'my-plugin', 'foo', 42) + expect(await service.getPluginData(user.userId, 'my-plugin', 'foo')).toBe(42) + }) + + it('namespaces are isolated between plugins', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + await service.setPluginData(user.userId, 'plugin-a', 'key', 'value-a') + await service.setPluginData(user.userId, 'plugin-b', 'key', 'value-b') + expect(await service.getPluginData(user.userId, 'plugin-a', 'key')).toBe('value-a') + expect(await service.getPluginData(user.userId, 'plugin-b', 'key')).toBe('value-b') + }) + + it('returns undefined for unknown user', async () => { + expect(await service.getPluginData('u_ghost', 'plugin', 'key')).toBeUndefined() + }) + + it('returns undefined for unset key', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + expect(await service.getPluginData(user.userId, 'plugin', 'missing')).toBeUndefined() + }) + + it('throws when setting data for unknown user', async () => { + await expect(service.setPluginData('u_ghost', 'plugin', 'key', 'value')).rejects.toThrow('User not found') + }) + }) + + // ─── resolveCanonicalMention ─── + + describe('resolveCanonicalMention()', () => { + it('returns found=false for unknown username', async () => { + const result = await service.resolveCanonicalMention('nobody', 'telegram') + expect(result).toEqual({ found: false }) + }) + + it('returns found=false when user exists but has no identity on the source', async () => { + await service.createUserWithIdentity({ displayName: 'Alice', username: 'alice', source: 'telegram', platformId: '1' }) + const result = await service.resolveCanonicalMention('alice', 'discord') + expect(result).toEqual({ found: false }) + }) + + it('returns platform info when user has identity on the requested source', async () => { + const { user } = await service.createUserWithIdentity({ + displayName: 'Alice', username: 'alice', source: 'telegram', platformId: '111', platformUsername: 'alice_tg', + }) + const result = await service.resolveCanonicalMention('alice', 'telegram') + expect(result).toEqual({ found: true, platformId: '111', platformUsername: 'alice_tg' }) + }) + + it('resolves after linking identities across platforms', async () => { + const { user, identity: tgId } = await service.createUserWithIdentity({ + displayName: 'Alice', username: 'alice', source: 'telegram', platformId: '111', platformUsername: 'alice_tg', + }) + const { identity: discordId } = await service.createUserWithIdentity({ + displayName: 'Alice Discord', source: 'discord', platformId: '222', platformUsername: 'alice_dc', + }) + await service.link(tgId.identityId, discordId.identityId) + + const result = await service.resolveCanonicalMention('alice', 'discord') + expect(result).toEqual({ found: true, platformId: '222', platformUsername: 'alice_dc' }) + }) + }) + + // ─── searchUsers ─── + + describe('searchUsers()', () => { + beforeEach(async () => { + await service.createUserWithIdentity({ displayName: 'Alice Admin', username: 'alice', source: 'telegram', platformId: '1' }) + await service.createUserWithIdentity({ displayName: 'Bob Builder', source: 'discord', platformId: '2' }) + }) + + it('finds user by displayName substring', async () => { + const result = await service.searchUsers('Alice') + expect(result).toHaveLength(1) + expect(result[0].displayName).toBe('Alice Admin') + }) + + it('finds user by username', async () => { + const result = await service.searchUsers('alice') + expect(result).toHaveLength(1) + }) + + it('is case-insensitive', async () => { + const result = await service.searchUsers('bob') + expect(result).toHaveLength(1) + expect(result[0].displayName).toBe('Bob Builder') + }) + + it('returns empty array for no matches', async () => { + expect(await service.searchUsers('xyz_nonexistent')).toHaveLength(0) + }) + }) + + // ─── getSessionsFor ─── + + describe('getSessionsFor()', () => { + it('returns empty array when no session provider configured', async () => { + const { user } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + expect(await service.getSessionsFor(user.userId)).toEqual([]) + }) + + it('delegates to the provided session resolver', async () => { + const sessions = [{ sessionId: 'sess-1', agentName: 'claude', channelId: 'telegram', status: 'active', createdAt: '2024-01-01T00:00:00.000Z' }] + const getSessionsForUser = vi.fn().mockResolvedValue(sessions) + const svc = new IdentityServiceImpl(store, emitEvent, getSessionsForUser) + + const { user } = await svc.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const result = await svc.getSessionsFor(user.userId) + + expect(result).toEqual(sessions) + expect(getSessionsForUser).toHaveBeenCalledWith(user.userId) + }) + }) + + // ─── registerSource ─── + + describe('registerSource()', () => { + it('does not throw', () => { + expect(() => service.registerSource('telegram')).not.toThrow() + expect(() => service.registerSource('telegram')).not.toThrow() // idempotent + }) + }) + + // ─── getUserCount ─── + + describe('getUserCount()', () => { + it('returns 0 initially', async () => { + expect(await service.getUserCount()).toBe(0) + }) + + it('increments with each created user', async () => { + await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + await service.createUserWithIdentity({ displayName: 'Bob', source: 'discord', platformId: '2' }) + expect(await service.getUserCount()).toBe(2) + }) + + it('decrements after link (merged user is deleted)', async () => { + const { identity: id1 } = await service.createUserWithIdentity({ displayName: 'Alice', source: 'telegram', platformId: '1' }) + const { identity: id2 } = await service.createUserWithIdentity({ displayName: 'Bob', source: 'discord', platformId: '2' }) + await service.link(id1.identityId, id2.identityId) + expect(await service.getUserCount()).toBe(1) + }) + }) +}) diff --git a/src/plugins/identity/__tests__/kv-identity-store.test.ts b/src/plugins/identity/__tests__/kv-identity-store.test.ts new file mode 100644 index 00000000..11bca6f1 --- /dev/null +++ b/src/plugins/identity/__tests__/kv-identity-store.test.ts @@ -0,0 +1,275 @@ +import { describe, it, expect, beforeEach } from 'vitest' +import { KvIdentityStore } from '../store/kv-identity-store.js' +import { formatIdentityId } from '../types.js' +import type { UserRecord, IdentityRecord, IdentityId } from '../types.js' + +// In-memory PluginStorage for testing — no file I/O required +function createMemoryStorage() { + const data = new Map() + return { + get: async (key: string) => (data.has(key) ? (data.get(key) as T) : undefined), + set: async (key: string, value: T) => { data.set(key, value) }, + delete: async (key: string) => { data.delete(key) }, + list: async () => [...data.keys()], + keys: async (prefix?: string) => { + const all = [...data.keys()] + return prefix ? all.filter((k) => k.startsWith(prefix)) : all + }, + clear: async () => { data.clear() }, + getDataDir: () => '/tmp/test', + forSession: () => createMemoryStorage(), + } +} + +function makeUser(overrides: Partial = {}): UserRecord { + return { + userId: 'u_test1', + displayName: 'Test User', + role: 'member', + identities: [], + pluginData: {}, + createdAt: '2024-01-01T00:00:00.000Z', + updatedAt: '2024-01-01T00:00:00.000Z', + lastSeenAt: '2024-01-01T00:00:00.000Z', + ...overrides, + } +} + +function makeIdentity(overrides: Partial = {}): IdentityRecord { + const identityId = formatIdentityId('telegram', '123') + return { + identityId, + userId: 'u_test1', + source: 'telegram', + platformId: '123', + createdAt: '2024-01-01T00:00:00.000Z', + updatedAt: '2024-01-01T00:00:00.000Z', + ...overrides, + } +} + +describe('KvIdentityStore', () => { + let store: KvIdentityStore + + beforeEach(() => { + store = new KvIdentityStore(createMemoryStorage()) + }) + + // ─── User CRUD ─── + + describe('getUser / putUser', () => { + it('returns undefined for unknown userId', async () => { + expect(await store.getUser('u_unknown')).toBeUndefined() + }) + + it('stores and retrieves a user by userId', async () => { + const user = makeUser() + await store.putUser(user) + expect(await store.getUser(user.userId)).toEqual(user) + }) + + it('overwrites an existing user on putUser', async () => { + const user = makeUser() + await store.putUser(user) + const updated = { ...user, displayName: 'Updated' } + await store.putUser(updated) + expect(await store.getUser(user.userId)).toEqual(updated) + }) + }) + + describe('deleteUser', () => { + it('removes the user so getUser returns undefined', async () => { + const user = makeUser() + await store.putUser(user) + await store.deleteUser(user.userId) + expect(await store.getUser(user.userId)).toBeUndefined() + }) + + it('is idempotent — deleting non-existent user does not throw', async () => { + await expect(store.deleteUser('u_ghost')).resolves.toBeUndefined() + }) + }) + + describe('listUsers', () => { + beforeEach(async () => { + await store.putUser(makeUser({ userId: 'u_1', role: 'admin', identities: [formatIdentityId('telegram', '1')] })) + await store.putUser(makeUser({ userId: 'u_2', role: 'member', identities: [formatIdentityId('discord', '2')] })) + await store.putUser(makeUser({ userId: 'u_3', role: 'viewer', identities: [formatIdentityId('telegram', '3')] })) + await store.putUser(makeUser({ userId: 'u_4', role: 'blocked', identities: [formatIdentityId('telegram', '4')] })) + }) + + it('returns all users when no filter given', async () => { + const users = await store.listUsers() + expect(users).toHaveLength(4) + }) + + it('filters by role', async () => { + const admins = await store.listUsers({ role: 'admin' }) + expect(admins).toHaveLength(1) + expect(admins[0].userId).toBe('u_1') + }) + + it('filters by source using identity prefix', async () => { + const telegramUsers = await store.listUsers({ source: 'telegram' }) + expect(telegramUsers).toHaveLength(3) + expect(telegramUsers.map((u) => u.userId).sort()).toEqual(['u_1', 'u_3', 'u_4']) + }) + + it('combines role + source filters', async () => { + const result = await store.listUsers({ role: 'member', source: 'discord' }) + expect(result).toHaveLength(1) + expect(result[0].userId).toBe('u_2') + }) + + it('returns empty array when no users match', async () => { + expect(await store.listUsers({ role: 'viewer', source: 'discord' })).toHaveLength(0) + }) + }) + + // ─── Identity CRUD ─── + + describe('getIdentity / putIdentity', () => { + it('returns undefined for unknown identityId', async () => { + const id = formatIdentityId('telegram', '999') + expect(await store.getIdentity(id)).toBeUndefined() + }) + + it('stores and retrieves an identity', async () => { + const identity = makeIdentity() + await store.putIdentity(identity) + expect(await store.getIdentity(identity.identityId)).toEqual(identity) + }) + + it('overwrites existing identity', async () => { + const identity = makeIdentity() + await store.putIdentity(identity) + const updated = { ...identity, platformUsername: 'newname' } + await store.putIdentity(updated) + expect(await store.getIdentity(identity.identityId)).toEqual(updated) + }) + }) + + describe('deleteIdentity', () => { + it('removes the identity', async () => { + const identity = makeIdentity() + await store.putIdentity(identity) + await store.deleteIdentity(identity.identityId) + expect(await store.getIdentity(identity.identityId)).toBeUndefined() + }) + + it('is idempotent for non-existent identities', async () => { + const id = formatIdentityId('telegram', '999') + await expect(store.deleteIdentity(id)).resolves.toBeUndefined() + }) + }) + + describe('getIdentitiesForUser', () => { + it('returns empty array for unknown userId', async () => { + expect(await store.getIdentitiesForUser('u_ghost')).toEqual([]) + }) + + it('returns empty array for user with no identities', async () => { + const user = makeUser({ identities: [] }) + await store.putUser(user) + expect(await store.getIdentitiesForUser(user.userId)).toEqual([]) + }) + + it('returns all identities linked to a user', async () => { + const id1 = makeIdentity({ identityId: formatIdentityId('telegram', '1') as IdentityId, source: 'telegram', platformId: '1' }) + const id2 = makeIdentity({ identityId: formatIdentityId('discord', '2') as IdentityId, source: 'discord', platformId: '2' }) + const user = makeUser({ identities: [id1.identityId, id2.identityId] }) + await store.putUser(user) + await store.putIdentity(id1) + await store.putIdentity(id2) + + const result = await store.getIdentitiesForUser(user.userId) + expect(result).toHaveLength(2) + expect(result.map((r) => r.identityId).sort()).toEqual([id1.identityId, id2.identityId].sort()) + }) + + it('skips missing identity records gracefully', async () => { + const id1 = makeIdentity() + const user = makeUser({ identities: [id1.identityId] }) + await store.putUser(user) + // Intentionally don't put the identity record + + const result = await store.getIdentitiesForUser(user.userId) + expect(result).toHaveLength(0) + }) + }) + + // ─── Secondary indexes ─── + + describe('username index', () => { + it('returns undefined for unknown username', async () => { + expect(await store.getUserIdByUsername('nobody')).toBeUndefined() + }) + + it('stores and resolves username → userId', async () => { + await store.setUsernameIndex('Alice', 'u_alice') + expect(await store.getUserIdByUsername('Alice')).toBe('u_alice') + }) + + it('is case-insensitive', async () => { + await store.setUsernameIndex('Alice', 'u_alice') + expect(await store.getUserIdByUsername('alice')).toBe('u_alice') + expect(await store.getUserIdByUsername('ALICE')).toBe('u_alice') + }) + + it('deleteUsernameIndex removes the entry', async () => { + await store.setUsernameIndex('alice', 'u_alice') + await store.deleteUsernameIndex('alice') + expect(await store.getUserIdByUsername('alice')).toBeUndefined() + }) + }) + + describe('source index', () => { + it('returns undefined for unknown source+platformId', async () => { + expect(await store.getIdentityIdBySource('telegram', '999')).toBeUndefined() + }) + + it('stores and resolves source+platformId → identityId', async () => { + const identityId = formatIdentityId('telegram', '123') + await store.setSourceIndex('telegram', '123', identityId) + expect(await store.getIdentityIdBySource('telegram', '123')).toBe(identityId) + }) + + it('deleteSourceIndex removes the entry', async () => { + const identityId = formatIdentityId('telegram', '123') + await store.setSourceIndex('telegram', '123', identityId) + await store.deleteSourceIndex('telegram', '123') + expect(await store.getIdentityIdBySource('telegram', '123')).toBeUndefined() + }) + + it('different sources with same platformId are independent', async () => { + const telegramId = formatIdentityId('telegram', '123') + const discordId = formatIdentityId('discord', '123') + await store.setSourceIndex('telegram', '123', telegramId) + await store.setSourceIndex('discord', '123', discordId) + + expect(await store.getIdentityIdBySource('telegram', '123')).toBe(telegramId) + expect(await store.getIdentityIdBySource('discord', '123')).toBe(discordId) + }) + }) + + // ─── getUserCount ─── + + describe('getUserCount', () => { + it('returns 0 when no users', async () => { + expect(await store.getUserCount()).toBe(0) + }) + + it('returns correct count after insertions', async () => { + await store.putUser(makeUser({ userId: 'u_1' })) + await store.putUser(makeUser({ userId: 'u_2' })) + expect(await store.getUserCount()).toBe(2) + }) + + it('decrements after deleteUser', async () => { + await store.putUser(makeUser({ userId: 'u_1' })) + await store.putUser(makeUser({ userId: 'u_2' })) + await store.deleteUser('u_1') + expect(await store.getUserCount()).toBe(1) + }) + }) +}) diff --git a/src/plugins/identity/identity-service.ts b/src/plugins/identity/identity-service.ts new file mode 100644 index 00000000..70118d8b --- /dev/null +++ b/src/plugins/identity/identity-service.ts @@ -0,0 +1,410 @@ +import { nanoid } from 'nanoid' +import { formatIdentityId } from './types.js' +import type { IdentityStore } from './store/identity-store.js' +import type { + IdentityService, + IdentityRecord, + UserRecord, + IdentityId, + UserRole, + SessionInfo, +} from './types.js' + +/** + * Core implementation of the identity service. + * + * Coordinates reads/writes through an IdentityStore and emits events via a + * provided callback (EventBus integration). The emitter decoupling allows the + * service to be tested without a full EventBus, and lets it be registered in + * the plugin's setup() before the EventBus is fully wired. + * + * SessionManager integration is optional — if not provided, getSessionsFor() + * always returns an empty array (graceful degradation during early boot). + */ +export class IdentityServiceImpl implements IdentityService { + private readonly registeredSources = new Set() + + /** + * @param store - Persistence layer for user/identity records and indexes. + * @param emitEvent - Callback to publish events on the EventBus. + * @param getSessionsForUser - Optional function to look up sessions for a userId. + */ + constructor( + private readonly store: IdentityStore, + private readonly emitEvent: (event: string, payload: unknown) => void, + private readonly getSessionsForUser?: (userId: string) => Promise, + ) {} + + // ─── Lookups ─── + + async getUser(userId: string): Promise { + return this.store.getUser(userId) + } + + async getUserByUsername(username: string): Promise { + const userId = await this.store.getUserIdByUsername(username) + if (!userId) return undefined + return this.store.getUser(userId) + } + + async getIdentity(identityId: IdentityId): Promise { + return this.store.getIdentity(identityId) + } + + async getUserByIdentity(identityId: IdentityId): Promise { + const identity = await this.store.getIdentity(identityId) + if (!identity) return undefined + return this.store.getUser(identity.userId) + } + + async getIdentitiesFor(userId: string): Promise { + return this.store.getIdentitiesForUser(userId) + } + + async listUsers(filter?: { source?: string; role?: UserRole }): Promise { + return this.store.listUsers(filter) + } + + /** + * Case-insensitive substring search across displayName, username, and platform + * usernames. Designed for admin tooling, not high-frequency user-facing paths. + */ + async searchUsers(query: string): Promise { + const all = await this.store.listUsers() + const q = query.toLowerCase() + + const matched: UserRecord[] = [] + for (const user of all) { + const nameMatch = + user.displayName.toLowerCase().includes(q) || + (user.username && user.username.toLowerCase().includes(q)) + + if (nameMatch) { + matched.push(user) + continue + } + + // Also search platform usernames via identity records + const identities = await this.store.getIdentitiesForUser(user.userId) + const platformMatch = identities.some( + (id) => id.platformUsername && id.platformUsername.toLowerCase().includes(q), + ) + if (platformMatch) matched.push(user) + } + + return matched + } + + async getSessionsFor(userId: string): Promise { + if (!this.getSessionsForUser) return [] + return this.getSessionsForUser(userId) + } + + // ─── Mutations ─── + + /** + * Creates a user + identity pair atomically. + * The first ever user in the system is auto-promoted to admin — this ensures + * there is always at least one admin when bootstrapping a fresh instance. + */ + async createUserWithIdentity(data: { + displayName: string + username?: string + role?: UserRole + source: string + platformId: string + platformUsername?: string + platformDisplayName?: string + }): Promise<{ user: UserRecord; identity: IdentityRecord }> { + const now = new Date().toISOString() + const userId = `u_${nanoid(12)}` + const identityId = formatIdentityId(data.source, data.platformId) + + // First user ever gets admin to avoid a locked-out instance + const count = await this.store.getUserCount() + const role: UserRole = count === 0 ? 'admin' : (data.role ?? 'member') + + const user: UserRecord = { + userId, + displayName: data.displayName, + username: data.username, + role, + identities: [identityId], + pluginData: {}, + createdAt: now, + updatedAt: now, + lastSeenAt: now, + } + + const identity: IdentityRecord = { + identityId, + userId, + source: data.source, + platformId: data.platformId, + platformUsername: data.platformUsername, + platformDisplayName: data.platformDisplayName, + createdAt: now, + updatedAt: now, + } + + await this.store.putUser(user) + await this.store.putIdentity(identity) + await this.store.setSourceIndex(data.source, data.platformId, identityId) + if (data.username) { + await this.store.setUsernameIndex(data.username, userId) + } + + this.emitEvent('identity:created', { userId, identityId, source: data.source, displayName: data.displayName }) + return { user, identity } + } + + async updateUser( + userId: string, + changes: Partial>, + ): Promise { + const user = await this.store.getUser(userId) + if (!user) throw new Error(`User not found: ${userId}`) + + // Username index must stay consistent — remove old, add new + if (changes.username !== undefined && changes.username !== user.username) { + if (changes.username) { + const existingId = await this.store.getUserIdByUsername(changes.username) + if (existingId && existingId !== userId) { + throw new Error(`Username already taken: ${changes.username}`) + } + await this.store.setUsernameIndex(changes.username, userId) + } + if (user.username) { + await this.store.deleteUsernameIndex(user.username) + } + } + + const updated: UserRecord = { + ...user, + ...changes, + updatedAt: new Date().toISOString(), + } + await this.store.putUser(updated) + this.emitEvent('identity:updated', { userId, changes: Object.keys(changes) }) + return updated + } + + async setRole(userId: string, role: UserRole): Promise { + const user = await this.store.getUser(userId) + if (!user) throw new Error(`User not found: ${userId}`) + + const oldRole = user.role + await this.store.putUser({ ...user, role, updatedAt: new Date().toISOString() }) + this.emitEvent('identity:roleChanged', { userId, oldRole, newRole: role }) + } + + async createIdentity( + userId: string, + identity: { + source: string + platformId: string + platformUsername?: string + platformDisplayName?: string + }, + ): Promise { + const user = await this.store.getUser(userId) + if (!user) throw new Error(`User not found: ${userId}`) + + const now = new Date().toISOString() + const identityId = formatIdentityId(identity.source, identity.platformId) + const record: IdentityRecord = { + identityId, + userId, + source: identity.source, + platformId: identity.platformId, + platformUsername: identity.platformUsername, + platformDisplayName: identity.platformDisplayName, + createdAt: now, + updatedAt: now, + } + + await this.store.putIdentity(record) + await this.store.setSourceIndex(identity.source, identity.platformId, identityId) + + // Add to user's identity list + const updatedUser: UserRecord = { + ...user, + identities: [...user.identities, identityId], + updatedAt: now, + } + await this.store.putUser(updatedUser) + + return record + } + + /** + * Links two identities into a single user. + * + * When identities belong to different users, the younger (more recently created) + * user is merged into the older one. We keep the older user as the canonical + * record because it likely has more history, sessions, and plugin data. + * + * Merge strategy for pluginData: per-namespace, the winning user's data takes + * precedence. The younger user's data only fills in missing namespaces. + */ + async link(identityIdA: IdentityId, identityIdB: IdentityId): Promise { + const identityA = await this.store.getIdentity(identityIdA) + const identityB = await this.store.getIdentity(identityIdB) + if (!identityA) throw new Error(`Identity not found: ${identityIdA}`) + if (!identityB) throw new Error(`Identity not found: ${identityIdB}`) + + // Already on the same user — no-op + if (identityA.userId === identityB.userId) return + + const userA = await this.store.getUser(identityA.userId) + const userB = await this.store.getUser(identityB.userId) + if (!userA) throw new Error(`User not found: ${identityA.userId}`) + if (!userB) throw new Error(`User not found: ${identityB.userId}`) + + // Keep the older user as the survivor + const [keep, merge] = + userA.createdAt <= userB.createdAt ? [userA, userB] : [userB, userA] + + const now = new Date().toISOString() + + // Move all of the younger user's identities to the surviving user + for (const identityId of merge.identities) { + const identity = await this.store.getIdentity(identityId) + if (!identity) continue + const updated: IdentityRecord = { ...identity, userId: keep.userId, updatedAt: now } + await this.store.putIdentity(updated) + } + + // Merge pluginData: keep's namespaces win; merge fills in missing ones + const mergedPluginData: Record> = { ...merge.pluginData } + for (const [ns, nsData] of Object.entries(keep.pluginData)) { + mergedPluginData[ns] = nsData + } + + // Clean up username index for the merged user if it had one + if (merge.username) { + await this.store.deleteUsernameIndex(merge.username) + } + + const updatedKeep: UserRecord = { + ...keep, + identities: [...new Set([...keep.identities, ...merge.identities])], + pluginData: mergedPluginData, + updatedAt: now, + } + await this.store.putUser(updatedKeep) + await this.store.deleteUser(merge.userId) + + // The "linked" identity is the one that belonged to the merged user — i.e. identityIdB + // (the caller-supplied second identity). If the merge resolved in the opposite direction, + // identityIdA is the one moving to the survivor. + const linkedIdentityId = identityA.userId === merge.userId ? identityIdA : identityIdB + this.emitEvent('identity:linked', { userId: keep.userId, identityId: linkedIdentityId, linkedFrom: merge.userId }) + this.emitEvent('identity:userMerged', { + keptUserId: keep.userId, + mergedUserId: merge.userId, + movedIdentities: merge.identities, + }) + } + + /** + * Separates an identity from its user into a new standalone account. + * Throws if it's the user's last identity — unlinking would produce a + * ghost user with no way to authenticate. + */ + async unlink(identityId: IdentityId): Promise { + const identity = await this.store.getIdentity(identityId) + if (!identity) throw new Error(`Identity not found: ${identityId}`) + + const user = await this.store.getUser(identity.userId) + if (!user) throw new Error(`User not found: ${identity.userId}`) + + if (user.identities.length <= 1) { + throw new Error(`Cannot unlink the last identity from user ${identity.userId}`) + } + + const now = new Date().toISOString() + const newUserId = `u_${nanoid(12)}` + + // Create new user for the separated identity + const newUser: UserRecord = { + userId: newUserId, + displayName: identity.platformDisplayName ?? identity.platformUsername ?? 'User', + role: 'member', + identities: [identityId], + pluginData: {}, + createdAt: now, + updatedAt: now, + lastSeenAt: now, + } + await this.store.putUser(newUser) + + // Point the identity at the new user + await this.store.putIdentity({ ...identity, userId: newUserId, updatedAt: now }) + + // Remove the identity from the original user + const updatedUser: UserRecord = { + ...user, + identities: user.identities.filter((id) => id !== identityId), + updatedAt: now, + } + await this.store.putUser(updatedUser) + + this.emitEvent('identity:unlinked', { + userId: user.userId, + identityId, + newUserId, + }) + } + + // ─── Plugin data ─── + + async setPluginData(userId: string, pluginName: string, key: string, value: unknown): Promise { + const user = await this.store.getUser(userId) + if (!user) throw new Error(`User not found: ${userId}`) + + const pluginData = { ...user.pluginData } + pluginData[pluginName] = { ...(pluginData[pluginName] ?? {}), [key]: value } + + await this.store.putUser({ ...user, pluginData, updatedAt: new Date().toISOString() }) + } + + async getPluginData(userId: string, pluginName: string, key: string): Promise { + const user = await this.store.getUser(userId) + if (!user) return undefined + return user.pluginData[pluginName]?.[key] + } + + // ─── Source registry ─── + + registerSource(source: string): void { + this.registeredSources.add(source) + } + + /** + * Resolves a username mention to platform-specific info for the given source. + * Finds the user by username, then scans their identities for the matching source. + * Returns found=false when no user or no identity for that source exists. + */ + async resolveCanonicalMention( + username: string, + source: string, + ): Promise<{ found: boolean; platformId?: string; platformUsername?: string }> { + const user = await this.getUserByUsername(username) + if (!user) return { found: false } + + const identities = await this.store.getIdentitiesForUser(user.userId) + const sourceIdentity = identities.find((id) => id.source === source) + if (!sourceIdentity) return { found: false } + + return { + found: true, + platformId: sourceIdentity.platformId, + platformUsername: sourceIdentity.platformUsername, + } + } + + async getUserCount(): Promise { + return this.store.getUserCount() + } +} diff --git a/src/plugins/identity/index.ts b/src/plugins/identity/index.ts new file mode 100644 index 00000000..a2f6cf40 --- /dev/null +++ b/src/plugins/identity/index.ts @@ -0,0 +1,102 @@ +import type { OpenACPPlugin } from '../../core/plugin/types.js' +import { IdentityServiceImpl } from './identity-service.js' +import { KvIdentityStore } from './store/kv-identity-store.js' +import { createAutoRegisterHandler } from './middleware/auto-register.js' +import { formatIdentityId } from './types.js' +import type { IdentityId } from './types.js' +import { Hook } from '../../core/events.js' + +/** + * Identity plugin — user identity, cross-platform linking, and role-based access. + * + * Boot order requirement: must come after @openacp/security so that blocked users + * are rejected (at priority 100) before we create identity records for them + * (this middleware runs at priority 110). + */ +function createIdentityPlugin(): OpenACPPlugin { + return { + name: '@openacp/identity', + version: '1.0.0', + description: 'User identity, cross-platform linking, and role-based access', + essential: false, + permissions: [ + 'storage:read', + 'storage:write', + 'middleware:register', + 'services:register', + 'services:use', + 'events:emit', + 'events:read', + 'commands:register', + 'kernel:access', + ], + optionalPluginDependencies: { + '@openacp/api-server': '>=1.0.0', + }, + + async setup(ctx) { + const store = new KvIdentityStore(ctx.storage) + const service = new IdentityServiceImpl(store, (event, data) => { + ctx.emit(event, data) + }) + + ctx.registerService('identity', service) + + // Auto-registration runs at priority 110 — after security (100) rejects blocked users + ctx.registerMiddleware(Hook.MESSAGE_INCOMING, { + priority: 110, + handler: createAutoRegisterHandler(service, store), + }) + + // /whoami — lets users set their display name and username + ctx.registerCommand({ + name: 'whoami', + description: 'Set your display name and username', + usage: '[name]', + category: 'plugin', + async handler(args) { + const name = args.raw.trim() + if (!name) { + return { type: 'text', text: 'Usage: /whoami ' } + } + + const identityId = formatIdentityId(args.channelId, args.userId) as IdentityId + const user = await service.getUserByIdentity(identityId) + if (!user) { + return { type: 'error', message: 'User not found — send a message first.' } + } + + try { + // Derive username from display name by lowercasing and stripping invalid chars + const username = name.toLowerCase().replace(/[^a-z0-9_]/g, '') + await service.updateUser(user.userId, { displayName: name, username }) + return { type: 'text', text: `Display name set to "${name}", username: @${username}` } + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err) + return { type: 'error', message } + } + }, + }) + + // Register REST routes if api-server is available. + // Uses optional chaining so the identity plugin boots fine without api-server. + const apiServer = ctx.getService<{ registerPlugin(prefix: string, plugin: any, opts?: any): void }>('api-server') + if (apiServer) { + const tokenStore = ctx.getService<{ + getUserId(id: string): string | undefined + setUserId(id: string, uid: string): void + }>('token-store') + const { registerIdentityRoutes } = await import('./routes/users.js') + const { registerSetupRoutes } = await import('./routes/setup.js') + apiServer.registerPlugin('/api/v1/identity', async (app: any) => { + registerIdentityRoutes(app, { service, tokenStore: tokenStore ?? undefined }) + registerSetupRoutes(app, { service, tokenStore: tokenStore ?? undefined }) + }, { auth: true }) + } + + ctx.log.info(`Identity service ready (${await service.getUserCount()} users)`) + }, + } +} + +export default createIdentityPlugin() diff --git a/src/plugins/identity/middleware/auto-register.ts b/src/plugins/identity/middleware/auto-register.ts new file mode 100644 index 00000000..28f7a83c --- /dev/null +++ b/src/plugins/identity/middleware/auto-register.ts @@ -0,0 +1,104 @@ +import type { IdentityServiceImpl } from '../identity-service.js' +import type { IdentityStore } from '../store/identity-store.js' +import { formatIdentityId } from '../types.js' +import type { UserRecord } from '../types.js' +import type { MiddlewarePayloadMap } from '../../../core/plugin/types.js' + +interface ChannelUser { + channelId: string + userId: string + displayName?: string + username?: string +} + +// 5 minutes in milliseconds — max frequency for persisting lastSeenAt updates +const LAST_SEEN_THROTTLE_MS = 5 * 60 * 1000 + +type IncomingPayload = MiddlewarePayloadMap['message:incoming'] + +/** + * Creates the message:incoming middleware for identity auto-registration. + * + * Runs at priority 110 — after security (100) so blocked users are rejected + * before we bother creating identity records for them. + * + * On each incoming message: + * 1. Look up the identity by {channelId}:{userId} + * 2. If missing, create a new user + identity via the service + * 3. If found, throttle lastSeenAt updates and sync platform fields if changed + * 4. Inject meta.identity for downstream hooks (agent:beforePrompt, etc.) + */ +export function createAutoRegisterHandler(service: IdentityServiceImpl, store: IdentityStore) { + // In-memory throttle map — resets on process restart, which is acceptable + // since lastSeenAt is a best-effort field (no hard SLA on freshness) + const lastSeenThrottle = new Map() + + return async ( + payload: IncomingPayload, + next: () => Promise, + ): Promise => { + const { channelId, userId, meta } = payload + const identityId = formatIdentityId(channelId, userId) + const channelUser = (meta?.channelUser as ChannelUser | undefined) + + let identity = await store.getIdentity(identityId) + let user: UserRecord | undefined + + if (!identity) { + // First time we've seen this user on this channel — create their account + const result = await service.createUserWithIdentity({ + displayName: channelUser?.displayName ?? userId, + username: channelUser?.username, + source: channelId, + platformId: userId, + platformUsername: channelUser?.username, + platformDisplayName: channelUser?.displayName, + }) + user = result.user + identity = result.identity + } else { + user = await service.getUser(identity.userId) + + // Guard against an identity pointing at a deleted user (data inconsistency) + if (!user) return next() + + // Throttled lastSeenAt update — writing on every message would cause excessive I/O + const now = Date.now() + const lastSeen = lastSeenThrottle.get(user.userId) + if (!lastSeen || now - lastSeen > LAST_SEEN_THROTTLE_MS) { + lastSeenThrottle.set(user.userId, now) + await store.putUser({ ...user, lastSeenAt: new Date(now).toISOString() }) + } + + // Sync platform display fields if the adapter reports updated values + if (channelUser) { + const needsUpdate = + (channelUser.displayName !== undefined && channelUser.displayName !== identity.platformDisplayName) || + (channelUser.username !== undefined && channelUser.username !== identity.platformUsername) + + if (needsUpdate) { + await store.putIdentity({ + ...identity, + platformDisplayName: channelUser.displayName ?? identity.platformDisplayName, + platformUsername: channelUser.username ?? identity.platformUsername, + updatedAt: new Date().toISOString(), + }) + } + } + } + + // Inject a lightweight identity snapshot into TurnMeta for downstream hooks. + // Avoids each downstream hook from doing a separate store lookup. + if (meta) { + meta.identity = { + userId: user.userId, + identityId: identity.identityId, + displayName: user.displayName, + username: user.username, + role: user.role, + } + } + + return next() + } +} diff --git a/src/plugins/identity/routes/setup.ts b/src/plugins/identity/routes/setup.ts new file mode 100644 index 00000000..e6464faa --- /dev/null +++ b/src/plugins/identity/routes/setup.ts @@ -0,0 +1,102 @@ +import { randomBytes } from 'node:crypto' +import type { FastifyInstance } from 'fastify' +import type { IdentityServiceImpl } from '../identity-service.js' + +interface SetupDeps { + service: IdentityServiceImpl + tokenStore: + | { + getUserId(tokenId: string): string | undefined + setUserId(tokenId: string, userId: string): void + } + | undefined +} + +/** + * Short-lived in-memory link codes for multi-device account linking. + * + * These are intentionally not persisted: a restart invalidates all pending codes, + * which is acceptable given the 5-minute TTL. Persistence would add complexity + * (storage, migration) for minimal practical benefit. + */ +const linkCodes = new Map() + +/** + * Registers identity setup routes under the plugin's prefix (e.g. /api/v1/identity). + * + * POST /setup — first-time identity claim for API token holders. + * POST /link-code — generate a one-time code to link a second device/token. + */ +export function registerSetupRoutes(app: FastifyInstance, deps: SetupDeps): void { + const { service, tokenStore } = deps + + // POST /setup — claim an identity for the current JWT token. + // On first call: creates a new user and links the token. + // If linkCode is provided: links the token to an existing user instead. + // Subsequent calls with the same token return the already-linked user. + app.post('/setup', async (request, reply) => { + const auth = (request as any).auth + if (!auth?.tokenId) return reply.status(401).send({ error: 'JWT required' }) + + // Idempotent: if this token is already linked, return the existing user + const existingUserId = tokenStore?.getUserId?.(auth.tokenId) + if (existingUserId) { + const user = await service.getUser(existingUserId) + if (user) return user + } + + const body = request.body as any + + if (body?.linkCode) { + // Link this token to an existing user account via a short-lived link code. + // The code was generated by the existing token's /link-code endpoint. + const entry = linkCodes.get(body.linkCode as string) + if (!entry || entry.expiresAt < Date.now()) { + return reply.status(401).send({ error: 'Invalid or expired link code' }) + } + linkCodes.delete(body.linkCode as string) + + await service.createIdentity(entry.userId, { + source: 'api', + platformId: auth.tokenId as string, + }) + + tokenStore?.setUserId?.(auth.tokenId as string, entry.userId) + return service.getUser(entry.userId) + } + + // New user path — displayName is required + if (!body?.displayName) return reply.status(400).send({ error: 'displayName is required' }) + + const { user } = await service.createUserWithIdentity({ + displayName: body.displayName as string, + username: body.username as string | undefined, + source: 'api', + platformId: auth.tokenId as string, + }) + + tokenStore?.setUserId?.(auth.tokenId as string, user.userId) + return user + }) + + // POST /link-code — generate a one-time code so another token can link to this user. + // The caller must already have a linked identity. Code expires in 5 minutes. + app.post('/link-code', async (request, reply) => { + const auth = (request as any).auth + if (!auth?.tokenId) return reply.status(401).send({ error: 'JWT required' }) + + const userId = tokenStore?.getUserId?.(auth.tokenId as string) + if (!userId) return reply.status(403).send({ error: 'Identity not set up' }) + + const code = randomBytes(16).toString('hex') + const expiresAt = Date.now() + 5 * 60 * 1000 + + // Evict stale codes before inserting to keep the map bounded + for (const [k, v] of linkCodes) { + if (v.expiresAt < Date.now()) linkCodes.delete(k) + } + + linkCodes.set(code, { userId, expiresAt }) + return { linkCode: code, expiresAt: new Date(expiresAt).toISOString() } + }) +} diff --git a/src/plugins/identity/routes/users.ts b/src/plugins/identity/routes/users.ts new file mode 100644 index 00000000..689ea84a --- /dev/null +++ b/src/plugins/identity/routes/users.ts @@ -0,0 +1,124 @@ +import type { FastifyInstance } from 'fastify' +import type { IdentityServiceImpl } from '../identity-service.js' +import type { IdentityId } from '../types.js' + +interface RouteDeps { + service: IdentityServiceImpl + tokenStore: { getUserId(tokenId: string): string | undefined } | undefined +} + +/** + * Registers identity user routes under the plugin's prefix (e.g. /api/v1/identity). + * + * All routes require auth (enforced by the parent registerPlugin call). + * The /users/me and /resolve/:identityId routes bridge the JWT token identity + * to the canonical user record stored by the identity service. + */ +export function registerIdentityRoutes(app: FastifyInstance, deps: RouteDeps): void { + const { service, tokenStore } = deps + + // Resolves the caller's userId from the JWT token via the token-store mapping. + // Returns undefined if the token has not been associated with an identity yet. + function resolveUserId(request: any): string | undefined { + return tokenStore?.getUserId?.(request.auth?.tokenId) + } + + // GET /users — list users with optional filters + app.get('/users', async (request) => { + const { source, role, q } = request.query as any + if (q) return service.searchUsers(q as string) + return service.listUsers({ source: source as string | undefined, role: role as any }) + }) + + // GET /users/me — own profile (must be declared BEFORE /:userId to avoid shadowing) + app.get('/users/me', async (request, reply) => { + const userId = resolveUserId(request) + if (!userId) return reply.status(403).send({ error: 'Identity not set up' }) + const user = await service.getUser(userId) + if (!user) return reply.status(404).send({ error: 'User not found' }) + return user + }) + + // PUT /users/me — update own profile fields + app.put('/users/me', async (request, reply) => { + const userId = resolveUserId(request) + if (!userId) { + return reply.status(403).send({ error: 'Identity not set up. Call POST /identity/setup first.' }) + } + const body = request.body as any + return service.updateUser(userId, { + displayName: body.displayName, + username: body.username, + avatarUrl: body.avatarUrl, + timezone: body.timezone, + locale: body.locale, + }) + }) + + // GET /users/:userId + app.get('/users/:userId', async (request, reply) => { + const { userId } = request.params as any + const user = await service.getUser(userId as string) + if (!user) return reply.status(404).send({ error: 'User not found' }) + return user + }) + + // PUT /users/:userId/role — admin-only role assignment + app.put('/users/:userId/role', async (request, reply) => { + const callerUserId = resolveUserId(request) + if (!callerUserId) return reply.status(403).send({ error: 'Identity not set up' }) + const caller = await service.getUser(callerUserId) + if (!caller || caller.role !== 'admin') return reply.status(403).send({ error: 'Admin only' }) + + const { userId } = request.params as any + const { role } = request.body as any + await service.setRole(userId as string, role) + return { ok: true } + }) + + // GET /users/:userId/identities — all platform identities for a user + app.get('/users/:userId/identities', async (request) => { + const { userId } = request.params as any + return service.getIdentitiesFor(userId as string) + }) + + // GET /resolve/:identityId — look up user by platform identity ID + app.get('/resolve/:identityId', async (request, reply) => { + const { identityId } = request.params as any + const user = await service.getUserByIdentity(identityId as IdentityId) + if (!user) return reply.status(404).send({ error: 'Identity not found' }) + const identity = await service.getIdentity(identityId as IdentityId) + return { user, identity } + }) + + // POST /link — merge two identities into a single user account (admin only) + app.post('/link', async (request, reply) => { + const callerUserId = resolveUserId(request) + if (!callerUserId) return reply.status(403).send({ error: 'Identity not set up' }) + const caller = await service.getUser(callerUserId) + if (!caller || caller.role !== 'admin') return reply.status(403).send({ error: 'Admin only' }) + + const { identityIdA, identityIdB } = request.body as any + await service.link(identityIdA as IdentityId, identityIdB as IdentityId) + return { ok: true } + }) + + // POST /unlink — split an identity off into its own user account (admin only) + app.post('/unlink', async (request, reply) => { + const callerUserId = resolveUserId(request) + if (!callerUserId) return reply.status(403).send({ error: 'Identity not set up' }) + const caller = await service.getUser(callerUserId) + if (!caller || caller.role !== 'admin') return reply.status(403).send({ error: 'Admin only' }) + + const { identityId } = request.body as any + await service.unlink(identityId as IdentityId) + return { ok: true } + }) + + // GET /search — search users by display name or username + app.get('/search', async (request) => { + const { q } = request.query as any + if (!q) return [] + return service.searchUsers(q as string) + }) +} diff --git a/src/plugins/identity/store/identity-store.ts b/src/plugins/identity/store/identity-store.ts new file mode 100644 index 00000000..48c1e340 --- /dev/null +++ b/src/plugins/identity/store/identity-store.ts @@ -0,0 +1,37 @@ +import type { UserRecord, IdentityRecord, IdentityId, UserRole } from '../types.js' + +/** + * Persistence contract for the identity system. + * + * Implementations must be consistent: after a put/delete, subsequent reads + * must reflect the change. All index operations (setUsernameIndex, etc.) must + * be called explicitly by the service layer — the store does not auto-index. + */ +export interface IdentityStore { + // === User CRUD === + getUser(userId: string): Promise + putUser(record: UserRecord): Promise + deleteUser(userId: string): Promise + listUsers(filter?: { source?: string; role?: UserRole }): Promise + + // === Identity CRUD === + getIdentity(identityId: IdentityId): Promise + putIdentity(record: IdentityRecord): Promise + deleteIdentity(identityId: IdentityId): Promise + /** Returns all identity records linked to a user. */ + getIdentitiesForUser(userId: string): Promise + + // === Secondary indexes === + /** Resolves a username (case-insensitive) to its userId. */ + getUserIdByUsername(username: string): Promise + /** Resolves a source+platformId pair to an identityId. */ + getIdentityIdBySource(source: string, platformId: string): Promise + + // === Index mutations (managed by service layer) === + setUsernameIndex(username: string, userId: string): Promise + deleteUsernameIndex(username: string): Promise + setSourceIndex(source: string, platformId: string, identityId: IdentityId): Promise + deleteSourceIndex(source: string, platformId: string): Promise + + getUserCount(): Promise +} diff --git a/src/plugins/identity/store/kv-identity-store.ts b/src/plugins/identity/store/kv-identity-store.ts new file mode 100644 index 00000000..6a2561a9 --- /dev/null +++ b/src/plugins/identity/store/kv-identity-store.ts @@ -0,0 +1,124 @@ +import type { PluginStorage } from '../../../core/plugin/types.js' +import type { IdentityStore } from './identity-store.js' +import type { UserRecord, IdentityRecord, IdentityId, UserRole } from '../types.js' + +/** + * PluginStorage-backed implementation of IdentityStore. + * + * Key layout in kv.json: + * users/{userId} → UserRecord + * identities/{identityId} → IdentityRecord + * idx/usernames/{username} → userId (lowercase for case-insensitive lookup) + * idx/sources/{source}/{pid} → identityId + * + * The flat kv.json layout avoids nested directories and makes the data portable. + * All identity records are also referenced via the owning user's `identities` array, + * enabling O(n) user-scoped lookups without a separate index. + */ +export class KvIdentityStore implements IdentityStore { + constructor(private readonly storage: PluginStorage) {} + + // === User CRUD === + + async getUser(userId: string): Promise { + return this.storage.get(`users/${userId}`) + } + + async putUser(record: UserRecord): Promise { + await this.storage.set(`users/${record.userId}`, record) + } + + async deleteUser(userId: string): Promise { + await this.storage.delete(`users/${userId}`) + } + + /** + * Lists all users, optionally filtered by role or source. + * Filtering by source requires scanning all identity records for the user, + * which is acceptable given the expected user count (hundreds, not millions). + */ + async listUsers(filter?: { source?: string; role?: UserRole }): Promise { + const keys = await this.storage.keys('users/') + const users: UserRecord[] = [] + + for (const key of keys) { + const user = await this.storage.get(key) + if (!user) continue + if (filter?.role && user.role !== filter.role) continue + + if (filter?.source) { + // Check if user has at least one identity from the requested source + const hasSource = user.identities.some((id) => id.startsWith(`${filter.source}:`)) + if (!hasSource) continue + } + + users.push(user) + } + + return users + } + + // === Identity CRUD === + + async getIdentity(identityId: IdentityId): Promise { + return this.storage.get(`identities/${identityId}`) + } + + async putIdentity(record: IdentityRecord): Promise { + await this.storage.set(`identities/${record.identityId}`, record) + } + + async deleteIdentity(identityId: IdentityId): Promise { + await this.storage.delete(`identities/${identityId}`) + } + + /** + * Fetches all identity records for a user by scanning their identities array. + * Avoids a full table scan by leveraging the user record as a secondary index. + */ + async getIdentitiesForUser(userId: string): Promise { + const user = await this.getUser(userId) + if (!user) return [] + + const records: IdentityRecord[] = [] + for (const identityId of user.identities) { + const record = await this.getIdentity(identityId) + if (record) records.push(record) + } + return records + } + + // === Secondary indexes === + + async getUserIdByUsername(username: string): Promise { + // Lowercase to enforce case-insensitive uniqueness + return this.storage.get(`idx/usernames/${username.toLowerCase()}`) + } + + async getIdentityIdBySource(source: string, platformId: string): Promise { + return this.storage.get(`idx/sources/${source}/${platformId}`) + } + + // === Index mutations === + + async setUsernameIndex(username: string, userId: string): Promise { + await this.storage.set(`idx/usernames/${username.toLowerCase()}`, userId) + } + + async deleteUsernameIndex(username: string): Promise { + await this.storage.delete(`idx/usernames/${username.toLowerCase()}`) + } + + async setSourceIndex(source: string, platformId: string, identityId: IdentityId): Promise { + await this.storage.set(`idx/sources/${source}/${platformId}`, identityId) + } + + async deleteSourceIndex(source: string, platformId: string): Promise { + await this.storage.delete(`idx/sources/${source}/${platformId}`) + } + + async getUserCount(): Promise { + const keys = await this.storage.keys('users/') + return keys.length + } +} diff --git a/src/plugins/identity/types.ts b/src/plugins/identity/types.ts new file mode 100644 index 00000000..e72e7a9e --- /dev/null +++ b/src/plugins/identity/types.ts @@ -0,0 +1,170 @@ +/** + * Branded string type for identity IDs. Format: '{source}:{platformId}'. + * Collision between identity spaces is structurally impossible — the source + * prefix guarantees no two platforms share an ID space. + */ +export type IdentityId = string & { readonly __brand: 'IdentityId' } + +/** + * Validates and creates an IdentityId from source and platformId. + * The colon delimiter allows parsing back to components without ambiguity, + * because source names must not contain colons. + */ +export function formatIdentityId(source: string, platformId: string): IdentityId { + return `${source}:${platformId}` as IdentityId +} + +/** + * Splits an IdentityId into its source and platformId components. + * Uses the first colon as delimiter, so platformId may itself contain colons. + */ +export function parseIdentityId(id: IdentityId): { source: string; platformId: string } { + const colonIdx = id.indexOf(':') + if (colonIdx === -1) throw new Error(`Invalid IdentityId: ${id}`) + return { source: id.slice(0, colonIdx), platformId: id.slice(colonIdx + 1) } +} + +/** Access level within the system. Blocked users are denied all operations. */ +export type UserRole = 'admin' | 'member' | 'viewer' | 'blocked' + +/** + * Canonical user record. One user may have multiple identities across platforms. + * pluginData provides namespaced extension storage without schema coupling. + */ +export interface UserRecord { + userId: string + displayName: string + username?: string + avatarUrl?: string + role: UserRole + timezone?: string + locale?: string + /** All IdentityIds linked to this user. At least one is always present. */ + identities: IdentityId[] + /** Per-plugin extension data. Keys are plugin names; values are plugin-defined schemas. */ + pluginData: Record> + createdAt: string + updatedAt: string + lastSeenAt: string +} + +/** + * A single platform identity. Each identity belongs to exactly one user. + * Multiple identities for the same user are linked at the UserRecord level. + */ +export interface IdentityRecord { + identityId: IdentityId + userId: string + source: string + platformId: string + platformUsername?: string + platformDisplayName?: string + createdAt: string + updatedAt: string +} + +/** Lightweight session snapshot used when listing sessions for a user. */ +export interface SessionInfo { + sessionId: string + agentName: string + channelId: string + status: string + createdAt: string +} + +/** + * Public contract for the identity service. + * + * Implementations coordinate a persistent store with event emission. + * All mutation methods update `updatedAt` on affected records. + */ +export interface IdentityService { + // === Lookups === + getUser(userId: string): Promise + getUserByUsername(username: string): Promise + getIdentity(identityId: IdentityId): Promise + /** Fetches the user that owns the given identity. */ + getUserByIdentity(identityId: IdentityId): Promise + getIdentitiesFor(userId: string): Promise + listUsers(filter?: { source?: string; role?: UserRole }): Promise + searchUsers(query: string): Promise + /** + * Returns active session snapshots for a user. + * Reads from SessionManager — requires kernel:access during setup. + */ + getSessionsFor(userId: string): Promise + + // === Mutations === + /** + * Creates a new user and their first identity atomically. + * The first user ever created in the system is automatically promoted to admin. + */ + createUserWithIdentity(data: { + displayName: string + username?: string + role?: UserRole + source: string + platformId: string + platformUsername?: string + platformDisplayName?: string + }): Promise<{ user: UserRecord; identity: IdentityRecord }> + + updateUser( + userId: string, + changes: Partial>, + ): Promise + + setRole(userId: string, role: UserRole): Promise + + createIdentity( + userId: string, + identity: { + source: string + platformId: string + platformUsername?: string + platformDisplayName?: string + }, + ): Promise + + /** + * Links two identities into a single user account. + * If they already share a user, this is a no-op. + * If they belong to different users, the younger account is merged into the + * older one and deleted. Emits identity:linked + identity:userMerged. + */ + link(identityIdA: IdentityId, identityIdB: IdentityId): Promise + + /** + * Separates an identity into a new standalone user account. + * Throws if the identity is the last one on its user (would leave a ghost user). + * Emits identity:unlinked. + */ + unlink(identityId: IdentityId): Promise + + // === Plugin data === + /** + * Stores arbitrary plugin-specific data under a namespaced key. + * Prevents plugins from accidentally overwriting each other's data. + */ + setPluginData(userId: string, pluginName: string, key: string, value: unknown): Promise + getPluginData(userId: string, pluginName: string, key: string): Promise + + // === Source registry === + /** + * Registers a source name (e.g., 'telegram', 'discord') so it can be used + * for filtering and mention resolution. Adapters call this during setup(). + */ + registerSource(source: string): void + + /** + * Resolves a @username mention in the context of a given source platform. + * Returns the platform-specific ID and username for the matched user, enabling + * adapters to construct native mentions (e.g., Telegram @username, Discord <@id>). + */ + resolveCanonicalMention( + username: string, + source: string, + ): Promise<{ found: boolean; platformId?: string; platformUsername?: string }> + + getUserCount(): Promise +} diff --git a/src/plugins/notifications/__tests__/notification.test.ts b/src/plugins/notifications/__tests__/notification.test.ts index ae1c908c..3abc1ccd 100644 --- a/src/plugins/notifications/__tests__/notification.test.ts +++ b/src/plugins/notifications/__tests__/notification.test.ts @@ -1,11 +1,11 @@ import { describe, it, expect, vi } from 'vitest' -import { NotificationManager } from '../notification.js' +import { NotificationManager, NotificationService } from '../notification.js' import type { IChannelAdapter } from '../../../core/channel.js' import type { NotificationMessage } from '../../../core/types.js' -function mockAdapter(): IChannelAdapter { +function mockAdapter(name = 'test'): IChannelAdapter { return { - name: 'test', + name, capabilities: { streaming: false, richFormatting: false, threads: false, reactions: false, fileUpload: false, voice: false }, start: vi.fn(), stop: vi.fn(), @@ -17,16 +17,29 @@ function mockAdapter(): IChannelAdapter { deleteSessionThread: vi.fn(), sendSkillCommands: vi.fn(), cleanupSkillCommands: vi.fn(), + sendUserNotification: vi.fn().mockResolvedValue(undefined), } as unknown as IChannelAdapter } -describe('NotificationManager', () => { +function mockAdapterWithoutUserNotification(name = 'test'): IChannelAdapter { + const adapter = mockAdapter(name) + // Simulate an adapter that hasn't implemented sendUserNotification + const { sendUserNotification: _, ...rest } = adapter as any + return rest as unknown as IChannelAdapter +} + +describe('NotificationService (alias: NotificationManager)', () => { const notification: NotificationMessage = { sessionId: 'sess-1', type: 'completed', summary: 'Test notification', } + // Verify the backward compat alias works + it('NotificationManager is an alias for NotificationService', () => { + expect(NotificationManager).toBe(NotificationService) + }) + describe('notify()', () => { it('sends notification to specified adapter', async () => { const adapter = mockAdapter() @@ -47,8 +60,8 @@ describe('NotificationManager', () => { }) it('does not notify other adapters', async () => { - const telegram = mockAdapter() - const discord = mockAdapter() + const telegram = mockAdapter('telegram') + const discord = mockAdapter('discord') const adapters = new Map([ ['telegram', telegram], ['discord', discord], @@ -64,8 +77,8 @@ describe('NotificationManager', () => { describe('notifyAll()', () => { it('sends notification to all adapters', async () => { - const telegram = mockAdapter() - const discord = mockAdapter() + const telegram = mockAdapter('telegram') + const discord = mockAdapter('discord') const adapters = new Map([ ['telegram', telegram], ['discord', discord], @@ -109,9 +122,9 @@ describe('NotificationManager', () => { }) it('notifyAll() continues to next adapter when one fails', async () => { - const failing = mockAdapter() + const failing = mockAdapter('failing') ;(failing.sendNotification as ReturnType).mockRejectedValue(new Error('network error')) - const working = mockAdapter() + const working = mockAdapter('working') const adapters = new Map([ ['telegram', failing], ['discord', working], @@ -124,4 +137,174 @@ describe('NotificationManager', () => { expect(working.sendNotification).toHaveBeenCalledWith(notification) }) }) + + describe('notifyUser()', () => { + const message = { type: 'text' as const, text: 'Hello!' } + + it('calls sendUserNotification on direct { channelId, platformId } target', async () => { + const adapter = mockAdapter('telegram') + const adapters = new Map([['telegram', adapter]]) + const service = new NotificationService(adapters) + + await service.notifyUser({ channelId: 'telegram', platformId: 'user-123' }, message) + + expect(adapter.sendUserNotification).toHaveBeenCalledWith('user-123', message, expect.objectContaining({})) + }) + + it('skips gracefully when adapter does not have sendUserNotification', async () => { + const adapter = mockAdapterWithoutUserNotification('telegram') + const adapters = new Map([['telegram', adapter]]) + const service = new NotificationService(adapters) + + // Should not throw + await service.notifyUser({ channelId: 'telegram', platformId: 'user-123' }, message) + }) + + it('does nothing with identity target when no resolver is set', async () => { + const adapter = mockAdapter('telegram') + const adapters = new Map([['telegram', adapter]]) + const service = new NotificationService(adapters) + + await service.notifyUser({ identityId: 'identity-1' }, message) + + // No resolver → no delivery + expect(adapter.sendUserNotification).not.toHaveBeenCalled() + }) + + it('resolves via identityId and delivers to all user platforms', async () => { + const telegram = mockAdapter('telegram') + const discord = mockAdapter('discord') + const adapters = new Map([ + ['telegram', telegram], + ['discord', discord], + ]) + const service = new NotificationService(adapters) + + const resolver = { + getIdentity: vi.fn().mockResolvedValue({ userId: 'user-1', source: 'telegram', platformId: 'tg-123' }), + getUser: vi.fn().mockResolvedValue({ userId: 'user-1', identities: ['identity-1', 'identity-2'] }), + getIdentitiesFor: vi.fn().mockResolvedValue([ + { identityId: 'identity-1', source: 'telegram', platformId: 'tg-123', platformUsername: 'alice' }, + { identityId: 'identity-2', source: 'discord', platformId: 'dc-456' }, + ]), + } + service.setIdentityResolver(resolver) + + await service.notifyUser({ identityId: 'identity-1' }, message) + + expect(resolver.getIdentity).toHaveBeenCalledWith('identity-1') + expect(resolver.getUser).toHaveBeenCalledWith('user-1') + expect(resolver.getIdentitiesFor).toHaveBeenCalledWith('user-1') + expect(telegram.sendUserNotification).toHaveBeenCalledWith('tg-123', message, expect.objectContaining({ + platformMention: { platformUsername: 'alice', platformId: 'tg-123' }, + })) + expect(discord.sendUserNotification).toHaveBeenCalledWith('dc-456', message, expect.objectContaining({})) + }) + + it('resolves via userId and delivers to all platforms', async () => { + const adapter = mockAdapter('slack') + const adapters = new Map([['slack', adapter]]) + const service = new NotificationService(adapters) + + const resolver = { + getIdentity: vi.fn(), + getUser: vi.fn(), + getIdentitiesFor: vi.fn().mockResolvedValue([ + { identityId: 'identity-3', source: 'slack', platformId: 'sk-789' }, + ]), + } + service.setIdentityResolver(resolver) + + await service.notifyUser({ userId: 'user-2' }, message) + + expect(resolver.getIdentitiesFor).toHaveBeenCalledWith('user-2') + expect(adapter.sendUserNotification).toHaveBeenCalledWith('sk-789', message, expect.objectContaining({})) + }) + + it('applies onlyPlatforms filter', async () => { + const telegram = mockAdapter('telegram') + const discord = mockAdapter('discord') + const adapters = new Map([ + ['telegram', telegram], + ['discord', discord], + ]) + const service = new NotificationService(adapters) + + const resolver = { + getIdentity: vi.fn(), + getUser: vi.fn(), + getIdentitiesFor: vi.fn().mockResolvedValue([ + { identityId: 'i1', source: 'telegram', platformId: 'tg-1' }, + { identityId: 'i2', source: 'discord', platformId: 'dc-1' }, + ]), + } + service.setIdentityResolver(resolver) + + await service.notifyUser({ userId: 'user-1' }, message, { onlyPlatforms: ['telegram'] }) + + expect(telegram.sendUserNotification).toHaveBeenCalled() + expect(discord.sendUserNotification).not.toHaveBeenCalled() + }) + + it('applies excludePlatforms filter', async () => { + const telegram = mockAdapter('telegram') + const discord = mockAdapter('discord') + const adapters = new Map([ + ['telegram', telegram], + ['discord', discord], + ]) + const service = new NotificationService(adapters) + + const resolver = { + getIdentity: vi.fn(), + getUser: vi.fn(), + getIdentitiesFor: vi.fn().mockResolvedValue([ + { identityId: 'i1', source: 'telegram', platformId: 'tg-1' }, + { identityId: 'i2', source: 'discord', platformId: 'dc-1' }, + ]), + } + service.setIdentityResolver(resolver) + + await service.notifyUser({ userId: 'user-1' }, message, { excludePlatforms: ['discord'] }) + + expect(telegram.sendUserNotification).toHaveBeenCalled() + expect(discord.sendUserNotification).not.toHaveBeenCalled() + }) + + it('does not propagate errors (fire-and-forget)', async () => { + const adapter = mockAdapter('telegram') + ;(adapter.sendUserNotification as ReturnType).mockRejectedValue(new Error('delivery failed')) + const adapters = new Map([['telegram', adapter]]) + const service = new NotificationService(adapters) + + // Should not throw even if delivery fails + await service.notifyUser({ channelId: 'telegram', platformId: 'user-123' }, message) + }) + + it('continues delivering to remaining platforms when one fails', async () => { + const failing = mockAdapter('telegram') + ;(failing.sendUserNotification as ReturnType).mockRejectedValue(new Error('failed')) + const working = mockAdapter('discord') + const adapters = new Map([ + ['telegram', failing], + ['discord', working], + ]) + const service = new NotificationService(adapters) + + const resolver = { + getIdentity: vi.fn(), + getUser: vi.fn(), + getIdentitiesFor: vi.fn().mockResolvedValue([ + { identityId: 'i1', source: 'telegram', platformId: 'tg-1' }, + { identityId: 'i2', source: 'discord', platformId: 'dc-1' }, + ]), + } + service.setIdentityResolver(resolver) + + await service.notifyUser({ userId: 'user-1' }, message) + + // Working adapter must still receive the notification + expect(working.sendUserNotification).toHaveBeenCalledWith('dc-1', message, expect.anything()) + }) + }) }) diff --git a/src/plugins/notifications/index.ts b/src/plugins/notifications/index.ts index 4f683dc0..9127e622 100644 --- a/src/plugins/notifications/index.ts +++ b/src/plugins/notifications/index.ts @@ -1,5 +1,5 @@ import type { OpenACPPlugin, InstallContext, CoreAccess } from '../../core/plugin/types.js' -import { NotificationManager } from './notification.js' +import { NotificationService } from './notification.js' function createNotificationsPlugin(): OpenACPPlugin { return { @@ -9,20 +9,16 @@ function createNotificationsPlugin(): OpenACPPlugin { essential: false, // Depends on security so the notification service is only active for authorized sessions pluginDependencies: { '@openacp/security': '^1.0.0' }, - permissions: ['services:register', 'kernel:access'], + permissions: ['services:register', 'services:use', 'kernel:access', 'events:read'], async install(ctx: InstallContext) { - const { settings, terminal } = ctx - - // No interactive prompts needed — save defaults - await settings.setAll({ enabled: true }) - terminal.log.success('Notifications defaults saved') + await ctx.settings.setAll({ enabled: true }) + ctx.terminal.log.success('Notifications defaults saved') }, async configure(ctx: InstallContext) { const { terminal, settings } = ctx const current = await settings.getAll() - const toggle = await terminal.confirm({ message: `Notifications are ${current.enabled !== false ? 'enabled' : 'disabled'}. Toggle?`, initialValue: false, @@ -42,10 +38,23 @@ function createNotificationsPlugin(): OpenACPPlugin { }, async setup(ctx) { - // NotificationManager needs the live adapters Map from core + // NotificationService needs the live adapters Map from core const core = ctx.core as CoreAccess - const manager = new NotificationManager(core.adapters) - ctx.registerService('notifications', manager) + const service = new NotificationService(core.adapters) + + // Wire identity resolver if available — enables user-targeted notifications + const identity = ctx.getService('identity') + if (identity) service.setIdentityResolver(identity) + + // Listen for identity plugin load in case it boots after notifications + ctx.on('plugin:loaded', (data: unknown) => { + if ((data as any)?.name === '@openacp/identity') { + const id = ctx.getService('identity') + if (id) service.setIdentityResolver(id) + } + }) + + ctx.registerService('notifications', service) ctx.log.info('Notifications service ready') }, } diff --git a/src/plugins/notifications/notification.ts b/src/plugins/notifications/notification.ts index f5ac5540..dc4d0fdf 100644 --- a/src/plugins/notifications/notification.ts +++ b/src/plugins/notifications/notification.ts @@ -1,21 +1,55 @@ import type { IChannelAdapter } from '../../core/channel.js' import type { NotificationMessage } from '../../core/types.js' +/** Target for user-directed notifications. */ +export type NotificationTarget = + | { identityId: string } + | { userId: string } + | { channelId: string; platformId: string } + +export interface NotificationOptions { + via?: 'dm' | 'thread' | 'topic' + topicId?: string + sessionId?: string + onlyPlatforms?: string[] + excludePlatforms?: string[] +} + +/** User-facing notification content — distinct from system NotificationMessage. */ +export interface UserNotificationContent { + type: 'text' + text: string +} + /** - * Routes cross-session notifications to the appropriate channel adapter. - * - * Notifications are triggered by `SessionBridge` when a session completes, - * errors, or hits a budget threshold. Unlike regular messages, notifications - * are not tied to a specific outgoing message stream — they are pushed to the - * channel that owns the session (identified by `channelId` on the session). + * Minimal identity service interface — avoids hard dependency on identity plugin types. + * NotificationService only needs resolution capabilities, not full CRUD. + */ +interface IdentityResolver { + getIdentity(identityId: string): Promise<{ userId: string; source: string; platformId: string; platformUsername?: string } | undefined> + getUser(userId: string): Promise<{ userId: string; identities: string[] } | undefined> + getIdentitiesFor(userId: string): Promise> +} + +/** + * Routes notifications to channel adapters. Extends the legacy NotificationManager + * with user-targeted delivery via the identity system. * - * The adapters Map is the live registry maintained by `OpenACPCore`. Holding a - * reference to the Map (rather than a snapshot) ensures that adapters registered - * after this service is created are still reachable. + * Legacy API (notify/notifyAll) is preserved — existing callers in SessionBridge + * continue to work without changes. */ -export class NotificationManager { +export class NotificationService { + private identityResolver?: IdentityResolver + constructor(private adapters: Map) {} + /** Inject identity resolver for user-targeted notifications. */ + setIdentityResolver(resolver: IdentityResolver): void { + this.identityResolver = resolver + } + + // --- Legacy API (backward compat with NotificationManager) --- + /** * Send a notification to a specific channel adapter. * @@ -28,7 +62,7 @@ export class NotificationManager { try { await adapter.sendNotification(notification) } catch { - // Don't let notification failures crash the caller + // Best effort } } @@ -43,8 +77,90 @@ export class NotificationManager { try { await adapter.sendNotification(notification) } catch { - // Continue to next adapter + // Continue + } + } + } + + // --- New user-targeted API --- + + /** + * Send a notification to a user across all their linked platforms. + * Fire-and-forget — never throws, swallows all errors. + */ + async notifyUser( + target: NotificationTarget, + message: UserNotificationContent, + options?: NotificationOptions, + ): Promise { + try { + await this._resolveAndDeliver(target, message, options) + } catch { + // Fire-and-forget + } + } + + private async _resolveAndDeliver( + target: NotificationTarget, + message: UserNotificationContent, + options?: NotificationOptions, + ): Promise { + // Direct adapter call — bypass identity resolution. + // sendUserNotification accepts NotificationMessage which covers UserNotificationContent shape. + if ('channelId' in target && 'platformId' in target) { + const adapter = this.adapters.get(target.channelId) + if (!adapter?.sendUserNotification) return + await adapter.sendUserNotification(target.platformId, message as unknown as NotificationMessage, { + via: options?.via, + topicId: options?.topicId, + sessionId: options?.sessionId, + }) + return + } + + // Identity-based resolution + if (!this.identityResolver) return + + let identities: Array<{ identityId: string; source: string; platformId: string; platformUsername?: string }> = [] + + if ('identityId' in target) { + const identity = await this.identityResolver.getIdentity(target.identityId) + if (!identity) return + const user = await this.identityResolver.getUser(identity.userId) + if (!user) return + identities = await this.identityResolver.getIdentitiesFor(user.userId) + } else if ('userId' in target) { + identities = await this.identityResolver.getIdentitiesFor(target.userId) + } + + // Platform filters + if (options?.onlyPlatforms) { + identities = identities.filter(i => options.onlyPlatforms!.includes(i.source)) + } + if (options?.excludePlatforms) { + identities = identities.filter(i => !options.excludePlatforms!.includes(i.source)) + } + + // Deliver to each identity's adapter + for (const identity of identities) { + const adapter = this.adapters.get(identity.source) + if (!adapter?.sendUserNotification) continue + try { + await adapter.sendUserNotification(identity.platformId, message as unknown as NotificationMessage, { + via: options?.via, + topicId: options?.topicId, + sessionId: options?.sessionId, + platformMention: { + platformUsername: identity.platformUsername, + platformId: identity.platformId, + }, + }) + } catch { + // Continue — best effort } } } } + +// Backward compat alias — existing imports use NotificationManager +export { NotificationService as NotificationManager } diff --git a/src/plugins/sse-adapter/adapter.ts b/src/plugins/sse-adapter/adapter.ts index 1d8d2798..4328aad1 100644 --- a/src/plugins/sse-adapter/adapter.ts +++ b/src/plugins/sse-adapter/adapter.ts @@ -116,6 +116,20 @@ export class SSEAdapter implements IChannelAdapter { } } + /** + * Delivers a push notification to a specific user's SSE connections. + * + * `platformId` is the userId for the SSE adapter — SSE has no concept of + * platform-specific user handles, so we use the internal userId directly. + */ + async sendUserNotification(platformId: string, message: any, options?: any): Promise { + const serialized = `event: notification:text\ndata: ${JSON.stringify({ + text: message.text ?? message.summary ?? '', + ...(options ?? {}), + })}\n\n`; + this.connectionManager.pushToUser(platformId, serialized); + } + /** SSE has no concept of threads — return sessionId as the threadId */ async createSessionThread(sessionId: string, _name: string): Promise { return sessionId; diff --git a/src/plugins/sse-adapter/connection-manager.ts b/src/plugins/sse-adapter/connection-manager.ts index c924ee00..d8bdf59e 100644 --- a/src/plugins/sse-adapter/connection-manager.ts +++ b/src/plugins/sse-adapter/connection-manager.ts @@ -9,11 +9,13 @@ import { randomBytes } from 'node:crypto'; * `lastEventId` tracks the most recent event delivered, for reconnection replay. * `backpressured` indicates that the last `response.write()` returned false, * meaning the OS send buffer is full. + * `userId` is set for user-level connections not tied to a specific session. */ export interface SSEConnection { id: string; sessionId: string; tokenId: string; + userId?: string; // Set for user-level connections response: ServerResponse; connectedAt: Date; lastEventId?: string; @@ -35,6 +37,8 @@ export class ConnectionManager { private connections = new Map(); // Secondary index: sessionId → Set of connection IDs for O(1) broadcast targeting private sessionIndex = new Map>(); + // Secondary index: userId → Set of connection IDs for user-level event delivery + private userIndex = new Map>(); private maxConnectionsPerSession: number; private maxTotalConnections: number; @@ -80,7 +84,67 @@ export class ConnectionManager { return connection; } - /** Remove a connection from both indexes. Called automatically on client disconnect. */ + /** + * Registers a user-level SSE connection (not tied to a specific session). + * Used for notifications and system events delivered to a user. + * + * @throws if the global connection limit is reached. + */ + addUserConnection(userId: string, tokenId: string, response: ServerResponse): SSEConnection { + if (this.connections.size >= this.maxTotalConnections) { + throw new Error('Maximum total connections reached'); + } + + const id = `conn_${randomBytes(8).toString('hex')}`; + const connection: SSEConnection = { + id, sessionId: '', tokenId, userId, response, connectedAt: new Date() + }; + + this.connections.set(id, connection); + + let userConns = this.userIndex.get(userId); + if (!userConns) { + userConns = new Set(); + this.userIndex.set(userId, userConns); + } + userConns.add(id); + + response.on('close', () => this.removeConnection(id)); + return connection; + } + + /** + * Writes a serialized SSE event to all connections for a given user. + * + * Uses the same backpressure strategy as `broadcast`: flag on first overflow, + * forcibly close if still backpressured on the next write. + */ + pushToUser(userId: string, serializedEvent: string): void { + const connIds = this.userIndex.get(userId); + if (!connIds) return; + for (const connId of connIds) { + const conn = this.connections.get(connId); + if (!conn || conn.response.writableEnded) continue; + try { + const ok = conn.response.write(serializedEvent); + if (!ok) { + if (conn.backpressured) { + // Still backpressured from previous write — disconnect to prevent OOM + conn.response.end(); + this.removeConnection(conn.id); + } else { + conn.backpressured = true; + conn.response.once('drain', () => { conn.backpressured = false; }); + } + } + } catch { + // Connection broken — clean up + this.removeConnection(conn.id); + } + } + } + + /** Remove a connection from all indexes. Called automatically on client disconnect. */ removeConnection(connectionId: string): void { const conn = this.connections.get(connectionId); if (!conn) return; @@ -90,6 +154,14 @@ export class ConnectionManager { sessionConns.delete(connectionId); if (sessionConns.size === 0) this.sessionIndex.delete(conn.sessionId); } + // Clean user index + if (conn.userId) { + const userConns = this.userIndex.get(conn.userId); + if (userConns) { + userConns.delete(connectionId); + if (userConns.size === 0) this.userIndex.delete(conn.userId); + } + } } /** Returns all active connections for a session. */ @@ -156,5 +228,6 @@ export class ConnectionManager { } this.connections.clear(); this.sessionIndex.clear(); + this.userIndex.clear(); } } diff --git a/src/plugins/sse-adapter/index.ts b/src/plugins/sse-adapter/index.ts index d905d9fe..17b99130 100644 --- a/src/plugins/sse-adapter/index.ts +++ b/src/plugins/sse-adapter/index.ts @@ -46,6 +46,10 @@ const plugin: OpenACPPlugin = { // Get command registry for command execution in routes const commandRegistry = ctx.getService('command-registry'); + // Resolve token→user mapping from the token-store service for user-level SSE streams. + // token-store is registered by api-server plugin, which is a declared dependency. + const tokenStore = ctx.getService<{ getUserId(tokenId: string): string | undefined } | undefined>('token-store'); + // Clean up event buffer when a session ends or is deleted to prevent unbounded memory growth ctx.on(BusEvent.SESSION_DELETED, (data: unknown) => { const { sessionId } = data as { sessionId: string }; @@ -63,6 +67,7 @@ const plugin: OpenACPPlugin = { connectionManager, eventBuffer, commandRegistry: commandRegistry ?? undefined, + getUserId: tokenStore ? (id) => tokenStore.getUserId(id) : undefined, }); }, { auth: true }); diff --git a/src/plugins/sse-adapter/routes.ts b/src/plugins/sse-adapter/routes.ts index 36123947..470be5eb 100644 --- a/src/plugins/sse-adapter/routes.ts +++ b/src/plugins/sse-adapter/routes.ts @@ -28,6 +28,8 @@ export interface SSERouteDeps { connectionManager: ConnectionManager; eventBuffer: EventBuffer; commandRegistry?: CommandRegistry; + /** Resolves a tokenId to a userId for user-level connections. Provided by the token-store service. */ + getUserId?: (tokenId: string) => string | undefined; } /** @@ -248,4 +250,51 @@ export async function sseRoutes(app: FastifyInstance, deps: SSERouteDeps): Promi total: connections.length, }; }); + + // GET /events — user-level SSE stream (notifications + system events) + // Not session-scoped — delivers notifications to any authenticated user with identity set up. + app.get('/events', async (request, reply) => { + const auth = (request as any).auth; + if (!auth?.tokenId) { + return reply.status(401).send({ error: 'Unauthorized' }); + } + + // Resolve userId from the token-store mapping set during identity setup + const userId = deps.getUserId?.(auth.tokenId); + if (!userId) { + return reply.status(403).send({ error: 'Identity not set up. Complete /identity/setup first.' }); + } + + // Check connection limits before hijacking — once hijacked, Fastify can no longer + // write error responses, so we must gate on limits before committing to the stream. + try { + deps.connectionManager.addUserConnection(userId, auth.tokenId, reply.raw); + } catch (err: any) { + return reply.status(503).send({ error: err.message }); + } + + reply.hijack(); + const raw = reply.raw; + raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + // Disable buffering in Nginx/Cloudflare so events arrive without delay + 'X-Accel-Buffering': 'no', + }); + + // Initial heartbeat to confirm the stream is live + raw.write(`event: heartbeat\ndata: ${JSON.stringify({ ts: Date.now() })}\n\n`); + + // Keep-alive heartbeat every 30s to survive proxy idle-connection timeouts + const heartbeat = setInterval(() => { + if (raw.writableEnded) { + clearInterval(heartbeat); + return; + } + raw.write(`event: heartbeat\ndata: ${JSON.stringify({ ts: Date.now() })}\n\n`); + }, 30_000); + + raw.on('close', () => clearInterval(heartbeat)); + }); }