Skip to content

Commit

Permalink
Ozone label endpoints (#2043)
Browse files Browse the repository at this point in the history
* sketching out label sequencer

* refactor sequencer

* sequencer tests

* tests

* add query labels endpoint & tests

* add pagination

* fix label formatting on temp

* tidy

* format labels

* make use listen/notify for sequencer

* fix hanging server test

* Update packages/ozone/src/api/label/queryLabels.ts

Co-authored-by: devin ivy <devinivy@gmail.com>

* pr tidy

---------

Co-authored-by: devin ivy <devinivy@gmail.com>
  • Loading branch information
dholms and devinivy authored Feb 28, 2024
1 parent 5424776 commit b3434d4
Show file tree
Hide file tree
Showing 20 changed files with 834 additions and 82 deletions.
4 changes: 4 additions & 0 deletions packages/ozone/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import getRepo from './admin/getRepo'
import queryModerationStatuses from './admin/queryModerationStatuses'
import queryModerationEvents from './admin/queryModerationEvents'
import getModerationEvent from './admin/getModerationEvent'
import queryLabels from './label/queryLabels'
import subscribeLabels from './label/subscribeLabels'
import fetchLabels from './temp/fetchLabels'
import createCommunicationTemplate from './admin/createCommunicationTemplate'
import updateCommunicationTemplate from './admin/updateCommunicationTemplate'
Expand All @@ -27,6 +29,8 @@ export default function (server: Server, ctx: AppContext) {
getModerationEvent(server, ctx)
queryModerationEvents(server, ctx)
queryModerationStatuses(server, ctx)
queryLabels(server, ctx)
subscribeLabels(server, ctx)
fetchLabels(server, ctx)
listCommunicationTemplates(server, ctx)
createCommunicationTemplate(server, ctx)
Expand Down
58 changes: 58 additions & 0 deletions packages/ozone/src/api/label/queryLabels.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Server } from '../../lexicon'
import AppContext from '../../context'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { sql } from 'kysely'
import { formatLabel } from '../../mod-service/util'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.label.queryLabels(async ({ params }) => {
const { uriPatterns, sources, limit, cursor } = params
let builder = ctx.db.db.selectFrom('label').selectAll().limit(limit)
// if includes '*', then we don't need a where clause
if (!uriPatterns.includes('*')) {
builder = builder.where((qb) => {
// starter where clause that is always false so that we can chain `orWhere`s
qb = qb.where(sql`1 = 0`)
for (const pattern of uriPatterns) {
// if no '*', then we're looking for an exact match
if (!pattern.includes('*')) {
qb = qb.orWhere('uri', '=', pattern)
} else {
if (pattern.indexOf('*') < pattern.length - 1) {
throw new InvalidRequestError(`invalid pattern: ${pattern}`)
}
const searchPattern = pattern
.slice(0, -1)
.replaceAll('%', '') // sanitize search pattern
.replaceAll('_', '\\_') // escape any underscores
qb = qb.orWhere('uri', 'like', `${searchPattern}%`)
}
}
return qb
})
}
if (sources && sources.length > 0) {
builder = builder.where('src', 'in', sources)
}
if (cursor) {
const cursorId = parseInt(cursor, 10)
if (isNaN(cursorId)) {
throw new InvalidRequestError('invalid cursor')
}
builder = builder.where('id', '>', cursorId)
}

const res = await builder.execute()

const labels = res.map((l) => formatLabel(l))
const resCursor = res.at(-1)?.id.toString(10)

return {
encoding: 'application/json',
body: {
cursor: resCursor,
labels,
},
}
})
}
25 changes: 25 additions & 0 deletions packages/ozone/src/api/label/subscribeLabels.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Server } from '../../lexicon'
import AppContext from '../../context'
import Outbox from '../../sequencer/outbox'
import { InvalidRequestError } from '@atproto/xrpc-server'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.label.subscribeLabels(async function* ({
params,
signal,
}) {
const { cursor } = params
const outbox = new Outbox(ctx.sequencer)

if (cursor !== undefined) {
const curr = await ctx.sequencer.curr()
if (cursor > (curr ?? 0)) {
throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
}
}

for await (const evt of outbox.events(cursor, signal)) {
yield evt
}
})
}
6 changes: 2 additions & 4 deletions packages/ozone/src/api/temp/fetchLabels.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Server } from '../../lexicon'
import AppContext from '../../context'
import { formatLabel } from '../../mod-service/util'
import {
UNSPECCED_TAKEDOWN_BLOBS_LABEL,
UNSPECCED_TAKEDOWN_LABEL,
Expand Down Expand Up @@ -28,10 +29,7 @@ export default function (server: Server, ctx: AppContext) {
.limit(limit)
.execute()

const labels = labelRes.map((l) => ({
...l,
cid: l.cid === '' ? undefined : l.cid,
}))
const labels = labelRes.map((l) => formatLabel(l))

return {
encoding: 'application/json',
Expand Down
6 changes: 6 additions & 0 deletions packages/ozone/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => {
const dbCfg: OzoneConfig['db'] = {
postgresUrl: env.dbPostgresUrl,
postgresSchema: env.dbPostgresSchema,
poolSize: env.dbPoolSize,
poolMaxUses: env.dbPoolMaxUses,
poolIdleTimeoutMs: env.dbPoolIdleTimeoutMs,
}

assert(env.appviewUrl)
Expand Down Expand Up @@ -67,6 +70,9 @@ export type ServiceConfig = {
export type DatabaseConfig = {
postgresUrl: string
postgresSchema?: string
poolSize?: number
poolMaxUses?: number
poolIdleTimeoutMs?: number
}

export type AppviewConfig = {
Expand Down
6 changes: 6 additions & 0 deletions packages/ozone/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export const readEnv = (): OzoneEnvironment => {
pdsDid: envStr('OZONE_PDS_DID'),
dbPostgresUrl: envStr('OZONE_DB_POSTGRES_URL'),
dbPostgresSchema: envStr('OZONE_DB_POSTGRES_SCHEMA'),
dbPoolSize: envInt('OZONE_DB_POOL_SIZE'),
dbPoolMaxUses: envInt('OZONE_DB_POOL_MAX_USES'),
dbPoolIdleTimeoutMs: envInt('OZONE_DB_POOL_IDLE_TIMEOUT_MS'),
didPlcUrl: envStr('OZONE_DID_PLC_URL'),
adminPassword: envStr('OZONE_ADMIN_PASSWORD'),
moderatorPassword: envStr('OZONE_MODERATOR_PASSWORD'),
Expand All @@ -33,6 +36,9 @@ export type OzoneEnvironment = {
pdsDid?: string
dbPostgresUrl?: string
dbPostgresSchema?: string
dbPoolSize?: number
dbPoolMaxUses?: number
dbPoolIdleTimeoutMs?: number
didPlcUrl?: string
adminPassword?: string
moderatorPassword?: string
Expand Down
12 changes: 12 additions & 0 deletions packages/ozone/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as auth from './auth'
import { BackgroundQueue } from './background'
import assert from 'assert'
import { EventPusher } from './daemon'
import Sequencer from './sequencer/sequencer'
import {
CommunicationTemplateService,
CommunicationTemplateServiceCreator,
Expand All @@ -25,6 +26,7 @@ export type AppContextOptions = {
signingKey: Keypair
idResolver: IdResolver
backgroundQueue: BackgroundQueue
sequencer: Sequencer
}

export class AppContext {
Expand All @@ -38,6 +40,9 @@ export class AppContext {
const db = new Database({
url: cfg.db.postgresUrl,
schema: cfg.db.postgresSchema,
poolSize: cfg.db.poolSize,
poolMaxUses: cfg.db.poolMaxUses,
poolIdleTimeoutMs: cfg.db.poolIdleTimeoutMs,
})
const signingKey = await Secp256k1Keypair.import(secrets.signingKeyHex)
const appviewAgent = new AtpAgent({ service: cfg.appview.url })
Expand Down Expand Up @@ -74,6 +79,8 @@ export class AppContext {
plcUrl: cfg.identity.plcUrl,
})

const sequencer = new Sequencer(db)

return new AppContext(
{
db,
Expand All @@ -85,6 +92,7 @@ export class AppContext {
signingKey,
idResolver,
backgroundQueue,
sequencer,
...(overrides ?? {}),
},
secrets,
Expand Down Expand Up @@ -139,6 +147,10 @@ export class AppContext {
return this.opts.backgroundQueue
}

get sequencer(): Sequencer {
return this.opts.sequencer
}

get authVerifier() {
return auth.authVerifier(this.idResolver, { aud: this.cfg.service.did })
}
Expand Down
8 changes: 7 additions & 1 deletion packages/ozone/src/db/migrations/20231219T205730722Z-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,19 @@ export async function up(db: Kysely<unknown>): Promise<void> {
// Label
await db.schema
.createTable('label')
.addColumn('id', 'bigserial', (col) => col.primaryKey())
.addColumn('src', 'varchar', (col) => col.notNull())
.addColumn('uri', 'varchar', (col) => col.notNull())
.addColumn('cid', 'varchar', (col) => col.notNull())
.addColumn('val', 'varchar', (col) => col.notNull())
.addColumn('neg', 'boolean', (col) => col.notNull())
.addColumn('cts', 'varchar', (col) => col.notNull())
.addPrimaryKeyConstraint('label_pkey', ['src', 'uri', 'cid', 'val'])
.execute()
await db.schema
.createIndex('unique_label_idx')
.unique()
.on('label')
.columns(['src', 'uri', 'cid', 'val'])
.execute()
await db.schema
.createIndex('label_uri_index')
Expand Down
7 changes: 7 additions & 0 deletions packages/ozone/src/db/schema/label.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Generated, Selectable } from 'kysely'

export const tableName = 'label'

export interface Label {
id: Generated<number>
src: string
uri: string
cid: string
Expand All @@ -9,4 +12,8 @@ export interface Label {
cts: string
}

export type LabelRow = Selectable<Label>

export type PartialDB = { [tableName]: Label }

export const LabelChannel = 'label_channel' // used with notify/listen
2 changes: 2 additions & 0 deletions packages/ozone/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export class OzoneService {
'background queue stats',
)
}, 10000)
await this.ctx.sequencer.start()
const server = this.app.listen(this.ctx.cfg.service.port)
this.server = server
server.keepAliveTimeout = 90000
Expand All @@ -94,6 +95,7 @@ export class OzoneService {
async destroy(): Promise<void> {
await this.terminator?.terminate()
await this.ctx.backgroundQueue.destroy()
await this.ctx.sequencer.destroy()
await this.ctx.db.close()
clearInterval(this.dbStatsInterval)
}
Expand Down
2 changes: 2 additions & 0 deletions packages/ozone/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { subsystemLogger } from '@atproto/common'

export const dbLogger: ReturnType<typeof subsystemLogger> =
subsystemLogger('ozone:db')
export const seqLogger: ReturnType<typeof subsystemLogger> =
subsystemLogger('ozone:sequencer')
export const httpLogger: ReturnType<typeof subsystemLogger> =
subsystemLogger('ozone')
export const langLogger: ReturnType<typeof subsystemLogger> =
Expand Down
Loading

0 comments on commit b3434d4

Please sign in to comment.