Skip to content

Commit

Permalink
fix(server): fix response to other node (#240)
Browse files Browse the repository at this point in the history
* fix converse

* unsubscribe

* publish

* small refactor

* fix socket

* remove todo

* fix

* remove changes to api.rest

* add benchmark script (#241)

* chore(engine): create engine package

* base service class in engine

* move table to engine

* move distributed and logger to engine

* move database service

* move crypto service

* move caching service

* move batching

* fix

* move meta service

* fix meta

* move migrations

* engine class

* fix build

* move kvs

* move global config

* fix tests

* dispatcher

* converse use dispatcher

* dispatcher for socket

* add more typings

* include scope in callback

* listen -> subscribe

* redis handle callback errors

* prevent promise rejection cache

* fix

* fix for local

* send -> publish

* data -> payload

* fix Dockerfile

* fix test

Co-authored-by: Laurent Leclerc-Poulin <laurentleclercpoulin@gmail.com>
  • Loading branch information
samuelmasse and laurentlp committed Nov 22, 2021
1 parent 2e89f39 commit a54b554
Show file tree
Hide file tree
Showing 22 changed files with 365 additions and 105 deletions.
17 changes: 17 additions & 0 deletions benchmark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

# To run this :
# yarn global add loadtest

clientId=YOUR_CLIENT_ID
clientToken=YOUR_CLIENT_TOKEN
conversationId=YOUR_CONVERSATION_ID
authorId=YOUR_AUTHOR_ID

loadtest http://localhost:3100/api/messages \
-m POST \
-T application/json \
-c 64 \
-P "{\"conversationId\":\"$conversationId\",\"authorId\":\"$authorId\",\"payload\":{\"type\":\"text\",\"text\":\"Hello this is a text message!\"}}" \
-H "x-bp-messaging-client-id:${clientId}" \
-H "x-bp-messaging-client-token:${clientToken}"
10 changes: 8 additions & 2 deletions packages/engine/src/caching/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export class ServerCache<K, V> {

constructor(private id: string, private distributed: DistributedService, options: LRU.Options<K, V>) {
this.lru = new LRU(options)
void this.distributed.listen(this.id, this.process.bind(this))
void this.distributed.subscribe(this.id, this.process.bind(this))
}

async process(event: ServerCacheEvent<K, V>) {
Expand All @@ -24,7 +24,13 @@ export class ServerCache<K, V> {
}

invalidate(key: K) {
void this.distributed.send(this.id, { key })
void this.sendInvalidation(key)
}

async sendInvalidation(key: K) {
try {
await this.distributed.publish(this.id, { key })
} catch (e) {}
}

set(key: K, value: V, maxAge?: number, invalidate?: boolean): boolean {
Expand Down
10 changes: 8 additions & 2 deletions packages/engine/src/caching/cache2D.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export class ServerCache2D<V> {

constructor(private id: string, private distributed: DistributedService, options: LRU.Options<string, V>) {
this.lru = new LRU(options)
void this.distributed.listen(this.id, this.process.bind(this))
void this.distributed.subscribe(this.id, this.process.bind(this))
}

async process(event: ServerCacheEvent<string, V>) {
Expand All @@ -24,7 +24,13 @@ export class ServerCache2D<V> {
}

invalidate(keyX: string, keyY: string) {
void this.distributed.send(this.id, { key: this.getKey(keyX, keyY) })
void this.sendInvalidation(keyX, keyY)
}

async sendInvalidation(keyX: string, keyY: string) {
try {
await this.distributed.publish(this.id, { key: this.getKey(keyX, keyY) })
} catch (e) {}
}

set(keyX: string, keyY: string, value: V, maxAge?: number, invalidate?: boolean): boolean {
Expand Down
70 changes: 70 additions & 0 deletions packages/engine/src/dispatch/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { DistributedService } from '..'

const SCOPE_SEPERATOR = '::'

export class Dispatcher<T extends { [key: number]: any }> {
private name!: string
private distributed!: DistributedService

private listeners: { [eventId: number]: ((scope: string, arg: any) => Promise<void>)[] } = {}
private handleDispatchCallback!: (msg: any, channel: string) => Promise<void>

async setup(name: string, distributed: DistributedService) {
this.name = name
this.distributed = distributed
this.handleDispatchCallback = this.handleDispatch.bind(this)
}

async subscribe(scope: string) {
await this.distributed.subscribe(this.getChannel(scope), this.handleDispatchCallback)
}

async unsubscribe(scope: string) {
await this.distributed.unsubscribe(this.getChannel(scope))
}

async publish<K extends keyof T>(event: K, scope: string, arg: T[K]) {
await this.emit(event, scope, arg)
await this.distributed.publish(this.getChannel(scope), {
cmd: event,
data: arg
})
}

private getChannel(scope: string) {
return `${this.name}${SCOPE_SEPERATOR}${scope}`
}

private async handleDispatch<K extends keyof T>({ cmd, data }: { cmd: K; data: T[K] }, channel: string) {
const scopeStart = channel.lastIndexOf(SCOPE_SEPERATOR)
const scope = channel.substr(scopeStart + SCOPE_SEPERATOR.length)
await this.emit(cmd, scope, data)
}

public on<K extends keyof T>(
event: K,
listener: (scope: string, arg: T[K]) => Promise<void>,
pushBack: boolean = false
) {
const listeners = this.listeners[event as number]
if (!listeners) {
this.listeners[event as number] = [listener]
} else if (!pushBack) {
listeners.push(listener)
} else {
listeners.unshift(listener)
}
}

private async emit<K extends keyof T>(event: K, scope: string, arg: T[K]): Promise<boolean> {
const listeners = this.listeners[event as number]
if (listeners?.length) {
for (const listener of listeners) {
await listener(scope, arg)
}
return true
} else {
return false
}
}
}
16 changes: 16 additions & 0 deletions packages/engine/src/dispatch/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Dispatcher, DistributedService } from '..'
import { Service } from '../base/service'

export class DispatchService extends Service {
constructor(private distributed: DistributedService) {
super()
}

async setup() {}

async create<T extends Dispatcher<any>>(name: string, t: new () => T): Promise<T> {
const dispatcher = new t()
await dispatcher.setup(name, this.distributed)
return dispatcher
}
}
5 changes: 3 additions & 2 deletions packages/engine/src/distributed/base/subservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { Lock } from '../types'
export interface DistributedSubservice {
setup(): Promise<void>
destroy(): Promise<void>
listen(channel: string, callback: (message: any) => Promise<void>): Promise<void>
send(channel: string, message: any): Promise<void>
subscribe(channel: string, callback: (message: any, channel: string) => Promise<void>): Promise<void>
unsubscribe(channel: string): Promise<void>
publish(channel: string, message: any): Promise<void>
lock(ressource: string): Promise<Lock>
release(lock: Lock): Promise<void>
}
6 changes: 4 additions & 2 deletions packages/engine/src/distributed/local/subservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ export class LocalSubservice implements DistributedSubservice {

async destroy() {}

async listen(channel: string, callback: (message: any) => Promise<void>) {}
async subscribe(channel: string, callback: (message: any, channel: string) => Promise<void>) {}

async send(channel: string, message: any) {}
async unsubscribe(channel: string) {}

async publish(channel: string, message: any) {}

async lock(ressource: string): Promise<Lock> {
let attemptCount = 0
Expand Down
12 changes: 6 additions & 6 deletions packages/engine/src/distributed/redis/ping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@ export class PingPong {
constructor(private nodeId: number, private distributed: RedisSubservice, private logger: Logger) {}

async setup() {
await this.distributed.listen('ping', async (ping: PingEvent) => {
await this.distributed.subscribe('ping', async (ping: PingEvent) => {
this.acknowledge(ping.name)
void this.pong(ping.name)
await this.pong(ping.name)
})

await this.distributed.listen('pong', async (pong: PongEvent) => {
await this.distributed.subscribe('pong', async (pong: PongEvent) => {
if (pong.to === this.nodeId) {
this.acknowledge(pong.name)
}
})

await this.distributed.send('ping', { name: this.nodeId })
await this.distributed.publish('ping', { name: this.nodeId })
}

async ping() {
return this.distributed.send('ping', { name: this.nodeId })
return this.distributed.publish('ping', { name: this.nodeId })
}

async pong(to: number) {
return this.distributed.send('pong', { to, name: this.nodeId })
return this.distributed.publish('pong', { to, name: this.nodeId })
}

acknowledge(foreignNodeId: number) {
Expand Down
25 changes: 21 additions & 4 deletions packages/engine/src/distributed/redis/subservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class RedisSubservice implements DistributedSubservice {
private sub!: Redis
private redlock!: Redlock
private locks: { [ressource: string]: RedisLock } = {}
private callbacks: { [channel: string]: (message: any) => Promise<void> } = {}
private callbacks: { [channel: string]: (message: any, channel: string) => Promise<void> } = {}
private pings!: PingPong
private scope!: string

Expand All @@ -35,7 +35,7 @@ export class RedisSubservice implements DistributedSubservice {
const parsed = JSON.parse(message)
if (parsed.nodeId !== RedisSubservice.nodeId) {
delete parsed.nodeId
void callback(parsed)
void this.callCallback(callback, parsed, channel)
}
}
})
Expand All @@ -44,6 +44,18 @@ export class RedisSubservice implements DistributedSubservice {
await this.pings.setup()
}

private async callCallback(
callback: (message: any, channel: string) => Promise<void>,
message: any,
channel: string
) {
try {
await callback(message, channel)
} catch (e) {
this.logger.error(e, 'Error occured in callback', channel)
}
}

private setupClient(): Redis {
let connection = undefined
let options = {}
Expand Down Expand Up @@ -108,16 +120,21 @@ export class RedisSubservice implements DistributedSubservice {
}
}

async listen(channel: string, callback: (message: any) => Promise<void>) {
async subscribe(channel: string, callback: (message: any, channel: string) => Promise<void>) {
const scopedChannel = this.makeScopedChannel(channel)

await this.sub.subscribe(scopedChannel)
this.callbacks[scopedChannel] = callback
}

async send(channel: string, message: any) {
async unsubscribe(channel: string) {
const scopedChannel = this.makeScopedChannel(channel)
await this.sub.unsubscribe(scopedChannel)
delete this.callbacks[scopedChannel]
}

async publish(channel: string, message: any) {
const scopedChannel = this.makeScopedChannel(channel)
await this.pub.publish(scopedChannel, JSON.stringify({ nodeId: RedisSubservice.nodeId, ...message }))
}

Expand Down
12 changes: 8 additions & 4 deletions packages/engine/src/distributed/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ export class DistributedService extends Service {
await this.subservice.destroy()
}

async listen(channel: string, callback: (message: any) => Promise<void>) {
return this.subservice.listen(channel, callback)
async subscribe(channel: string, callback: (message: any, channel: string) => Promise<void>) {
return this.subservice.subscribe(channel, callback)
}

async send(channel: string, message: any) {
return this.subservice.send(channel, message)
async unsubscribe(channel: string) {
return this.subservice.unsubscribe(channel)
}

async publish(channel: string, message: any) {
return this.subservice.publish(channel, message)
}

async using(ressource: string, callback: () => Promise<void>) {
Expand Down
4 changes: 4 additions & 0 deletions packages/engine/src/engine.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { DispatchService } from '.'
import { BatchingService } from './batching/service'
import { CachingService } from './caching/service'
import { CryptoService } from './crypto/service'
Expand All @@ -15,6 +16,7 @@ export class Engine {
migration: MigrationService
crypto: CryptoService
distributed: DistributedService
dispatches: DispatchService
caching: CachingService
batching: BatchingService
kvs: KvsService
Expand All @@ -26,6 +28,7 @@ export class Engine {
this.migration = new MigrationService(this.database, this.meta)
this.crypto = new CryptoService()
this.distributed = new DistributedService()
this.dispatches = new DispatchService(this.distributed)
this.caching = new CachingService(this.distributed)
this.batching = new BatchingService()
this.kvs = new KvsService(this.database, this.caching)
Expand All @@ -38,6 +41,7 @@ export class Engine {
await this.migration.setup()
await this.crypto.setup()
await this.distributed.setup()
await this.dispatches.setup()
await this.caching.setup()
await this.batching.setup()
await this.kvs.setup()
Expand Down
2 changes: 2 additions & 0 deletions packages/engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export * from './caching/cache2D'
export * from './caching/service'
export * from './crypto/service'
export * from './database/service'
export * from './dispatch/dispatcher'
export * from './dispatch/service'
export * from './distributed/service'
export * from './logger/service'
export * from './logger/types'
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class App extends Engine {
this.userTokens = new UserTokenService(this.database, this.crypto, this.caching, this.batching, this.users)
this.conversations = new ConversationService(this.database, this.caching, this.batching, this.users)
this.messages = new MessageService(this.database, this.caching, this.batching, this.conversations)
this.converse = new ConverseService(this.caching, this.messages)
this.converse = new ConverseService(this.caching, this.dispatches, this.messages)
this.mapping = new MappingService(this.database, this.caching, this.batching, this.users, this.conversations)
this.status = new StatusService(this.database, this.distributed, this.caching, this.conduits)
this.instances = new InstanceService(
Expand Down Expand Up @@ -87,6 +87,7 @@ export class App extends Engine {
)
this.sockets = new SocketService(this.caching, this.users)
this.stream = new StreamService(
this.dispatches,
this.post,
this.sockets,
this.channels,
Expand Down
20 changes: 20 additions & 0 deletions packages/server/src/converse/dispatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Message, uuid } from '@botpress/messaging-base'
import { Dispatcher } from '@botpress/messaging-engine'

export enum ConverseDispatches {
Message,
Stop
}

export interface ConverseMessageDispatch {
message: Message
}

export interface ConverseStopDispatch {
conversationId: uuid
}

export class ConverseDispatcher extends Dispatcher<{
[ConverseDispatches.Message]: ConverseMessageDispatch
[ConverseDispatches.Stop]: ConverseStopDispatch
}> {}
Loading

0 comments on commit a54b554

Please sign in to comment.