Skip to content

Commit

Permalink
chore: Gracefully shutdown the embedder (#9693)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dschoordsch committed May 6, 2024
1 parent 2b0c4bc commit 695ccad
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 84 deletions.
8 changes: 8 additions & 0 deletions packages/embedder/EmbeddingsJobQueueStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
}

orchestrator: WorkflowOrchestrator
done: boolean

constructor(orchestrator: WorkflowOrchestrator) {
this.orchestrator = orchestrator
this.done = false
}
async next(): Promise<IteratorResult<DBJob>> {
if (this.done) {
return {done: true as const, value: undefined}
}
const pg = getKysely()
const getJob = (isFailed: boolean) => {
return pg
Expand Down Expand Up @@ -60,9 +66,11 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
}
}
return() {
this.done = true
return Promise.resolve({done: true as const, value: undefined})
}
throw(error: any) {
this.done = true
return Promise.resolve({done: true as const, value: error})
}
}
10 changes: 6 additions & 4 deletions packages/embedder/embedder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ const run = async () => {
const streams = mergeAsyncIterators(jobQueueStreams)

const kill: NodeJS.SignalsListener = (signal) => {
Logger.log(`Kill signal received: ${signal}`)
primaryLock?.release()
Logger.log(
`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`
)
primaryLock?.release().catch(() => {})
streams.return?.()
process.exit()
}
process.on('SIGTERM', kill)
process.on('SIGINT', kill)
Expand All @@ -66,7 +67,8 @@ const run = async () => {
}

// On graceful shutdown
Logger.log('Streaming Complete. Goodbye!')
Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`)
process.exit()
}

run()
5 changes: 5 additions & 0 deletions packages/embedder/lint-staged.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = {
'*.{ts,tsx}': ['eslint --fix', 'prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'],
'*.graphql': ['prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'],
'**/*.{ts,tsx}': () => 'tsc --noEmit -p tsconfig.json'
}
2 changes: 1 addition & 1 deletion packages/embedder/logMemoryUse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ export const logMemoryUse = () => {
const {rss} = memoryUsage
const usedMB = Math.floor(rss / MB)
console.log('Memory use:', usedMB, 'MB')
}, 10000)
}, 10000).unref()
}
2 changes: 1 addition & 1 deletion packages/embedder/logPerformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ export const logPerformance = (logEvery: number, resetEvery: number) => {
logs = 0
start = performance.now()
}
}, logEvery * 1000)
}, logEvery * 1000).unref()
return counter
}
135 changes: 65 additions & 70 deletions packages/embedder/mergeAsyncIterators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,85 +12,80 @@ type Result<T extends AsyncIterator<any>> = UnYield<Awaited<ReturnType<T['next']

// Promise.race has a memory leak
// To avoid: https://github.com/tc39/proposal-async-iterator-helpers/issues/15#issuecomment-1937011820
export function mergeAsyncIterators<T extends AsyncIterator<any>[] | []>(
iterators: T
): AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result<T[P]>]}[number]> {
return (async function* () {
type ResultThunk = () => [number, Result<T[number]>]
let count = iterators.length as number
let capability: PromiseCapability<ResultThunk | null> | undefined
const queuedResults: ResultThunk[] = []
const getNext = async (idx: number, iterator: T[number]) => {
try {
const next = await iterator.next()
if (next.done) {
if (--count === 0 && capability !== undefined) {
capability.resolve(null)
}
} else {
resolveResult(() => {
void getNext(idx, iterator)
return [idx, next.value]
})
export function mergeAsyncIterators<T extends AsyncIterator<any>[] | []>(iterators: T) {
type ResultThunk = () => [number, Result<T[number]>]
let count = iterators.length as number
let capability: PromiseCapability<ResultThunk | null> | undefined
const queuedResults: ResultThunk[] = []
const getNext = async (idx: number, iterator: T[number]) => {
try {
const next = await iterator.next()
if (next.done) {
if (--count === 0 && capability !== undefined) {
capability.resolve(null)
}
} catch (error) {
} else {
resolveResult(() => {
throw error
void getNext(idx, iterator)
return [idx, next.value]
})
}
} catch (error) {
resolveResult(() => {
throw error
})
}
const resolveResult = (resultThunk: ResultThunk) => {
if (capability === undefined) {
queuedResults.push(resultThunk)
} else {
capability.resolve(resultThunk)
}
}
const resolveResult = (resultThunk: ResultThunk) => {
if (capability === undefined) {
queuedResults.push(resultThunk)
} else {
capability.resolve(resultThunk)
}
}

try {
// Begin all iterators
for (const [idx, iterable] of iterators.entries()) {
void getNext(idx, iterable)
// Begin all iterators
for (const [idx, iterable] of iterators.entries()) {
void getNext(idx, iterable)
}

const it: AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result<T[P]>]}[number]> = {
[Symbol.asyncIterator]: () => it,
next: async () => {
const nextQueuedResult = queuedResults.shift()
if (nextQueuedResult !== undefined) {
return {done: false as const, value: nextQueuedResult()}
}
if (count === 0) {
return {done: true as const, value: undefined}
}

// Delegate to iterables as results complete
while (true) {
while (true) {
const nextQueuedResult = queuedResults.shift()
if (nextQueuedResult === undefined) {
break
} else {
yield nextQueuedResult()
}
}
if (count === 0) {
break
} else {
// Promise.withResolvers() is not yet implemented in node
capability = {
resolve: undefined as any,
reject: undefined as any,
promise: undefined as any
}
capability.promise = new Promise((res, rej) => {
capability!.resolve = res
capability!.reject = rej
})
const nextResult = await capability.promise
if (nextResult === null) {
break
} else {
capability = undefined
yield nextResult()
}
}
// Promise.withResolvers() is not yet implemented in node
capability = {
resolve: undefined as any,
reject: undefined as any,
promise: undefined as any
}
capability.promise = new Promise((res, rej) => {
capability!.resolve = res
capability!.reject = rej
})
const nextResult = await capability.promise
if (nextResult === null) {
return {done: true as const, value: undefined}
} else {
capability = undefined
return {done: false as const, value: nextResult()}
}
} catch (err) {
// Unwind remaining iterators on failure
try {
await Promise.all(iterators.map((iterator) => iterator.return?.()))
} catch {}
throw err
},
return: async () => {
await Promise.allSettled(iterators.map((iterator) => iterator.return?.()))
return {done: true as const, value: undefined}
},
throw: async () => {
await Promise.allSettled(iterators.map((iterator) => iterator.return?.()))
return {done: true as const, value: undefined}
}
})()
}
return it
}
1 change: 1 addition & 0 deletions packages/embedder/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"url": "git+https://github.com/ParabolInc/parabol.git"
},
"scripts": {
"precommit": "lint-staged",
"lint": "eslint --fix . --ext .ts,.tsx",
"lint:check": "eslint . --ext .ts,.tsx",
"prettier": "prettier --config ../../.prettierrc --write \"**/*.{ts,tsx}\"",
Expand Down
2 changes: 1 addition & 1 deletion packages/embedder/resetStalledJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ export const resetStalledJobs = () => {
}))
.where('startAt', '<', new Date(Date.now() - ms('5m')))
.execute()
}, ms('5m'))
}, ms('5m')).unref()
}
4 changes: 3 additions & 1 deletion packages/gql-executor/gqlExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ const run = async () => {
const executorChannel = GQLExecutorChannelId.join(SERVER_ID!)

// on shutdown, remove consumer from the group
process.on('SIGTERM', async () => {
process.on('SIGTERM', async (signal) => {
console.log(`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`)
await publisher.xgroup(
'DELCONSUMER',
ServerChannel.GQL_EXECUTOR_STREAM,
ServerChannel.GQL_EXECUTOR_CONSUMER_GROUP,
executorChannel
)
console.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`)
process.exit()
})

Expand Down
18 changes: 12 additions & 6 deletions packages/server/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import tracer from 'dd-trace'
import {r} from 'rethinkdb-ts'
import uws, {SHARED_COMPRESSOR} from 'uWebSockets.js'
import sleep from '../client/utils/sleep'
import ICSHandler from './ICSHandler'
import PWAHandler from './PWAHandler'
import activeClients from './activeClients'
Expand Down Expand Up @@ -37,14 +38,19 @@ if (!__PRODUCTION__) {
})
}

process.on('SIGTERM', () => {
process.on('SIGTERM', async (signal) => {
console.log(
`Server ID: ${process.env.SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`
)
const RECONNECT_WINDOW = 60_000 // ms
Object.values(activeClients.store).forEach((connectionContext) => {
const disconnectIn = ~~(Math.random() * RECONNECT_WINDOW)
setTimeout(() => {
await Promise.allSettled(
Object.values(activeClients.store).map(async (connectionContext) => {
const disconnectIn = Math.floor(Math.random() * RECONNECT_WINDOW)
await sleep(disconnectIn)
handleDisconnect(connectionContext)
}, disconnectIn)
})
})
)
console.log(`Server ID: ${process.env.SERVER_ID}. Graceful shutdown complete, exiting.`)
})

const PORT = Number(__PRODUCTION__ ? process.env.PORT : process.env.SOCKET_PORT)
Expand Down

0 comments on commit 695ccad

Please sign in to comment.