forked from welovekpop/plugged
-
Notifications
You must be signed in to change notification settings - Fork 0
/
query.js
154 lines (126 loc) · 4.1 KB
/
query.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
var request = require("request");
// HTTP verbs accepted by Query.prototype.query; the verb argument is upper-cased before it is looked up here.
var verbs = ["GET", "POST", "PUT", "DELETE"];
/**
 * Sends the request described by an entry and reports the outcome to the
 * entry's callback.
 *
 * On a 200 response the body is unwrapped (`body.data` replaces the body when
 * that property exists) and, when `entry.extractArray` is set, a one element
 * result is reduced to that element and an empty result to `null`.
 *
 * On failure the entry is queued again after five seconds when the server
 * answered with a status >= 500 and `entry.tries` is 0 or 1; otherwise the
 * callback receives `{ code, message }`, where `code` is the HTTP status
 * (0 when there was no response) and `message` is the first element of
 * `body.data`, else `body.status`, else the transport error.
 *
 * @param {Object} query - owner of the entry; its `active` counter is
 *     decremented as soon as the response callback fires.
 * @param {Object} entry - `{ tries, extractArray, callback, options }`.
 *     `options` is handed to `request` and replaced by `{}` once the entry
 *     is finished. Without a callback the response is discarded.
 */
function processEntry(query, entry) {
    request(entry.options, function requestCB(err, res, body) {
        // a transport error can arrive without any response object.
        var statusCode = res ? res.statusCode : 0;
        var message;

        query.active--;

        // nobody is listening, so there is nothing to report or retry.
        if (typeof entry.callback === "undefined")
            return;

        if (!err && statusCode === 200) {
            // remove unnecessary information.
            if (body && Object.prototype.hasOwnProperty.call(body, "data"))
                body = body.data;

            // extract unnecessary array; a missing body is passed on untouched
            // instead of throwing on `.length`.
            if (entry.extractArray && body != null) {
                if (body.length === 1)
                    body = body[0];
                else if (body.length === 0)
                    body = null;
            }

            entry.options = {};
            entry.callback(null, body);
            return;
        }

        // don't bother trying it again in case this entry got flushed through (tries === -1).
        if ((entry.tries >= 0 && entry.tries < 2) && statusCode >= 500) {
            entry.tries++;
            setTimeout(pushAndProcess, 5 * 1000, query, entry);
            return;
        }

        // error bodies are not guaranteed to carry a `data` array (plain text
        // or HTML error pages, JSON with only `status`), so probe each level.
        if (body && body.data && body.data.length > 0)
            message = body.data[0];
        else if (body && body.status)
            message = body.status;
        else
            message = err;

        entry.options = {};
        entry.callback({
            code: statusCode,
            message: message
        });
    });
}
/**
 * Puts an entry at the end of a query's queue and then asks the query to
 * work on its queue. Used as the timer callback when a failed request is
 * scheduled for another attempt.
 *
 * @param {Object} query - object exposing `queue` (array) and `process()`.
 * @param {Object} entry - the request entry to enqueue.
 */
function pushAndProcess(query, entry) {
    var pending = query.queue;

    pending.push(entry);
    query.process();
}
/**
 * Interval callback that drains a query's queue in batches.
 *
 * Stops the query's watcher when the queue is empty; otherwise calls
 * `query.process()` once per queued entry, at most five times per tick.
 *
 * @param {Object} query - object exposing `queue`, `process()` and
 *     `stopWatcher()`.
 */
function watcher(query) {
    var batchSize;
    var i;

    if (query.queue.length === 0)
        query.stopWatcher();

    // fix the batch size before looping: process() shifts entries off the
    // queue, so re-reading queue.length in the loop condition would cut the
    // batch short (e.g. only 3 of 5 queued entries would be dispatched).
    batchSize = Math.min(5, query.queue.length);

    for (i = 0; i < batchSize; i++)
        query.process();
}
/**
 * Request queue. A new instance has no cookie jar, an empty queue and no
 * active requests, and starts its watcher right away.
 *
 * @constructor
 */
function Query() {
    this.jar = null;
    this.queue = [];
    this.active = 0;
    this.watcherID = 0;
    // process() clears and resets this handle, so give it a defined
    // "no timer" value from the start like the other fields.
    this.timeoutID = 0;
    this.startWatcher();
}
/**
 * Sets the cookie jar stored on this instance.
 *
 * @param {Object} [jar] - jar to use as-is when truthy.
 * @param {*} [storage] - passed to `request.jar()` to create a new jar when
 *     no jar was given.
 */
Query.prototype.setJar = function(jar, storage) {
    if (jar) {
        this.jar = jar;
        return;
    }

    this.jar = request.jar(storage);
};
/**
 * Returns the cookie jar currently stored on this instance.
 *
 * @returns {*} the value of `this.jar`.
 */
Query.prototype.getJar = function() {
    var currentJar = this.jar;

    return currentJar;
};
/**
 * Builds a request entry and either queues it or sends it immediately.
 *
 * `data` is optional: `query(verb, url, callback, extractArray, flush)` is
 * accepted as well, in which case an empty object is sent as the body.
 *
 * @param {string} verb - one of GET, POST, PUT, DELETE (case-insensitive).
 * @param {string} url - target URL.
 * @param {Object} [data] - request body.
 * @param {Function} [callback] - called as `callback(err, body)` by
 *     `processEntry`.
 * @param {boolean} [extractArray=false] - stored on the entry for
 *     `processEntry`.
 * @param {boolean} [flush=false] - bypass the queue and send right away;
 *     such entries are created with `tries === -1`.
 * @throws {Error} when `verb` is not a supported verb string or `url` is
 *     not a non-empty string.
 */
Query.prototype.query = function(verb, url, data, callback, extractArray, flush) {
    var method;
    var entry;

    extractArray = extractArray || false;
    flush = flush || false;

    //reorganize arguments since parameter data is optional
    if (typeof data === "function") {
        if (typeof callback === "boolean") {
            flush = extractArray;
            extractArray = callback;
        }
        callback = data;
        data = {};
    }

    // check the type first so a non-string verb fails with the validation
    // error instead of a TypeError from calling toUpperCase on it.
    method = (typeof verb === "string") ? verb.toUpperCase() : "";

    if (verbs.indexOf(method) < 0)
        throw new Error("verb was not defined or invalid");

    if (!url || typeof url !== "string")
        throw new Error("url was not defined or not of type string");

    entry = {
        tries: (flush ? -1 : 0),
        extractArray: extractArray,
        callback: callback,
        options: {
            url: url,
            // send the normalized form that was actually validated.
            method: method,
            jar: this.jar,
            encoding: "utf8",
            body: data,
            json: true,
            headers: {
                "User-Agent": "PlugClient/1.0 (NODE)",
                "Accept": "application/json, text/javascript; q=0.1, */*; q=0.5",
                "Content-Type": "application/json"
            }
        }
    };

    if (!flush) {
        this.queue.push(entry);
        this.process();
    } else {
        this.active++;
        processEntry(this, entry);
    }
};
/**
 * Drops every entry that is still waiting by replacing the queue with a
 * fresh empty array. The callbacks of dropped entries are not invoked.
 */
Query.prototype.flushQueue = function() {
    var emptyQueue = [];

    this.queue = emptyQueue;
};
/**
 * Dispatches the entry at the head of the queue when there is one and the
 * number of active requests is at most five. When entries are waiting but
 * the active count is above that, makes sure the watcher is running so the
 * queue gets picked up later.
 */
Query.prototype.process = function() {
    var hasWork = this.queue.length > 0;
    var hasCapacity;
    var next;

    if (!hasWork)
        return;

    hasCapacity = this.active <= 5;

    if (hasCapacity) {
        clearTimeout(this.timeoutID);
        this.timeoutID = 0;
        this.active++;

        next = this.queue.shift();
        processEntry(this, next);
        return;
    }

    // saturated: leave the entry queued and let the watcher retry.
    if (this.watcherID === 0)
        this.startWatcher();
};
/**
 * (Re)starts the interval that hands this instance to `watcher` every five
 * seconds. A watcher that is already running is stopped first so only one
 * interval exists per instance.
 */
Query.prototype.startWatcher = function() {
    // compare against the 0 "no watcher" sentinel rather than using `> 0`:
    // the handle returned by setInterval is not guaranteed to be a number
    // (Node returns a Timeout object), and an object handle that does not
    // compare as a positive number would leave the old interval running.
    if (this.watcherID !== 0)
        this.stopWatcher();

    //otherwise plug flips its shit and tells us to stop flooding its API
    this.watcherID = setInterval(watcher, 5 * 1000, this);
};
/**
 * Cancels the watcher interval and resets `watcherID` to 0, the value the
 * rest of the file treats as "no watcher running".
 */
Query.prototype.stopWatcher = function() {
    var handle = this.watcherID;

    this.watcherID = 0;
    clearInterval(handle);
};
module.exports = Query;