Skip to content

Commit

Permalink
Rewritten to better incorporate the idea of ensured-delivery.
Browse files Browse the repository at this point in the history
Basically I've fixed architectural failings of the first release. This should be far more stable than it has been previously with hopefully not too much increased overhead.
  • Loading branch information
argon committed Oct 18, 2011
1 parent 38c885c commit 463d22c
Showing 1 changed file with 88 additions and 53 deletions.
141 changes: 88 additions & 53 deletions lib/apn.js
Expand Up @@ -16,13 +16,14 @@ var Errors = {
}

var Connection = function (optionArgs) {
this.currentId = 0;
this.cachedNotes = [];
var currentId = 0;
var cachedNotes = [];

var self = this;
var hasKey = hasCert = false;
var socketOptions = {};
var offlineCache = [];
var openingSocket = false;
var writeBuffer = [];

var options = { cert: 'cert.pem' /* Certificate file */
, key: 'key.pem' /* Key file */
Expand All @@ -46,42 +47,57 @@ var Connection = function (optionArgs) {
}

var onDrain = function() {
if (offlineCache.length) {
while (self.socket.socket.bufferSize == 0 && offlineCache.length) {
self.writeNotificationToSocket(offlineCache.pop());
}
if (writeBuffer.length) {
writeNotificationToSocket(writeBuffer.shift());
}
};

var startSocket = function () {
self.socket = tls.connect(options['port'], options['gateway'], socketOptions,
callback = function() {
if(!self.socket.authorized) {
throw self.socket.authorizationError
}

onDrain();
if(!self.openingSocket) {
console.log("opening");
process.nextTick(function() {
self.socket = tls.connect(options['port'], options['gateway'], socketOptions,
callback = function() {
console.log("open(ed)?: " + self.socket.authorized);
if(!self.socket.authorized) {
throw self.socket.authorizationError
}

onDrain();
self.openingSocket=false;
});

self.socket.on('data', function(data) {
handleTransmissionError(data);
});

self.socket.on('error', function(data) {
console.log("error");
self.socket.removeAllListeners();
self.socket = undefined;
});

self.socket.once('close', function () {
console.log("close");
console.log("wB length: " + writeBuffer.length);
if(writeBuffer.length && readyToConnect()) {
startSocket();
}
if (!self.socket) {
return;
}

self.socket.removeAllListeners();
self.socket = undefined;
});

self.socket.on("drain", onDrain);
});

self.socket.on('data', function(data) {
handleTransmissionError(data);
});

self.socket.on('error', function(data) {
self.socket.removeAllListeners();
self.socket = undefined;
});

self.socket.once('close', function () {
if (!self.socket) {
return;
}

self.socket.removeAllListeners();
self.socket = undefined;
});

self.socket.on("drain", onDrain);
self.openingSocket = true;
}
else {
console.log("not opening");
}
}

var connect = invoke_after(function() { startSocket(); });
Expand All @@ -102,22 +118,29 @@ var Connection = function (optionArgs) {
hasKey = true;
}));

this.writeNotificationToSocket = function(data) {
var writeNotificationToSocket = function(data) {
console.log("queued: " + writeBuffer.length);
if (self.socket === undefined || self.socket.readyState != 'open') {
if ((self.socket === undefined || self.socket.readyState == 'closed') && readyToConnect()) {
startSocket();
}

offlineCache.push(data);
console.log("queueing1: " + data);
bufferDataForWrite(data);
} else {
if (self.socket.socket.bufferSize > 0) {
offlineCache.push(data);
console.log("queueing: " + data);
bufferDataForWrite(data);
} else {
console.log("sending: " + data);
self.socket.write(data);
}
}
};

var bufferDataForWrite = function(data) {
writeBuffer.push(data);
}

this.sendNotification = function (note) {
var token = note.device.token;
var message = JSON.stringify(note);
Expand All @@ -131,8 +154,7 @@ var Connection = function (optionArgs) {
return Errors['invalidPayloadSize'];
}

note._uid = this.currentId++;

note._uid = currentId++;
if(options.enhanced) {
var data = new Buffer(1 + 4 + 4 + 2 + token.length + 2 + messageLength);
// Command
Expand All @@ -145,7 +167,7 @@ var Connection = function (optionArgs) {
// Expiry
pos += int2buf(note.expiry, data, pos, 4);

self.cachedNotes.push(note);
cachedNotes.push(note);
tidyCachedNotes();
}
else {
Expand All @@ -159,44 +181,57 @@ var Connection = function (optionArgs) {
pos += int2buf(messageLength, data, pos, 2);
pos += data.write(message, pos);

this.writeNotificationToSocket(data);
writeNotificationToSocket(data);
}

var tidyCachedNotes = function() {
// Maybe a timestamp should be stored for each note and kept for a duration?
if(self.cachedNotes.length > options.cacheLength) {
self.cachedNotes.shift();
if(cachedNotes.length > options.cacheLength) {
cachedNotes.shift();
}
}

var handleTransmissionError = function(data) {
// Need to check message that errors
// return failed notification to owner
// purge writeBuffer and start again with cachedNotes
// perhaps we should keep an identifier with each writeBuffer
// entry to know where to stop when cycling through cachedNotes
if (data[0] == 8) {
self.socket.end();

if (typeof options.errorCallback != 'function') {
self.cachedNotes = [];
/*if (typeof options.errorCallback != 'function') {
cachedNotes = [];
return;
}

var currentCache = self.cachedNotes;
self.cachedNotes = [];
}*/
console.log("handling error: " + data[1]);
console.log("cache length: " + cachedNotes.length);

// This is an error condition
var errorCode = data[1];
var identifier = bytes2int(data.slice(2,6), 4);
var note = undefined;
while(currentCache.length) {
note = currentCache.shift();
console.log("identifier: " + identifier);
while(cachedNotes.length) {
note = cachedNotes.shift();
console.log("note uid: " + note['_uid']);
if(note['_uid'] == identifier) {
break;
}
}
console.log("cache length: " + cachedNotes.length);
// Notify callback of failed notification
if(typeof options.errorCallback == 'function') {
options.errorCallback(errorCode, note);
}
if(cachedNotes.length) {
var count = cachedNotes.length;
for(var i=0; i<count; i++) {
note = cachedNotes.shift();
console.log("Notification: " + note['_uid']);
self.sendNotification(note);
}
}
}
}
}
Expand Down Expand Up @@ -310,7 +345,7 @@ var Feedback = function (optionArgs) {
self.socket = tls.connect(options['port'], options['address'], socketOptions);
self.socket.pair.on('secure', function () { if(!self.socket.authorized) { throw self.socket.authorizationError } });
self.socket.on('data', function(data) { processData(data); });
self.socket.on('error', function(data) {self.socket.removeAllListeners(); self.socket = undefined; });
self.socket.once('error', function(data) {self.socket.removeAllListeners(); self.socket = undefined; });
self.socket.once('end', function () { });
self.socket.once('close', function () { self.socket.removeAllListeners(); self.socket = undefined; });
}
Expand Down

0 comments on commit 463d22c

Please sign in to comment.