Skip to content

Commit

Permalink
[tmpod] improve mqtt connection checking
Browse files Browse the repository at this point in the history
  • Loading branch information
icarus75 committed Jan 26, 2015
1 parent 6dff970 commit b53c65c
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions openwrt/package/flukso/luasrc/tmpod.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
tmpod.lua - Flukso timeseries logging daemon
Copyright (C) 2014 Bart Van Der Meerssche <bart@flukso.net>
Copyright (C) 2015 Bart Van Der Meerssche <bart@flukso.net>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand All @@ -28,6 +28,7 @@ nixio.fs = require "nixio.fs"
local luci = require "luci"
luci.json = require "luci.json"
luci.util = require "luci.util"
luci.sys = require "luci.sys"
local uci = require "luci.model.uci".cursor()
local uloop = require "uloop"
uloop.init()
Expand Down Expand Up @@ -75,6 +76,7 @@ local TMPO_TOPIC_SYNC_PUB = "/device/%s/tmpo/sync"
local TMPO_TOPIC_SENSOR_SUB = "/sensor/+/+"
local TMPO_TOPIC_SENSOR_PUB = "/sensor/%s/tmpo/%d/%d/%d/gz"
-- /sensor/[sid]/tmpo/[rid]/[lvl]/[bid]/gz
local TMPO_TOPIC_MQTT_CHECK = string.format("/daemon/%s/check", DAEMON)
local TMPO_GC20_THRESHOLD = 100 -- 100 free 4kB blocks out of +-1000 in jffs2 = 90% full
local TMPO_GZCHECK_EXEC_FMT = "gzip -trS '' %s 2>&1"
local TMPO_GZCHECK_FILE_REGEX = "^gzip:%s*([%w%.%-_/]+):.*$"
Expand All @@ -95,14 +97,21 @@ local MOSQ_ERROR = "MQTT error: %s"
-- increase process niceness
nixio.nice(TMPO_NICE)

local function merror(success, errno, err)
--TODO explicitely cancel the uloop on error
if not success then error(MOSQ_ERROR:format(err)) end
end

-- connect to the MQTT broker
mosq.init()
local mqtt = mosq.new(MOSQ_ID, MOSQ_CLN_SESSION)
while not mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) do
nixio.nanosleep(SLEEP_S, SLEEP_NS)
if not mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) then
repeat
nixio.nanosleep(SLEEP_S, SLEEP_NS)
until mqtt:reconnect()
end
mqtt:subscribe(TMPO_TOPIC_SYNC_SUB:format(DEVICE), MOSQ_QOS0)
mqtt:subscribe(TMPO_TOPIC_SENSOR_SUB, MOSQ_QOS0)
merror(mqtt:subscribe(TMPO_TOPIC_SYNC_SUB:format(DEVICE), MOSQ_QOS0))
merror(mqtt:subscribe(TMPO_TOPIC_SENSOR_SUB, MOSQ_QOS0))

local config = {
sensor = nil,
Expand Down Expand Up @@ -202,7 +211,7 @@ local tmpo = {

gzcheck()
compactcheck()
mqtt:publish(TMPO_TOPIC_SYNC_PUB:format(DEVICE), "", MOSQ_QOS0, not MOSQ_RETAIN)
merror(mqtt:publish(TMPO_TOPIC_SYNC_PUB:format(DEVICE), "", MOSQ_QOS0, not MOSQ_RETAIN))
end,

push8 = function(self, sid, time, value, unit)
Expand Down Expand Up @@ -271,7 +280,7 @@ local tmpo = {
local source = assert(io.open(path, "r"))
local payload = source:read("*all")
local topic = TMPO_TOPIC_SENSOR_PUB:format(sid, rid, 8, b8id)
mqtt:publish(topic, payload, MOSQ_QOS0, not MOSQ_RETAIN)
merror(mqtt:publish(topic, payload, MOSQ_QOS0, not MOSQ_RETAIN))
source:close()
end
end
Expand Down Expand Up @@ -476,7 +485,7 @@ local tmpo = {
local source = assert(io.open(path, "r"))
local payload = source:read("*all")
local topic = TMPO_TOPIC_SENSOR_PUB:format(sid, rid, lvl + 4, cid)
mqtt:publish(topic, payload, MOSQ_QOS0, not MOSQ_RETAIN)
merror(mqtt:publish(topic, payload, MOSQ_QOS0, not MOSQ_RETAIN))
source:close()
end

Expand Down Expand Up @@ -575,7 +584,7 @@ local tmpo = {
local source = assert(io.open(path, "r"))
local payload = source:read("*all")
local topic = TMPO_TOPIC_SENSOR_PUB:format(sid, rid, lvl, bid)
mqtt:publish(topic, payload, MOSQ_QOS0, not MOSQ_RETAIN)
merror(mqtt:publish(topic, payload, MOSQ_QOS0, not MOSQ_RETAIN))
source:close()
end

Expand All @@ -594,6 +603,12 @@ local tmpo = {
end
end
self.synclist = nil
end,

-- mqtt connection servicing
misc = function(self)
merror(mqtt:misc())
merror(mqtt:publish(TMPO_TOPIC_MQTT_CHECK, "", MOSQ_QOS0, not MOSQ_RETAIN))
end
}

Expand All @@ -620,11 +635,11 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain)
end)

local ufdr = uloop.fd(mqtt:socket(), uloop.READ, function(events)
mqtt:read(MOSQ_MAX_PKTS)
merror(mqtt:read(MOSQ_MAX_PKTS))
end)

local ufdw = uloop.fd(mqtt:socket(), uloop.WRITE, function(events)
mqtt:write(MOSQ_MAX_PKTS)
merror(mqtt:write(MOSQ_MAX_PKTS))
end)

local ub_events = {
Expand All @@ -638,8 +653,7 @@ ub:listen(ub_events)
local ut
ut = uloop.timer(function()
-- mosquitto connection maintenance
local success, errno, err = mqtt:misc()
if not success then error(MOSQ_ERROR:format(err)) end
tmpo:misc()

-- run sync algo if needed
tmpo:sync2()
Expand Down

0 comments on commit b53c65c

Please sign in to comment.