Skip to content

Commit

Permalink
fix(core): clearing state from redis when reset session
Browse files Browse the repository at this point in the history
  • Loading branch information
allardy committed Nov 18, 2019
1 parent 3436030 commit 34ecaed
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
25 changes: 9 additions & 16 deletions src/bp/core/api.ts
Expand Up @@ -27,6 +27,7 @@ import { JobService } from './services/job-service'
import { KeyValueStore } from './services/kvs'
import MediaService from './services/media'
import { EventEngine } from './services/middleware/event-engine'
import { StateManager } from './services/middleware/state-manager'
import { NotificationsService } from './services/notification/service'
import RealtimeService from './services/realtime'
import { WorkspaceService } from './services/workspace-service'
Expand Down Expand Up @@ -76,20 +77,12 @@ const event = (eventEngine: EventEngine, eventRepo: EventRepository): typeof sdk
}
}

const dialog = (dialogEngine: DialogEngine, sessionRepo: SessionRepository): typeof sdk.dialog => {
const dialog = (dialogEngine: DialogEngine, stateManager: StateManager): typeof sdk.dialog => {
return {
createId(eventDestination: sdk.IO.EventDestination) {
return SessionIdFactory.createIdFromEvent(eventDestination)
},
async processEvent(sessionId: string, event: sdk.IO.IncomingEvent): Promise<sdk.IO.IncomingEvent> {
return dialogEngine.processEvent(sessionId, event)
},
async deleteSession(userId: string): Promise<void> {
await sessionRepo.delete(userId)
},
async jumpTo(sessionId: string, event: any, flowName: string, nodeName?: string): Promise<void> {
await dialogEngine.jumpTo(sessionId, event, flowName, nodeName)
}
createId: SessionIdFactory.createIdFromEvent.bind(SessionIdFactory),
processEvent: dialogEngine.processEvent.bind(dialogEngine),
deleteSession: stateManager.deleteDialogSession.bind(stateManager),
jumpTo: dialogEngine.jumpTo.bind(dialogEngine)
}
}

Expand Down Expand Up @@ -277,7 +270,6 @@ export class BotpressAPIProvider {
@inject(TYPES.HTTPServer) httpServer: HTTPServer,
@inject(TYPES.UserRepository) userRepo: UserRepository,
@inject(TYPES.RealtimeService) realtimeService: RealtimeService,
@inject(TYPES.SessionRepository) sessionRepo: SessionRepository,
@inject(TYPES.KeyValueStore) keyValueStore: KeyValueStore,
@inject(TYPES.NotificationsService) notificationService: NotificationsService,
@inject(TYPES.BotService) botService: BotService,
Expand All @@ -288,11 +280,12 @@ export class BotpressAPIProvider {
@inject(TYPES.HookService) hookService: HookService,
@inject(TYPES.EventRepository) eventRepo: EventRepository,
@inject(TYPES.WorkspaceService) workspaceService: WorkspaceService,
@inject(TYPES.JobService) jobService: JobService
@inject(TYPES.JobService) jobService: JobService,
@inject(TYPES.StateManager) stateManager: StateManager
) {
this.http = http(httpServer)
this.events = event(eventEngine, eventRepo)
this.dialog = dialog(dialogEngine, sessionRepo)
this.dialog = dialog(dialogEngine, stateManager)
this.config = config(moduleLoader, configProvider)
this.realtime = new RealTimeAPI(realtimeService)
this.database = db.knex
Expand Down
21 changes: 16 additions & 5 deletions src/bp/core/services/middleware/state-manager.ts
Expand Up @@ -28,6 +28,7 @@ export class StateManager {
private batch!: { event: sdk.IO.IncomingEvent; ignoreContext?: boolean }[]
private knex!: Knex & KnexExtension
private currentPromise
private useRedis

constructor(
@inject(TYPES.Logger)
Expand All @@ -39,10 +40,12 @@ export class StateManager {
@inject(TYPES.KeyValueStore) private kvs: KeyValueStore,
@inject(TYPES.Database) private database: Database,
@inject(TYPES.JobService) private jobService: JobService
) {}
) {
this.useRedis = process.CLUSTER_ENABLED && !process.core_env.BP_NO_REDIS_STATE
}

public initialize() {
if (!process.CLUSTER_ENABLED || process.core_env.BP_NO_REDIS_STATE) {
if (!this.useRedis) {
return
}

Expand All @@ -63,7 +66,7 @@ export class StateManager {
public async restore(event: sdk.IO.IncomingEvent) {
const sessionId = SessionIdFactory.createIdFromEvent(event)

if (process.CLUSTER_ENABLED && !process.core_env.BP_NO_REDIS_STATE) {
if (this.useRedis) {
try {
const userState = await this._redisClient.get(getRedisSessionKey(sessionId))
if (userState) {
Expand Down Expand Up @@ -93,10 +96,10 @@ export class StateManager {
public async persist(event: sdk.IO.IncomingEvent, ignoreContext: boolean) {
const sessionId = SessionIdFactory.createIdFromEvent(event)

if (process.CLUSTER_ENABLED && !process.core_env.BP_NO_REDIS_STATE) {
if (this.useRedis) {
await this._redisClient.set(
getRedisSessionKey(sessionId),
JSON.stringify(event.state),
JSON.stringify(_.omit(event.state, ['__stacktrace', '__error'])),
'PX',
REDIS_MEMORY_DURATION
)
Expand All @@ -107,6 +110,14 @@ export class StateManager {
await this._saveState(event, ignoreContext)
}

public async deleteDialogSession(sessionId: string) {
await this.sessionRepo.delete(sessionId)

if (this.useRedis) {
await this._redisClient.del(getRedisSessionKey(sessionId))
}
}

private async _saveState(event: sdk.IO.IncomingEvent, ignoreContext?: boolean, trx?: Knex.Transaction) {
const { user, context, session, temp } = event.state
const sessionId = SessionIdFactory.createIdFromEvent(event)
Expand Down
7 changes: 6 additions & 1 deletion src/bp/sdk/botpress.d.ts
Expand Up @@ -1437,7 +1437,12 @@ declare module 'botpress/sdk' {
* @param nodeName The name of the optionnal node to jump to.
* The node will default to the starting node of the flow if this value is omitted.
*/
export function jumpTo(sessionId: string, event: IO.Event, flowName: string, nodeName?: string): Promise<void>
export function jumpTo(
sessionId: string,
event: IO.IncomingEvent,
flowName: string,
nodeName?: string
): Promise<void>
}

export namespace config {
Expand Down

0 comments on commit 34ecaed

Please sign in to comment.