Skip to content

Commit

Permalink
feat(mqtt): Stream map serialization to improve memory usage with lar…
Browse files Browse the repository at this point in the history
…ge maps
  • Loading branch information
Hypfer committed May 30, 2021
1 parent bc8feb9 commit 98b2757
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 5 deletions.
9 changes: 4 additions & 5 deletions backend/lib/mqtt/handles/MapNodeMqttHandle.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const Logger = require("../../Logger");
const NodeMqttHandle = require("./NodeMqttHandle");
const path = require("path");
const PropertyMqttHandle = require("./PropertyMqttHandle");
const zlib = require("zlib");
const {stringifyAndDeflate} = require("../../utils/streamHelpers");

class MapNodeMqttHandle extends NodeMqttHandle {
/**
Expand Down Expand Up @@ -157,10 +157,7 @@ class MapNodeMqttHandle extends NodeMqttHandle {
const robot = this.robot;

const promise = new Promise((resolve, reject) => {
zlib.deflate(JSON.stringify(robot.state.map), (err, buf) => {
if (err !== null) {
return reject(err);
}
stringifyAndDeflate(robot.state.map).then(buf => {
let payload;

if (mapHack) {
Expand Down Expand Up @@ -189,6 +186,8 @@ class MapNodeMqttHandle extends NodeMqttHandle {
}

resolve(payload);
}).catch(err => {
reject(err);
});
});

Expand Down
20 changes: 20 additions & 0 deletions backend/lib/utils/BufferWriteStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const stream = require("stream");

class BufferWriteStream extends stream.Writable {
constructor() {
super();

this.chunks = [];
}

_write(chunk, encoding, done) {
this.chunks.push(chunk);
done();
}

getBuffer() {
return Buffer.concat(this.chunks);
}
}

module.exports = BufferWriteStream;
37 changes: 37 additions & 0 deletions backend/lib/utils/streamHelpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const BufferWriteStream = require("./BufferWriteStream");
const stream = require("stream");
const zlib = require("zlib");
const {JsonStream} = require("jetsons");

function stringifyAndCompress(data, algorithm) {
return new Promise((resolve, reject) => {
const bufferStream = new BufferWriteStream();

stream.pipeline(
new JsonStream(data),
algorithm,
bufferStream,

(err) => {
if (err !== null && err !== undefined) {
reject(err);
} else {
resolve(bufferStream.getBuffer());
}
}
);
});
}

function stringifyAndDeflate(data) {
return stringifyAndCompress(data, zlib.createDeflate());
}

function stringifyAndGZip(data) {
return stringifyAndCompress(data, zlib.createGzip());
}

module.exports = {
stringifyAndDeflate: stringifyAndDeflate,
stringifyAndGZip: stringifyAndGZip
};
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"express-list-endpoints": "^5.0.0",
"express-rate-limit": "^5.2.6",
"expresse": "git+https://npm@github.com/Hypfer/expresse",
"jetsons": "^1.2.2",
"mqtt": "^4.0.0",
"multer": "^1.4.1",
"uuid": "^8.3.0",
Expand Down
36 changes: 36 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 98b2757

Please sign in to comment.