Permalink
Browse files

initial Sender/Receiver implementation one-way to server

  • Loading branch information...
1 parent 063ff1f commit fe06de72fe6ec288079306f2840d3d82bb6a2f2e @cgreenhalgh committed Apr 23, 2012
Showing with 167 additions and 42 deletions.
  1. +35 −5 client/client2.js
  2. +116 −36 client/ubistate.js
  3. +16 −1 server/ubiserver2.js
View
@@ -66,19 +66,26 @@ function getPinDigest(nonce, pin) {
//=============================================================================
+var RETRY_TIMEOUT = 10000;
+
function connect_socketio(device, peer) {
// Note: don't reconnect at the socket.io level - we'll do it at a higher level
// Note: if the initial handshake fails then we don't get any event back - we'd just have to
// set a timeout for the lack of connecting.
+ console.log('connect_socketio...');
var socket = io.connect('http://:49891', { transports: [ 'jsonp-polling' ], // 'websocket'
- reconnect: false, 'connect timeout': 10000 });
+ reconnect: false, 'connect timeout': 10000, 'force new connection':true });
peer.socket = socket;
peer.connected = false;
// timeout for initial handshake (inferred from first call to connect - could trap connecting but then
// also deal with connect_failed, etc.)
peer.connectTimeout = setTimeout(function() {
logmessage('Event','connect timeout');
+ socket.disconnect();
+ peer.retryTimeout = setTimeout(function() {
+ connect_socketio(device, peer);
+ }, RETRY_TIMEOUT)
}, CONNECT_TIMEOUT);
socket.on('connect', function() {
@@ -132,8 +139,13 @@ function connect_socketio(device, peer) {
logmessage('Event','disconnect','');
peer.connected = false;
delete peer.socket;
- if (peer.sender!==undefined)
- peer.sender.disconnected();
+ for (var senderid in peer.senders) {
+ var sender = peer.senders[senderid];
+ sender.disconnected();
+ }
+ peer.retryTimeout = setTimeout(function() {
+ connect_socketio(device, peer);
+ }, RETRY_TIMEOUT)
});
socket.on('reconnect', function(transport_type,reconnectionAttempts) {
logmessage('Event','reconnect',{transport_type:transport_type,reconnectionAttempts:reconnectionAttempts});
@@ -160,8 +172,10 @@ function connect_socketio(device, peer) {
peer.connstate = STATE_PEERED;
console.log('Now peered with id='+peer.id+', name='+peer.name);
peer.known = true;
- peer.sender = clientState.sender(peer.id);
- peer.sender.connected(function(sendermsg) {
+ var defaultSender = clientState.sender(peer.id);
+ peer.senders = {};
+ peer.senders['default'] = defaultSender;
+ defaultSender.connected(function(sendermsg) {
var msg = {type: 'sender', sender: 'default', msg: sendermsg};
socket.json.send(msg);
logmessage('Send', 'sender', msg);
@@ -189,6 +203,20 @@ function connect_socketio(device, peer) {
}
// TODO
}
+ else if (msg.type=='receiver') {
+ // sender,msg
+ if (msg.sender===undefined || msg.msg===undefined) {
+ console.log('incomplete receiver message ('+JSON.stringify(msg)+')');
+ socket.disconnect();
+ return;
+ }
+ var sender = peer.senders[msg.sender];
+ if (sender===undefined) {
+ console.log('reveiver message for unknown sender '+msg.sender);
+ return;
+ }
+ sender.acked(msg.msg);
+ }
}
});
@@ -220,4 +248,6 @@ function disconnect() {
peer.socket.disconnect();
logmessage('Sent', 'disconnect');
}
+ clearTimeout(peer.connectTimeout);
+ clearTimeout(peer.retryTimeout);
}
View
@@ -30,15 +30,20 @@
State.prototype.begin = function() {
this.transaction++;
}
+ State.prototype.intransaction = function() {
+ return this.transaction>0;
+ }
/** end transaction */
- State.prototype.end = function() {
+ State.prototype.end = function(opttimestamp) {
this.transaction--;
if (this.transaction<0) {
console.log('Too many State.end');
this.transaction = 0;
}
if (this.transaction==0) {
- var newtimestamp = new Date().getTime();
+ var newtimestamp = opttimestamp;
+ if (newtimestamp===undefined)
+ newtimestamp = new Date().getTime();
if (newtimestamp<=this.timestamp) {
console.log('Note: new timestamp would have been lower/unchanged: '+newtimestamp);
newtimestamp = this.timestamp+1;
@@ -142,46 +147,28 @@
this.check();
}
- Sender.prototype.check = function() {
- var awaitingack = false;
- for (var key in this.stateawaitingack) {
- awaitingack = true;
+ function empty(a) {
+ var yes = true;
+ for (var i in a) {
+ yes = false;
break;
}
- if (awaitingack) {
- if (!this.isconnected) {
- // these should be noack now
- for (var ackid in this.stateawaitingack) {
- var s = this.stateawaitingack[ackid];
- for (key in s) {
- this.statenoack[key] = s[key];
- }
- }
- this.stateawaitingack = {};
- console.log('check: failed awaitingack messages without connection');
- return;
- }
- console.log('check: awaitingack for '+JSON.stringify(this.stateawaitingack));
- return;
- }
+ return yes;
+ }
+
+ Sender.prototype.check = function() {
if (!this.isconnected) {
console.log('check: not connected');
return;
}
- // send unsent
- var notsent = false;
- for (key in this.statenotsent) {
- notsent = true;
- break;
+ if (!empty(this.stateawaitingack)) {
+ console.log('check: awaitingack for '+JSON.stringify(this.stateawaitingack));
+ return;
}
- if (!notsent) {
+ // send unsent
+ if (empty(this.statenotsent)) {
// send unsent
- var noack = false;
- for (key in this.statenoack) {
- noack = true;
- break;
- }
- if (noack) {
+ if (empty(this.statenoack)) {
console.log('check: nothing to send');
return;
}
@@ -202,19 +189,67 @@
this.stateawaitingack[ackid] = send;
var sendermsg = { ackid: ackid, updates: send };
+ if (empty(this.stateknown))
+ // new state flag
+ sendermsg.newstate = true;
+ // complete update in one... (this is set in the first message of a new update)
+ sendermsg.newupdate = send[TIMESTAMP];
+
this.sendfn(sendermsg);
// debug
this.dump();
}
/** notify ack */
Sender.prototype.acked = function(ackmsg) {
- // TODO
+ var ackids = ackmsg.ackids;
+ if (ackids===undefined) {
+ console.log('Warning: ackmsg.ackids undefined: '+JSON.stringify(ackmsg));
+ return;
+ }
+ var docheck = false;
+ for (var i in ackids) {
+ var ackid = ackids[i];
+ var state = this.stateawaitingack[ackid];
+ if (state===undefined) {
+ console.log('received unknown ackid '+ackid+' (nextackid='+this.nextackid+')');
+ } else {
+ console.log('received ack '+ackid);
+ // move to known
+ var timestamp;
+ for (var key in state) {
+ if (key==TIMESTAMP)
+ // don't update timestamp until/unless all updates acked
+ timestamp = state[key];
+ else
+ this.stateknown[key] = state[key];
+ }
+ delete this.stateawaitingack[ackid];
+ if (timestamp!==undefined) {
+ if (empty(this.stateawaitingack) && empty(this.statenoack)) {
+ console.log('know that received '+timestamp);
+ this.stateknown[timestamp] = timestamp;
+ }
+ }
+ docheck = true;
+ }
+ }
+ if (docheck)
+ this.check();
}
/** notify disconnected */
Sender.prototype.disconnected = function() {
this.isconnected = false;
delete this.sendfn;
+ // these should be noack now
+ for (var ackid in this.stateawaitingack) {
+ var s = this.stateawaitingack[ackid];
+ for (key in s) {
+ this.statenoack[key] = s[key];
+ }
+ }
+ this.stateawaitingack = {};
+ console.log('disconnect: failed awaitingack messages without connection');
this.check();
}
/** notify State updated
@@ -245,14 +280,59 @@
function Receiver() {
this.state = new State;
this.state.timestamp = 0;
+ this.newstate = true;
+ this.intransaction = false;
+ //this.newupdate
+ //this.newupdateackid
}
/** get state */
Receiver.prototype.state = function() {
return this.state;
}
/** handle sendermessage, return optional message (ackmsg) */
Receiver.prototype.received = function(sendermsg) {
- // TODO
+ if (sendermsg.newstate==true) {
+ if (!this.newstate) {
+ this.state = new State;
+ console.log('receiver reset state on newstate');
+ }
+ }
+ var ackids = [];
+ // ackid, updates, newstate?, newupdate?
+ if (sendermsg.newupdate!==undefined) {
+ this.newupdate = sendermsg.newupdate;
+ this.newupdateackid = sendermsg.ackid;
+ } else {
+ // hope in order
+ if (sendermsg.ackid==this.newupdateackid+1)
+ this.newupdateackid = sendermsg.ackid;
+ }
+ ackids.push(sendermsg.ackid);
+
+ if (!this.intransaction) {
+ this.state.begin();
+ this.intransaction = true;
+ }
+ // handle updates
+ for (var key in sendermsg.updates) {
+ if (key!=TIMESTAMP) {
+ this.state.set(key, sendermsg.updates[key]);
+ }
+ }
+ var timestamp = sendermsg.updates[TIMESTAMP];
+ if (timestamp!==undefined && this.newupdateackid==sendermsg.ackid) {
+ // completed update!
+ this.state.end(timestamp);
+ this.intransaction = false;
+ console.log('updated state to '+timestamp+': '+JSON.stringify(this.state.values));
+ }
+ else
+ {
+ console.log('updated state to intermediate of '+this.newupdate+': '+JSON.stringify(this.state.values));
+ }
+
+ if (!empty(ackids))
+ return {ackids:ackids};
return null;
}
})('object' === typeof module ? module.exports : (this.ubistate = {}), this);
View
@@ -92,6 +92,8 @@ io.sockets.on('connection', function(socket) {
// needs secret
peer.secret = Crypto.util.randomBytes(8);
peer.state = conn.state = STATE_PEERED;
+ // peer state receivers, key by sender name
+ peer.receivers = {};
peer.serverid = serverid;
// add to peers
peers[peer.id] = peer;
@@ -143,8 +145,21 @@ io.sockets.on('connection', function(socket) {
socket.disconnect();
return;
}
- // TODO
console.log('sender message '+JSON.stringify(msg));
+ var receiver = conn.peer.receivers[msg.sender];
+ if (receiver===undefined) {
+ if (msg.msg.newstate!=true) {
+ console.log('sender message for unknown receiver '+msg.sender+' not newstate - ignored');
+ return;
+ }
+ receiver = new ubistate.Receiver;
+ conn.peer.receivers[msg.sender] = receiver;
+ }
+ var ackmsg = receiver.received(msg.msg);
+ if (ackmsg!=null) {
+ var repl = {type:'receiver',sender:msg.sender,msg:ackmsg};
+ socket.json.send(repl);
+ }
}
}
// ...

0 comments on commit fe06de7

Please sign in to comment.