Skip to content

Commit

Permalink
Update data source for getSuggestedFollowsByActor (#2630)
Browse files Browse the repository at this point in the history
* Update lex

* Codegen

* Set up StatSig

* Integrate new implementation into old endpoint

* Add todo to crypto module

* Format

* Specify StatSig env

* Downgrade pnpm to match CI, bump lock

* Catch StatSig errors

* Use sep env

* Reset lockfile

* Re-add new dep using correct pnpm version

* tidy

* Integrate into AppContext and lifecycle

* Use camelCase

* Switcheroo

Co-authored-by: devin ivy <devinivy@gmail.com>

* Init prior to server listen start

* Move test env check up to server config

* Add logger and log

* Better comment

---------

Co-authored-by: devin ivy <devinivy@gmail.com>
  • Loading branch information
estrattonbailey and devinivy authored Jul 11, 2024
1 parent 2f40203 commit 8f22a25
Show file tree
Hide file tree
Showing 18 changed files with 221 additions and 20 deletions.
7 changes: 6 additions & 1 deletion lexicons/app/bsky/unspecced/getSuggestionsSkeleton.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
"maximum": 100,
"default": 50
},
"cursor": { "type": "string" }
"cursor": { "type": "string" },
"relativeToDid": {
"type": "string",
"format": "did",
"description": "DID of the account to get suggestions relative to. If not provided, suggestions will be based on the viewer."
}
}
},
"output": {
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8712,6 +8712,12 @@ export const schemaDict = {
cursor: {
type: 'string',
},
relativeToDid: {
type: 'string',
format: 'did',
description:
'DID of the account to get suggestions relative to. If not provided, suggestions will be based on the viewer.',
},
},
},
output: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export interface QueryParams {
viewer?: string
limit?: number
cursor?: string
/** DID of the account to get suggestions relative to. If not provided, suggestions will be based on the viewer. */
relativeToDid?: string
}

export type InputSchema = undefined
Expand Down
1 change: 1 addition & 0 deletions packages/bsky/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"pino": "^8.21.0",
"pino-http": "^8.2.1",
"sharp": "^0.32.6",
"statsig-node": "^5.23.1",
"structured-headers": "^1.0.1",
"typed-emitter": "^2.1.0",
"uint8arrays": "3.0.0"
Expand Down
70 changes: 55 additions & 15 deletions packages/bsky/src/api/app/bsky/graph/getSuggestedFollowsByActor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { mapDefined } from '@atproto/common'
import { mapDefined, noUndefinedVals } from '@atproto/common'
import { InvalidRequestError } from '@atproto/xrpc-server'
import AtpAgent from '@atproto/api'
import { Server } from '../../../../lexicon'
import { QueryParams } from '../../../../lexicon/types/app/bsky/graph/getSuggestedFollowsByActor'
import AppContext from '../../../../context'
Expand Down Expand Up @@ -27,32 +28,67 @@ export default function (server: Server, ctx: AppContext) {
const viewer = auth.credentials.iss
const labelers = ctx.reqLabelers(req)
const hydrateCtx = await ctx.hydrator.createContext({ labelers, viewer })
const result = await getSuggestedFollowsByActor(
{ ...params, hydrateCtx: hydrateCtx.copy({ viewer }) },
ctx,
)
const headers = noUndefinedVals({
'accept-language': req.headers['accept-language'],
'x-bsky-topics': Array.isArray(req.headers['x-bsky-topics'])
? req.headers['x-bsky-topics'].join(',')
: req.headers['x-bsky-topics'],
})
const { headers: resultHeaders, ...result } =
await getSuggestedFollowsByActor(
{ ...params, hydrateCtx: hydrateCtx.copy({ viewer }), headers },
ctx,
)
const responseHeaders = noUndefinedVals({
'content-language': resultHeaders?.['content-language'],
})
return {
encoding: 'application/json',
body: result,
headers: resHeaders({ labelers: hydrateCtx.labelers }),
headers: {
...responseHeaders,
...resHeaders({ labelers: hydrateCtx.labelers }),
},
}
},
})
}

const skeleton = async (input: SkeletonFnInput<Context, Params>) => {
const { params, ctx } = input
const gates = ctx.featureGates
const [relativeToDid] = await ctx.hydrator.actor.getDids([params.actor])
if (!relativeToDid) {
throw new InvalidRequestError('Actor not found')
}
const { dids, cursor } = await ctx.hydrator.dataplane.getFollowSuggestions({
actorDid: params.hydrateCtx.viewer,
relativeToDid,
})
return {
suggestedDids: dids,
cursor: cursor || undefined,

if (
ctx.suggestionsAgent &&
gates.check(
await gates.user({ did: params.hydrateCtx.viewer }),
gates.ids.NewSuggestedFollowsByActor,
)
) {
const res =
await ctx.suggestionsAgent.api.app.bsky.unspecced.getSuggestionsSkeleton(
{
viewer: params.hydrateCtx.viewer ?? undefined,
relativeToDid,
},
{ headers: params.headers },
)
return {
suggestedDids: res.data.actors.map((a) => a.did),
headers: res.headers,
}
} else {
const { dids } = await ctx.hydrator.dataplane.getFollowSuggestions({
actorDid: params.hydrateCtx.viewer,
relativeToDid,
})
return {
suggestedDids: dids,
}
}
}

Expand Down Expand Up @@ -80,22 +116,26 @@ const presentation = (
input: PresentationFnInput<Context, Params, SkeletonState>,
) => {
const { ctx, hydration, skeleton } = input
const { suggestedDids } = skeleton
const { suggestedDids, headers } = skeleton
const suggestions = mapDefined(suggestedDids, (did) =>
ctx.views.profileDetailed(did, hydration),
)
return { suggestions }
return { suggestions, headers }
}

type Context = {
hydrator: Hydrator
views: Views
suggestionsAgent: AtpAgent | undefined
featureGates: AppContext['featureGates']
}

type Params = QueryParams & {
hydrateCtx: HydrateCtx & { viewer: string }
headers: Record<string, string>
}

type SkeletonState = {
suggestedDids: string[]
headers?: Record<string, string>
}
20 changes: 20 additions & 0 deletions packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export interface ServerConfigValues {
labelsFromIssuerDids?: string[]
// misc/dev
blobCacheLocation?: string
statsigKey?: string
statsigEnv?: string
}

export class ServerConfig {
Expand Down Expand Up @@ -102,6 +104,14 @@ export class ServerConfig {
assert(modServiceDid)
assert(dataplaneUrls.length)
assert(dataplaneHttpVersion === '1.1' || dataplaneHttpVersion === '2')
const statsigKey =
process.env.NODE_ENV === 'test'
? 'secret-key'
: process.env.BSKY_STATSIG_KEY || undefined
const statsigEnv =
process.env.NODE_ENV === 'test'
? 'test'
: process.env.BSKY_STATSIG_ENV || 'development'
return new ServerConfig({
version,
debugMode,
Expand Down Expand Up @@ -132,6 +142,8 @@ export class ServerConfig {
blobRateLimitBypassHostname,
adminPasswords,
modServiceDid,
statsigKey,
statsigEnv,
...stripUndefineds(overrides ?? {}),
})
}
Expand Down Expand Up @@ -264,6 +276,14 @@ export class ServerConfig {
get blobCacheLocation() {
return this.cfg.blobCacheLocation
}

get statsigKey() {
return this.cfg.statsigKey
}

get statsigEnv() {
return this.cfg.statsigEnv
}
}

function stripUndefineds(
Expand Down
6 changes: 6 additions & 0 deletions packages/bsky/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Views } from './views'
import { AuthVerifier } from './auth-verifier'
import { BsyncClient } from './bsync'
import { CourierClient } from './courier'
import { FeatureGates } from './feature-gates'
import {
ParsedLabelers,
defaultLabelerHeader,
Expand All @@ -32,6 +33,7 @@ export class AppContext {
bsyncClient: BsyncClient
courierClient: CourierClient
authVerifier: AuthVerifier
featureGates: FeatureGates
},
) {}

Expand Down Expand Up @@ -83,6 +85,10 @@ export class AppContext {
return this.opts.authVerifier
}

get featureGates(): FeatureGates {
return this.opts.featureGates
}

async serviceAuthJwt(aud: string) {
const iss = this.cfg.serverDid
return createServiceJwt({
Expand Down
66 changes: 66 additions & 0 deletions packages/bsky/src/feature-gates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { Statsig, StatsigUser } from 'statsig-node'
import { sha256Hex } from '@atproto/crypto'

import { featureGatesLogger } from './logger'
import type { ServerConfig } from './config'

export type Config = {
apiKey?: string
env?: 'development' | 'staging' | 'production' | string
}

export enum GateID {
NewSuggestedFollowsByActor = 'new_sugg_foll_by_actor',
}

/**
* @see https://docs.statsig.com/server/nodejsServerSDK
*/
export class FeatureGates {
ready = false
private statsig = Statsig
ids = GateID

constructor(private config: Config) {}

async start() {
try {
if (this.config.apiKey) {
/**
* Special handling in test mode, see {@link ServerConfig}
*
* {@link https://docs.statsig.com/server/nodejsServerSDK#local-overrides}
*/
await this.statsig.initialize(this.config.apiKey, {
localMode: this.config.env === 'test',
environment: {
tier: this.config.env || 'development',
},
})
this.ready = true
}
} catch (err) {
featureGatesLogger.error({ err }, 'Failed to initialize StatSig')
this.ready = false
}
}

destroy() {
if (this.ready) {
this.ready = false
this.statsig.shutdown()
}
}

async user({ did }: { did: string }): Promise<StatsigUser> {
const userID = await sha256Hex(did)
return {
userID,
}
}

check(user: StatsigUser, gate: GateID) {
if (!this.ready) return false
return this.statsig.checkGateSync(user, gate)
}
}
9 changes: 9 additions & 0 deletions packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { Views } from './views'
import { AuthVerifier } from './auth-verifier'
import { authWithApiKey as bsyncAuth, createBsyncClient } from './bsync'
import { authWithApiKey as courierAuth, createCourierClient } from './courier'
import { FeatureGates } from './feature-gates'

export * from './data-plane'
export type { ServerConfigValues } from './config'
Expand Down Expand Up @@ -116,6 +117,11 @@ export class BskyAppView {
adminPasses: config.adminPasswords,
})

const featureGates = new FeatureGates({
apiKey: config.statsigKey,
env: config.statsigEnv,
})

const ctx = new AppContext({
cfg: config,
dataplane,
Expand All @@ -128,6 +134,7 @@ export class BskyAppView {
bsyncClient,
courierClient,
authVerifier,
featureGates,
})

let server = createServer({
Expand All @@ -154,6 +161,7 @@ export class BskyAppView {
}

async start(): Promise<http.Server> {
await this.ctx.featureGates.start()
const server = this.app.listen(this.ctx.cfg.port)
this.server = server
server.keepAliveTimeout = 90000
Expand All @@ -166,6 +174,7 @@ export class BskyAppView {

async destroy(): Promise<void> {
await this.terminator?.terminate()
this.ctx.featureGates.destroy()
}
}

Expand Down
6 changes: 6 additions & 0 deletions packages/bsky/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8712,6 +8712,12 @@ export const schemaDict = {
cursor: {
type: 'string',
},
relativeToDid: {
type: 'string',
format: 'did',
description:
'DID of the account to get suggestions relative to. If not provided, suggestions will be based on the viewer.',
},
},
},
output: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface QueryParams {
viewer?: string
limit: number
cursor?: string
/** DID of the account to get suggestions relative to. If not provided, suggestions will be based on the viewer. */
relativeToDid?: string
}

export type InputSchema = undefined
Expand Down
2 changes: 2 additions & 0 deletions packages/bsky/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export const labelerLogger: ReturnType<typeof subsystemLogger> =
subsystemLogger('bsky:labeler')
export const hydrationLogger: ReturnType<typeof subsystemLogger> =
subsystemLogger('bsky:hydration')
export const featureGatesLogger: ReturnType<typeof subsystemLogger> =
subsystemLogger('bsky:featuregates')
export const httpLogger: ReturnType<typeof subsystemLogger> =
subsystemLogger('bsky')

Expand Down
2 changes: 2 additions & 0 deletions packages/crypto/src/sha.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as noble from '@noble/hashes/sha256'
import * as uint8arrays from 'uint8arrays'

// takes either bytes of utf8 input
// @TODO this can be sync
export const sha256 = async (
input: Uint8Array | string,
): Promise<Uint8Array> => {
Expand All @@ -10,6 +11,7 @@ export const sha256 = async (
return noble.sha256(bytes)
}

// @TODO this can be sync
export const sha256Hex = async (
input: Uint8Array | string,
): Promise<string> => {
Expand Down
Loading

0 comments on commit 8f22a25

Please sign in to comment.