Skip to content

Commit

Permalink
fix: support PG reconnects (#9663)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <matt.krick@gmail.com>
  • Loading branch information
mattkrick committed Apr 23, 2024
1 parent 01372bc commit 32574a6
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 21 deletions.
20 changes: 12 additions & 8 deletions packages/embedder/EmbeddingsJobQueueStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,20 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
.returningAll()
.executeTakeFirst()
}
const job = (await getJob(false)) || (await getJob(true))
if (!job) {
Logger.log('JobQueueStream: no jobs found')
// queue is empty, so sleep for a while
await sleep(ms('10s'))
try {
const job = (await getJob(false)) || (await getJob(true))
if (!job) {
Logger.log('JobQueueStream: no jobs found')
// queue is empty, so sleep for a while
await sleep(ms('10s'))
return this.next()
}
await this.orchestrator.runStep(job)
return {done: false, value: job}
} catch (e) {
await sleep(1000)
return this.next()
}

await this.orchestrator.runStep(job)
return {done: false, value: job}
}
return() {
return Promise.resolve({done: true as const, value: undefined})
Expand Down
31 changes: 18 additions & 13 deletions packages/server/postgres/getKysely.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,26 @@ import getPg from './getPg'
import {DB} from './pg.d'

let kysely: Kysely<DB> | undefined

const makeKysely = () => {
const nextPg = getPg()
nextPg.on('poolChange' as any, makeKysely)
return new Kysely<DB>({
dialect: new PostgresDialect({
pool: nextPg
})
// ,log(event) {
// if (event.level === 'query') {
// console.log(event.query.sql)
// console.log(event.query.parameters)
// }
// }
})
}

const getKysely = () => {
if (!kysely) {
const pg = getPg()
kysely = new Kysely<DB>({
dialect: new PostgresDialect({
pool: pg
})
// query logging, if you'd like it:
// log(event) {
// if (event.level === 'query') {
// console.log(event.query.sql)
// console.log(event.query.parameters)
// }
// }
})
kysely = makeKysely()
}
return kysely
}
Expand Down
19 changes: 19 additions & 0 deletions packages/server/postgres/getPg.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
import {Pool} from 'pg'
import sleep from '../../client/utils/sleep'
import getPgConfig from './getPgConfig'

const config = getPgConfig()

const graceFullyReconnect = async () => {
for (let i = 0; i < 1e6; i++) {
const nextPool = new Pool(getPgConfig())
try {
const testClient = await nextPool.connect()
testClient.release()
nextPool.on('error', graceFullyReconnect)
const oldPool = pool
pool = nextPool
oldPool?.emit('changePool')
return
} catch (e) {
await sleep(1000)
}
}
}

let pool: Pool | undefined
const getPg = () => {
if (!pool) {
pool = new Pool(config)
pool.on('error', graceFullyReconnect)
}
return pool
}
Expand Down

0 comments on commit 32574a6

Please sign in to comment.