Skip to content

Commit

Permalink
Config to start notifications daemon from a specific did (#1922)
Browse files Browse the repository at this point in the history
config to start notifications daemon from a specific did
  • Loading branch information
devinivy authored Dec 5, 2023
1 parent f9fd3e6 commit 49e7f98
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 8 deletions.
10 changes: 10 additions & 0 deletions packages/bsky/src/daemon/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export interface DaemonConfigValues {
version: string
dbPostgresUrl: string
dbPostgresSchema?: string
notificationsDaemonFromDid?: string
}

export class DaemonConfig {
Expand All @@ -15,11 +16,16 @@ export class DaemonConfig {
overrides?.dbPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL
const dbPostgresSchema =
overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA
const notificationsDaemonFromDid =
overrides?.notificationsDaemonFromDid ||
process.env.BSKY_NOTIFS_DAEMON_FROM_DID ||
undefined
assert(dbPostgresUrl)
return new DaemonConfig({
version,
dbPostgresUrl,
dbPostgresSchema,
notificationsDaemonFromDid,
...stripUndefineds(overrides ?? {}),
})
}
Expand All @@ -35,6 +41,10 @@ export class DaemonConfig {
get dbPostgresSchema() {
return this.cfg.dbPostgresSchema
}

get notificationsDaemonFromDid() {
return this.cfg.notificationsDaemonFromDid
}
}

function stripUndefineds(
Expand Down
6 changes: 4 additions & 2 deletions packages/bsky/src/daemon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ export class BskyDaemon {
}

async start() {
const { db } = this.ctx
const { db, cfg } = this.ctx
const pool = db.pool
this.notifications.run()
this.notifications.run({
startFromDid: cfg.notificationsDaemonFromDid,
})
this.dbStatsInterval = setInterval(() => {
dbLogger.info(
{
Expand Down
6 changes: 5 additions & 1 deletion packages/bsky/src/daemon/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ export class NotificationsDaemon {
}
}

type RunOptions = { forever?: boolean; batchSize?: number }
type RunOptions = {
forever?: boolean
batchSize?: number
startFromDid?: string
}
20 changes: 15 additions & 5 deletions packages/bsky/src/services/actor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,24 +147,34 @@ export class ActorService {
}

async *all(
opts: { batchSize?: number; forever?: boolean; cooldownMs?: number } = {},
opts: {
batchSize?: number
forever?: boolean
cooldownMs?: number
startFromDid?: string
} = {},
) {
const { cooldownMs = 1000, batchSize = 1000, forever = false } = opts
const {
cooldownMs = 1000,
batchSize = 1000,
forever = false,
startFromDid,
} = opts
const baseQuery = this.db.db
.selectFrom('actor')
.selectAll()
.orderBy('did')
.limit(batchSize)
while (true) {
let cursor: ActorResult | undefined
let cursor = startFromDid
do {
const actors = cursor
? await baseQuery.where('did', '>', cursor.did).execute()
? await baseQuery.where('did', '>', cursor).execute()
: await baseQuery.execute()
for (const actor of actors) {
yield actor
}
cursor = actors.at(-1)
cursor = actors.at(-1)?.did
} while (cursor)
if (forever) {
await wait(cooldownMs)
Expand Down
3 changes: 3 additions & 0 deletions services/bsky/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const main = async () => {
version: env.version,
dbPostgresUrl: env.dbPostgresUrl,
dbPostgresSchema: env.dbPostgresSchema,
notificationsDaemonFromDid: env.notificationsDaemonFromDid,
})
const daemon = BskyDaemon.create({ db, cfg })
await daemon.start()
Expand All @@ -34,6 +35,8 @@ const getEnv = () => ({
dbPoolSize: maybeParseInt(process.env.DB_POOL_SIZE),
dbPoolMaxUses: maybeParseInt(process.env.DB_POOL_MAX_USES),
dbPoolIdleTimeoutMs: maybeParseInt(process.env.DB_POOL_IDLE_TIMEOUT_MS),
notificationsDaemonFromDid:
process.env.BSKY_NOTIFS_DAEMON_FROM_DID || undefined,
})

const maybeParseInt = (str) => {
Expand Down

0 comments on commit 49e7f98

Please sign in to comment.