Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions src/OperationQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ class OperationQueue {

constructor() {
this.ee = new EventEmitter();
this.time = {};
this.map = {};
this.operations = [];
this._processedOperations = [];
Expand Down Expand Up @@ -40,10 +39,6 @@ class OperationQueue {
this.ee.off(event, cb);
}

get totalTime() {
return Math.abs((this.time.start.getTime() - this.time.end.getTime()) / 1000);
}

get isExecuting() {
return !this._isEmpty(this.map);
}
Expand All @@ -53,7 +48,6 @@ class OperationQueue {
return;
}

this.time.end = new Date();
this._isDone = true;
this.completionCallback && this.completionCallback();
this.resolve();
Expand All @@ -79,19 +73,31 @@ class OperationQueue {
}

pause() {
this._paused = true;
if (!this._paused) {
this._paused = true;
this.ee.emit(QueueEvent.PAUSED, this);
}
}

resume() {
this._paused = false;
this._checkNextOperation();
if (this._paused) {
this._paused = false;
this.ee.emit(QueueEvent.RESUMED, this);
this._checkNextOperation();
}
}

get isPaused() {
return this._paused;
}

_preProcessOperations(operations) {
try {
new CircularOperationValidator(operations);
} catch (e) {
throw e;
}

operations.forEach(op => {
if (!this.map[op.id]) {
this.map[op.id] = true;
Expand Down Expand Up @@ -121,12 +127,7 @@ class OperationQueue {
this._startOperations();
} else {
this.promise = new Promise((resolve, reject) => {
try {
new CircularOperationValidator(this._processedOperations);
} catch (e) {
return reject(e);
}
this.time.start = new Date();

this.resolve = resolve;

this._startOperations();
Expand Down Expand Up @@ -165,7 +166,7 @@ class OperationQueue {

delete this.map[operation.id];
delete this.runningQueueMap[operation.id];

if (this._isEmpty(this.map)) {
this.ee.emit(QueueEvent.DONE, this);
this.done();
Expand Down
90 changes: 85 additions & 5 deletions tests/OperationQueue.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,47 @@ describe('OperationQueue', () => {

expect(operationQueue.runningQueue.length).toBe(0);
});

test('should emit paused event', () => {
const pausedFunction = jest.fn(() => {
});
const operationQueue = new OperationQueue();
operationQueue.on(QueueEvent.PAUSED, pausedFunction)

operationQueue.pause();

expect(pausedFunction).toHaveBeenCalled();
});

test('should emit paused event only once', () => {
const pausedFunction = jest.fn(() => {
});
const operationQueue = new OperationQueue();
operationQueue.on(QueueEvent.PAUSED, pausedFunction)

operationQueue.pause();
operationQueue.pause();
operationQueue.pause();
operationQueue.pause();

expect(pausedFunction).toHaveBeenCalledTimes(1);
});
});

describe('function off', () => {
test('should remove callback from subscriber', async (done) => {
const mockFunction = jest.fn(() => {
});
const operationQueue = new OperationQueue();

operationQueue.on(OperationEvent.START, mockFunction);
operationQueue.off(OperationEvent.START, mockFunction);
operationQueue.pause();
await nextTick();

expect(mockFunction).not.toHaveBeenCalled();
done();
});
});

describe('function resume', function () {
Expand Down Expand Up @@ -102,6 +143,31 @@ describe('OperationQueue', () => {

expect(operationQueue.runningQueue.length).toBe(2);
});

test('should emit resumed event', () => {
const resumedFunction = jest.fn(() => {
});
const operationQueue = new OperationQueue();
operationQueue.on(QueueEvent.RESUMED, resumedFunction)
operationQueue.pause();

operationQueue.resume();

expect(resumedFunction).toHaveBeenCalled();
});

test('should emit resumed event only once', () => {
const resumedFunction = jest.fn(() => {
});
const operationQueue = new OperationQueue();
operationQueue.on(QueueEvent.PAUSED, resumedFunction)
operationQueue.pause();

operationQueue.resume();
operationQueue.resume();

expect(resumedFunction).toHaveBeenCalledTimes(1);
});
});

describe('adding operations while queue is still executing', () => {
Expand All @@ -119,6 +185,25 @@ describe('OperationQueue', () => {
});

describe('function addOperation/addOperations', () => {
test('should throw error when operation are dependent form each other', async (done) => {
const operationQueue = new OperationQueue();
const operation1 = new TestOperation();
const operation2 = new TestOperation();

operation1.dependencies = [operation2];
operation2.dependencies = [operation1];

try {
operationQueue.addOperation(operation1);
await nextTick();
fail('should have failed');
} catch (e) {
console.log(e);
expect(e.constructor.name === 'CircularOperationValidatorError').toBe(true);
done();
}
});

test('properties before calling start', () => {
const operationQueue = new OperationQueue();

Expand Down Expand Up @@ -219,23 +304,18 @@ describe('OperationQueue', () => {

const doneOperations = [];
operation1.on(OperationEvent.DONE, () => {
console.log(doneOperations.length);
doneOperations.push(operation1);
});
operation2.on(OperationEvent.DONE, () => {
console.log(doneOperations.length);
doneOperations.push(operation2);
});
operation3.on(OperationEvent.DONE, () => {
console.log(doneOperations.length);
doneOperations.push(operation3);
});
operation4.on(OperationEvent.DONE, () => {
console.log(doneOperations.length);
doneOperations.push(operation4);
});
operation5.on(OperationEvent.DONE, () => {
console.log(doneOperations.length);
doneOperations.push(operation5);
});

Expand Down