-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
71 lines (55 loc) · 1.86 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from Modules.Runtime.environment import SAT
if SAT:
from Modules.Magnetometer import magnetometer_factory
from Modules.Magnetorquer.hBridge import hBridge
from Modules.DataHub.remote import DataHub
import time
import math
import threading
# --- Hardware bring-up (only when running with SAT set) -------------------
if SAT:
    sensor = magnetometer_factory.initialize()
    # NOTE(review): the four integers are handed straight to hBridge;
    # presumably GPIO pin numbers -- confirm against Modules.Magnetorquer.
    actuator = hBridge(21, 5, 16, 20)

# --- Ground-station discovery ----------------------------------------------
# First talk to the fixed server below to look up "lab_ip" / "lab_port",
# then rebind `hub` to a client that points at that address instead.
hub = DataHub("http://rogozin.space/varserver/")
print("Trying to achieve ground station address...")
req = hub.get(["lab_ip", "lab_port"])
lab_addr = "".join(("http://", req[0]["content"], ":", str(req[1]["content"])))
hub = DataHub(lab_addr)
hub.set_endpoints("/get", "/set")
def readMagnet():
    """Return the current magnetometer sample as ``[[x, y, z], 0]``.

    When the module-level ``SAT`` flag is truthy, the module-level ``sensor``
    is polled and the ``"x"``, ``"y"`` and ``"z"`` entries of its reading are
    packed into a list.  When ``SAT`` is falsy no hardware is touched and an
    all-zero reading is returned instead.

    Returns:
        A two-element list: the ``[x, y, z]`` axes followed by a constant
        ``0``.  NOTE(review): the meaning of the trailing ``0`` is not visible
        in this file -- confirm with whatever consumes ``"sat_mag"``.
    """
    if not SAT:
        return [[0, 0, 0], 0]

    # `sensor` is only read here, never rebound, so no `global` declaration
    # is required (the previous one was misspelled and had no effect).
    reading = sensor.readMagnet()
    return [[reading["x"], reading["y"], reading["z"]], 0]
# State shared between the main loop and the network thread.
magnetometer_value = None  # latest reading; assigned by the main loop
magnetorquer_value = None  # latest command; assigned by network_thread()
_working = True  # network_thread() keeps looping while this is truthy
def network_thread():
    """Exchange data with the server until ``_working`` becomes falsy.

    Each pass does two independent, best-effort steps:

    1. Push the module-level ``magnetometer_value`` to the server under the
       ``"sat_mag"`` key.
    2. Fetch ``"sat_magnetorquer"`` and store its ``"content"`` in the
       module-level ``magnetorquer_value``.

    A failure in either step is reported on stdout and the loop carries on,
    so a failed push does not prevent the following fetch.  Only
    ``Exception`` subclasses are absorbed; ``SystemExit`` and
    ``KeyboardInterrupt`` propagate so the thread can still be torn down.
    """
    # Only this name is rebound here; `_working`, `magnetometer_value` and
    # `hub` are merely read and need no `global` declaration.
    global magnetorquer_value

    while _working:
        try:
            print("Updating the server with magnetometer value")
            hub.set({"sat_mag": magnetometer_value})
        except Exception as exc:
            print("An error occured while updating the server", exc)

        try:
            print("Querying the server for magnetorquer commands")
            magnetorquer_value = hub.get(["sat_magnetorquer"])[0]["content"]
        except Exception as exc:
            print("An error occured while querying the server", exc)
# The server exchange runs on its own thread; the loop below communicates
# with it only through the module-level magnetometer_value /
# magnetorquer_value / _working names.
t = threading.Thread(target=network_thread)
t.start()


def _set_torquer(direction):
    """Drive the H-bridge, or do nothing when no hardware is attached."""
    # `actuator` only exists when SAT is truthy; guarding here keeps the
    # loop usable in the hardware-less mode that readMagnet() also supports.
    if SAT:
        actuator.SetDirection(direction)


# Main control loop: apply the latest command as a 0.5 s pulse, switch the
# torquer off, wait 0.05 s, then sample the magnetometer.
# NOTE(review): nesting was reconstructed -- confirm that the 0.05 s wait and
# the magnetometer read are meant to run on every pass, not only after a pulse.
try:
    while True:
        # Snapshot once: the network thread may rebind the global at any time.
        command = magnetorquer_value
        if command:
            _set_torquer(command)
            time.sleep(0.5)
            _set_torquer(0)
        time.sleep(0.05)
        magnetometer_value = readMagnet()
        if not command:
            # No pulse this pass, so add an extra wait instead.
            time.sleep(0.15)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop; cleanup happens in `finally`.
    pass
finally:
    # Runs on every exit path (not only Ctrl-C): stop the network thread
    # first, then make sure the torquer is not left energised.
    _working = False
    _set_torquer(0)