/
lights_gpio.py
81 lines (54 loc) · 2.56 KB
/
lights_gpio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from __future__ import print_function
import datetime
from weakref import ref
import RPi.GPIO as GPIO
from ConfigParserDefault import ConfigParserDefault
from mainLoop import QueueHandlerProcess
from songConfig import SongConfig
gcp = ConfigParserDefault()
gcp.read('config.ini')
delayBetweenUpdates = float(gcp.get_def('lights', 'delayBetweenUpdates', 0.05))
pins = [0, 1, 4, 7, 8, 9, 10, 11, 14, 15, 17, 18, 21, 22, 23, 24, 25]
class LightController(object):
def __init__(self, analyzer, config):
self.analyzer = ref(analyzer)
self.songConfig = SongConfig(config)
GPIO.setmode(GPIO.BCM)
for pin in pins:
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, False)
self.lastLightUpdate = datetime.datetime.now()
self.previousLightStates = [False] * analyzer.frequencyBands
def _onChunk(self):
now = datetime.datetime.now()
if (now - self.lastLightUpdate).total_seconds() > delayBetweenUpdates:
self.lastLightUpdate = now
spectrum = self.analyzer().spectrum
bands = [spectrum[i] for i in self.songConfig.frequencyBandOrder]
lightStates = [level > self.songConfig.frequencyThresholds[channel] for channel, level in enumerate(bands)]
for channel, value in enumerate(lightStates):
if not value:
if self.previousLightStates[channel]:
lightStates[channel] = bands[channel] > self.songConfig.frequencyOffThresholds[channel]
for channel, value in enumerate(lightStates):
if self.previousLightStates[channel] != value:
GPIO.output(pins[channel], value)
self.previousLightStates = lightStates
def onMessage(self, message):
messageType = message[0]
if messageType == 'songChange':
self.songConfig.loadSongSettings(*message[1:])
elif messageType == 'chunk':
self._onChunk()
def runLightsProcess(messageQueue, nice=None):
LightsProcess(messageQueue, nice).loop()
class LightsProcess(QueueHandlerProcess):
def __init__(self, messageQueue, nice=None):
super(LightsProcess, self).__init__(nice)
import spectrum
self.analyzer = spectrum.SpectrumAnalyzer(self.messageQueue, self.config)
self.lightController = LightController(self.analyzer, self.config)
def onMessage(self, messageType, message):
super(LightsProcess, self).onMessage(messageType, message)
self.analyzer.onMessage(message)
self.lightController.onMessage(message)