Skip to content

Commit

Permalink
publish pwr messages in JSON/REST style
Browse files Browse the repository at this point in the history
  • Loading branch information
icarus75 committed Apr 20, 2010
1 parent 5f516e2 commit 6667727
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 12 deletions.
44 changes: 40 additions & 4 deletions openwrt/package/flukso/src/data.lua
@@ -1,6 +1,7 @@
--
-- data.lua: property and methods for manipulating incoming measurements
-- Copyright (c) 2009 jokamajo.org
-- 2010 flukso.net
--
-- This program is free software; you can redistribute it and/or
-- modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -45,10 +46,7 @@ end

function filter(M, span, offset)
for meter, T in pairs(M) do
local H = {} -- helper table, an indexed array containing all the measurement's timestamps
for timestamp in pairs(T) do H[#H+1] = timestamp end
table.sort(H) -- sort in ascending order, oldest timestamps will be treated first

local H = timestamps(T)
local i = 2
while not (H[i+1] == nil or H[i] > os.time()-offset) do
if math.floor(H[i-1]/span) == math.floor(H[i]/span) and math.floor(H[i]/span) == math.floor(H[i+1]/span) then
Expand All @@ -60,3 +58,41 @@ function filter(M, span, offset)
end
end
end

function truncate(M, cutoff)
for meter, T in pairs(M) do
local H = timestamps(T)
for i = H[1], H[#H]-60 do
T[i] = nil
end
end
end

function fill(M)
for meter, T in pairs(M) do
local H = timestamps(T)
for i = H[1]+1, H[#H]-1 do
if T[i] == nil then T[i] = T[i-1] end
end
end
end

function json_encode(M)
J = {}
for meter, T in pairs(M) do
J[meter] = '['
local H = timestamps(T)
for i = H[1], H[#H] do
J[meter] = J[meter] .. '[' .. T[i] .. ']'
end
J[meter] = J[meter] .. ']'
end
return J
end

local function timestamps(T)
local H = {} -- helper table, an indexed array containing all the measurement's timestamps
for timestamp in pairs(T) do H[#H+1] = timestamp end
table.sort(H) -- sort in ascending order, oldest timestamps will be treated first
return H
end
58 changes: 50 additions & 8 deletions openwrt/package/flukso/src/flukso.lua
Expand Up @@ -35,10 +35,12 @@ local param = {xmlrpcaddress = 'http://logger.flukso.net/xmlrpc',
pwraddress = '255.255.255.255',
pwrport = 26488,
pwrenable = false,
pwrinterval = 1,
pwrdir = '/tmp/sensor',
device = '/dev/ttyS0',
interval = 300}

function receive(child, device, pwraddress, pwrport, pwrenable)
function dispatch(e_child, p_child, device, pwraddress, pwrport, pwrenable)
return coroutine.create(function()
-- open the connection to the syslog deamon, specifying our identity
posix.openlog('flukso')
Expand All @@ -60,11 +62,15 @@ function receive(child, device, pwraddress, pwrport, pwrenable)
os.execute('gpioctl set 4 > /dev/null')

local meter, value = line:sub(5, 36), tonumber(line:sub(38))
coroutine.resume(child, meter, os.time(), value)
coroutine.resume(e_child, meter, os.time(), value)

elseif line:sub(1, 3) == 'pwr' and line:len() == 47 and line:find(':') == 37 then -- user data + additional data integrity checks
if pwrenable then udp:send(line) end
local meter, value = line:sub(5, 36), tonumber(line:sub(38))
if pwrenable then coroutine.resume(p_child, meter, os.time(), value) end

elseif line:sub(1, 3) == 'msg' then -- control data
posix.syslog(31, 'received message from '..device..': '..line:sub(5))

else
posix.syslog(27, 'input error on '..device..': '..line)
end
Expand Down Expand Up @@ -142,24 +148,51 @@ function gc(child)
end)
end

function debug()
function polish(child, cutoff)
return coroutine.create(function(measurements)
while true do
measurements:fill()
measurements:truncate(cutoff)
coroutine.resume(child, measurements)
measurements = coroutine.yield()
end
end)
end

function publish(child, dir)
return coroutine.create(function(measurements)
os.execute('mkdir -p ' .. dir .. ' > /dev/null')
while true do
local measurements_json = measurements:json_encode()
for meter, json in measurements_json do
io.output(dir .. '/' .. meter)
io.write(json)
io.close()
end
coroutine.resume(child, measurements)
measurements = coroutine.yield()
end
end)
end

function debug(child)
return coroutine.create(function(measurements)
while true do
dbg.vardump(measurements)
if child then coroutine.resume(child, measurements) end
measurements = coroutine.yield()
end
end)
end

-- receive: listen to the serial port for incoming pulses
-- dispatch: listen to the serial port for incoming pulses
-- buffer: buffer the pulses in a measurement object
-- filter: sweep recursively to filter all redundant entries
-- send: report the measurements to the server via xmlrpc
-- gc: perform a full garbage collection cycle
-- debug: dump measurements table to stdout

local chain = receive(
buffer(
local e_chain = buffer(
filter(
filter(
filter(
Expand All @@ -172,6 +205,15 @@ local chain = receive(
, 900, 7200)
, 60, 0)
, param.interval)
, param.device, param.pwraddress, param.pwrport, param.pwrenable)

local p_chain = buffer(
polish(
publish(
debug()
, param.pwrdir)
, 60)
, param.pwrinterval)

local chain = dispatch(e_chain, p_chain, param.device, param.pwraddress, param.pwrport, param.pwrenable)

coroutine.resume(chain)

0 comments on commit 6667727

Please sign in to comment.