-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
339 lines (260 loc) · 9.7 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# -*- coding: utf-8 -*-
"""
This is an example of a query-based approach to model construction for a PostgreSQL database.
Model attributes here do not depend on table columns, or even on the existence of tables. As shown below,
models can be constructed from any query. This gives the developer the full power of the database, not limited to
a subset of ORM features. Just remember that with great power comes great responsibility.
This script requires Python 2 or 3 with the psycopg2 package installed.
"""
import sys
from collections import namedtuple
from pprint import pprint
from psycopg2 import connect
# Demo connection settings; main() passes them to Database().initialize().
# Adjust them to match the PostgreSQL instance the script should run against.
DB_LOGIN = 'user'  # passed as the `user` argument
DB_PASSWORD = 'password'  # passed as the `password` argument
DB_NAME = 'database'  # passed as the `database` argument
DB_HOST = 'localhost'  # passed as the `host` argument
DB_PORT = 5432  # passed as the `port` argument
class Object(object):
    """Common base class for all database models.

    Instances are plain attribute bags: every keyword argument passed to the
    constructor (typically one row returned by a query) becomes an attribute.

    If ``primary_key`` is set to a tuple of attribute names, the class keeps
    an identity map: constructing it again with the same primary key values
    returns the already existing instance, whose attributes are then
    refreshed by ``__init__``. Every class owns its own identity map, so a
    subclass never hands out an instance that was cached by its parent.
    """

    # Identity map {tuple of primary key values: instance}; created lazily
    # and stored on the concrete class, never shared through inheritance.
    _instance_map = None
    # Tuple of attribute names forming the primary key; None disables caching.
    primary_key = None
    # NOTE(review): not referenced anywhere in the visible code; purpose
    # unclear -- confirm before relying on it.
    is_admin = False

    def __new__(cls, **kwargs):
        """Create an instance or return the cached one for the same key.

        :param kwargs: attribute values; must contain every name listed in
            ``primary_key`` when a primary key is configured.
        :raises KeyError: if a primary key attribute is missing from kwargs.
        """
        if cls.primary_key is None:
            return super(Object, cls).__new__(cls)

        # Read the map from the class's own namespace: a plain attribute
        # lookup would find the parent's map and make a subclass return
        # instances of the parent class.
        instance_map = cls.__dict__.get('_instance_map')
        if instance_map is None:
            instance_map = cls._instance_map = {}

        cache_key = tuple(kwargs[column] for column in cls.primary_key)
        obj = instance_map.get(cache_key)
        if obj is None:
            obj = instance_map[cache_key] = super(Object, cls).__new__(cls)
        return obj

    def __init__(self, **kwargs):
        """Store every keyword argument as an instance attribute."""
        self.__dict__.update(kwargs)

    def update(self, **kwargs):
        """Overwrite or add instance attributes from keyword arguments."""
        self.__dict__.update(kwargs)
class Query(object):
    """Query container: an SQL string plus the parameters to bind to it.

    Parameters are given either as keyword arguments (for ``%(name)s``
    placeholders) or as positional arguments (for ``%s`` placeholders),
    never both.

    ``args`` holds ``[kwargs]`` when keyword arguments were given and the
    tuple of positional arguments otherwise.
    """

    def __init__(self, query_string, *args, **kwargs):
        """Store the query string and its parameters.

        :raises ValueError: if both positional and keyword parameters are given.
        """
        if args and kwargs:
            raise ValueError('Both named and unnamed arguments passed to query: {}'.format(query_string))
        self.query_string = query_string
        self.args = [kwargs] if kwargs else args

    def __repr__(self):
        """Return the query string with the repr of each parameter filled in.

        This is a debugging aid only; it is not the SQL sent to the server.
        """
        params = self.args
        if not params:
            # Nothing to substitute: leave the string alone so that a
            # literal '%' (e.g. in a LIKE pattern) does not break formatting.
            return self.query_string
        if len(params) == 1 and isinstance(params[0], (dict, list, tuple)):
            # A single container is the parameter set itself.
            params = params[0]
        if isinstance(params, dict):
            rendered = {name: repr(value) for name, value in params.items()}
        else:
            # '%' needs a tuple here; a list would be taken as one value.
            rendered = tuple(repr(value) for value in params)
        return self.query_string % rendered
class Singleton(object):
    """Singleton pattern base class.

    Every class deriving from it has exactly one instance of its own:
    repeated construction returns that same object. Constructor arguments
    are accepted but ignored by ``__new__``.
    """

    # The single instance of the concrete class; created on first use.
    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the class's own single instance, creating it on first use."""
        # Read from the class's own namespace: a plain attribute lookup
        # would find an instance created by a parent class and return an
        # object of the wrong type for the subclass.
        instance = cls.__dict__.get('_instance')
        if instance is None:
            instance = cls._instance = super(Singleton, cls).__new__(cls)
        return instance
class Database(Singleton):
    """Single access point for all interactions with the database.

    ``initialize`` must be called once before any query is executed.
    Nothing is committed automatically: call ``commit`` or ``rollback``.
    """

    # Name of the connected database and the user; set by initialize().
    database = None
    user = None
    # Open connection object, or None until initialize() succeeds.
    _connection = None

    def initialize(self, database, user, password, host=None, port=None):
        """Open the database connection.

        :raises RuntimeError: if a connection has already been opened.
        """
        if self._connection is not None:
            raise RuntimeError('Database connection already exists')
        self._connection = connect(
            database=database,
            user=user,
            password=password,
            host=host,
            port=port,
        )
        self.database = database
        self.user = user

    def commit(self):
        """Commit the current transaction.

        :raises RuntimeError: if there is no database connection.
        """
        self._get_connection().commit()

    def rollback(self):
        """Roll back the current transaction; do nothing when not connected."""
        if self._connection is not None:
            self._connection.rollback()

    def execute(self, query):
        """Run the query and return its open cursor.

        The caller owns the returned cursor and is responsible for it.
        """
        return self._get_cursor_for_query(query)

    def get_one(self, query):
        """Run the query and return its first row as a {column: value} dict.

        Returns None when the query produced no rows.
        """
        cursor = self._get_cursor_for_query(query)
        try:
            row = cursor.fetchone()
            if row is None:
                return None
            # Column names must be read before the cursor is closed.
            names = self._get_column_names_from_cursor(cursor)
        finally:
            cursor.close()
        return self._populate_rows_with_names(rows=[row], names=names)[0]

    def get_all(self, query):
        """Run the query and return all rows as a list of {column: value} dicts."""
        cursor = self._get_cursor_for_query(query)
        try:
            rows = cursor.fetchall()
            names = self._get_column_names_from_cursor(cursor)
        finally:
            cursor.close()
        return self._populate_rows_with_names(rows=rows, names=names)

    def _get_connection(self):
        """Return the open connection or raise RuntimeError if there is none."""
        if self._connection is None:
            raise RuntimeError('No database connection')
        return self._connection

    def _get_cursor_for_query(self, query):
        """Execute the query on a new cursor and return that cursor.

        ``cursor.execute`` takes the query string plus at most one
        parameters container, so ``query.args`` is mapped as follows:
        no arguments -> no parameters; a single dict/list/tuple -> passed
        as the container itself; anything else -> the positional values
        packed into one tuple.
        """
        cursor = self._get_connection().cursor()
        args = query.args
        try:
            if not args:
                cursor.execute(query.query_string)
            elif len(args) == 1 and isinstance(args[0], (dict, list, tuple)):
                cursor.execute(query.query_string, args[0])
            else:
                cursor.execute(query.query_string, tuple(args))
        except Exception:
            # Do not leak the cursor when the statement fails.
            cursor.close()
            raise
        return cursor

    def _get_column_names_from_cursor(self, cursor):
        """Return the result column names, in order, from cursor.description."""
        return [column.name for column in cursor.description]

    def _populate_rows_with_names(self, rows, names):
        """Pair every row's values with the column names, one dict per row."""
        return [dict(zip(names, row)) for row in rows]
class Options(Object):
    """Example model of a key-value storage kept in the ``options`` table.

    The demo table has two text columns, ``name`` (primary key) and
    ``value``. Instances are identified by ``name``, so loading the same
    option twice yields the same object.
    """

    primary_key = ('name',)

    @classmethod
    def get_all_options(cls):
        """Return all options ordered case-insensitively by name."""
        rows = Database().get_all(
            Query('SELECT * FROM options ORDER BY LOWER(name)')
        )
        return [cls(**row) for row in rows]

    @classmethod
    def get_option(cls, name):
        """Return the option with the given name, or None if it does not exist."""
        row = Database().get_one(
            Query('SELECT * FROM options WHERE name = %(name)s', name=name)
        )
        if not row:
            return None
        return cls(**row)

    @classmethod
    def add_option(cls, name, value):
        """Insert a new option and return the model built from the stored row."""
        row = Database().get_one(
            Query(
                'INSERT INTO options (name, value) VALUES (%(name)s, %(value)s) RETURNING *',
                name=name,
                value=value,
            )
        )
        return cls(**row)

    def update(self, value):
        """Store a new value for this option and refresh the instance from the row.

        :raises LookupError: if no row with this option's name exists any more.
        """
        row = Database().get_one(
            Query(
                'UPDATE options SET value = %(value)s WHERE name = %(name)s RETURNING *',
                name=self.name,
                value=value,
            )
        )
        if row is None:
            # UPDATE ... RETURNING yields no row when nothing matched; fail
            # with a clear message instead of a TypeError from `**None`.
            raise LookupError('Option {!r} does not exist'.format(self.name))
        super(Options, self).update(**row)

    @classmethod
    def create_demo_table(cls):
        """Create the ``options`` table and fill it with four demo rows."""
        insert_statement = 'INSERT INTO options (name, value) VALUES (%(name)s, %(value)s)'
        demo_rows = [
            ('first', 'one'),
            ('second', 'two'),
            ('third', 'three'),
            ('forth', 'four'),
        ]
        queries = [
            Query('CREATE TABLE options (name TEXT NOT NULL PRIMARY KEY, value TEXT NOT NULL)'),
        ]
        queries.extend(
            Query(insert_statement, name=name, value=value)
            for name, value in demo_rows
        )
        for query in queries:
            Database().execute(query)

    @classmethod
    def destroy_demo_table(cls):
        """Drop the ``options`` table."""
        Database().execute(Query('DROP TABLE options'))

    def __repr__(self):
        return '<Option: name={name!r}, value={value!r}>'.format(
            name=self.name,
            value=self.value,
        )
class UserTablesStats(Object):
    """Example model: usage statistics of user tables.

    Built from a query over ``pg_stat_user_tables`` rather than from a
    table of its own. No primary key is set, so instances are not cached.
    """

    @classmethod
    def get_stats(cls):
        """Return one instance per row of ``pg_stat_user_tables``."""
        stats_query = Query(
            'SELECT schemaname AS schema, relname AS table,'
            ' seq_scan, idx_scan, now() as timestamp FROM pg_stat_user_tables'
        )
        rows = Database().get_all(stats_query)
        return [cls(**row) for row in rows]

    def __repr__(self):
        template = (
            '<UserTablesStats: schema={schema!r}, table={table!r},'
            ' sequential_scans={seq_scan!r}, index_scans={idx_scan!r}, timestamp={timestamp!r}>'
        )
        return template.format(
            schema=self.schema,
            table=self.table,
            seq_scan=self.seq_scan,
            idx_scan=self.idx_scan,
            # The timestamp is shown as a formatted string, not a datetime repr.
            timestamp=self.timestamp.strftime('%Y/%m/%d %H:%M:%S'),
        )
# Lightweight immutable pair of coordinates with fields `x` and `y`.
Point = namedtuple('Point', 'x y')
class RandomCoordinates(Object):
    """Example model which does not require a table.

    Its values are computed by the database in a table-less SELECT.
    Instances are identified by the ``(x, y)`` pair, so picking the same
    coordinates twice yields the same object.
    """

    primary_key = ('x', 'y')

    @classmethod
    def pick(cls, point1, point2):
        """Pick random integer coordinates inside the rectangle spanned by two points.

        :param point1: any object with ``x`` and ``y`` attributes (one corner).
        :param point2: any object with ``x`` and ``y`` attributes (opposite corner).
        :return: instance whose ``x`` and ``y`` lie within the rectangle,
            both bounds included.
        """
        min_x = min(point1.x, point2.x)
        min_y = min(point1.y, point2.y)
        max_x = max(point1.x, point2.x)
        max_y = max(point1.y, point2.y)
        coordinates_info = Database().get_one(
            Query(
                # FLOOR over (extent + 1) gives every integer in the range
                # the same probability. Casting RANDOM() * extent straight
                # to int rounds to nearest, which makes the two boundary
                # values half as likely as the others.
                'SELECT'
                ' (FLOOR(RANDOM() * (%(width)s + 1)) + %(min_x)s)::int AS x,'
                ' (FLOOR(RANDOM() * (%(height)s + 1)) + %(min_y)s)::int AS y;',
                width=max_x - min_x,
                height=max_y - min_y,
                min_x=min_x,
                min_y=min_y,
            )
        )
        return cls(**coordinates_info)

    def __repr__(self):
        return '<RandomCoordinates: x={x!r}, y={y!r}>'.format(x=self.x, y=self.y)
def main(argv=None):
    """Run the demo end to end against the configured database.

    Connects, creates and fills the demo ``options`` table, exercises the
    three example models while printing the results, and finally drops the
    table again. Nothing is committed here.

    :param argv: command-line arguments; defaults to ``sys.argv``. The
        value is currently not used by the demo.
    """
    if argv is None:
        argv = sys.argv

    # Connect to a database
    Database().initialize(
        user=DB_LOGIN,
        password=DB_PASSWORD,
        database=DB_NAME,
        host=DB_HOST,
        port=DB_PORT,
    )

    # Create and populate key-value storage
    Options.create_demo_table()

    # Get an existing option
    first = Options.get_option('first')
    print(first)

    # Create new option
    fifth = Options.add_option(name='fifth', value='five')
    print(fifth)

    # Update existing option
    first.update(value='The One')
    print(first)

    # List all available options
    all_options = Options.get_all_options()
    pprint(all_options)

    # Get user table usage statistics from PostgreSQL predefined view
    table_stats = UserTablesStats.get_stats()
    pprint(table_stats)

    # Generate 5 random coordinates
    lower_corner = Point(x=-500, y=-500)
    upper_corner = Point(x=500, y=500)
    picked = []
    for _ in range(5):
        picked.append(RandomCoordinates.pick(lower_corner, upper_corner))
    pprint(picked)

    # Drop previously created table
    Options.destroy_demo_table()
if __name__ == '__main__':
    # Script entry point: run the demo and exit with whatever main() returns.
    try:
        exit_status = main()
        sys.exit(exit_status)
    finally:
        # Runs on success and on failure alike, so the demo never leaves
        # an open transaction behind.
        Database().rollback()