Skip to content

Commit

Permalink
feat(Worker): Allows subset of workers to reduce processing (LLC-9)
Browse files Browse the repository at this point in the history
  • Loading branch information
h-kanazawa committed Mar 20, 2020
1 parent 8d0e78b commit 6a8e084
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 13 deletions.
11 changes: 11 additions & 0 deletions .env.example
Expand Up @@ -181,6 +181,17 @@ QUEUE_NAMESPACE=DEV
#PUBSUB_GOOGLE_CLOUD_PROJECT_ID=
#PUBSUB_GOOGLE_CLOUD_SUBSCRIPTION_NAME=ll

# Allowed worker queue list
# Available queue names
# STATEMENT_QUERYBUILDERCACHE_QUEUE
# STATEMENT_PERSON_QUEUE
# STATEMENT_FORWARDING_QUEUE
# Separate them with comma.
#
# Not defining this variable would run all queues
# Having it empty string would run no queues
# ALLOWED_WORKER_QUEUES=STATEMENT_FORWARDING_QUEUE,STATEMENT_QUERYBUILDERCACHE_QUEUE

# Azure service bus endpoint, required if QUEUE_PROVIDER=SERVICE_BUS
# SERVICE_BUS_ENDPOINT='Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=[keyName];SharedAccessKey=[key]'

Expand Down
51 changes: 51 additions & 0 deletions worker/src/handlers/statement/allowedWorkerQueues.js
@@ -0,0 +1,51 @@
import logger from 'lib/logger';
import {
STATEMENT_QUERYBUILDERCACHE_QUEUE,
STATEMENT_EXTRACT_PERSONAS_QUEUE,
STATEMENT_FORWARDING_QUEUE,
} from 'lib/constants/statements';

/**
* allowable Worker Queues
*
* @type {string[]}
*/
const allowableWorkerQueues = [
STATEMENT_QUERYBUILDERCACHE_QUEUE,
STATEMENT_EXTRACT_PERSONAS_QUEUE,
STATEMENT_FORWARDING_QUEUE,
];

/**
* @param {string | undefined} allowedWorkerQueuesString
* @return {string[]}
* @throws {Error}
*/
const getAllowedWorkerQueues = (allowedWorkerQueuesString) => {
if (allowedWorkerQueuesString === undefined) {
return allowableWorkerQueues;
}

if (allowedWorkerQueuesString === '') {
return [];
}

return allowedWorkerQueuesString.split(',').reduce(
(acc, queueString) => {
if (allowableWorkerQueues.includes(queueString)) {
return acc.concat([queueString]);
}
logger.warn(`"${queueString}" is ignored as an allowed worker queue. Allowable worker queues are ${allowableWorkerQueues.map(q => `"${q}"`).join(', ')}`);
return acc;
},
[],
);
};

/**
* @type {string[]}
*/
const allowedWorkerQueues = getAllowedWorkerQueues(process.env.ALLOWED_WORKER_QUEUES);

/* eslint-disable import/prefer-default-export*/
export const isAllowedWorkerQueue = queueName => allowedWorkerQueues.includes(queueName);
32 changes: 20 additions & 12 deletions worker/src/handlers/statement/index.js
Expand Up @@ -22,6 +22,8 @@ import {
STATEMENT_FORWARDING_DEADLETTER_QUEUE
} from 'lib/constants/statements';

import { isAllowedWorkerQueue } from './allowedWorkerQueues';

const defaultHandleResponse = queueName => (err) => {
if (err) logger.error(`ERROR SUBSCRIBING TO QUEUE ${queueName}`, err);
return err;
Expand All @@ -46,20 +48,26 @@ export default (
onProcessed: statementHandlerProcessed
}, handleResponse(STATEMENT_QUEUE));

Queue.subscribe({
queueName: STATEMENT_EXTRACT_PERSONAS_QUEUE,
handler: extractPersonasHandler(personaService)
}, handleResponse(STATEMENT_EXTRACT_PERSONAS_QUEUE));
if (isAllowedWorkerQueue(STATEMENT_EXTRACT_PERSONAS_QUEUE)) {
Queue.subscribe({
queueName: STATEMENT_EXTRACT_PERSONAS_QUEUE,
handler: extractPersonasHandler(personaService)
}, handleResponse(STATEMENT_EXTRACT_PERSONAS_QUEUE));
}

Queue.subscribe({
queueName: STATEMENT_QUERYBUILDERCACHE_QUEUE,
handler: queryBuilderCacheHandler
}, handleResponse(STATEMENT_QUERYBUILDERCACHE_QUEUE));
if (isAllowedWorkerQueue(STATEMENT_QUERYBUILDERCACHE_QUEUE)) {
Queue.subscribe({
queueName: STATEMENT_QUERYBUILDERCACHE_QUEUE,
handler: queryBuilderCacheHandler
}, handleResponse(STATEMENT_QUERYBUILDERCACHE_QUEUE));
}

Queue.subscribe({
queueName: STATEMENT_FORWARDING_QUEUE,
handler: statementForwardingHandler
}, handleResponse(STATEMENT_FORWARDING_QUEUE));
if (isAllowedWorkerQueue(STATEMENT_FORWARDING_QUEUE)) {
Queue.subscribe({
queueName: STATEMENT_FORWARDING_QUEUE,
handler: statementForwardingHandler
}, handleResponse(STATEMENT_FORWARDING_QUEUE));
}

Queue.subscribe({
queueName: STATEMENT_FORWARDING_REQUEST_QUEUE,
Expand Down
5 changes: 4 additions & 1 deletion worker/src/handlers/statement/statementHandler.js
Expand Up @@ -17,6 +17,7 @@ import {
STATEMENT_EXTRACT_PERSONAS_QUEUE,
STATEMENT_FORWARDING_QUEUE
} from 'lib/constants/statements';
import { isAllowedWorkerQueue } from './allowedWorkerQueues';

const queueDependencies = {
[STATEMENT_QUERYBUILDERCACHE_QUEUE]: {
Expand Down Expand Up @@ -49,8 +50,10 @@ export const addStatementToPendingQueues = (statement, passedQueues, done) => {
const queueCompleted = includes(completedQueues, queueName);
// or is this queue in the queues being processed?
const queueProcessing = includes(processingQueues, queueName);
// or is an allowed queue?
const isAllowed = isAllowedWorkerQueue(queueName);

return !preReqsCompleted || queueCompleted || queueProcessing;
return !preReqsCompleted || queueCompleted || queueProcessing || !isAllowed;
});

return Statement.updateOne(
Expand Down

0 comments on commit 6a8e084

Please sign in to comment.