Skip to content

Commit

Permalink
feat(converse): close collector when the bot has finished processing (#…
Browse files Browse the repository at this point in the history
…217)

* end turn route

* responses

* return message also

* use cache for collectors

* hintRespondingTo

* incomingId

* bump messaging client

* turn id

* collect flag

* custom timeout

* pr comments

* fix
  • Loading branch information
samuelmasse committed Oct 28, 2021
1 parent d5c0230 commit ccb2154
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 32 deletions.
2 changes: 1 addition & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@botpress/messaging-client",
"version": "0.0.7",
"version": "0.0.8",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"source": "src/index.ts",
Expand Down
16 changes: 14 additions & 2 deletions packages/client/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@ import { BaseClient } from './base'
import { handleNotFound } from './errors'

export class MessageClient extends BaseClient {
async create(conversationId: string, authorId: string | undefined, payload: any): Promise<Message> {
return this.deserialize((await this.http.post<Message>('/messages', { conversationId, authorId, payload })).data)
async create(
conversationId: string,
authorId: string | undefined,
payload: any,
flags?: { incomingId: string }
): Promise<Message> {
return this.deserialize(
(await this.http.post<Message>('/messages', { conversationId, authorId, payload, incomingId: flags?.incomingId }))
.data
)
}

async get(id: string): Promise<Message | undefined> {
Expand All @@ -25,6 +33,10 @@ export class MessageClient extends BaseClient {
return (await this.http.delete<{ count: number }>('/messages', { params: filters })).data.count
}

async endTurn(id: string) {
await this.http.post(`/messages/turn/${id}`)
}

public deserialize(message: Message): Message {
return {
...message,
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class App {
this.userTokens = new UserTokenService(this.database, this.crypto, this.caching, this.batching, this.users)
this.conversations = new ConversationService(this.database, this.caching, this.batching, this.users)
this.messages = new MessageService(this.database, this.caching, this.batching, this.conversations)
this.converse = new ConverseService(this.messages)
this.converse = new ConverseService(this.caching, this.messages)
this.mapping = new MappingService(this.database, this.caching, this.batching, this.users, this.conversations)
this.status = new StatusService(this.database, this.distributed, this.caching, this.conduits)
this.instances = new InstanceService(
Expand Down Expand Up @@ -121,6 +121,7 @@ export class App {
this.users,
this.conversations,
this.messages,
this.converse,
this.mapping
)
}
Expand Down
90 changes: 71 additions & 19 deletions packages/server/src/converse/service.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,116 @@
import { Message, uuid } from '@botpress/messaging-base'
import ms from 'ms'
import { Service } from '../base/service'
import { ServerCache } from '../caching/cache'
import { CachingService } from '../caching/service'
import { MessageCreatedEvent, MessageEvents } from '../messages/events'
import { MessageService } from '../messages/service'
import { Collector } from './types'

const DEFAULT_COLLECT_TIMEOUT = ms('10s')

export class ConverseService extends Service {
private collectors: { [conversationId: string]: Collector[] } = {}
private collectors!: ServerCache<uuid, Collector[]>
private incomingIdCache!: ServerCache<uuid, uuid>
private collectingForMessageCache!: ServerCache<uuid, boolean>

constructor(private messages: MessageService) {
constructor(private caching: CachingService, private messages: MessageService) {
super()
}

async setup() {
this.messages.events.on(MessageEvents.Created, this.handleMessageCreated.bind(this))

this.collectors = await this.caching.newServerCache('cache_converse_collectors', {})
this.incomingIdCache = await this.caching.newServerCache('cache_converse_incoming_id')
this.collectingForMessageCache = await this.caching.newServerCache('cache_converse_collecting_for_message')
}

private async handleMessageCreated({ message }: MessageCreatedEvent) {
const collectors = this.collectors[message.conversationId] || []
if (message.authorId) {
// we only collect bot messages
return
}

for (const collector of collectors) {
collector.messages.push(message)
const incomingId = this.incomingIdCache.get(message.id)
const collectors = this.collectors.get(message.conversationId) || []

if (collector.messages.length > 0) {
this.resetCollectorTimeout(collector, 250)
for (const collector of collectors) {
if (!incomingId || incomingId === collector.messageId) {
collector.messages.push(message)
}
}
}

async collect(conversationId: uuid): Promise<Message[]> {
const collector = this.addCollector(conversationId)
this.resetCollectorTimeout(collector, 5000)
setIncomingId(messageId: uuid, incomingId: uuid) {
this.incomingIdCache.set(messageId, incomingId)
}

isCollectingForMessage(messageId: uuid) {
return !!this.collectingForMessageCache.get(messageId)
}

async collect(messageId: uuid, conversationId: uuid, timeout: number): Promise<Message[]> {
const collector = this.addCollector(messageId, conversationId)
if (timeout !== 0) {
this.resetCollectorTimeout(collector, timeout || DEFAULT_COLLECT_TIMEOUT)
}

this.collectingForMessageCache.set(messageId, true)

return new Promise<Message[]>((resolve) => {
collector.resolve = resolve
})
}

private addCollector(conversationId: uuid): Collector {
if (!this.collectors[conversationId]) {
this.collectors[conversationId] = []
async stopCollecting(messageId: uuid, conversationId: uuid) {
const collectors = this.collectors.get(conversationId) || []
const childCollectors = collectors.filter((x) => x.messageId === messageId)

for (const collector of childCollectors) {
clearTimeout(collector.timeout!)
this.removeCollector(collector)
this.resolveCollect(collector)
}
}

private addCollector(messageId: uuid, conversationId: uuid): Collector {
let collectors = this.collectors.get(conversationId)

if (!collectors) {
collectors = []
this.collectors.set(conversationId, collectors)
}

const collector: Collector = {
messageId,
conversationId,
messages: []
}

this.collectors[conversationId].push(collector)
collectors.push(collector)

return collector
}

private resolveCollect(collector: Collector) {
if (collector.resolve) {
collector.resolve(collector.messages)
collector.resolve = undefined
}
}

private removeCollector(collector: Collector) {
const { conversationId } = collector

const index = this.collectors[conversationId].indexOf(collector)
this.collectors[conversationId].splice(index, 1)
const collectors = this.collectors.get(conversationId)!
const index = collectors.indexOf(collector)
if (index >= 0) {
collectors.splice(index, 1)
}

if (this.collectors[conversationId].length === 0) {
delete this.collectors[conversationId]
if (!collectors.length) {
this.collectors.del(conversationId)
}
}

Expand All @@ -69,7 +121,7 @@ export class ConverseService extends Service {

collector.timeout = setTimeout(() => {
this.removeCollector(collector)
collector.resolve!(collector.messages)
this.resolveCollect(collector)
}, time)
}
}
1 change: 1 addition & 0 deletions packages/server/src/converse/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { uuid, Message } from '@botpress/messaging-base'

export interface Collector {
messageId: uuid
conversationId: uuid
messages: Message[]
resolve?: (x: Message[]) => void
Expand Down
44 changes: 39 additions & 5 deletions packages/server/src/messages/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { uuid } from '@botpress/messaging-base'
import { Router } from 'express'
import { v4 as uuidv4 } from 'uuid'
import { Auth } from '../base/auth/auth'
import { ConversationService } from '../conversations/service'
import { ConverseService } from '../converse/service'
Expand All @@ -11,7 +12,8 @@ import {
DeleteMsgSchema,
GetMsgSchema,
ListMsgSchema,
ListMsgSocketSchema
ListMsgSocketSchema,
TurnMsgSchema
} from './schema'
import { MessageService } from './service'

Expand All @@ -35,7 +37,7 @@ export class MessageApi {
return res.status(400).send(error.message)
}

const { conversationId, authorId, payload, collect } = req.body
const { conversationId, authorId, payload, collect, incomingId, timeout } = req.body
const conversation = await this.conversations.get(conversationId)

if (!conversation) {
Expand All @@ -44,7 +46,11 @@ export class MessageApi {
return res.sendStatus(403)
}

const collector = collect ? this.converse.collect(conversationId) : undefined
const messageId = uuidv4()
const collector = collect ? this.converse.collect(messageId, conversationId, +timeout) : undefined
if (incomingId) {
this.converse.setIncomingId(messageId, incomingId)
}

const message = await this.messages.create(
conversationId,
Expand All @@ -54,17 +60,45 @@ export class MessageApi {
? undefined
: {
client: { id: req.client!.id }
}
},
messageId
)

if (collect) {
res.send(await collector)
res.send({ message, responses: await collector })
} else {
res.send(message)
}
})
)

this.router.post(
'/messages/turn/:id',
this.auth.client.auth(async (req, res) => {
const { error } = TurnMsgSchema.validate(req.params)
if (error) {
return res.status(400).send(error.message)
}

const { id } = req.params

const message = await this.messages.get(id)
if (!message) {
return res.sendStatus(404)
}

const conversation = await this.conversations.get(message.conversationId)
if (!conversation) {
return res.sendStatus(404)
} else if (conversation.clientId !== req.client!.id) {
return res.sendStatus(403)
}

await this.converse.stopCollecting(message.id, message.conversationId)
res.sendStatus(200)
})
)

this.router.get(
'/messages/:id',
this.auth.client.auth(async (req, res) => {
Expand Down
8 changes: 7 additions & 1 deletion packages/server/src/messages/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ export const CreateMsgSchema = Joi.object({
collect: Joi.boolean().optional(),
conversationId: Joi.string().guid().required(),
authorId: Joi.string().guid().optional(),
payload: Joi.object().required()
payload: Joi.object().required(),
incomingId: Joi.string().guid().optional(),
timeout: Joi.number().min(0).optional()
})

export const TurnMsgSchema = Joi.object({
id: Joi.string().guid().required()
})

export const GetMsgSchema = Joi.object({
Expand Down
5 changes: 3 additions & 2 deletions packages/server/src/messages/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ export class MessageService extends Service {
conversationId: uuid,
authorId: uuid | undefined,
payload: any,
source?: ActionSource
source?: ActionSource,
forceId?: uuid
): Promise<Message> {
const message = {
id: uuidv4(),
id: forceId || uuidv4(),
conversationId,
authorId,
sentOn: new Date(),
Expand Down
9 changes: 8 additions & 1 deletion packages/server/src/stream/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { ClientService } from '../clients/service'
import { ConduitService } from '../conduits/service'
import { ConversationEvents } from '../conversations/events'
import { ConversationService } from '../conversations/service'
import { ConverseService } from '../converse/service'
import { HealthEvents } from '../health/events'
import { HealthService } from '../health/service'
import { Logger } from '../logger/types'
Expand All @@ -34,6 +35,7 @@ export class StreamService extends Service {
private users: UserService,
private conversations: ConversationService,
private messages: MessageService,
private converse: ConverseService,
private mapping: MappingService
) {
super()
Expand Down Expand Up @@ -73,7 +75,12 @@ export class StreamService extends Service {

await this.stream(
'message.new',
{ channel: await this.getChannel(conversation!.id), conversationId: conversation!.id, message },
{
channel: await this.getChannel(conversation!.id),
conversationId: conversation!.id,
collect: this.converse.isCollectingForMessage(message.id),
message
},
conversation!.clientId,
conversation!.userId,
source
Expand Down

0 comments on commit ccb2154

Please sign in to comment.