/
mutex.js
115 lines (101 loc) · 3.26 KB
/
mutex.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*jslint node: true */
"use strict";
var arrQueuedJobs = [];
var arrLockedKeyArrays = [];
function getCountOfQueuedJobs(){
return arrQueuedJobs.length;
}
function getCountOfLocks(){
return arrLockedKeyArrays.length;
}
function isAnyOfKeysLocked(arrKeys){
for (var i=0; i<arrLockedKeyArrays.length; i++){
var arrLockedKeys = arrLockedKeyArrays[i];
for (var j=0; j<arrLockedKeys.length; j++){
if (arrKeys.indexOf(arrLockedKeys[j]) !== -1)
return true;
}
}
return false;
}
function arraysAreEqual(array1, array2) {
return (array1.length === array2.length && array1.every((element, index) => element === array2[index]));
}
function release(arrKeys){
for (var i=0; i<arrLockedKeyArrays.length; i++){
if (arraysAreEqual(arrKeys, arrLockedKeyArrays[i])){
arrLockedKeyArrays.splice(i, 1);
return;
}
}
}
function exec(arrKeys, proc, next_proc){
arrLockedKeyArrays.push(arrKeys);
console.log("lock acquired", arrKeys);
var bLocked = true;
proc(function(){
if (!bLocked)
throw Error("double unlock?");
bLocked = false;
release(arrKeys);
console.log("lock released", arrKeys);
if (next_proc)
next_proc.apply(next_proc, arguments);
handleQueue();
});
}
function handleQueue(){
console.log("handleQueue "+arrQueuedJobs.length+" items");
for (var i=0; i<arrQueuedJobs.length; i++){
var job = arrQueuedJobs[i];
if (isAnyOfKeysLocked(job.arrKeys))
continue;
arrQueuedJobs.splice(i, 1); // do it before exec as exec can trigger another job added, another lock unlocked, another handleQueue called
console.log("starting job held by keys", job.arrKeys);
exec(job.arrKeys, job.proc, job.next_proc);
i--; // we've just removed one item
}
console.log("handleQueue done "+arrQueuedJobs.length+" items");
}
function lock(arrKeys, proc, next_proc){
if (arguments.length === 1)
return new Promise(resolve => lock(arrKeys, resolve));
if (typeof arrKeys === 'string')
arrKeys = [arrKeys];
if (isAnyOfKeysLocked(arrKeys)){
console.log("queuing job held by keys", arrKeys);
arrQueuedJobs.push({arrKeys: arrKeys, proc: proc, next_proc: next_proc, ts:Date.now()});
}
else
exec(arrKeys, proc, next_proc);
}
function lockOrSkip(arrKeys, proc, next_proc){
if (arguments.length === 1)
return new Promise(resolve => lockOrSkip(arrKeys, resolve));
if (typeof arrKeys === 'string')
arrKeys = [arrKeys];
if (isAnyOfKeysLocked(arrKeys)){
console.log("skipping job held by keys", arrKeys);
if (next_proc)
next_proc();
}
else
exec(arrKeys, proc, next_proc);
}
function checkForDeadlocks(){
for (var i=0; i<arrQueuedJobs.length; i++){
var job = arrQueuedJobs[i];
if (Date.now() - job.ts > 30*1000)
throw Error("possible deadlock on job "+require('util').inspect(job)+",\nproc:"+job.proc.toString()+" \nall jobs: "+require('util').inspect(arrQueuedJobs, {depth: null}));
}
}
// long running locks are normal in multisig scenarios
//setInterval(checkForDeadlocks, 1000);
setInterval(function(){
console.log("queued jobs: "+JSON.stringify(arrQueuedJobs.map(function(job){ return job.arrKeys; }))+", locked keys: "+JSON.stringify(arrLockedKeyArrays));
}, 10000);
exports.lock = lock;
exports.lockOrSkip = lockOrSkip;
exports.isAnyOfKeysLocked = isAnyOfKeysLocked;
exports.getCountOfQueuedJobs = getCountOfQueuedJobs;
exports.getCountOfLocks = getCountOfLocks;