diff --git a/CHANGES.md b/CHANGES.md index 1ec091e3..2fce2f50 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -98,7 +98,7 @@ To be released. origin. This sidesteps CORS configurations on remote object stores and prevents the visitor's browser from talking directly to the source server. Controlled by a new `MEDIA_PROXY` environment - variable with three levels: [[#481], [#483]] + variable with three levels: [[#481], [#483], [#493]] - `off` (default): the Mastodon API and web UI hand the original remote URL to clients, matching the historical behaviour. @@ -113,8 +113,12 @@ To be released. - `cache`: same URL rewriting, but the streamed body is persisted to the configured storage backend as `proxy/.bin`, with a content-type sidecar alongside it at `proxy/.json`. - Subsequent requests skip the upstream fetch. The admin - dashboard at */thumbnail\_cleanup* can purge the cache on demand. + Subsequent requests skip the upstream fetch. Remote actor avatars + for accounts with an approved follow relationship to the local + account are also prefetched into this same cache when the actor is + stored or refreshed, so stale upstream avatar files can keep + rendering after Hollo has seen them once. The admin dashboard at + */thumbnail\_cleanup* can purge the cache on demand. `MEDIA_PROXY` also accepts the Boolean synonyms `true`/`on`/`1` (as aliases for `proxy`) and `false`/`off`/`0` (as aliases for @@ -401,6 +405,7 @@ To be released. 
[#489]: https://github.com/fedify-dev/hollo/issues/489 [#490]: https://github.com/fedify-dev/hollo/pull/490 [#491]: https://github.com/fedify-dev/hollo/pull/491 +[#493]: https://github.com/fedify-dev/hollo/pull/493 Version 0.8.4 diff --git a/docs/src/content/docs/install/env.mdx b/docs/src/content/docs/install/env.mdx index d07e6bf5..6ae0a435 100644 --- a/docs/src/content/docs/install/env.mdx +++ b/docs/src/content/docs/install/env.mdx @@ -187,8 +187,11 @@ are: - `cache`: same URL rewriting as `proxy`, but the streamed body is persisted to the configured storage backend as `proxy/.bin` alongside a content-type sidecar at `proxy/.json`. - Subsequent requests skip the upstream fetch. The admin dashboard - at */thumbnail_cleanup* can purge the cache on demand. + Subsequent requests skip the upstream fetch. Remote actor avatars + for accounts with an approved follow relationship to the local account + are also prefetched into this cache when the actor is stored or + refreshed. The admin dashboard at */thumbnail_cleanup* can purge + the cache on demand. The boolean synonyms `true` / `on` / `1` are accepted as aliases for `proxy`, and `false` / `off` / `0` as aliases for `off`. 
Disk caching diff --git a/docs/src/content/docs/ja/install/env.mdx b/docs/src/content/docs/ja/install/env.mdx index 16891190..1e2dafeb 100644 --- a/docs/src/content/docs/ja/install/env.mdx +++ b/docs/src/content/docs/ja/install/env.mdx @@ -188,8 +188,10 @@ Holloは起動に失敗します。**最初のアカウントを作成する前 本文を設定済みのストレージバックエンドの`proxy/.bin`に 保存し、Content-Type情報を持つサイドカーを`proxy/.json`に 一緒に保存します。以降のリクエストはアップストリームを再取得 - しません。管理ダッシュボードの */thumbnail_cleanup* から必要に - 応じてキャッシュを消去できます。 + しません。ローカルアカウントと承認済みのフォロー関係がある + リモートactorのアバターも、actorが保存または更新されるときに + このキャッシュへ事前取得されます。管理ダッシュボードの + */thumbnail_cleanup* から必要に応じてキャッシュを消去できます。 真偽値の同義語として`true` / `on` / `1`は`proxy`の別名、 `false` / `off` / `0`は`off`の別名として受け付けます。ディスク diff --git a/docs/src/content/docs/ko/install/env.mdx b/docs/src/content/docs/ko/install/env.mdx index be923267..b44106f3 100644 --- a/docs/src/content/docs/ko/install/env.mdx +++ b/docs/src/content/docs/ko/install/env.mdx @@ -182,8 +182,10 @@ Hollo가 L7 로드 밸런서 뒤에 위치할 경우 (일반적으로 그래야 - `cache`: `proxy`와 동일한 URL 재작성에 더해, 스트리밍한 본문을 설정된 저장소 백엔드의 `proxy/.bin`에 저장하고, 콘텐츠 타입 정보를 담은 사이드카 파일을 `proxy/.json`에 함께 저장합니다. - 이후 요청은 원본을 다시 가져오지 않습니다. */thumbnail_cleanup* - 관리자 페이지에서 필요할 때 캐시를 비울 수 있습니다. + 이후 요청은 원본을 다시 가져오지 않습니다. 로컬 계정과 승인된 + 팔로우 관계가 있는 리모트 액터의 아바타도 액터가 저장되거나 갱신될 + 때 이 캐시에 미리 저장됩니다. */thumbnail_cleanup* 관리자 + 페이지에서 필요할 때 캐시를 비울 수 있습니다. 불리언 동의어로 `true` / `on` / `1`은 `proxy`의 별칭으로, `false` / `off` / `0`은 `off`의 별칭으로 받아들입니다. 
디스크 캐싱은 diff --git a/docs/src/content/docs/zh-cn/install/env.mdx b/docs/src/content/docs/zh-cn/install/env.mdx index d9797702..6531e77b 100644 --- a/docs/src/content/docs/zh-cn/install/env.mdx +++ b/docs/src/content/docs/zh-cn/install/env.mdx @@ -168,8 +168,9 @@ openssl rand -hex 32 避免远程 CORS 配置的影响,也避免访问者的 IP 被泄露。 - `cache`:URL 改写与 `proxy` 相同,但会把流式获取的响应主体保存到 所配置存储后端的 `proxy/.bin`,并把记录内容类型的旁路文件 - 保存到 `proxy/.json`。后续请求会跳过上游请求。管理面板的 - */thumbnail_cleanup* 页面可以按需清空缓存。 + 保存到 `proxy/.json`。后续请求会跳过上游请求。与本地账号 + 存在已批准关注关系的远程 actor 头像,也会在 actor 被保存或刷新时 + 预取到此缓存中。管理面板的 */thumbnail_cleanup* 页面可以按需清空缓存。 布尔同义值:`true` / `on` / `1` 作为 `proxy` 的别名, `false` / `off` / `0` 作为 `off` 的别名。磁盘缓存必须用 `cache` 显式 diff --git a/docs/src/content/docs/zh-tw/install/env.mdx b/docs/src/content/docs/zh-tw/install/env.mdx index 44c379de..758145be 100644 --- a/docs/src/content/docs/zh-tw/install/env.mdx +++ b/docs/src/content/docs/zh-tw/install/env.mdx @@ -168,8 +168,9 @@ openssl rand -hex 32 避免遠端 CORS 設定的影響,也避免訪客的 IP 被洩露。 - `cache`:URL 改寫與 `proxy` 相同,但會把串流取得的回應主體儲存到 所設定儲存後端的 `proxy/.bin`,並把記錄內容類型的旁路檔案 - 儲存到 `proxy/.json`。後續請求會跳過上游請求。管理面板的 - */thumbnail_cleanup* 頁面可以按需清空快取。 + 儲存到 `proxy/.json`。後續請求會跳過上游請求。與本機帳號 + 存在已核准追蹤關係的遠端 actor 頭像,也會在 actor 被儲存或重新整理時 + 預先擷取到此快取中。管理面板的 */thumbnail_cleanup* 頁面可以按需清空快取。 布林同義值:`true` / `on` / `1` 作為 `proxy` 的別名, `false` / `off` / `0` 作為 `off` 的別名。磁碟快取必須用 `cache` 明確 diff --git a/src/federation/account.persistence.test.ts b/src/federation/account.persistence.test.ts index 01bdb810..e9a90322 100644 --- a/src/federation/account.persistence.test.ts +++ b/src/federation/account.persistence.test.ts @@ -1,10 +1,12 @@ -import { Person } from "@fedify/vocab"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { Image, Person } from "@fedify/vocab"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { cleanDatabase } from "../../tests/helpers"; import { createAccount } from "../../tests/helpers/oauth"; import db from "../db"; 
+import { proxyCacheKeyForUrl } from "../proxy-cache"; import * as Schema from "../schema"; +import { drive } from "../storage"; import type { Uuid } from "../uuid"; import { AccountHandleConflictError, @@ -12,6 +14,34 @@ import { updateAccountStats, } from "./account"; +async function waitFor(condition: () => Promise): Promise { + for (let i = 0; i < 50; i++) { + if (await condition()) return; + await new Promise((resolve) => setTimeout(resolve, 10)); + } +} + +function getFetchMockCalls(): Parameters[] { + return ( + globalThis.fetch as unknown as { + mock: { calls: Parameters[] }; + } + ).mock.calls; +} + +function isFetchCallForUrl( + [input]: Parameters, + url: string, +): boolean { + const requestUrl = + typeof input === "string" + ? input + : input instanceof URL + ? input.href + : input.url; + return requestUrl === url; +} + async function createRemoteAccount(params: { iri: string; handle: string; @@ -78,13 +108,19 @@ async function createLocalPost(accountId: Uuid): Promise { return post; } -function createRemotePerson(iri: string, username: string): Person { +function createRemotePerson( + iri: string, + username: string, + avatarUrl?: string, +): Person { return new Person({ id: new URL(iri), preferredUsername: username, name: "Michael Foster", inbox: new URL(`${iri}/inbox`), url: new URL(`https://${new URL(iri).host}/@${username}`), + icon: + avatarUrl == null ? 
undefined : new Image({ url: new URL(avatarUrl) }), }); } @@ -298,3 +334,126 @@ describe.sequential("persistAccount canonical handle reassignment", () => { } }); }); + +describe.sequential("persistAccount remote avatar cache", () => { + beforeEach(async () => { + await cleanDatabase(); + drive.fake(); + vi.restoreAllMocks(); + vi.unstubAllGlobals(); + }); + + afterEach(() => { + drive.restore(); + }); + + it("does not prefetch unrelated remote avatars in cache mode", async () => { + expect.assertions(3); + + const avatarUrl = "https://remote.test/users/michael/avatar.webp"; + vi.stubGlobal( + "fetch", + vi.fn(async () => new Response(null, { status: 404 })), + ); + + const account = await persistAccount( + db, + createRemotePerson( + "https://remote.test/users/michael", + "michael", + avatarUrl, + ), + "https://hollo.test", + { mediaProxyMode: "cache" }, + ); + + const key = proxyCacheKeyForUrl(avatarUrl); + + expect(account?.avatarUrl).toBe(avatarUrl); + expect( + getFetchMockCalls().some((call) => isFetchCallForUrl(call, avatarUrl)), + ).toBe(false); + expect(await drive.use().exists(`${key}.bin`)).toBe(false); + }); + + it("prefetches a related remote avatar into the proxy cache in cache mode", async () => { + expect.assertions(6); + + const avatarUrl = "https://remote.test/users/michael/avatar.webp"; + const avatar = new Uint8Array([10, 20, 30, 40]); + let resolveAvatarFetch: (response: Response) => void; + const avatarFetch = new Promise((resolve) => { + resolveAvatarFetch = resolve; + }); + vi.stubGlobal( + "fetch", + vi.fn(async (input: string | URL | Request) => { + const requestUrl = + typeof input === "string" + ? input + : input instanceof URL + ? 
input.href + : input.url; + if (requestUrl === avatarUrl) { + return await avatarFetch; + } + return new Response(null, { status: 404 }); + }), + ); + + const initialAccount = await persistAccount( + db, + createRemotePerson( + "https://remote.test/users/michael", + "michael", + avatarUrl, + ), + "https://hollo.test", + { mediaProxyMode: "cache" }, + ); + if (initialAccount == null) throw new Error("Expected remote account"); + + const localAccount = await createAccount(); + await db.insert(Schema.follows).values({ + iri: "https://hollo.test/#follows/remote-michael", + followingId: initialAccount.id, + followerId: localAccount.id, + approved: new Date(), + }); + + const account = await persistAccount( + db, + createRemotePerson( + "https://remote.test/users/michael", + "michael", + avatarUrl, + ), + "https://hollo.test", + { mediaProxyMode: "cache" }, + ); + + const disk = drive.use(); + const key = proxyCacheKeyForUrl(avatarUrl); + + expect(account?.avatarUrl).toBe(avatarUrl); + expect(await disk.exists(`${key}.bin`)).toBe(false); + await waitFor(async () => + getFetchMockCalls().some((call) => isFetchCallForUrl(call, avatarUrl)), + ); + expect(globalThis.fetch).toHaveBeenCalledWith( + avatarUrl, + expect.any(Object), + ); + + resolveAvatarFetch!( + new Response(avatar.buffer as ArrayBuffer, { + status: 200, + headers: { "Content-Type": "image/webp" }, + }), + ); + await waitFor(async () => await disk.exists(`${key}.bin`)); + expect(await disk.exists(`${key}.bin`)).toBe(true); + expect(await disk.exists(`${key}.json`)).toBe(true); + expect(await disk.getBytes(`${key}.bin`)).toEqual(avatar); + }); +}); diff --git a/src/federation/account.ts b/src/federation/account.ts index 18cbd764..ff8d3cf3 100644 --- a/src/federation/account.ts +++ b/src/federation/account.ts @@ -21,6 +21,8 @@ import { getLogger } from "@logtape/logtape"; import { and, count, eq, inArray, isNotNull, sql } from "drizzle-orm"; import type { DatabaseLike } from "../db"; +import { MEDIA_PROXY, type 
MediaProxyMode } from "../media-proxy"; +import { scheduleProxyCachePrefetchForMode } from "../proxy-cache"; import type { NewPinnedPost, Post } from "../schema"; import * as schema from "../schema"; import { type Uuid, uuidv7 } from "../uuid"; @@ -55,6 +57,7 @@ export const REFRESH_ACTORS_ON_INTERACTION = refreshOnInteractionEnv === "yes"; export type PersistAccountHandleConflictPolicy = "throw" | "skip"; +export type AvatarCachePrefetchPolicy = "always" | "related" | "never"; export class AccountHandleConflictError extends Error { readonly actorIri: string; @@ -90,6 +93,8 @@ export type PersistAccountOptions = { documentLoader?: DocumentLoader; skipUpdate?: boolean; handleConflictPolicy?: PersistAccountHandleConflictPolicy; + mediaProxyMode?: MediaProxyMode; + avatarCachePrefetch?: AvatarCachePrefetchPolicy; }; function getAcctUri(handle: string): string { @@ -165,6 +170,56 @@ function skipAccountConflict(error: AccountHandleConflictError): void { ); } +async function hasApprovedLocalFollowRelationship( + db: DatabaseLike, + accountId: Uuid, +): Promise { + const followedByLocal = await db + .select({ iri: schema.follows.iri }) + .from(schema.follows) + .innerJoin( + schema.accountOwners, + eq(schema.follows.followerId, schema.accountOwners.id), + ) + .where( + and( + eq(schema.follows.followingId, accountId), + isNotNull(schema.follows.approved), + ), + ) + .limit(1); + if (followedByLocal.length > 0) return true; + + const followingLocal = await db + .select({ iri: schema.follows.iri }) + .from(schema.follows) + .innerJoin( + schema.accountOwners, + eq(schema.follows.followingId, schema.accountOwners.id), + ) + .where( + and( + eq(schema.follows.followerId, accountId), + isNotNull(schema.follows.approved), + ), + ) + .limit(1); + return followingLocal.length > 0; +} + +async function shouldPrefetchAvatar( + db: DatabaseLike, + account: schema.Account & { owner: schema.AccountOwner | null }, + mode: MediaProxyMode, + avatarUrl: string | null | undefined, + 
policy: AvatarCachePrefetchPolicy, +): Promise { + if (mode !== "cache" || avatarUrl == null) return false; + if (account.owner != null || policy === "never") return false; + if (policy === "always") return true; + return await hasApprovedLocalFollowRelationship(db, account.id); +} + export function getFollowOrderingKey( followerIri: string, followingIri: string, @@ -347,6 +402,20 @@ export async function persistAccount( where: { iri: { eq: actorId.href } }, }); if (account == null) return null; + const mediaProxyMode = options.mediaProxyMode ?? MEDIA_PROXY; + if ( + await shouldPrefetchAvatar( + db, + account, + mediaProxyMode, + values.avatarUrl, + options.avatarCachePrefetch ?? "related", + ) + ) { + // This is cache warming only. Account persistence should not wait for a + // slow remote avatar CDN after the database row has already been stored. + scheduleProxyCachePrefetchForMode(mediaProxyMode, values.avatarUrl); + } const [{ posts }] = await db .select({ posts: count() }) .from(schema.posts) diff --git a/src/proxy-cache.test.ts b/src/proxy-cache.test.ts new file mode 100644 index 00000000..35c2b25f --- /dev/null +++ b/src/proxy-cache.test.ts @@ -0,0 +1,247 @@ +import { + afterEach, + beforeEach, + describe, + expect, + it, + vi, + type MockInstance, +} from "vitest"; + +import { signProxyUrl } from "./media-proxy"; +import { createProxyApp } from "./proxy"; +import { + hasProxyCacheEntry, + prefetchProxyCacheForMode, + PROXY_CACHE_PREFETCH_CONCURRENCY, + proxyCacheKeyForUrl, + readProxyCacheEntry, + scheduleProxyCachePrefetchForMode, +} from "./proxy-cache"; +import { drive } from "./storage"; + +async function waitFor(condition: () => Promise): Promise { + for (let i = 0; i < 50; i++) { + if (await condition()) return; + await new Promise((resolve) => setTimeout(resolve, 10)); + } +} + +function buildResponse( + body: Uint8Array | string, + options: { status?: number; contentType?: string } = {}, +): Response { + const status = options.status ?? 
200; + const contentType = options.contentType ?? "image/png"; + const init: BodyInit = + typeof body === "string" ? body : (body.buffer as ArrayBuffer); + return new Response(init, { + status, + headers: { "Content-Type": contentType }, + }); +} + +describe.sequential("proxy cache prefetch", () => { + let fetchMock: MockInstance; + + beforeEach(() => { + drive.fake(); + fetchMock = vi.spyOn(globalThis, "fetch") as unknown as MockInstance< + typeof fetch + >; + fetchMock.mockReset(); + }); + + afterEach(() => { + drive.restore(); + fetchMock.mockRestore(); + }); + + it("prefetches into the same cache entry served by the proxy route", async () => { + expect.assertions(6); + + const avatar = new Uint8Array([1, 3, 5, 7]); + fetchMock.mockResolvedValue( + buildResponse(avatar, { contentType: "image/webp" }), + ); + + const url = "https://remote.example/avatar.webp"; + const prefetched = await prefetchProxyCacheForMode("cache", url); + const disk = drive.use(); + const key = proxyCacheKeyForUrl(url); + + expect(prefetched).toBe(true); + expect(await disk.exists(`${key}.bin`)).toBe(true); + expect(await disk.exists(`${key}.json`)).toBe(true); + + const app = createProxyApp("cache"); + const { sig, b64url } = signProxyUrl(url); + const response = await app.request(`/${sig}/${b64url}`); + + expect(response.status).toBe(200); + expect(new Uint8Array(await response.arrayBuffer())).toEqual(avatar); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it("does not fetch when the cache entry already exists", async () => { + expect.assertions(4); + + const avatar = new Uint8Array([2, 4, 6, 8]); + fetchMock.mockResolvedValueOnce( + buildResponse(avatar, { contentType: "image/png" }), + ); + + const url = "https://remote.example/already.png"; + const disk = drive.use(); + await expect(prefetchProxyCacheForMode("cache", url)).resolves.toBe(true); + const getBytesMock = vi.spyOn(disk, "getBytes"); + await expect(prefetchProxyCacheForMode("cache", url)).resolves.toBe(true); + + 
expect(fetchMock).toHaveBeenCalledTimes(1); + expect(getBytesMock).not.toHaveBeenCalled(); + }); + + it("treats missing, null, or malformed cache metadata as a cache miss", async () => { + expect.assertions(6); + + const disk = drive.use(); + const missingMetaKey = proxyCacheKeyForUrl( + "https://remote.example/missing-meta.png", + ); + await disk.put(`${missingMetaKey}.bin`, new Uint8Array([1, 2, 3]), { + contentType: "image/png", + visibility: "public", + }); + + await expect(readProxyCacheEntry(missingMetaKey)).resolves.toBeNull(); + await expect(hasProxyCacheEntry(missingMetaKey)).resolves.toBe(false); + + const nullMetaKey = proxyCacheKeyForUrl( + "https://remote.example/null-meta.png", + ); + await disk.put(`${nullMetaKey}.bin`, new Uint8Array([4, 5, 6]), { + contentType: "image/png", + visibility: "public", + }); + await disk.put(`${nullMetaKey}.json`, "null", { + contentType: "application/json", + visibility: "public", + }); + + await expect(readProxyCacheEntry(nullMetaKey)).resolves.toBeNull(); + await expect(hasProxyCacheEntry(nullMetaKey)).resolves.toBe(false); + + const malformedMetaKey = proxyCacheKeyForUrl( + "https://remote.example/malformed-meta.png", + ); + await disk.put(`${malformedMetaKey}.bin`, new Uint8Array([7, 8, 9]), { + contentType: "image/png", + visibility: "public", + }); + await disk.put(`${malformedMetaKey}.json`, "{", { + contentType: "application/json", + visibility: "public", + }); + + await expect(readProxyCacheEntry(malformedMetaKey)).resolves.toBeNull(); + await expect(hasProxyCacheEntry(malformedMetaKey)).resolves.toBe(false); + }); + + it("is a no-op outside cache mode", async () => { + expect.assertions(3); + + const url = "https://remote.example/no-cache.png"; + + await expect(prefetchProxyCacheForMode("off", url)).resolves.toBe(false); + await expect(prefetchProxyCacheForMode("proxy", url)).resolves.toBe(false); + expect(fetchMock).not.toHaveBeenCalled(); + }); + + it("does not cache unsafe or unsupported responses", async 
() => { + expect.assertions(4); + + await expect( + prefetchProxyCacheForMode("cache", "http://127.0.0.1/avatar.png"), + ).resolves.toBe(false); + + fetchMock.mockResolvedValue( + buildResponse("", { contentType: "image/svg+xml" }), + ); + + const svgUrl = "https://remote.example/avatar.svg"; + await expect(prefetchProxyCacheForMode("cache", svgUrl)).resolves.toBe( + false, + ); + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(await drive.use().exists(`${proxyCacheKeyForUrl(svgUrl)}.bin`)).toBe( + false, + ); + }); + + it("deduplicates scheduled prefetches for the same cache key", async () => { + expect.assertions(4); + + let resolveFetch: (response: Response) => void; + fetchMock.mockImplementationOnce( + async () => + await new Promise((resolve) => { + resolveFetch = resolve; + }), + ); + + const url = "https://remote.example/dedupe.png"; + const key = proxyCacheKeyForUrl(url); + + expect(scheduleProxyCachePrefetchForMode("cache", url)).toBe(true); + expect(scheduleProxyCachePrefetchForMode("cache", url)).toBe(false); + await waitFor(async () => fetchMock.mock.calls.length === 1); + expect(fetchMock).toHaveBeenCalledTimes(1); + + resolveFetch!(buildResponse(new Uint8Array([9, 9, 9]))); + await waitFor(async () => await drive.use().exists(`${key}.bin`)); + expect(await drive.use().exists(`${key}.bin`)).toBe(true); + }); + + it("bounds scheduled prefetch concurrency", async () => { + expect.assertions(4); + + const resolveFetches: Array<(response: Response) => void> = []; + fetchMock.mockImplementation( + async () => + await new Promise((resolve) => { + resolveFetches.push(resolve); + }), + ); + + const urls = Array.from( + { length: PROXY_CACHE_PREFETCH_CONCURRENCY + 2 }, + (_, i) => `https://remote.example/concurrent-${i}.png`, + ); + + expect( + urls.map((url) => scheduleProxyCachePrefetchForMode("cache", url)), + ).toEqual(urls.map(() => true)); + await waitFor( + async () => + fetchMock.mock.calls.length === PROXY_CACHE_PREFETCH_CONCURRENCY, + ); + 
expect(fetchMock).toHaveBeenCalledTimes(PROXY_CACHE_PREFETCH_CONCURRENCY); + + resolveFetches[0]?.(buildResponse(new Uint8Array([1]))); + await waitFor( + async () => + fetchMock.mock.calls.length === PROXY_CACHE_PREFETCH_CONCURRENCY + 1, + ); + expect(fetchMock).toHaveBeenCalledTimes( + PROXY_CACHE_PREFETCH_CONCURRENCY + 1, + ); + + for (let i = 1; i < urls.length; i++) { + await waitFor(async () => resolveFetches.length > i); + resolveFetches[i]?.(buildResponse(new Uint8Array([i + 1]))); + } + const lastKey = proxyCacheKeyForUrl(urls[urls.length - 1]); + await waitFor(async () => await drive.use().exists(`${lastKey}.bin`)); + expect(await drive.use().exists(`${lastKey}.bin`)).toBe(true); + }); +}); diff --git a/src/proxy-cache.ts b/src/proxy-cache.ts new file mode 100644 index 00000000..00952faf --- /dev/null +++ b/src/proxy-cache.ts @@ -0,0 +1,362 @@ +import { createHash } from "node:crypto"; + +import { getLogger } from "@logtape/logtape"; +// @ts-expect-error: No type definitions available +// cSpell: ignore ssrfcheck +import { isSSRFSafeURL } from "ssrfcheck"; + +import { type MediaProxyMode } from "./media-proxy"; +import { drive } from "./storage"; + +const logger = getLogger(["hollo", "media-proxy"]); + +export const PROXY_CACHE_CONTROL = "public, max-age=2592000, immutable"; + +const MAX_BYTES = 32 * 1024 * 1024; +const FETCH_TIMEOUT_MS = 30_000; +export const PROXY_CACHE_PREFETCH_CONCURRENCY = 4; +const MAX_PROXY_CACHE_PREFETCH_QUEUE = 1024; +const MAX_REDIRECTS = 3; +const CACHE_PREFIX = "proxy/"; +const ALLOWED_TYPE_PREFIXES = ["image/", "video/", "audio/"]; +// SVG can carry inline scripts that execute under the serving origin, so +// proxying it would amount to a same-origin XSS primitive even when delivered +// with the right Content-Type. Most fediverse media is PNG / JPEG / WebP / +// MP4, so the loss is small. 
const BLOCKED_CONTENT_TYPES = new Set(["image/svg+xml", "image/svg"]);

// A stored media object: the raw bytes plus the content type kept beside them.
export interface ProxyCacheEntry {
  body: Uint8Array;
  contentType: string;
}

// Media obtained from upstream, either complete (200) or a byte range (206).
export interface ProxyMediaResponse {
  status: 200 | 206;
  body: Uint8Array;
  contentType: string;
  contentRange: string | null;
  acceptRanges: string | null;
}

// Upstream rejected the requested range; only its Content-Range is carried.
export interface ProxyRangeNotSatisfiableResponse {
  status: 416;
  contentRange: string | null;
}

export type ProxyFetchResult =
  | ProxyMediaResponse
  | ProxyRangeNotSatisfiableResponse;

// Storage key for a URL: the cache prefix followed by the hex SHA-256 digest
// of the URL string exactly as given (no normalisation is applied here).
export function proxyCacheKeyForUrl(url: string): string {
  const digest = createHash("sha256").update(url).digest("hex");
  return `${CACHE_PREFIX}${digest}`;
}

// True when the media type (parameters such as `; charset=...` ignored,
// case-insensitive) starts with an allowed prefix and is not explicitly
// blocked.
function isAllowedContentType(value: string): boolean {
  const [mediaType] = value.toLowerCase().split(";", 1);
  const essence = mediaType.trim();
  return (
    !BLOCKED_CONTENT_TYPES.has(essence) &&
    ALLOWED_TYPE_PREFIXES.some((prefix) => essence.startsWith(prefix))
  );
}

// Parse the JSON sidecar text. Anything that is absent, not valid JSON, not
// an object, or that names a content type we would refuse to serve yields
// null, so callers can treat every such case as "no usable metadata".
function parseProxyCacheMetadata(
  content: string | null | undefined,
): { contentType: string } | null {
  if (content == null) return null;
  let parsed: unknown;
  try {
    parsed = JSON.parse(content) as unknown;
  } catch {
    return null;
  }
  if (typeof parsed !== "object" || parsed === null) return null;
  const { contentType } = parsed as { contentType?: unknown };
  return typeof contentType === "string" && isAllowedContentType(contentType)
    ? { contentType }
    : null;
}

// Convert a Uint8Array to an ArrayBuffer with no surrounding bytes. Node's
// `Buffer` uses a shared backing pool, so `.buffer` on a small read can expose
// unrelated memory if we don't slice to the view's exact range.
export function toExactArrayBuffer(buf: Uint8Array): ArrayBuffer {
  // Fast path: the view already spans its whole (non-shared) backing buffer,
  // so handing out `.buffer` exposes nothing beyond the view itself.
  if (
    buf.byteOffset === 0 &&
    buf.byteLength === buf.buffer.byteLength &&
    buf.buffer instanceof ArrayBuffer
  ) {
    return buf.buffer;
  }
  // Otherwise copy only the viewed bytes into a fresh, exactly-sized buffer.
  const out = new ArrayBuffer(buf.byteLength);
  new Uint8Array(out).set(buf);
  return out;
}

// Best-effort cancel of a response body we are not going to read. Any error
// from the cancel is deliberately swallowed; the returned promise never
// rejects.
export async function discardBody(response: Response): Promise<void> {
  try {
    await response.body?.cancel();
  } catch {
    /* upstream is already gone — nothing to do */
  }
}

// Read the whole response body into one Uint8Array, giving up as soon as more
// than `maxBytes` have arrived.
//
// Returns null when the response has no body, when the running total exceeds
// `maxBytes`, or when reading fails; in the latter two cases the stream is
// cancelled first. The limit is enforced on bytes actually received.
async function readBoundedBody(
  response: Response,
  maxBytes: number,
): Promise<Uint8Array | null> {
  const reader = response.body?.getReader();
  if (reader == null) return null;
  const chunks: Uint8Array[] = [];
  let total = 0;
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      if (value == null) continue;
      total += value.byteLength;
      if (total > maxBytes) {
        // Over the cap: stop the transfer instead of buffering further.
        await reader.cancel().catch(() => {});
        return null;
      }
      chunks.push(value);
    }
  } catch {
    await reader.cancel().catch(() => {});
    return null;
  }
  // Concatenate the collected chunks into a single contiguous array.
  const result = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    result.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return result;
}

// Follow up to MAX_REDIRECTS redirects manually so we can re-run the SSRF
// check on each hop. `fetch(..., { redirect: "follow" })` would silently
// chase a 302 from a public hostname to a private one.
//
// Caveat for future maintainers: `isSSRFSafeURL` only inspects the URL string
// (scheme, host literal, port). It does not resolve DNS, so a hostname that
// looks public but resolves to a private address at fetch time can still slip
// through. Fixing this in the proxy alone would leave the same gap on every
// other server-side fetch in the codebase (e.g. src/federation/post.ts,
// preview-card scraping); the proper fix is a shared SSRF-aware fetch connector
// that pins the resolved IP.
+async function fetchWithSSRFAwareRedirects( + initialUrl: string, + signal: AbortSignal, + requestHeaders: Record = {}, +): Promise { + let url = initialUrl; + for (let i = 0; i <= MAX_REDIRECTS; i++) { + let parsed: URL; + try { + parsed = new URL(url); + } catch { + return null; + } + if (parsed.protocol !== "http:" && parsed.protocol !== "https:") { + return null; + } + if (!isSSRFSafeURL(url)) return null; + const response = await fetch(url, { + signal, + redirect: "manual", + headers: requestHeaders, + }); + if (response.status >= 300 && response.status < 400) { + const location = response.headers.get("location"); + await discardBody(response); + if (location == null) return null; + try { + url = new URL(location, url).href; + } catch { + return null; + } + continue; + } + return response; + } + return null; +} + +export async function readProxyCacheEntry( + key: string, +): Promise { + const disk = drive.use(); + try { + if (!(await disk.exists(`${key}.bin`))) return null; + const meta = parseProxyCacheMetadata(await disk.get(`${key}.json`)); + if (meta == null) return null; + const body = await disk.getBytes(`${key}.bin`); + return { body, contentType: meta.contentType }; + } catch (error) { + logger.warn("Failed to read proxy cache entry {key}: {error}", { + key, + error: error instanceof Error ? error.message : String(error), + }); + return null; + } +} + +export async function hasProxyCacheEntry(key: string): Promise { + const disk = drive.use(); + try { + if (!(await disk.exists(`${key}.bin`))) return false; + return parseProxyCacheMetadata(await disk.get(`${key}.json`)) != null; + } catch (error) { + logger.warn("Failed to inspect proxy cache entry {key}: {error}", { + key, + error: error instanceof Error ? 
error.message : String(error), + }); + return false; + } +} + +export async function writeProxyCacheEntry( + key: string, + body: Uint8Array, + contentType: string, +): Promise { + const disk = drive.use(); + await disk.put(`${key}.bin`, body, { + contentType, + contentLength: body.byteLength, + visibility: "public", + }); + await disk.put(`${key}.json`, JSON.stringify({ contentType }), { + contentType: "application/json", + visibility: "public", + }); +} + +export async function fetchProxyMedia( + url: string, + signal: AbortSignal, + requestHeaders: Record = {}, +): Promise { + const upstream = await fetchWithSSRFAwareRedirects( + url, + signal, + requestHeaders, + ); + if (upstream == null) return null; + + if (upstream.status === 416) { + await discardBody(upstream); + return { + status: 416, + contentRange: upstream.headers.get("Content-Range"), + }; + } + if (!upstream.ok) { + await discardBody(upstream); + return null; + } + const status = upstream.status === 206 ? 206 : 200; + const contentType = + upstream.headers.get("Content-Type") ?? "application/octet-stream"; + if (!isAllowedContentType(contentType)) { + await discardBody(upstream); + return null; + } + + // When the upstream tells us the body length up front we can short-circuit + // oversized responses without spending bandwidth on the read. A missing or + // malformed header just falls through to the streaming cap inside + // readBoundedBody, which still enforces the limit. 
+ const contentLengthHeader = upstream.headers.get("Content-Length"); + if (contentLengthHeader != null) { + const declaredLength = Number.parseInt(contentLengthHeader, 10); + if (Number.isFinite(declaredLength) && declaredLength > MAX_BYTES) { + await discardBody(upstream); + return null; + } + } + + const body = await readBoundedBody(upstream, MAX_BYTES); + if (body == null) return null; + + return { + status, + body, + contentType, + contentRange: upstream.headers.get("Content-Range"), + acceptRanges: upstream.headers.get("Accept-Ranges"), + }; +} + +export async function prefetchProxyCacheForMode( + mode: MediaProxyMode, + url: string | null | undefined, +): Promise { + if (mode !== "cache" || url == null) return false; + const key = proxyCacheKeyForUrl(url); + if (await hasProxyCacheEntry(key)) return true; + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + try { + const result = await fetchProxyMedia(url, controller.signal); + if (result == null || result.status !== 200) return false; + await writeProxyCacheEntry(key, result.body, result.contentType); + return true; + } catch (error) { + logger.warn("Failed to prefetch remote media cache for {url}: {error}", { + url, + error: error instanceof Error ? 
error.message : String(error), + }); + return false; + } finally { + clearTimeout(timeout); + } +} + +type ProxyCachePrefetchJob = { + key: string; + mode: MediaProxyMode; + url: string; +}; + +const proxyCachePrefetchQueue: ProxyCachePrefetchJob[] = []; +const scheduledProxyCachePrefetchKeys = new Set(); +let activeProxyCachePrefetches = 0; + +function drainProxyCachePrefetchQueue(): void { + while ( + activeProxyCachePrefetches < PROXY_CACHE_PREFETCH_CONCURRENCY && + proxyCachePrefetchQueue.length > 0 + ) { + const job = proxyCachePrefetchQueue.shift(); + if (job == null) return; + activeProxyCachePrefetches++; + void prefetchProxyCacheForMode(job.mode, job.url).finally(() => { + activeProxyCachePrefetches--; + scheduledProxyCachePrefetchKeys.delete(job.key); + drainProxyCachePrefetchQueue(); + }); + } +} + +export function scheduleProxyCachePrefetchForMode( + mode: MediaProxyMode, + url: string | null | undefined, +): boolean { + if (mode !== "cache" || url == null) return false; + const key = proxyCacheKeyForUrl(url); + if (scheduledProxyCachePrefetchKeys.has(key)) return false; + if ( + activeProxyCachePrefetches + proxyCachePrefetchQueue.length >= + PROXY_CACHE_PREFETCH_CONCURRENCY + MAX_PROXY_CACHE_PREFETCH_QUEUE + ) { + logger.warn("Dropping remote media cache prefetch for {url}: queue full", { + url, + }); + return false; + } + scheduledProxyCachePrefetchKeys.add(key); + proxyCachePrefetchQueue.push({ key, mode, url }); + drainProxyCachePrefetchQueue(); + return true; +} diff --git a/src/proxy.ts b/src/proxy.ts index 5061954f..592abd2c 100644 --- a/src/proxy.ts +++ b/src/proxy.ts @@ -1,202 +1,32 @@ -import { createHash } from "node:crypto"; - import { getLogger } from "@logtape/logtape"; import { Hono } from "hono"; -// @ts-expect-error: No type definitions available -// cSpell: ignore ssrfcheck -import { isSSRFSafeURL } from "ssrfcheck"; import { MEDIA_PROXY, type MediaProxyMode, verifyProxySignature, } from "./media-proxy"; -import { drive } from 
"./storage"; +import { + fetchProxyMedia, + PROXY_CACHE_CONTROL, + proxyCacheKeyForUrl, + readProxyCacheEntry, + toExactArrayBuffer, + writeProxyCacheEntry, +} from "./proxy-cache"; const logger = getLogger(["hollo", "media-proxy"]); -const MAX_BYTES = 32 * 1024 * 1024; const FETCH_TIMEOUT_MS = 30_000; -const MAX_REDIRECTS = 3; -const CACHE_PREFIX = "proxy/"; -const ALLOWED_TYPE_PREFIXES = ["image/", "video/", "audio/"]; -// SVG can carry inline scripts that execute under the serving origin, so -// proxying it would amount to a same-origin XSS primitive even when delivered -// with the right Content-Type. Most fediverse media is PNG / JPEG / WebP / -// MP4, so the loss is small. -const BLOCKED_CONTENT_TYPES = new Set(["image/svg+xml", "image/svg"]); -const CACHE_CONTROL = "public, max-age=2592000, immutable"; - -interface CachedEntry { - body: Uint8Array; - contentType: string; -} - -function cacheKeyForUrl(url: string): string { - return CACHE_PREFIX + createHash("sha256").update(url).digest("hex"); -} - -function isAllowedContentType(value: string): boolean { - const lower = value.toLowerCase().split(";", 1)[0].trim(); - if (BLOCKED_CONTENT_TYPES.has(lower)) return false; - return ALLOWED_TYPE_PREFIXES.some((p) => lower.startsWith(p)); -} - -// Convert a Uint8Array to an ArrayBuffer with no surrounding bytes. Node's -// `Buffer` uses a shared backing pool, so `.buffer` on a small read can -// expose unrelated memory if we don't slice to the view's exact range. 
-function toExactArrayBuffer(buf: Uint8Array): ArrayBuffer { - if ( - buf.byteOffset === 0 && - buf.byteLength === buf.buffer.byteLength && - buf.buffer instanceof ArrayBuffer - ) { - return buf.buffer; - } - const out = new ArrayBuffer(buf.byteLength); - new Uint8Array(out).set(buf); - return out; -} const RESPONSE_HEADERS = (contentType: string): Record => ({ "Content-Type": contentType, - "Cache-Control": CACHE_CONTROL, + "Cache-Control": PROXY_CACHE_CONTROL, // Prevent the browser from MIME-sniffing the body into a different, // possibly active, content type. "X-Content-Type-Options": "nosniff", }); -async function discardBody(response: Response): Promise { - try { - await response.body?.cancel(); - } catch { - /* upstream is already gone — nothing to do */ - } -} - -async function readBoundedBody( - response: Response, - maxBytes: number, -): Promise { - const reader = response.body?.getReader(); - if (reader == null) return null; - const chunks: Uint8Array[] = []; - let total = 0; - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - if (value == null) continue; - total += value.byteLength; - if (total > maxBytes) { - await reader.cancel().catch(() => {}); - return null; - } - chunks.push(value); - } - } catch { - await reader.cancel().catch(() => {}); - return null; - } - const result = new Uint8Array(total); - let offset = 0; - for (const chunk of chunks) { - result.set(chunk, offset); - offset += chunk.byteLength; - } - return result; -} - -// Follow up to MAX_REDIRECTS redirects manually so we can re-run the SSRF -// check on each hop. `fetch(..., { redirect: "follow" })` would silently -// chase a 302 from a public hostname to a private one. -// -// Caveat for future maintainers: `isSSRFSafeURL` only inspects the URL -// string (scheme, host literal, port). It does not resolve DNS, so a -// hostname that looks public but resolves to a private address at fetch -// time can still slip through. 
Fixing this in the proxy alone would -// leave the same gap on every other server-side fetch in the codebase -// (e.g. src/federation/post.ts, preview-card scraping); the proper fix -// is a shared SSRF-aware fetch connector that pins the resolved IP. -async function fetchWithSSRFAwareRedirects( - initialUrl: string, - signal: AbortSignal, - requestHeaders: Record = {}, -): Promise { - let url = initialUrl; - for (let i = 0; i <= MAX_REDIRECTS; i++) { - let parsed: URL; - try { - parsed = new URL(url); - } catch { - return null; - } - if (parsed.protocol !== "http:" && parsed.protocol !== "https:") { - return null; - } - if (!isSSRFSafeURL(url)) return null; - const response = await fetch(url, { - signal, - redirect: "manual", - headers: requestHeaders, - }); - if (response.status >= 300 && response.status < 400) { - const location = response.headers.get("location"); - await discardBody(response); - if (location == null) return null; - try { - url = new URL(location, url).href; - } catch { - return null; - } - continue; - } - return response; - } - return null; -} - -async function readCached(key: string): Promise { - const disk = drive.use(); - try { - if (!(await disk.exists(`${key}.bin`))) return null; - const meta = JSON.parse(await disk.get(`${key}.json`)) as { - contentType?: unknown; - }; - if ( - typeof meta.contentType !== "string" || - !isAllowedContentType(meta.contentType) - ) { - return null; - } - const body = await disk.getBytes(`${key}.bin`); - return { body, contentType: meta.contentType }; - } catch (error) { - logger.warn("Failed to read proxy cache entry {key}: {error}", { - key, - error: error instanceof Error ? 
error.message : String(error), - }); - return null; - } -} - -async function writeCached( - key: string, - body: Uint8Array, - contentType: string, -): Promise { - const disk = drive.use(); - await disk.put(`${key}.bin`, body, { - contentType, - contentLength: body.byteLength, - visibility: "public", - }); - await disk.put(`${key}.json`, JSON.stringify({ contentType }), { - contentType: "application/json", - visibility: "public", - }); -} - export function createProxyApp(mode: MediaProxyMode = MEDIA_PROXY): Hono { const app = new Hono(); if (mode === "off") return app; @@ -214,7 +44,7 @@ export function createProxyApp(mode: MediaProxyMode = MEDIA_PROXY): Hono { const isRangeRequest = rangeHeader != null && rangeHeader.length > 0; if (mode === "cache" && !isRangeRequest) { - const cached = await readCached(cacheKeyForUrl(url)); + const cached = await readProxyCacheEntry(proxyCacheKeyForUrl(url)); if (cached != null) { return c.body( toExactArrayBuffer(cached.body), @@ -231,7 +61,7 @@ export function createProxyApp(mode: MediaProxyMode = MEDIA_PROXY): Hono { try { const forwardedHeaders: Record = {}; if (isRangeRequest) forwardedHeaders.Range = rangeHeader; - const upstream = await fetchWithSSRFAwareRedirects( + const upstream = await fetchProxyMedia( url, controller.signal, forwardedHeaders, @@ -241,52 +71,27 @@ export function createProxyApp(mode: MediaProxyMode = MEDIA_PROXY): Hono { // if the upstream supplied one, but discard the body to avoid forwarding // an arbitrary error page under our origin. 
if (upstream.status === 416) { - await discardBody(upstream); - const upstreamContentRange = upstream.headers.get("Content-Range"); const headers: Record = { - "Cache-Control": CACHE_CONTROL, + "Cache-Control": PROXY_CACHE_CONTROL, "X-Content-Type-Options": "nosniff", }; - if (upstreamContentRange != null) { - headers["Content-Range"] = upstreamContentRange; + if (upstream.contentRange != null) { + headers["Content-Range"] = upstream.contentRange; } return c.body(null, 416, headers); } - if (!upstream.ok) { - await discardBody(upstream); - return c.notFound(); - } const isPartial = upstream.status === 206; - const contentType = - upstream.headers.get("Content-Type") ?? "application/octet-stream"; - if (!isAllowedContentType(contentType)) { - await discardBody(upstream); - return c.notFound(); - } - - // When the upstream tells us the body length up front we can - // short-circuit oversized responses without spending bandwidth on - // the read. A missing or malformed header just falls through to - // the streaming cap inside readBoundedBody, which still enforces - // the limit. - const contentLengthHeader = upstream.headers.get("Content-Length"); - if (contentLengthHeader != null) { - const declaredLength = Number.parseInt(contentLengthHeader, 10); - if (Number.isFinite(declaredLength) && declaredLength > MAX_BYTES) { - await discardBody(upstream); - return c.notFound(); - } - } - - const body = await readBoundedBody(upstream, MAX_BYTES); - if (body == null) return c.notFound(); // Only cache full (200) responses. Partial responses and any request // that carried a Range header skip the write so the on-disk entry // always represents the complete resource. 
if (mode === "cache" && !isPartial && !isRangeRequest) { try { - await writeCached(cacheKeyForUrl(url), body, contentType); + await writeProxyCacheEntry( + proxyCacheKeyForUrl(url), + upstream.body, + upstream.contentType, + ); } catch (error) { logger.warn("Failed to write proxy cache for {url}: {error}", { url, @@ -296,15 +101,17 @@ export function createProxyApp(mode: MediaProxyMode = MEDIA_PROXY): Hono { } const responseHeaders: Record = { - ...RESPONSE_HEADERS(contentType), + ...RESPONSE_HEADERS(upstream.contentType), }; - const contentRange = upstream.headers.get("Content-Range"); - if (contentRange != null) responseHeaders["Content-Range"] = contentRange; - const acceptRanges = upstream.headers.get("Accept-Ranges"); - if (acceptRanges != null) responseHeaders["Accept-Ranges"] = acceptRanges; + if (upstream.contentRange != null) { + responseHeaders["Content-Range"] = upstream.contentRange; + } + if (upstream.acceptRanges != null) { + responseHeaders["Accept-Ranges"] = upstream.acceptRanges; + } return c.body( - toExactArrayBuffer(body), + toExactArrayBuffer(upstream.body), isPartial ? 206 : 200, responseHeaders, );