Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more typings, minor functionality changes #2

Merged
merged 3 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 15 additions & 24 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as date from 'date.js';
import * as debug from 'debug';
import { parsePriority } from './utils/priority';
import { JobPriority, parsePriority } from './utils/priority';
import type { Agenda } from './index';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
import { IJobParameters } from './types/JobParameters';
Expand Down Expand Up @@ -46,18 +46,13 @@ export class Job<DATA = any | void> {
data: any;
}
) {
// Remove special args

// Process args
args.priority = parsePriority(args.priority) || 0;

// Set attrs to args
this.attrs = {
...args,
// Set defaults if undefined
priority: args.priority || 0,
priority: parsePriority(args.priority),
nextRunAt: args.nextRunAt || new Date(),
type: args.type // || 'once'
type: args.type
};
}

Expand Down Expand Up @@ -86,7 +81,7 @@ export class Job<DATA = any | void> {
repeatEvery(
interval: string | number,
options: { timezone?: string; skipImmediate?: boolean } = {}
) {
): this {
this.attrs.repeatInterval = interval;
this.attrs.repeatTimezone = options.timezone;
if (options.skipImmediate) {
Expand All @@ -101,28 +96,28 @@ export class Job<DATA = any | void> {
return this;
}

repeatAt(time) {
repeatAt(time: string): this {
this.attrs.repeatAt = time;
return this;
}

disable() {
disable(): this {
this.attrs.disabled = true;
return this;
}

enable() {
enable(): this {
this.attrs.disabled = false;
return this;
}

unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']) {
unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']): this {
this.attrs.unique = unique;
this.attrs.uniqueOpts = opts;
return this;
}

schedule(time) {
schedule(time: string | Date): this {
const d = new Date(time);

this.attrs.nextRunAt = Number.isNaN(d.getTime()) ? date(time) : d;
Expand All @@ -135,17 +130,13 @@ export class Job<DATA = any | void> {
* @param {String} priority priority of when job should be queued
* @returns {exports} instance of Job
*/
priority(priority: 'lowest' | 'low' | 'normal' | 'high' | 'highest' | number) {
priority(priority: JobPriority): this {
this.attrs.priority = parsePriority(priority);
return this;
}

fail(reason: Error | string) {
if (reason instanceof Error) {
reason = reason.message;
}

this.attrs.failReason = reason;
fail(reason: Error | string): this {
this.attrs.failReason = reason instanceof Error ? reason.message : reason;
this.attrs.failCount = (this.attrs.failCount || 0) + 1;
const now = new Date();
this.attrs.failedAt = now;
Expand All @@ -159,7 +150,7 @@ export class Job<DATA = any | void> {
return this;
}

isRunning() {
isRunning(): boolean {
if (!this.attrs.lastRunAt) {
return false;
}
Expand All @@ -178,7 +169,7 @@ export class Job<DATA = any | void> {
return false;
}

save() {
async save(): Promise<Job> {
return this.agenda.db.saveJob(this);
}

Expand Down Expand Up @@ -237,7 +228,7 @@ export class Job<DATA = any | void> {
return this;
}

async run() {
async run(): Promise<void> {
const definition = this.agenda.definitions[this.attrs.name];

this.attrs.lastRunAt = new Date();
Expand Down
37 changes: 23 additions & 14 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {
MongoClient,
MongoClientOptions,
UpdateQuery,
ObjectId
ObjectId,
SortOptionObject
} from 'mongodb';
import type { Job } from './Job';
import { hasMongoProtocol } from './utils/mongodb';
import { hasMongoProtocol } from './utils/hasMongoProtocol';
import type { Agenda } from './index';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { IJobParameters } from './types/JobParameters';
Expand Down Expand Up @@ -49,26 +50,32 @@ export class JobDbRepository {
return !!connectOptions.db?.address;
}

async getJobs(query: any, sort: any = {}, limit = 0, skip = 0) {
async getJobs(
query: FilterQuery<IJobParameters>,
sort: SortOptionObject<IJobParameters> = {},
limit = 0,
skip = 0
): Promise<IJobParameters[]> {
return this.collection.find(query).sort(sort).limit(limit).skip(skip).toArray();
}

async removeJobs(query: any) {
return this.collection.deleteMany(query);
async removeJobs(query: FilterQuery<IJobParameters>): Promise<number> {
const result = await this.collection.deleteMany(query);
return result.result.n || 0;
}

async getQueueSize(): Promise<number> {
return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } });
}

async unlockJob(job) {
await this.collection.updateOne({ _id: job._id }, { $unset: { lockedAt: true } });
async unlockJob(job: Job): Promise<void> {
await this.collection.updateOne({ _id: job.attrs._id }, { $unset: { lockedAt: true } });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch, we should add a test to cover successful unlocking

}

/**
* Internal method to unlock jobs so that they can be re-run
*/
async unlockJobs(jobIds: ObjectId[]) {
async unlockJobs(jobIds: ObjectId[]): Promise<void> {
await this.collection.updateMany({ _id: { $in: jobIds } }, { $unset: { lockedAt: true } });
}

Expand Down Expand Up @@ -143,7 +150,7 @@ export class JobDbRepository {
return result.value;
}

async connect() {
async connect(): Promise<void> {
const db = await this.createConnection();
log('successful connection to MongoDB', db.options);

Expand Down Expand Up @@ -184,8 +191,10 @@ export class JobDbRepository {
}

private async database(url: string, options?: MongoClientOptions) {
if (!hasMongoProtocol(url)) {
url = `mongodb://${url}`;
let connectionString = url;

if (!hasMongoProtocol(connectionString)) {
connectionString = `mongodb://${connectionString}`;
}

const client = await MongoClient.connect(url, {
Expand All @@ -197,7 +206,7 @@ export class JobDbRepository {
return client.db();
}

private processDbResult(job: Job, res: IJobParameters) {
private processDbResult(job: Job, res: IJobParameters): Job {
log(
'processDbResult() called with success, checking whether to process job immediately or not'
);
Expand Down Expand Up @@ -245,7 +254,7 @@ export class JobDbRepository {
// Grab current time and set default query options for MongoDB
const now = new Date();
const protect: Partial<IJobParameters> = {};
let update: UpdateQuery<any> = { $set: props };
let update: UpdateQuery<IJobParameters> = { $set: props };
log('current time stored as %s', now.toISOString());

// If the job already had an ID, then update the properties of the job
Expand Down Expand Up @@ -310,7 +319,7 @@ export class JobDbRepository {

if (job.attrs.unique) {
// If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in
const query: FilterQuery<any> = job.attrs.unique;
const query = job.attrs.unique;
query.name = props.name;
if (job.attrs.uniqueOpts?.insertOnly) {
update = { $setOnInsert: props };
Expand Down
10 changes: 5 additions & 5 deletions src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ export class JobProcessingQueue {
this._queue = [];
}

get length() {
get length(): number {
return this._queue.length;
}

/**
* Pops and returns last queue element (next job to be processed) without checking concurrency.
* @returns {Job} Next Job to be processed
*/
pop() {
pop(): Job | undefined {
return this._queue.pop();
}

Expand All @@ -32,11 +32,11 @@ export class JobProcessingQueue {
* @param {Job} job job to add to queue
* @returns {undefined}
*/
push(job: Job) {
push(job: Job): void {
this._queue.push(job);
}

remove(job: Job) {
remove(job: Job): void {
let removeJobIndex = this._queue.indexOf(job);
if (removeJobIndex === -1) {
// lookup by id
Expand All @@ -58,7 +58,7 @@ export class JobProcessingQueue {
* @param {Job} job job to add to queue
* @returns {undefined}
*/
insert(job: Job) {
insert(job: Job): void {
const matchIndex = this._queue.findIndex(element => {
if (
element.attrs.nextRunAt &&
Expand Down
26 changes: 11 additions & 15 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as debug from 'debug';
import { Job } from './Job';
import { IAgendaStatus } from './types/AgendaStatus';
import { IJobDefinition } from './types/JobDefinition';
import { JobProcessingQueue } from './JobProcessingQueue';
import type { Agenda } from './index';
Expand All @@ -15,23 +16,18 @@ const log = debug('agenda:jobProcessor');
*/
export class JobProcessor {
private jobStatus: {
[name: string]:
| {
running: number;
locked: number;
}
| undefined;
[name: string]: { running: number; locked: number } | undefined;
} = {};

private localQueueProcessing = 0;

async getStatus(fullDetails = false) {
async getStatus(fullDetails = false): Promise<IAgendaStatus> {
// eslint-disable-next-line @typescript-eslint/no-var-requires,global-require
const { version } = require('../package.json');

return {
version,
queueName: this.agenda.name,
queueName: this.agenda.attrs.name,
totalQueueSizeDB: await this.agenda.db.getQueueSize(),
config: {
totalLockLimit: this.totalLockLimit,
Expand All @@ -42,7 +38,7 @@ export class JobProcessor {
Object.keys(this.jobStatus).map(job => [
job,
{
...this.jobStatus[job],
...this.jobStatus[job]!,
config: this.agenda.definitions[job]
}
])
Expand Down Expand Up @@ -95,7 +91,7 @@ export class JobProcessor {
}

// processJobs
async process(extraJob?: Job) {
async process(extraJob?: Job): Promise<void> {
// Make sure an interval has actually been set
// Prevents race condition with 'Agenda.stop' and already scheduled run
if (!this.isRunning) {
Expand Down Expand Up @@ -138,7 +134,7 @@ export class JobProcessor {
* @param {String} name name of job to check if we should lock or not
* @returns {boolean} whether or not you should lock job
*/
shouldLock(name) {
shouldLock(name: string): boolean {
const jobDefinition = this.agenda.definitions[name];
let shouldLock = true;
// global lock limit
Expand Down Expand Up @@ -168,7 +164,7 @@ export class JobProcessor {
* @param {boolean} inFront puts the job in front of queue if true
* @returns {undefined}
*/
private enqueueJob(job: Job) {
private enqueueJob(job: Job): void {
this.jobQueue.insert(job);
}

Expand All @@ -178,7 +174,7 @@ export class JobProcessor {
* We do this because sometimes jobs are scheduled but will be run before the next process time
* @returns {undefined}
*/
async lockOnTheFly() {
async lockOnTheFly(): Promise<void> {
// Already running this? Return
if (this.isLockingOnTheFly) {
log.extend('lockOnTheFly')('already running, returning');
Expand Down Expand Up @@ -347,7 +343,7 @@ export class JobProcessor {
return;
}

this.localQueueProcessing++;
this.localQueueProcessing += 1;

let jobEnqueued = false;
try {
Expand Down Expand Up @@ -400,7 +396,7 @@ export class JobProcessor {
);
} */
} finally {
this.localQueueProcessing--;
this.localQueueProcessing -= 1;
}
}

Expand Down
Loading