Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Branch: master
executable file 246 lines (202 sloc) 5.955 kB
#!/usr/bin/env python
import RPi.GPIO as GPIO
import mcp3008
import mediorc
import sys
import time
import pidvarmap
# GPIO pin that switches the chiller: PID configures it as an output and
# drives it high/low from its scheduled events.
# NOTE(review): no GPIO.setmode() call is visible in this file, so whether 17
# is a BCM or BOARD pin number must be confirmed elsewhere.
CHILLPIN = 17
def nv(v):
    """Return *v* unchanged, substituting 0.0 when *v* is None."""
    return 0.0 if v is None else v
def mean(l, default=0):
    """Return the arithmetic mean of *l* as a float.

    *default* is returned when the mean cannot be computed: an empty
    sequence, a sequence holding non-numeric items (e.g. None), or an
    argument that cannot be summed or measured with len().

    Only the errors those cases raise are caught; the previous bare
    ``except`` also hid unrelated failures (for instance a missing
    ``reduce`` builtin), silently turning every result into *default*.
    """
    try:
        return sum(l) / float(len(l))
    except (TypeError, ZeroDivisionError):
        # TypeError: unsummable items or no len(); ZeroDivisionError: empty input
        return default
def beersafe(v):
    """Clamp *v* into the closed range 38.0 .. 110.0."""
    # apply the lower bound first, then the upper bound
    floor_applied = max(38.0, v)
    return min(110.0, floor_applied)
class PID(object):
    """Time-proportioned PID controller for the chiller output on CHILLPIN.

    Each call to step() records one probe reading. Once per fPeriod seconds
    the averaged reading is compared with the target, the P/I/D terms are
    recomputed, and on/off events for the output pin are queued. Queued
    events are applied by step() once their timestamp has passed.
    """

    def __init__(self, temp_probe, target):
        """Configure the output pin (initially off) and reset controller state.

        temp_probe -- object whose read() returns the current temperature
        target -- desired temperature; clamped through beersafe()
        """
        self.temp_probe = temp_probe
        self.target = beersafe(target)
        self.samples = []   # readings gathered since the last PID run
        self.events = []    # pending (timestamp, pin value) tuples
        self.pid_ts = None  # time of the last PID run; None until the first

        GPIO.setup(CHILLPIN, GPIO.OUT)
        GPIO.output(CHILLPIN, False)

        # seconds between PID runs, and minimum off time within each period
        self.fPeriod = 300.0
        self.fMinOff = 3 * 60.0
        assert self.fPeriod > self.fMinOff

        # pid state
        self.fTemp = None
        self.fError = None
        self.fPriorError = None
        self.fIError = 0.0
        self.fDError = None
        self.fP = None
        self.fI = None
        self.fD = 0.0
        self.fPID = None

        # guesstimated PID constants. ratio, not percentage based.
        self.fKP = 0.02
        self.fKI = 0.002
        self.fKD = 0.0

        # physical state
        self.bCompressor = False

    @property
    def state(self):
        """Snapshot of constants, errors, PID terms and compressor state.

        Numeric entries that are still None are reported as 0.0 (via nv()).
        """
        constants = {
            'kp': nv(self.fKP),
            'ki': nv(self.fKI),
            'kd': nv(self.fKD),
        }
        errors = {
            'err': nv(self.fError),
            'ierr': nv(self.fIError),
            'derr': nv(self.fDError),
        }
        pidstate = {
            'p': nv(self.fP),
            'i': nv(self.fI),
            'd': nv(self.fD),
            'pid': nv(self.fPID),
        }
        physical = {'compressor': self.bCompressor}
        return {
            'constants': constants,
            'errors': errors,
            'pidstate': pidstate,
            'physical': physical,
        }

    @staticmethod
    def _log(message):
        """Print *message* prefixed with the current time.ctime() and a space."""
        print('%s %s' % (time.ctime(), message))

    def step(self):
        """Sample the probe, re-run the PID if a period elapsed, apply due events."""
        now = time.time()
        self.samples.append(self.temp_probe.read())
        # running average of this period's samples; falls back to the target
        self.fTemp = mean(self.samples, default=self.target)

        if self.pid_ts is None or now - self.fPeriod > self.pid_ts:
            self._recalculate(now)

        self._apply_due_events(now)

    def _recalculate(self, now):
        """Recompute the PID output and queue this period's on/off events."""
        self.pid_ts = now
        self.samples = []

        # The error we want to drive to 0. Positive values mean it is too warm.
        error = self.fTemp - self.target
        self.fError = error
        print('err: %f' % error)

        # start from zero output, then build up the three terms
        self.fPID = 0.0
        self.fP = self.fKP * error

        # integral term, accumulated in degree-minutes
        self.fIError += (error * self.fPeriod) / 60.0
        self.fI = self.fKI * self.fIError

        # derivative term needs a previous error to difference against
        previous = self.fPriorError
        if previous is not None:
            self.fDError = ((error - previous) * 60.0) / self.fPeriod
        self.fPriorError = error
        if self.fDError is not None:
            self.fD = self.fKD * self.fDError

        self.fPID = self.fP + self.fI + self.fD
        self._log('PID output: %f' % self.fPID)

        if self.fPID < 0.02:
            # below 2%: switch off now and stay off until the next PID run
            self.events.append((now, False))
            self._log('no chilling, pid output way low. just turning off')
            return

        duty = min(self.fPID, 1.0)
        # never schedule less than fMinOff seconds of off time per period
        off_sec = max((1.0 - duty) * self.fPeriod, self.fMinOff)
        on_sec = self.fPeriod - off_sec
        self.events.append((now, True))
        self.events.append((now + on_sec, False))
        self._log('scheduled on/off events for pid output. on for %f, off for %f' % (on_sec, off_sec))

    def _apply_due_events(self, now):
        """Drive the pin for every queued event that is due; keep the rest."""
        still_pending = []
        for scheduled in self.events:
            when, pin_value = scheduled
            if when <= now:
                self._log('set chiller: %f' % pin_value)
                GPIO.output(CHILLPIN, pin_value)
                self.bCompressor = pin_value
            else:
                still_pending.append(scheduled)
        self.events = still_pending
class ChillmonBot(mediorc.IRC):
    """IRC bot that reports probe temperatures and drives the PID loop.

    temp_probes -- mapping of probe name to an object with a read() method
    pid -- controller whose step() is called on every do_work() pass
    zmqpub -- optional publisher; receives a state dict on every pass
    zmqctrl -- optional subscriber polled for 'set' control messages
    """

    def __init__(self, server, nick, chan, temp_probes, pid, zmqpub=None, zmqctrl=None):
        self.temp_probes = temp_probes
        self.pid = pid
        self.zmqpub = zmqpub
        self.zmqctrl = zmqctrl
        mediorc.IRC.__init__(self, server, nick, chan)

    def on_pubmsg(self, c, e):
        """Answer a channel message starting with '!temp' with all probe readings."""
        chan = e.target()
        words = e.arguments()[0].split(' ')
        if words[0] != '!temp':
            return
        readings = ', '.join(
            '%s: %0.1f F' % (name, probe.read())
            for name, probe in self.temp_probes.items()
        )
        self.connection.privmsg(chan, 'temperatures: %s' % readings)

    def do_work(self):
        """Publish state, process one pending control message, then step the PID."""
        if self.zmqpub:
            self._publish_state()
        if self.zmqctrl:
            self._poll_control()
        self.pid.step()

    def _publish_state(self):
        """Send probe readings, running average, target and PID state to zmqpub."""
        temps = dict((name, probe.read()) for name, probe in self.temp_probes.items())
        # these two keys take precedence over probes of the same name
        temps['run_avg'] = self.pid.fTemp
        temps['target'] = self.pid.target
        ret = {'temps': temps}
        ret.update(self.pid.state)
        self.zmqpub.send(ret)

    def _poll_control(self):
        """Receive at most one control message and apply a 'set' command.

        Malformed messages are reported on stdout and dropped so that a bad
        message can never prevent the PID from stepping.
        """
        # Imported here because the module only binds `zmqsub` globally when
        # it is run as a script with a publish URL.
        from zmqfan import zmqsub

        try:
            v = self.zmqctrl.recv()
        except zmqsub.NoMessagesException:
            return

        # Tunables named in the original inline note: fKP, fKI, fKD, fIError
        # (plus 'target', handled below). The authoritative whitelist is
        # pidvarmap.typemap: unknown keys raise KeyError and are rejected.
        try:
            cmd = v['cmd']
            if cmd != 'set':
                print('ERR: unknown command %s' % (cmd,))
                return
            k = v['k']
            val = pidvarmap.typemap[k](v['v'])
        except (KeyError, IndexError, TypeError, ValueError) as err:
            # TypeError: payload is not subscriptable / wrong value type;
            # ValueError: the typemap converter could not parse the value.
            print('ERR: %s' % (err,))
            return

        try:
            if k == 'target':
                val = beersafe(val)
            setattr(self.pid, k, val)
        except Exception as err:
            # best effort: a failed set is reported, never fatal
            print('set error %s' % (err,))
class ChillmonBotThread(mediorc.IRCThread):
    """IRCThread whose bot factory builds a ChillmonBot with the given settings."""

    def __init__(self, server, nick, chan, temperature_probes, pid, zmqpub=None, zmqctrl=None):
        def create_bot():
            # closes over the constructor arguments; builds a fresh bot per call
            return ChillmonBot(
                server, nick, chan, temperature_probes, pid,
                zmqpub=zmqpub, zmqctrl=zmqctrl
            )

        # assigned before the base initializer runs, as in the original
        self.bot_create = create_bot
        mediorc.IRCThread.__init__(self)
if __name__ == '__main__':
    # usage: SCRIPT SERVER NICK CHAN [ZMQ_PUB_URL [ZMQ_CTRL_URL]]
    # Check the required arguments before any hardware is touched.
    if len(sys.argv) < 4:
        sys.stderr.write(
            'usage: %s SERVER NICK CHAN [ZMQ_PUB_URL [ZMQ_CTRL_URL]]\n' % sys.argv[0]
        )
        sys.exit(2)

    a2d = mcp3008.MCP3008(1000.0)
    beer = mcp3008.TMP36(mcp3008.TMP36.F)
    a2d.setup_channel(1, beer)
    room = mcp3008.TMP36(mcp3008.TMP36.F)
    a2d.setup_channel(2, room)
    # NOTE(review): beertop is set up on channel 3 but is not added to
    # temp_probes below, so it is never reported — confirm this is intended.
    beertop = mcp3008.TMP36(mcp3008.TMP36.F)
    a2d.setup_channel(3, beertop)

    # Optional ZeroMQ endpoints. Explicit length checks (rather than a broad
    # try/except IndexError) so that an IndexError raised inside zmqsub
    # itself is not mistaken for a missing argument.
    zmqpub = None
    zmqctrl = None
    if len(sys.argv) > 4:
        from zmqfan import zmqsub
        zmqpub = zmqsub.BindPub(sys.argv[4])
        if len(sys.argv) > 5:
            zmqctrl = zmqsub.BindSub(sys.argv[5])

    temp_probes = {'beer': beer, 'room': room}
    pid = PID(beer, 72) # TODO configuration system for what temp to stick to
    chill = ChillmonBotThread(
        sys.argv[1], sys.argv[2], sys.argv[3], temp_probes, pid,
        zmqpub=zmqpub, zmqctrl=zmqctrl
    )
    chill.run()
Jump to Line
Something went wrong with that request. Please try again.