Skip to content

Commit

Permalink
fix(channels): read the conversation.started event (#282)
Browse files Browse the repository at this point in the history
* chore(repo): stream layer

* streamer

* post service -> streamer

* fix

* fix(channels): readd the conversation.started event

* fix
  • Loading branch information
samuelmasse committed Dec 10, 2021
1 parent 49419ce commit 145d4f4
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 34 deletions.
28 changes: 24 additions & 4 deletions packages/channels/src/base/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Endpoint } from './endpoint'
import { Kvs } from './kvs'
import { Logger } from './logger'
import { ChannelMeta } from './meta'
import { ChannelService } from './service'
import { ChannelProactiveEvent, ChannelReceiveEvent, ChannelService } from './service'
import { ChannelStream } from './stream'

export interface Channel {
Expand All @@ -23,7 +23,10 @@ export interface Channel {
send(scope: string, endpoint: any, content: any): Promise<void>
stop(scope: string): Promise<void>
has(scope: string): boolean
on(event: 'message', callback: (e: MessageEvent) => Promise<void>): void
on<K extends keyof ChannelEvents>(
event: K,
listener: ((arg: ChannelEvents[K]) => Promise<void>) | ((arg: ChannelEvents[K]) => void)
): void
autoStart(callback: (scope: string) => Promise<any>): void
makeUrl(callback: (scope: string) => Promise<string>): void
}
Expand Down Expand Up @@ -85,8 +88,15 @@ export abstract class ChannelTemplate<
return this.service.get(scope) !== undefined
}

on(event: 'message', callback: (e: MessageEvent) => Promise<void>): void {
this.service.on('receive', callback)
public on<K extends keyof ChannelEvents>(
event: K,
listener: ((arg: ChannelEvents[K]) => Promise<void>) | ((arg: ChannelEvents[K]) => void)
) {
if (event === 'message') {
this.service.on('receive', listener as (arg: ChannelReceiveEvent) => void)
} else if (event === 'proactive') {
this.service.on('proactive', listener as (arg: ChannelProactiveEvent) => void)
}
}

autoStart(callback: (scope: string) => Promise<TConfig>) {
Expand All @@ -98,8 +108,18 @@ export abstract class ChannelTemplate<
}
}

export interface ChannelEvents {
message: MessageEvent
proactive: ProactiveEvent
}

export interface MessageEvent {
scope: string
endpoint: Endpoint
content: any
}

export interface ProactiveEvent {
scope: string
endpoint: Endpoint
}
10 changes: 10 additions & 0 deletions packages/channels/src/base/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export abstract class ChannelService<
start: ChannelStartEvent
initialize: ChannelInitializeEvent
send: ChannelSendEvent
proactive: ChannelProactiveEvent
receive: ChannelReceiveEvent
stop: ChannelStopEvent
}> {
Expand Down Expand Up @@ -61,6 +62,10 @@ export abstract class ChannelService<
await this.emit('send', { scope, endpoint, content })
}

async proactive(scope: string, endpoint: Endpoint) {
await this.emit('proactive', { scope, endpoint })
}

async receive(scope: string, endpoint: Endpoint, content: any) {
await this.emit('receive', { scope, endpoint, content })
}
Expand Down Expand Up @@ -129,6 +134,11 @@ export interface ChannelSendEvent {
content: any
}

export interface ChannelProactiveEvent {
scope: string
endpoint: Endpoint
}

export interface ChannelReceiveEvent {
scope: string
endpoint: Endpoint
Expand Down
59 changes: 49 additions & 10 deletions packages/channels/src/teams/api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TurnContext } from 'botbuilder'
import { ActivityTypes, TurnContext } from 'botbuilder'
import { Response } from 'express'
import { ChannelApi, ChannelApiManager, ChannelApiRequest } from '../base/api'
import { ChannelStartEvent } from '../base/service'
Expand All @@ -20,18 +20,57 @@ export class TeamsApi extends ChannelApi<TeamsService> {

await adapter.processActivity(req, res, async (turnContext) => {
try {
const activity = turnContext.activity
const convoRef = TurnContext.getConversationReference(activity)

await this.service.setRef(req.scope, convoRef.conversation!.id, convoRef)
await this.service.receive(
req.scope,
{ identity: '*', sender: activity.from.id, thread: convoRef.conversation!.id },
{ type: 'text', text: activity.value?.text || activity.text }
)
if (this.botNewlyAddedToConversation(turnContext)) {
await this.sendProactiveMessage(req.scope, turnContext)
} else {
await this.receive(req.scope, turnContext)
}
} catch (e) {
this.service.logger?.error(e, 'Error occurred processing teams activity')
}
})
}

private async receive(scope: string, turnContext: TurnContext) {
const { activity } = turnContext
const convoRef = TurnContext.getConversationReference(activity)

await this.service.setRef(scope, convoRef.conversation!.id, convoRef)
await this.service.receive(
scope,
{ identity: '*', sender: activity.from.id, thread: convoRef.conversation!.id },
{ type: 'text', text: activity.value?.text || activity.text }
)
}

private botNewlyAddedToConversation(turnContext: TurnContext): boolean {
const { activity } = turnContext

// https://docs.microsoft.com/en-us/previous-versions/azure/bot-service/dotnet/bot-builder-dotnet-activities?view=azure-bot-service-3.0#conversationupdate
return (
activity.type === ActivityTypes.ConversationUpdate &&
(activity.membersAdded || []).some((member) => member.id === activity.recipient.id)
)
}

private async sendProactiveMessage(scope: string, turnContext: TurnContext): Promise<void> {
const { activity } = turnContext
const convoRef = TurnContext.getConversationReference(activity)

const { config } = this.service.get(scope)
await this.service.setRef(scope, convoRef.conversation!.id, convoRef)

// Locale format: {lang}-{subtag1}-{subtag2}-... https://en.wikipedia.org/wiki/IETF_language_tag
// TODO: Use Intl.Locale().language once its types are part of TS. See: https://github.com/microsoft/TypeScript/issues/37326
const lang = activity.locale?.split('-')[0]
const proactiveMessages = config.proactiveMessages || {}
const message = lang && proactiveMessages[lang]
const endpoint = { identity: '*', sender: activity.from.id, thread: convoRef.conversation!.id }

if (message) {
await this.service.send(scope, endpoint, message)
}

await this.service.proactive(scope, endpoint)
}
}
2 changes: 1 addition & 1 deletion packages/server/src/base/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Endpoint } from '@botpress/messaging-channels'
* streaming back an event to the service that made the request.
*/
export interface ActionSource {
conduit?: { id: uuid; endpoint: Endpoint }
endpoint?: Endpoint
client?: { id: uuid }
socket?: { id: string }
}
42 changes: 30 additions & 12 deletions packages/server/src/channels/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Channel, Endpoint } from '@botpress/messaging-channels'
import { Router } from 'express'
import { App } from '../app'
import { Mapping } from '../mapping/types'

export class ChannelApi {
constructor(private router: Router, private app: App) {}
Expand All @@ -18,29 +20,45 @@ export class ChannelApi {
})

channel.on('message', async ({ scope, endpoint, content }) => {
const provider = await this.app.providers.getByName(scope)
const conduit = await this.app.conduits.getByProviderAndChannel(provider!.id, channel.meta.id)

if (!content.type) {
return
}

const clientId = provider!.sandbox
? await this.app.instances.sandbox.getClientId(conduit!.id, endpoint, content)
: (await this.app.clients.getByProviderId(provider!.id))!.id

if (!clientId) {
const mapping = await this.map(channel, scope, endpoint, content)
if (!mapping) {
return
}

const { userId, conversationId } = await this.app.mapping.getMapping(clientId, channel.meta.id, endpoint)

await this.app.messages.create(conversationId, userId, content, {
conduit: { id: conduit!.id, endpoint }
await this.app.messages.create(mapping.conversationId, mapping.userId, content, {
endpoint
})
})

channel.on('proactive', async ({ scope, endpoint }) => {
const mapping = await this.map(channel, scope, endpoint, {})
if (!mapping) {
return
}

await this.app.conversations.start(mapping.conversationId)
})
}

this.router.use('/webhooks', webhookRouter)
}

async map(channel: Channel, scope: string, endpoint: Endpoint, content: any): Promise<Mapping | undefined> {
const provider = await this.app.providers.getByName(scope)
const conduit = await this.app.conduits.getByProviderAndChannel(provider!.id, channel.meta.id)

const clientId = provider!.sandbox
? await this.app.instances.sandbox.getClientId(conduit!.id, endpoint, content)
: (await this.app.clients.getByProviderId(provider!.id))!.id

if (!clientId) {
return undefined
}

return this.app.mapping.getMapping(clientId, channel.meta.id, endpoint)
}
}
10 changes: 8 additions & 2 deletions packages/server/src/conversations/events.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { Conversation, Emitter } from '@botpress/messaging-base'
import { Conversation, Emitter, uuid } from '@botpress/messaging-base'

export enum ConversationEvents {
Created
Created,
Started
}

export interface ConversationCreatedEvent {
conversation: Conversation
}

export interface ConversationStartedEvent {
conversationId: uuid
}

export class ConversationEmitter extends Emitter<{
[ConversationEvents.Created]: ConversationCreatedEvent
[ConversationEvents.Started]: ConversationStartedEvent
}> {}

export type ConversationWatcher = Omit<ConversationEmitter, 'emit'>
4 changes: 4 additions & 0 deletions packages/server/src/conversations/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export class ConversationService extends Service {
return conversation
}

public async start(id: uuid): Promise<void> {
await this.emitter.emit(ConversationEvents.Started, { conversationId: id })
}

public async delete(id: uuid): Promise<number> {
await this.batcher.flush()

Expand Down
32 changes: 30 additions & 2 deletions packages/server/src/conversations/stream.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { uuid } from '@botpress/messaging-base'
import { Streamer } from '../base/streamer'
import { ConversationCreatedEvent, ConversationEvents } from './events'
import { ChannelService } from '../channels/service'
import { MappingService } from '../mapping/service'
import { ConversationCreatedEvent, ConversationEvents, ConversationStartedEvent } from './events'
import { ConversationService } from './service'

export class ConversationStream {
constructor(private streamer: Streamer, private conversations: ConversationService) {}
constructor(
private streamer: Streamer,
private channels: ChannelService,
private conversations: ConversationService,
private mapping: MappingService
) {}

async setup() {
this.conversations.events.on(ConversationEvents.Created, this.handleConversationCreated.bind(this))
this.conversations.events.on(ConversationEvents.Started, this.handleConversationStarted.bind(this))
}

private async handleConversationCreated({ conversation }: ConversationCreatedEvent) {
Expand All @@ -17,4 +26,23 @@ export class ConversationStream {
conversation.userId
)
}

private async handleConversationStarted({ conversationId }: ConversationStartedEvent) {
const conversation = await this.conversations.get(conversationId)
await this.streamer.stream(
'conversation.started',
{ conversationId, userId: conversation!.userId, channel: await this.getChannel(conversationId) },
conversation!.clientId
)
}

private async getChannel(conversationId: uuid) {
const convmaps = await this.mapping.convmap.listByConversationId(conversationId)
if (convmaps.length === 1) {
const tunnel = await this.mapping.tunnels.get(convmaps[0].tunnelId)
return this.channels.getById(tunnel!.channelId).meta.name
} else {
return 'messaging'
}
}
}
4 changes: 2 additions & 2 deletions packages/server/src/instances/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ export class InstanceService extends Service {

// small optimization. If the message comes from a channel, and we are only linked to one channel,
// then we already know that we don't need to spread the message to other connected channels
if (convmaps.length === 1 && source?.conduit) {
if (convmaps.length === 1 && source?.endpoint) {
return
}

for (const { threadId, tunnelId } of convmaps) {
const endpoint = await this.mappingService.getEndpoint(threadId)
const tunnel = await this.mappingService.tunnels.get(tunnelId)

if (!source?.conduit?.endpoint || !this.endpointEqual(source.conduit.endpoint, endpoint)) {
if (!source?.endpoint || !this.endpointEqual(source.endpoint, endpoint)) {
const conduit = await this.conduitService.getByProviderAndChannel(client!.providerId, tunnel!.channelId)
if (!conduit) {
return
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class Stream {
this.streamer = new Streamer(app.dispatches, app.sockets, app.webhooks)
this.health = new HealthStream(this.streamer, app.channels, app.clients, app.conduits, app.health)
this.users = new UserStream(this.streamer, app.users)
this.conversations = new ConversationStream(this.streamer, app.conversations)
this.conversations = new ConversationStream(this.streamer, app.channels, app.conversations, app.mapping)
this.messages = new MessageStream(
this.streamer,
app.channels,
Expand Down

0 comments on commit 145d4f4

Please sign in to comment.