-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathdb.py
187 lines (144 loc) · 5.2 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# -*- coding: utf-8 -*-
from .file_writer import is_valid
from .compat import iteritems
class Database(object):
"""
A class for a dictionary of data stored in a JSON file.
"""
def __init__(self, file_path):
"""
This class manages a json-formatted file database.
Constructor takes the file path of the database as a parameter.
"""
from .file_writer import read_data, write_data
self.read_data = read_data
self.write_data = write_data
self.path = None
self.set_path(file_path)
self._data = {}
def memory_read(self, _):
"""
Return the in-memory data.
"""
return self._data
def memory_write(self, _, data):
"""
Write to the in-memory data.
"""
self._data = data
def set_path(self, file_path):
"""
Set the path of the database.
Create the file if it does not exist.
"""
if not file_path:
self.read_data = self.memory_read
self.write_data = self.memory_write
elif not is_valid(file_path):
self.write_data(file_path, {})
self.path = file_path
def _get_content(self, key=None):
obj = self.read_data(self.path)
if key or key == "":
if key in obj.keys():
return obj[key]
else:
return None
return obj
def _set_content(self, key, value):
obj = self._get_content()
obj[key] = value
self.write_data(self.path, obj)
def delete(self, key):
"""
Removes the specified key from the database.
"""
obj = self._get_content()
obj.pop(key, None)
self.write_data(self.path, obj)
def data(self, **kwargs):
"""
If a key is passed in, a corresponding value will be returned.
If a key-value pair is passed in then the corresponding key in
the database will be set to the specified value.
A dictionary can be passed in as well.
If a key does not exist and a value is provided then an entry
will be created in the database.
"""
key = kwargs.pop('key', None)
value = kwargs.pop('value', None)
dictionary = kwargs.pop('dictionary', None)
# Fail if a key and a dictionary or a value and a dictionary are given
if (key is not None and dictionary is not None) or \
(value is not None and dictionary is not None):
raise ValueError
# If only a key was provided return the corresponding value
if key is not None and value is None:
return self._get_content(key)
# if a key and a value are passed in
if key is not None and value is not None:
self._set_content(key, value)
if dictionary is not None:
for key in dictionary.keys():
value = dictionary[key]
self._set_content(key, value)
return self._get_content()
def _contains_value(self, obj, keys, find_value):
key = keys.pop(0)
# If there are no keys left
if not len(keys):
if obj[key] == find_value:
return True
else:
return False
if isinstance(obj, dict):
if key in obj:
return self._contains_value(obj[key], keys, find_value)
def filter(self, filter_arguments):
"""
Takes a dictionary of filter parameters.
Return a list of objects based on a list of parameters.
"""
results = self._get_content()
# Filter based on a dictionary of search parameters
if isinstance(filter_arguments, dict):
for item, content in iteritems(self._get_content()):
for key, value in iteritems(filter_arguments):
keys = key.split('.')
value = filter_arguments[key]
if not self._contains_value({item: content}, keys, value):
del results[item]
# Filter based on an input string that should match database key
if isinstance(filter_arguments, str):
if filter_arguments in results:
return [{filter_arguments: results[filter_arguments]}]
else:
return []
return results
def drop(self):
"""
Remove the database by deleting the JSON file.
"""
import os
if self.path:
if os.path.exists(self.path):
os.remove(self.path)
else:
# Clear the in-memory data if there is no file path
self._data = {}
# Iterator methods
def __len__(self):
return len(self._get_content())
def __iter__(self):
return iter(self._get_content().keys())
def __list__(self):
return list(self._get_content().keys())
# Dictionary compatibility methods
def __getitem__(self, key):
return self.data(key=key)
def __setitem__(self, key, value):
return self.data(key=key, value=value)
def __delitem__(self, key):
return self.delete(key=key)
def __contains__(self, key):
return key in self._get_content()