Skip to content

Commit

Permalink
Inventory storage abstraction
Browse files Browse the repository at this point in the history
- can have multiple storage types for inventory
- sqlite is the existing backend; filesystem is a newly available alternative
  • Loading branch information
PeterSurda committed May 27, 2017
1 parent 1d87c63 commit 36b5e2c
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 73 deletions.
96 changes: 23 additions & 73 deletions src/inventory.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,37 @@
import collections
from importlib import import_module
from threading import current_thread, enumerate as threadingEnumerate, RLock
import Queue
import time
import sys

from bmconfigparser import BMConfigParser
from helper_sql import *
from singleton import Singleton

# TODO make this dynamic, and watch out for frozen, like with messagetypes
import storage.sqlite
import storage.filesystem

# NOTE(review): this span looks like a diff rendering whose indentation and +/- markers
# were stripped. Two `class Inventory` headers appear back to back, and the statements of
# `__init__` are split into a group before `__contains__` and a group after `clean`.
# The hunk header reports 73 deletions and 23 additions; presumably the
# MutableMapping-based header, the first group of `__init__` statements and the methods
# `__contains__` .. `clean` are the removed lines (the same code appears in
# SqliteInventory) -- confirm against the repository before editing.
@Singleton
class Inventory(collections.MutableMapping):
class Inventory():
def __init__(self):
super(self.__class__, self).__init__()
self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
self.numberOfInventoryLookupsPerformed = 0
self._streams = collections.defaultdict(set) # key = streamNumber, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
self.InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')

# Membership test: counts the lookup, checks the in-memory dict first and then
# falls back to a query on the `inventory` table.
def __contains__(self, hash):
with self.lock:
self.numberOfInventoryLookupsPerformed += 1
if hash in self._inventory:
return True
return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))

# Item lookup: returns the in-memory entry if present, otherwise builds an
# InventoryItem from the first matching table row; raises KeyError when no row matches.
def __getitem__(self, hash):
with self.lock:
if hash in self._inventory:
return self._inventory[hash]
rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
if not rows:
raise KeyError(hash)
return self.InventoryItem(*rows[0])

# Stores the item in memory only (nothing is written to the table here) and
# records its hash under the item's stream.
def __setitem__(self, hash, value):
with self.lock:
value = self.InventoryItem(*value)
self._inventory[hash] = value
self._streams[value.stream].add(hash)

# Deleting individual items is not supported.
def __delitem__(self, hash):
raise NotImplementedError

# Iterates over the in-memory hashes followed by every hash stored in the table.
def __iter__(self):
with self.lock:
hashes = self._inventory.keys()[:]
hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
return hashes.__iter__()

# Number of in-memory items plus the row count of the table.
def __len__(self):
with self.lock:
return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]

# Returns all items (in-memory first, then from the table) whose type and tag both match.
def by_type_and_tag(self, type, tag):
with self.lock:
values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
values += (self.InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
return values

# Returns the set of hashes recorded for the stream (the internal set object itself, not a copy).
def hashes_by_stream(self, stream):
with self.lock:
return self._streams[stream]

# Returns hashes of the stream whose expiry time is later than the current time,
# from memory and from the table.
def unexpired_hashes_by_stream(self, stream):
with self.lock:
t = int(time.time())
hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t]
hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
return hashes

# Writes every in-memory item to the table in one bulk operation, then empties the in-memory dict.
def flush(self):
with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with SqlBulkExecute() as sql:
for objectHash, value in self._inventory.items():
sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value)
self._inventory.clear()

# Deletes rows whose expiry time is more than three hours in the past, then
# rebuilds the per-stream hash sets from the remaining items.
def clean(self):
with self.lock:
sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
self._streams.clear()
for objectHash, value in self.items():
self._streams[value.stream].add(objectHash)
# The statements below presumably form the body of the new `__init__` (see the note at
# the top of this span).
#super(self.__class__, self).__init__()
# Name of the storage backend, read from option "storage" of config section "inventory".
self._moduleName = BMConfigParser().safeGet("inventory", "storage")
#import_module("." + self._moduleName, "storage")
#import_module("storage." + self._moduleName)
# Dotted name of the backend class, e.g. "storage.sqlite.SqliteInventory" for "sqlite".
self._className = "storage." + self._moduleName + "." + self._moduleName.title() + "Inventory"
# NOTE(review): eval() evaluates a string derived from the configuration file; resolving the
# class with importlib.import_module + getattr would avoid evaluating arbitrary expressions.
self._inventoryClass = eval(self._className)
# The backend instance that attribute lookups are forwarded to.
self._realInventory = self._inventoryClass()

# cheap inheritance copied from asyncore
def __getattr__(self, attr):
    """Forward lookups of unknown attributes to the real inventory backend.

    Python calls this only when normal attribute lookup on the wrapper fails.

    :param attr: name of the attribute being looked up
    :returns: the attribute of the same name on ``self._realInventory``
    :raises AttributeError: if the backend has no such attribute, or if the
        backend itself has not been created yet
    """
    # If _realInventory is not set yet, reading it below would re-enter
    # __getattr__ and recurse until the interpreter's recursion limit is hit.
    if attr == "_realInventory":
        raise AttributeError("%s instance has no attribute '%s'" %(self.__class__.__name__, attr))
    try:
        return getattr(self._realInventory, attr)
    except AttributeError:
        # Report the wrapper's class name rather than the backend's.
        raise AttributeError("%s instance has no attribute '%s'" %(self.__class__.__name__, attr))


class PendingDownloadQueue(Queue.Queue):
Expand Down
Empty file added src/storage/__init__.py
Empty file.
175 changes: 175 additions & 0 deletions src/storage/filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from binascii import hexlify, unhexlify
from os import listdir, makedirs, path, remove, rmdir
import string
from threading import RLock
import time
import traceback

from paths import lookupAppdataFolder
from storage import InventoryStorage, InventoryItem

class FilesystemInventory(InventoryStorage):
    """Inventory backend that stores every object in its own directory on disk.

    Layout: ``<appdata>/inventory/objects/<hex hash>/`` holding a ``metadata``
    file ("type,stream,expires,hex tag,") and a ``data`` file (raw payload).
    Metadata of all objects is kept in memory in ``self._inventory``, a dict
    ``{stream: {hash: InventoryItem}}``; items loaded from disk carry
    ``payload=None`` until the payload is read on demand.
    """
    topDir = "inventory"
    objectDir = "objects"
    metadataFilename = "metadata"
    dataFilename = "data"

    def __init__(self):
        """Create the directory tree if needed and load all metadata.

        :raises IOError: if one of the required paths exists but is not a directory
        """
        # Name the class explicitly: super(self.__class__, ...) recurses
        # forever as soon as this class is subclassed.
        super(FilesystemInventory, self).__init__()
        self.baseDir = path.join(lookupAppdataFolder(), FilesystemInventory.topDir)
        for createDir in [self.baseDir, path.join(self.baseDir, FilesystemInventory.objectDir)]:
            if path.exists(createDir):
                if not path.isdir(createDir):
                    raise IOError("%s exists but it's not a directory" % (createDir))
            else:
                makedirs(createDir)
        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
        self._inventory = {}
        self._load()

    def _objectPath(self, hashId, *parts):
        """Return the path of the object's directory, or of a file inside it."""
        return path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hashId), *parts)

    def __contains__(self, hash):
        """Return True if the hash is known in any stream."""
        return any(hash in streamDict for streamDict in self._inventory.values())

    def __getitem__(self, hash):
        """Return the item for the hash, reading the payload from disk if needed.

        :raises KeyError: if the hash is unknown
        :raises AttributeError: if the payload file cannot be read (see getData)
        """
        for streamDict in self._inventory.values():
            try:
                retval = streamDict[hash]
            except KeyError:
                continue
            if retval.payload is None:
                retval = InventoryItem(retval.type, retval.stream, self.getData(hash), retval.expires, retval.tag)
            return retval
        raise KeyError(hash)

    def __setitem__(self, hash, value):
        """Write the item to disk and record it in memory.

        :param value: sequence of (type, stream, payload, expires, tag)
        :raises KeyError: if the metadata or data file cannot be written
        """
        with self.lock:
            value = InventoryItem(*value)
            try:
                makedirs(self._objectPath(hash))
            except OSError:
                # Usually the directory already exists; any other failure
                # surfaces as IOError when the files are opened below.
                pass
            try:
                with open(self._objectPath(hash, FilesystemInventory.metadataFilename), 'w') as f:
                    f.write("%s,%s,%s,%s," % (value.type, value.stream, value.expires, hexlify(value.tag)))
                # The payload is binary; text mode would translate newline
                # bytes on some platforms and corrupt it.
                with open(self._objectPath(hash, FilesystemInventory.dataFilename), 'wb') as f:
                    f.write(value.payload)
            except IOError:
                raise KeyError(hash)
            self._inventory.setdefault(value.stream, {})[hash] = value

    def __delitem__(self, hash):
        """Forget the hash and remove its files; missing files are ignored."""
        with self.lock:
            for streamDict in self._inventory.values():
                streamDict.pop(hash, None)
            removals = (
                (remove, self._objectPath(hash, FilesystemInventory.metadataFilename)),
                (remove, self._objectPath(hash, FilesystemInventory.dataFilename)),
                (rmdir, self._objectPath(hash)),
            )
            for removeFunc, target in removals:
                try:
                    removeFunc(target)
                except OSError:
                    # os.remove / os.rmdir raise OSError (not IOError) when
                    # the target is missing or cannot be removed.
                    pass

    def __iter__(self):
        """Iterate over a snapshot of all known hashes."""
        elems = []
        for streamDict in self._inventory.values():
            elems.extend(streamDict.keys())
        return iter(elems)

    def __len__(self):
        """Return the number of known objects across all streams."""
        return sum(len(streamDict) for streamDict in self._inventory.values())

    def _load(self):
        """Rebuild the in-memory index from the metadata files on disk.

        Objects whose metadata is missing or malformed are reported and skipped.
        """
        newInventory = {}
        for hashId in self.object_list():
            try:
                objectType, streamNumber, expiresTime, tag = self.getMetadata(hashId)
            except (KeyError, ValueError, TypeError):
                # KeyError: unreadable file; ValueError / TypeError: malformed
                # content. One bad object must not abort the whole load.
                print("error loading %s" % (hexlify(hashId)))
                continue
            newInventory.setdefault(streamNumber, {})[hashId] = InventoryItem(objectType, streamNumber, None, expiresTime, tag)
        self._inventory = newInventory
        for i, v in self._inventory.items():
            print("loaded stream: %s, %i items" % (i, len(v)))

    def stream_list(self):
        """Return the list of stream numbers that have at least been seen."""
        return list(self._inventory)

    def object_list(self):
        """Return the hashes of all object directories found on disk.

        Directory entries whose name is not valid hex are skipped.
        """
        hashes = []
        for name in listdir(path.join(self.baseDir, FilesystemInventory.objectDir)):
            try:
                hashes.append(unhexlify(name))
            except (TypeError, ValueError):
                # Not an object directory (stray file, editor backup, ...).
                continue
        return hashes

    def getData(self, hashId):
        """Return the raw payload stored for the hash.

        :raises AttributeError: if the data file cannot be read (kept for
            compatibility with existing callers)
        """
        try:
            with open(self._objectPath(hashId, FilesystemInventory.dataFilename), 'rb') as f:
                return f.read()
        except IOError:
            raise AttributeError

    def getMetadata(self, hashId):
        """Return [type, stream, expires, tag] parsed from the metadata file.

        :raises KeyError: if the metadata file cannot be read
        :raises ValueError: if the file does not hold the expected fields
        """
        try:
            with open(self._objectPath(hashId, FilesystemInventory.metadataFilename), 'r') as f:
                objectType, streamNumber, expiresTime, tag, _ = f.read().split(",", 4)
        except IOError:
            raise KeyError
        return [int(objectType), int(streamNumber), int(expiresTime), unhexlify(tag)]

    def by_type_and_tag(self, objectType, tag):
        """Return all items with the given type and tag, payloads included.

        Items whose payload cannot be read from disk are left out.
        """
        retval = []
        for streamDict in self._inventory.values():
            for hashId, item in streamDict.items():
                if item.type != objectType or item.tag != tag:
                    continue
                payload = item.payload
                if payload is None:
                    try:
                        payload = self.getData(hashId)
                    except AttributeError:
                        # getData reports an unreadable data file this way.
                        continue
                # InventoryItem is immutable, so build a new one with the payload.
                retval.append(InventoryItem(item.type, item.stream, payload, item.expires, item.tag))
        return retval

    def hashes_by_stream(self, stream):
        """Return the list of hashes known for the stream (empty if unknown)."""
        try:
            return list(self._inventory[stream])
        except KeyError:
            return []

    def unexpired_hashes_by_stream(self, stream):
        """Return hashes of the stream whose expiry time is still in the future."""
        t = int(time.time())
        try:
            return [x for x, value in self._inventory[stream].items() if value.expires > t]
        except KeyError:
            return []

    def flush(self):
        """Reload the in-memory index from disk (writes already go to disk)."""
        self._load()

    def clean(self):
        """Delete every object that expired more than 30 hours ago."""
        # NOTE(review): SqliteInventory.clean uses a 3 hour margin
        # (60 * 60 * 3); confirm whether 30 hours is intended here.
        minTime = int(time.time()) - (60 * 60 * 30)
        deletes = []
        for streamDict in self._inventory.values():
            for hashId, item in streamDict.items():
                if item.expires < minTime:
                    deletes.append(hashId)
        # Delete after the scan so the dicts are not modified while iterated.
        for hashId in deletes:
            del self[hashId]
81 changes: 81 additions & 0 deletions src/storage/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import collections
from threading import current_thread, enumerate as threadingEnumerate, RLock
import Queue
import time

from helper_sql import *
from storage import InventoryStorage, InventoryItem

class SqliteInventory(InventoryStorage):
    """Inventory backend that keeps new objects in memory and older ones in SQLite.

    Items set through ``__setitem__`` live in ``self._inventory`` until
    ``flush()`` writes them to the ``inventory`` table; lookups consult the
    in-memory dict first and the table second.
    """

    def __init__(self):
        # Name the class explicitly: super(self.__class__, ...) recurses
        # forever as soon as this class is subclassed.
        super(SqliteInventory, self).__init__()
        self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
        self._streams = collections.defaultdict(set) # key = streamNumber, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)

    def __contains__(self, hash):
        """Return True if the hash is held in memory or stored in the table."""
        with self.lock:
            # NOTE(review): the counter is not initialised in this class;
            # presumably InventoryStorage.__init__ sets it -- confirm.
            self.numberOfInventoryLookupsPerformed += 1
            if hash in self._inventory:
                return True
            return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))

    def __getitem__(self, hash):
        """Return the item for the hash.

        :raises KeyError: if the hash is neither in memory nor in the table
        """
        with self.lock:
            if hash in self._inventory:
                return self._inventory[hash]
            rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
            if not rows:
                raise KeyError(hash)
            return InventoryItem(*rows[0])

    def __setitem__(self, hash, value):
        """Keep the item in memory (until flush) and index its hash by stream.

        :param value: sequence of (type, stream, payload, expires, tag)
        """
        with self.lock:
            value = InventoryItem(*value)
            self._inventory[hash] = value
            self._streams[value.stream].add(hash)

    def __delitem__(self, hash):
        """Deleting individual items is not supported."""
        raise NotImplementedError

    def __iter__(self):
        """Iterate over in-memory hashes followed by the hashes in the table."""
        with self.lock:
            hashes = list(self._inventory)
            hashes.extend(x for x, in sqlQuery('SELECT hash FROM inventory'))
            return iter(hashes)

    def __len__(self):
        """Return the in-memory item count plus the table's row count."""
        with self.lock:
            return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]

    def by_type_and_tag(self, type, tag):
        """Return all items, in memory and in the table, matching type and tag."""
        with self.lock:
            values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
            values.extend(InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
            return values

    def hashes_by_stream(self, stream):
        """Return the set of hashes recorded for the stream.

        The internal set itself is returned, not a copy.
        """
        with self.lock:
            return self._streams[stream]

    def unexpired_hashes_by_stream(self, stream):
        """Return hashes of the stream whose expiry time is still in the future."""
        with self.lock:
            t = int(time.time())
            hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t]
            hashes.extend(objectHash for objectHash, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
            return hashes

    def flush(self):
        """Write all in-memory items to the table and empty the in-memory dict."""
        with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
            with SqlBulkExecute() as sql:
                for objectHash, value in self._inventory.items():
                    sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value)
                self._inventory.clear()

    def clean(self):
        """Delete rows that expired over three hours ago and rebuild the stream index."""
        with self.lock:
            sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
            self._streams.clear()
            for objectHash, value in self.items():
                self._streams[value.stream].add(objectHash)

0 comments on commit 36b5e2c

Please sign in to comment.