Skip to content

Commit

Permalink
Fixed stream parsing error.
Browse files Browse the repository at this point in the history
  • Loading branch information
Trakkasure committed Sep 10, 2017
1 parent 4cc3a97 commit c9c4164
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 79 deletions.
2 changes: 0 additions & 2 deletions .gitignore
@@ -1,8 +1,6 @@
node_modules
dist
examples
test
.vscode
.jshintrc
jsconfig.json
webpack.config.babel.js
39 changes: 39 additions & 0 deletions examples/showLog.js
@@ -0,0 +1,39 @@

var MikroNode = require('../dist/mikronode.js');

// Create API link to host. No connection yet..
var device = new MikroNode('10.10.10.10');

// Debug level is "DEBUG"
// device.setDebug(MikroNode.DEBUG);

var removeId=[];
// Connect to the MikroTik device.
device.connect()
.then(([login])=>login('username','password'))
.then(function(conn) {

console.log("Connected")
// var channel=conn.openChannel('all_addresses');
// channel.closeOnDone(true); // only use this channel for one command.
var listener=conn.openChannel('address_changes');
listener.closeOnDone(true); // only use this channel for one command.

// channel.write('/ip/address/print');
listener.write('/log/listen');
// channel.write('/ip/firewall/filter/print');

listener.data.filter(d=>d.data[d.data.length-1].field!=='.dead').subscribe(d=>{
const data = MikroNode.resultsToObj(d.data.filter(col=>["time","topics","message"].indexOf(col.field)!=-1));
console.log("Log:",JSON.stringify(data));
});

// in 5 seconds, stop listening for address changes.
setTimeout(function() {
console.log("Closing out listener");
listener.write('/cancel'); /* cancel listen */
},500000);
}).catch(function(err) {
console.log("Failed to connect. ",err);
});

2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{
"name": "mikronode",
"description": "Mikrotik API implemented in Node",
"version": "2.3.3",
"version": "2.3.4",
"author": "Brandon Myers <trakkasure@gmail.com>",
"scripts": {
"build": "webpack --color --progress",
Expand Down
54 changes: 30 additions & 24 deletions src/Util.js
Expand Up @@ -40,35 +40,41 @@ function encodeString(s,d) {
return data;
}

function decodeLength(data){ // Ported from the PHP API on the Wiki. Thanks
if (!data.length) return [[],0];
var idx=0;
var len=data.length;
var b=data[idx++];
if (b&128) {
if ((b&192)==128) {
len=((b&63)<<8)+data[idx++];
} else {
if ((b & 224) == 192) {
len = ((b & 31) << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
function decodePacket(data){
if (!data.length) return [];
const buf=[];
let idx=0;
while (idx<data.length) {
let len;
let b=data[idx++];
if (b&128) { // Ported from the PHP API on the Wiki. Thanks
if ((b&192)==128) {
len=((b&63)<<8)+data[idx++];
} else {
if ((b & 240) == 224) {
len = ((b & 15) << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
if ((b & 224) == 192) {
len = ((b & 31) << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
} else {
len = data[idx++];
len = (len << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
if ((b & 240) == 224) {
len = ((b & 15) << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
} else {
len = data[idx++];
len = (len << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
len = (len << 8 ) + data[idx++];
}
}
}
}
} else {
len=b;
} else {
len=b;
}
// console.log("Pushing ",idx,len,data.slice(idx,idx+len));
buf.push(data.slice(idx,idx+len).toString('utf8'));
idx+=len;
}
return [data.slice(idx),len];
return buf;
}
//hexDump=require('./hexdump');
function hexDump(data) {
Expand Down Expand Up @@ -191,4 +197,4 @@ function resultsToObj(r) {
if (!Array.isArray(r)) return {};
return r.reduce((p,f)=>{p[f.field]=f.value;return p},{});
}
export {hexDump, decodeLength, encodeString, objToAPIParams, resultsToObj,getUnwrappedPromise};
export {hexDump, decodePacket, encodeString, objToAPIParams, resultsToObj,getUnwrappedPromise};
90 changes: 38 additions & 52 deletions src/index.js
Expand Up @@ -7,7 +7,7 @@ import {autobind} from 'core-decorators';
import crypto from 'crypto';
import dns from 'dns';

import {hexDump, decodeLength, encodeString, objToAPIParams, resultsToObj, getUnwrappedPromise} from './Util.js';
import {hexDump, decodePacket, encodeString, objToAPIParams, resultsToObj, getUnwrappedPromise} from './Util.js';
import {STRING_TYPE, DEBUG, CONNECTION,CHANNEL,EVENT} from './constants.js';
import parser from './parser.js';

Expand Down Expand Up @@ -256,10 +256,30 @@ class SocketStream {

this.sentence$=new Subject();
// Each raw sentence from the stream passes through this parser.
let holdBuffer=[];
this.parsed$=this.sentence$
.do(d=>this.debug>=DEBUG.SILLY&&console.log("Data to parse:",JSON.stringify(d)))
.map(o=>o.map(x=>x.split("\r").join("\\r").split("\n").join("\\n")).join('\n')) // Make array string.
.do(d=>this.debug>=DEBUG.SILLY&&console.log("Data to parse:",d))
.map(d=>{var s=parser.parse(d);s.host=this.host;return s;})
.map(d=>{
if (holdBuffer.length) {
console.log("Hold buffer:",holdBuffer);
holdBuffer=[];
}
var s=parser.parse(d);
s.host=this.host;
return s;
})
.catch(e=>{
holdBuffer=[];
console.error("***************************************************************************");
console.error("***************************************************************************");
console.error("Error processing sentence:",e);
console.error("Skipping and continuing");
console.error("***************************************************************************");
console.error("***************************************************************************");
return this.parsed$;
})
.filter(e=>!!e)
.flatMap(d=>{
Object.keys(d).forEach(k=>{if(typeof d[k]==="string")d[k]=d[k].split("\\r").join("\r").split("\\n").join("\n")});
return Observable.from(d);
Expand All @@ -269,56 +289,22 @@ class SocketStream {
// When we receive data, it is pushed into the stream defined below.
this.data$=Observable.fromEvent(this.socket,'data');
// this is the stream reader/parser.
this.data$.scan((last,stream,i)=>{
let buff=Buffer.concat([last.b,stream]),
l=last.len,
o=last.o,
c,go;

this.debug>=DEBUG.DEBUG&&console.log("Packet received: ",last,stream);
// If the xpected length of lst process is zero, we expect to be told next buffer length.
if(!last.len) {
// Getting length;
this.debug>=DEBUG.SILLY&&console.log("Getting length");
[buff,l] = decodeLength(buff);
this.debug>=DEBUG.SILLY&&console.log("Length: ",l);
// We didn't get all of the data from this buffer. Wait for next packet.
if (buff.length<l) {
this.debug>=DEBUG.DEBUG&&console.log("Buffer shorter than expected data, waiting for next packet.",{b:buff,len:l,o:o});
return {b:buff,len:l,o:o};
}
}
go=buff.length>0;
this.debug>=DEBUG.SILLY&&console.log("Starting parse loop w/existing length ",l);
while(go) {
c = buff.slice(0,l).toString('utf8');
this.debug>=DEBUG.SILLY&&console.log("Extracted data: ",c);
// Push content as sentence piece.
o.push(c);
// If we detected end of sentence
if (buff[l]===0) {
// then post new sentence.
this.debug>=DEBUG.DEBUG&&console.log('Detected end of sentence, posting existing sentence',o);
this.sentence$.next(o);
// Reset sentence buffer.
l++;
o=[];
}
this.debug>=DEBUG.SILLY&&console.log("Getting length",buff.slice(l));
[buff,l] = decodeLength(buff.slice(l));
this.debug>=DEBUG.SILLY&&console.log("Length",l);
if (!l) {
this.debug>=DEBUG.DEBUG&&console.log('End of data, nothing left to process');
go=false;
return {b:Buffer.from([]),len:0,o:[]};
}
if (buff.length<l) {
this.debug>=DEBUG.DEBUG&&console.log("Buffer shorter than expected data, waiting for next packet.",{b:buff,len:l,o:o});
return {b:buff,len:l,o:o};
}
// My poor stream parser
this.data$.scan((/* @type Buffer */ last,/* @type Buffer */stream,i)=>{
let buff=Buffer.concat([last,stream]),end=0,idx=0,packet;
this.debug>=DEBUG.DEBUG&&console.log("Packet received: ",stream.toString().split('\u0000'));
this.debug>=DEBUG.DEBUG&&last.length>0&&console.log("Starting parse loop w/existing packet ",last.toString());

while(idx<buff.length&&(end = buff.indexOf("\u0000",idx,"utf8")) !== -1 ) {
this.debug>=DEBUG.SILLY&&console.log("Decoding: ",idx,end,buff.length,buff.slice(idx,end));
packet=decodePacket(buff.slice(idx,end));
idx=end+1;
this.debug>=DEBUG.SILLY&&console.log('Detected end of sentence, posting existing sentence',packet);
this.sentence$.next(packet);
}
},{b:Buffer.from([]),len:0,o:[]})
.subscribe(e=>this.debug>=DEBUG.DEBUG&&e.len&&console.log('Buffer leftover: ',e),closeSocket,closeSocket);
return idx>=buff.length?Buffer.alloc(0):buff.slice(idx,buff.length);
},Buffer.from([]))
.subscribe(e=>this.debug>=DEBUG.DEBUG&&e.length&&console.log('Buffer leftover: ',e),closeSocket,closeSocket);


this.socket.on('end',a => {
Expand Down

0 comments on commit c9c4164

Please sign in to comment.