Skip to content

Commit 9a7ad97

Browse files
committedJan 18, 2015
Refactoring of the Aphlict server
Summary: Tidy the Aphlict server by splitting the functionality into two main modules, `AphlictClientServer` and `AphlictAdminServer. There is still further tidying that could be done here, but I feel that this puts us in a much better place. Test Plan: Sent notifications via `/notification/status/`. Reviewers: epriestley, #blessed_reviewers Reviewed By: epriestley, #blessed_reviewers Subscribers: Korvin, epriestley Differential Revision: https://secure.phabricator.com/D11383
1 parent 19be326 commit 9a7ad97

6 files changed

+268
-192
lines changed
 

‎support/aphlict/server/aphlict_server.js

+19-157
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ var config = parse_command_line_arguments(process.argv);
4444

4545
process.on('uncaughtException', function(err) {
4646
var context = null;
47-
if ((err.code == 'EACCES') &&
48-
(err.path == config.log)) {
47+
if (err.code == 'EACCES' && err.path == config.log) {
4948
context = util.format(
5049
'Unable to open logfile ("%s"). Check that permissions are set ' +
5150
'correctly.',
@@ -65,9 +64,8 @@ process.on('uncaughtException', function(err) {
6564
process.exit(1);
6665
});
6766

68-
var WebSocket;
6967
try {
70-
WebSocket = require('ws');
68+
require('ws');
7169
} catch (ex) {
7270
throw new Error(
7371
'You need to install the Node.js "ws" module for websocket support. ' +
@@ -88,7 +86,7 @@ if (ssl_config.enabled) {
8886

8987
// Add the logfile so we'll fail if we can't write to it.
9088
if (config.log) {
91-
debug.addLogfile(config.log);
89+
debug.addLog(config.log);
9290
}
9391

9492
// If we're just doing a configuration test, exit here before starting any
@@ -98,166 +96,30 @@ if (config.test) {
9896
process.exit(0);
9997
}
10098

101-
var start_time = new Date().getTime();
102-
var messages_out = 0;
103-
var messages_in = 0;
99+
JX.require('lib/AphlictAdminServer', __dirname);
100+
JX.require('lib/AphlictClientServer', __dirname);
104101

105-
var clients = new JX.AphlictListenerList();
106-
107-
function https_discard_handler(req, res) {
108-
res.writeHead(501);
109-
res.end('HTTP/501 Use Websockets\n');
110-
}
111-
112-
var ws;
102+
var server;
113103
if (ssl_config.enabled) {
114-
var https_server = https.createServer({
104+
server = https.createServer({
115105
key: ssl_config.key,
116106
cert: ssl_config.cert
117-
}, https_discard_handler).listen(
118-
config['client-port'],
119-
config['client-host']);
120-
121-
ws = new WebSocket.Server({server: https_server});
122-
} else {
123-
ws = new WebSocket.Server({
124-
port: config['client-port'],
125-
host: config['client-host'],
126-
});
127-
}
128-
129-
ws.on('connection', function(ws) {
130-
var listener = clients.addListener(ws);
131-
132-
function log() {
133-
debug.log(
134-
util.format('<%s>', listener.getDescription()) +
135-
' ' +
136-
util.format.apply(null, arguments));
137-
}
138-
139-
log('Connected from %s.', ws._socket.remoteAddress);
140-
141-
ws.on('message', function(data) {
142-
log('Received message: %s', data);
143-
144-
var message;
145-
try {
146-
message = JSON.parse(data);
147-
} catch (err) {
148-
log('Message is invalid: %s', err.message);
149-
return;
150-
}
151-
152-
switch (message.command) {
153-
case 'subscribe':
154-
log(
155-
'Subscribed to: %s',
156-
JSON.stringify(message.data));
157-
listener.subscribe(message.data);
158-
break;
159-
160-
case 'unsubscribe':
161-
log(
162-
'Unsubscribed from: %s',
163-
JSON.stringify(message.data));
164-
listener.unsubscribe(message.data);
165-
break;
166-
167-
default:
168-
log('Unrecognized command "%s".', message.command || '<undefined>');
169-
}
107+
}, function(req, res) {
108+
res.writeHead(501);
109+
res.end('HTTP/501 Use Websockets\n');
170110
});
171-
172-
ws.on('close', function() {
173-
clients.removeListener(listener);
174-
log('Disconnected.');
175-
});
176-
177-
ws.on('error', function(err) {
178-
log('Error: %s', err.message);
179-
});
180-
});
181-
182-
function transmit(msg) {
183-
var listeners = clients.getListeners().filter(function(client) {
184-
return client.isSubscribedToAny(msg.subscribers);
185-
});
186-
187-
for (var i = 0; i < listeners.length; i++) {
188-
var listener = listeners[i];
189-
190-
try {
191-
listener.writeMessage(msg);
192-
193-
++messages_out;
194-
debug.log('<%s> Wrote Message', listener.getDescription());
195-
} catch (error) {
196-
clients.removeListener(listener);
197-
debug.log('<%s> Write Error: %s', listener.getDescription(), error);
198-
}
199-
}
111+
} else {
112+
server = http.createServer(function() {});
200113
}
201114

202-
http.createServer(function(request, response) {
203-
// Publishing a notification.
204-
if (request.url == '/') {
205-
if (request.method == 'POST') {
206-
var body = '';
207-
208-
request.on('data', function(data) {
209-
body += data;
210-
});
115+
var client_server = new JX.AphlictClientServer(server);
116+
var admin_server = new JX.AphlictAdminServer();
211117

212-
request.on('end', function() {
213-
try {
214-
var msg = JSON.parse(body);
118+
client_server.setLogger(debug);
119+
admin_server.setLogger(debug);
120+
admin_server.setClientServer(client_server);
215121

216-
debug.log('Received notification: ' + JSON.stringify(msg));
217-
++messages_in;
218-
219-
try {
220-
transmit(msg);
221-
response.writeHead(200, {'Content-Type': 'text/plain'});
222-
} catch (err) {
223-
debug.log(
224-
'<%s> Internal Server Error! %s',
225-
request.socket.remoteAddress,
226-
err);
227-
response.writeHead(500, 'Internal Server Error');
228-
}
229-
} catch (err) {
230-
debug.log(
231-
'<%s> Bad Request! %s',
232-
request.socket.remoteAddress,
233-
err);
234-
response.writeHead(400, 'Bad Request');
235-
} finally {
236-
response.end();
237-
}
238-
});
239-
} else {
240-
response.writeHead(405, 'Method Not Allowed');
241-
response.end();
242-
}
243-
} else if (request.url == '/status/') {
244-
var status = {
245-
'uptime': (new Date().getTime() - start_time),
246-
'clients.active': clients.getActiveListenerCount(),
247-
'clients.total': clients.getTotalListenerCount(),
248-
'messages.in': messages_in,
249-
'messages.out': messages_out,
250-
'log': config.log,
251-
'version': 6
252-
};
253-
254-
response.writeHead(200, {'Content-Type': 'application/json'});
255-
response.write(JSON.stringify(status));
256-
response.end();
257-
} else {
258-
response.writeHead(404, 'Not Found');
259-
response.end();
260-
}
261-
}).listen(config['admin-port'], config['admin-host']);
122+
client_server.listen(config['client-port'], config['client-host']);
123+
admin_server.listen(config['admin-port'], config['admin-host']);
262124

263125
debug.log('Started Server (PID %d)', process.pid);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
var JX = require('javelin').JX;
2+
3+
JX.require('AphlictListenerList', __dirname);
4+
5+
var http = require('http');
6+
7+
JX.install('AphlictAdminServer', {
8+
9+
construct: function() {
10+
this.setLogger(new JX.AphlictLog());
11+
12+
this._startTime = new Date().getTime();
13+
this._messagesIn = 0;
14+
this._messagesOut = 0;
15+
16+
var handler = this._handler.bind(this);
17+
this._server = http.createServer(handler);
18+
},
19+
20+
members: {
21+
_messagesIn: null,
22+
_messagesOut: null,
23+
_server: null,
24+
_startTime: null,
25+
26+
getListeners: function() {
27+
return this.getListenerList().getListeners();
28+
},
29+
30+
getListenerList: function() {
31+
return this.getClientServer().getListenerList();
32+
},
33+
34+
listen: function() {
35+
return this._server.listen.apply(this._server, arguments);
36+
},
37+
38+
_handler: function(request, response) {
39+
var self = this;
40+
41+
// Publishing a notification.
42+
if (request.url == '/') {
43+
if (request.method == 'POST') {
44+
var body = '';
45+
46+
request.on('data', function(data) {
47+
body += data;
48+
});
49+
50+
request.on('end', function() {
51+
try {
52+
var msg = JSON.parse(body);
53+
54+
self.getLogger().log(
55+
'Received notification: ' + JSON.stringify(msg));
56+
++this._messagesIn;
57+
58+
try {
59+
self._transmit(msg);
60+
response.writeHead(200, {'Content-Type': 'text/plain'});
61+
} catch (err) {
62+
self.getLogger().log(
63+
'<%s> Internal Server Error! %s',
64+
request.socket.remoteAddress,
65+
err);
66+
response.writeHead(500, 'Internal Server Error');
67+
}
68+
} catch (err) {
69+
self.getLogger().log(
70+
'<%s> Bad Request! %s',
71+
request.socket.remoteAddress,
72+
err);
73+
response.writeHead(400, 'Bad Request');
74+
} finally {
75+
response.end();
76+
}
77+
});
78+
} else {
79+
response.writeHead(405, 'Method Not Allowed');
80+
response.end();
81+
}
82+
} else if (request.url == '/status/') {
83+
var status = {
84+
'uptime': (new Date().getTime() - this._startTime),
85+
'clients.active': this.getListenerList().getActiveListenerCount(),
86+
'clients.total': this.getListenerList().getTotalListenerCount(),
87+
'messages.in': this._messagesIn,
88+
'messages.out': this._messagesOut,
89+
'version': 6
90+
};
91+
92+
response.writeHead(200, {'Content-Type': 'application/json'});
93+
response.write(JSON.stringify(status));
94+
response.end();
95+
} else {
96+
response.writeHead(404, 'Not Found');
97+
response.end();
98+
}
99+
},
100+
101+
/**
102+
* Transmits a message to all subscribed listeners.
103+
*/
104+
_transmit: function(message) {
105+
var listeners = this.getListeners().filter(function(client) {
106+
return client.isSubscribedToAny(message.subscribers);
107+
});
108+
109+
for (var i = 0; i < listeners.length; i++) {
110+
var listener = listeners[i];
111+
112+
try {
113+
listener.writeMessage(message);
114+
115+
++this._messagesOut;
116+
this.getLogger().log(
117+
'<%s> Wrote Message',
118+
listener.getDescription());
119+
} catch (error) {
120+
this.getListenerList().removeListener(listener);
121+
this.getLogger().log(
122+
'<%s> Write Error: %s',
123+
listener.getDescription(),
124+
error);
125+
}
126+
}
127+
},
128+
},
129+
130+
properties: {
131+
clientServer: null,
132+
logger: null,
133+
}
134+
135+
});

0 commit comments

Comments
 (0)
Failed to load comments.