Skip to content

Commit

Permalink
fix(channels): readd clearing and fix auto start (#297)
Browse files Browse the repository at this point in the history
* fix auto start

* channel clearing

* clearing service

* instance subservices

* events in lifetime subservice

* better naming

* monitoring service

* sandbox subservice

* move code

* instance messaging service

* fix

* fix

* fix tests

* fix

* auto start from botpress

* dont clear if removed from cache intentionnaliy

* prevent auto start when failed or untinitialized

* clear errors at sync

* pr comments
  • Loading branch information
samuelmasse authored Jan 11, 2022
1 parent 3bf606e commit 8d01733
Show file tree
Hide file tree
Showing 23 changed files with 597 additions and 322 deletions.
21 changes: 16 additions & 5 deletions packages/channels/src/base/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export interface Channel {
set logger(logger: Logger | undefined)
get kvs(): Kvs | undefined
set kvs(kvs: Kvs | undefined)
setup(router: Router): Promise<void>
setup(router: Router, logger?: Logger): Promise<void>
start(scope: string, config: any): Promise<void>
initialize(scope: string): Promise<void>
send(scope: string, endpoint: any, content: any): Promise<void>
Expand All @@ -27,10 +27,17 @@ export interface Channel {
event: K,
listener: ((arg: ChannelEvents[K]) => Promise<void>) | ((arg: ChannelEvents[K]) => void)
): void
autoStart(callback: (scope: string) => Promise<any>): void
autoStart(callback: (scope: string) => Promise<void>): void
stateManager(manager: ChannelStateManager): void
makeUrl(callback: (scope: string) => Promise<string>): void
}

export interface ChannelStateManager {
set(scope: string, val: any): boolean
get(scope: string): any
del(scope: string): void
}

export abstract class ChannelTemplate<
TConfig extends ChannelConfig,
TService extends ChannelService<TConfig, any>,
Expand Down Expand Up @@ -62,9 +69,9 @@ export abstract class ChannelTemplate<

constructor(public readonly service: TService, public readonly api: TApi, public readonly stream: TStream) {}

async setup(router: Router) {
async setup(router: Router, logger?: Logger) {
await this.service.setup()
await this.api.setup(new ChannelApiManager(this.service, router))
await this.api.setup(new ChannelApiManager(this.service, router, logger))
await this.stream.setup()
}

Expand Down Expand Up @@ -99,10 +106,14 @@ export abstract class ChannelTemplate<
}
}

autoStart(callback: (scope: string) => Promise<TConfig>) {
autoStart(callback: (scope: string) => Promise<void>) {
this.service.autoStart(callback)
}

stateManager(manager: ChannelStateManager): void {
this.service.stateManager(manager)
}

makeUrl(callback: (scope: string) => Promise<string>): void {
return this.api.makeUrl(callback)
}
Expand Down
36 changes: 29 additions & 7 deletions packages/channels/src/base/service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import LRU from 'lru-cache'
import ms from 'ms'
import { ChannelStateManager } from '..'
import { Emitter } from '../base/emitter'
import { Endpoint } from '../base/endpoint'
import { ChannelConfig } from './config'
Expand Down Expand Up @@ -27,7 +28,8 @@ export abstract class ChannelService<

protected cacheIndexResponses: LRU<string, IndexChoiceOption[]> = new LRU({ max: 50000, maxAge: ms('5min') })
protected states: { [scope: string]: TState } = {}
protected startCallback?: (scope: string) => Promise<TConfig>
protected startCallback?: (scope: string) => Promise<void>
protected manager?: ChannelStateManager

get scopes() {
return Object.keys(this.states)
Expand All @@ -36,7 +38,14 @@ export abstract class ChannelService<
async setup() {}

async start(scope: string, config: TConfig) {
this.states[scope] = await this.create(scope, config)
const state = await this.create(scope, config)

if (this.manager) {
this.manager.set(scope, state)
} else {
this.states[scope] = state
}

await this.emit('start', { scope })
}

Expand All @@ -49,13 +58,17 @@ export abstract class ChannelService<
return
}

await this.start(scope, await this.startCallback!(scope))
await this.startCallback!(scope)
}

autoStart(callback: (scope: string) => Promise<TConfig>) {
autoStart(callback: (scope: string) => Promise<void>) {
this.startCallback = callback
}

stateManager(manager: ChannelStateManager) {
this.manager = manager
}

abstract create(scope: string, config: TConfig): Promise<TState>

async send(scope: string, endpoint: Endpoint, content: any) {
Expand All @@ -73,13 +86,22 @@ export abstract class ChannelService<
async stop(scope: string) {
await this.emit('stop', { scope })
await this.destroy(scope, this.get(scope))
delete this.states[scope]

if (this.manager) {
this.manager.del(scope)
} else {
delete this.states[scope]
}
}

async destroy(scope: string, state: TState): Promise<void> {}

public get(scope: string) {
return this.states[scope]
public get(scope: string): TState {
if (this.manager) {
return this.manager.get(scope)
} else {
return this.states[scope]
}
}

public prepareIndexResponse(scope: string, identity: string, sender: string, options: IndexChoiceOption[]) {
Expand Down
2 changes: 2 additions & 0 deletions packages/channels/src/base/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export abstract class ChannelStream<TService extends ChannelService<any, any>, T
}

protected async handleSend({ scope, endpoint, content }: ChannelSendEvent) {
await this.service.require(scope)

const context = await this.getContext({
scope,
state: this.service.get(scope),
Expand Down
6 changes: 5 additions & 1 deletion packages/engine/src/caching/cache2D.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,13 @@ export class ServerCache2D<V> {
return this.lru.keys()
}

private getKey(keyX: string, keyY: string) {
getKey(keyX: string, keyY: string) {
return `${keyX}~\`${keyY}`
}

getValues(key: string) {
return key.split('~`')
}
}

interface ServerCacheEvent<K, V> {
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ export class App extends Engine {
this.providers,
this.conduits,
this.clients,
this.webhooks
this.webhooks,
this.status
)
this.health = new HealthService(
this.database,
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/channels/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ export class ChannelApi {
constructor(private router: Router, private app: App) {}

async setup() {
const logger = this.app.logger.root.sub('channels')
const webhookRouter = Router()

for (const channel of this.app.channels.list()) {
await channel.setup(webhookRouter)
await channel.setup(webhookRouter, logger.sub(channel.meta.name))

channel.logger = this.app.logger.root.sub(channel.meta.name)
channel.kvs = this.app.kvs
Expand Down
20 changes: 13 additions & 7 deletions packages/server/src/health/listener.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { uuid } from '@botpress/messaging-base'
import { ConduitEvents } from '../conduits/events'
import { ConduitService } from '../conduits/service'
import { InstanceEvents } from '../instances/events'
import { InstanceLifetimeEvents } from '../instances/lifetime/events'
import { InstanceService } from '../instances/service'
import { HealthService } from './service'
import { HealthEventType } from './types'
Expand All @@ -17,14 +17,20 @@ export class HealthListener {
this.conduitService.events.on(ConduitEvents.Created, this.handleConduitCreated.bind(this), true)
this.conduitService.events.on(ConduitEvents.Updated, this.handleConduitUpdated.bind(this), true)
this.conduitService.events.on(ConduitEvents.Deleting, this.handleConduitDeleting.bind(this), true)
this.instanceService.events.on(InstanceEvents.Setup, this.handleInstanceSetup.bind(this))
this.instanceService.events.on(InstanceEvents.SetupFailed, this.handleInstanceSetupFailed.bind(this))
this.instanceService.events.on(InstanceEvents.Initialized, this.handleInstanceInitialized.bind(this))
this.instanceService.events.on(
InstanceEvents.InitializationFailed,
this.instanceService.lifetimes.events.on(InstanceLifetimeEvents.Setup, this.handleInstanceSetup.bind(this))
this.instanceService.lifetimes.events.on(
InstanceLifetimeEvents.SetupFailed,
this.handleInstanceSetupFailed.bind(this)
)
this.instanceService.lifetimes.events.on(
InstanceLifetimeEvents.Initialized,
this.handleInstanceInitialized.bind(this)
)
this.instanceService.lifetimes.events.on(
InstanceLifetimeEvents.InitializationFailed,
this.handleInstanceInitializationFailed.bind(this)
)
this.instanceService.events.on(InstanceEvents.Destroyed, this.handleInstanceDestroyed.bind(this))
this.instanceService.lifetimes.events.on(InstanceLifetimeEvents.Destroyed, this.handleInstanceDestroyed.bind(this))
}

private async handleConduitCreated(conduitId: uuid) {
Expand Down
85 changes: 85 additions & 0 deletions packages/server/src/instances/clearing/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { CachingService, Logger, ServerCache2D, Service } from '@botpress/messaging-engine'
import ms from 'ms'
import { ChannelService } from '../../channels/service'
import { ConduitService } from '../../conduits/service'
import { ProviderService } from '../../providers/service'
import { InstanceLifetimeService } from '../lifetime/service'

export class InstanceClearingService extends Service {
private destroyed: boolean
private statesCache!: ServerCache2D<any>
private statesDeleting!: { [key: string]: any }
private statesDeleted!: { [key: string]: boolean }

constructor(
private caching: CachingService,
private channels: ChannelService,
private providers: ProviderService,
private conduits: ConduitService,
private lifetimes: InstanceLifetimeService,
private logger: Logger
) {
super()
this.destroyed = false
}

public async setup() {
this.statesCache = await this.caching.newServerCache2D('cache_channel_states', {
dispose: this.handleCacheDispose.bind(this),
max: 2000,
maxAge: ms('30min')
})
this.statesDeleting = {}
this.statesDeleted = {}

for (const channel of this.channels.list()) {
channel.stateManager({
set: (providerName, val) => this.statesCache.set(channel.meta.id, providerName, val),
get: (providerName) => {
return (
this.statesCache.get(channel.meta.id, providerName) ||
this.statesDeleting[this.statesCache.getKey(channel.meta.id, providerName)]
)
},
del: (providerName) => {
// indicates this key is deleted intentionnaly, and did not fall out of the cache
this.statesDeleted[this.statesCache.getKey(channel.meta.id, providerName)] = true
this.statesCache.del(channel.meta.id, providerName)
}
})
}
}

async destroy() {
this.destroyed = true
}

private handleCacheDispose(key: string, value: any) {
if (this.destroyed) {
return
}

// if it's been deleted intentionally we assume disposing was done
if (this.statesDeleted) {
delete this.statesDeleted[key]
return
}

this.statesDeleting[key] = value
void this.handleInstanceClearing(key)
}

private async handleInstanceClearing(key: string) {
try {
const [channelId, providerName] = this.statesCache.getValues(key)
const provider = await this.providers.getByName(providerName)
const conduit = await this.conduits.getByProviderAndChannel(provider.id, channelId)

await this.lifetimes.stop(conduit.id)
} catch (e) {
this.logger.error(e, 'Error trying to clear channel')
} finally {
delete this.statesDeleting[key]
}
}
}
11 changes: 0 additions & 11 deletions packages/server/src/instances/dispatch.ts

This file was deleted.

19 changes: 0 additions & 19 deletions packages/server/src/instances/events.ts

This file was deleted.

Loading

0 comments on commit 8d01733

Please sign in to comment.