This repository has been archived by the owner on Sep 24, 2022. It is now read-only.
/
utils.py
162 lines (130 loc) · 5.45 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -*- coding: utf-8 -*-
"""
momoko.utils
~~~~~~~~~~~~
Utilities for Momoko.
:copyright: (c) 2011 by Frank Smit.
:license: MIT, see LICENSE for more details.
"""
import functools
import psycopg2
import psycopg2.extensions
from tornado.ioloop import IOLoop
class TransactionChain(object):
"""Run queries as a transaction
A list/tuple with queries looks like this::
(
['SELECT 42, 12, %s, 11;', (23,)],
'SELECT 1, 2, 3, 4, 5;'
)
A query with parameters is contained in a list: ``['some sql
here %s, %s', ('and some', 'parameters here')]``. A query
without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
:param queries: A tuple or with all the queries.
:param callback: The function that needs to be executed once all the
queries are finished.
:param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
:return: A list with the resulting cursors is passed on to the callback.
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
def __init__(self, db, queries, callback, cursor_kwargs={}):
self._db = db
self._cursors = []
self._queries = list(queries)
self._queries.reverse()
self._callback = callback
self._cursor_kwargs = cursor_kwargs
self._connection = self._db._get_free_conn
self._collect(None)
def _collect(self, cursor):
if cursor is not None:
self._cursors.append(cursor)
if not self._queries:
if self._callback:
self._callback(self._cursors)
return
query = self._queries.pop()
if isinstance(query, str):
query = [query]
self._db.execute(*query, callback=self._collect,
cursor_kwargs=self._cursor_kwargs)
class QueryChain(object):
"""Run a chain of queries in the given order.
A list/tuple with queries looks like this::
(
['SELECT 42, 12, %s, 11;', (23,)],
'SELECT 1, 2, 3, 4, 5;'
)
A query with parameters is contained in a list: ``['some sql
here %s, %s', ('and some', 'parameters here')]``. A query
without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
:param queries: A tuple or with all the queries.
:param callback: The function that needs to be executed once all the
queries are finished.
:param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
:return: A list with the resulting cursors is passed on to the callback.
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
def __init__(self, db, queries, callback, cursor_kwargs={}):
self._db = db
self._cursors = []
self._queries = list(queries)
self._queries.reverse()
self._callback = callback
self._cursor_kwargs = cursor_kwargs
self._collect(None)
def _collect(self, cursor):
if cursor is not None:
self._cursors.append(cursor)
if not self._queries:
if self._callback:
self._callback(self._cursors)
return
query = self._queries.pop()
if isinstance(query, str):
query = [query]
self._db.execute(*query, callback=self._collect,
cursor_kwargs=self._cursor_kwargs)
class BatchQuery(object):
"""Run a batch of queries all at once.
**Note:** Every query needs a free connection. So if three queries are
are executed, three free connections are used.
A dictionary with queries looks like this::
{
'query1': ['SELECT 42, 12, %s, %s;', (23, 56)],
'query2': 'SELECT 1, 2, 3, 4, 5;',
'query3': 'SELECT 465767, 4567, 3454;'
}
A query with parameters is contained in a list: ``['some sql
here %s, %s', ('and some', 'parameters here')]``. A query
without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
:param queries: A dictionary with all the queries.
:param callback: The function that needs to be executed once all the
queries are finished.
:param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
:return: A dictionary with the same keys as the given queries with the
resulting cursors as values is passed on to the callback.
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
def __init__(self, db, queries, callback, cursor_kwargs={}):
self._db = db
self._callback = callback
self._cursor_kwargs = cursor_kwargs
self._queries = {}
self._args = {}
self._size = len(queries)
for key, query in list(queries.items()):
if isinstance(query, str):
query = [query, ()]
query.append(functools.partial(self._collect, key))
self._queries[key] = query
for query in list(self._queries.values()):
self._db.execute(*query, cursor_kwargs=self._cursor_kwargs)
def _collect(self, key, cursor):
self._size = self._size - 1
self._args[key] = cursor
if not self._size and self._callback:
self._callback(self._args)