Skip to content

Commit

Permalink
Add hijack for file-queue: threez/file-queue#6
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Stepanov <astepano@redhat.com>
  • Loading branch information
Andrei-Stepanov committed Jun 28, 2022
1 parent a28af90 commit a618159
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
38 changes: 33 additions & 5 deletions src/fqueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/

import _ from 'lodash';
import fs from 'fs';
import debug from 'debug';
const graceful_fs = require('graceful-fs');
const { Queue } = require('file-queue');
Expand Down Expand Up @@ -56,7 +57,7 @@ export interface FileQueueMessage {
broker_extra?: any;
}

async function make(path: string) {
async function make(path: string, opts = { poll: false, optimizeList: false }) {
return new Promise((resolve, reject) => {
var queue = new Queue(
{
Expand All @@ -71,10 +72,37 @@ async function make(path: string) {
* https://github.com/threez/file-queue/blob/6f2bfa60fda2205801ca1c558f4cfc1536e8825c/queue.js#L28
* function(messages) {} - ignores its argument and checks actual presence of messages.
*/
setInterval(() => {
log('[i] polling-tick to check new messages');
queue.maildir.emit('new');
}, 1000 * 60);
if (opts.poll) {
log(' [i] enable polling for new messages');
setInterval(() => {
log(' [i] polling-tick to check new messages');
queue.maildir.emit('new');
}, 1000 * 60);
}
if (opts.optimizeList) {
/*
* https://github.com/threez/file-queue/issues/6
*/
log(' [i] file-queue: hijack listNew method.');
queue.maildir.listNew = function (
callback: (err: Error | null, files?: string[] | null) => {},
) {
let len = 32;
const NEW = 1;
const files: string[] = [];
const dir = fs.opendirSync(queue.maildir.dirPaths[NEW]);
while (len) {
const file = dir.readSync();
if (!file) {
break;
}
files.push(file.name);
len--;
}
dir.closeSync();
callback(null, files);
};
}
resolve(queue);
},
);
Expand Down
10 changes: 6 additions & 4 deletions src/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async function handle_signal(
async function start(): Promise<never> {
file_queue_path = mkDirParents(file_queue_path_cfg);
log('File-queue path: %s', file_queue_path);
fqueue = await fq.make(file_queue_path);
fqueue = await fq.make(file_queue_path, { poll: true, optimizeList: true });
log('File-queue length at start: %s', await fq.length(fqueue));
var artifacts: Artifacts;
var validation_errors: ValidationErrors;
Expand Down Expand Up @@ -148,11 +148,13 @@ async function start(): Promise<never> {
);
continue;
}
const fq_length = await fq.length(fqueue);
/*
* const fq_length = await fq.length(fqueue);
* Do not do this: it is not efficient on large number of files.
*/
log(
' [i] Adding message to DB with file-queue message id %O. Remain unprocessed messages: %s',
' [i] Adding message to DB with file-queue message id %O.',
fq_msg.fq_msg_id,
fq_length,
);
try {
await artifacts.add_to_db(fq_msg);
Expand Down

0 comments on commit a618159

Please sign in to comment.