forked from The-AVR/AVR-2022
/
sandbox.py
193 lines (178 loc) · 8.64 KB
/
sandbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# Here we import our own MQTT library which takes care of a lot of boilerplate
# code related to connecting to the MQTT server and sending/receiving messages.
# It also helps us make sure that our code is sending the proper payload on a topic
# and is receiving the proper payload as well.
#from typing_extensions import Self
from ast import Not
from bell.avr.mqtt.client import MQTTModule
from bell.avr.mqtt.payloads import (
AvrApriltagsVisiblePayload,
AvrAutonomousEnablePayload,
AvrAutonomousBuildingDropPayload,
)
from loguru import logger
import time
class Sandbox(MQTTModule):
# NOTE needs logic to handle multiple drops per auto
def __init__(self):
super().__init__()
self.topic_map = {"avr/autonomous/enable": self.on_autonomous_enable, "avr/apriltags/visible" : self.update_visible_tag, "avr/autonomous/building/drop" : self.reset_switch}
# self.topic_map = {"avr/apriltags/visible": self.on_autonomous_enable}
# self.visible_map = {"avr/apriltags/visible" : self.update_visible_tag} # On seeing an april tag, run update_visible_tag
self.visible_tag = None
self.has_dropped_0 = False
self.has_dropped_1 = False
self.has_dropped_2 = False
self.has_dropped_3 = False
self.has_dropped_4 = False
self.has_dropped_5 = False
self.has_dropped_all = False
self.HORIZ_DROP_TOLERANCE = 10000000 # Tolerance for dropping water autonomously in cm NOTE needs to be tuned
# Run autonomous when enabled
def on_autonomous_enable(self, payload: AvrAutonomousEnablePayload):
did_message_recieve = payload["enabled"]
logger.debug(f"visible tag: {self.visible_tag}")
logger.debug(f"recieved auton enable: {did_message_recieve}")
# Check if there is a visible april tag, if the vehicle is within specified horizontal tolerance, and if the vehicle has not already dropped the water
# while self.has_dropped_all == False:
loop_running = True
logger.debug(f"loop running: {loop_running}")
logger.debug(f"has dropped 4: {self.has_dropped_4}")
logger.debug(f"has dropped 5: {self.has_dropped_5}") #NOTE seems like loop is not seeing global variables?
logger.debug(f"visible tag in loop: {self.visible_tag}")
# start = time.time ()
# finish_1 = start + 2
# while time.time () < finish_1:
# pass
if self.visible_tag == 0 and self.has_dropped_0 == False:
start = time.time ()
finish_1 = start + 1
finish_2 = start + 2
self.open_servo(0) # Open servo on channel 0
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_1:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_2:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
self.close_servo(0)
self.has_dropped_0 = True
logger.debug(f"self.has_dropped: {self.has_dropped_0}")
if self.visible_tag == 1 and self.has_dropped_1 == False:
start = time.time ()
finish_1 = start + 1
finish_2 = start + 2
self.close_servo(1) # Open servo on channel 0
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_1:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_2:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
self.open_servo(1)
self.has_dropped_1 = True
logger.debug(f"self.has_dropped: {self.has_dropped_1}")
if self.visible_tag == 2 and self.has_dropped_2 == False:
start = time.time ()
finish_1 = start + 1
finish_2 = start + 2
self.open_servo(0) # Open servo on channel 0
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_1:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_2:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
self.close_servo(0)
self.has_dropped_2 = True
logger.debug(f"self.has_dropped: {self.has_dropped_2}")
if self.visible_tag == 3 and self.has_dropped_3 == False:
start = time.time ()
finish_1 = start + 1
finish_2 = start + 2
self.close_servo(1) # Open servo on channel 0
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_1:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_2:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
self.open_servo(1)
self.has_dropped_3 = True
logger.debug(f"self.has_dropped: {self.has_dropped_3}")
if self.visible_tag == 4 and self.has_dropped_4 == False:
start = time.time ()
finish_1 = start + 1
finish_2 = start + 2
self.open_servo(0) # Open servo on channel 0
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_1:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_2:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
self.close_servo(0)
self.has_dropped_4 = True
logger.debug(f"self.has_dropped: {self.has_dropped_4}")
if self.visible_tag == 5 and self.has_dropped_5 == False:
start = time.time ()
finish_1 = start + 1
finish_2 = start + 2
self.close_servo(1) # Open servo on channel 0
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_1:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
while time.time () < finish_2:
pass
self.blink_leds(0.5) # Blink LEDs 1 times at 0.5 second interval
self.open_servo(1)
self.has_dropped_5 = True
logger.debug(f"self.has_dropped: {self.has_dropped_5}")
if self.has_dropped_0 == True and self.has_dropped_1 == True and self.has_dropped_2 == True and self.has_dropped_3 == True and self.has_dropped_4 == True and self.has_dropped_5 == True:
logger.debug(f"ending loop")
self.has_dropped_all = True
def reset_switch(self, payload: AvrAutonomousBuildingDropPayload):#resets the drop so it can drop more than once per tag
reset = payload["enabled"]
if reset == True:
self.has_dropped_0 = False
self.has_dropped_1 = False
self.has_dropped_2 = False
self.has_dropped_3 = False
self.has_dropped_4 = False
self.has_dropped_5 = False
# Update class variable visible_tag to the most currently seen tag and log the horizontal distance between the vehicle and april tag
def update_visible_tag(self, payload: AvrApriltagsVisiblePayload):
tag_list = payload["tags"] #this is to get the list out of the payload
horiz_dist = tag_list[0]["horizontal_dist"]
tag_id = tag_list[0]["id"]
logger.debug(f"visible tag out in update_visible_tag: {tag_id}")
# logger.debug(f"Horizontal distance: {horiz_dist} cm") # NOTE need to check which logger method to use
if horiz_dist < self.HORIZ_DROP_TOLERANCE:
self.visible_tag = tag_id
# Open servo on desired channel
def open_servo(self, channel):
self.send_message(
"avr/pcm/set_servo_open_close",
{"servo": channel, "action": "open"}
)
def close_servo(self, channel):
self.send_message(
"avr/pcm/set_servo_open_close",
{"servo": channel, "action": "close"}
)
# Blink led for desired iterations with desired wrbg value for specified time interval
def blink_leds(self, time):
wrgb = (0,0,51,255)
self.send_message(
"avr/pcm/set_temp_color",
{"wrgb": wrgb, "time": time}
)
if __name__ == "__main__":
box = Sandbox()
box.run()