Skip to content

Commit

Permalink
fix(messaging-service): add lock (#11733)
Browse files Browse the repository at this point in the history
* fix(messaging-service): add lock

* commet
  • Loading branch information
samuelmasse committed Apr 12, 2022
1 parent f716c72 commit 7946c9d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
10 changes: 9 additions & 1 deletion packages/bp/src/core/messaging/messaging-service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger } from 'botpress/sdk'
import { ConfigProvider } from 'core/config'
import Database from 'core/database'
import { JobService } from 'core/distributed'
import { EventEngine, EventRepository } from 'core/events'
import { TYPES } from 'core/types'
import { inject, injectable, postConstruct, tagged } from 'inversify'
Expand All @@ -27,13 +28,20 @@ export class MessagingService {
@inject(TYPES.EventEngine) private eventEngine: EventEngine,
@inject(TYPES.EventRepository) private eventRepo: EventRepository,
@inject(TYPES.ConfigProvider) private configProvider: ConfigProvider,
@inject(TYPES.JobService) private jobService: JobService,
@inject(TYPES.Logger)
@tagged('name', 'Messaging')
private logger: Logger
) {
this.entries = new MessagingEntries(this.database)
this.interactor = new MessagingInteractor(this.logger)
this.lifetime = new MessagingLifetime(this.logger, this.configProvider, this.entries, this.interactor)
this.lifetime = new MessagingLifetime(
this.logger,
this.configProvider,
this.jobService,
this.entries,
this.interactor
)
this.collector = new MessagingCollector(this.logger, this.eventEngine, this.interactor, this.lifetime)
this.listener = new MessagingListener(
this.eventEngine,
Expand Down
19 changes: 18 additions & 1 deletion packages/bp/src/core/messaging/subservices/lifetime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { MessagingClient, uuid } from '@botpress/messaging-client'
import { Logger } from 'botpress/sdk'
import { Logger, RedisLock } from 'botpress/sdk'
import chalk from 'chalk'
import { ConfigProvider } from 'core/config'
import { JobService } from 'core/distributed'
import ms from 'ms'
import { VError } from 'verror'
import { MessagingEntries, MessagingEntry } from './entries'
import { MessagingInteractor } from './interactor'

Expand All @@ -13,6 +16,7 @@ export class MessagingLifetime {
constructor(
private logger: Logger,
private configProvider: ConfigProvider,
private jobService: JobService,
private entries: MessagingEntries,
private interactor: MessagingInteractor
) {}
Expand All @@ -31,7 +35,10 @@ export class MessagingLifetime {

async loadMessagingForBot(botId: string, failsafe: boolean = true) {
await this.interactor.waitReady()

const lock = await this.lockMessaging(botId)
const { clientId, clientToken, config } = await this.loadMessagingEntry(botId)
await lock.unlock()

const botConfig = await this.configProvider.getBotConfig(botId)
const channels = { ...config, ...botConfig.messaging?.channels }
Expand Down Expand Up @@ -62,6 +69,15 @@ export class MessagingLifetime {

this.printWebhooks(botId, channels)
}
private async lockMessaging(botId: string) {
let lock: RedisLock | undefined
do {
lock = await this.jobService.acquireLock(`load_messaging_${botId}`, ms('5s'))
await Promise.delay(ms('50ms'))
} while (!lock)

return lock
}
private async loadMessagingEntry(botId: string): Promise<MessagingEntry> {
const entry = await this.entries.getByBotId(botId)
if (entry) {
Expand All @@ -71,6 +87,7 @@ export class MessagingLifetime {
if (reachable) {
return entry
} else {
// if the clientId was deleted on remote messaging for some reason, we create a new one
await this.entries.delete(entry.clientId)
return this.createMessagingEntry(botId)
}
Expand Down

0 comments on commit 7946c9d

Please sign in to comment.