-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
262 lines (225 loc) · 8.23 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# Python 2.7.15
#!/usr/bin/env python
# You must have /dev/spidev* devices / bcm2708_spi driver for this to work.
# solid(x1,x2,list, )
# Flickers(x1,x2, color, number-of-pixels-to-randomly-assign, , diration-of-flash, diraiton-of-off, fade-on, fade-off)
# Strobe (x1, x2, color, diration-of-flash, diraiton-of-off, fade-on, fade-off)
# Rotate(LEFT, RIGHT, BOTH) (Sides to do a roate on) happens on the hardware thread just before output so all inputs are ok.
##------- Enable --------#
#from tkinter import *
DEBUG = True
##-----------------------#
#------- Disable --------#
#import fcntl, array, RPi.GPIO as GPIO
#DEBUG = False
#-----------------------#
#
# ^ ENABLE THE SCRIPT ABOVE ^
import tornado.websocket
#from tornado.ioloop import IOLoop
#from tornado.locks import Condition
import threading
import tornado.ioloop
import tornado.web
from tornado import gen
import asyncio
import threading
import colorsys
import time #import time, sleep #time libraries
import math
from enum import Enum
from RotateAllService import RotateAllService
from random import randint
from HardwareController import HardwareController
from DebugWindow import DebugWindow
from Effect import Effect
from Wave import Wave, Direction
from Color import Color
from Light import Light
from util import delay, reset
import sys
from tkinter import *
if sys.version_info[2] == 5:
#import realdeal.
pass
else:
#DEBUG = True
pass
### /Configuration ###
## Diagram by Mac Carter
## Here is your pi interface
# with the pins layed out on the board
# =======================================
# | o 5v |
# | o o |
# | --------- o gnd |
# | o o |
# | o o |
# | o o |
# | o o |
# | o o |
# | o o |
# | Di o |
# | o o |
# | Ci Ei |
# | o Li |
# | |
# | |
# | |
# | |
# | |
# | |
# | |
# | |
# | __________ |
# | | | |
# | |ETHERNET| |
# | | | |
# ===|--------|====|---------|===========
# |===USB===|
### /Configuration ###
NUM_LEDS = 144
FONT="Helvetica 8"
# In addition to the hardware SPI pins, we require two general GPIO pins for
# the enable and latch pins. It doesn't matter what pins you use
ENABLE_PIN = 8
LATCH_PIN = 7
class EventLoop(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.frame = 0
self.effects = []
# self.rotateAllService = RotateAllService(hardwareController)
# hardwareController.set_hue_rotation(180)
def run(self):
delay(2000)
# The enent loop
print("here")
# Runs at ~1.37k+ fps. DAMN. except in tkinter, cause tk sucks.
while(True):
for o in self.effects:
o.step()
# self.test.step()
hardwareController.write()
reset(leds)
# delay(200)
# ResetCalledLeds()
# Last touched by UDID of init time of obj callee
# Was Touched LastTickBy YES|NO set by reset or when has been called is boolean.
#
class TornadoThread (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
delay(2000)
leds[0].setColorRBGW(255,255,255,255)
hardwareController.write()
print("Start a tornado")
asyncio.set_event_loop(asyncio.new_event_loop())
self.application = tornado.web.Application([(r"/", WebSocketHandler),])
self.application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
class WebSocketHandler(tornado.websocket.WebSocketHandler):
# @tornado.web.asynchronous
# @gen.coroutine
def check_origin(self, origin):
return True
@tornado.web.asynchronous
@gen.coroutine
def open(self):
i=0
@tornado.web.asynchronous
@gen.coroutine
def on_close(self):
i=0
print("connection closed")
@tornado.web.asynchronous
@gen.coroutine
def on_message(self,message):
print(message)
# print ("message received: {}").format(message)
messageHeader = message[0]
# print("message header: {}").format(messageHeader)
if messageHeader == "a":#done
white = Color()
white.setColorRBGW(255,0,0,0)
eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
if messageHeader == "s":#done
white = Color()
white.setColorRBGW(0,255,255,0)
eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
if messageHeader == "d":#done
white = Color()
white.setColorRBGW(255,0,255,0)
eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
if messageHeader == "f":#done
white = Color()
white.setColorRBGW(255,255,0,0)
eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
if messageHeader == "j":#done
white = Color()
white.setColorRBGW(0,0,255,0)
eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
if messageHeader == "k":#done
white = Color()
white.setColorRBGW(255,0,0,0)
eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
if messageHeader == "l":#done
# white = Color()
# white.setColorRBGW(0,255,0,0)
# eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
hardwareController.set_hue_rotation(0)
if messageHeader == ";":#done
# white = Color()
# white.setColorRBGW(0,0,0,0)
# eventLoop.effects.append(Wave(leds, 71, Direction.BOTH_NO_CENTER, white))
hardwareController.set_hue_rotation(180)
# if messageHeader == "000":#done
# static(message)
# if messageHeader == "100":#Enable threading
# pass
# if messageHeader == "200":#Still searching for a library
# pass
# if messageHeader == "300":#Enable threading
# pass
# if messageHeader == "400":#Enable threading
# pass
# if messageHeader == "500":#Search for a good distribution of light colors
# pass
# if messageHeader == "600":#search for a good distibution of random in and out clouds.
# pass
if message == "NumberOfLights()":#done
self.write_message(str(NUM_LEDS))
else:
self.write_message(str("command not understood"))
# FlashAllRedPattern();#incorrect command.
if __name__ == "__main__":
global leds
leds = [ Light() for i in range(NUM_LEDS)]
global hardwareController
global window
global tkWindow
global effectList
global rotation
if DEBUG:
tkWindow = Tk()
window = Canvas(tkWindow, width=1200, height=480)
else:
window=0
hardwareController = HardwareController(leds,window)
tornadoThread = TornadoThread()
tornadoThread.start()
eventLoop = EventLoop()
eventLoop.start()
#open the SPI device for writing
if DEBUG:
n = 0
while(n<NUM_LEDS):
pos = (n+1)
x1 = 7
y1 = 7
window.create_rectangle((x1*pos), 10, (y1*pos)+10, 300, fill="yellow", tags ="Light_"+str(pos))
window.create_text((x1*pos)+5, y1+10 , text=str(pos), fill="purple", font=FONT, tags = "Text_"+str(pos))
n = n+1
window.pack(side="top", fill="both", expand=True)
tkWindow.mainloop()