-
Notifications
You must be signed in to change notification settings - Fork 1
/
collector.py
130 lines (92 loc) · 2.86 KB
/
collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
class for collectors
"""
import numpy as np
import queue
class Collector:
"""
collector class for audio processing
"""
def __init__(self, N=400, hop=160, frame_size=50, update_size=10, frames_post=10, energy_frame_size=4, is_audio_record=False):
"""
constructor
"""
# arguments
self.N = N
self.hop = hop
self.frame_size = frame_size
self.update_size = update_size
self.frames_post = frames_post
self.energy_frame_size = energy_frame_size
self.is_audio_record = is_audio_record
# determine data size
self.data_size = int((frame_size * hop + N ) / hop) + frames_post
# data containers
self.q = queue.Queue(maxsize=self.data_size)
self.e_q = queue.Queue(maxsize=self.energy_frame_size)
self.x = np.empty(shape=(0), dtype=np.float32)
# whole audio data for recording
self.x_all = np.empty(shape=(0), dtype=np.float32)
self.e_all = np.empty(shape=(0), dtype=np.float32)
self.on_all = np.empty(shape=(0), dtype=np.float32)
# vars
self.is_collecting = False
self.collection_counter = 0
def start_collecting(self):
"""
start collecting data e.g from onset
"""
self.is_collecting = True
def update_collect(self, x, e=None, on=None):
"""
update the collection with an entry
"""
# put e
if not self.e_q.full(): self.e_q.put(e)
# usual update of new item
if not self.is_collecting:
# remove last
if self.q.qsize() == self.update_size: dummy = self.q.get_nowait()
# error message
if self.q.qsize() > self.update_size: print("wrong handling of buffer")
# put conditions
if not self.q.full(): self.q.put(x)
# # save all for audio record
# if self.is_audio_record:
# self.x_all = np.append(self.x_all, x)
# self.e_all = np.append(self.e_all, e)
# if on: self.on_all = np.append(self.on_all, len(self.x_all) / self.hop)
def reset_collection_all(self):
"""
reset the collection of all data during audio record
"""
self.x_all = np.empty(shape=(0), dtype=np.float32)
self.e_all = np.empty(shape=(0), dtype=np.float32)
self.on_all = np.empty(shape=(0), dtype=np.float32)
def read_collection(self):
"""
read out whole collection
"""
# reset flag
self.is_collecting = False
# reset x
self.x = np.empty(shape=(0), dtype=np.float32)
# read out elements
while not self.q.empty(): self.x = np.append(self.x, self.q.get_nowait())
# update collection counter
self.collection_counter += 1
return self.x
def read_energy_collection(self):
"""
read out whole collection
"""
# reset x
e = np.empty(shape=(0), dtype=np.float32)
# read out elements
while not self.e_q.empty(): e = np.append(e, self.e_q.get_nowait())
return e
def is_full(self):
"""
get full info
"""
return self.q.full()