Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@
from singleton import Singleton


def create_inventory_instance(backend="sqlite"):
"""
Create an instance of the inventory class
defined in `storage.<backend>`.
"""
return getattr(
getattr(storage, backend),
"{}Inventory".format(backend.title()))()


@Singleton
class Inventory():
"""
Expand All @@ -15,11 +25,7 @@ class Inventory():
"""
def __init__(self):
self._moduleName = config.safeGet("inventory", "storage")
self._inventoryClass = getattr(
getattr(storage, self._moduleName),
"{}Inventory".format(self._moduleName.title())
)
self._realInventory = self._inventoryClass()
self._realInventory = create_inventory_instance(self._moduleName)
self.numberOfInventoryLookupsPerformed = 0

# cheap inheritance copied from asyncore
Expand Down
75 changes: 37 additions & 38 deletions src/storage/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,35 @@
Module for using filesystem (directory with files) for inventory storage
"""
import logging
import string
import os
import time
from binascii import hexlify, unhexlify
from os import listdir, makedirs, path, remove, rmdir
from threading import RLock

from paths import lookupAppdataFolder
from storage import InventoryItem, InventoryStorage
from .storage import InventoryItem, InventoryStorage

logger = logging.getLogger('default')


class FilesystemInventory(InventoryStorage):
"""Filesystem for inventory storage"""
# pylint: disable=too-many-ancestors, abstract-method
topDir = "inventory"
objectDir = "objects"
metadataFilename = "metadata"
dataFilename = "data"

def __init__(self):
super(FilesystemInventory, self).__init__()
self.baseDir = path.join(
self.baseDir = os.path.join(
lookupAppdataFolder(), FilesystemInventory.topDir)
for createDir in [self.baseDir, path.join(self.baseDir, "objects")]:
if path.exists(createDir):
if not path.isdir(createDir):
for createDir in [self.baseDir, os.path.join(self.baseDir, "objects")]:
if os.path.exists(createDir):
if not os.path.isdir(createDir):
raise IOError(
"%s exists but it's not a directory" % createDir)
else:
makedirs(createDir)
os.makedirs(createDir)
# Guarantees that two receiveDataThreads
# don't receive and process the same message
# concurrently (probably sent by a malicious individual)
Expand All @@ -46,6 +44,9 @@ def __contains__(self, hashval):
return True
return False

def __delitem__(self, hash_):
raise NotImplementedError

def __getitem__(self, hashval):
for streamDict in self._inventory.values():
try:
Expand All @@ -66,18 +67,18 @@ def __setitem__(self, hashval, value):
with self.lock:
value = InventoryItem(*value)
try:
makedirs(path.join(
os.makedirs(os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval)))
hexlify(hashval).decode()))
except OSError:
pass
try:
with open(
path.join(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval),
hexlify(hashval).decode(),
FilesystemInventory.metadataFilename,
),
"w",
Expand All @@ -86,15 +87,15 @@ def __setitem__(self, hashval, value):
value.type,
value.stream,
value.expires,
hexlify(value.tag)))
hexlify(value.tag).decode()))
with open(
path.join(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval),
hexlify(hashval).decode(),
FilesystemInventory.dataFilename,
),
"w",
"wb",
) as f:
f.write(value.payload)
except IOError:
Expand All @@ -114,28 +115,28 @@ def delHashId(self, hashval):
pass
with self.lock:
try:
remove(
path.join(
os.remove(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval),
hexlify(hashval).decode(),
FilesystemInventory.metadataFilename))
except IOError:
pass
try:
remove(
path.join(
os.remove(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval),
hexlify(hashval).decode(),
FilesystemInventory.dataFilename))
except IOError:
pass
try:
rmdir(path.join(
os.rmdir(os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashval)))
hexlify(hashval).decode()))
except IOError:
pass

Expand Down Expand Up @@ -168,26 +169,24 @@ def _load(self):
logger.debug(
'error loading %s', hexlify(hashId), exc_info=True)
self._inventory = newInventory
# for i, v in self._inventory.items():
# print "loaded stream: %s, %i items" % (i, len(v))

def stream_list(self):
"""Return list of streams"""
return self._inventory.keys()

def object_list(self):
"""Return inventory vectors (hashes) from a directory"""
return [unhexlify(x) for x in listdir(path.join(
return [unhexlify(x) for x in os.listdir(os.path.join(
self.baseDir, FilesystemInventory.objectDir))]

def getData(self, hashId):
"""Get object data"""
try:
with open(
path.join(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashId),
hexlify(hashId).decode(),
FilesystemInventory.dataFilename,
),
"r",
Expand All @@ -200,16 +199,16 @@ def getMetadata(self, hashId):
"""Get object metadata"""
try:
with open(
path.join(
os.path.join(
self.baseDir,
FilesystemInventory.objectDir,
hexlify(hashId),
hexlify(hashId).decode(),
FilesystemInventory.metadataFilename,
),
"r",
) as f:
objectType, streamNumber, expiresTime, tag = string.split(
f.read(), ",", 4)[:4]
objectType, streamNumber, expiresTime, tag = f.read().split(
",", 4)[:4]
return [
int(objectType),
int(streamNumber),
Expand Down Expand Up @@ -246,10 +245,10 @@ def hashes_by_stream(self, stream):

def unexpired_hashes_by_stream(self, stream):
"""Return unexpired hashes in the inventory for a particular stream"""
t = int(time.time())
try:
return [x for x, value in self._inventory[stream].items()
if value.expires > t]
return [
x for x, value in self._inventory[stream].items()
if value.expires > int(time.time())]
except KeyError:
return []

Expand All @@ -259,7 +258,7 @@ def flush(self):

def clean(self):
"""Clean out old items from the inventory"""
minTime = int(time.time()) - (60 * 60 * 30)
minTime = int(time.time()) - 60 * 60 * 30
deletes = []
for streamDict in self._inventory.values():
for hashId, item in streamDict.items():
Expand Down
4 changes: 2 additions & 2 deletions src/storage/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
from threading import RLock

from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery
from storage import InventoryItem, InventoryStorage
from .storage import InventoryItem, InventoryStorage


class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
class SqliteInventory(InventoryStorage):
"""Inventory using SQLite"""
def __init__(self):
super(SqliteInventory, self).__init__()
Expand Down
74 changes: 24 additions & 50 deletions src/storage/storage.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,47 @@
"""
Storing inventory items
"""
import collections

InventoryItem = collections.namedtuple(
'InventoryItem', 'type stream payload expires tag')
from abc import abstractmethod
from collections import namedtuple
try:
from collections import MutableMapping # pylint: disable=deprecated-class
except ImportError:
from collections.abc import MutableMapping


class Storage(object): # pylint: disable=too-few-public-methods
"""Base class for storing inventory
(extendable for other items to store)"""
pass
InventoryItem = namedtuple('InventoryItem', 'type stream payload expires tag')


class InventoryStorage(Storage, collections.MutableMapping):
"""Module used for inventory storage"""
class InventoryStorage(MutableMapping):
"""
Base class for storing inventory
(extendable for other items to store)
"""

def __init__(self): # pylint: disable=super-init-not-called
def __init__(self):
self.numberOfInventoryLookupsPerformed = 0

def __contains__(self, _):
raise NotImplementedError

def __getitem__(self, _):
raise NotImplementedError

def __setitem__(self, _, value):
raise NotImplementedError

def __delitem__(self, _):
raise NotImplementedError

def __iter__(self):
raise NotImplementedError

def __len__(self):
raise NotImplementedError
@abstractmethod
def __contains__(self, item):
pass

@abstractmethod
def by_type_and_tag(self, objectType, tag):
"""Return objects filtered by object type and tag"""
raise NotImplementedError
pass

@abstractmethod
def unexpired_hashes_by_stream(self, stream):
"""Return unexpired inventory vectors filtered by stream"""
raise NotImplementedError
pass

@abstractmethod
def flush(self):
"""Flush cache"""
raise NotImplementedError
pass

@abstractmethod
def clean(self):
"""Free memory / perform garbage collection"""
raise NotImplementedError


class MailboxStorage(Storage, collections.MutableMapping):
"""Method for storing mails"""

def __delitem__(self, key):
raise NotImplementedError

def __getitem__(self, key):
raise NotImplementedError

def __iter__(self):
raise NotImplementedError

def __len__(self):
raise NotImplementedError

def __setitem__(self, key, value):
raise NotImplementedError
pass
Loading