#!/usr/bin/env python3
# Author: Aaron Adel <aadel112@gmail.com>
# Copyright: This module has been placed in the public domain.
"""
Getting peak concurrent event counts, a.k.a. peak usage, peak seats,
etc., is a fairly common use case. Coming from an IVR-type background,
I see a large number of people who actually want to be billed this way.

Getting peaks is easy, but getting accurate peaks quickly is not. For
even modest data sizes of ~50,000 records, getting a day's worth of
peaks requires checking the concurrency at every second of the day
(86,400 seconds), so the work can spiral out of control quickly. The
common quick-access data structure, the dictionary (or hash), is really
inefficient here; a naive solution can run for hours or days. My
purpose for writing this is multi-fold:

1) To provide an off-the-shelf calculator that is fast, accurate, and
   memory efficient.
2) To provide this solution in such a way that it reaches the maximum
   number of developers. This is why I chose python, even though the
   perl solution to this I've written before seems faster.
3) To illustrate an interesting concept: sqlite can be used in memory
   as a drop-in replacement for a data structure. Properly indexed and
   set up, it can be lightning fast. It's especially useful for ranged
   index searches.
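For instance (a minimal sketch of the idea, separate from this module),
a ranged lookup over an indexed in-memory table::

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE t(lo int, hi int)')
    conn.execute('CREATE INDEX idx ON t(lo, hi)')
    conn.executemany('INSERT INTO t VALUES(?,?)', [(1, 9), (4, 6)])
    # how many intervals cover the instant 5?
    print(conn.execute('SELECT count(*) FROM t WHERE lo <= ? AND hi >= ?',
                       (5, 5)).fetchone()[0])  # -> 2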
This module implements the following class:
- 'oPiUS', the object that calculates peaks using sqlite
Functions:
- 'main', the function that's called when running from the command line
How to use this module in shell
===============================
1) You can either pipe input to it, which is the default, or point it
at an input file with --infile=. The input file must be delimited. The
default delimiter is ',', but you can set this to some other value
using --input_delim=. (An example follows the option list below.)

Available options are:

--infile= - a file, relative or absolute
--outfile= - a file, relative or absolute
--output_type= - csv or json
--input_delim= - , | ^ etc.
--output_delim= - , | ^ etc.
--start_colno= - the column of the start time of the event, indexed from 1 on
--end_colno= - the column of the end time of the event, indexed from 1 on
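For example, with a hypothetical events.csv holding one start,end pair
of epoch seconds per line (say 1000,1005 and 1002,1008)::

    $ ./opius.py --infile=events.csv --output_type=csv

This prints one epoch,count row for every second from 1000 through
1008; the count is 2 for the seconds 1002 through 1005, where the two
events overlap, and 1 elsewhere.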
The output step is one second. You will always get the concurrent count
of records for each second from the records' minimum start time to
their maximum end time. It's up to you, the developer, to roll those up
into less granular maxima.
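For instance, a sketch of an hourly rollup (assuming the per-second
output was saved to a hypothetical peaks.csv)::

    import collections
    import csv

    hourly = collections.defaultdict(int)
    with open('peaks.csv', newline='') as f:
        for epoch, cnt in csv.reader(f):
            hour = int(epoch) // 3600 * 3600   # truncate to the hour
            hourly[hour] = max(hourly[hour], int(cnt))
    # hourly now maps each hour's epoch to its peak concurrency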
How to use this module in python
===============================
1) import it: ``from opius import oPiUS``
2) create an object::

       opius = oPiUS(in_h, ide, ode, sc, ec, ot, o_h)

3) call load, find_peaks, output, and close::

       opius.load()        # loads the sqlite memory structure
       opius.find_peaks()  # gets each peak into opius.peaks, a dict
       opius.output()      # writes opius.peaks as csv or json
       opius.close()       # closes the sqlite object, freeing the memory
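A complete example (a sketch; assumes a hypothetical events.csv whose
first two columns are epoch start and end times)::

    import sys
    from opius import oPiUS

    opius = oPiUS('events.csv', ',', ',', 1, 2, 'json', sys.stdout)
    opius.load()
    opius.find_peaks()
    opius.output()
    opius.close()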
"""
__docformat__ = 'restructuredtext'
import getopt
import sqlite3
import sys
import csv
import json


class oPiUS:
    """
    opius module for getting peaks given an input
    """
    def __init__(self, input_h=sys.stdin, in_delim=',', out_delim=',',
                 start_colno=1, end_colno=2, out_type='csv',
                 output_h=sys.stdout):
        """
        base opius constructor

        Parameters:

        - `input_h`: input file, defaults to stdin
        - `in_delim`: the delimiter for the input stream
        - `out_delim`: the delimiter for the output stream, ignored if
          the output is not delimited
        - `start_colno`: the column number (1-based) of the event start time
        - `end_colno`: the column number (1-based) of the event end time
        - `out_type`: the output format, csv or json
        - `output_h`: output file, defaults to stdout
        """
        # init the in-memory data store
        self.conn = sqlite3.connect(':memory:')
        self.curs = self.conn.cursor()
        # single threaded, no file sync
        self.curs.execute('''PRAGMA synchronous = off''')
        # set all the pragmas for extra juice
        self.curs.execute('''PRAGMA locking_mode = EXCLUSIVE''')
        self.curs.execute('''PRAGMA journal_mode = OFF''')
        self.curs.execute('''PRAGMA read_uncommitted = 1''')
        self.curs.execute('''PRAGMA temp_store = 2''')
        self.curs.execute('''PRAGMA threads = 8''')
        # just need two columns for this
        self.curs.execute('''CREATE TABLE store(start_time int, end_time int)''')
        self.input_h = input_h
        self.in_delim = in_delim
        self.out_delim = out_delim
        # keep 0-based column offsets internally
        self.start_colno = start_colno - 1
        self.end_colno = end_colno - 1
        self.out_type = out_type
        self.output_h = output_h
        # min/max event times seen, set while loading
        self.minimum = -1
        self.maximum = -1
        # per-second concurrency counts, filled by find_peaks
        self.peaks = {}
    def idx_tbl(self):
        # composite index makes the range query in find_peaks fast
        self.curs.execute('''CREATE INDEX idx01 ON store(start_time, end_time)''')
        self.curs.execute('''ANALYZE''')
    # load the database
    def load(self):
        """
        Loads the in-memory store, indexes it, analyzes the index for
        query planning, and sets the max and min for looping in
        opius.maximum and opius.minimum

        Parameters:
        None
        """
        data = []
        if self.input_h == sys.stdin:
            self._read_rows(sys.stdin, data)
        else:
            with open(self.input_h, 'r', newline='') as f:
                self._read_rows(f, data)
        self.curs.executemany('INSERT INTO store VALUES(?,?)', data)
        self.idx_tbl()

    def _read_rows(self, fh, data):
        # skip rows too short to hold both columns; track min/max as we go
        rd = csv.reader(fh, delimiter=self.in_delim)
        for r in rd:
            if len(r) < self.end_colno + 1 or len(r) < self.start_colno + 1:
                continue
            cl = self.get_column_list(r)
            self.set_min_max(cl)
            data.append(cl)
    def set_min_max(self, cl):
        # widen the [minimum, maximum] window to cover this event
        if cl[0] < self.minimum or self.minimum == -1:
            self.minimum = cl[0]
        if cl[1] > self.maximum or self.maximum == -1:
            self.maximum = cl[1]
    def get_column_list(self, r):
        e1 = r[self.start_colno].strip()
        e2 = r[self.end_colno].strip()
        try:
            # the common case: the columns are already epoch seconds
            return [int(e1), int(e2)]
        except ValueError:
            # otherwise let sqlite parse the datetime strings into epochs
            self.curs.execute('''select strftime('%s', ?, 'localtime')''', (e1,))
            e1 = self.curs.fetchone()[0]
            self.curs.execute('''select strftime('%s', ?, 'localtime')''', (e2,))
            e2 = self.curs.fetchone()[0]
            return [int(e1), int(e2)]
    def find_peaks(self):
        """
        Loops from opius.minimum to opius.maximum, getting the
        concurrent event count at each instant in time and storing the
        count as the value under that time's epoch in a dictionary. The
        times are represented as epochs on output because that is an
        easy format to create other formats from.

        Parameters:
        None
        """
        for i in range(self.minimum, self.maximum + 1):
            # count the events whose interval covers second i;
            # a ranged lookup on the (start_time, end_time) index
            self.curs.execute('''select count(*) from store
                                 where start_time <= ? and end_time >= ?''',
                              (i, i))
            self.peaks[i] = self.curs.fetchone()[0]
    def output(self):
        """
        Outputs opius.peaks as either json or csv.

        Parameters:
        None
        """
        if self.output_h == sys.stdout:
            self._write(sys.stdout)
        else:
            with open(self.output_h, 'w', newline='') as f:
                self._write(f)

    def _write(self, fh):
        if self.out_type == 'json':
            fh.write(json.dumps(self.peaks) + '\n')
        elif self.out_type == 'csv':
            wt = csv.writer(fh, delimiter=self.out_delim)
            for k in sorted(self.peaks):
                wt.writerow([k, self.peaks[k]])
    # close out of db
    def close(self):
        """
        Closes the sqlite db connection

        Parameters:
        None
        """
        self.conn.close()


def main():
    """
    Reads the command line arguments, creates an oPiUS object, loads
    the input data, finds the peaks, and outputs them

    Parameters:
    None
    """
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "",
            ["infile=", "outfile=", "output_type=", "input_delim=",
             "output_delim=", "start_colno=", "end_colno="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    # defaults: comma-delimited csv on stdin/stdout, times in columns 1 and 2
    in_h = sys.stdin
    o_h = sys.stdout
    ot = 'csv'
    ide = ','
    ode = ','
    sc = 1
    ec = 2
    for o, a in opts:
        if o == "--infile":
            in_h = a
        if o == "--outfile":
            o_h = a
        if o == "--output_type":
            ot = a
        if o == "--input_delim":
            ide = a
        if o == "--output_delim":
            ode = a
        if o == "--start_colno":
            sc = int(a)
        if o == "--end_colno":
            ec = int(a)
    opius = oPiUS(in_h, ide, ode, sc, ec, ot, o_h)
    opius.load()
    opius.find_peaks()
    opius.output()
    opius.close()


if __name__ == "__main__":
    main()