Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions packages/pds/src/account-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,52 @@ export class AccountDurableObject extends DurableObject<Env> {
console.error("WebSocket error:", error);
}

/**
* Emit an identity event to notify downstream services to refresh identity cache.
*/
async rpcEmitIdentityEvent(handle: string): Promise<{ seq: number }> {
await this.ensureStorageInitialized();

const time = new Date().toISOString();

// Get next sequence number
const result = this.ctx.storage.sql
.exec(
`INSERT INTO firehose_events (event_type, payload)
VALUES ('identity', ?)
RETURNING seq`,
new Uint8Array(0), // Empty payload, we just need seq
)
.one();
const seq = result.seq as number;

// Build identity event frame
const header = { op: 1, t: "#identity" };
const body = {
seq,
did: this.env.DID,
time,
handle,
};

const headerBytes = cborEncode(header as unknown as import("@atproto/lex-cbor").LexValue);
const bodyBytes = cborEncode(body as unknown as import("@atproto/lex-cbor").LexValue);
const frame = new Uint8Array(headerBytes.length + bodyBytes.length);
frame.set(headerBytes, 0);
frame.set(bodyBytes, headerBytes.length);

// Broadcast to all connected clients
for (const ws of this.ctx.getWebSockets()) {
try {
ws.send(frame);
} catch (e) {
console.error("Error broadcasting identity event:", e);
}
}

return { seq };
}

/**
* HTTP fetch handler for WebSocket upgrades.
* This is used instead of RPC to avoid WebSocket serialization errors.
Expand Down
15 changes: 15 additions & 0 deletions packages/pds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ app.get("/.well-known/did.json", (c) => {
"https://w3id.org/security/suites/secp256k1-2019/v1",
],
id: c.env.DID,
alsoKnownAs: [`at://${c.env.HANDLE}`],
verificationMethod: [
{
id: `${c.env.DID}#atproto`,
Expand All @@ -106,6 +107,13 @@ app.get("/.well-known/did.json", (c) => {
return c.json(didDocument);
});

// Handle verification for AT Protocol
app.get("/.well-known/atproto-did", (c) => {
return new Response(c.env.DID, {
headers: { "Content-Type": "text/plain" },
});
});

// Health check
app.get("/health", (c) =>
c.json({
Expand Down Expand Up @@ -212,6 +220,13 @@ app.get("/xrpc/app.bsky.ageassurance.getState", requireAuth, (c) => {
});
});

// Admin: Emit identity event to refresh handle verification
app.post("/admin/emit-identity", requireAuth, async (c) => {
const accountDO = getAccountDO(c.env);
const result = await accountDO.rpcEmitIdentityEvent(c.env.HANDLE);
return c.json(result);
});

// Proxy unhandled XRPC requests to Bluesky services
app.all("/xrpc/*", async (c) => {
const url = new URL(c.req.url);
Expand Down