/
swarm.py
159 lines (123 loc) · 4.17 KB
/
swarm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""Library for controlling multiple DJI Ryze Tello drones.
"""
from threading import Thread, Barrier
from queue import Queue
from typing import List, Callable
from .tello import Tello, TelloException
from .enforce_types import enforce_types
@enforce_types
class TelloSwarm:
    """Swarm library for controlling multiple Tellos simultaneously.

    One daemon worker thread is started per drone. `parallel` hands a
    function to every worker through a per-drone queue and uses
    `funcBarrier` to wait until all workers have finished running it.
    `barrier` is the user-facing rendezvous point exposed via `sync`.
    """

    tellos: List[Tello]
    barrier: Barrier
    funcBarrier: Barrier
    funcQueues: List[Queue]
    threads: List[Thread]
    _errors: List[Exception]

    @staticmethod
    def fromFile(path: str):
        """Create TelloSwarm from file. The file should contain one IP address per line.
        Blank lines are ignored.

        Arguments:
            path: path to the file

        Raises:
            TelloException: if the file contains no IP addresses
        """
        with open(path, 'r') as fd:
            # Skip blank lines (e.g. a trailing newline at the end of the
            # file) so that no Tello is created with an empty address.
            ips = [ip for ip in map(str.strip, fd) if ip]

        return TelloSwarm.fromIps(ips)

    @staticmethod
    def fromIps(ips: list):
        """Create TelloSwarm from a list of IP addresses.

        Arguments:
            ips: list of IP Addresses

        Raises:
            TelloException: if `ips` is empty
        """
        if not ips:
            raise TelloException("No ips provided")

        tellos = [Tello(ip.strip()) for ip in ips]
        return TelloSwarm(tellos)

    def __init__(self, tellos: List[Tello]):
        """Initialize a TelloSwarm instance and start one worker thread
        per drone.

        Arguments:
            tellos: list of [Tello][tello] instances
        """
        self.tellos = tellos
        # Rendezvous between the worker threads only (see `sync`).
        self.barrier = Barrier(len(tellos))
        # Rendezvous between all workers plus the thread calling `parallel`.
        self.funcBarrier = Barrier(len(tellos) + 1)
        self.funcQueues = [Queue() for _ in tellos]
        # Exceptions raised by `func` inside the workers during the
        # current `parallel` call; re-raised by `parallel`.
        self._errors = []

        def worker(i):
            queue = self.funcQueues[i]
            tello = self.tellos[i]

            while True:
                func = queue.get()
                self.funcBarrier.wait()
                try:
                    func(i, tello)
                except Exception as exc:
                    # Keep the worker alive and hand the error to
                    # `parallel`. Without this the thread would die before
                    # the final barrier and `parallel` would block forever.
                    self._errors.append(exc)
                    # This worker will never reach a pending `sync`, so
                    # release peers that are (or will be) waiting there;
                    # they receive BrokenBarrierError instead of hanging.
                    self.barrier.abort()
                finally:
                    self.funcBarrier.wait()

        self.threads = []
        for i, _ in enumerate(tellos):
            thread = Thread(target=worker, daemon=True, args=(i,))
            thread.start()
            self.threads.append(thread)

    def sequential(self, func: Callable[[int, Tello], None]):
        """Call `func` for each tello sequentially. The function retrieves
        two arguments: The index `i` of the current drone and `tello` the
        current [Tello][tello] instance.

        ```python
        swarm.sequential(lambda i, tello: tello.land())
        ```
        """
        for i, tello in enumerate(self.tellos):
            func(i, tello)

    def parallel(self, func: Callable[[int, Tello], None]):
        """Call `func` for each tello in parallel. The function retrieves
        two arguments: The index `i` of the current drone and `tello` the
        current [Tello][tello] instance.

        You can use `swarm.sync()` for syncing between threads.

        If `func` raises for any drone, the first recorded exception is
        re-raised here once every worker has finished. Other workers that
        were waiting in `swarm.sync()` at that point are released with a
        `BrokenBarrierError`.

        ```python
        swarm.parallel(lambda i, tello: tello.move_up(50 + i * 10))
        ```
        """
        for queue in self.funcQueues:
            queue.put(func)

        # First wait: release all workers at once.
        # Second wait: block until every worker has finished `func`.
        self.funcBarrier.wait()
        self.funcBarrier.wait()

        if self._errors:
            errors = self._errors
            self._errors = []
            # A failing worker aborted the sync barrier; all workers are
            # idle again by now, so restore it for subsequent calls.
            self.barrier.reset()
            raise errors[0]

    def sync(self, timeout: float = None):
        """Sync parallel tello threads. The code continues when all threads
        have called `swarm.sync`.

        ```python
        def doStuff(i, tello):
            tello.move_up(50 + i * 10)
            swarm.sync()

            if i == 2:
                tello.flip_back()
            # make all other drones wait for one to complete its flip
            swarm.sync()

        swarm.parallel(doStuff)
        ```
        """
        return self.barrier.wait(timeout)

    def __getattr__(self, attr):
        """Call a standard tello function in parallel on all tellos.

        ```python
        swarm.command()
        swarm.takeoff()
        swarm.move_up(50)
        ```
        """
        # Dunder names are protocol probes (copy, pickle, ...), not drone
        # commands; report them as missing instead of broadcasting them.
        if attr.startswith('__') and attr.endswith('__'):
            raise AttributeError(attr)

        def callAll(*args, **kwargs):
            self.parallel(lambda i, tello: getattr(tello, attr)(*args, **kwargs))

        return callAll

    def __iter__(self):
        """Iterate over all drones in the swarm.

        ```python
        for tello in swarm:
            print(tello.get_battery())
        ```
        """
        return iter(self.tellos)

    def __len__(self):
        """Return the amount of tellos in the swarm

        ```python
        print("Tello count: {}".format(len(swarm)))
        ```
        """
        return len(self.tellos)