Skip to content
Permalink
Browse files

improve node with new ack mechanism. May break previous usage

  • Loading branch information...
JpEncausseVISEO committed Aug 1, 2019
1 parent 52f272d commit ec199f0a4bcde4927a061c29e6a55e08f523c1d8
Showing with 128 additions and 101 deletions.
  1. +126 −100 node-red-contrib-bot-socketio/node-socketio-server.js
  2. +2 −1 node-red-contrib-bot-socketio/package.json
@@ -1,165 +1,170 @@
const helper = require('node-red-viseo-helper')
const botmgr = require('node-red-viseo-bot-manager')
const CARRIER = "socketServer"
const uuidv4 = require('uuid/v4');
const CARRIER = "SocketIOServer"

// --------------------------------------------------------------------------
// NODE-RED
// --------------------------------------------------------------------------

let Server = require('socket.io');
let LISTENERS_END = {};
let LISTENERS_REPLY = {};
let SOCKETS = [];
let io;


module.exports = function(RED) {

RED.httpAdmin.post("/socketioserver/:id", RED.auth.needsPermission("inject.write"), function(req,res) {
var node = RED.nodes.getNode(req.params.id);
let node = RED.nodes.getNode(req.params.id);
if (node != null) {
try {
if (io) {
for (let socket of SOCKETS) socket.disconnect(true);
io.removeAllListeners('connection');
SOCKETS = [];
}
io = new Server(RED.server);
startIOServer(RED);
res.sendStatus(200);
} catch(err) {
res.sendStatus(500);
node.error("Error trying to restart the Socketio server");
node.error("[SocketIO Server] Error trying to restart the IO server");
}
} else {
res.sendStatus(404);
}
});

const ioServer = function (config) {
RED.nodes.createNode(this, config);
serverConfig = config;
var node = this;
this.name = config.name;
const registerConfig = function (config) {
let node = this;
RED.nodes.createNode(node, config);
node.name = config.name;

if (!io) {
io = new Server(RED.server);
console.log("[Socket server] Connection to Socket server...");
}

node.on('close', function() {
console.log("[Socket server] Socket server closed");
for (let socket of SOCKETS) socket.disconnect(true);
io.removeAllListeners('connection');
SOCKETS = [];
});
startIOServer(RED);
node.on('close', stopIOServer);
}

const webServer = function(config) {
var node = this;
const registerNode = function(config) {
let node = this;
RED.nodes.createNode(this, config);

startServer(node, config);
this.on('close', (done) => { stop(node, done) });
bindIOServer(node, config);
this.on('close', (done) => { removeListeners(node, done) });
}

RED.nodes.registerType("socketio-server-config", ioServer);
RED.nodes.registerType("socketio-server", webServer, { credentials: { secret: {type: "text"}} });
RED.nodes.registerType("socketio-server-config", registerConfig);
RED.nodes.registerType("socketio-server", registerNode, { credentials: { secret: {type: "text"}} });
}

// ------------------------------------------
// SocketIO
// ------------------------------------------

let CALLBACK_REPLY = {};
let CLIENTS = [];
let io;

const log = (msg, data) => {
console.log("[SocketIO Server] " + msg, data || '');
}

const startServer = (node, config) => {
const stopIOServer = () => {
if (!io) { return; }

log("Disconnect all sockets...");
for (let client of Object.values(CLIENTS)) client.socket.disconnect(true);
io.removeAllListeners('connection');
CLIENTS = {};
}

const startIOServer = (RED) => {
stopIOServer();
CLIENTS = {};
log("Starting WebSocket server...");
io = new Server(RED.server);
}

const bindIOServer = (node, config) => {
if (!io) { return log("WebSocket server not available..."); }

let namespace = config.namespace || 'assistant';
log("Bind WebSocket server on " + namespace);

io.on('connection', (socket) => {
let client = { "socket" : socket, replies : {} }
CLIENTS[client.socket.id] = client;

io.on('connection', function(socket){
msgs = [];
SOCKETS.push(socket);
socket.emit(namespace, { event: '[Socket server] Socket connected' });
socket.on(namespace, function(data){ receive(node, config, socket, data)});
socket.on('disconnect', function(reason) { console.log("[Socket server] Socket disconnected: ", reason) })
socket.on('error', function(error) { console.log("[Socket server] Socket error: ", error) })
socket.emit(namespace, { event: '[SocketIO Server] connected' });
socket.on(namespace, (data) => { receive(node, config, client, data)});
socket.on('disconnect', (reason) => { cleanClient(socket); log("Disconnected: ", reason); })
socket.on('error', (error) => { log("Error: ", error) })
});
io.on('connect_error', function (error) { console.log("[Socket server] Socket error: connect_error - ", error) })
io.on('connect_timeout', function (error) { console.log("[Socket server] Socket error: connect_timeout - ", error) })
io.on('reconnect_error', function (error) { console.log("[Socket server] Socket error: reconnect_error - ", error) })
io.on('reconnect_failed', function (error) { console.log("[Socket server] Socket error: reconnect_failed - ", error) })
io.on('connect_error', (error) => { log("Error: connect_error - ", error) })
io.on('connect_timeout', (error) => { log("Error: connect_timeout - ", error) })
io.on('reconnect_error', (error) => { log("Error: reconnect_error - ", error) })
io.on('reconnect_failed', (error) => { log("Error: reconnect_failed - ", error) })

// Add listener to reply
let listenerReply = LISTENERS_REPLY[node.id] = (srcNode, data, srcConfig) => { reply(node, data, config) }
helper.listenEvent('reply', listenerReply)

let listenerEnd = LISTENERS_END[node.id] = (node) => { endSound(node) }
helper.listenEvent('endSound', listenerEnd)
}

const stop = (node, done) => {
let callback = CALLBACK_REPLY[node.id] = (srcNode, data, srcConfig) => { reply(node, data, config); }
helper.listenEvent('reply', callback)
}

msgs = [];
let listenerReply = LISTENERS_REPLY[node.id]
helper.removeListener('reply', listenerReply)
const removeListeners = (node, next) => {
let callback = CALLBACK_REPLY[node.id]
helper.removeListener('reply', callback)
next();
}

let listenerEnd = LISTENERS_END[node.id]
helper.removeListener('endSound', listenerEnd)
done();
const cleanClient = (socket) => {
delete CLIENTS[socket.id]
}

// ------------------------------------------
// RECEIVE
// ------------------------------------------

function receive(node, config, socket, message) {
const receive = (node, config, client, message) => {
// Log activity
try { setTimeout(function() { helper.trackActivities(node)},0); }
catch(err) { console.log(err); }

message.socket = socket.id

// The UI can send message to acknowledge messages
if (message.type === 'ack'){
let replyId = message.data;
let data = client.replies[replyId];
if (data){
delete client.replies[replyId];
return helper.fireAsyncCallback(data);
}
}

// Bind ClientID to socket's data (if provided)
if (message._client_id){
client._client_id = message._client_id
}

message.socket = client.socket.id

let data = botmgr.buildMessageFlow({ "message" : message }, {
userId: 'message.socket',
userId: 'message._client_id',
convId: 'message.socket',
payload: 'message.content',
inputType: 'message.type',
source: CARRIER
})


if (data.message.type === "event") {
if (data.message.content === "cmd-next") {
return helper.emitEvent('endSound', node);
}
}
else {
// Handle Prompt
let convId = botmgr.getConvId(data)
if (botmgr.hasDelayedCallback(convId, data.message)) return;
}
// Handle Prompt
let convId = botmgr.getConvId(data)
if (botmgr.hasDelayedCallback(convId, data.message)) return;

// Trigger received message
helper.emitEvent('received', node, data, config);
node.send(data);
}

let msgs = [];
// ------------------------------------------
// END SOUND
// ------------------------------------------

const endSound = (node) => {
if (msgs.length < 1) return;

// Get the last message and send it in the flow
let data = msgs.shift();

helper.fireAsyncCallback(data);
helper.emitAsyncEvent('received', node, data, config, (data) => {
node.send(data);
});
}

// ------------------------------------------
// REPLY
// ------------------------------------------

const reply = (node, data, config) => {
const reply = (node, data, config) => {

let namespace = config.namespace || 'assistant';

try {

let address = botmgr.getUserAddress(data)
if (!address || address.carrier !== CARRIER) return false;

@@ -168,15 +173,38 @@ const reply = (node, data, config) => {
if (!message) return false;

// Emit the message
let convId = botmgr.getConvId(data);
io.to(convId).emit(namespace, { message: message });
let socket = botmgr.getConvId(data);
let client = CLIENTS[socket];
if (!client){
let userId = data.user.id
if (!userId){ return node.warn('Client SocketId ' + socket + ' not found ')}

// Find the first client matching given userID has a fallback
for (let sock of Object.keys(CLIENTS)){
let c = CLIENTS[sock]
if (c._client_id === userId){
client = c;
break;
}
}

// Keep it to send it in the flow later
msgs.push(data)
if (!client){
node.warn('Client SocketId ' + socket + ' not found for userId '+ userId)
return;
}
}

// Store a replyId x data to a given Socket
// and wait client acknowledge message
// to call: helper.fireAsyncCallback(data);
let replyId = uuidv4();
client.replies[replyId] = data;
client.socket.emit(namespace, { message, replyId });

} catch(ex){ node.warn(ex) }
}


// ------------------------------------------
// MESSAGES
// https://github.com/api-ai/fulfillment-webhook-nodejs/blob/master/functions/index.js
@@ -207,6 +235,4 @@ const getMessage = exports.getMessage = (replies) => {
}

return replies[0];
}


}
@@ -1,9 +1,10 @@
{
"name" : "node-red-contrib-viseo-bot-socketio",
"version" : "0.1.0",
"version" : "0.2.0",
"description" : "VISEO Bot Maker - SocketIO channel connector",
"dependencies" : {
"socket.io" : "~2.1.0",
"uuid": "^3.3.2",
"node-red-viseo-helper" : "~0.3.0",
"node-red-viseo-bot-manager" : "~0.1.0"
},

0 comments on commit ec199f0

Please sign in to comment.
You can’t perform that action at this time.