Skip to content

Commit

Permalink
Cache labels in Redis (#1897)
Browse files Browse the repository at this point in the history
* cache did docs in redis

* drop table

* expire from redis

* fix tests

* add cache class

* update api

* refactor

* filter negative labels

* fix up dev-env

* refactor did cache to use new redis cache class

* tidy

* ensure caching negatives

* redis cache tests

* remove timeout on did cache

* fix ns in test

* rename driver

* add timeout & fail open

* add test for timeout & fail open

* small pr feedback

* refactor caches

* bugfixg

* test for caching negative values

* little more to cache

* wire up cache cfg

* switch from redis scratch to redis

* fix build issues

* use different redis clients for tests

* fix test

* fix flaky test

* use separate db for redis cache
  • Loading branch information
dholms authored Dec 5, 2023
1 parent 7e74f00 commit e566bef
Show file tree
Hide file tree
Showing 29 changed files with 802 additions and 288 deletions.
143 changes: 143 additions & 0 deletions packages/bsky/src/cache/read-through.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { cacheLogger as log } from '../logger'
import { Redis } from '../redis'

export type CacheItem<T> = {
val: T | null // null here is for negative caching
updatedAt: number
}

export type CacheOptions<T> = {
staleTTL: number
maxTTL: number
fetchMethod: (key: string) => Promise<T | null>
fetchManyMethod?: (keys: string[]) => Promise<Record<string, T | null>>
}

export class ReadThroughCache<T> {
constructor(public redis: Redis, public opts: CacheOptions<T>) {}

private async _fetchMany(keys: string[]): Promise<Record<string, T | null>> {
if (this.opts.fetchManyMethod) {
return this.opts.fetchManyMethod(keys)
}
const got = await Promise.all(keys.map((k) => this.opts.fetchMethod(k)))
const result: Record<string, T | null> = {}
for (let i = 0; i < keys.length; i++) {
result[keys[i]] = got[i] ?? null
}
return result
}

private async fetchAndCache(key: string): Promise<T | null> {
const fetched = await this.opts.fetchMethod(key)
this.set(key, fetched).catch((err) =>
log.error({ err, key }, 'failed to set cache value'),
)
return fetched
}

private async fetchAndCacheMany(keys: string[]): Promise<Record<string, T>> {
const fetched = await this._fetchMany(keys)
this.setMany(fetched).catch((err) =>
log.error({ err, keys }, 'failed to set cache values'),
)
return removeNulls(fetched)
}

async get(key: string, opts?: { revalidate?: boolean }): Promise<T | null> {
if (opts?.revalidate) {
return this.fetchAndCache(key)
}
let cached: CacheItem<T> | null
try {
const got = await this.redis.get(key)
cached = got ? JSON.parse(got) : null
} catch (err) {
cached = null
log.warn({ key, err }, 'failed to fetch value from cache')
}
if (!cached || this.isExpired(cached)) {
return this.fetchAndCache(key)
}
if (this.isStale(cached)) {
this.fetchAndCache(key).catch((err) =>
log.warn({ key, err }, 'failed to refresh stale cache value'),
)
}
return cached.val
}

async getMany(
keys: string[],
opts?: { revalidate?: boolean },
): Promise<Record<string, T>> {
if (opts?.revalidate) {
return this.fetchAndCacheMany(keys)
}
let cached: Record<string, string>
try {
cached = await this.redis.getMulti(keys)
} catch (err) {
cached = {}
log.warn({ keys, err }, 'failed to fetch values from cache')
}

const stale: string[] = []
const toFetch: string[] = []
const results: Record<string, T> = {}
for (const key of keys) {
const val = cached[key] ? (JSON.parse(cached[key]) as CacheItem<T>) : null
if (!val || this.isExpired(val)) {
toFetch.push(key)
} else if (this.isStale(val)) {
stale.push(key)
} else if (val.val) {
results[key] = val.val
}
}
const fetched = await this.fetchAndCacheMany(toFetch)
this.fetchAndCacheMany(stale).catch((err) =>
log.warn({ keys, err }, 'failed to refresh stale cache values'),
)
return {
...results,
...fetched,
}
}

async set(key: string, val: T | null) {
await this.setMany({ [key]: val })
}

async setMany(vals: Record<string, T | null>) {
const items: Record<string, string> = {}
for (const key of Object.keys(vals)) {
items[key] = JSON.stringify({
val: vals[key],
updatedAt: Date.now(),
})
}
await this.redis.setMulti(items, this.opts.maxTTL)
}

async clearEntry(key: string) {
await this.redis.del(key)
}

isExpired(result: CacheItem<T>) {
return Date.now() > result.updatedAt + this.opts.maxTTL
}

isStale(result: CacheItem<T>) {
return Date.now() > result.updatedAt + this.opts.staleTTL
}
}

const removeNulls = <T>(obj: Record<string, T | null>): Record<string, T> => {
return Object.entries(obj).reduce((acc, [key, val]) => {
if (val !== null) {
acc[key] = val
}
return acc
}, {} as Record<string, T>)
}
65 changes: 64 additions & 1 deletion packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import assert from 'assert'
import { DAY, HOUR, parseIntWithFallback } from '@atproto/common'
import {
DAY,
HOUR,
MINUTE,
SECOND,
parseIntWithFallback,
} from '@atproto/common'

export interface ServerConfigValues {
version: string
Expand All @@ -12,9 +18,15 @@ export interface ServerConfigValues {
dbReplicaPostgresUrls?: string[]
dbReplicaTags?: Record<string, number[]> // E.g. { timeline: [0], thread: [1] }
dbPostgresSchema?: string
redisHost?: string // either set redis host, or both sentinel name and hosts
redisSentinelName?: string
redisSentinelHosts?: string[]
redisPassword?: string
didPlcUrl: string
didCacheStaleTTL: number
didCacheMaxTTL: number
labelCacheStaleTTL: number
labelCacheMaxTTL: number
handleResolveNameservers?: string[]
imgUriEndpoint?: string
blobCacheLocation?: string
Expand All @@ -38,6 +50,19 @@ export class ServerConfig {
const feedGenDid = process.env.FEED_GEN_DID
const envPort = parseInt(process.env.PORT || '', 10)
const port = isNaN(envPort) ? 2584 : envPort
const redisHost =
overrides?.redisHost || process.env.REDIS_HOST || undefined
const redisSentinelName =
overrides?.redisSentinelName ||
process.env.REDIS_SENTINEL_NAME ||
undefined
const redisSentinelHosts =
overrides?.redisSentinelHosts ||
(process.env.REDIS_SENTINEL_HOSTS
? process.env.REDIS_SENTINEL_HOSTS.split(',')
: [])
const redisPassword =
overrides?.redisPassword || process.env.REDIS_PASSWORD || undefined
const didPlcUrl = process.env.DID_PLC_URL || 'http://localhost:2582'
const didCacheStaleTTL = parseIntWithFallback(
process.env.DID_CACHE_STALE_TTL,
Expand All @@ -47,6 +72,14 @@ export class ServerConfig {
process.env.DID_CACHE_MAX_TTL,
DAY,
)
const labelCacheStaleTTL = parseIntWithFallback(
process.env.LABEL_CACHE_STALE_TTL,
30 * SECOND,
)
const labelCacheMaxTTL = parseIntWithFallback(
process.env.LABEL_CACHE_MAX_TTL,
MINUTE,
)
const handleResolveNameservers = process.env.HANDLE_RESOLVE_NAMESERVERS
? process.env.HANDLE_RESOLVE_NAMESERVERS.split(',')
: []
Expand Down Expand Up @@ -93,9 +126,15 @@ export class ServerConfig {
dbReplicaPostgresUrls,
dbReplicaTags,
dbPostgresSchema,
redisHost,
redisSentinelName,
redisSentinelHosts,
redisPassword,
didPlcUrl,
didCacheStaleTTL,
didCacheMaxTTL,
labelCacheStaleTTL,
labelCacheMaxTTL,
handleResolveNameservers,
imgUriEndpoint,
blobCacheLocation,
Expand Down Expand Up @@ -162,6 +201,22 @@ export class ServerConfig {
return this.cfg.dbPostgresSchema
}

get redisHost() {
return this.cfg.redisHost
}

get redisSentinelName() {
return this.cfg.redisSentinelName
}

get redisSentinelHosts() {
return this.cfg.redisSentinelHosts
}

get redisPassword() {
return this.cfg.redisPassword
}

get didCacheStaleTTL() {
return this.cfg.didCacheStaleTTL
}
Expand All @@ -170,6 +225,14 @@ export class ServerConfig {
return this.cfg.didCacheMaxTTL
}

get labelCacheStaleTTL() {
return this.cfg.labelCacheStaleTTL
}

get labelCacheMaxTTL() {
return this.cfg.labelCacheMaxTTL
}

get handleResolveNameservers() {
return this.cfg.handleResolveNameservers
}
Expand Down
14 changes: 7 additions & 7 deletions packages/bsky/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import { ServerConfig } from './config'
import { ImageUriBuilder } from './image/uri'
import { Services } from './services'
import * as auth from './auth'
import DidSqlCache from './did-cache'
import DidRedisCache from './did-cache'
import { BackgroundQueue } from './background'
import { MountedAlgos } from './feed-gen/types'
import { LabelCache } from './label-cache'
import { NotificationServer } from './notifications'
import { Redis } from './redis'

export class AppContext {
public moderationPushAgent: AtpAgent | undefined
Expand All @@ -24,8 +24,8 @@ export class AppContext {
services: Services
signingKey: Keypair
idResolver: IdResolver
didCache: DidSqlCache
labelCache: LabelCache
didCache: DidRedisCache
redis: Redis
backgroundQueue: BackgroundQueue
searchAgent?: AtpAgent
algos: MountedAlgos
Expand Down Expand Up @@ -70,12 +70,12 @@ export class AppContext {
return this.opts.idResolver
}

get didCache(): DidSqlCache {
get didCache(): DidRedisCache {
return this.opts.didCache
}

get labelCache(): LabelCache {
return this.opts.labelCache
get redis(): Redis {
return this.opts.redis
}

get notifServer(): NotificationServer {
Expand Down
3 changes: 0 additions & 3 deletions packages/bsky/src/daemon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { DaemonConfig } from './config'
import { DaemonContext } from './context'
import { createServices } from './services'
import { ImageUriBuilder } from '../image/uri'
import { LabelCache } from '../label-cache'
import { NotificationsDaemon } from './notifications'
import logger from './logger'

Expand All @@ -28,10 +27,8 @@ export class BskyDaemon {
static create(opts: { db: PrimaryDatabase; cfg: DaemonConfig }): BskyDaemon {
const { db, cfg } = opts
const imgUriBuilder = new ImageUriBuilder('https://daemon.invalid') // will not be used by daemon
const labelCache = new LabelCache(db)
const services = createServices({
imgUriBuilder,
labelCache,
})
const ctx = new DaemonContext({
db,
Expand Down
10 changes: 6 additions & 4 deletions packages/bsky/src/daemon/services.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { PrimaryDatabase } from '../db'
import { ActorService } from '../services/actor'
import { ImageUriBuilder } from '../image/uri'
import { LabelCache } from '../label-cache'
import { GraphService } from '../services/graph'
import { LabelService } from '../services/label'

export function createServices(resources: {
imgUriBuilder: ImageUriBuilder
labelCache: LabelCache
}): Services {
const { imgUriBuilder, labelCache } = resources
const { imgUriBuilder } = resources
const graph = GraphService.creator(imgUriBuilder)
const label = LabelService.creator(null)
return {
actor: ActorService.creator(imgUriBuilder, labelCache),
actor: ActorService.creator(imgUriBuilder, graph, label),
}
}

Expand Down
2 changes: 0 additions & 2 deletions packages/bsky/src/db/database-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import * as actorSync from './tables/actor-sync'
import * as record from './tables/record'
import * as notification from './tables/notification'
import * as notificationPushToken from './tables/notification-push-token'
import * as didCache from './tables/did-cache'
import * as moderation from './tables/moderation'
import * as label from './tables/label'
import * as algo from './tables/algo'
Expand Down Expand Up @@ -57,7 +56,6 @@ export type DatabaseSchemaType = duplicateRecord.PartialDB &
record.PartialDB &
notification.PartialDB &
notificationPushToken.PartialDB &
didCache.PartialDB &
moderation.PartialDB &
label.PartialDB &
algo.PartialDB &
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Kysely } from 'kysely'

export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable('did_cache').execute()
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable('did_cache')
.addColumn('did', 'varchar', (col) => col.primaryKey())
.addColumn('doc', 'jsonb', (col) => col.notNull())
.addColumn('updatedAt', 'bigint', (col) => col.notNull())
.execute()
}
1 change: 1 addition & 0 deletions packages/bsky/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ export * as _20230906T222220386Z from './20230906T222220386Z-thread-gating'
export * as _20230920T213858047Z from './20230920T213858047Z-add-tags-to-post'
export * as _20230929T192920807Z from './20230929T192920807Z-record-cursor-indexes'
export * as _20231003T202833377Z from './20231003T202833377Z-create-moderation-subject-status'
export * as _20231205T000257238Z from './20231205T000257238Z-remove-did-cache'
13 changes: 0 additions & 13 deletions packages/bsky/src/db/tables/did-cache.ts

This file was deleted.

Loading

0 comments on commit e566bef

Please sign in to comment.