/
storage.py
174 lines (138 loc) · 6.01 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
This module allows loading and storing values with a simple sqlite database
in a distributed fashion. This is a simple `key-value NoSQL
<http://en.wikipedia.org/wiki/NoSQL>`_ database using only the Python standard
library and `sqlite3 <http://www.sqlite.org/>`_.
This can be used to cache results of functions or scripts in distributed
environments.
"""
# Authors: Arnaud Joly
#
# License: BSD 3 clause
from __future__ import unicode_literals
import os
import sqlite3
import pickle
__all__ = [
"sqlite3_loads",
"sqlite3_dumps",
]
def _decompressed(value):
"""Decompressed a binary object compressed with pickle from sqlite3."""
return pickle.loads(bytes(value))
def _compressed(value):
"""Compressed binary object with highest pickle protocol for sqlite3."""
return sqlite3.Binary(pickle.dumps(value,
protocol=pickle.HIGHEST_PROTOCOL))
def sqlite3_loads(file_name, key=None, timeout=7200.0):
    """Load value with key from the sqlite3 database stored at file_name.

    In order to improve performance, it's advised to query the database using
    as many keys as possible at once. Otherwise by calling this function
    repeatedly, you might run into the `SQlite lock timeout
    <http://beets.radbox.org/blog/sqlite-nightmare.html>`_.

    Parameters
    ----------
    file_name : str
        Path to the sqlite database.

    key : str or list of str or None, optional (default=None)
        Key or list of keys used when the value was stored. If ``key`` is
        None, all key-value pairs are returned from the database.

    timeout : float, optional (default=7200.0)
        The timeout parameter specifies how long the connection should wait
        for the lock to go away until raising an exception.

    Returns
    -------
    out : dict
        Dict mapping each found key to its stored object. If a requested key
        is missing from the database, there is no entry in ``out`` for it.
        If there is no sqlite3 database at ``file_name``, an empty dict is
        returned.

    Examples
    --------
    Here, we generate a temporary sqlite3 database, dump then load some
    data from it.

    >>> from tempfile import NamedTemporaryFile
    >>> from clusterlib.storage import sqlite3_dumps
    >>> from clusterlib.storage import sqlite3_loads
    >>> with NamedTemporaryFile() as fhandle:
    ...     sqlite3_dumps({"3": 3, "2": 5}, fhandle.name)
    ...     out = sqlite3_loads(fhandle.name, key=["7", "3"])
    ...     print(out['3'])
    ...     print("7" in out)  # "7" is not in the database
    3
    False

    It's also possible to get all key-value pairs from the database without
    specifying the keys.

    >>> with NamedTemporaryFile() as fhandle:
    ...     sqlite3_dumps({'first': 1}, fhandle.name)
    ...     out = sqlite3_loads(fhandle.name)
    ...     print(out['first'])
    1
    """
    # A single string is treated as a one-element list of keys.
    if isinstance(key, str):
        key = [key]

    out = dict()
    if not os.path.exists(file_name):
        # No database file yet: nothing has been stored under any key.
        return out

    # NOTE: ``with sqlite3.connect(...)`` only manages the *transaction*
    # (commit/rollback), it does NOT close the connection. Close it
    # explicitly to avoid leaking a file handle on every call.
    connection = sqlite3.connect(file_name, timeout=timeout)
    try:
        cursor = connection.cursor()
        if key is None:
            # Fetch everything and unpickle each stored BLOB.
            cursor.execute("SELECT key, value FROM dict")
            out = dict((k, pickle.loads(bytes(v)))
                       for k, v in cursor.fetchall())
        else:
            for k in key:
                cursor.execute("SELECT value FROM dict where key = ?",
                               (k,))
                row = cursor.fetchone()  # key is the primary key
                if row is not None:
                    out[k] = pickle.loads(bytes(row[0]))
        cursor.close()
    finally:
        connection.close()

    return out
def sqlite3_dumps(dictionnary, file_name, timeout=7200.0, overwrite=False):
    """Dump each (key, value) pair of a dict into the sqlite3 database.

    In order to improve performance, it's advised to dump into the database as
    many entries as possible at once. Otherwise by calling this function
    repeatedly, you might run into the `SQlite lock timeout
    <http://beets.radbox.org/blog/sqlite-nightmare.html>`_.

    Parameters
    ----------
    dictionnary : dict of (str, object)
        Each key is a string associated to an object to store in the database,
        it will raise an exception if the key is already present in the
        database and ``overwrite`` is False.

    file_name : str
        Path to the sqlite database.

    timeout : float, optional (default=7200.0)
        The timeout parameter specifies how long the connection should wait
        for the lock to go away until raising an exception.

    overwrite : bool, optional (default=False)
        Whether to overwrite the value associated to a key already present
        in the database. If True, the value is replaced in case of conflict.
        If False, an IntegrityError is raised in case of conflict.

    Examples
    --------
    Here, we generate a temporary sqlite3 database, then dump some data in it.

    >>> from tempfile import NamedTemporaryFile
    >>> from clusterlib.storage import sqlite3_dumps
    >>> from clusterlib.storage import sqlite3_loads
    >>> with NamedTemporaryFile() as fhandle:
    ...     sqlite3_dumps({"list": [3, 2], "number": 5}, fhandle.name)
    ...
    """
    # Pickle all values first so that any serialization error is raised
    # before the database is touched at all.
    compressed_dict = {
        k: sqlite3.Binary(pickle.dumps(v, protocol=pickle.HIGHEST_PROTOCOL))
        for k, v in dictionnary.items()
    }

    # NOTE: ``with connection`` commits on success and rolls back on error,
    # but it does NOT close the connection — hence the explicit close in
    # the ``finally`` below, which avoids leaking a file handle per call.
    connection = sqlite3.connect(file_name, timeout=timeout)
    try:
        with connection:
            # Create table if needed
            connection.execute("""CREATE TABLE IF NOT EXISTS dict
                                  (key TEXT PRIMARY KEY, value BLOB)""")

            # Add the new entries; the PRIMARY KEY constraint raises
            # sqlite3.IntegrityError on duplicates unless overwriting.
            if overwrite:
                connection.executemany("INSERT OR REPLACE INTO "
                                       "dict(key, value)VALUES (?, ?)",
                                       compressed_dict.items())
            else:
                connection.executemany("INSERT INTO dict(key, value) "
                                       "VALUES (?, ?)",
                                       compressed_dict.items())
    finally:
        connection.close()