Skip to content

Commit

Permalink
Merge 50e2554 into 554dc5c
Browse files Browse the repository at this point in the history
  • Loading branch information
pigmej committed Dec 8, 2015
2 parents 554dc5c + 50e2554 commit f2b3f85
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 29 deletions.
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ multipledispatch==0.4.8
pbr
pydot
bunch
wrapt
# if you want to use riak backend then
riak
# if you want to use sql backend then
# peewee


# if you want to use lua computable inputs
# lupa
3 changes: 2 additions & 1 deletion solar/core/resource/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from multipledispatch import dispatch
import networkx


from solar.core.signals import get_mapping
from solar.core import validation
from solar.dblayer.model import StrInt
Expand Down Expand Up @@ -98,7 +99,7 @@ def __init__(self, name, base_path, args=None, tags=None,
self.db_obj.save()

# Load
@dispatch(DBResource) # NOQA
@dispatch(object) # noqa
def __init__(self, resource_db):
self.db_obj = resource_db
self.name = resource_db.name
Expand Down
101 changes: 101 additions & 0 deletions solar/dblayer/lfu_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from collections import Counter
from operator import itemgetter
from solar.dblayer.proxy import DBLayerProxy

from threading import RLock


class LFUCache(object):
    """Thread-safe Least-Frequently-Used cache for DB layer objects.

    When the store grows past ``maxsize``, the least-used entries are
    evicted — but only those that ``is_deletable`` reports safe to drop
    (no unsaved changes, not queued for lazy save, no live proxies).
    """

    def __init__(self, owner, maxsize):
        # owner: the model class this cache belongs to (kept for
        # debugging / ownership bookkeeping)
        self._maxsize = maxsize
        self._store = {}
        self._lock = RLock()
        self._use_count = Counter()
        self._owner = owner

    def set(self, item, value):
        """Insert ``value`` under key ``item``, evicting LFU entries on
        overflow.

        If ``item`` is already cached, the existing value is kept and
        returned; only its use counter is bumped.
        """
        with self._lock:
            if item in self._store:
                self._use_count[item] += 1
                return self._store[item]
            store_len = len(self._store)
            if store_len >= self._maxsize:
                deleted = 0
                # evict the overflow plus one slot for the new entry
                exp_deleted = (store_len - self._maxsize) + 1
                # NOTE: items() (not the py2-only iteritems()) — sorted()
                # materializes a list, so deleting entries while walking
                # the result is safe
                for k, _ in sorted(self._use_count.items(),
                                   key=itemgetter(1)):
                    if self.is_deletable(self._store[k]):
                        del self[k]
                        deleted += 1
                        if deleted == exp_deleted:
                            break
            self._use_count[item] += 1
            self._store[item] = value
            return value

    def is_empty(self):
        """Return True when the cache holds no entries."""
        return not self._store

    def get(self, item):
        """Return the cached object for ``item`` wrapped in a tracking
        proxy.

        Raises KeyError when ``item`` is not cached.
        """
        obj = self._store[item]
        return DBLayerProxy(obj)

    def __eq__(self, other):
        # Allow comparing the cache against a plain dict of expected
        # contents (used by the test suite).
        if isinstance(other, dict):
            # BUGFIX: the original only checked that ``other``'s items
            # were present, so ``cache == {}`` was True for any cache.
            if len(self._store) != len(other):
                return False
            for k, v in other.items():
                if k not in self._store:
                    return False
                if not v == self._store[k]:
                    return False
            return True
        # BUGFIX: the original returned ``self == other`` here, which
        # re-entered this method and recursed infinitely.
        return NotImplemented

    def __setitem__(self, item, value):
        self.set(item, value)

    def is_deletable(self, elem):
        """Return True when ``elem`` may be safely evicted.

        An entry is pinned while it has unsaved changes, sits in the
        lazy-save queue, or is still referenced by a live proxy.
        """
        if elem.changed():
            return False
        if elem in elem._c.lazy_save:
            return False
        if elem._c.refs[elem.key]:
            return False
        return True

    def __contains__(self, item):
        return self._store.__contains__(item)

    def __delitem__(self, item):
        # drop both the value and its usage statistics
        with self._lock:
            del self._store[item]
            del self._use_count[item]

    def __getitem__(self, item):
        # Raises KeyError for a missing key, mirroring dict semantics;
        # a hit also counts as a use.
        with self._lock:
            res = self._store[item]
            self._use_count[item] += 1
            return res

    def incr_count(self, item):
        """Bump the use counter for ``item`` if it is cached."""
        with self._lock:
            if item in self._use_count:
                self._use_count[item] += 1
39 changes: 26 additions & 13 deletions solar/dblayer/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@

from functools import total_ordering
from functools import wraps
from random import getrandbits
from threading import RLock

import time
import uuid
import weakref

from collections import defaultdict
from random import getrandbits
from threading import RLock

from solar.dblayer.conflict_resolution import dblayer_conflict_resolver
from solar.dblayer.lfu_cache import LFUCache
from solar.utils import get_local


Expand All @@ -37,7 +42,6 @@ class DBLayerNoRiakObj(DBLayerException):

class NONE(object):
"""A None like type"""
pass


class SingleIndexCache(object):
Expand Down Expand Up @@ -79,12 +83,15 @@ def __exit__(self, *args, **kwargs):

class SingleClassCache(object):

__slots__ = ['obj_cache', 'db_ch_state', 'lazy_save', 'origin_class']
__slots__ = ['obj_cache', 'db_ch_state',
'lazy_save', 'origin_class',
'refs']

    def __init__(self, origin_class):
        # Per-class cache state for one model class.
        # LFU-bounded object cache (capacity 200) replacing a plain dict.
        self.obj_cache = LFUCache(origin_class, 200)
        # tracks which indexes changed in the DB during the session
        self.db_ch_state = {'index': set()}
        # objects queued for deferred save at session end
        self.lazy_save = set()
        # key -> WeakSet of live proxies; lets the cache see whether an
        # object is still referenced before evicting it
        self.refs = defaultdict(weakref.WeakSet)
        self.origin_class = origin_class


Expand Down Expand Up @@ -805,6 +812,9 @@ def __str__(self):
return "<%s %s:%s>" % (self.__class__.__name__,
self._riak_object.bucket.name, self.key)

    def __hash__(self):
        # Model objects are identified by their storage key, so hash on
        # it — equal-keyed instances collide in sets/dicts.
        # NOTE(review): assumes equality elsewhere is also key-based;
        # confirm against the class's __eq__ (not visible here).
        return hash(self.key)

@classmethod
def new(cls, key, data):
return cls.from_dict(key, data)
Expand All @@ -822,8 +832,10 @@ def from_riakobj(cls, riak_obj):
obj._riak_object = riak_obj
if obj._new is None:
obj._new = False
cls._c.obj_cache[riak_obj.key] = obj
return obj
cache = cls._c.obj_cache
cache.set(riak_obj.key, obj)
# cache may adjust object
return cache.get(riak_obj.key)

@classmethod
def from_dict(cls, key, data=None):
Expand All @@ -835,6 +847,11 @@ def from_dict(cls, key, data=None):
raise DBLayerException("No key specified")
if key and 'key' in data and data['key'] != key:
raise DBLayerException("Different key values detected")
# shouldn't be needed, but may cover some weird use case
# when improperly using from_dict, because it then leads to conflicts
if key in cls._c.obj_cache:
raise DBLayerException("Object already exists in cache"
" cannot create second")
data['key'] = key
riak_obj = cls.bucket.new(key, data={})
obj = cls.from_riakobj(riak_obj)
Expand All @@ -855,20 +872,16 @@ def from_dict(cls, key, data=None):
setattr(obj, gname, val)
return obj

def __hash__(self):
return hash(self.key)

@classmethod
def get(cls, key):
try:
return cls._c.obj_cache[key]
return cls._c.obj_cache.get(key)
except KeyError:
riak_object = cls.bucket.get(key)
if not riak_object.exists:
raise DBLayerNotFound(key)
else:
obj = cls.from_riakobj(riak_object)
return obj
return cls.from_riakobj(riak_object)

@classmethod
def multi_get(cls, keys):
Expand Down
40 changes: 40 additions & 0 deletions solar/dblayer/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


import wrapt


class DBLayerProxy(wrapt.ObjectProxy):
    """Transparent proxy around a DB model object.

    Each proxy registers itself in the owning class's ``refs`` WeakSet,
    so the cache can tell whether a cached object is still referenced
    before evicting it.
    """

    def __init__(self, wrapped):
        super(DBLayerProxy, self).__init__(wrapped)
        refs = wrapped._c.refs
        refs[wrapped.key].add(self)

    def next(self, *args, **kwargs):
        # explicit py2 iterator-protocol pass-through to the wrapped obj
        return self.__wrapped__.next(*args, **kwargs)

    def __hash__(self):
        # id() is there by intention: we hash on the wrapped object's
        # identity because the model overrides __hash__ itself
        return hash(id(self.__wrapped__))

    def __eq__(self, other):
        if not isinstance(other, DBLayerProxy):
            return self.__wrapped__ == other
        # BUGFIX: the original compared self.__wrapped__ to ITSELF, so
        # any two proxies compared equal; unwrap the other side instead.
        return self.__wrapped__ == other.__wrapped__

    def __repr__(self):
        return "<P: %r>" % self.__wrapped__
26 changes: 13 additions & 13 deletions solar/dblayer/test/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_from_dict(rk):
m11 = M1.get(key)
assert m1.key == key
assert m1.f3 == 250
assert m1 is m11
assert hash(m1) == hash(m11)


def test_not_exists(rk):
Expand Down Expand Up @@ -100,21 +100,21 @@ def test_lazy(rk):
def test_cache_logic(rk):
k = next(rk)
M1.session_start()
assert M1._c.obj_cache == {}
assert M1._c.obj_cache.is_empty()

m1 = M1.from_dict(k, {'f1': 'blah', 'f2': 150})
m1.save()
M1.session_end()

M1.session_start()
assert M1._c.obj_cache == {}
assert M1._c.obj_cache.is_empty()
m11 = M1.get(k)
pid = id(M1._c)
assert M1._c.obj_cache == {k: m11}
M1.session_end()

M1.session_start()
assert M1._c.obj_cache == {}
assert M1._c.obj_cache.is_empty()
M1.get(k)
aid = id(M1._c)

Expand Down Expand Up @@ -181,16 +181,16 @@ def test_cache_behaviour(rk):
m1 = M1.from_dict(key1, {'f1': 'm1'})

m11 = M1.get(key1)
assert m1 is m11
assert m1 == m11
m1.save()
assert m1 is m11
assert m1 == m11

m12 = M1.get(key1)
assert m1 is m12
assert m1 == m12

clear_cache()
m13 = M1.get(key1)
assert m1 is not m13
assert m1 != m13


def test_save_lazy(rk):
Expand All @@ -205,19 +205,19 @@ def test_save_lazy(rk):
m1g = M1.get(key1)
m2g = M1.get(key2)

assert m1 is m1g
assert m2 is m2g
assert m1 == m1g
assert m2 == m2g

assert M1._c.lazy_save == {m1, m2}
assert set(x.key for x in M1._c.lazy_save) == {m1.key, m2.key}
M1.session_end()
assert M1._c.lazy_save == set()

clear_cache()
m1g2 = M1.get(key1)
m2g2 = M1.get(key2)

assert m1g is not m1g2
assert m2g is not m2g2
assert m1g != m1g2
assert m2g != m2g2


def test_changed_index(rk):
Expand Down

0 comments on commit f2b3f85

Please sign in to comment.