Skip to content

Commit

Permalink
fix(api): better support react-native
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Apr 30, 2024
1 parent 06875aa commit 2d86132
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions packages/api/src/dispatcher/session-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import {
} from '../types'
import { AtpDispatcher } from './atp-dispatcher'

const ReadableStream = globalThis.ReadableStream as
| typeof globalThis.ReadableStream
| undefined

export type Fetch = (this: void, request: Request) => Promise<Response>
export interface SessionDispatcherOptions {
service: string | URL
Expand Down Expand Up @@ -97,21 +101,31 @@ export class SessionDispatcher extends AtpDispatcher {
if (!this.session?.refreshJwt) {
return initialRes
}
if (!(await isErrorResponse(initialRes, [400], ['ExpiredToken']))) {
const isExpiredToken = await isErrorResponse(
initialRes,
[400],
['ExpiredToken'],
)

if (!isExpiredToken) {
return initialRes
}

try {
await this.refreshSession()
} catch {
// Should never happen...
return initialRes
}

if (reqInit?.signal?.aborted) {
return initialRes
}

// The stream was already consumed. We cannot retry the request. A solution
// would be to tee() the input stream but that would bufferize the entire
// stream in memory which can lead to memory starvation. Instead, we will
// return the original response and let the calling code handle retries.
if (reqInit.body instanceof ReadableStream) {
if (ReadableStream && reqInit.body instanceof ReadableStream) {
return initialRes
}

Expand All @@ -121,7 +135,7 @@ export class SessionDispatcher extends AtpDispatcher {
return initialRes
}

// Make sure the initial request is canceled to avoid leaking resources
// Make sure the initial request is cancelled to avoid leaking resources
// (NodeJS 👀): https://undici.nodejs.org/#/?id=garbage-collection
await initialRes.body?.cancel()

Expand Down Expand Up @@ -237,7 +251,11 @@ export class SessionDispatcher extends AtpDispatcher {
): Promise<ComAtprotoServerGetSession.Response> {
try {
this.session = session
const res = await this.client.com.atproto.server.getSession()
// For this particular call, we want this._dispatch() to be used in order
// to refresh the session if needed. To do so, we use a (new) AtpClient
// instance to build the HTTP request, and pass "this" as the dispatcher
// so that this._dispatch() is called.
const res = await new AtpClient(this).com.atproto.server.getSession()
if (res.data.did !== this.session.did) {
throw new XRPCError(
ResponseType.InvalidRequest,
Expand Down Expand Up @@ -360,7 +378,8 @@ async function isErrorResponse(
errorNames: string[],
): Promise<boolean> {
if (!status.includes(response.status)) return false
if (!response.body) return false
// Some engines (react-native 👀) don't expose a response.body property...
// if (!response.body) return false
try {
const json = await peekJson(response, 10 * 1024)
return isErrorObject(json) && (errorNames as any[]).includes(json.error)
Expand All @@ -375,23 +394,7 @@ async function peekJson(
): Promise<unknown> {
if (extractType(response) !== 'application/json') throw new Error('Not JSON')
if (extractLength(response) > maxSize) throw new Error('Response too large')
if (!response.body) throw new Error('No body')
const transform = maxSizeTransform(maxSize)
return new Response(
response.clone().body!.pipeThrough(transform),
response,
).json()
}

function maxSizeTransform(maxBytes: number) {
if (!TransformStream) throw new Error('TransformStream not available')
let bytesRead = 0
return new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, ctrl) {
if ((bytesRead += chunk.length) <= maxBytes) ctrl.enqueue(chunk)
else ctrl.error('Response too large')
},
})
return response.clone().json()
}

function extractLength({ headers }: Response) {
Expand Down

0 comments on commit 2d86132

Please sign in to comment.