/
index.js
293 lines (255 loc) · 6.54 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
//
// The mesh allows big to communicate with other big instances
//
//
// Define the `mesh` resource on top of the `resource` library
//
var resource = require('resource'),
mesh = resource.define('mesh');
mesh.schema.description = "provides a distributed p2p event emitter mesh";
//
// Resources used further down in this file:
//   node   - resource.node.find / resource.node.create and its schema properties
//   system - resource.system.info()
//   http   - resource.http.server / resource.http.listen
//
resource.use('node', { datasource: 'fs' });
resource.use('system');
resource.use('http');
//
// Shared state, lazily initialised by events():
//   emit        - the mesh resource's original emit function
//   meshEmitter - the emitter whose events are relayed to remote nodes
//
var emit, meshEmitter;
//
// Lazily sets up the two module-level handles used by the mesh:
//
//   meshEmitter - an EventEmitter2 instance; uplink and downlink attach
//                 listeners to it which relay every event to remote nodes
//   emit        - the emit function the mesh resource had before it was
//                 replaced below; used to emit events without broadcasting
//
// Once both handles exist, further calls do nothing.
//
function events() {
  var initialised = Boolean(emit) && Boolean(meshEmitter);
  if (!initialised) {
    var emitterOptions = {
      wildcard: true,
      delimiter: '::',
      maxListeners: 0
    };
    var EventEmitter2 = require('eventemitter2').EventEmitter2;
    meshEmitter = new EventEmitter2(emitterOptions);

    //
    // Keep a handle on the resource's own emit before swapping it out
    //
    emit = mesh.emit;

    //
    // From now on mesh.emit only goes through the meshEmitter ( the remote
    // mesh scope ); the regular emit is deliberately not called here
    //
    mesh.emit = function (event, payload) {
      return meshEmitter.emit(event, payload);
    };
  }
}
//
// Public method: connect to a remote mesh as a client.
// The port / host option schemas are reused from the node resource.
//
mesh.method('connect', connect, {
  description: 'connect to the big mesh',
  properties: {
    options: {
      type: 'object',
      properties: {
        port: resource.node.schema.properties.port,
        host: resource.node.schema.properties.host
      }
    },
    callback: {
      description: 'the callback executed after connecting to mesh',
      type: 'function',
      required: false
    }
  }
});

//
// Public method: accept incoming mesh connections
//
mesh.method('listen', listen, {
  description: 'listens for incoming nodes',
  properties: {
    options: {
      type: 'object',
      properties: {
        port: resource.node.schema.properties.port,
        host: resource.node.schema.properties.host
      }
    },
    callback: {
      description: 'the callback executed after connecting to mesh',
      type: 'function',
      required: false
    }
  }
});

//
// Private method: invoked by listen() for every incoming connection
//
mesh.method('downlink', downlink, {
  description: 'when an incoming node connection has been made',
  private: true,
  properties: {
    options: {
      type: 'object'
    },
    callback: {
      description: 'the callback executed after connecting to mesh',
      type: 'function',
      required: false
    }
  }
});

//
// Private method: invoked by connect() once the outgoing connection is open
//
mesh.method('uplink', uplink, {
  description: 'when an outgoing node connection has been made',
  properties: {
    options: {
      type: 'object'
    },
    callback: {
      description: 'the callback executed after connecting to mesh',
      type: 'function',
      required: false
    }
  },
  private: true
});
//
// Wires an incoming ( client ) socket into the mesh.
//
//   socket   - the connected socket; must provide `id`, `send` and `on`
//   callback - called as callback(null, info) where info describes the
//              connected client ( id, lastSeen, role, status )
//
// Every event emitted on the meshEmitter is serialised as
// { event, payload } and sent to the socket; every well-formed message
// received from the socket is emitted locally through the original emit.
//
function downlink (socket, callback) {
  events();

  //
  // Relay for local -> remote traffic. It is registered with onAny, and
  // reads the name of the event being emitted from `this.event`
  //
  var handler = function (data) {
    socket.send(JSON.stringify({
      event: this.event,
      payload: data
    }));
  };
  meshEmitter.onAny(handler);

  socket.on('message', function (data) {
    //
    // The data comes from a remote peer, so a malformed frame must not be
    // allowed to throw out of this handler; such frames are dropped
    //
    var msg;
    try {
      msg = JSON.parse(data);
    } catch (err) {
      return;
    }
    if (msg === null || typeof msg !== 'object') {
      return;
    }

    //
    // Tag the payload with the id of the socket it arrived on
    // ( only possible when the payload is an object )
    //
    if (msg.payload !== null && typeof msg.payload === 'object') {
      msg.payload.id = socket.id;
    }

    //
    // TODO: figure out where engine.io is storing remoteAddress on socket !!!
    //
    //msg.payload.host = socket.remoteAddress.host;
    //msg.payload.port = socket.remoteAddress.port;

    //
    // Any mesh client events should be rebroadcasted locally,
    // but they should not be re-emitted
    //
    emit(msg.event, msg.payload, false);
  });

  //
  // TODO: Do a `get` first!
  //
  // NOTE(review): engine.io documents a 'close' event on its sockets;
  // confirm that 'disconnect' is really emitted for this socket type.
  //
  socket.on('disconnect', function (data) {
    //
    // Listeners added with onAny have to be removed with offAny;
    // removeListener expects ( event, listener ) and throws when it is
    // handed only the function
    //
    meshEmitter.offAny(handler);
    resource.node.create({
      id: socket.id,
      lastSeen: new Date().toString(),
      role: "client",
      status: "disconnected"
    }, function (err, node) {});
  });

  //
  // Continue with information about the socket
  //
  callback(null, {
    id: socket.id,
    lastSeen: new Date().toString(),
    role: "client",
    status: "connected"
  });
}
//
// Wires the outgoing connection ( mesh.client ) into the mesh.
//
//   options  - object with the `host` and `port` of the node connected to
//   callback - called as callback(null, info) where info describes the
//              remote node ( id, port, host, status, lastSeen, role )
//
// Every event emitted on the meshEmitter is serialised as
// { event, payload } and sent over mesh.client; every well-formed message
// received from mesh.client is emitted locally through the original emit.
//
function uplink (options, callback) {
  events();

  //
  // Relay for local -> remote traffic. It is registered with onAny, and
  // reads the name of the event being emitted from `this.event`
  //
  var handler = function (data) {
    mesh.client.send(JSON.stringify({
      event: this.event,
      payload: data
    }));
  };
  meshEmitter.onAny(handler);

  //
  // Any mesh client events should be rebroadcasted locally,
  // but they should not be re-emitted
  //
  mesh.client.on('message', function (data) {
    //
    // The data comes from a remote peer, so a malformed frame must not be
    // allowed to throw out of this handler; such frames are dropped
    //
    var msg;
    try {
      msg = JSON.parse(data);
    } catch (err) {
      return;
    }
    if (msg === null || typeof msg !== 'object') {
      return;
    }
    emit(msg.event, msg.payload, false);
  });

  //
  // NOTE(review): engine.io-client documents a 'close' event on its
  // sockets; confirm that 'disconnect' is really emitted by mesh.client.
  //
  mesh.client.on('disconnect', function () {
    //
    // Listeners added with onAny have to be removed with offAny;
    // removeListener expects ( event, listener ) and throws when it is
    // handed only the function
    //
    meshEmitter.offAny(handler);
  });

  //
  // Send a friendly phone-home message
  // Feel free to comment this line out at any time
  //
  mesh.emit('node::ohai', resource.system.info());

  //
  // Continue with information about the newly connected to node
  //
  callback(null, {
    id: options.host + ":" + options.port,
    port: options.port,
    host: options.host,
    status: "connected",
    lastSeen: new Date().toString(),
    role: "server"
  });
}
//
// Connects to a Big mesh to broadcast and listen for events
//
//   options  - object with the `host` and `port` to connect to
//   callback - called with an error when the node lookup or the client
//              socket fails; otherwise handed on to mesh.uplink once the
//              client socket has opened
//
function connect (options, callback) {
  events();
  //
  // Since the mesh is just now connecting, set all previous uplink nodes to disconnected
  //
  var uplinks = 0;
  resource.node.find({ status: 'connected' }, function (err, results) {
    //
    // A failed lookup is reported to the caller instead of crashing on
    // `results.length` below
    //
    if (err) {
      return callback(err);
    }
    if (!results || results.length === 0) {
      return _connect();
    }
    results.forEach(function (record) {
      record.status = "disconnected";
      record.save(function () {
        //
        // Only connect once every record has been saved
        //
        uplinks++;
        if (uplinks === results.length) {
          _connect();
        }
      });
    });
  });

  //
  // Opens the engine.io client socket and stores it on mesh.client
  //
  function _connect () {
    var client = require('engine.io-client');
    mesh.client = new client.Socket({ host: options.host, port: options.port });
    mesh.client.on('error', function (err) {
      return callback(err);
    });
    mesh.client.on('open', function () {
      mesh.uplink(options, callback);
    });
  }
}
//
// Starts accepting incoming mesh connections.
//
//   options  - passed to resource.http.listen when no http server
//              object exists yet
//   callback - called with the error from resource.http.listen, or with
//              ( null, mesh.server ) once engine.io has been attached
//
function listen (options, callback) {
  events();
  var engine = require('engine.io');

  //
  // Every incoming socket is handed to mesh.downlink; an error coming
  // back from downlink is thrown
  //
  var onConnection = function (socket) {
    mesh.downlink(socket, function (err, result) {
      if (err) {
        throw err;
      }
    });
  };

  //
  // Attaches engine.io to resource.http.server and keeps the result
  // on mesh.server
  //
  var attach = function () {
    mesh.server = engine.attach(resource.http.server);
    mesh.server.on('connection', onConnection);
    callback(null, mesh.server);
  };

  //
  // Reuse the http server when there already is one
  //
  if (typeof resource.http.server === 'object') {
    attach();
    return;
  }

  resource.http.listen(options, function (err) {
    if (err) {
      return callback(err);
    }
    attach();
  });
}
//
// Packages loaded with require() by this resource, with the versions
// declared for them
//
mesh.dependencies = {
  'engine.io': '0.3.9',
  'engine.io-client': '0.3.9',
  'eventemitter2': '*'
};

//
// Expose the resource as `mesh`
//
exports.mesh = mesh;