diff --git a/pods/fulltext/src/manager.ts b/pods/fulltext/src/manager.ts index a584e1ff4ad..12dcd04bfa3 100644 --- a/pods/fulltext/src/manager.ts +++ b/pods/fulltext/src/manager.ts @@ -18,6 +18,7 @@ import { QueueTopic, QueueWorkspaceEvent, workspaceEvents, + getDeadletterTopic, type ConsumerControl, type ConsumerHandle, type ConsumerMessage, @@ -49,6 +50,7 @@ export class WorkspaceManager { fulltextConsumer?: ConsumerHandle txConsumer?: ConsumerHandle + txDeadLetterProducer?: PlatformQueueProducer | TxDomainEvent>> constructor ( readonly ctx: MeasureContext, @@ -150,25 +152,39 @@ export class WorkspaceManager { await this.processTransactions(msg, control) } ) + + this.txDeadLetterProducer = this.opt.queue.getProducer | TxDomainEvent>>( + this.ctx, + getDeadletterTopic(QueueTopic.Tx) + ) } private async processTransactions ( m: ConsumerMessage> | TxDomainEvent>>, control: ConsumerControl ): Promise { - const ws = m.workspace - - let token: string try { - token = generateToken(systemAccountUuid, ws, { service: 'fulltext' }) + const ws = m.workspace + + let token: string + try { + token = generateToken(systemAccountUuid, ws, { service: 'fulltext' }) + } catch (err: any) { + this.ctx.error('Error generating token', { err, systemAccountUuid, ws }) + throw err + } + + await this.withIndexer(this.ctx, ws, token, true, async (indexer) => { + await indexer.fulltext.processTransactions(this.ctx, [m.value], control) + }) } catch (err: any) { - this.ctx.error('Error generating token', { err, systemAccountUuid, ws }) - throw err + if (this.txDeadLetterProducer !== undefined) { + await this.txDeadLetterProducer.send(this.ctx, m?.workspace ?? 'N/A', [m.value]) + } else { + this.ctx.error('Could not send failed transaction to dead letter queue - no producer available', { m }) + } + this.ctx.error('Could not process transactions', { err, m }) } - - await this.withIndexer(this.ctx, ws, token, true, async (indexer) => { - await indexer.fulltext.processTransactions(this.ctx, [m.value], control) - }) } private async processWorkspaceEvent ( diff --git a/server/core/src/queue/index.ts b/server/core/src/queue/index.ts index 044d5f3cd8c..9121f8d22a6 100644 --- a/server/core/src/queue/index.ts +++ b/server/core/src/queue/index.ts @@ -2,3 +2,4 @@ export * from './types' export * from './workspace' export * from './dummyQueue' export * from './users' +export * from './utils' diff --git a/server/core/src/queue/utils.ts b/server/core/src/queue/utils.ts new file mode 100644 index 00000000000..647d618ffe3 --- /dev/null +++ b/server/core/src/queue/utils.ts @@ -0,0 +1,5 @@ +import { type QueueTopic } from './types' + +export function getDeadletterTopic (topic: QueueTopic): string { + return `${topic}-d` +}