From 80f4e645a8a482a267a627db3a03fc685be7092d Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Sun, 10 Dec 2023 18:48:54 +0200 Subject: [PATCH] fix(authorization): jobs not being processed correctly (#841) --- .../authorization/src/controllers/queue.controller.ts | 9 ++++++++- modules/authorization/src/jobs/constructRelationIndex.ts | 3 ++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/modules/authorization/src/controllers/queue.controller.ts b/modules/authorization/src/controllers/queue.controller.ts index df6a72aa7..75f71c6bb 100644 --- a/modules/authorization/src/controllers/queue.controller.ts +++ b/modules/authorization/src/controllers/queue.controller.ts @@ -33,8 +33,15 @@ export class QueueController { const processorFile = path.normalize( path.join(__dirname, '../jobs', 'constructRelationIndex.js'), ); - new Worker('authorization-index-queue', processorFile, { + const worker = new Worker('authorization-index-queue', processorFile, { connection: this.redisConnection, + // autorun: true, + }); + worker.on('completed', (job: any) => { + ConduitGrpcSdk.Logger.info(`Job ${job.id} completed`); + }); + worker.on('error', (error: any) => { + ConduitGrpcSdk.Logger.info(`Job error:`, error); }); } diff --git a/modules/authorization/src/jobs/constructRelationIndex.ts b/modules/authorization/src/jobs/constructRelationIndex.ts index 3d2cf4079..213852fff 100644 --- a/modules/authorization/src/jobs/constructRelationIndex.ts +++ b/modules/authorization/src/jobs/constructRelationIndex.ts @@ -10,8 +10,9 @@ type ConstructRelationIndexWorkerData = { module.exports = async (job: SandboxedJob) => { const { relations } = job.data; if (!process.env.CONDUIT_SERVER) throw new Error('No serverUrl provided!'); - const grpcSdk = new ConduitGrpcSdk(process.env.CONDUIT_SERVER, 'authorization'); + const grpcSdk = new ConduitGrpcSdk(process.env.CONDUIT_SERVER, 'authorization', false); await grpcSdk.initialize(); + await grpcSdk.initializeEventBus(); await grpcSdk.waitForExistence('database'); ObjectIndex.getInstance(grpcSdk.database!); ActorIndex.getInstance(grpcSdk.database!);