Skip to content

Commit

Permalink
Merge pull request #1870 from botpress/rl_timeout-hook
Browse files Browse the repository at this point in the history
fix(hooks): before session timeout not being executed
  • Loading branch information
rndlaine committed Jun 10, 2019
2 parents fa99962 + 4b7cd39 commit 951ba9b
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 91 deletions.
10 changes: 6 additions & 4 deletions src/bp/core/config/botpress.config.ts
Expand Up @@ -12,18 +12,20 @@ export type ModuleConfigEntry = {

export interface DialogConfig {
/**
* Interval between executions of the janitor to check for stale sessions
* Interval between executions of the janitor that checks for stale contexts and sessions.
* @default 10s
*/
janitorInterval: string
/**
* The delay before a stale session will get sweeped by the janitor
* Interval before a session's context expires.
* e.g. when the conversation is stale and has not reach the END of the flow.
* This will reset the position of the user in the flow.
* @default 2m
*/
timeoutInterval: string
/**
* The delay before we consider that it is a new interaction (ex: different subject). We keep the user's last messages
* and variables in the session context to customize interactions.
* Interval before a session expires. e.g. when the user has not spoken for a while.
* The session including its variable will be deleted.
* @default 30m
*/
sessionTimeoutInterval: string
Expand Down
33 changes: 33 additions & 0 deletions src/bp/core/misc/expiry.ts
@@ -0,0 +1,33 @@
import { BotConfig } from 'botpress/sdk'
import { BotpressConfig } from 'core/config/botpress.config'
import _ from 'lodash'
import moment from 'moment'
import ms from 'ms'

export type DialogExpiry = {
context: Date
session: Date
}

/**
* Create expiry dates for dialog session and dialog context based on the bot configuration.
* If no configuration is found for the bot, it will fallback to botpress config.
*
* @param botConfig The bot configuration file i.e. bot.config.json
* @param botpressConfig Botpress configuration file i.e. botpress.config.json
*/
export function createExpiry(botConfig: BotConfig, botpressConfig: BotpressConfig): DialogExpiry {
const contextTimeout = ms(_.get(botConfig, 'dialog.timeoutInterval', botpressConfig.dialog.timeoutInterval))
const sessionTimeout = ms(
_.get(botConfig, 'dialog.sessionTimeoutInterval', botpressConfig.dialog.sessionTimeoutInterval)
)

return {
context: moment()
.add(contextTimeout, 'ms')
.toDate(),
session: moment()
.add(sessionTimeout, 'ms')
.toDate()
}
}
7 changes: 3 additions & 4 deletions src/bp/core/repositories/sessions.ts
Expand Up @@ -60,7 +60,9 @@ export class KnexSessionRepository implements SessionRepository {
temp_data: this.database.knex.json.set(session.temp_data || {}),
session_data: this.database.knex.json.set(session.session_data || {}),
modified_on: this.database.knex.date.now(),
created_on: this.database.knex.date.now()
created_on: this.database.knex.date.now(),
context_expiry: session.context_expiry ? this.database.knex.date.format(session.context_expiry) : eval('null'),
session_expiry: session.session_expiry ? this.database.knex.date.format(session.session_expiry) : eval('null')
},
['id', 'botId', 'context', 'temp_data', 'session_data', 'modified_on', 'created_on']
)
Expand Down Expand Up @@ -107,7 +109,6 @@ export class KnexSessionRepository implements SessionRepository {
.where('botId', botId)
.andWhere(this.database.knex.date.isBefore('session_expiry', new Date()))
.del()
.then()
}

async update(session: DialogSession): Promise<void> {
Expand All @@ -122,14 +123,12 @@ export class KnexSessionRepository implements SessionRepository {
session_expiry: session.session_expiry ? this.database.knex.date.format(session.session_expiry) : eval('null'),
modified_on: this.database.knex.date.now()
})
.then()
}

async delete(id: string) {
await this.database
.knex(this.tableName)
.where({ id })
.del()
.then()
}
}
42 changes: 24 additions & 18 deletions src/bp/core/services/dialog/dialog-engine.ts
@@ -1,9 +1,11 @@
import { IO, Logger } from 'botpress/sdk'
import { IO } from 'botpress/sdk'
import { createForGlobalHooks } from 'core/api'
import { TYPES } from 'core/types'
import { inject, injectable } from 'inversify'
import _ from 'lodash'

import { converseApiEvents } from '../converse'
import { Hooks, HookService } from '../hook/hook-service'

import { FlowView } from '.'
import { FlowError, ProcessingError } from './errors'
Expand All @@ -21,8 +23,8 @@ export class DialogEngine {
private _flowsByBot: Map<string, FlowView[]> = new Map()

constructor(
@inject(TYPES.Logger) private logger: Logger,
@inject(TYPES.FlowService) private flowService: FlowService,
@inject(TYPES.HookService) private hookService: HookService,
@inject(TYPES.InstructionProcessor) private instructionProcessor: InstructionProcessor
) {}

Expand Down Expand Up @@ -55,7 +57,7 @@ export class DialogEngine {
const instruction = queue.dequeue()
// End session if there are no more instructions in the queue
if (!instruction) {
this._logDebug(event.botId, event.target, 'ending flow')
this._debug(event.botId, event.target, 'ending flow')
event.state.context = {}
event.state.temp = {}
return event
Expand All @@ -70,7 +72,7 @@ export class DialogEngine {
return this.processEvent(sessionId, event)
} else if (result.followUpAction === 'wait') {
// We don't call processEvent, because we want to wait for the next event
this._logDebug(event.botId, event.target, 'waiting until next event')
this._debug(event.botId, event.target, 'waiting until next event')
context.queue = queue
} else if (result.followUpAction === 'transition') {
// We reset the queue when we transition to another node.
Expand Down Expand Up @@ -103,7 +105,11 @@ export class DialogEngine {
}

public async processTimeout(botId: string, sessionId: string, event: IO.IncomingEvent) {
this._logDebug(event.botId, event.target, 'processing timeout')
this._debug(event.botId, event.target, 'processing timeout')

const api = await createForGlobalHooks()
await this.hookService.executeHook(new Hooks.BeforeSessionTimeout(api, event))

await this._loadFlows(botId)

// This is the only place we dont want to catch node or flow not found errors
Expand Down Expand Up @@ -154,15 +160,15 @@ export class DialogEngine {
}

if (!timeoutNode || !timeoutFlow) {
throw new Error(`Could not find any timeout node for session "${sessionId}"`)
throw new Error(`Could not find any timeout node or flow for session "${sessionId}"`)
}

event.state.context.currentNode = timeoutNode.name
event.state.context.currentFlow = timeoutFlow.name
event.state.context.queue = undefined
event.state.context.hasJumped = true

return await this.processEvent(sessionId, event)
return this.processEvent(sessionId, event)
}

private initializeContext(event) {
Expand All @@ -173,7 +179,7 @@ export class DialogEngine {
currentFlow: defaultFlow.name
}

this._logDebug(event.botId, event.target, 'init new context', { ...event.state.context })
this._debug(event.botId, event.target, 'init new context', { ...event.state.context })
return event.state.context
}

Expand Down Expand Up @@ -245,7 +251,7 @@ export class DialogEngine {
} else if (transitionTo === 'END') {
// END means the node has a transition of "end flow" in the flow editor
delete event.state.context
this._logDebug(event.botId, event.target, 'ending flow')
this._debug(event.botId, event.target, 'ending flow')
return event
} else {
// Transition to the target node in the current flow
Expand Down Expand Up @@ -326,28 +332,28 @@ export class DialogEngine {
)
}

private _logDebug(botId: string, target: string, action: string, args?: any) {
private _exitingSubflow(event) {
const { currentFlow, currentNode, previousFlow, previousNode } = event.state.context
return previousFlow === currentFlow && previousNode === currentNode
}

private _debug(botId: string, target: string, action: string, args?: any) {
if (args) {
debug.forBot(botId, `[${target}] ${action} %o`, args)
} else {
debug.forBot(botId, `[${target}] ${action}`)
}
}

private _exitingSubflow(event) {
const { currentFlow, currentNode, previousFlow, previousNode } = event.state.context
return previousFlow === currentFlow && previousNode === currentNode
}

private _logExitFlow(botId, target, currentFlow, currentNode, previousFlow, previousNode) {
this._logDebug(botId, target, `transit (${currentFlow}) [${currentNode}] << (${previousFlow}) [${previousNode}]`)
this._debug(botId, target, `transit (${currentFlow}) [${currentNode}] << (${previousFlow}) [${previousNode}]`)
}

private _logEnterFlow(botId, target, currentFlow, currentNode, previousFlow, previousNode) {
this._logDebug(botId, target, `transit (${previousFlow}) [${previousNode}] >> (${currentFlow}) [${currentNode}]`)
this._debug(botId, target, `transit (${previousFlow}) [${previousNode}] >> (${currentFlow}) [${currentNode}]`)
}

private _logTransition(botId, target, currentFlow, currentNode, transitionTo) {
this._logDebug(botId, target, `transit (${currentFlow}) [${currentNode}] -> [${transitionTo}]`)
this._debug(botId, target, `transit (${currentFlow}) [${currentNode}] -> [${transitionTo}]`)
}
}
107 changes: 68 additions & 39 deletions src/bp/core/services/dialog/janitor.ts
@@ -1,6 +1,7 @@
import { IO, Logger } from 'botpress/sdk'
import { BotConfig, IO, Logger } from 'botpress/sdk'
import { createExpiry } from 'core/misc/expiry'
import { SessionRepository } from 'core/repositories'
import { Event, IOEvent } from 'core/sdk/impl'
import { Event } from 'core/sdk/impl'
import { inject, injectable, tagged } from 'inversify'
import _ from 'lodash'
import { Memoize } from 'lodash-decorators'
Expand All @@ -14,6 +15,9 @@ import { Janitor } from '../janitor'
import { DialogEngine } from './dialog-engine'
import { SessionIdFactory } from './session/id-factory'

const debug = DEBUG('janitor')
const dialogDebug = debug.sub('dialog')

@injectable()
export class DialogJanitor extends Janitor {
constructor(
Expand All @@ -38,51 +42,76 @@ export class DialogJanitor extends Janitor {
return config.dialog.janitorInterval
}

/**
* Deletes the sessions that are expired and
* reset the contexts of the sessions that are stale.
* These actions are executed based on two expiries: session_expiry and context_expiry.
*/
protected async runTask(): Promise<void> {
// Bot config can change at runtime
dialogDebug('Running task')

const botsConfigs = await this.botService.getBots()
const botsIds = Array.from(botsConfigs.keys())

await Promise.mapSeries(botsIds, async botId => {
for (const botId of botsIds) {
dialogDebug.forBot(botId, 'Deleting expired sessions')
await this.sessionRepo.deleteExpiredSessions(botId)

const sessionsIds = await this.sessionRepo.getExpiredContextSessionIds(botId)

if (sessionsIds.length > 0) {
this.logger.forBot(botId).debug(`🔎 Found inactive sessions: ${sessionsIds.join(', ')}`)
dialogDebug.forBot(botId, 'Found stale contexts', sessionsIds)
for (const sessionId of sessionsIds) {
await this._processSessionTimeout(sessionId, botId, botsConfigs.get(botId)!)
}
}
}
}

await Promise.mapSeries(sessionsIds, async id => {
try {
const channel = SessionIdFactory.createChannelFromId(id)
const target = SessionIdFactory.createTargetFromId(id)
const threadId = SessionIdFactory.createThreadIdFromId(id)
const session = await this.sessionRepo.get(id)

// This event only exists so that processTimeout can call processEvent
const fakeEvent = Event({
type: 'timeout',
channel: channel,
target: target,
threadId: threadId,
direction: 'incoming',
payload: '',
botId: botId
}) as IO.IncomingEvent

fakeEvent.state.context = session.context as IO.DialogContext
fakeEvent.state.session = session.session_data as IO.CurrentSession

await this.dialogEngine.processTimeout(botId, id, fakeEvent)
} catch (err) {
// We delete the session in both cases
} finally {
const session = await this.sessionRepo.get(id)
session.context = undefined
session.temp_data = undefined
session.context_expiry = undefined
await this.sessionRepo.update(session)
}
})
})
private async _processSessionTimeout(sessionId: string, botId: string, botConfig: BotConfig) {
dialogDebug.forBot(botId, 'Processing timeout', sessionId)

try {
const channel = SessionIdFactory.createChannelFromId(sessionId)
const target = SessionIdFactory.createTargetFromId(sessionId)
const threadId = SessionIdFactory.createThreadIdFromId(sessionId)
const session = await this.sessionRepo.get(sessionId)

// Don't process the timeout when the context is empty.
// This means the conversation has not began.
if (_.isEmpty(session.context)) {
dialogDebug.forBot(botId, 'Skipping. No changes in context', sessionId)
return
}

// This event only exists so that processTimeout can call processEvent
const fakeEvent = Event({
type: 'timeout',
channel: channel,
target: target,
threadId: threadId,
direction: 'incoming',
payload: '',
botId: botId
}) as IO.IncomingEvent

fakeEvent.state.context = session.context as IO.DialogContext
fakeEvent.state.session = session.session_data as IO.CurrentSession

await this.dialogEngine.processTimeout(botId, sessionId, fakeEvent)

const botpressConfig = await this.getBotpresConfig()
const expiry = createExpiry(botConfig!, botpressConfig)

session.context = {}
session.temp_data = {}
session.context_expiry = expiry.context
session.session_expiry = expiry.session

await this.sessionRepo.update(session)

dialogDebug.forBot(botId, `New expiry set for ${session.context_expiry}`, sessionId)
} catch (err) {
this.logger.error(`Could not process the timeout event. ${err.message}`)
}
}
}

0 comments on commit 951ba9b

Please sign in to comment.