diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index 4e247b89..57f3dbb7 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -770,6 +770,52 @@ export class AccountDurableObject extends DurableObject { console.error("WebSocket error:", error); } + /** + * Emit an identity event to notify downstream services to refresh identity cache. + */ + async rpcEmitIdentityEvent(handle: string): Promise<{ seq: number }> { + await this.ensureStorageInitialized(); + + const time = new Date().toISOString(); + + // Get next sequence number + const result = this.ctx.storage.sql + .exec( + `INSERT INTO firehose_events (event_type, payload) + VALUES ('identity', ?) + RETURNING seq`, + new Uint8Array(0), // Empty payload, we just need seq + ) + .one(); + const seq = result.seq as number; + + // Build identity event frame + const header = { op: 1, t: "#identity" }; + const body = { + seq, + did: this.env.DID, + time, + handle, + }; + + const headerBytes = cborEncode(header as unknown as import("@atproto/lex-cbor").LexValue); + const bodyBytes = cborEncode(body as unknown as import("@atproto/lex-cbor").LexValue); + const frame = new Uint8Array(headerBytes.length + bodyBytes.length); + frame.set(headerBytes, 0); + frame.set(bodyBytes, headerBytes.length); + + // Broadcast to all connected clients + for (const ws of this.ctx.getWebSockets()) { + try { + ws.send(frame); + } catch (e) { + console.error("Error broadcasting identity event:", e); + } + } + + return { seq }; + } + /** * HTTP fetch handler for WebSocket upgrades. * This is used instead of RPC to avoid WebSocket serialization errors. diff --git a/packages/pds/src/index.ts b/packages/pds/src/index.ts index eec2d51d..43dd4e77 100644 --- a/packages/pds/src/index.ts +++ b/packages/pds/src/index.ts @@ -87,6 +87,7 @@ app.get("/.well-known/did.json", (c) => { "https://w3id.org/security/suites/secp256k1-2019/v1", ], id: c.env.DID, + alsoKnownAs: [`at://${c.env.HANDLE}`], verificationMethod: [ { id: `${c.env.DID}#atproto`, @@ -106,6 +107,13 @@ app.get("/.well-known/did.json", (c) => { return c.json(didDocument); }); +// Handle verification for AT Protocol +app.get("/.well-known/atproto-did", (c) => { + return new Response(c.env.DID, { + headers: { "Content-Type": "text/plain" }, + }); +}); + // Health check app.get("/health", (c) => c.json({ @@ -212,6 +220,13 @@ app.get("/xrpc/app.bsky.ageassurance.getState", requireAuth, (c) => { }); }); +// Admin: Emit identity event to refresh handle verification +app.post("/admin/emit-identity", requireAuth, async (c) => { + const accountDO = getAccountDO(c.env); + const result = await accountDO.rpcEmitIdentityEvent(c.env.HANDLE); + return c.json(result); +}); + // Proxy unhandled XRPC requests to Bluesky services app.all("/xrpc/*", async (c) => { const url = new URL(c.req.url);