Skip to content

Commit

Permalink
say and rooms updated for sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed May 14, 2012
1 parent 77a1a7c commit c7ee116
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 76 deletions.
2 changes: 1 addition & 1 deletion actions/say.js
Expand Up @@ -33,7 +33,7 @@ action.run = function(api, connection, next){
connection.public = {id: connection.id };

// say it!
api.socketServer.socketRoomBroadcast(api, connection, message, true);
api.socketServer.socketRoomBroadcast(api, connection, message);
api.socketServer.socketRoomStatus(api, room, function(status){
connection.response.roomStatus = status;
next(connection, true);
Expand Down
42 changes: 37 additions & 5 deletions initializers/initRedis.js
@@ -1,18 +1,24 @@
////////////////////////////////////////////////////////////////////////////
// Connnect to Redis and setup channels


/*
Queues and what they do:
Data and what they do:
- actionHero::peers [] a list of all the peers in the action cluster. New members add themselves to it
- actionHero::tasks [] a list of tasks to be completed. Any memeber can push to the queue; all workers will pull one at a time from the queue
- actionHero::cache [] the common shared cache object
- actionHero::stats [] the common shared stats object
- actionHero::roomMembers-{roomName} [] a list of the folks in a given socket room
Channels and what they do:
- actionHero::say a channel for saying stuff to everyone
- actionHero::tasks a channel for saying stuff to everyone
*/

var c = {};

var initRedis = function(api, next)
{
var c = api.configData.redis;
c = api.configData.redis;
api.redis = {};
api.redis.enable = c.enable;
if(c.enable == true){
Expand All @@ -36,8 +42,34 @@ var init = function(api, next){
// add myself to the list
api.redis.client.lrem("actionHero::peers", 1, api.id, function(){ // remove me if I already exist
api.redis.client.rpush("actionHero::peers", api.id, function(){
api.log("connected to redis @ "+api.configData.redis.host+":"+api.configData.redis.port);
next();

// set up say pub/sub listeners
api.redis.clientSubscriber = api.redisPackage.createClient(c.port, c.host, c.options);
api.redis.clientSubscriber.on("connect", function (err) {
if(c.password != null){ api.redis.client.auth(c.password); }

api.redis.clientSubscriber.subscribe("actionHero::say");
api.redis.clientSubscriber.subscribe("actionHero::tasks");
api.redis.clientSubscriber.on("message", function(channel, message){
message = JSON.parse(message);
if(channel === "actionHero::say"){
api.socketServer.socketRoomBroadcast(api, message.connection, message.message, true);
}else if(channel === "actionHero::tasks"){

}else{
api.log("message from unknown channel ("+channel+"): "+message, "red");
}
});

// complete
api.log("connected to redis @ "+c.host+":"+c.port);
next();
});

api.redis.clientSubscriber.on("error", function (err) {
api.log("Redis Error: " + err, ["red", "bold"]);
process.exit(); // redis is really important...
});
});
});
}
Expand Down
160 changes: 91 additions & 69 deletions initializers/initSocketServer.js
Expand Up @@ -6,6 +6,12 @@ var initSocketServer = function(api, next){
api.socketServer.connections = [];
api.socketServer.socketDataString = "";
api.socketServer.numberOfSocketRequests = 0;

if(api.redis.enable === false){
api.socketServer.rooms = {};
}else{
api.socketServer.redisRoomPrefix = "actionHero::roomMembers::";
}

////////////////////////////////////////////////////////////////////////////
// server
Expand All @@ -31,7 +37,7 @@ var initSocketServer = function(api, next){
api.stats.incrament(api, "numberOfActiveSocketClients");
api.socketServer.sendSocketMessage(connection, {welcome: api.configData.socketServerWelcomeMessage, room: connection.room, context: "api"});
api.log("socket connection "+connection.remoteIP+" | connected");
api.socketServer.calculateRoomStatus(api, false);
api.socketServer.roomAddMember(api, connection);
});

connection.on("data", function (chunk) {
Expand Down Expand Up @@ -74,10 +80,12 @@ var initSocketServer = function(api, next){
api.socketServer.sendSocketMessage(connection, {context: "response", status: "OK"});
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+line, "grey");}
}else if(words[0] == "roomChange"){
connection.room = words[1];
api.socketServer.calculateRoomStatus(api, false);
api.socketServer.sendSocketMessage(connection, {context: "response", status: "OK", room: connection.room});
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+line, "grey");}
api.socketServer.roomRemoveMember(api, connection, function(){
connection.room = words[1];
api.socketServer.roomAddMember(api, connection);
api.socketServer.sendSocketMessage(connection, {context: "response", status: "OK", room: connection.room});
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+line, "grey");}
});
}else if(words[0] == "roomView"){
api.socketServer.socketRoomStatus(api, connection.room, function(roomStatus){
api.socketServer.sendSocketMessage(connection, {context: "response", status: "OK", room: connection.room, roomStatus: roomStatus});
Expand All @@ -88,9 +96,6 @@ var initSocketServer = function(api, next){
api.socketServer.socketRoomBroadcast(api, connection, message);
api.socketServer.sendSocketMessage(connection, {context: "response", status: "OK"});
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+line, "grey");}
}else if(words[0] == "actionCluster"){
var message = line.substr(14);
api.actionCluster.parseMessage(api, connection, message);
}else{
connection.error = false;
connection.response = {};
Expand All @@ -104,13 +109,16 @@ var initSocketServer = function(api, next){
});

connection.on("end", function () {
api.stats.incrament(api, "numberOfActiveSocketClients", -1);
for(var i in api.socketServer.connections){
if(api.socketServer.connections[i].id == connection.id){ api.socketServer.connections.splice(i,1); }
}
try{ connection.end(); }catch(e){}
api.socketServer.calculateRoomStatus(api, false);
if(api.configData.logRequests){api.log(" > socket connection " + connection.remoteIP + " disconnected", "white");}
api.socketServer.roomRemoveMember(api, connection, function(){
api.stats.incrament(api, "numberOfActiveSocketClients", -1);
for(var i in api.socketServer.connections){
if(api.socketServer.connections[i].id == connection.id){ api.socketServer.connections.splice(i,1); }
}
try{ connection.end(); }catch(e){
//
}
if(api.configData.logRequests){api.log(" > socket connection " + connection.remoteIP + " disconnected", "white");}
});
});

connection.on("error", function(e){
Expand All @@ -121,29 +129,29 @@ var initSocketServer = function(api, next){

////////////////////////////////////////////////////////////////////////////
// broadcast a message to all connections in a room
api.socketServer.socketRoomBroadcast = function(api, connection, message, clusterRelay){
if(clusterRelay == null){clusterRelay = true;}
if(clusterRelay){
api.actionCluster.sendToAllPeers({action: "broadcast", connection: {
type: connection.type,
params: connection.params,
remoteIP: connection.remoteIP,
room: connection.room,
public: connection.public,
messageCount: connection.messageCount,
id: connection.id
}, message: message});
api.socketServer.socketRoomBroadcast = function(api, connection, message, fromQueue){
if(fromQueue == null){fromQueue = false;}
if(api.redis.enable === true && fromQueue == false){
var payload = {
message: message,
connection: {
room: connection.room,
public: {
id: connection.public.id
}
}
};
api.redis.client.publish("actionHero::say", JSON.stringify(payload));
}

if(clusterRelay == false || api.utils.hashLength(api.actionCluster.peers) == 0){
else{
for(var i in api.socketServer.connections){
var thisConnection = api.socketServer.connections[i];
if(thisConnection.room == connection.room && ( connection.type != "actionCluster" )){
if(thisConnection.room == connection.room){
if(connection == null){
api.socketServer.sendSocketMessage(thisConnection, {message: message, from: api.configData.serverName, context: "user"});
}else{
if(thisConnection.id != connection.id){
api.socketServer.sendSocketMessage(thisConnection, {message: message, from: connection.id, context: "user"});
if(thisConnection.public.id != connection.public.id){
api.socketServer.sendSocketMessage(thisConnection, {message: message, from: connection.public.id, context: "user"});
}
}
}
Expand All @@ -154,51 +162,65 @@ var initSocketServer = function(api, next){
////////////////////////////////////////////////////////////////////////////
// status for a room
api.socketServer.socketRoomStatus = function(api, room, next){
if(api.utils.hashLength(api.actionCluster.peers) == 0){
api.cache.load(api, "_roomStatus", function(resp){
next(resp.rooms[room]);
if(api.redis.enable === true){
var key = api.socketServer.redisRoomPrefix + room;
api.redis.client.llen(key, function(err, length){
api.redis.client.lrange(key, 0, length, function(err, members){
next({
members: members,
membersCount: length
});
});
});
}else{
api.actionCluster.cache.load(api, "_roomStatus", function(resp){
var returnVal = {};
returnVal.membersCount = 0
returnVal.members = [];
for(var i in resp){
for(var j in resp[i]["value"]["rooms"]){
if(j == room){
for(var z in resp[i]["value"]["rooms"][j]["members"]){
returnVal.membersCount++;
returnVal.members.push(resp[i]["value"]["rooms"][j]["members"][z]);
}
}
}
}
next(returnVal);
next({
members: api.socketServer.rooms[room],
membersCount: api.socketServer.rooms[room].length
});
}
}
////////////////////////////////////////////////////////////////////////////
// room status
api.socketServer.calculateRoomStatus = function(api, loop){
if(loop == null){loop = true;}
results = {};
results.rooms = {};
for(var i in api.socketServer.connections){
var thisConnection = api.socketServer.connections[i];
var thisRoom = thisConnection.room;
if(results.rooms[thisRoom] == null){
results.rooms[thisRoom] = {members: [], membersCount: 0};

api.socketServer.roomAddMember = function(api, connection, next){
var room = connection.room;
var name = connection.public.id;
if(api.redis.enable === true){
var key = api.socketServer.redisRoomPrefix + connection.room;
api.redis.client.rpush(key, name, function(){
if(typeof next == "function"){ next(true) }
});
}else{
if(api.socketServer.rooms[room] == null){
api.socketServer.rooms[room] = [];
}
results.rooms[thisRoom].membersCount++;
results.rooms[thisRoom].members.push(thisConnection.public);
api.socketServer.rooms[room].push(name);
if(typeof next == "function"){ next(true) }
}
}

api.socketServer.roomRemoveMember = function(api, connection, next){
var room = connection.room;
var name = connection.public.id;
if(api.redis.enable === true){
var key = api.socketServer.redisRoomPrefix + connection.room;
api.redis.client.lrem(key, 1, name, function(){
if(typeof next == "function"){ next(true) }
});
}else{
for(var i in api.socketServer.rooms){
if(i == room){
var rList = api.socketServer.rooms[i];
for(var j in rList){
if(rList[j] == name){
rList.splice(j,1);
break;
}
}
break;
}
}
if(typeof next == "function"){ next(true) }
}
var expireTimeSeconds = 60*60; // 1 hour
api.cache.save(api,"_roomStatus",results,expireTimeSeconds,function(){
if(loop){ setTimeout(api.socketServer.calculateRoomStatus, 5000, api); }
});
}
api.socketServer.calculateRoomStatus(api, true);

////////////////////////////////////////////////////////////////////////////
// action response helper
Expand Down
2 changes: 1 addition & 1 deletion tasks/cleanOldCacheObjects.js
Expand Up @@ -5,7 +5,7 @@ var task = {};
task.name = "cleanOldCacheObjects";
task.description = "I will clean out expired cache objects";
task.scope = "all";
task.frequency = 10000;
task.frequency = 60000;

/////////////////////////////////////////////////////////////////////
// functional
Expand Down

0 comments on commit c7ee116

Please sign in to comment.