This repository has been archived by the owner on Sep 22, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
RingArray2.py
95 lines (80 loc) · 4.21 KB
/
RingArray2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import numpy as np
import PQTools as pq
import gc
import logging
pqLogger = logging.getLogger('pqLogger')


class ringarray2(object):
    """Growable, front-aligned sample buffer backed by a NumPy array.

    Despite the name, the storage is linear rather than circular: the valid
    samples always occupy ``ringBuffer[:size]``. Removing data from the front
    shifts the remaining samples down to index 0, and the backing array is
    reallocated (growing by a factor of about 1.7) whenever an attach would
    not fit.

    Attributes are kept public because the original code exposes them:
      ringBuffer -- backing ``np.zeros`` array of length ``max_size``
      max_size   -- current capacity of ``ringBuffer`` (always an int)
      size       -- number of valid samples at the front of ``ringBuffer``
    """

    def __init__(self, max_size = 2000000):
        """Allocate an empty buffer with room for ``max_size`` samples."""
        # np.zeros needs an integer shape; coerce so a float capacity
        # (e.g. 2e6) is accepted as well.
        self.max_size = int(max_size)
        self.ringBuffer = np.zeros(self.max_size)
        self.size = 0

    # Are those two even necessary? Lets find out
    def get_data_view(self):
        """Return a view (not a copy) of the valid samples.

        The view aliases the backing array, so it is overwritten by any later
        attach/cut operation; copy it if it must outlive the next call.
        """
        return self.ringBuffer[:self.size]

    def get_index(self, index):
        """Return ``ringBuffer[index]``.

        No check against ``size`` is made, so an index at or beyond ``size``
        yields whatever the backing array currently holds there.
        """
        return self.ringBuffer[index]

    def attach_to_back(self, data_to_attach):
        """Append ``data_to_attach`` after the valid samples, growing if needed."""
        data_to_attach = np.asarray(data_to_attach)
        count = data_to_attach.size
        self.check_buffer_overflow(count)
        self.ringBuffer[self.size:self.size + count] = data_to_attach
        self.size += count

    def _drop_front(self, count):
        """Discard the first ``count`` valid samples, shifting the rest to index 0.

        Raises:
            ValueError: if ``count`` is negative or larger than ``size``;
                the buffer is left untouched in that case.
        """
        count = int(count)
        if count < 0 or count > self.size:
            raise ValueError(
                'cannot cut %d samples from a buffer holding %d'
                % (count, self.size))
        remaining = self.size - count
        self.ringBuffer[:remaining] = self.ringBuffer[count:self.size]
        self.size = remaining

    def cut_off_front2(self,index):
        """Remove and return the first ``index`` samples.

        The returned array is a copy: the in-place shift performed afterwards
        overwrites the front of the backing array, so a view would be
        corrupted.

        Raises:
            ValueError: if ``index`` is negative or larger than ``size``.
        """
        cut_off_data = self.ringBuffer[:index].copy()
        self._drop_front(index)
        return cut_off_data

    def cut_off_before_first_zero_crossing(self):
        """Drop every sample before the first detected zero crossing.

        Only the valid region ``ringBuffer[:size]`` is searched, so unused
        capacity behind the data cannot produce a crossing.

        Returns:
            The index of the first zero crossing, i.e. the number of samples
            that were removed.

        Raises:
            IndexError: presumably, when ``pq.detect_zero_crossings`` finds no
                crossing and returns an empty sequence -- TODO confirm against
                PQTools.
        """
        first_zero_index = pq.detect_zero_crossings(self.get_data_view())[0]
        self._drop_front(first_zero_index)
        return first_zero_index

    def cut_off_10periods(self):
        """Remove and return the samples up to the 21st detected zero crossing.

        Zero crossings are searched in the valid region only and the first 21
        of them are kept (21 crossings delimit 10 periods). If fewer are
        found, the cut ends at the last one that was detected.

        Returns:
            ``(data_10periods, zero_indices)``. ``data_10periods`` is a copy
            of the removed samples (a view would be overwritten by the
            in-place shift); ``zero_indices`` are the crossing indices used.
        """
        zero_indices = pq.detect_zero_crossings(self.get_data_view())[:21]
        end = zero_indices[-1]
        data_10periods = self.ringBuffer[:end].copy()
        self._drop_front(end)
        return data_10periods, zero_indices

    def cut_off_10periods2(self, expected_spacing = 10000, search_halfwidth = 500):
        """Remove and return 10 periods, locating each crossing in a small window.

        Starting from index 0, each of the next 20 zero crossings is searched
        only inside the window
        ``previous + expected_spacing -/+ search_halfwidth``. The defaults
        reproduce the original hard-coded window of 9500..10500 samples after
        the previous crossing. When several crossings fall into one window,
        the one closest to the window centre is taken and a warning is logged.

        Each window result is also appended to the file ``zero_crossings`` in
        the current working directory (kept from the original code).

        Args:
            expected_spacing: expected number of samples between two
                consecutive zero crossings.
            search_halfwidth: half the width of the search window, in samples.

        Returns:
            ``(data_10periods, zero_indices)`` where ``data_10periods`` is a
            copy of the removed samples and ``zero_indices`` is an integer
            array of 21 indices (the first one is always 0).

        Raises:
            ValueError: if a search window contains no zero crossing, e.g.
                because the buffer does not yet hold enough samples.
        """
        window_start = expected_spacing - search_halfwidth
        window_end = expected_spacing + search_halfwidth
        valid = self.get_data_view()
        # Integer dtype: these values are used as slice bounds below.
        zero_indices = np.zeros(21, dtype=int)
        for i in range(1, 21):
            previous = int(zero_indices[i - 1])
            dataslice = valid[previous + window_start:previous + window_end]
            crossings = np.asarray(pq.detect_zero_crossings(dataslice))
            if crossings.size == 0:
                raise ValueError(
                    'no zero crossing found in window %d..%d'
                    % (previous + window_start, previous + window_end))
            if crossings.size > 1:
                pqLogger.warning('Multiple zero crossings in single dataslice, taking the more plausible one')
                pqLogger.warning(str(crossings))
                crossings = crossings[np.abs(crossings - search_halfwidth).argmin()]
            with open('zero_crossings','a') as f:
                f.write(str(crossings)+'\n')
            # crossings is either a scalar or a one-element array here.
            offset = int(np.ravel(crossings)[0])
            zero_indices[i] = previous + offset + window_start
        end = int(zero_indices[-1])
        data_10periods = self.ringBuffer[:end].copy()
        self._drop_front(end)
        return data_10periods, zero_indices

    def attach_to_front(self, data_to_attach):
        """Insert ``data_to_attach`` before the valid samples, growing if needed."""
        data_to_attach = np.asarray(data_to_attach)
        count = data_to_attach.size
        self.check_buffer_overflow(count)
        # Move current ringBuffer content out of the way
        self.ringBuffer[count:count + self.size] = self.ringBuffer[:self.size]
        self.size += count
        # Add new content to front
        self.ringBuffer[:count] = data_to_attach

    def check_buffer_overflow(self,size_to_attach):
        """Grow the backing array until ``size_to_attach`` more samples fit.

        The capacity is multiplied by 1.7 (truncated to an int, but always
        increased by at least one sample) as often as necessary. One warning
        is logged per growth step; the new array is allocated only once, at
        the final capacity, and the valid samples are copied over.
        """
        required = self.size + size_to_attach
        new_size = int(self.max_size)
        if required <= new_size:
            return
        while required > new_size:
            pqLogger.warning('Reallocating Buffer to %s', new_size * 1.7)
            # int() keeps the capacity usable as an array shape; the max()
            # guarantees progress for capacities of 0 or 1.
            new_size = max(int(new_size * 1.7), new_size + 1)
        newRingBuffer = np.zeros(new_size)
        newRingBuffer[:self.size] = self.ringBuffer[:self.size]
        self.ringBuffer = newRingBuffer
        self.max_size = new_size

    # Helper Functions:
    def plot_ringBuffer(self):
        """Plot the valid samples with matplotlib and block until closed."""
        # Imported lazily so matplotlib is only required when plotting.
        import matplotlib.pyplot as plt
        plt.plot(self.ringBuffer[:self.size])
        plt.grid(True)
        plt.show()