-
Notifications
You must be signed in to change notification settings - Fork 0
/
NumpyQueue.py
133 lines (95 loc) · 3.82 KB
/
NumpyQueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import numpy as np
import threading
class NumpyQueue(): #TODO: make threadsafe
def __init__(self, size, roll_when_full=False, dtype=np.float32):
self._arr = np.empty(size, dtype=dtype)
self._cur_idx = 0
self.roll_when_full = roll_when_full
def append(self, values):
if self._cur_idx + len(values) > len(self._arr):
if self.roll_when_full: #if roll (overwrite start of queue) to make room
self._arr = np.roll(self._arr, len(self._arr) - self._cur_idx - len(values))
self._arr[max(-len(self._arr), -len(values)): ] = values[:min(len(values),len(self._arr))]
elif self._cur_idx < len(self._arr): #if at least 1 position free
self._arr[self._cur_idx : ] = values[0 : len(self._arr) - self._cur_idx] #Add as many items as possible
self._cur_idx = len(self._arr)
else:
self._arr[self._cur_idx: self._cur_idx + len(values)] = values
self._cur_idx += len(values)
def peek_idx(self, idx):
return self._arr[idx].copy() #TODO: copy or not? is this safe
def peek(self, count):
return self._arr[:count].copy()
def __len__(self):
return self._cur_idx
def __str__(self):
return str(self._arr)
def pop(self, count):
assert count >= 0
if self._cur_idx < count:
raise Exception(f'{count} exceeds amount of items in queue ({self._cur_idx + 1})')
vals = self._arr[0: count].copy()
self._cur_idx -=count
self._arr = np.roll(self._arr, -count) #[1, 2, 3, 4|] --> [3, 4,| 1, 2]
return vals
class ThreadNumpyQueue(NumpyQueue):
def __init__(self, *args, **kwargs):
super(ThreadNumpyQueue, self).__init__(*args, **kwargs)
self._lock = threading.Lock()
def append(self, values):
with self._lock:
return super(ThreadNumpyQueue, self).append(values)
def peek_idx(self, idx):
with self._lock:
return super(ThreadNumpyQueue, self).peek_idx(idx)
def peek(self, count):
with self._lock:
return super(ThreadNumpyQueue, self).peek(count)
def __len__(self):
return self._cur_idx
def __str__(self):
with self._lock:
return super(ThreadNumpyQueue, self).__str__()
def pop(self, count):
with self._lock:
return super(ThreadNumpyQueue, self).pop(count)
if __name__ == "__main__":
testqueue = NumpyQueue(5000, False)
for i in range(10000):
testqueue.append([i])
testqueue.pop(1)
assert len(testqueue) == 0
for i in range(10000):
testqueue.append([i])
assert np.all(testqueue.pop(5000) == np.array(range(0, 5000)))
testqueue = NumpyQueue(5000, True)
for i in range(10000):
testqueue.append([i])
assert np.all(testqueue.pop(5000) == np.array(range(5000, 10000)))
assert len(testqueue) == 0
randarr = np.random.rand(100)
testqueue = NumpyQueue(5000, True, dtype=np.float64)
for i in range(50):
testqueue.append(randarr)
for i in range(50):
returned_arr = testqueue.pop(len(randarr))
assert np.all(randarr == returned_arr)
testqueue.append(range(100))
for i in range(10):
testqueue.append(range(5))
testqueue.pop(10)
assert np.all(testqueue.pop(50) == np.array( [*range(5)] * 10))
for i in range(1, 10):
testqueue.append([i] * i)
testqueue.pop(max(1, i-1))
print(f"len is now: {len(testqueue)}")
for i in testqueue.pop(len(testqueue)):
print(i)
for i in range(10):
testqueue.append([i]*100)
testqueue.pop(900)
assert np.all(testqueue.pop(100) == np.array([9] * 100))
for i in range(10000):
testqueue.append([i])
assert(testqueue.pop(len(testqueue)) )
print("All assertions valid!")