diff --git a/.husky/pre-push b/.husky/pre-push new file mode 100755 index 0000000..84766dc --- /dev/null +++ b/.husky/pre-push @@ -0,0 +1,13 @@ +#!/bin/bash + +POSTMAN_FILE="postman.json" + +if [ -f "$POSTMAN_FILE" ]; then + # Check if any "src" value is not empty + if grep -Po '"src"\s*:\s*"\K[^"]+' "$POSTMAN_FILE" | grep -qv '^$'; then + echo "$POSTMAN_FILE contains personal src paths" >&2 + exit 1 + fi +fi + +exit 0 \ No newline at end of file diff --git a/components/endpoints/class.command.js b/components/endpoints/class.command.js index f888abb..f650fa2 100644 --- a/components/endpoints/class.command.js +++ b/components/endpoints/class.command.js @@ -11,6 +11,56 @@ const { parentPort, isMainThread } = require("worker_threads"); const { commands } = require("../../system/worker/shared.js"); const { randomUUID } = require("crypto"); + +// check if a passed callback uses old +// command arugments, or the new signature +// see #504 - "change command handler function arguments" +function compatWrapper(fn, { logger }) { + return function (...args) { + + // args[0] = cmd + // args[1] = iface + // args[2] = params + // args[3] = timer/cb + + // this, provokes "Cannot set headers after they are sent to the client" + // see issue #528 + //let { logger } = Command.scope; + + if (fn.length === 4) { + + // print a deprecation notice (only in dev mode) + // for production this would be to verbose + if (process.env.NODE_ENV === "development") { + + let msg = "Command handler signature deprecated!\n"; + msg += "`(cmd, iface, params, done) => {}` will be removed in further versions.\n"; + msg += "Use `(obj, done) => {}` instead. See: https://github.com/OpenHausIO/backend/issues/504#issuecomment-2922734270"; + + logger.warn(msg); + + } + + // old signature/arguments + return fn(...args); + + } else if (fn.length === 2) { + + // new signature/arguments accepted + return fn({ + params: args[2] + }, args[3]); + + } else { + + return fn(...args); + + } + + }; +} + + /** * @description * Single command @@ -246,7 +296,7 @@ module.exports = class Command { // when worker = handler like default beaufer let { events, logger } = Command.scope; - let wrapper = () => { + let wrapper = (abort = null) => { // feedback logger.verbose(`Trigger command "${this.name}"`, this); @@ -261,15 +311,12 @@ module.exports = class Command { cb = () => { }; } - let worker = this.#privates.get("handler"); - let iface = interfaces.get(this.interface); - // moved up, and used as callback debounce function // see #528, timeout helper has a internal "called" flag let timer = _timeout(this.#privates.get("timeout"), (timedout, duration, args) => { if (timedout) { - logger.warn(`Command timedout for "${this._id}"! Execution was not successful, worker function:`, worker); + logger.warn(`Command timedout for "${this._id}"! Execution was not successful, worker function:`); cb(null, false); } else { @@ -280,6 +327,10 @@ module.exports = class Command { } }); + if (abort) { + timer(abort, false); + } + try { params = params.map((obj) => { @@ -303,21 +354,35 @@ module.exports = class Command { } - // convert to params array with .lean method - params = new Params(...params); + try { - if (!iface) { - let err = new Error(`Interface "${this.interface}" not found, cant write to it.`); - err.code = "NO_INTERFACE"; - return timer(err, false); - } + // convert to params array with .lean method + params = new Params(...params); + + let handler = this.#privates.get("handler"); + let iface = interfaces.get(this.interface); + let worker = compatWrapper(handler, Command.scope); + + if (!iface) { + let err = new Error(`Interface "${this.interface}" not found, cant write to it.`); + err.code = "NO_INTERFACE"; + return timer(err, false); + } + + // emit command event, see #529 + events.emit("command", this, params); - // emit command event, see #529 - events.emit("command", this, params); + // handle timeout stuff here? + // when so, timeout applys to custom functions too! + worker.call(this, this, iface, params, timer); - // handle timeout stuff here? - // when so, timeout applys to custom functions too! - worker.call(this, this, iface, params, timer); + } catch (err) { + + logger.warn(err, "Error catched in worker function"); + + timer(err, false); + + } }; @@ -355,8 +420,12 @@ module.exports = class Command { } else { + let err = new Error("No command handler registered"); + err.code = "NO_HANDLER"; + // feedback - logger.warn("No command hanlder registered"); + logger.warn(err.message); + wrapper(err); } diff --git a/components/mqtt/message-handler.js b/components/mqtt/message-handler.js index bf22ee9..a4a3f5a 100644 --- a/components/mqtt/message-handler.js +++ b/components/mqtt/message-handler.js @@ -5,6 +5,7 @@ const VERSION = Number(process.env.MQTT_BROKER_VERSION); const exitCodes = require("./exit-codes.js")(VERSION); const _debounce = require("../../helper/debounce.js"); +const { isMainThread } = require("worker_threads"); const parser = mqtt.parser({ protocolVersion: VERSION @@ -33,6 +34,31 @@ module.exports = (scope) => { }, 100); + let publisher = (item) => { + return (payload, options = {}) => { + + // feedback + logger.verbose(`Publish on topic ${item.topic}`, payload); + + let pub = mqtt.generate({ + cmd: "publish", + messageId: crypto.randomInt(0, 65535), + qos: 0, + dup: false, + topic: item.topic, + payload: Buffer.from(payload), + retain: false, + ...options + }); + + // helper event for publishing + // listener registerd in router.api.mqtt.js + events.emit("transmit", pub, item, payload); + + }; + }; + + // listen for published topics // call publish handler on each mqtt item events.on("publish", (packet) => { @@ -66,13 +92,20 @@ module.exports = (scope) => { } */ + // convert Uint8Array serilaized array back + if (!isMainThread) { + packet.payload = Buffer.from(packet.payload); + } + timestamps.published = Date.now(); _subscriber.forEach((cb) => { cb(packet.payload, packet); }); - process.nextTick(updater, item); + if (isMainThread) { + process.nextTick(updater, item); + } } }); @@ -80,156 +113,135 @@ module.exports = (scope) => { }); - // "connected" fires every time a websocket connection is made, e.g. from a connector. - // So we need to react and create a new connection to the browker - events.on("connected", async (ws) => { - try { + events.on("add", (item) => { + item._publisher = publisher(item); + }); - // connecto to broker - // TODO: Add here credentials for authentication - await new Promise((resolve, reject) => { + items.forEach((item) => { + item._publisher = publisher(item); + }); - logger.verbose("Connect to broker..."); - let data = mqtt.generate({ - cmd: "connect", - protocolId: "MQTT", - protocolVersion: VERSION, - clean: true, - clientId: process.env.MQTT_CLIENT_ID, - keepalive: 10, - }); + // "connected" fires every time a websocket connection is made, e.g. from a connector. + // So we need to react and create a new connection to the browker + if (isMainThread) { + events.on("connected", async (ws) => { + try { + + // connecto to broker + // TODO: Add here credentials for authentication + await new Promise((resolve, reject) => { + + logger.debug("Connect to broker..."); + + let data = mqtt.generate({ + cmd: "connect", + protocolId: "MQTT", + protocolVersion: VERSION, + clean: true, + clientId: process.env.MQTT_CLIENT_ID, + keepalive: 10 + }); - ws.send(data, () => { - parser.once("packet", ({ cmd, returnCode }) => { - if (cmd === "connack" && returnCode === 0) { + ws.send(data, () => { + parser.once("packet", ({ cmd, returnCode }) => { + if (cmd === "connack" && returnCode === 0) { - // feedback - logger.debug("Connected to broker"); + // feedback + logger.info("Connected to broker"); - resolve(); + resolve(); - } else { - reject(new Error(exitCodes[returnCode])); - } + } else { + reject(new Error(exitCodes[returnCode])); + } + }); }); - }); - }); - - - // subscribe to # topic - // so we can handle all topics - await new Promise((resolve, reject) => { - - // feedback - logger.verbose("[MQTT] Subscribe to # topic..."); - - let data = mqtt.generate({ - cmd: "subscribe", - messageId: crypto.randomInt(0, 65535), - subscriptions: [{ - topic: "#", - //topic: "#", - qos: 0, - nl: false, // no Local MQTT 5.0 flag - rap: true, // Retain as Published MQTT 5.0 flag - rh: 1 // Retain Handling MQTT 5.0 - }] }); - ws.send(data, () => { - parser.once("packet", ({ cmd, granted }) => { - if (cmd === "suback" && granted[0] === 0) { - // feedback - logger.debug("Subscribed to # topic"); + // subscribe to # topic + // so we can handle all topics + await new Promise((resolve, reject) => { - resolve(); + // feedback + logger.verbose("[MQTT] Subscribe to # topic..."); - } else { - reject(new Error("Subscription not granted")); - } + let data = mqtt.generate({ + cmd: "subscribe", + messageId: crypto.randomInt(0, 65535), + subscriptions: [{ + topic: "#", + //topic: "#", + qos: 0, + nl: false, // no Local MQTT 5.0 flag + rap: true, // Retain as Published MQTT 5.0 flag + rh: 1 // Retain Handling MQTT 5.0 + }] }); - }); - - }); - - - // monkey patch/override publisher function - // on mqtt item topics with current connection - await new Promise((resolve) => { - - items.forEach((item) => { - item._publisher = (payload, options = {}) => { - - // feedback - logger.verbose(`Publish on topic ${item.topic}`, payload); - let pub = mqtt.generate({ - cmd: "publish", - messageId: crypto.randomInt(0, 65535), - qos: 0, - dup: false, - topic: item.topic, - payload: Buffer.from(payload), - retain: false, - ...options - }); + ws.send(data, () => { + parser.once("packet", ({ cmd, granted }) => { + if (cmd === "suback" && granted[0] === 0) { - ws.send(pub, () => { + // feedback + logger.debug("Subscribed to # topic"); - // feedback - // log self send published messages for debugging - logger.trace(`Send publish: ${item.topic}=${payload}`); + resolve(); + } else { + reject(new Error("Subscription not granted")); + } }); + }); - }; }); - resolve(); - }); + // listen for publishes and other packates + // send ping requests regulary to broker + await new Promise((resolve) => { + let ping = mqtt.generate({ + cmd: "pingreq" + }); - // listen for publishes and other packates - // send ping requests regulary to broker - await new Promise((resolve) => { - - let ping = mqtt.generate({ - cmd: "pingreq" - }); - - interval = setInterval(() => { - ws.send(ping); - }, Number(process.env.MQTT_PING_INTERVAL)); + // TODO: store interval in array/set + // multiple connections = multiple pings + // TODO: unref timer + interval = setInterval(() => { + ws.send(ping); + }, Number(process.env.MQTT_PING_INTERVAL)); - resolve(); + resolve(); - }); + }); - } catch (err) { + } catch (err) { - // feedback - logger.error(err, "Could not setup MQTT broker handling"); + // feedback + logger.error(err, "Could not setup MQTT broker handling"); - } - }); + } + }); + } // listen for disconnects from the connector // clear the ping requests intveral, cant reach broker - events.once("disconnected", () => { + events.on("disconnected", () => { + logger.warn("Disconnected from broker"); clearInterval(interval); }); // re-emit events on component scope/events - parser.on("packet", (packet) => { - scope.events.emit(packet.cmd, packet); - }); - + if (isMainThread) { + parser.on("packet", (packet) => { + scope.events.emit(packet.cmd, packet); + }); + } // handle messages from the websockt as mqtt packets events.on("message", (msg) => { diff --git a/components/plugins/class.plugin.js b/components/plugins/class.plugin.js index 2c604fd..74081ea 100644 --- a/components/plugins/class.plugin.js +++ b/components/plugins/class.plugin.js @@ -11,6 +11,8 @@ const stdio = require("./stdout-wrapper.js"); const Item = require("../../system/component/class.item.js"); const { connections, commands } = require("../../system/worker/shared.js"); +const { createServer, ServerResponse } = require("http"); +const os = require("os"); //const Bootstrap = require("./class.bootstrap.js"); @@ -61,8 +63,17 @@ module.exports = class Plugin extends Item { writable: true }); + Object.defineProperty(this, Plugin.symbolkServer, { + value: null, + configurable: false, + enumerable: false, + writable: true + }); + } + static symbolkServer = Symbol("kServer"); + static schema() { return Joi.object({ _id: Joi.string().pattern(/^[0-9a-fA-F]{24}$/).default(() => { @@ -441,4 +452,98 @@ module.exports = class Plugin extends Item { } } + + httpServer(autostart = true, handler) { + + if (autostart instanceof Function) { + handler = autostart; + autostart = true; + } + + let logger = this.logger; + + logger.debug(`httpServer() called, autostart=${autostart}`); + + let server = createServer(); + let sock = path.join(os.tmpdir(), `OpenHaus/plugins/${this.uuid}.sock`); + + server.timeout = 5000; + server.requestTimeout = 5000; + server.setTimeout(5000); + + this[Plugin.symbolkServer] = server; + + fs.mkdirSync(path.dirname(sock), { + recursive: true, + force: true + }); + + fs.rmSync(sock, { + force: true + }); + + server.once("close", () => { + logger.info(`HTTP Server closed on ${sock}`); + }); + + server.on("error", (err) => { + logger.warn(err, "HTTP Server error"); + }); + + if (autostart) { + server.listen(sock, (err) => { + if (err) { + + logger.error(err, `Could not start HTTP Server on ${sock}`); + + } else { + + logger.debug(`HTTP Server listening on ${sock}`); + + } + }); + } + + if (handler) { + + // use request handler + server.on("request", (req, res) => { + + // tell the client connection will be closed + // "keep-alive" breaks the proxy + // see comment in "router.api.plugins.js" -> "FIXME: "connection=keep-alive""" + res.setHeader("connection", "close"); + + handler(req, res); + + }); + + // fix #408, see: + // https://github.com/OpenHausIO/connector/issues/38 + // https://github.com/websockets/ws/issues/2193 + // keep this for native socket + server.on("upgrade", (req, socket) => { + + let res = new ServerResponse(req); + res.assignSocket(socket); + + res.on("finish", () => { + res.socket.destroy(); + }); + + handler(req, res); + + }); + + } + + process.on("SIGINT", () => { + server.closeAllConnections(); + server.close(); + }); + + return server; + + } + }; \ No newline at end of file diff --git a/components/store/!class.namespace.js b/components/store/!class.namespace.js index 5856c95..ac0cc19 100644 --- a/components/store/!class.namespace.js +++ b/components/store/!class.namespace.js @@ -55,41 +55,42 @@ class Namespace extends EventEmitter { // https://medium.com/trabe/async-getters-and-setters-is-it-possible-c18759b6f7e4 Reflect.defineProperty(this, obj.key, { set: (value) => { - return new Promise((resolve, reject) => { + // ESLint "Setter cannot return a value" + //return new Promise((resolve, reject) => { - // update value in database - scope.collection.updateOne({ - namespace, - key: obj.key - }, { - $set: { - value, - timestamps: { - created: obj.timestamps.created, - updated: Date.now() - } + // update value in database + scope.collection.updateOne({ + namespace, + key: obj.key + }, { + $set: { + value, + timestamps: { + created: obj.timestamps.created, + updated: Date.now() } - }, (err, { result }) => { - if (err) { + } + }, (err, { result }) => { + if (err) { - scope.logger.warn(`Could not update value for namespace/key (${namespace}/${obj.key}):`, err.message); - reject(err); + scope.logger.warn(`Could not update value for namespace/key (${namespace}/${obj.key}):`, err.message); + //reject(err); - } else { + } else { - // update value - obj.value = value; + // update value + obj.value = value; - if (result.n === 1 && result.nModified === 1) { - scope.logger.verbose(`Key/Value for namespace (${namespace}) updated: ${obj.key} = ${value}`); - } + if (result.n === 1 && result.nModified === 1) { + scope.logger.verbose(`Key/Value for namespace (${namespace}) updated: ${obj.key} = ${value}`); + } - resolve(true); - this.emit("changed", obj.key, value); + //resolve(true); + this.emit("changed", obj.key, value); - } - }); + } }); + //}); }, get: () => { return obj.value; diff --git a/eslint.config.mjs b/eslint.config.mjs index 1c49846..39a2d55 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -19,16 +19,17 @@ export default defineConfig([ "**/node_modules", "**/build", "**/dist", - "**/plugins" + "**/plugins", + "**/tests" ]), { - files: ["*.js"], + files: ["**/*.js"], extends: compat.extends("eslint:recommended"), languageOptions: { globals: { ...globals.node, ...globals.commonjs, }, - ecmaVersion: 12, + ecmaVersion: 2022, sourceType: "commonjs" }, rules: { diff --git a/postman.json b/postman.json index 80fdb67..1827d65 100644 --- a/postman.json +++ b/postman.json @@ -1600,6 +1600,95 @@ { "name": "Plugins", "item": [ + { + "name": "Proxy", + "item": [ + { + "name": "Simple GET Request", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "http://{{HOST}}:{{PORT}}/api/plugins/658188e93cde9987c3228806/proxy", + "protocol": "http", + "host": [ + "{{HOST}}" + ], + "port": "{{PORT}}", + "path": [ + "api", + "plugins", + "658188e93cde9987c3228806", + "proxy" + ] + } + }, + "response": [] + }, + { + "name": "POST JSON body", + "request": { + "method": "POST", + "header": [], + "body": { + "mode": "raw", + "raw": "{\n \"string\": \"Hello from postman\",\n \"array\": [1,2,3],\n \"bool\": true\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "http://{{HOST}}:{{PORT}}/api/plugins/658188e93cde9987c3228806/proxy/json", + "protocol": "http", + "host": [ + "{{HOST}}" + ], + "port": "{{PORT}}", + "path": [ + "api", + "plugins", + "658188e93cde9987c3228806", + "proxy", + "json" + ] + } + }, + "response": [] + }, + { + "name": "Binary file upload", + "request": { + "method": "PUT", + "header": [], + "body": { + "mode": "file", + "file": { + "src": "" + } + }, + "url": { + "raw": "http://{{HOST}}:{{PORT}}/api/plugins/658188e93cde9987c3228806/proxy/binary", + "protocol": "http", + "host": [ + "{{HOST}}" + ], + "port": "{{PORT}}", + "path": [ + "api", + "plugins", + "658188e93cde9987c3228806", + "proxy", + "binary" + ] + } + }, + "response": [] + } + ], + "description": "**NOTE**: The requests in this folder depend on plugins, not OpenHaus. \nThis is just a demonstration of the proxy functionality." + }, { "name": "Create new plugin", "request": { @@ -1731,7 +1820,7 @@ "body": { "mode": "file", "file": { - "src": "/home/marc/Downloads/oh-plg-dummy-v1.0.0.tgz" + "src": "" } }, "url": { @@ -1811,7 +1900,7 @@ "body": { "mode": "file", "file": { - "src": "/home/marc/projects/playground/oh-plg-dummy/oh-plg-dummy-v1.0.0.tgz" + "src": "" } }, "url": { @@ -1990,7 +2079,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"_id\": \"658189336fa19198939caa21\",\n \"name\": \"Dummy Endpoint\",\n \"device\": \"6398ae590dce390161f7fc2f\",\n \"states\": [\n {\n \"_id\": \"658190d232fe653a07638863\",\n \"name\": \"Total Power\",\n \"alias\": \"POWER_TOTAL\",\n \"type\": \"number\"\n },\n {\n \"_id\": \"658190de4a1369d179c3387f\",\n \"name\": \"Powerd on?\",\n \"alias\": \"POWERD_ON\",\n \"type\": \"boolean\"\n }\n ],\n \"commands\": [\n {\n \"_id\": \"658190acbccb180491c2672c\",\n \"name\": \"Switch to Input 03\",\n \"payload\": {\n \"type\": \"Buffer\",\n \"data\": [\n 115,\n 119,\n 32,\n 105,\n 48,\n 51,\n 13,\n 10\n ]\n },\n \"alias\": \"INPUT_03\",\n \"interface\": \"6581c55abc21a0a3122b9998\"\n },\n {\n \"_id\": \"658190ec0e09c0d5d22c59f0\",\n \"name\": \"Command as String\",\n \"payload\": \"foobar\",\n \"alias\": \"STRING_CMD\",\n \"interface\": \"6581c55abc21a0a3122b9998\"\n }\n ]\n}\n", + "raw": "{\n \"_id\": \"658189336fa19198939caa21\",\n \"name\": \"Dummy Endpoint\",\n \"device\": \"6398ae590dce390161f7fc2f\",\n \"states\": [\n {\n \"_id\": \"658190d232fe653a07638863\",\n \"name\": \"Total Power\",\n \"alias\": \"POWER_TOTAL\",\n \"type\": \"number\"\n },\n {\n \"_id\": \"658190de4a1369d179c3387f\",\n \"name\": \"Powerd on?\",\n \"alias\": \"POWERD_ON\",\n \"type\": \"boolean\"\n }\n ],\n \"commands\": [\n {\n \"_id\": \"658190acbccb180491c2672c\",\n \"name\": \"Switch to Input 03\",\n \"payload\": {\n \"type\": \"Buffer\",\n \"data\": [\n 115,\n 119,\n 32,\n 105,\n 48,\n 51,\n 13,\n 10\n ]\n },\n \"alias\": \"INPUT_03\",\n \"interface\": \"6581c55abc21a0a3122b9998\"\n },\n {\n \"_id\": \"658190ec0e09c0d5d22c59f0\",\n \"name\": \"Command as String\",\n \"payload\": \"foobar\",\n \"alias\": \"STRING_CMD\",\n \"interface\": \"6581c55abc21a0a3122b9998\"\n },\n {\n \"_id\": \"6581fc8ac20cb522e02868ff\",\n \"name\": \"Volume\",\n \"alias\": \"VOLUME\",\n \"interface\": \"6581c55abc21a0a3122b9998\",\n \"params\": [{\n \"key\": \"value\",\n \"type\": \"number\",\n \"min\": 0,\n \"max\": 100\n }]\n } \n ]\n}\n", "options": { "raw": { "language": "json" @@ -2134,7 +2223,7 @@ "header": [], "body": { "mode": "raw", - "raw": "[{\n \"key\": \"VOLUME\",\n \"value\": 15\n}]", + "raw": "[{\n \"key\": \"value\",\n \"value\": 15\n}]", "options": { "raw": { "language": "json" @@ -3476,7 +3565,7 @@ } }, "url": { - "raw": "http://{{HOST}}:{{PORT}}/api/system/backup/import?truncate=true", + "raw": "http://{{HOST}}:{{PORT}}/api/system/backup/import?truncate=true&decode=true&includes[]=plugins&includes[]=database", "protocol": "http", "host": [ "{{HOST}}" @@ -3509,7 +3598,19 @@ { "key": "decode", "value": "true", - "description": "Decode base64 .env values", + "description": "Decode base64 .env values" + }, + { + "key": "includes[]", + "value": "plugins" + }, + { + "key": "includes[]", + "value": "database" + }, + { + "key": "includes[]", + "value": "env", "disabled": true } ] @@ -3574,12 +3675,14 @@ " \"/system\",", " \"/api/plugins/658188e93cde9987c3228806/files\",", " \"/api/plugins/658188e93cde9987c3228806/start\",", - " \"/api/endpoints/658189336fa19198939caa21/commands/6581fc8ac20cb522e02868ff\", ", + " \"/api/plugins/658188e93cde9987c3228806/stop\",", + " \"/api/plugins/658188e93cde9987c3228806/proxy\",", + " \"/api/endpoints/658189336fa19198939caa21/commands/6581fc8ac20cb522e02868ff\",", "].some(path => url.includes(path));", "", "console.log(\"URL ignore\", ignore, url);", "", - "if(ignore){", + "if (ignore) {", " return;", "}", "", @@ -3603,7 +3706,7 @@ " let length = pm.response.headers.get(\"content-length\");", "", " // ignore empty response/no body", - " if(!length || Number(length) === 0){", + " if (!length || Number(length) === 0) {", " return;", " }", "", diff --git a/routes/index.js b/routes/index.js index 14efa15..9ccf6e7 100644 --- a/routes/index.js +++ b/routes/index.js @@ -1,5 +1,5 @@ const express = require("express"); -const bodyParser = require("body-parser"); +//const bodyParser = require("body-parser"); const C_PLUGINS = require("../components/plugins"); const C_ROOMS = require("../components/rooms"); @@ -70,10 +70,23 @@ app.use((req, res, next) => { }); -// NOTE: Remove limit?, since when is a 25mb json needed?! -app.use(bodyParser.json({ - limit: (Number(process.env.API_LIMIT_SIZE) * 1024) // default to 25, (=25mb) -})); +let parser = express.json(); + +app.use((req, res, next) => { + // ESLint erorr: /^\/api\/plugins\/[^\/]+\/proxy/ + // ESLint ok: /^\/api\/plugins\/[^/]+\/proxy/ + if (req.path.match(/^\/api\/plugins\/[^/]+\/proxy/)) { + + // forward request 1:1 + next(); + + } else { + + // parse http body + parser(req, res, next); + + } +}); // mount api router app.use("/api", api); diff --git a/routes/router.api.endpoints.js b/routes/router.api.endpoints.js index ad0217d..9b29d06 100644 --- a/routes/router.api.endpoints.js +++ b/routes/router.api.endpoints.js @@ -41,17 +41,27 @@ module.exports = (app, router) => { req.cmd.trigger(req.body, (err, success) => { if (err) { - if (err.code === "NO_INTERFACE") { + res.status(424).json({ error: err.message }); + + } else if (err.code === "NO_HANDLER") { + + res.status(425).json({ + error: err.message, + stack: err.stack + }); + } else { + res.status(500).json({ - error: err.message + error: err.message, + stack: err.stack }); - } + } } else { res.json({ diff --git a/routes/router.api.mqtt.js b/routes/router.api.mqtt.js index da3323a..804222f 100644 --- a/routes/router.api.mqtt.js +++ b/routes/router.api.mqtt.js @@ -35,7 +35,23 @@ module.exports = (app, router) => { // listen for websockt clients // forward messages between component & ws client - wss.once("connection", (ws) => { + wss.on("connection", (ws) => { + + let { logger } = C_MQTT; + + let transmitter = (packet, item, payload) => { + ws.send(packet, (err) => { + if (err) { + + logger.warn(`Could not publish on topic ${item.topic}`); + + } else { + + logger.trace(`Send publish: ${item.topic}=${payload}`); + + } + }); + }; C_MQTT.events.emit("connected", ws); @@ -44,9 +60,13 @@ module.exports = (app, router) => { }); ws.on("close", () => { + C_MQTT.events.off("transmit", transmitter); C_MQTT.events.emit("disconnected", ws); }); + // helper event for publishing on topics + C_MQTT.events.on("transmit", transmitter); + }); @@ -60,6 +80,10 @@ module.exports = (app, router) => { return; } + if (wss.clients.size > 0) { + return res.status(423).end(); + } + // handle request as websocket // perform websocket handshake wss.handleUpgrade(req, req.socket, req.headers, (ws) => { diff --git a/routes/router.api.plugins.js b/routes/router.api.plugins.js index c5db854..e5045f5 100644 --- a/routes/router.api.plugins.js +++ b/routes/router.api.plugins.js @@ -4,6 +4,8 @@ const { exec } = require("child_process"); const process = require("process"); const fs = require("fs/promises"); const { statSync } = require("fs"); +const { createConnection } = require("net"); +const os = require("os"); const C_PLUGINS = require("../components/plugins"); const { logger } = C_PLUGINS; @@ -43,6 +45,9 @@ module.exports = (app, router) => { // req.item is set from rest-handler router.param() // TODO: Make this optional e.g. via req.query // In the frontend then should a checkbox which sets it to true + // when expert settings are disabled, there should be modal which asks to stop the plugin before delete + // when enabled, delete anyway, but dont stop plugin, instead a 202 http code shoult be returned when the plugin is still running + // where then the frontend notification changes and says "restart required, plugin still running" or so... if (req?.item?.started) { await req.item.stop(); } @@ -228,4 +233,105 @@ module.exports = (app, router) => { } }); + router.all("/:_id/proxy(/*)?", (req, res) => { + + let { method, httpVersion, headers } = req; + let url = req.url.replace(`/${req.params._id}/proxy`, "/"); + url = path.normalize(url); + + // TODO: configure path to sockets? + let sock = path.join(os.tmpdir(), `OpenHaus/plugins/${req.item.uuid}.sock`); + + // TODO: implement leading/railing-slash error + // FIXME: "connection=keep-alive" results in "Cannot read properties of null (reading 'server')" : + /* + at /home/marc/projects/OpenHaus/backend/routes/auth-handler.js:12:31 + at Layer.handle [as handle_request] (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/layer.js:95:5) + at trim_prefix (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/index.js:328:13) + at /home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/index.js:286:9 + at Function.process_params (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/index.js:346:12) + at next (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/index.js:280:10) + at Function.handle (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/index.js:175:3) + at router (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/index.js:47:12) + at Layer.handle [as handle_request] (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/layer.js:95:5) + at trim_prefix (/home/marc/projects/OpenHaus/backend/node_modules/express/lib/router/index.js:328:13) + + Add req.socket.unpipe();? + */ + + logger.verbose(`[proxy] Incoming request: ${req.method} ${req.url}`, req.headers); + + + const client = createConnection(sock, () => { + + // write http header first line + client.write(`${method} ${url} HTTP/${httpVersion}\r\n`); + + // send http request headers to proxy target + for (let key in headers) { + if (key.toLowerCase() === "connection" && headers[key] !== "Upgrade") { + + // override client connection header + // fix "Cannot read properties of null (reading 'server')" error above + // multiple/frequent requests result in the error above if "connection=keep-alive" + client.write("connection: close\r\n"); + + } else { + + // forward original header/value + client.write(`${key}: ${headers[key]}\r\n`); + + } + } + + // sperate header&body + client.write(`\r\n`); + + // TODO: toLowerCase() header keys + if (req.headers["upgrade"] && req.headers["connection"]) { + + // handle websocket + client.pipe(res.socket); + req.socket.pipe(client); + + } else { + + // handle regular http + client.pipe(res.socket); + req.pipe(client); + + } + + }); + + res.socket.once("error", (err) => { + logger.error(err, "[proxy] Error on res.socket"); + client.destroy(); + }); + + client.on("error", (err) => { + logger.error(err, "[proxy] Error on client object"); + res.status(502); + res.end("Bad Gateway"); + }); + + client.once("end", () => { + logger.verbose("[proxy] client socket ended"); + res.end(); + client.end(); + }); + + req.on("error", (err) => { + logger.error(err, "[proxy] Error on req object"); + client.end(); + }); + + req.once("end", () => { + logger.verbose("[proxy] req ended"); + client.end(); + }); + + }); + + }; \ No newline at end of file diff --git a/routes/router.system.backup.js b/routes/router.system.backup.js index 182d95a..b5b8ab8 100644 --- a/routes/router.system.backup.js +++ b/routes/router.system.backup.js @@ -106,6 +106,13 @@ module.exports = (router) => { router.post("/import", async (req, res) => { + // set deafult resotre includes to "all" + req.query.includes = req?.query?.includes || [ + "database", + "plugins", + "env" + ]; + // NOTE: this also deletes .gitkeep if (req.query?.truncate === "true") { for (let file of await fs.promises.readdir(BASE_PATH)) { @@ -174,7 +181,7 @@ module.exports = (router) => { extract.on("entry", async (header, stream, next) => { - if (header.name.startsWith("database/")) { + if (header.name.startsWith("database/") && req.query?.includes?.includes("database")) { console.log("restartoe database collection", header.name, header.size); @@ -226,7 +233,7 @@ module.exports = (router) => { }); - } else if (header.name.startsWith("plugins/")) { + } else if (header.name.startsWith("plugins/") && req.query?.includes?.includes("plugins")) { console.log("restroe plugin file", header.name, header.size); @@ -242,7 +249,7 @@ module.exports = (router) => { next(); }); - } else if (header.name === ".env") { + } else if (header.name === ".env" && req.query?.includes?.includes("env")) { let envPath = path.join(process.cwd(), ".env"); let fd = null; @@ -286,7 +293,8 @@ module.exports = (router) => { } else { - console.log("unknown file prefix/name", header); + //console.log("unknown file prefix/name", header); + next(); } }); diff --git a/system/component/class.component.js b/system/component/class.component.js index 1400920..5987928 100644 --- a/system/component/class.component.js +++ b/system/component/class.component.js @@ -489,6 +489,9 @@ module.exports = class COMPONENT extends COMMON { _id: new mongodb.ObjectId(_id) }).then((result) => { + // remove id when error occurs + PENDING_CHANGE_EVENTS.delete(target._id); + //if (result.n === 1 && result.ok === 1 && target) { if (result.acknowledged && result.deletedCount > 0) { @@ -502,7 +505,7 @@ module.exports = class COMPONENT extends COMMON { }).catch((err) => { // remove id when error occurs - PENDING_CHANGE_EVENTS.delete(result.value._id); + PENDING_CHANGE_EVENTS.delete(target._id); reject(err); @@ -586,7 +589,20 @@ module.exports = class COMPONENT extends COMMON { // muss in middlware erflogen!!!!!!!!!!!!!! // feedback - this.logger.info(`Item "${target.name || target.description}" updated`); + if (process.env.NODE_ENV === "production") { + + // info is to verbose on production + // if there is problem that needs debugging, logging should be set to debug + // so keep the production logging at a lower level + this.logger.debug(`Item "${target.name || target.description}" updated`); + + } else { + + // in development or test environment, a more talkative logging is welcome + this.logger.info(`Item "${target.name || target.description}" updated`); + + } + // TODO CHECK RESUTL! // extend exisiting object in items array diff --git a/system/component/class.events.js b/system/component/class.events.js index 3a6b5d4..14581ae 100644 --- a/system/component/class.events.js +++ b/system/component/class.events.js @@ -1,5 +1,6 @@ const { EventEmitter } = require("events"); const { isMainThread, BroadcastChannel, threadId } = require("worker_threads"); +const logger = require("../logger/index.js"); const channel = new BroadcastChannel("events"); @@ -65,9 +66,24 @@ module.exports = class Events extends EventEmitter { }); if (process.env.WORKER_THREADS_ENABLED === "true") { - if (!["ready", "error"].includes(event)) { + // ready = each component instance emits ready on its own + // error = each component instance emits error on its own + // connect = pass Websocket instance as argument in mqtt, which breaks serialazion (and is only needed once in main) + // disconnect = pass websocket instance as argument in mqtt, which breaks serialazion (and is only needed once in main) + // TODO: scope events into components to prevent event name conflicts? + if (!["ready", "error", "connected", "disconnected"].includes(event)) { try { + // fix/workaround for dataclone error because of proxies, see #556 + if (this.name === "scenes") { + + // item.timestamps = proxy + // item.scenes = proxy + // structuredClone fails/cant handle proxies + args = JSON.parse(JSON.stringify(args)); + + } + channel.postMessage({ origin: threadId, message: { @@ -78,10 +94,17 @@ module.exports = class Events extends EventEmitter { }); } catch (err) { + if (process.env.NODE_ENV === "development") { - //console.log(event, args); - //console.error("Error, could not post message", err); + //console.log(event, args); + //console.error("Error, could not post message", err); + logger.warn(err, "Could not serialize event", { + component: this.name, + event, + args + }); + } } } } diff --git a/system/cronjob/class.job.js b/system/cronjob/class.job.js index 3c4a625..526c3ab 100644 --- a/system/cronjob/class.job.js +++ b/system/cronjob/class.job.js @@ -23,7 +23,7 @@ module.exports = class Job { constructor(cron, fnc) { // TODO (mstirner) fix eslint rule - let parts = cron.match(/^([0-9,\-\/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-\/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-\/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-\/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-\/]+|\*{1}|\*{1}\/[0-9]+)\s*$/); + let parts = cron.match(/^([0-9,\-/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-/]+|\*{1}|\*{1}\/[0-9]+)\s+([0-9,\-/]+|\*{1}|\*{1}\/[0-9]+)\s*$/); this.minute = this.parse(parts[1]); this.hour = this.parse(parts[2]); diff --git a/tests/components/scenes.js b/tests/components/scenes.js index 23ca53e..1050191 100644 --- a/tests/components/scenes.js +++ b/tests/components/scenes.js @@ -34,6 +34,11 @@ try { }, (err, item) => { try { + // WORKER_TRHEAD_ENABLED=true + // NOTE: args[0] = serialized object, and not instance of Scene + // is this maybe fixed when pre/post hooks are implemented?, + // see: https://github.com/OpenHausIO/backend/issues/6#issuecomment-2932114069 + // check event arguments event.args.forEach((args) => { assert.equal(args[0] instanceof Scene, true); @@ -118,6 +123,8 @@ try { ]).then(() => { + // see comment above for "add" + // WORKER_TRHEAD_ENABLED=true event.args.forEach((args) => { assert.equal(args[0] instanceof Scene, true); }); diff --git a/tests/index.js b/tests/index.js index e7b68ba..aabf050 100644 --- a/tests/index.js +++ b/tests/index.js @@ -1,7 +1,6 @@ const mongodb = require("mongodb"); const path = require("path"); const { describe, it, after } = require("mocha"); -const { channel } = require("../system/component/class.events.js"); const crypto = require("crypto"); const env = require("dotenv").config({ @@ -46,6 +45,8 @@ if (!process.env.DATABASE_URL) { process.env.DATABASE_URL = `mongodb://${process.env.DATABASE_HOST}:${process.env.DATABASE_PORT}/${process.env.DATABASE_NAME}`; } +const { channel } = require("../system/component/class.events.js"); + describe("Database", () => {