Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Apr 22, 2024
1 parent 285ea54 commit 99d3033
Show file tree
Hide file tree
Showing 23 changed files with 277 additions and 455 deletions.
35 changes: 18 additions & 17 deletions packages/api/src/atp-agent.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import { OAuthClient } from '@atproto/oauth-client'
import { XrpcClient } from '@atproto/xrpc'
import {
ComAtprotoServerXrpcAgent,
OAuthXrpcAgent,
AuthXrpcAgent,
} from './xrpc-agent'
import { AtpClient } from './client'
import { schemas } from './client/lexicons'
import { BSKY_LABELER_DID } from './const'
import {
AtpAgentGlobalOpts,
ComAtprotoServerXrpcAgentOptionsOpts,
AtprotoServiceType,
ComAtprotoServerXrpcAgentOptionsOpts,
} from './types'
import {
AuthenticatedXrpcAgent,
ComAtprotoServerXrpcAgent,
OAuthXrpcAgent,
isAuthenticatedXrpcAgent,
} from './xrpc-agent'

const MAX_LABELERS = 10

Expand All @@ -34,18 +33,20 @@ export class AtpAgent {
api: AtpClient
labelersHeader: string[] = []

protected xrpcAgent: AuthXrpcAgent
protected xrpcAgent: AuthenticatedXrpcAgent

constructor(
options: AuthXrpcAgent | ComAtprotoServerXrpcAgentOptionsOpts | OAuthClient,
options:
| AuthenticatedXrpcAgent
| ComAtprotoServerXrpcAgentOptionsOpts
| OAuthClient,
) {
this.xrpcAgent =
options instanceof AuthXrpcAgent
? options
: options instanceof OAuthClient
? new OAuthXrpcAgent(options)
: new ComAtprotoServerXrpcAgent(options)
this.api = new AtpClient(new XrpcClient(this.xrpcAgent, schemas))
this.xrpcAgent = isAuthenticatedXrpcAgent(options)
? options
: options instanceof OAuthClient
? new OAuthXrpcAgent(options)
: new ComAtprotoServerXrpcAgent(options)
this.api = new AtpClient(this.xrpcAgent)

this.api.xrpc.setHeader('atproto-accept-labelers', () =>
// Make sure to read the static property from the subclass if it was
Expand Down
35 changes: 4 additions & 31 deletions packages/api/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import {
Client as XrpcBaseClient,
ServiceClient as XrpcServiceClient,
XrpcClient,
XrpcClientOptions,
} from '@atproto/xrpc'
import { XrpcClient, XrpcAgent, XrpcFetchAgentOptions } from '@atproto/xrpc'
import { schemas } from './lexicons'
import { CID } from 'multiformats/cid'
import * as ComAtprotoAdminDefs from './types/com/atproto/admin/defs'
Expand Down Expand Up @@ -371,7 +366,7 @@ export class AtpClient {
app: AppNS
tools: ToolsNS

constructor(options: XrpcClient | XrpcClientOptions) {
constructor(options: XrpcClient | XrpcAgent | XrpcFetchAgentOptions) {
this.xrpc =
options instanceof XrpcClient ? options : new XrpcClient(options, schemas)
this.com = new ComNS(this)
Expand All @@ -382,31 +377,9 @@ export class AtpClient {
setHeader(key: string, value: string): void {
this.xrpc.setHeader(key, value)
}
}

/** @deprecated Use {@link AtpClient} instead */
export class AtpBaseClient {
xrpc: XrpcBaseClient = new XrpcBaseClient()

constructor() {
this.xrpc.addLexicons(schemas)
}

service(serviceUri: string | URL): AtpServiceClient {
return new AtpServiceClient(this, this.xrpc.service(serviceUri))
}
}

/** @deprecated Use {@link AtpClient} instead */
export class AtpServiceClient extends AtpClient {
_baseClient: AtpBaseClient

constructor(
baseClient: AtpBaseClient,
override xrpc: XrpcServiceClient,
) {
super(xrpc)
this._baseClient = baseClient
unsetHeader(key: string): void {
this.xrpc.unsetHeader(key)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CID } from 'multiformats/cid'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface CallOptions {
headers?: Headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CID } from 'multiformats/cid'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface OutputSchema {
blob: BlobRef
Expand Down
121 changes: 58 additions & 63 deletions packages/api/src/xrpc-agent.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,45 @@
import { getPdsEndpoint, isValidDidDoc } from '@atproto/common-web'
import { OAuthClient } from '@atproto/oauth-client'
import { Lexicons } from '@atproto/lexicon'
import {
ErrorResponseBody,
ResponseType,
XRPCError,
XrpcAgent,
XrpcClient,
errorResponseBody,
isXrpcAgent,
} from '@atproto/xrpc'
import {
AtpClient,
ComAtprotoServerCreateAccount,
ComAtprotoServerCreateSession,
ComAtprotoServerGetSession,
ComAtprotoServerRefreshSession,
} from './client'
import { schemas } from './client/lexicons'
import {
AtpAgentLoginOpts,
ComAtprotoServerXrpcAgentOptionsOpts,
AtpPersistSessionHandler,
AtpSessionData,
ComAtprotoServerXrpcAgentOptionsOpts,
} from './types'

const REFRESH_SESSION = 'com.atproto.server.refreshSession'
export interface AuthenticatedXrpcAgent extends XrpcAgent {
ownDid(): string | PromiseLike<string>
}

export abstract class AuthXrpcAgent extends XrpcAgent {
abstract ownDid(): string | PromiseLike<string>
export function isAuthenticatedXrpcAgent<T>(
agent: T,
): agent is T & AuthenticatedXrpcAgent {
return (
isXrpcAgent(agent) &&
'ownDid' in agent &&
typeof agent.ownDid === 'function'
)
}

export class OAuthXrpcAgent extends AuthXrpcAgent {
constructor(protected oauthClient: OAuthClient) {
super((url, init) => oauthClient.request(url, init))
export class OAuthXrpcAgent implements AuthenticatedXrpcAgent {
constructor(protected oauthClient: OAuthClient) {}

async fetchHandler(url: string, init: RequestInit): Promise<Response> {
return this.oauthClient.request(url, init)
}

async ownDid() {
Expand All @@ -47,24 +54,32 @@ export class OAuthXrpcAgent extends AuthXrpcAgent {
*
* @deprecated Use {@link OAuthClient} instead.
*/
export class ComAtprotoServerXrpcAgent extends AuthXrpcAgent {
service: URL
api: AtpClient
session?: AtpSessionData
pdsUrl?: URL // The PDS URL, driven by the did doc. May be undefined.
export class ComAtprotoServerXrpcAgent implements AuthenticatedXrpcAgent {
protected service: URL
protected api: AtpClient
protected session?: AtpSessionData
protected pdsUrl?: URL // The PDS URL, driven by the did doc. May be undefined.

protected _persistSession?: AtpPersistSessionHandler
protected _refreshSessionPromise: Promise<void> | undefined

constructor(opts: ComAtprotoServerXrpcAgentOptionsOpts) {
super((url, init) => this._fetch(url, init))

this.service = new URL(opts.service)
this._persistSession = opts.persistSession

// For convenience, we create an AtpClient to call the com.atproto.server.*
// endpoints.
this.api = new AtpClient(new XrpcClient(this, schemas))
// 'this' being an XrpcAgent, it *would* be used as agent for the AtpClient
// constructor (`new AtpClient(this)`). However, since this class's
// fetchHandler performs session management, which we don't want to happen
// for session management purposes, we create an inner XrpcAgent that
// doesn't have session management and uses `this.service` instead of trying
// to send requests towards the `this.pdsUrl`.
this.api = new AtpClient({
service: () => this.service,
headers: () => ({
authorization:
this.session?.accessJwt && `Bearer ${this.session?.accessJwt}`,
}),
})
}

ownDid() {
Expand Down Expand Up @@ -209,7 +224,7 @@ export class ComAtprotoServerXrpcAgent extends AuthXrpcAgent {
* @note We define this as a method on the prototype instead of inlining the
* function in the constructor to allow overriding in subclasses.
*/
protected async _fetch(url: string, reqInit: RequestInit): Promise<Response> {
async fetchHandler(url: string, reqInit: RequestInit): Promise<Response> {
// wait for any active session-refreshes to finish
await this._refreshSessionPromise

Expand Down Expand Up @@ -293,40 +308,32 @@ export class ComAtprotoServerXrpcAgent extends AuthXrpcAgent {
return
}

// send the refresh request
const url = new URL(`/xrpc/${REFRESH_SESSION}`, this.pdsUrl || this.service)

const res = await globalThis.fetch(url.toString(), {
method: 'POST',
headers: {
authorization: `Bearer ${this.session.refreshJwt}`,
},
})

const body = await res.json()

if (
res.status === 400 &&
isErrorResponse(body, ['ExpiredToken', 'InvalidToken'])
) {
// failed due to a bad refresh token
this.session = undefined
this._persistSession?.('expired', undefined)
} else if (isNewSessionObject(this.api.xrpc.lex, body)) {
try {
const res = await this.api.com.atproto.server.refreshSession()
// succeeded, update the session
this.session = {
...(this.session || {}),
accessJwt: body.accessJwt,
refreshJwt: body.refreshJwt,
handle: body.handle,
did: body.did,
...this.session,
accessJwt: res.data.accessJwt,
refreshJwt: res.data.refreshJwt,
handle: res.data.handle,
did: res.data.did,
}
this._updateApiEndpoint(body.didDoc)
this._updateApiEndpoint(res.data.didDoc)
this._persistSession?.('update', this.session)
} catch (err) {
if (
err instanceof XRPCError &&
err.error &&
['ExpiredToken', 'InvalidToken'].includes(err.error)
) {
// failed due to a bad refresh token
this.session = undefined
this._persistSession?.('expired', undefined)
}
// else: other failures should be ignored - the issue will
// propagate in the fetchHandler() handler's second attempt to run
// the request
}
// else: other failures should be ignored - the issue will
// propagate in the _fetch() handler's second attempt to run
// the request
}

/**
Expand Down Expand Up @@ -361,15 +368,3 @@ function isErrorResponse(body: unknown, errorNames: string[]): boolean {
errorNames.includes(body.error)
)
}

function isNewSessionObject(
lex: Lexicons,
v: unknown,
): v is ComAtprotoServerRefreshSession.OutputSchema {
try {
lex.assertValidXrpcOutput('com.atproto.server.refreshSession', v)
return true
} catch {
return false
}
}

0 comments on commit 99d3033

Please sign in to comment.