Skip to content

Commit

Permalink
fix(monitoring): fix listing outdated errored conduits (#261)
Browse files Browse the repository at this point in the history
* fix(monitoring): fix listing outdated errored conduits

* fix

* refact

* fix

* refact

* fix

* fix

* fix

* fix

* remove comment

* migration

* bump version

* more tests

* more tests

* bump version

* shutdown error when migration fails

* better ShutDownSignal

* pr comments

* add limit test

* errMessage

* remove version bump
  • Loading branch information
samuelmasse committed Nov 30, 2021
1 parent 6f04076 commit c046dac
Show file tree
Hide file tree
Showing 15 changed files with 390 additions and 87 deletions.
2 changes: 1 addition & 1 deletion packages/engine/src/base/errors.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export class ShutDownSignal extends Error {
constructor() {
constructor(public readonly code?: number) {
super()
}
}
6 changes: 3 additions & 3 deletions packages/engine/src/migration/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class MigrationService extends Service {
async setup() {
this.srcVersion = process.env.TESTMIG_DB_VERSION || this.meta.get().version
this.dstVersion = process.env.MIGRATE_TARGET || this.meta.app().version
this.autoMigrate = !!yn(process.env.AUTO_MIGRATE)
this.autoMigrate = !!yn(process.env.AUTO_MIGRATE) || !!process.env.MIGRATE_CMD?.length
this.isDown = process.env.MIGRATE_CMD === 'down'
this.isDry = !!yn(process.env.MIGRATE_DRYRUN)
this.loggerDry = this.logger.prefix(this.isDry ? '[DRY] ' : '')
Expand Down Expand Up @@ -60,7 +60,7 @@ export class MigrationService extends Service {

if (!this.autoMigrate) {
this.logger.error(undefined, 'Migrations required. Please restart the messaging server with --auto-migrate')
throw new ShutDownSignal()
throw new ShutDownSignal(1)
}

await this.runMigrations(migrations)
Expand All @@ -87,7 +87,7 @@ export class MigrationService extends Service {
} catch (e) {
await trx.rollback()
this.loggerDry.error(e, 'Migrations failed')
throw new ShutDownSignal()
throw new ShutDownSignal(1)
}
}

Expand Down
32 changes: 2 additions & 30 deletions packages/server/src/conduits/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ export class ConduitService extends Service {
id: uuidv4(),
providerId,
channelId,
config: validConfig,
initialized: undefined
config: validConfig
}

await this.query().insert(this.serialize(conduit))
Expand Down Expand Up @@ -85,21 +84,11 @@ export class ConduitService extends Service {

await this.query()
.where({ id })
.update({ initialized: null, config: this.cryptoService.encrypt(JSON.stringify(validConfig || {})) })
.update({ config: this.cryptoService.encrypt(JSON.stringify(validConfig || {})) })

await this.emitter.emit(ConduitEvents.Updated, id)
}

async updateInitialized(id: uuid) {
const conduit = (await this.get(id))!
this.cacheById.del(id, true)
this.cacheByProviderAndChannel.del(conduit.providerId, conduit.channelId, true)

await this.query()
.where({ id })
.update({ initialized: this.db.setDate(new Date()) })
}

async get(id: uuid): Promise<Conduit | undefined> {
const cached = this.cacheById.get(id)
if (cached) {
Expand Down Expand Up @@ -139,21 +128,6 @@ export class ConduitService extends Service {
return rows.map((x) => _.omit(x, 'config')) as Conduit[]
}

async listOutdated(tolerance: number, limit: number): Promise<Conduit[]> {
const rows = await this.query()
.select(['msg_conduits.id', 'providerId', 'channelId', 'initialized'])
.where({ initiable: true })
.andWhere((k) => {
return k
.where('initialized', '<=', this.db.setDate(new Date(Date.now() - tolerance))!)
.orWhereNull('initialized')
})
.innerJoin('msg_channels', 'msg_channels.id', 'msg_conduits.channelId')
.limit(limit)

return rows as Conduit[]
}

async listByChannel(channelId: uuid): Promise<Conduit[]> {
const rows = await this.query().where({ channelId })
return rows.map((x) => _.omit(x, 'config')) as Conduit[]
Expand All @@ -170,15 +144,13 @@ export class ConduitService extends Service {
private serialize(conduit: Partial<Conduit>) {
return {
...conduit,
initialized: this.db.setDate(conduit.initialized),
config: this.cryptoService.encrypt(JSON.stringify(conduit.config || {}))
}
}

private deserialize(conduit: any): Conduit {
return {
...conduit,
initialized: this.db.getDate(conduit.initialized),
config: JSON.parse(this.cryptoService.decrypt(conduit.config))
}
}
Expand Down
2 changes: 0 additions & 2 deletions packages/server/src/conduits/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ export class ConduitTable extends Table {
table.uuid('id').primary()
table.uuid('providerId').references('id').inTable('msg_providers').notNullable()
table.uuid('channelId').references('id').inTable('msg_channels').notNullable()
table.timestamp('initialized').nullable()
table.text('config').notNullable()
table.unique(['providerId', 'channelId'])
table.index(['initialized'])
table.index(['channelId'])
}
}
1 change: 0 additions & 1 deletion packages/server/src/conduits/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ export interface Conduit {
id: uuid
providerId: uuid
channelId: uuid
initialized: Date | undefined
config: any
}
1 change: 1 addition & 0 deletions packages/server/src/instances/invalidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class InstanceInvalidator {

private async onConduitUpdated(conduitId: uuid) {
this.cache.del(conduitId, true)
await this.status.updateInitializedOn(conduitId, undefined)
await this.status.clearErrors(conduitId)

const conduit = (await this.conduits.get(conduitId))!
Expand Down
23 changes: 12 additions & 11 deletions packages/server/src/instances/monitoring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { StatusService } from '../status/service'
import { InstanceService } from './service'

const MAX_ALLOWED_FAILURES = 5
const MAX_INITIALIZE_BATCH = 100

export class InstanceMonitoring {
private timeout?: NodeJS.Timeout
Expand Down Expand Up @@ -44,30 +45,30 @@ export class InstanceMonitoring {
}

private async initializeOutdatedConduits() {
const outdateds = await this.conduits.listOutdated(ms('10h'), 1000)
if (yn(process.env.SPINNED)) {
return
}

const outdateds = await this.status.listOutdated(ms('10h'), MAX_ALLOWED_FAILURES, MAX_INITIALIZE_BATCH)
for (const outdated of outdateds) {
const failures = (await this.status.getNumberOfErrors(outdated.id)) || 0
if (!yn(process.env.SPINNED) && failures >= MAX_ALLOWED_FAILURES) {
continue
}

await this.instances.initialize(outdated.id)
await this.instances.initialize(outdated.conduitId)
}
}

private async loadNonLazyConduits() {
const lazyLoadingEnabled = !yn(process.env.NO_LAZY_LOADING)
if (!yn(process.env.SPINNED)) {
return
}

for (const channel of this.channels.list()) {
if (channel.lazy && lazyLoadingEnabled) {
if (channel.lazy && !yn(process.env.NO_LAZY_LOADING)) {
continue
}

const conduits = await this.conduits.listByChannel(channel.id)
for (const conduit of conduits) {
const failures = (await this.status.getNumberOfErrors(conduit.id)) || 0
if (!yn(process.env.SPINNED) && failures >= MAX_ALLOWED_FAILURES) {
const failures = (await this.status.get(conduit.id))?.numberOfErrors || 0
if (failures >= MAX_ALLOWED_FAILURES) {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/instances/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class InstanceService extends Service {
return this.emitter.emit(InstanceEvents.InitializationFailed, conduitId)
}

await this.conduitService.updateInitialized(conduitId)
await this.statusService.updateInitializedOn(conduitId, new Date())
await this.statusService.clearErrors(conduitId)
return this.emitter.emit(InstanceEvents.Initialized, conduitId)
}
Expand Down
6 changes: 4 additions & 2 deletions packages/server/src/launcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ export class Launcher {

await this.app.monitor()
} catch (e) {
if (!(e instanceof ShutDownSignal)) {
if (e instanceof ShutDownSignal) {
await this.shutDown(e.code)
} else {
this.logger.error(e, 'Error occurred starting server')
await this.shutDown(1)
}
await this.shutDown()
}
}

Expand Down
35 changes: 35 additions & 0 deletions packages/server/src/migrations/0.1.18-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Migration } from '@botpress/messaging-engine'

/**
 * Schema migration that strips the `initialized` column (and its index) from
 * `msg_conduits` and drops the `msg_status` table.
 *
 * NOTE(review): the description string says status information is "moved", but
 * no rows are copied here; `msg_status` is simply dropped. Presumably the table
 * is recreated elsewhere with its new shape — confirm.
 * NOTE(review): this file is named `0.1.18-status` while `meta.version` is
 * '0.1.19' — confirm which version is intended.
 */
export class StatusMigration extends Migration {
  meta = {
    name: StatusMigration.name,
    description: 'Moves status information from conduit table to the status table',
    version: '0.1.19'
  }

  /** Resolves to whether the `msg_conduits` table exists. */
  async valid() {
    const conduitsTableExists = await this.trx.schema.hasTable('msg_conduits')
    return conduitsTableExists
  }

  /** Resolves to true once `msg_conduits` no longer has an `initialized` column. */
  async applied() {
    const columnStillPresent = await this.trx.schema.hasColumn('msg_conduits', 'initialized')
    return !columnStillPresent
  }

  /** Drops `msg_status`, then removes the `initialized` index and column from `msg_conduits`. */
  async up() {
    // The status table is dropped before the conduit table is altered.
    await this.trx.schema.dropTableIfExists('msg_status')

    const alteration = this.trx.schema.alterTable('msg_conduits', (conduits) => {
      // Index first, then the column it was built on.
      conduits.dropIndex(['initialized'])
      conduits.dropColumn('initialized')
    })
    return alteration
  }

  /** Drops `msg_status`, then re-adds the nullable `initialized` timestamp and its index. */
  async down() {
    await this.trx.schema.dropTableIfExists('msg_status')

    const alteration = this.trx.schema.alterTable('msg_conduits', (conduits) => {
      conduits.timestamp('initialized').nullable()
      conduits.index(['initialized'])
    })
    return alteration
  }
}
3 changes: 2 additions & 1 deletion packages/server/src/migrations/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Migration } from '@botpress/messaging-engine'
import { StatusMigration } from './0.1.18-status'

export const Migrations = [] as { new (): Migration }[]
export const Migrations: { new (): Migration }[] = [StatusMigration]

0 comments on commit c046dac

Please sign in to comment.