diff --git a/.gitignore b/.gitignore index c48503f..1fd6e1b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,6 @@ node_modules dist -examples test .vscode .jshintrc jsconfig.json -webpack.config.babel.js \ No newline at end of file diff --git a/examples/showLog.js b/examples/showLog.js new file mode 100644 index 0000000..7c54ec7 --- /dev/null +++ b/examples/showLog.js @@ -0,0 +1,39 @@ + +var MikroNode = require('../dist/mikronode.js'); + +// Create API link to host. No connection yet.. +var device = new MikroNode('10.10.10.10'); + +// Debug level is "DEBUG" +// device.setDebug(MikroNode.DEBUG); + +var removeId=[]; +// Connect to the MikroTik device. +device.connect() + .then(([login])=>login('username','password')) + .then(function(conn) { + + console.log("Connected") + // var channel=conn.openChannel('all_addresses'); + // channel.closeOnDone(true); // only use this channel for one command. + var listener=conn.openChannel('address_changes'); + listener.closeOnDone(true); // only use this channel for one command. + + // channel.write('/ip/address/print'); + listener.write('/log/listen'); + // channel.write('/ip/firewall/filter/print'); + + listener.data.filter(d=>d.data[d.data.length-1].field!=='.dead').subscribe(d=>{ + const data = MikroNode.resultsToObj(d.data.filter(col=>["time","topics","message"].indexOf(col.field)!=-1)); + console.log("Log:",JSON.stringify(data)); + }); + + // in 5 seconds, stop listening for address changes. + setTimeout(function() { + console.log("Closing out listener"); + listener.write('/cancel'); /* cancel listen */ + },500000); +}).catch(function(err) { + console.log("Failed to connect. ",err); +}); + diff --git a/package.json b/package.json index 13de99b..805e463 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "mikronode", "description": "Mikrotik API implemented in Node", - "version": "2.3.3", + "version": "2.3.4", "author": "Brandon Myers ", "scripts": { "build": "webpack --color --progress", diff --git a/src/Util.js b/src/Util.js index 7d443af..f95f85b 100644 --- a/src/Util.js +++ b/src/Util.js @@ -40,35 +40,41 @@ function encodeString(s,d) { return data; } -function decodeLength(data){ // Ported from the PHP API on the Wiki. Thanks - if (!data.length) return [[],0]; - var idx=0; - var len=data.length; - var b=data[idx++]; - if (b&128) { - if ((b&192)==128) { - len=((b&63)<<8)+data[idx++]; - } else { - if ((b & 224) == 192) { - len = ((b & 31) << 8 ) + data[idx++]; - len = (len << 8 ) + data[idx++]; +function decodePacket(data){ + if (!data.length) return []; + const buf=[]; + let idx=0; + while (idx{p[f.field]=f.value;return p},{}); } -export {hexDump, decodeLength, encodeString, objToAPIParams, resultsToObj,getUnwrappedPromise}; \ No newline at end of file +export {hexDump, decodePacket, encodeString, objToAPIParams, resultsToObj,getUnwrappedPromise}; \ No newline at end of file diff --git a/src/index.js b/src/index.js index 5c725fa..e46a416 100644 --- a/src/index.js +++ b/src/index.js @@ -7,7 +7,7 @@ import {autobind} from 'core-decorators'; import crypto from 'crypto'; import dns from 'dns'; -import {hexDump, decodeLength, encodeString, objToAPIParams, resultsToObj, getUnwrappedPromise} from './Util.js'; +import {hexDump, decodePacket, encodeString, objToAPIParams, resultsToObj, getUnwrappedPromise} from './Util.js'; import {STRING_TYPE, DEBUG, CONNECTION,CHANNEL,EVENT} from './constants.js'; import parser from './parser.js'; @@ -256,10 +256,30 @@ class SocketStream { this.sentence$=new Subject(); // Each raw sentence from the stream passes through this parser. + let holdBuffer=[]; this.parsed$=this.sentence$ + .do(d=>this.debug>=DEBUG.SILLY&&console.log("Data to parse:",JSON.stringify(d))) .map(o=>o.map(x=>x.split("\r").join("\\r").split("\n").join("\\n")).join('\n')) // Make array string. - .do(d=>this.debug>=DEBUG.SILLY&&console.log("Data to parse:",d)) - .map(d=>{var s=parser.parse(d);s.host=this.host;return s;}) + .map(d=>{ + if (holdBuffer.length) { + console.log("Hold buffer:",holdBuffer); + holdBuffer=[]; + } + var s=parser.parse(d); + s.host=this.host; + return s; + }) + .catch(e=>{ + holdBuffer=[]; + console.error("***************************************************************************"); + console.error("***************************************************************************"); + console.error("Error processing sentence:",e); + console.error("Skipping and continuing"); + console.error("***************************************************************************"); + console.error("***************************************************************************"); + return this.parsed$; + }) + .filter(e=>!!e) .flatMap(d=>{ Object.keys(d).forEach(k=>{if(typeof d[k]==="string")d[k]=d[k].split("\\r").join("\r").split("\\n").join("\n")}); return Observable.from(d); @@ -269,56 +289,22 @@ class SocketStream { // When we receive data, it is pushed into the stream defined below. this.data$=Observable.fromEvent(this.socket,'data'); // this is the stream reader/parser. - this.data$.scan((last,stream,i)=>{ - let buff=Buffer.concat([last.b,stream]), - l=last.len, - o=last.o, - c,go; - - this.debug>=DEBUG.DEBUG&&console.log("Packet received: ",last,stream); - // If the xpected length of lst process is zero, we expect to be told next buffer length. - if(!last.len) { - // Getting length; - this.debug>=DEBUG.SILLY&&console.log("Getting length"); - [buff,l] = decodeLength(buff); - this.debug>=DEBUG.SILLY&&console.log("Length: ",l); - // We didn't get all of the data from this buffer. Wait for next packet. - if (buff.length=DEBUG.DEBUG&&console.log("Buffer shorter than expected data, waiting for next packet.",{b:buff,len:l,o:o}); - return {b:buff,len:l,o:o}; - } - } - go=buff.length>0; - this.debug>=DEBUG.SILLY&&console.log("Starting parse loop w/existing length ",l); - while(go) { - c = buff.slice(0,l).toString('utf8'); - this.debug>=DEBUG.SILLY&&console.log("Extracted data: ",c); - // Push content as sentence piece. - o.push(c); - // If we detected end of sentence - if (buff[l]===0) { - // then post new sentence. - this.debug>=DEBUG.DEBUG&&console.log('Detected end of sentence, posting existing sentence',o); - this.sentence$.next(o); - // Reset sentence buffer. - l++; - o=[]; - } - this.debug>=DEBUG.SILLY&&console.log("Getting length",buff.slice(l)); - [buff,l] = decodeLength(buff.slice(l)); - this.debug>=DEBUG.SILLY&&console.log("Length",l); - if (!l) { - this.debug>=DEBUG.DEBUG&&console.log('End of data, nothing left to process'); - go=false; - return {b:Buffer.from([]),len:0,o:[]}; - } - if (buff.length=DEBUG.DEBUG&&console.log("Buffer shorter than expected data, waiting for next packet.",{b:buff,len:l,o:o}); - return {b:buff,len:l,o:o}; - } + // My poor stream parser + this.data$.scan((/* @type Buffer */ last,/* @type Buffer */stream,i)=>{ + let buff=Buffer.concat([last,stream]),end=0,idx=0,packet; + this.debug>=DEBUG.DEBUG&&console.log("Packet received: ",stream.toString().split('\u0000')); + this.debug>=DEBUG.DEBUG&&last.length>0&&console.log("Starting parse loop w/existing packet ",last.toString()); + + while(idx=DEBUG.SILLY&&console.log("Decoding: ",idx,end,buff.length,buff.slice(idx,end)); + packet=decodePacket(buff.slice(idx,end)); + idx=end+1; + this.debug>=DEBUG.SILLY&&console.log('Detected end of sentence, posting existing sentence',packet); + this.sentence$.next(packet); } - },{b:Buffer.from([]),len:0,o:[]}) - .subscribe(e=>this.debug>=DEBUG.DEBUG&&e.len&&console.log('Buffer leftover: ',e),closeSocket,closeSocket); + return idx>=buff.length?Buffer.alloc(0):buff.slice(idx,buff.length); + },Buffer.from([])) + .subscribe(e=>this.debug>=DEBUG.DEBUG&&e.length&&console.log('Buffer leftover: ',e),closeSocket,closeSocket); this.socket.on('end',a => {