Skip to content

Commit

Permalink
feat(channels): custom channels (#425)
Browse files Browse the repository at this point in the history
* feat(channels): custom channels

* fixes

* migration

* down migrate

* fix

* add tests

* remove client action source

* fix

* fix

* fix

* bump client package

* bump packages
  • Loading branch information
samuelmasse committed Mar 24, 2022
1 parent d7f4f69 commit a818301
Show file tree
Hide file tree
Showing 25 changed files with 310 additions and 62 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@botpress/messaging",
"version": "1.1.6",
"version": "1.1.7",
"description": "Botpress messaging repo",
"author": "Botpress, Inc.",
"license": "AGPL-3.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/base/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@botpress/messaging-base",
"version": "1.1.0",
"version": "1.1.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"source": "src/index.ts",
Expand Down
2 changes: 1 addition & 1 deletion packages/base/src/endpoint.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface Endpoint {
channel: { name: string; version: string }
channel: string | { name: string; version: string }
identity: string
sender: string
thread: string
Expand Down
4 changes: 2 additions & 2 deletions packages/client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@botpress/messaging-client",
"version": "1.1.6",
"version": "1.1.8",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"source": "src/index.ts",
Expand Down Expand Up @@ -41,7 +41,7 @@
"uuid": "^8.3.2"
},
"dependencies": {
"@botpress/messaging-base": "1.1.0",
"@botpress/messaging-base": "1.1.1",
"axios": "^0.21.4",
"cookie": "^0.4.2",
"joi": "^17.6.0"
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const Schemas = {
.object({
id: joi.string().uuid().required(),
conversationId: joi.string().uuid().required(),
authorId: joi.string().uuid().required(),
authorId: joi.string().uuid().optional(),
sentOn: joi.date().required(),
payload: joi.object().required()
})
Expand Down
4 changes: 2 additions & 2 deletions packages/inject/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@botpress/webchat-inject",
"version": "0.3.0",
"version": "0.3.1",
"license": "AGPL-3.0",
"scripts": {
"build": "yarn && yarn run -T parcel build src/index.html src/inject.js --public-url ./",
Expand All @@ -21,7 +21,7 @@
"@types/react-dom": "^17.0.11"
},
"dependencies": {
"@botpress/webchat": "0.3.0",
"@botpress/webchat": "0.3.1",
"react": "^17.0.2",
"react-dom": "^17.0.2"
}
Expand Down
2 changes: 1 addition & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@botpress/messaging-server",
"version": "1.1.6",
"version": "1.1.7",
"main": "index.ts",
"license": "AGPL-3.0",
"scripts": {
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/base/source.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import { uuid } from '@botpress/messaging-base'
import { Endpoint } from '@botpress/messaging-channels'

/**
* Indicates whether an action is performed due to a request made by
* - a conduit (we receive a request from an external service such as Telegram)
* - a client (using the http api with a clientId and clientToken)
* - a socket (using a websocket connection)
*
* Knowing this allows messaging to avoid redundantly
* streaming back an event to the service that made the request.
*/
export interface ActionSource {
endpoint?: Endpoint
client?: { id: uuid }
socket?: { id: string }
}
16 changes: 7 additions & 9 deletions packages/server/src/base/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ export class Streamer {
await this.dispatcher.publish(StreamerDispatches.Message, userId, { source: source?.socket?.id, payload })
}

if (source?.client?.id !== clientId) {
const webhooks = await this.webhooks.list(clientId)

for (const webhook of webhooks) {
void this.send(process.env.SPINNED_URL || webhook.url, payload, {
'x-bp-messaging-client-id': clientId,
'x-bp-messaging-webhook-token': webhook.token
})
}
const webhooks = await this.webhooks.list(clientId)

for (const webhook of webhooks) {
void this.send(process.env.SPINNED_URL || webhook.url, payload, {
'x-bp-messaging-client-id': clientId,
'x-bp-messaging-webhook-token': webhook.token
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/conversations/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class ConversationStream {
const convmaps = await this.mapping.convmap.listByConversationId(conversationId)
if (convmaps.length === 1) {
const tunnel = await this.mapping.tunnels.get(convmaps[0].tunnelId)
return this.channels.getById(tunnel!.channelId).meta.name
return tunnel!.customChannelName ? tunnel!.customChannelName : this.channels.getById(tunnel!.channelId!).meta.name
} else {
return 'messaging'
}
Expand Down
5 changes: 4 additions & 1 deletion packages/server/src/instances/messaging/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ export class InstanceMessagingService extends Service {
for (const { threadId, tunnelId } of convmaps) {
const endpoint = await this.mapping.getEndpoint(threadId)
const tunnel = await this.mapping.tunnels.get(tunnelId)
if (!tunnel?.channelId) {
continue
}

if (!source?.endpoint || !this.endpointEqual(source.endpoint, endpoint)) {
const conduit = await this.conduits.fetchByProviderAndChannel(provision.providerId, tunnel!.channelId)
const conduit = await this.conduits.fetchByProviderAndChannel(provision.providerId, tunnel.channelId)
if (!conduit) {
return
}
Expand Down
24 changes: 18 additions & 6 deletions packages/server/src/mapping/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@ export class MappingApi {

async map(req: ClientApiRequest, res: Response) {
const endpoint = req.body as Endpoint
let conversationId

const channel = this.channels.getByNameAndVersion(endpoint.channel.name, endpoint.channel.version)
const { conversationId } = await this.mapping.getMapping(req.clientId, channel.meta.id, endpoint)
if (typeof endpoint.channel === 'string') {
conversationId = (await this.mapping.getCustomMapping(req.clientId, endpoint.channel, endpoint)).conversationId
} else {
const channel = this.channels.getByNameAndVersion(endpoint.channel.name, endpoint.channel.version)
conversationId = (await this.mapping.getMapping(req.clientId, channel.meta.id, endpoint)).conversationId
}

res.send({ conversationId })
}
Expand All @@ -37,14 +42,21 @@ export class MappingApi {
}

const convmaps = await this.mapping.convmap.listByConversationId(conversationId)
const endpoints = []
const endpoints: Endpoint[] = []

for (const convmap of convmaps) {
const endpoint = await this.mapping.getEndpoint(convmap.threadId)
const tunnel = await this.mapping.tunnels.get(convmap.tunnelId)
const channel = this.channels.getById(tunnel!.channelId)
const tunnel = (await this.mapping.tunnels.get(convmap.tunnelId))!
let channel

if (tunnel.customChannelName) {
channel = tunnel.customChannelName
} else {
const channelMeta = this.channels.getById(tunnel.channelId!).meta
channel = { name: channelMeta.name, version: channelMeta.version }
}

endpoints.push({ channel: { name: channel.meta.name, version: channel.meta.version }, ...endpoint })
endpoints.push({ channel, ...endpoint })
}

res.send(endpoints)
Expand Down
7 changes: 4 additions & 3 deletions packages/server/src/mapping/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import { ReqSchema } from '../base/schema'
export const makeMapRequestSchema = (channels: Channel[]) => {
return ReqSchema({
body: {
channel: Joi.alternatives(
channels.map((x) =>
channel: Joi.alternatives([
Joi.string(),
...channels.map((x) =>
Joi.object({
name: Joi.string().valid(x.meta.name).required(),
version: Joi.string().valid(x.meta.version).required()
})
)
).required(),
]).required(),
identity: Joi.string().required(),
sender: Joi.string().required(),
thread: Joi.string().required()
Expand Down
17 changes: 13 additions & 4 deletions packages/server/src/mapping/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,23 @@ export class MappingService extends Service {

async getMapping(clientId: uuid, channelId: uuid, endpoint: Endpoint): Promise<Mapping> {
const tunnel = await this.tunnels.map(clientId, channelId)
const identity = await this.identities.map(tunnel.id, endpoint.identity)
return this.getMappingFromTunnel(tunnel.id, endpoint)
}

async getCustomMapping(clientId: uuid, customChannelName: string, endpoint: Endpoint): Promise<Mapping> {
const tunnel = await this.tunnels.mapCustom(clientId, customChannelName)
return this.getMappingFromTunnel(tunnel.id, endpoint)
}

private async getMappingFromTunnel(tunnelId: uuid, endpoint: Endpoint) {
const identity = await this.identities.map(tunnelId, endpoint.identity)
const sender = await this.senders.map(identity.id, endpoint.sender)
const thread = await this.threads.map(sender.id, endpoint.thread)
const usermap = await this.usermap.map(tunnel.id, sender.id)
const convmap = await this.convmap.map(tunnel.id, thread.id, usermap.userId)
const usermap = await this.usermap.map(tunnelId, sender.id)
const convmap = await this.convmap.map(tunnelId, thread.id, usermap.userId)

return {
tunnelId: tunnel.id,
tunnelId,
identityId: identity.id,
senderId: sender.id,
threadId: thread.id,
Expand Down
52 changes: 48 additions & 4 deletions packages/server/src/mapping/tunnels/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class TunnelService extends Service {
private table: TunnelTable
private cacheById!: ServerCache<uuid, Tunnel>
private cacheByClientAndChannel!: ServerCache2D<Tunnel>
private cacheByClientAndCustomChannel!: ServerCache2D<Tunnel>
private barrier!: Barrier2D<Tunnel>

constructor(private db: DatabaseService, private caching: CachingService, private barriers: BarrierService) {
Expand All @@ -26,6 +27,9 @@ export class TunnelService extends Service {
async setup() {
this.cacheById = await this.caching.newServerCache('cache_tunnel_by_id')
this.cacheByClientAndChannel = await this.caching.newServerCache2D('cache_tunnel_by_client_and_channel')
this.cacheByClientAndCustomChannel = await this.caching.newServerCache2D(
'cache_tunnel_by_client_and_custom_channel'
)
this.barrier = await this.barriers.newBarrier2D('barrier_tunnel')

await this.db.registerTable(this.table)
Expand Down Expand Up @@ -55,19 +59,42 @@ export class TunnelService extends Service {
}

return this.barrier.once(clientId, channelId, async () => {
return this.create(clientId, channelId)
return this.create(clientId, channelId, undefined)
})
}

private async create(clientId: uuid, channelId: uuid): Promise<Tunnel> {
async mapCustom(clientId: uuid, customChannelName: string): Promise<Tunnel> {
const tunnel = await this.getByClientAndCustomChannel(clientId, customChannelName)
if (tunnel) {
return tunnel
}

return this.barrier.once(clientId, customChannelName, async () => {
return this.create(clientId, undefined, customChannelName)
})
}

private async create(
clientId: uuid,
channelId: uuid | undefined,
customChannelName: string | undefined
): Promise<Tunnel> {
const tunnel = {
id: uuidv4(),
clientId,
channelId
channelId,
customChannelName
}

await this.query().insert(tunnel)
this.cacheByClientAndChannel.set(clientId, channelId, tunnel)

if (channelId) {
this.cacheByClientAndChannel.set(clientId, channelId, tunnel)
}
if (customChannelName) {
this.cacheByClientAndCustomChannel.set(clientId, customChannelName, tunnel)
}

this.cacheById.set(tunnel.id, tunnel)

return tunnel
Expand All @@ -90,6 +117,23 @@ export class TunnelService extends Service {
}
}

private async getByClientAndCustomChannel(clientId: uuid, customChannelName: string): Promise<Tunnel | undefined> {
const cached = this.cacheByClientAndCustomChannel.get(clientId, customChannelName)
if (cached) {
return cached
}

const rows = await this.query().where({ clientId, customChannelName })

if (rows?.length) {
const tunnel = rows[0] as Tunnel
this.cacheByClientAndCustomChannel.set(clientId, customChannelName, tunnel)
return tunnel
} else {
return undefined
}
}

private query() {
return this.db.knex(this.table.id)
}
Expand Down
4 changes: 3 additions & 1 deletion packages/server/src/mapping/tunnels/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ export class TunnelTable extends Table {
create(table: Knex.CreateTableBuilder) {
table.uuid('id').primary()
table.uuid('clientId').references('id').inTable(getTableId('msg_clients')).notNullable()
table.uuid('channelId').references('id').inTable(getTableId('msg_channels')).notNullable()
table.uuid('channelId').references('id').inTable(getTableId('msg_channels')).nullable()
table.string('customChannelName').nullable()
table.unique(['clientId', 'channelId'])
table.unique(['clientId', 'customChannelName'])
}
}
3 changes: 2 additions & 1 deletion packages/server/src/mapping/tunnels/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ import { uuid } from '@botpress/messaging-base'
export interface Tunnel {
id: uuid
clientId: uuid
channelId: uuid
channelId?: uuid
customChannelName?: string
}
8 changes: 1 addition & 7 deletions packages/server/src/messages/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,7 @@ export class MessageApi {
this.converse.setIncomingId(messageId, incomingId)
}

const source = authorId
? undefined
: {
client: { id: req.clientId }
}
const message = await this.messages.create(conversationId, authorId, payload, source, messageId)

const message = await this.messages.create(conversationId, authorId, payload, undefined, messageId)
res.status(201).send(message)
}

Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/messages/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class MessageStream {
const convmaps = await this.mapping.convmap.listByConversationId(conversationId)
if (convmaps.length === 1) {
const tunnel = await this.mapping.tunnels.get(convmaps[0].tunnelId)
return this.channels.getById(tunnel!.channelId).meta.name
return tunnel!.customChannelName ? tunnel!.customChannelName : this.channels.getById(tunnel!.channelId!).meta.name
} else {
return 'messaging'
}
Expand Down
35 changes: 35 additions & 0 deletions packages/server/src/migrations/1.1.7-custom-channels.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { getTableId, Migration } from '@botpress/messaging-engine'

export class CustomChannelsMigration extends Migration {
meta = {
name: CustomChannelsMigration.name,
description: 'Modifies the msg_tunnels table to support custom channels',
version: '1.1.7'
}

async valid() {
return true
}

async applied() {
return this.trx.schema.hasColumn(getTableId('msg_tunnels'), 'customChannelName')
}

async up() {
await this.trx.schema.alterTable(getTableId('msg_tunnels'), (table) => {
table.uuid('channelId').nullable().alter()
table.string('customChannelName').nullable()
table.unique(['clientId', 'customChannelName'])
})
}

async down() {
await this.trx(getTableId('msg_tunnels')).whereNull('channelId').del()

await this.trx.schema.alterTable(getTableId('msg_tunnels'), (table) => {
table.uuid('channelId').notNullable().alter()
table.dropUnique(['clientId', 'customChannelName'])
table.dropColumn('customChannelName')
})
}
}

0 comments on commit a818301

Please sign in to comment.