Skip to content

Commit

Permalink
fix(bullmq): consider delayed markers (#605) fixes #600
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jan 6, 2023
1 parent d41ecc3 commit 8b6edae
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 17 deletions.
8 changes: 2 additions & 6 deletions example/bullmq.js
@@ -1,5 +1,5 @@
const Arena = require('../');
const {Queue, QueueScheduler, Worker, FlowProducer} = require('bullmq');
const {Queue, Worker, FlowProducer} = require('bullmq');
const RedisServer = require('redis-server');

// Select ports that are unlikely to be used by other services a developer might be running locally.
Expand All @@ -14,11 +14,6 @@ async function main() {
const queueName = 'name_of_my_queue';
const parentQueueName = 'name_of_my_parent_queue';

const queueScheduler = new QueueScheduler(queueName, {
connection: {port: REDIS_SERVER_PORT},
});
await queueScheduler.waitUntilReady();

const queue = new Queue(queueName, {
connection: {port: REDIS_SERVER_PORT},
});
Expand All @@ -44,6 +39,7 @@ async function main() {
}
},
{
concurrency: 3,
connection: {port: REDIS_SERVER_PORT},
}
);
Expand Down
2 changes: 1 addition & 1 deletion example/package.json
Expand Up @@ -15,7 +15,7 @@
"dependencies": {
"bee-queue": "^1.4.0",
"bull": "^3.22.6",
"bullmq": "^1.34.0",
"bullmq": "^3.0.0",
"express": "^4.17.1",
"redis-server": "^1.2.2"
}
Expand Down
24 changes: 16 additions & 8 deletions src/server/views/dashboard/queueJobsByState.js
Expand Up @@ -59,7 +59,7 @@ async function _json(req, res) {
const words = state.split('-');
const finalStateName = words.map((word) => _.capitalize(word)).join('');
jobs = await queue[`get${finalStateName}`](0, 1000);
jobs = jobs.map((j) => j.toJSON());
jobs = jobs.map((j) => j && j.toJSON());
}

const filename = `${queueName}-${state}-dump.json`;
Expand Down Expand Up @@ -125,13 +125,21 @@ async function _html(req, res) {
jobs = await queue.getJobs(stateTypes, startId, endId, order === 'asc');
}

for (const job of jobs) {
const jobState = queue.IS_BEE ? job.status : await job.getState();
job.showRetryButton = !queue.IS_BEE || jobState === 'failed';
job.retryButtonText = jobState === 'failed' ? 'Retry' : 'Trigger';
job.showPromoteButton = !queue.IS_BEE && jobState === 'delayed';
job.showDeleteRepeatableButton = queue.IS_BULL && job.opts.repeat;
job.parent = JobHelpers.getKeyProperties(job.parentKey);
for (let i = 0; i < jobs.length; i++) {
if (!jobs[i]) {
jobs[i] = {
showRetryButton: false,
showPromoteButton: false,
showDeleteRepeatableButton: false,
};
} else {
const jobState = queue.IS_BEE ? jobs[i].status : await jobs[i].getState();
jobs[i].showRetryButton = !queue.IS_BEE || jobState === 'failed';
jobs[i].retryButtonText = jobState === 'failed' ? 'Retry' : 'Trigger';
jobs[i].showPromoteButton = !queue.IS_BEE && jobState === 'delayed';
jobs[i].showDeleteRepeatableButton = queue.IS_BULL && jobs[i].opts.repeat;
jobs[i].parent = JobHelpers.getKeyProperties(jobs[i].parentKey);
}
}

let pages = _.range(page - 6, page + 7).filter((page) => page >= 1);
Expand Down
7 changes: 7 additions & 0 deletions src/server/views/dashboard/templates/queueJobsByState.hbs
Expand Up @@ -136,7 +136,9 @@
<li class="list-group-item">
<div class="js-bulk-action-container bulk-job-container">
<input type="hidden" name="jobId" value="{{ this.id }}" />
{{#if this.id}}
<input class="js-bulk-job" name="jobChecked" type="checkbox" />
{{/if}}
</div>

<a role="button" data-toggle="collapse" href="#collapse{{hashIdAttr this}}">
Expand All @@ -151,9 +153,14 @@
{{/if}}
</h4>
</a>
{{#if this.id}}
<div id="collapse{{hashIdAttr this}}" class="collapse">
{{~> dashboard/jobDetails this basePath=../basePath displayJobInline=true queueName=../queueName queueHost=../queueHost jobState=../state }}
</div>
{{else}}
<div id ="collapse-undefined" class="collapse">
</div>
{{/if}}
</li>
{{/each}}
</ul>
Expand Down
2 changes: 1 addition & 1 deletion src/server/views/helpers/handlebars.js
Expand Up @@ -73,7 +73,7 @@ const helpers = {
return mapping;
},

getDelayedExectionAt(job) {
getDelayedExecutionAt(job) {
// Bull
if (job.delay) {
return job.delay + getTimestamp(job);
Expand Down
2 changes: 1 addition & 1 deletion src/server/views/partials/dashboard/jobDetails.hbs
Expand Up @@ -59,7 +59,7 @@
{{#eq jobState 'delayed'}}
<div class="col-sm-3">
<h5>Executes At</h5>
{{moment (getDelayedExectionAt this) "llll"}}
{{moment (getDelayedExecutionAt this) "llll"}}
</div>
{{/eq}}

Expand Down

0 comments on commit 8b6edae

Please sign in to comment.