Skip to content

Commit

Permalink
rework-database-indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Jun 23, 2021
1 parent a86d7ef commit a2c5625
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 185 deletions.
211 changes: 79 additions & 132 deletions renku/core/incubation/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import hashlib
import json
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Union
from uuid import uuid4

from BTrees.OOBTree import OOBTree
Expand All @@ -32,6 +32,7 @@
from zope.interface import implementer

OID_TYPE = str
MARKER = object()

"""NOTE: These are used as _p_serial to mark if an object was read from storage or is new"""
NEW = z64 # NOTE: Do not change this value since this is the default when a Persistent object is created
Expand Down Expand Up @@ -76,12 +77,11 @@ def get_attribute(object, name: Union[List[str], str]):
class Database:
"""The Metadata Object Database.
It is equivalent to a ZODB.Connection and a persistent.DataManager. It implements ZODB.interfaces.IConnection and
persistent.interfaces.IPersistentDataManager interfaces.
This class is equivalent to a persistent.DataManager and implements persistent.interfaces.IPersistentDataManager
interface.
"""

ROOT_OID = "root"
ROOT_TYPE_NAMES = ("Activity", "Entity", "Plan")

def __init__(self, storage):
self._storage: Storage = storage
Expand All @@ -93,20 +93,15 @@ def __init__(self, storage):
self._reader: ObjectReader = ObjectReader(database=self)
self._writer: ObjectWriter = ObjectWriter(database=self)
self._root: Optional[OOBTree] = None
self._root_types: Tuple[type, ...] = (OOBTree, Index)

self._initialize_root()

@classmethod
def from_path(cls, path: Union[str, Path]) -> "Database":
def from_path(cls, path: Union[Path, str]) -> "Database":
"""Create a Storage and Database using the given path."""
storage = Storage(path)
return Database(storage=storage)

@staticmethod
def new_oid():
"""Generate a random oid."""
return f"{uuid4().hex}{uuid4().hex}"

@staticmethod
def generate_oid(object: Persistent) -> OID_TYPE:
"""Generate oid for a Persistent object based on its id."""
Expand All @@ -126,108 +121,60 @@ def hash_id(id: str) -> OID_TYPE:
"""Return oid from id."""
return hashlib.sha3_256(id.encode("utf-8")).hexdigest()

@staticmethod
def new_oid():
    """Generate a random oid by concatenating the hex digests of two UUID4 values."""
    hex_parts = (uuid4().hex, uuid4().hex)
    return "".join(hex_parts)

@staticmethod
def _get_filename_from_oid(oid: OID_TYPE) -> str:
    """Return the storage filename for ``oid`` (filenames are always lowercase)."""
    return str.lower(oid)

@property
def root(self):
    """Return the database root object, creating/loading it lazily on first access.

    NOTE(review): ``not self._root`` is also true for an empty ``OOBTree``, so an
    empty-but-initialized root is re-initialized on each access — presumably
    harmless since ``_initialize_root`` guards internally; confirm.
    """
    if not self._root:
        self._initialize_root()

    return self._root

def _initialize_root(self):
"""Initialize root object."""
if not self._root:
try:
self._root = self.get(Database.ROOT_OID)
root_types = tuple(i.value_type for i in self._root.values() if i.value_type not in self._root_types)
self._root_types += root_types
for index in self._root.values():
len(index._entries)
except POSKeyError:
self._root = OOBTree()
self._add_internal(self._root, Database.ROOT_OID)
self._root._p_oid = Database.ROOT_OID
self.register(self._root)

def add_index(self, name: str, value_type: type, attribute: str, is_list: bool = False, key_type: type = None):
"""Add an index."""
assert len(self._objects_to_commit) == 0 or set(self._objects_to_commit.keys()) == {Database.ROOT_OID}
root = self.root
assert name not in root, f"Index already exists: '{name}'"
assert name not in self._root, f"Index already exists: '{name}'"

cls = IndexList if is_list else Index
root[name] = cls(name=name, value_type=value_type, attribute=attribute, key_type=key_type)
if value_type not in self._root_types:
self._root_types += (value_type,)

def add(self, object: Persistent, *, key_object=None):
"""Add a new object to the database.
If ``key_object`` is not None then it is used to generate index keys, otherwise, ``object`` is used as the key.
"""
assert isinstance(object, self._root_types), f"Cannot add objects of type '{type(object)}'"

if object._p_oid is None:
assert getattr(object, "id", None) is not None, f"Object does not have 'id': {object}"
object._p_oid = self.generate_oid(object)
index = cls(name=name, value_type=value_type, attribute=attribute, key_type=key_type)
index._p_jar = self

oid = object._p_oid

cached_object = self.get_cached(oid)
if cached_object:
assert cached_object is object, f"An object with oid '{oid}' is in the cache: {cached_object} != {object}"
return

self._add_internal(object=object, key_object=key_object)

def _update_indexes(self, object: Persistent, key_object):
index: Index
for index in self.root.values():
index.update(object=object, key_object=key_object)

def _add_internal(self, object: Persistent, oid: OID_TYPE = None, key_object=None):
"""Allow adding non-root types; used for adding root object."""
assert isinstance(object, Persistent), f"Cannot add non-Persistent object: '{object}'"
assert oid is None or isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"

object._p_jar = self

if oid:
assert object._p_oid is None or object._p_oid == oid, f"Object's oid is different: {object._p_oid} != {oid}"
object._p_oid = oid
else:
assert object._p_oid is not None, f"Object has no pid: {object}"

object._p_serial = NEW
self._objects_to_commit[object._p_oid] = object

self._update_indexes(object=object, key_object=key_object)

def oldstate(self, object, tid):
"""See persistent.interfaces.IPersistentDataManager::oldstate."""
raise NotImplementedError

def setstate(self, object: Persistent):
"""Load the state for a ghost object."""
oid = object._p_oid

data = self._storage.load(filename=self._get_filename_from_oid(oid))
self._reader.set_ghost_state(object, data)
self._root[name] = index

object._p_serial = PERSISTED
return index

def register(self, object: Persistent):
"""Register a Persistent object to be stored.
NOTE: When a Persistent object is changed it calls this method.
"""
assert isinstance(object, Persistent), f"Cannot add non-Persistent object: '{object}'"

if object._p_oid is None:
object._p_oid = self.generate_oid(object)

self._add_internal(object)
object._p_jar = self
# object._p_serial = NEW
self._objects_to_commit[object._p_oid] = object

def get(self, oid: OID_TYPE) -> Persistent:
"""Get the object by oid."""
if oid != Database.ROOT_OID and oid in self.root: # NOTE: Avoid looping if getting "root"
return self.root[oid]
if oid != Database.ROOT_OID and oid in self._root: # NOTE: Avoid looping if getting "root"
return self._root[oid]

object = self.get_cached(oid)
if object is not None:
Expand Down Expand Up @@ -259,56 +206,49 @@ def get_cached(self, oid: OID_TYPE) -> Optional[Persistent]:
if object is not None:
return object

def new_ghost(self, oid: OID_TYPE, object: Persistent):
    """Create a new ghost object.

    Registers this database as the object's data manager (``_p_jar``) and adds it
    to the cache in the ghost state; its actual state is loaded later by ``setstate``.
    """
    object._p_jar = self
    self._cache.new_ghost(oid, object)

def setstate(self, object: Persistent):
    """Load the state for a ghost object.

    Reads the serialized data from storage (filename derived from the object's
    oid), populates the object's state via the reader, and marks the object as
    persisted so ``commit`` will skip it unless it changes again.
    """
    data = self._storage.load(filename=self._get_filename_from_oid(object._p_oid))
    self._reader.set_ghost_state(object, data)
    object._p_serial = PERSISTED

def commit(self):
"""Commit modified and new objects."""
while self._objects_to_commit:
oid, object = self._objects_to_commit.popitem()

if not object._p_changed and object._p_serial != NEW:
continue

self._store_object(object)
_, object = self._objects_to_commit.popitem()
if object._p_changed or object._p_serial == NEW:
self._store_object(object)

def _store_object(self, object: Persistent):
oid = object._p_oid

data = self._writer.serialize(object)
self._storage.store(filename=self._get_filename_from_oid(object._p_oid), data=data)

self._storage.store(filename=self._get_filename_from_oid(oid), data=data)
self._cache[oid] = object

object._p_estimated_size = 0
self._cache[object._p_oid] = object

object._p_changed = 0 # NOTE: transition from changed to up-to-date
object._p_serial = PERSISTED

@staticmethod
def _get_filename_from_oid(oid: OID_TYPE) -> str:
return oid.lower()

def new_ghost(self, oid: OID_TYPE, object: Persistent):
"""Create a new ghost object."""
object._p_jar = self
self._cache.new_ghost(oid, object)

def remove(self, object: Persistent):
"""Remove an object from cache and indexes by creating a new oid for it."""
def remove_from_cache(self, object: Persistent):
"""Remove an object from cache."""
oid = object._p_oid
try:
del self._cache[oid]
except KeyError:
pass
self._cache.pop(oid, None)
self._pre_cache.pop(oid, None)
self._objects_to_commit.pop(oid, None)

for index in self.root.values():
index.delete(object)

def readCurrent(self, object):
    """We don't use this method but some Persistent logic requires its existence.

    Performs sanity checks only: the object must already belong to this data
    manager and have an oid assigned.
    """
    assert object._p_jar is self
    assert object._p_oid is not None

def oldstate(self, object, tid):
    """See persistent.interfaces.IPersistentDataManager::oldstate.

    Intentionally unsupported by this implementation.
    """
    raise NotImplementedError


@implementer(IPickleCache)
class Cache:
Expand Down Expand Up @@ -342,6 +282,10 @@ def __delitem__(self, oid):
assert isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"
self._entries.pop(oid)

def pop(self, oid, default=MARKER):
    """Remove and return an object from the cache.

    Return ``default`` when ``oid`` is absent and a default was provided;
    raise ``KeyError`` if no default is provided. The ``MARKER`` sentinel
    allows ``None`` to be passed as a valid default.
    """
    if default is MARKER:
        return self._entries.pop(oid)
    return self._entries.pop(oid, default)

def get(self, oid, default=None):
"""See IPickleCache."""
assert isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"
Expand Down Expand Up @@ -393,9 +337,6 @@ def __contains__(self, key):
def __getitem__(self, key):
    """Return the indexed object stored under ``key``; raise ``KeyError`` if missing."""
    return self._entries[key]

def __delitem__(self, key):
self._entries.pop(key)

def __getstate__(self):
return {
"name": self._name,
Expand Down Expand Up @@ -434,20 +375,21 @@ def items(self):
"""Return an iterator of keys and values."""
return self._entries.items()

def update(self, object: Persistent, key_object):
"""Update index with object."""
if not isinstance(object, self._value_type):
return
def add(self, object: Persistent, *, key_object=None):
    """Update index with object.

    If ``key_object`` is not None then it is used to generate index keys,
    otherwise, ``object`` is used as the key.
    """
    assert isinstance(object, self._value_type), f"Cannot add objects of type '{type(object)}'"

    # Derive the index key (from key_object when given) and store the object,
    # replacing any previous entry under the same key.
    key = self._get_key(object=object, key_object=key_object)
    self._entries[key] = object

def delete(self, object: Persistent):
def remove(self, object: Persistent, *, key_object=None):
"""Delete an object if it is indexed."""
if not isinstance(object, self._value_type):
return
assert isinstance(object, self._value_type), f"Invalid object type '{type(object)}'"

key = get_attribute(object, self._attribute)
key = self._get_key(object=object, key_object=key_object)
self._entries.pop(key, None)

def _get_key(self, object: Persistent, key_object):
Expand All @@ -463,10 +405,12 @@ def _get_key(self, object: Persistent, key_object):
class IndexList(Index):
"""Database index pointing to multiple values."""

def update(self, object: Persistent, key_object):
"""Update index with object."""
if not isinstance(object, self._value_type):
return
def add(self, object: Persistent, *, key_object=None):
"""Update index with object.
If ``key_object`` is not None then it is used to generate index keys, otherwise, ``object`` is used as the key.
"""
assert isinstance(object, self._value_type), f"Cannot add objects of type '{type(object)}'"

key = self._get_key(object=object, key_object=key_object)
values = self._entries.get(key)
Expand All @@ -481,12 +425,15 @@ def update(self, object: Persistent, key_object):
else:
self._entries[key] = [object]

def delete(self, object: Persistent):
# NOTE: It is needed to register the object because a list is not Persistent and its content won't be registered
# by the persistent machinery
self._p_jar.register(object)

def remove(self, object: Persistent, *, key_object=None):
"""Delete an object if it is indexed."""
if not isinstance(object, self._value_type):
return
assert isinstance(object, self._value_type), f"Invalid object type '{type(object)}'"

key = get_attribute(object, self._attribute)
key = self._get_key(object=object, key_object=key_object)
values = self._entries.get(key)
if values:
assert isinstance(values, list), f"Value in IndexList is not a list: {values}"
Expand Down Expand Up @@ -594,7 +541,7 @@ def _serialize_helper(self, object):
if not object._p_oid:
object._p_oid = Database.generate_oid(object)
if object._p_state not in [GHOST, UPTODATE] or (object._p_state == UPTODATE and object._p_serial == NEW):
self._database.add(object)
self._database.register(object)
return {"@type": get_type_name(object), "@oid": object._p_oid, "@reference": True}
elif hasattr(object, "__getstate__"):
state = object.__getstate__()
Expand Down
3 changes: 3 additions & 0 deletions renku/core/incubation/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ def process_datasets(commit):
elif client.has_graph_files() or client.has_datasets_provenance():
raise errors.OperationError("Graph metadata exists. Use ``--force`` to regenerate it.")

# database = Database.from_path(path=client.database_path)
# update_injected_database(database)

client.initialize_graph()
client.initialize_datasets_provenance()

Expand Down
16 changes: 16 additions & 0 deletions renku/core/management/command_builder/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,22 @@ def update_injected_client(new_client):
injector._bindings["LocalClient"] = lambda: new_client


def update_injected_database(database):
    """Update the injected Database instance.

    Used when re-generating new graphs to allow overriding existing database.
    """
    from renku.core.incubation.database import Database

    active_injector = getattr(_LOCAL, "injector", None)
    if not active_injector:
        raise inject.InjectorException("No injector is configured")

    # Rebind both the class key and the string key to the new instance.
    for binding_key in (Database, "Database"):
        active_injector._bindings[binding_key] = lambda: database


class Command:
"""Base renku command builder."""

Expand Down
Loading

0 comments on commit a2c5625

Please sign in to comment.