/
output-openenergi
executable file
·279 lines (243 loc) · 7.87 KB
/
output-openenergi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
#!/usr/bin/lua
--[[
Karl Palsson, 2018 <karlp@etactica.com>
This is an "output" daemon for Open Energi.
]]
local json = require("cjson.safe")
local mosq = require("mosquitto")
local ugly = require("remake.uglylog")
local Puni = require("posix.unistd")
local Pt = require("posix.time")
local pl = require("pl.import_into")()
-- Command line options, parsed by Penlight's lapp from the usage text below.
-- args.host is the MQTT broker to connect to, args.verbose is handed to the
-- logger as its level.
local args = pl.lapp [[
Output power consumption data to Open Energi
-H,--host (default "localhost") MQTT host to use
-v,--verbose (0..7 default 5) Logging level, higher == more
]]
--- Static configuration for this daemon.
-- One runtime field, last_ts_process, is added later by the MQTT code to
-- remember when the cache was last processed.
local cfg = {
    -- Name handed to the logger.
    APP_NAME = "output-openenergi",
    -- Per-process client id, so several instances do not collide on the broker.
    MOSQ_CLIENT_ID = ("output-openenergi-%d"):format(Puni.getpid()),
    -- Where live device readings are received from.
    TOPIC_LISTEN_DATA = "status/local/json/device/#",
    -- Where the converted entity readings are published to.
    TOPIC_PUBLISH = "ext/openenergi/out/ooe",
    -- Minimum gap between cache processing runs; compared against Pt.time()
    -- differences, ie seconds.
    PROCESS_INTERVAL = 1,
    -- A point is re-sent when its reading timestamp is this much newer than
    -- the last one sent, even if the value did not change.
    -- NOTE(review): reads as 15 minutes in milliseconds, which assumes the
    -- reading timestamps are in ms -- confirm against the data source.
    max_send_interval = 15 * 60 * 1000,
    -- Per-point thresholds, used unless overridden when an entry is created.
    DEFAULTS = {
        delta_percent = 0.01, -- ie 1%
        delta_absolute = 10, -- in Watts
    },
}
--- cache of live data, updated by live readings.
-- Keyed by point name, each value is a CACHE_ENTRY.
local cache = {}

--- A single cached point.
-- Tracks the most recent reading (last_ts / last_v) and the reading that was
-- most recently selected for publishing (last_ts_sent / last_v_sent).
-- Kept local: nothing outside this file needs it, and a bare assignment
-- would create a global.
local CACHE_ENTRY = {}
CACHE_ENTRY.__index = CACHE_ENTRY

--- Create a new, empty cache entry.
-- @param key name of the point this entry tracks
-- @param opts optional table of overrides for cfg.DEFAULTS
--        (delta_percent, delta_absolute)
-- @return the new entry
function CACHE_ENTRY.create(key, opts)
    local i = setmetatable({ key = key }, CACHE_ENTRY)
    -- tablex.update() modifies its first argument in place, so merge into a
    -- private copy. Merging straight into cfg.DEFAULTS would make every
    -- entry share one options table and let a single override rewrite the
    -- defaults for all of them.
    i.opts = pl.tablex.update(pl.tablex.copy(cfg.DEFAULTS), opts or {})
    return i
end

--- Record the latest reading for this point.
-- @param ts timestamp of the reading
-- @param v value of the reading
function CACHE_ENTRY:update(ts, v)
    self.last_ts = ts
    self.last_v = v
end

--- Has the value moved enough since it was last sent?
-- Both thresholds must be exceeded: the relative change (delta_percent) and
-- the absolute change (delta_absolute).
-- @return true if never sent, or if the change exceeds both thresholds
function CACHE_ENTRY:changed()
    local old = self.last_v_sent
    local new = self.last_v
    if not old then return true end
    local diff = math.abs(new - old)
    -- When old is 0 the division yields inf (or nan for 0/0). inf passes the
    -- relative test and nan fails it, so no special case is needed.
    local relative = diff / math.abs(old) >= self.opts.delta_percent
    local absolute = diff >= self.opts.delta_absolute
    return relative and absolute
end

--- Is the last sent reading too old?
-- @return true if never sent, or if the newest reading is more than
--         cfg.max_send_interval newer than the last one sent
function CACHE_ENTRY:old()
    if not self.last_ts_sent then return true end
    local delta = self.last_ts - self.last_ts_sent
    return delta > cfg.max_send_interval
end

--- Human readable form, for logging.
function CACHE_ENTRY:__tostring()
    if self.last_ts_sent and self.last_v_sent then
        return string.format("CE<%s @ %s => %f [%s=>%f]>", self.key, self.last_ts, self.last_v, self.last_ts_sent, self.last_v_sent)
    else
        return string.format("CE<%s @ %s => %f [neversent]>", self.key, self.last_ts, self.last_v)
    end
end
-- Bring up logging first so that everything below can report problems.
-- NOTE(review): the lapp spec already gives --verbose a default of 5, so the
-- "or 4" fallback looks unreachable -- confirm which default is intended.
ugly.initialize(cfg.APP_NAME, args.verbose or 4)
-- Initialise the mosquitto library, then create the client used by the rest of the file.
mosq.init()
-- NOTE(review): the second argument is presumably the clean-session flag -- confirm against the binding.
local mqtt = mosq.new(cfg.MOSQ_CLIENT_ID, true)
--- Connection callback: subscribe to the live data topic.
-- Any failure here is fatal; the process exits with status 1.
-- @param success true if the broker accepted the connection
-- @param rc numeric result code, used for logging
-- @param str textual form of rc, used for logging
mqtt.ON_CONNECT = function(success, rc, str)
    if not success then
        ugly.crit("Failed to connect to MQTT broker: %s, %d: %s", args.host, rc, str)
        os.exit(1)
    end

    local subscribed = mqtt:subscribe(cfg.TOPIC_LISTEN_DATA, 0)
    if not subscribed then
        ugly.crit("Aborting, MQTT Subscribe failed: to %s:%s.", args.host, cfg.TOPIC_LISTEN_DATA)
        os.exit(1)
    end

    ugly.notice("Successfully connected and listening for data")
    -- Start the processing interval from the moment we are connected.
    cfg.last_ts_process = Pt.time()
end
--- Map a cache key to its Open Energi entity id.
-- Placeholder only: the id is simply the key with a fixed prefix.
-- @param key cache key of the point
-- @return the entity id string
local function find_entity_id(key)
    -- FIXME do the real implementation eventually!
    local prefix = "EID_"
    return prefix .. key
end
--- Store the latest power reading for a point, creating its entry on first use.
-- Like the mq delta gadget: each entry keeps the last sent ts and value as
-- well as the last seen ts and value.
-- @param idx cache key of the point
-- @param ts timestamp of the reading
-- @param power the reading itself
local function update_cache(idx, ts, power)
    local entry = cache[idx]
    if not entry then
        entry = CACHE_ENTRY.create(idx)
        cache[idx] = entry
    end
    entry:update(ts, power)
    ugly.debug("updating cache: %s", tostring(entry))
end
--- Collect every cached point that is due for publishing.
-- A point is due when it is old() or changed(). Due points are marked as
-- sent here, before the caller has actually published them.
-- @return a list of entity reading tables, or nil plus a message when
--         nothing is due
local function process_cache()
    --- Round to one decimal place.
    local function round(num)
        local scale = 10^1
        return math.floor(num * scale + 0.5) / scale
    end

    -- Pass one: pick the due entries and remember what is being sent.
    local due = {}
    for _, entry in pairs(cache) do
        if entry:old() or entry:changed() then
            ugly.info("Will publish as old or changed: %s", tostring(entry))
            entry.last_ts_sent = entry.last_ts
            entry.last_v_sent = entry.last_v
            due[#due + 1] = entry
        end
    end

    -- Pass two: convert points that changed into entity readings for OE.
    local entities = {}
    for _, entry in ipairs(due) do
        entities[#entities + 1] = {
            topic = "readings",
            entity = find_entity_id(entry.key),
            type = "active-power",
            timestamp = entry.last_ts,
            value = round(entry.last_v / 1000), -- kW.
        }
    end

    if #entities == 0 then
        return nil, "No changed readings this time"
    end
    return entities
end
--- Common code for validating senml entry list
-- Every entry must be a table with a numeric v and a string n. u is optional
-- but must be a string, t is optional but must be a number. An entry without
-- t is only acceptable when a base time is supplied, in which case its t is
-- filled in as 0.
-- Checking stops at the first bad entry; entries before it may already have
-- had t filled in.
-- @param eee the list of senml entries, modified in place
-- @param bt the senml base time, if any
-- @return true if every entry is valid, false otherwise
local function validate_entries(eee, bt)
    for _, entry in pairs(eee) do
        if type(entry) ~= "table" then return false end
        if type(entry.v) ~= "number" then return false end
        if type(entry.n) ~= "string" then return false end
        if entry.u and type(entry.u) ~= "string" then return false end
        if entry.t then
            if type(entry.t) ~= "number" then return false end
        else
            -- no time of its own, so there has to be a base time
            if not bt then return false end
            -- safe to modify here, it wasn't provided, so it will have no effect
            entry.t = 0
        end
    end
    return true
end
--- Common code for validating senml metadata
-- will modify meta if necessary!
-- bn is optional but must be a string, bt is optional but must be a number.
-- A missing bn is replaced with "" so later string concatenation works.
-- @param eee the senml table, modified in place
-- @return true if the metadata is usable, false otherwise
local function validate_meta(eee)
    local base_name = eee.bn
    if base_name then
        if type(base_name) ~= "string" then return false end
    else
        -- safe to modify, not required, but need string concat to work
        eee.bn = ""
    end

    local base_time = eee.bt
    if base_time and type(base_time) ~= "number" then
        return false
    end
    return true
end
--- Process newly arrived data messages.
-- Validates the message (topic depth, JSON, hwc status, senml structure) and
-- then combines the per-point current/volt/pf readings into a power value
-- that is stored in the cache for later publishing. Messages that fail any
-- check are logged and dropped.
-- @param mid message id (unused)
-- @param topic topic the message arrived on
-- @param jpayload raw message payload, expected to be JSON
-- @param qos (unused)
-- @param retain (unused)
local function handle_message_data(mid, topic, jpayload, qos, retain)
    local chunks = pl.stringx.split(topic, "/")
    if #chunks < 5 then
        ugly.debug("Ignoring invalid/unprobed device on topic: %s", topic)
        return
    end
    local payload, err = json.decode(jpayload)
    if payload == nil then
        ugly.warning("Invalid json in message on topic: %s, %s", topic, err)
        ugly.debug("Raw message=<%s>", jpayload)
        return
    end
    -- Well formed JSON is not necessarily an object: a bare number, string,
    -- boolean or null decodes fine but cannot be indexed, so reject it here
    -- rather than raising further down.
    if type(payload) ~= "table" or not payload.hwc then
        ugly.info("Ignoring unsuitable json format on topic: %s", topic)
        return
    end
    if payload.hwc.error then
        ugly.debug("Ignoring failed reading")
        return
    end
    if type(payload.senml) ~= "table" then
        ugly.warning("device data without a senml table?!")
        return
    end
    if type(payload.senml.e) ~= "table" then
        ugly.warning("device data with an empty senml.e field?!")
        return
    end
    if not validate_meta(payload.senml) then
        ugly.warning("senml metadata (bt,bn etc) was invalid?!")
        return
    end
    if not validate_entries(payload.senml.e, payload.senml.bt) then
        ugly.warning("senml entry set contained invalid entries, ignoring batch!")
        return
    end
    -- Entries carry times relative to the base time; without one they are
    -- used as they are.
    if not payload.senml.bt then payload.senml.bt = 0 end

    local this_power = {}
    -- iterate the senml, looking for our v/i/pf data per index.
    for _, e in ipairs(payload.senml.e) do
        -- save only what we're interested in, ignore the rest.
        -- Names are split on "/": first part is the quantity, second the index.
        local name_chunks = pl.stringx.split(e.n, "/")
        local n = name_chunks[1]
        local idx = name_chunks[2]
        if idx and (n == "current" or n == "volt" or n == "pf") then
            local key = payload.senml.bn .. idx
            if not this_power[key] then this_power[key] = {} end
            this_power[key][n] = e.v
            -- the timestamp of whichever quantity arrives last wins
            this_power[key].ts = payload.senml.bt + e.t
        end
    end

    -- Now, convert to power and save for publishing.
    -- Points missing any of the three quantities are skipped.
    for fn, v in pairs(this_power) do
        if v.current and v.volt and v.pf then
            local power = v.current * v.volt * v.pf
            update_cache(fn, v.ts, power)
        end
    end
end
--- Message callback: hand device data messages to the data handler.
-- The handler runs under pcall so that one bad message is logged rather than
-- taking the daemon down.
mqtt.ON_MESSAGE = function(mid, topic, jpayload, qos, retain)
    if not mosq.topic_matches_sub(cfg.TOPIC_LISTEN_DATA, topic) then
        return
    end
    local ok, err = pcall(handle_message_data, mid, topic, jpayload, qos, retain)
    if ok then
        return
    end
    ugly.crit("Exception in message handler! %s", tostring(err))
end
-- Start the broker connection. A failure here is reported directly instead
-- of surfacing later as a generic loop failure.
local connected, conn_code, conn_err = mqtt:connect(args.host, 1883, 60)
if not connected then
    ugly.crit("Failed to start MQTT connection to %s: %s %s", args.host, tostring(conn_code), tostring(conn_err))
    os.exit(1)
end

-- ON_CONNECT normally sets this, but mqtt:loop() can return before the
-- connection has been acknowledged. Seed it here so the interval check
-- below never does arithmetic on nil.
cfg.last_ts_process = cfg.last_ts_process or Pt.time()

-- Main loop: service the MQTT connection, and once per PROCESS_INTERVAL
-- publish whatever the cache says is due.
while true do
    local rc, code, err = mqtt:loop()
    if not rc then
        -- let process monitoring handle this.
        ugly.warning("mqtt loop failed, exiting: %d %s", code, err)
        os.exit(1)
    end
    -- This only has second resolution, but that's ok really.
    local now_ts = Pt.time()
    if now_ts - cfg.last_ts_process >= cfg.PROCESS_INTERVAL then
        cfg.last_ts_process = now_ts
        local entity_readings = process_cache()
        if entity_readings then
            local sent, _, pub_err = mqtt:publish(cfg.TOPIC_PUBLISH, json.encode(entity_readings), 1, false)
            if sent then
                ugly.notice("Posted %d changed entity readings", #entity_readings)
            else
                -- process_cache() has already marked these as sent, so they
                -- will not be retried until they change or age out again.
                ugly.warning("Failed to publish %d entity readings: %s", #entity_readings, tostring(pub_err))
            end
        end
    end
end