Skip to content

Commit

Permalink
Experimental node clustering for bsky frontends (#1985)
Browse files Browse the repository at this point in the history
* experimental node clustering for bsky frontends

* build
  • Loading branch information
devinivy authored Dec 28, 2023
1 parent 50f209e commit a2a07f1
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-push-bsky-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ on:
push:
branches:
- main
- timeline-limit-1-opt
- bsky-node-clustering
env:
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
Expand Down
36 changes: 33 additions & 3 deletions services/bsky/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require('dd-trace') // Only works with commonjs
// Tracer code above must come before anything else
const path = require('path')
const assert = require('assert')
const cluster = require('cluster')
const {
BunnyInvalidator,
CloudfrontInvalidator,
Expand Down Expand Up @@ -140,12 +141,14 @@ const main = async () => {

await bsky.start()
// Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/)
process.on('SIGTERM', async () => {
const shutdown = async () => {
// Gracefully shutdown periodic-moderation-event-reversal before destroying bsky instance
periodicModerationEventReversal.destroy()
await periodicModerationEventReversalRunning
await bsky.destroy()
})
}
process.on('SIGTERM', shutdown)
process.on('disconnect', shutdown) // when clustering
}

const getEnv = () => ({
Expand Down Expand Up @@ -223,4 +226,31 @@ const maintainXrpcResource = (span, req) => {
}
}

main()
const workerCount = maybeParseInt(process.env.CLUSTER_WORKER_COUNT)

if (workerCount) {
if (cluster.isPrimary) {
console.log(`primary ${process.pid} is running`)
const workers = new Set()
for (let i = 0; i < workerCount; ++i) {
workers.add(cluster.fork())
}
let teardown = false
cluster.on('exit', (worker) => {
workers.delete(worker)
if (!teardown) {
workers.add(cluster.fork()) // restart on crash
}
})
process.on('SIGTERM', () => {
teardown = true
console.log('disconnecting workers')
workers.forEach((w) => w.disconnect())
})
} else {
console.log(`worker ${process.pid} is running`)
main()
}
} else {
main() // non-clustering
}

0 comments on commit a2a07f1

Please sign in to comment.