/
filesystem.py
268 lines (246 loc) · 8.99 KB
/
filesystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""
Module for using filesystem (directory with files) for inventory storage
"""
import logging
import os
import time
from binascii import hexlify, unhexlify
from threading import RLock
from paths import lookupAppdataFolder
from .storage import InventoryItem, InventoryStorage
logger = logging.getLogger('default')
class FilesystemInventory(InventoryStorage):
"""Filesystem for inventory storage"""
topDir = "inventory"
objectDir = "objects"
metadataFilename = "metadata"
dataFilename = "data"
def __init__(self):
super(FilesystemInventory, self).__init__()
self.baseDir = os.path.join(
lookupAppdataFolder(), FilesystemInventory.topDir)
for createDir in [self.baseDir, os.path.join(self.baseDir, "objects")]:
if os.path.exists(createDir):
if not os.path.isdir(createDir):
raise IOError(
"%s exists but it's not a directory" % createDir)
else:
os.makedirs(createDir)
# Guarantees that two receiveDataThreads
# don't receive and process the same message
# concurrently (probably sent by a malicious individual)
self.lock = RLock()
self._inventory = {}
self._load()
def __contains__(self, hashval):
for streamDict in self._inventory.values():
if hashval in streamDict:
return True
return False
def __delitem__(self, hash_):
raise NotImplementedError
def __getitem__(self, hashval):
for streamDict in self._inventory.values():
try:
retval = streamDict[hashval]
except KeyError:
continue
if retval.payload is None:
retval = InventoryItem(
retval.type,
retval.stream,
self.getData(hashval),
retval.expires,
retval.tag)
return retval
raise KeyError(hashval)
def __setitem__(self, hashval, value):
with self.lock:
value = InventoryItem(*value)
try:
os.makedirs(os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval).decode()))
except OSError:
pass
try:
with open(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval).decode(),
FilesystemInventory.metadataFilename,
),
"w",
) as f:
f.write("%s,%s,%s,%s," % (
value.type,
value.stream,
value.expires,
hexlify(value.tag).decode()))
with open(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval).decode(),
FilesystemInventory.dataFilename,
),
"wb",
) as f:
f.write(value.payload)
except IOError:
raise KeyError
try:
self._inventory[value.stream][hashval] = value
except KeyError:
self._inventory[value.stream] = {}
self._inventory[value.stream][hashval] = value
def delHashId(self, hashval):
"""Remove object from inventory"""
for stream in self._inventory:
try:
del self._inventory[stream][hashval]
except KeyError:
pass
with self.lock:
try:
os.remove(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval).decode(),
FilesystemInventory.metadataFilename))
except IOError:
pass
try:
os.remove(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval).decode(),
FilesystemInventory.dataFilename))
except IOError:
pass
try:
os.rmdir(os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval).decode()))
except IOError:
pass
def __iter__(self):
elems = []
for streamDict in self._inventory.values():
elems.extend(streamDict.keys())
return elems.__iter__()
def __len__(self):
retval = 0
for streamDict in self._inventory.values():
retval += len(streamDict)
return retval
def _load(self):
newInventory = {}
for hashId in self.object_list():
try:
objectType, streamNumber, expiresTime, tag = self.getMetadata(
hashId)
try:
newInventory[streamNumber][hashId] = InventoryItem(
objectType, streamNumber, None, expiresTime, tag)
except KeyError:
newInventory[streamNumber] = {}
newInventory[streamNumber][hashId] = InventoryItem(
objectType, streamNumber, None, expiresTime, tag)
except KeyError:
logger.debug(
'error loading %s', hexlify(hashId), exc_info=True)
self._inventory = newInventory
def stream_list(self):
"""Return list of streams"""
return self._inventory.keys()
def object_list(self):
"""Return inventory vectors (hashes) from a directory"""
return [unhexlify(x) for x in os.listdir(os.path.join(
self.baseDir, FilesystemInventory.objectDir))]
def getData(self, hashId):
"""Get object data"""
try:
with open(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashId).decode(),
FilesystemInventory.dataFilename,
),
"r",
) as f:
return f.read()
except IOError:
raise AttributeError
def getMetadata(self, hashId):
"""Get object metadata"""
try:
with open(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashId).decode(),
FilesystemInventory.metadataFilename,
),
"r",
) as f:
objectType, streamNumber, expiresTime, tag = f.read().split(
",", 4)[:4]
return [
int(objectType),
int(streamNumber),
int(expiresTime),
unhexlify(tag)]
except IOError:
raise KeyError
def by_type_and_tag(self, objectType, tag):
"""Get a list of objects filtered by object type and tag"""
retval = []
for streamDict in self._inventory.values():
for hashId, item in streamDict:
if item.type == objectType and item.tag == tag:
try:
if item.payload is None:
item.payload = self.getData(hashId)
except IOError:
continue
retval.append(InventoryItem(
item.type,
item.stream,
item.payload,
item.expires,
item.tag))
return retval
def hashes_by_stream(self, stream):
"""Return inventory vectors (hashes) for a stream"""
try:
return self._inventory[stream].keys()
except KeyError:
return []
def unexpired_hashes_by_stream(self, stream):
"""Return unexpired hashes in the inventory for a particular stream"""
try:
return [
x for x, value in self._inventory[stream].items()
if value.expires > int(time.time())]
except KeyError:
return []
def flush(self):
"""Flush the inventory and create a new, empty one"""
self._load()
def clean(self):
"""Clean out old items from the inventory"""
minTime = int(time.time()) - 60 * 60 * 30
deletes = []
for streamDict in self._inventory.values():
for hashId, item in streamDict.items():
if item.expires < minTime:
deletes.append(hashId)
for hashId in deletes:
self.delHashId(hashId)