Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-262 Inconsistent node attributes behavior
Inroduced a more comprehensive way to instrument relationship attributes.

Old behavior instrumented attributes only if they were accessed directly from the
parent model. Traversing the storage made the access to an attribute inconsistent.

The new solution enables encapsulating the attributes disregarding the way they
were retrieved.
  • Loading branch information
mxmrlv committed Jun 6, 2017
1 parent e4d0036 commit 180e0a1cf1ad6da0ddd611b90a58e71acbea52e7
Showing 14 changed files with 289 additions and 115 deletions.
@@ -21,17 +21,21 @@

import aria
from aria.utils import file
from . import (
common,
collection_instrumentation
)
from . import common


class BaseOperationContext(common.BaseContext):
"""
Context object used during operation creation and execution
"""

INSTRUMENTATION_FIELDS = (
aria.modeling.models.Node.attributes,
aria.modeling.models.Node.properties,
aria.modeling.models.NodeTemplate.attributes,
aria.modeling.models.NodeTemplate.properties
)

def __init__(self, task_id, actor_id, **kwargs):
self._task_id = task_id
self._actor_id = actor_id
@@ -76,7 +80,6 @@ def plugin_workdir(self):

@property
def serialization_dict(self):
context_cls = self.__class__
context_dict = {
'name': self.name,
'service_id': self._service_id,
@@ -89,7 +92,7 @@ def serialization_dict(self):
'logger_level': self.logger.level
}
return {
'context_cls': context_cls,
'context_cls': self.__class__,
'context': context_dict
}

@@ -117,7 +120,6 @@ class NodeOperationContext(BaseOperationContext):
"""

@property
@collection_instrumentation.instrument_collection('attributes')
def node_template(self):
"""
the node of the current operation
@@ -126,7 +128,6 @@ def node_template(self):
return self.node.node_template

@property
@collection_instrumentation.instrument_collection('attributes')
def node(self):
"""
The node instance of the current operation
@@ -141,7 +142,6 @@ class RelationshipOperationContext(BaseOperationContext):
"""

@property
@collection_instrumentation.instrument_collection('attributes')
def source_node_template(self):
"""
The source node
@@ -150,7 +150,6 @@ def source_node_template(self):
return self.source_node.node_template

@property
@collection_instrumentation.instrument_collection('attributes')
def source_node(self):
"""
The source node instance
@@ -159,7 +158,6 @@ def source_node(self):
return self.relationship.source_node

@property
@collection_instrumentation.instrument_collection('attributes')
def target_node_template(self):
"""
The target node
@@ -168,7 +166,6 @@ def target_node_template(self):
return self.target_node.node_template

@property
@collection_instrumentation.instrument_collection('attributes')
def target_node(self):
"""
The target node instance
@@ -33,11 +33,7 @@ def host_ip(self):
:return:
"""
assert isinstance(self._op_context, operation.NodeOperationContext)
host = self._op_context.node.host
ip = host.attributes.get('ip')
if ip:
return ip.value

return self._op_context.node.host.attributes.get('ip')


class RelationshipToolBelt(object):
@@ -68,11 +68,13 @@ def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=No

@wraps(func)
def _wrapper(**func_kwargs):
ctx = func_kwargs['ctx']
if toolbelt:
operation_toolbelt = context.toolbelt(func_kwargs['ctx'])
operation_toolbelt = context.toolbelt(ctx)
func_kwargs.setdefault('toolbelt', operation_toolbelt)
validate_function_arguments(func, func_kwargs)
return func(**func_kwargs)
with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
return func(**func_kwargs)
return _wrapper


@@ -117,14 +117,15 @@ def _request_handler(self):

def _process(self, request):
try:
typed_request = json.loads(request)
args = typed_request['args']
payload = _process_ctx_request(self.ctx, args)
result_type = 'result'
if isinstance(payload, exceptions.ScriptException):
payload = dict(message=str(payload))
result_type = 'stop_operation'
result = {'type': result_type, 'payload': payload}
with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS):
typed_request = json.loads(request)
args = typed_request['args']
payload = _process_ctx_request(self.ctx, args)
result_type = 'result'
if isinstance(payload, exceptions.ScriptException):
payload = dict(message=str(payload))
result_type = 'stop_operation'
result = {'type': result_type, 'payload': payload}
except Exception as e:
traceback_out = StringIO.StringIO()
traceback.print_exc(file=traceback_out)
@@ -15,6 +15,7 @@
"""
General storage API
"""
import threading


class StorageAPI(object):
@@ -45,6 +46,15 @@ def __init__(self, model_cls, name=None, **kwargs):
super(ModelAPI, self).__init__(**kwargs)
self._model_cls = model_cls
self._name = name or model_cls.__modelname__
self._thread_local = threading.local()
self._thread_local._instrumentation = []

@property
def _instrumentation(self):
if not hasattr(self._thread_local, '_instrumentation'):
self._thread_local._instrumentation = []
return self._thread_local._instrumentation


@property
def name(self):
@@ -13,24 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from functools import partial

from aria.modeling import models
from . import exceptions


class _InstrumentedCollection(object):

def __init__(self,
model,
mapi,
parent,
field_name,
field_cls,
seq=None,
is_top_level=True,
**kwargs):
self._model = model
self._mapi = mapi
self._parent = parent
self._field_name = field_name
self._is_top_level = is_top_level
self._field_cls = field_cls
self._load(seq, **kwargs)

@property
@@ -75,31 +75,29 @@ def _instrument(self, key, value):
else:
return value

return instrumentation_cls(self._model, self, key, value, False)
return instrumentation_cls(self._mapi, self, key, self._field_cls, value, False)

@staticmethod
def _raw_value(value):
def _raw_value(self, value):
"""
Get the raw value.
:param value:
:return:
"""
if isinstance(value, models.Attribute):
if isinstance(value, self._field_cls):
return value.value
return value

@staticmethod
def _encapsulate_value(key, value):
def _encapsulate_value(self, key, value):
"""
Create a new item cls if needed.
:param key:
:param value:
:return:
"""
if isinstance(value, models.Attribute):
if isinstance(value, self._field_cls):
return value
# If it is not wrapped
return models.Attribute.wrap(key, value)
return self._field_cls.wrap(key, value)

def __setitem__(self, key, value):
"""
@@ -112,11 +110,9 @@ def __setitem__(self, key, value):
if self._is_top_level:
# We are at the top level
field = getattr(self._parent, self._field_name)
mapi = getattr(self._model, models.Attribute.__modelname__)
value = self._set_field(field,
key,
value if key in field else self._encapsulate_value(key, value))
mapi.update(value)
self._set_field(
field, key, value if key in field else self._encapsulate_value(key, value))
self._mapi.update(self._parent)
else:
# We are not at the top level
self._set_field(self._parent, self._field_name, self)
@@ -131,7 +127,7 @@ def _set_field(self, collection, key, value):
"""
if isinstance(value, _InstrumentedCollection):
value = value._raw
if key in collection and isinstance(collection[key], models.Attribute):
if key in collection and isinstance(collection[key], self._field_cls):
if isinstance(collection[key], _InstrumentedCollection):
self._del(collection, key)
collection[key].value = value
@@ -204,39 +200,107 @@ def _raw(self):

class _InstrumentedModel(object):

def __init__(self, field_name, original_model, model_storage):
def __init__(self, original_model, mapi, instrumentation):
"""
The original model
:param original_model: the model to be instrumented
:param mapi: the mapi for that model
"""
super(_InstrumentedModel, self).__init__()
self._field_name = field_name
self._model_storage = model_storage
self._original_model = original_model
self._mapi = mapi
self._instrumentation = instrumentation
self._apply_instrumentation()

def __getattr__(self, item):
return getattr(self._original_model, item)
return_value = getattr(self._original_model, item)
if isinstance(return_value, self._original_model.__class__):
return _create_instrumented_model(return_value, self._mapi, self._instrumentation)
if isinstance(return_value, (list, dict)):
return _create_wrapped_model(return_value, self._mapi, self._instrumentation)
return return_value

def _apply_instrumentation(self):
for field in self._instrumentation:
field_name = field.key
field_cls = field.mapper.class_
field = getattr(self._original_model, field_name)

# Preserve the original value. e.g. original attributes would be located under
# _attributes
setattr(self, '_{0}'.format(field_name), field)

# set instrumented value
if isinstance(field, dict):
instrumentation_cls = _InstrumentedDict
elif isinstance(field, list):
instrumentation_cls = _InstrumentedList
else:
# TODO: raise proper error
raise exceptions.StorageError(
"ARIA supports instrumentation for dict and list. Field {field} of the "
"class {model} is of {type} type.".format(
field=field,
model=self._original_model,
type=type(field)))

instrumented_class = instrumentation_cls(seq=field,
parent=self._original_model,
mapi=self._mapi,
field_name=field_name,
field_cls=field_cls)
setattr(self, field_name, instrumented_class)


class _WrappedModel(object):

def __init__(self, wrapped, instrumentation, **kwargs):
"""
:param instrumented_cls: The class to be instrumented
:param instrumentation_cls: the instrumentation cls
:param wrapped: the currently wrapped instance
:param kwargs: and kwargs to the passed to the instrumented class.
"""
self._kwargs = kwargs
self._instrumentation = instrumentation
self._wrapped = wrapped

def _wrap(self, value):
if value.__class__ in (class_.class_ for class_ in self._instrumentation):
return _create_instrumented_model(
value, instrumentation=self._instrumentation, **self._kwargs)
elif hasattr(value, 'metadata') or isinstance(value, (dict, list)):
# Basically checks that the value is indeed an sqlmodel (it should have metadata)
return _create_wrapped_model(
value, instrumentation=self._instrumentation, **self._kwargs)
return value

def __getattr__(self, item):
if hasattr(self, '_wrapped'):
return self._wrap(getattr(self._wrapped, item))
else:
super(_WrappedModel, self).__getattribute__(item)

def __getitem__(self, item):
return self._wrap(self._wrapped[item])

field = getattr(self._original_model, self._field_name)

# Preserve the original value. e.g. original attributes would be located under
# _attributes
setattr(self, '_{0}'.format(self._field_name), field)
def _create_instrumented_model(original_model, mapi, instrumentation, **kwargs):
return type('Instrumented{0}'.format(original_model.__class__.__name__),
(_InstrumentedModel,),
{})(original_model, mapi, instrumentation, **kwargs)

# set instrumented value
setattr(self, self._field_name, _InstrumentedDict(self._model_storage,
self._original_model,
self._field_name,
field))

def _create_wrapped_model(original_model, mapi, instrumentation, **kwargs):
return type('Wrapped{0}'.format(original_model.__class__.__name__),
(_WrappedModel, ),
{})(original_model, instrumentation, mapi=mapi, **kwargs)

def instrument_collection(field_name, func=None):
if func is None:
return partial(instrument_collection, field_name)

def _wrapper(*args, **kwargs):
original_model = func(*args, **kwargs)
return type('Instrumented{0}'.format(original_model.__class__.__name__),
(_InstrumentedModel, ),
{})(field_name, original_model, args[0].model)
def instrument(instrumentation, original_model, mapi):
for instrumented_field in instrumentation:
if isinstance(original_model, instrumented_field.class_):
return _create_instrumented_model(original_model, mapi, instrumentation)

return _wrapper
return _create_wrapped_model(original_model, mapi, instrumentation)

0 comments on commit 180e0a1

Please sign in to comment.