This repository has been archived by the owner on Jan 12, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 158
/
SearchQueue.js
149 lines (126 loc) · 4.57 KB
/
SearchQueue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
var fbutil = require('./fbutil');
function SearchQueue(esc, reqRef, resRef, cleanupInterval) {
this.esc = esc;
this.inRef = reqRef;
this.outRef = resRef;
this.cleanupInterval = cleanupInterval;
console.log('Queue started, IN: "%s", OUT: "%s"'.grey, fbutil.pathName(this.inRef), fbutil.pathName(this.outRef));
setTimeout(function() {
this.inRef.on('child_added', this._process, this);
}.bind(this), 1000);
this._nextInterval();
}
SearchQueue.prototype = {
_process: function(snap) {
var dat = snap.val();
var key = snap.key;
console.log('processing query request'.grey, key);
var query = this._buildQuery(key, dat);
if( query === null ) { return; }
console.log('built query'.grey, query);
this.esc.search(query, function(error, response) {
if (error) {
this._replyError(key, error);
} else {
this._reply(key, response);
}
}.bind(this));
},
_reply: function(key, results) {
if( results.error ) {
this._replyError(key, results.error);
}
else {
console.log('query result %s: %d hits'.yellow, key, results.hits.total);
this._send(key, results);
}
},
_replyError: function(key, err) {
this._send(key, { total: 0, error: fbutil.unwrapError(err), timestamp: Date.now() })
},
_send: function(key, data) {
this.inRef.child(key).remove().catch(this._logErrors.bind(this, 'Unable to remove queue item!'));
this.outRef.child(key).set(data).catch(this._logErrors.bind(this, 'Unable to send reply!'));
},
_logErrors: function(message, err) {
if( err ) {
console.error(message.red);
console.error(err.red);
}
},
_housekeeping: function() {
var self = this;
// remove all responses which are older than CHECK_INTERVAL
this.outRef.orderByChild('timestamp')
.endAt(new Date().valueOf() - self.cleanupInterval)
.once('value', function(snap) {
var count = snap.numChildren();
if( count ) {
console.warn('housekeeping: found %d outbound orphans (removing them now) %s'.red, count, new Date());
snap.forEach(function(ss) { ss.ref.remove(); });
}
self._nextInterval();
});
},
_nextInterval: function() {
var interval = this.cleanupInterval > 60000? 'minutes' : 'seconds';
console.log('Next cleanup in %d %s'.grey, Math.round(this.cleanupInterval/(interval==='seconds'? 1000 : 60000)), interval);
setTimeout(this._housekeeping.bind(this), this.cleanupInterval);
},
_buildQuery: function(key, queryData) {
if( !this._assertValidSearch(key, queryData) ) {
return null;
}
// legacy support: q and body were merged on the client as `query`
// in previous versions; this makes sure they still work
if( fbutil.isString(queryData.query) ) {
queryData.q = queryData.query;
}
else if( fbutil.isObject(queryData.query) ) {
queryData.body = queryData.query;
}
if( fbutil.isString(queryData.body) ) {
queryData.body = this._getJSON(queryData.body);
if( queryData.body === null ) {
this._replyError(key, 'Search body was a string but did not contain a valid JSON object. It must be an object or a JSON parsable string.');
return null;
}
}
var query = {};
Object.keys(queryData).filter(function(k) {
return k !== 'query';
}).forEach(function(k) {
query[k] = queryData[k];
});
return query;
},
_assertValidSearch: function(key, props) {
if( !fbutil.isObject(props) ||
!fbutil.isString(props.index) ||
!fbutil.isString(props.type)
) {
this._replyError(key, 'Search request must be a valid object with keys index, type, and one of q or body.');
return false;
}
if( !fbutil.isString(props.query) && fbutil.isObject(props.query) &&
!fbutil.isString(props.q) &&
!fbutil.isObject(props.body) &&
!fbutil.isString(props.body)
) {
this._replyError(key, 'Search must contain one of (string)q or (object)body. (Legacy `query` is deprecated but still works)');
return false;
}
return true;
},
_getJSON: function(str) {
try {
return JSON.parse(str);
} catch (e) {
console.log('Error parsing JSON body', e);
return null;
}
}
};
exports.init = function(esc, reqPath, resPath, matchWholeWords, cleanupInterval) {
new SearchQueue(esc, fbutil.fbRef(reqPath), fbutil.fbRef(resPath), matchWholeWords, cleanupInterval);
};