Skip to content

Commit

Permalink
fix(mapping): fix insert race conditions (#255)
Browse files Browse the repository at this point in the history
* add locks to map functions

* usermap and convmap map functions

* fix convmap and usermap

* cleanup

* tests

* test

* more tests

* barrier service

* service

* use barrier for convmap

* usermap use barrier

* tunnel use barrier

* more tests

* identities use barrier

* sender use barrier

* thread use barrier
  • Loading branch information
samuelmasse committed Nov 25, 2021
1 parent 03e1bd4 commit 7e070eb
Show file tree
Hide file tree
Showing 14 changed files with 463 additions and 75 deletions.
20 changes: 20 additions & 0 deletions packages/engine/src/barrier/barrier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { ServerCache2D } from '../caching/cache2D'

export class Barrier2D<T> {
constructor(private id: string, private locks: ServerCache2D<Promise<T>>) {}

async once(keyX: string, keyY: string, callback: () => Promise<T>): Promise<T> {
let promise = this.locks.get(keyX, keyY)

if (!promise) {
promise = new Promise(async (resolve) => {
resolve(await callback())
this.locks.del(keyX, keyY)
})

this.locks.set(keyX, keyY, promise)
}

return promise
}
}
16 changes: 16 additions & 0 deletions packages/engine/src/barrier/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Service } from '../base/service'
import { CachingService } from '../caching/service'
import { Barrier2D } from './barrier'

export class BarrierService extends Service {
constructor(private caching: CachingService) {
super()
}

async setup() {}

async newBarrier2D<T>(id: string): Promise<Barrier2D<T>> {
const locks = await this.caching.newServerCache2D<Promise<T>>(`cache_locks_${id}`)
return new Barrier2D(id, locks)
}
}
4 changes: 4 additions & 0 deletions packages/engine/src/engine.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DispatchService } from '.'
import { BarrierService } from './barrier/service'
import { BatchingService } from './batching/service'
import { CachingService } from './caching/service'
import { CryptoService } from './crypto/service'
Expand All @@ -19,6 +20,7 @@ export class Engine {
dispatches: DispatchService
caching: CachingService
batching: BatchingService
barriers: BarrierService
kvs: KvsService

constructor() {
Expand All @@ -31,6 +33,7 @@ export class Engine {
this.dispatches = new DispatchService(this.distributed)
this.caching = new CachingService(this.distributed)
this.batching = new BatchingService()
this.barriers = new BarrierService(this.caching)
this.kvs = new KvsService(this.database, this.caching)
}

Expand All @@ -44,6 +47,7 @@ export class Engine {
await this.dispatches.setup()
await this.caching.setup()
await this.batching.setup()
await this.barriers.setup()
await this.kvs.setup()
}
}
2 changes: 2 additions & 0 deletions packages/engine/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './barrier/barrier'
export * from './barrier/service'
export * from './base/errors'
export * from './base/service'
export * from './base/table'
Expand Down
9 changes: 8 additions & 1 deletion packages/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,14 @@ export class App extends Engine {
this.conversations = new ConversationService(this.database, this.caching, this.batching, this.users)
this.messages = new MessageService(this.database, this.caching, this.batching, this.conversations)
this.converse = new ConverseService(this.caching, this.dispatches, this.messages)
this.mapping = new MappingService(this.database, this.caching, this.batching, this.users, this.conversations)
this.mapping = new MappingService(
this.database,
this.caching,
this.batching,
this.barriers,
this.users,
this.conversations
)
this.status = new StatusService(this.database, this.distributed, this.caching, this.conduits)
this.instances = new InstanceService(
this.logger,
Expand Down
20 changes: 20 additions & 0 deletions packages/server/src/mapping/convmap/service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { uuid } from '@botpress/messaging-base'
import {
Barrier2D,
BarrierService,
Batcher,
BatchingService,
CachingService,
Expand All @@ -10,6 +12,7 @@ import {
} from '@botpress/messaging-engine'
import { ConversationService } from '../../conversations/service'
import { ThreadService } from '../threads/service'
import { TunnelService } from '../tunnels/service'
import { ConvmapTable } from './table'
import { Convmap } from './types'

Expand All @@ -19,12 +22,15 @@ export class ConvmapService extends Service {
private table: ConvmapTable
private cacheByThreadId!: ServerCache2D<Convmap>
private cacheByConversationId!: ServerCache<uuid, Convmap[]>
private barrier!: Barrier2D<Convmap>

constructor(
private db: DatabaseService,
private caching: CachingService,
private batching: BatchingService,
private barriers: BarrierService,
private conversations: ConversationService,
private tunnels: TunnelService,
private threads: ThreadService
) {
super()
Expand All @@ -35,6 +41,7 @@ export class ConvmapService extends Service {
async setup() {
this.cacheByThreadId = await this.caching.newServerCache2D('cache_convmap_by_thread_id')
this.cacheByConversationId = await this.caching.newServerCache('cache_convmap_by_conversation_id')
this.barrier = await this.barriers.newBarrier2D('barrier_convmap')

this.batcher = await this.batching.newBatcher(
'batcher_convmap',
Expand Down Expand Up @@ -97,6 +104,19 @@ export class ConvmapService extends Service {
return convmaps
}

async map(tunnelId: uuid, threadId: uuid, userId: uuid): Promise<Convmap> {
const convmap = await this.getByThreadId(tunnelId, threadId)
if (convmap) {
return convmap
}

return this.barrier.once(tunnelId, threadId, async () => {
const tunnel = await this.tunnels.get(tunnelId)
const conversation = await this.conversations.create(tunnel!.clientId, userId)
return this.create(tunnelId, conversation.id, threadId)
})
}

private query() {
return this.db.knex(this.table.id)
}
Expand Down
49 changes: 37 additions & 12 deletions packages/server/src/mapping/identities/service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { uuid } from '@botpress/messaging-base'
import { CachingService, DatabaseService, ServerCache, ServerCache2D, Service } from '@botpress/messaging-engine'
import {
Barrier2D,
BarrierService,
CachingService,
DatabaseService,
ServerCache,
ServerCache2D,
Service
} from '@botpress/messaging-engine'
import { v4 as uuidv4 } from 'uuid'
import { IdentityTable } from './table'
import { Identity } from './types'
Expand All @@ -8,15 +16,17 @@ export class IdentityService extends Service {
private table: IdentityTable
private cacheById!: ServerCache<uuid, Identity>
private cacheByName!: ServerCache2D<Identity>
private barrier!: Barrier2D<Identity>

constructor(private db: DatabaseService, private caching: CachingService) {
constructor(private db: DatabaseService, private caching: CachingService, private barriers: BarrierService) {
super()
this.table = new IdentityTable()
}

async setup() {
this.cacheById = await this.caching.newServerCache('cache_identity_by_id')
this.cacheByName = await this.caching.newServerCache2D('cache_identity_by_name')
this.barrier = await this.barriers.newBarrier2D('barrier_identity')

await this.db.registerTable(this.table)
}
Expand All @@ -39,6 +49,17 @@ export class IdentityService extends Service {
}

async map(tunnelId: uuid, name: string): Promise<Identity> {
const identity = await this.getByName(tunnelId, name)
if (identity) {
return identity
}

return this.barrier.once(tunnelId, name, async () => {
return this.create(tunnelId, name)
})
}

private async getByName(tunnelId: uuid, name: string): Promise<Identity | undefined> {
const cached = this.cacheByName.get(tunnelId, name)
if (cached) {
return cached
Expand All @@ -51,18 +72,22 @@ export class IdentityService extends Service {
this.cacheByName.set(tunnelId, name, identity)
return identity
} else {
const identity = {
id: uuidv4(),
tunnelId,
name
}

await this.query().insert(identity)
this.cacheByName.set(tunnelId, name, identity)
this.cacheById.set(identity.id, identity)
return undefined
}
}

return identity
private async create(tunnelId: uuid, name: string): Promise<Identity> {
const identity = {
id: uuidv4(),
tunnelId,
name
}

await this.query().insert(identity)
this.cacheByName.set(tunnelId, name, identity)
this.cacheById.set(identity.id, identity)

return identity
}

private query() {
Expand Down
49 changes: 37 additions & 12 deletions packages/server/src/mapping/senders/service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { uuid } from '@botpress/messaging-base'
import {
Barrier2D,
BarrierService,
Batcher,
BatchingService,
CachingService,
Expand All @@ -13,19 +15,27 @@ import { SenderTable } from './table'
import { Sender } from './types'

export class SenderService extends Service {
public batcher!: Batcher<Sender>

private table: SenderTable
private cacheById!: ServerCache<uuid, Sender>
private cacheByName!: ServerCache2D<Sender>
public batcher!: Batcher<Sender>
private barrier!: Barrier2D<Sender>

constructor(private db: DatabaseService, private caching: CachingService, private batching: BatchingService) {
constructor(
private db: DatabaseService,
private caching: CachingService,
private batching: BatchingService,
private barriers: BarrierService
) {
super()
this.table = new SenderTable()
}

async setup() {
this.cacheById = await this.caching.newServerCache('cache_sender_by_id')
this.cacheByName = await this.caching.newServerCache2D('cache_sender_by_name')
this.barrier = await this.barriers.newBarrier2D('barrier_sender')

this.batcher = await this.batching.newBatcher('batcher_sender', [], this.handleBatchFlush.bind(this))

Expand Down Expand Up @@ -55,6 +65,17 @@ export class SenderService extends Service {
}

async map(identityId: uuid, name: string): Promise<Sender> {
const sender = await this.getByName(identityId, name)
if (sender) {
return sender
}

return this.barrier.once(identityId, name, async () => {
return this.create(identityId, name)
})
}

private async getByName(identityId: uuid, name: string): Promise<Sender | undefined> {
const cached = this.cacheByName.get(identityId, name)
if (cached) {
return cached
Expand All @@ -68,18 +89,22 @@ export class SenderService extends Service {
this.cacheByName.set(identityId, name, sender)
return sender
} else {
const sender = {
id: uuidv4(),
identityId,
name
}

await this.batcher.push(sender)
this.cacheByName.set(identityId, name, sender)
this.cacheById.set(sender.id, sender)
return undefined
}
}

return sender
private async create(identityId: uuid, name: string): Promise<Sender> {
const sender = {
id: uuidv4(),
identityId,
name
}

await this.batcher.push(sender)
this.cacheByName.set(identityId, name, sender)
this.cacheById.set(sender.id, sender)

return sender
}

private query() {
Expand Down

0 comments on commit 7e070eb

Please sign in to comment.