Skip to content

Commit

Permalink
Add support for configuration in non-retain mode #11, fixed #9 #10
Browse files Browse the repository at this point in the history
  • Loading branch information
user2684 committed Aug 3, 2021
2 parents 1987fa0 + 4c030ad commit c46ece9
Show file tree
Hide file tree
Showing 15 changed files with 280 additions and 66 deletions.
53 changes: 53 additions & 0 deletions .github/workflows/build_sdk.yml
@@ -0,0 +1,53 @@
name: Build and Publish eGeoffrey Package

on:
push:
branches:
- '**'
tags-ignore:
- '*.*'
pull_request:
workflow_dispatch:

jobs:
build:
runs-on: ubuntu-18.04

steps:
- name: Checkout code
uses: actions/checkout@v2

- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Setup QEMU
uses: docker/setup-qemu-action@v1

- name: Setup Docker buildx
uses: docker/setup-buildx-action@v1

- name: Install eGeoffrey
run: |
sudo EGEOFFREY_INSTALL_DIRECTORY=/tmp/egeoffrey EGEOFFREY_SETUP_REMOTE=n bash -c "$(curl -ssL https://get.egeoffrey.com)"
- name: Build the eGeoffrey SDK
run: |
egeoffrey-cli -o build_sdk alpine amd64 >> /tmp/egeoffrey.log 2>&1
egeoffrey-cli -o build_sdk alpine arm32v6 >> /tmp/egeoffrey.log 2>&1
egeoffrey-cli -o build_sdk raspbian amd64 >> /tmp/egeoffrey.log 2>&1
egeoffrey-cli -o build_sdk raspbian arm32v6 >> /tmp/egeoffrey.log 2>&1
- name: Check for errors
run: |
sudo cat /tmp/egeoffrey.log
if cat /tmp/egeoffrey.log| sed $'s/\e\\[[0-9;:]*[a-zA-Z]//g'|grep ERROR: >/dev/null ; then exit 1; fi
- name: Push the eGeoffrey SDK to DockerHub
run: |
egeoffrey-cli -f build_sdk alpine amd64
egeoffrey-cli -f build_sdk alpine arm32v6
egeoffrey-cli -f build_sdk raspbian amd64
egeoffrey-cli -f build_sdk raspbian arm32v6
2 changes: 2 additions & 0 deletions Dockerfile-raspbian
Expand Up @@ -9,6 +9,8 @@ WORKDIR $WORKDIR

### install dependencies
RUN apt-get update && apt-get install -y jq nano curl wget unzip python-pip expect net-tools usbutils raspi-config raspi-gpio wiringpi libraspberrypi-bin libusb-1.0.0-dev build-essential autoconf cmake pkg-config libtool python-dev libyaml-dev && apt-get clean && rm -rf /var/lib/apt/lists/*
#RUN pip install typing && pip install --upgrade "pip < 21.0"
RUN pip install --upgrade "pip < 21.0"
RUN pip install paho-mqtt requests tinynumpy pyyaml yq apscheduler

### install eGeoffrey SDK
Expand Down
18 changes: 13 additions & 5 deletions docker-entrypoint.sh
Expand Up @@ -4,11 +4,19 @@
SUPPORTED_MANIFEST_SCHEMA=2
PACKAGE_MANIFEST="manifest.yml"
SDK_MANIFEST="sdk/manifest.yml"
API_VERSION=$(echo -e "import sdk.python.constants\nprint sdk.python.constants.API_VERSION"|python)

# extract a string value from a file
extract()
{
STRING=$1
FILE=$2
grep "^$STRING:" $FILE | awk '{ print $2}'
}


# Get the package manifest schema version if the file exists
if [ -f "$PACKAGE_MANIFEST" ]; then
MANIFEST_SCHEMA=$(yq -r .manifest_schema $PACKAGE_MANIFEST)
MANIFEST_SCHEMA=$(extract manifest_schema $PACKAGE_MANIFEST)
if [ $MANIFEST_SCHEMA != $SUPPORTED_MANIFEST_SCHEMA ]; then
echo "ERROR: unsupported manifest schema v"$MANIFEST_SCHEMA
exit
Expand All @@ -17,11 +25,11 @@ fi

# welcome message (package)
if [ -f "$PACKAGE_MANIFEST" ]; then
echo -e "Package "$(yq -r .package $PACKAGE_MANIFEST)" v"$(yq -r .version $PACKAGE_MANIFEST |xargs printf '%.1f')"-"$(yq -r .revision $PACKAGE_MANIFEST)" ("$(yq -r .branch $PACKAGE_MANIFEST)") | SDK v"$(yq -r .version $SDK_MANIFEST |xargs printf '%.1f')"-"$(yq -r .revision $SDK_MANIFEST)" ("$(yq -r .branch $SDK_MANIFEST)") | API "$API_VERSION
echo -e "Environment settings: MODULES ["$EGEOFFREY_MODULES"] | GATEWAY ["$EGEOFFREY_GATEWAY_HOSTNAME" "$EGEOFFREY_GATEWAY_PORT"] | HOUSE ["$EGEOFFREY_ID"]"
echo -e "Package "$(extract package $PACKAGE_MANIFEST)" v"$(extract version $PACKAGE_MANIFEST |xargs printf '%.1f')"-"$(extract revision $PACKAGE_MANIFEST)" ("$(extract branch $PACKAGE_MANIFEST)") | SDK v"$(extract version $SDK_MANIFEST |xargs printf '%.1f')"-"$(extract revision $SDK_MANIFEST)" ("$(extract branch $SDK_MANIFEST)")"
echo -e "Environment settings: MODULES ["$EGEOFFREY_MODULES"] | GATEWAY ["$EGEOFFREY_GATEWAY_HOSTNAME" "$EGEOFFREY_GATEWAY_PORT" v"$EGEOFFREY_GATEWAY_VERSION"] | HOUSE ["$EGEOFFREY_ID"]"
# welcome message (sdk)
else
echo -e "SDK v"$(yq -r .version $SDK_MANIFEST |xargs printf '%.1f')"-"$(yq -r .revision $SDK_MANIFEST)" ("$(yq -r .branch $SDK_MANIFEST)") | API "$API_VERSION
echo -e "SDK v"$(extract version $SDK_MANIFEST |xargs printf '%.1f')"-"$(extract revision $SDK_MANIFEST)" ("$(extract branch $SDK_MANIFEST)")"
fi

# execute eGeoffrey
Expand Down
3 changes: 0 additions & 3 deletions sdk/javascript/constants.js

This file was deleted.

4 changes: 2 additions & 2 deletions sdk/javascript/module/helpers/message.js
Expand Up @@ -58,11 +58,11 @@ class Message {
}

// parse MQTT message (topic and payload)
parse(topic, payload, retain) {
parse(topic, payload, retain, gateway_version) {
var topics = topic.split("/")
// sanity check
if (topics.legth < 8) throw "missing required information in topic"
if (topics[0] != "egeoffrey" || topics[1] != constants["API_VERSION"]) throw "invalid api call"
if (topics[0] != "egeoffrey" || topics[1] != "v"+gateway_version) throw "invalid gateway version"
this.topic = topic
this.house_id = topics[2]
this.sender = topics[3]+"/"+topics[4]
Expand Down
71 changes: 65 additions & 6 deletions sdk/javascript/module/helpers/mqtt_client.js
@@ -1,5 +1,3 @@


class Mqtt_client {
constructor(module) {
// we need the module's object to call its methods
Expand All @@ -14,6 +12,35 @@ class Mqtt_client {
this.__publish_queue = []
// queue configuration messages while not configured
this.__configuration_queue = []
// if the configuration is not retained in the gateway, we need additional tools for requesting it
if (this.__module.gateway_version >= 2) {
this.pending_configurations = []
this.pending_configurations_job = null
}
}

// notify controller/config we need some configuration files
__send_configuration_request(topic) {
var message = new Message(this.__module)
message.recipient = "controller/config"
message.command = "SUBSCRIBE"
message.set_data(topic)
this.__module.send(message)
}

// job for periodically sending configuration request if controller/config does not respond or it is not running
__resend_configuration_request() {
// if we received all the pending configurations, clear the scheduled job
if (this.pending_configurations.length == 0) {
clearInterval(this.pending_configurations_job)
this.pending_configurations_job = null
return
} else {
// otherwise for each pending configuration, send again a request to controller/config
for (var topic of this.pending_configurations) {
this.__send_configuration_request(topic)
}
}
}

// connect to the MQTT broker
Expand Down Expand Up @@ -67,7 +94,7 @@ class Mqtt_client {
__subscribe(topic) {
this.__module.log_debug("Subscribing topic "+topic)
try {
this.__gateway.subscribe(topic, {"qos": 2})
this.__gateway.subscribe(topic, {"qos": this.__module.gateway_qos_subscribe})
} catch(e) {
this.__module.log_error("Unable to subscribe to topic "+topic+": "+get_exception(e))
}
Expand All @@ -76,7 +103,7 @@ class Mqtt_client {
// Build the full topic (e.g. egeoffrey/v1/<house_id>/<from_module>/<to_module>/<command>/<args>)
__build_topic(house_id, from_module, to_module, command, args) {
if (args == "") args = "null"
return ["egeoffrey", constants["API_VERSION"], house_id, from_module, to_module, command, args].join("/")
return ["egeoffrey", "v"+this.__module.gateway_version, house_id, from_module, to_module, command, args].join("/")
}

// publish payload to a given topic (queue the message while offline)
Expand Down Expand Up @@ -134,7 +161,7 @@ class Mqtt_client {
try {
// parse the incoming request into a message data structure
var message = new Message()
message.parse(msg.destinationName, msg.payloadString, msg.retained)
message.parse(msg.destinationName, msg.payloadString, msg.retained, this_class.__module.gateway_version)
if (this_class.__module.verbose) this_class.__module.log_debug("Received message "+message.dump(), false)
} catch (e) {
this_class.__module.log_error("Invalid message received on "+msg.destinationName+" - "+msg.payloadString+": "+get_exception(e))
Expand All @@ -157,7 +184,13 @@ class Mqtt_client {
var configuration_consumed = false
if (this_class.__topics_to_wait.length > 0) {
for (var req_pattern of this_class.__topics_to_wait) {
if (topic_matches_sub(req_pattern, message.topic)) {
// normalize the pattern so to match also configuration files received directly
var req_pattern_normalized = req_pattern
if (req_pattern.includes("*/*")) {
req_pattern_normalized = req_pattern.replace("*/*","+/+")
}
// check if we were waiting for this file
if (topic_matches_sub(req_pattern_normalized, message.topic)) {
this_class.__module.log_debug("received configuration "+message.topic)
var index = this_class.__topics_to_wait.indexOf(req_pattern)
this_class.__topics_to_wait.splice(index, 1)
Expand Down Expand Up @@ -197,6 +230,12 @@ class Mqtt_client {
message.reply()
message.command = "PONG"
this_class.__module.send(message)
// controller/config acknowledged this subscribe request
} else if (message.sender == "controller/config" && message.command == "SUBSCRIBE_ACK") {
pattern = message.get_data()
if (this_class.pending_configurations.includes(pattern)) {
this_class.pending_configurations.remove(pattern)
}
// notify the module about this message (only if fully configured)
} else {
if (this_class.__module.configured) {
Expand Down Expand Up @@ -259,6 +298,26 @@ class Mqtt_client {
}
return topic
}

// add a configuration listener for the given request
add_configuration_listener(house_id, args, wait_for_it) {
// just wrap add_listener
var topic = this.add_listener(house_id, "controller/config", "*/*", "CONF", args, wait_for_it)
// if the config is not retained on the gateway, notify controller/config
if (this.__module.gateway_version >= 2) {
// add the configuration to the pending queue
this.pending_configurations.push(args)
// request the configuration files
this.__send_configuration_request(args)
// if not already running, schedule a job for periodically resending configuration requests in case controller/config has not responded
if (this.pending_configurations_job == null) {
this.pending_configurations_job = setInterval(function(this_class) {
return this_class.__resend_configuration_request()
}(this), 2000);
}
}
return topic
}

// disconnect from the MQTT broker
stop() {
Expand Down
24 changes: 20 additions & 4 deletions sdk/javascript/module/module.js
@@ -1,5 +1,3 @@


// Module class from which all the components inherits common functionalities
class Module {
constructor(scope, name) {
Expand All @@ -14,6 +12,8 @@ class Module {
this.gateway_hostname = "EGEOFFREY_GATEWAY_HOSTNAME" in window ? window.EGEOFFREY_GATEWAY_HOSTNAME : "localhost"
this.gateway_port = "EGEOFFREY_GATEWAY_PORT" in window ? window.EGEOFFREY_GATEWAY_PORT : 443
this.gateway_ssl = "EGEOFFREY_GATEWAY_SSL" in window ? Boolean(window.EGEOFFREY_GATEWAY_SSL) : false
this.gateway_qos_subscribe = "EGEOFFREY_GATEWAY_QOS_SUBSCRIBE" in window ? window.EGEOFFREY_GATEWAY_QOS_SUBSCRIBE : 2
this.gateway_version = "EGEOFFREY_GATEWAY_VERSION" in window ? window.EGEOFFREY_GATEWAY_VERSION : 1
// house settings
this.house_id = "EGEOFFREY_ID" in window ? window.EGEOFFREY_ID : "house"
this.house_passcode = "EGEOFFREY_PASSCODE" in window ? window.EGEOFFREY_PASSCODE : ""
Expand Down Expand Up @@ -41,10 +41,11 @@ class Module {
}

}

// Add a listener for the given configuration request (will call on_configuration())
add_configuration_listener(args, version=null, wait_for_it=false) {
var filename = version == null ? args : version+"/"+args
return this.__mqtt.add_listener(this.house_id, "controller/config", "*/*", "CONF", filename, wait_for_it)
return this.__mqtt.add_configuration_listener(this.house_id, filename, wait_for_it)
}

// add a listener for the messages addressed to this module (will call on_message())
Expand All @@ -56,7 +57,22 @@ class Module {
add_broadcast_listener(from_module, command, args) {
return this.__mqtt.add_listener(this.house_id, from_module, "*/*", command, args, false)
}


// add a listner for broadcasted manifests (will call on_message())
add_manifest_listener(from_module="+/+") {
// add a broadcast listener for the manifest
var topic = this.add_broadcast_listener(from_module, "MANIFEST", "#")
// if manifests are not supposed to be retained on the bus, ask them explicitely by broadcasting a request
if (this.gateway_version >= 2) {
var recipient = from_module == "+/+" ? "*/*" : from_module
var message = new Message(this)
message.recipient = recipient
message.command = "REQ_MANIFEST"
this.send(message)
}
return topic
}

// add a listener for intercepting messages from a given module to a given module (will call on_message())
add_inspection_listener(from_module, to_module, command, args) {
return this.__mqtt.add_listener(this.house_id, from_module, to_module, command, args, false)
Expand Down
4 changes: 2 additions & 2 deletions sdk/manifest.yml
Expand Up @@ -5,6 +5,6 @@ github: egeoffrey/egeoffrey-sdk
manifest_schema: 2
modules: []
package: egeoffrey-sdk
revision: 36
revision: 1
tags: null
version: 1.0
version: 1.1
7 changes: 0 additions & 7 deletions sdk/python/constants.py

This file was deleted.

7 changes: 3 additions & 4 deletions sdk/python/module/helpers/message.py
Expand Up @@ -10,7 +10,6 @@
import re
import time

import sdk.python.constants as constants
import sdk.python.utils.exceptions as exception

class Message():
Expand Down Expand Up @@ -63,14 +62,14 @@ def clear(self):
self.__add_request_id()

# parse a MQTT message (topic and payload)
def parse(self, topic, payload, retain):
def parse(self, topic, payload, retain, gateway_version):
# split the topic
topics = topic.split("/")
# sanity check
if len(topics) < 8:
raise Exception("missing required information in topic")
if topics[0] != "egeoffrey" or topics[1] != constants.API_VERSION:
raise Exception("invalid api call")
if topics[0] != "egeoffrey" or topics[1] != "v"+str(gateway_version):
raise Exception("invalid gateway version")
# store original topic (mainly used by mqtt_client to dispatch the message)
self.topic = topic
# store individual topic sections into internal variables
Expand Down

0 comments on commit c46ece9

Please sign in to comment.