Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-213 Sporadic tests failures over locked database issue
Move from 2 different sessions - one for the log, and the other for general model operations,
to one single session, while utilizing the keep tracking of changes mechanism for both logs and node/task states.
  • Loading branch information
mxmrlv committed May 11, 2017
1 parent 6864d42 commit 2ee06b8a6abe79f429458c7dbc5f9e1c31aec589
Showing 9 changed files with 219 additions and 78 deletions.
@@ -114,17 +114,11 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None):
return console


def create_sqla_log_handler(session, engine, log_cls, execution_id, level=logging.DEBUG):
def create_sqla_log_handler(model, log_cls, execution_id, level=logging.DEBUG):

# This is needed since the engine and session are entirely new we need to reflect the db
# schema of the logging model into the engine and session.
log_cls.__table__.create(bind=engine, checkfirst=True)

return _SQLAlchemyHandler(session=session,
engine=engine,
log_cls=log_cls,
execution_id=execution_id,
level=level)
return _SQLAlchemyHandler(model=model, log_cls=log_cls, execution_id=execution_id, level=level)


class _DefaultConsoleFormat(logging.Formatter):
@@ -168,10 +162,9 @@ def create_file_log_handler(

class _SQLAlchemyHandler(logging.Handler):

def __init__(self, session, engine, log_cls, execution_id, **kwargs):
def __init__(self, model, log_cls, execution_id, **kwargs):
logging.Handler.__init__(self, **kwargs)
self._session = session
self._engine = engine
self._model = model
self._cls = log_cls
self._execution_id = execution_id

@@ -188,15 +181,7 @@ def emit(self, record):
# Not mandatory.
traceback=getattr(record, 'traceback', None)
)
self._session.add(log)

try:
self._session.commit()
except BaseException:
self._session.rollback()
raise
finally:
self._session.close()
self._model.log.put(log)


_default_file_formatter = logging.Formatter(
@@ -79,13 +79,9 @@ def _register_logger(self, level=None, task_id=None):
self.logger.addHandler(self._get_sqla_handler())

def _get_sqla_handler(self):
api_kwargs = {}
if self._model._initiator:
api_kwargs.update(self._model._initiator(**self._model._initiator_kwargs))
api_kwargs.update(**self._model._api_kwargs)
return aria_logger.create_sqla_log_handler(log_cls=modeling.models.Log,
execution_id=self._execution_id,
**api_kwargs)
return aria_logger.create_sqla_log_handler(model=self._model,
log_cls=modeling.models.Log,
execution_id=self._execution_id)

def __repr__(self):
return (
@@ -196,7 +192,6 @@ def get_resource_and_render(self, path=None, variables=None):

def _render_resource(self, resource_content, variables):
variables = variables or {}
if 'ctx' not in variables:
variables['ctx'] = self
variables.setdefault('ctx', self)
resource_template = jinja2.Template(resource_content)
return resource_template.render(variables)
@@ -40,7 +40,7 @@ def _task_started(task, *args, **kwargs):
with task._update():
task.started_at = datetime.utcnow()
task.status = task.STARTED
_update_node_state_if_necessary(task, is_transitional=True)
_update_node_state_if_necessary(task, is_transitional=True)


@events.on_failure_task_signal.connect
@@ -74,7 +74,7 @@ def _task_succeeded(task, *args, **kwargs):
task.ended_at = datetime.utcnow()
task.status = task.SUCCESS

_update_node_state_if_necessary(task)
_update_node_state_if_necessary(task)


@events.start_workflow_signal.connect
@@ -229,6 +229,7 @@ def _handle_apply_tracked_changes_request(self, task_id, request, response):
def _apply_tracked_changes(task, request):
instrumentation.apply_tracked_changes(
tracked_changes=request['tracked_changes'],
new_instances=request['new_instances'],
model=task.context.model)


@@ -277,22 +278,28 @@ def started(self):
"""Task started message"""
self._send_message(type='started')

def succeeded(self, tracked_changes):
def succeeded(self, tracked_changes, new_instances):
"""Task succeeded message"""
self._send_message(type='succeeded', tracked_changes=tracked_changes)
self._send_message(
type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances)

def failed(self, tracked_changes, exception):
def failed(self, tracked_changes, new_instances, exception):
"""Task failed message"""
self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
self._send_message(type='failed',
tracked_changes=tracked_changes,
new_instances=new_instances,
exception=exception)

def apply_tracked_changes(self, tracked_changes):
self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes)
def apply_tracked_changes(self, tracked_changes, new_instances):
self._send_message(type='apply_tracked_changes',
tracked_changes=tracked_changes,
new_instances=new_instances)

def closed(self):
"""Executor closed message"""
self._send_message(type='closed')

def _send_message(self, type, tracked_changes=None, exception=None):
def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.port))
try:
@@ -301,7 +308,8 @@ def _send_message(self, type, tracked_changes=None, exception=None):
'task_id': self.task_id,
'exception': exceptions.wrap_if_needed(exception),
'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
'tracked_changes': tracked_changes
'tracked_changes': tracked_changes or {},
'new_instances': new_instances or {}
})
response = _recv_message(sock)
response_exception = response.get('exception')
@@ -311,7 +319,7 @@ def _send_message(self, type, tracked_changes=None, exception=None):
sock.close()


def _patch_session(ctx, messenger, instrument):
def _patch_ctx(ctx, messenger, instrument):
# model will be None only in tests that test the executor component directly
if not ctx.model:
return
@@ -326,12 +334,13 @@ def patched_refresh(target):
original_refresh(target)

def patched_commit():
messenger.apply_tracked_changes(instrument.tracked_changes)
messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
instrument.expunge_session()
instrument.clear()

def patched_rollback():
# Rollback is performed on parent process when commit fails
pass
instrument.expunge_session()

# when autoflush is set to true (the default), refreshing an object will trigger
# an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
@@ -363,21 +372,29 @@ def _main():
# This is required for the instrumentation work properly.
# See docstring of `remove_mutable_association_listener` for further details
modeling_types.remove_mutable_association_listener()
try:
ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
except BaseException as e:
messenger.failed(exception=e, tracked_changes=None, new_instances=None)
return

with instrumentation.track_changes() as instrument:
with instrumentation.track_changes(ctx.model) as instrument:
try:
messenger.started()
ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
_patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
_patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
task_func = imports.load_attribute(implementation)
aria.install_aria_extensions()
for decorate in process_executor.decorate():
task_func = decorate(task_func)
task_func(ctx=ctx, **operation_inputs)
messenger.succeeded(tracked_changes=instrument.tracked_changes)
messenger.succeeded(tracked_changes=instrument.tracked_changes,
new_instances=instrument.new_instances)
except BaseException as e:
messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)

messenger.failed(exception=e,
tracked_changes=instrument.tracked_changes,
new_instances=instrument.new_instances)
finally:
instrument.expunge_session()

if __name__ == '__main__':
_main()
@@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import copy
import json
import os

import sqlalchemy.event

@@ -26,11 +26,19 @@
_VERSION_ID_COL = 'version'
_STUB = object()
_INSTRUMENTED = {
_models.Node.runtime_properties: dict
'modified': {
_models.Node.runtime_properties: dict,
_models.Node.state: str,
_models.Task.status: str,
},
'new': (_models.Log, )

}

_NEW_INSTANCE = 'NEW_INSTANCE'


def track_changes(instrumented=None):
def track_changes(model=None, instrumented=None):
"""Track changes in the specified model columns
This call will register event listeners using sqlalchemy's event mechanism. The listeners
@@ -50,32 +58,78 @@ def track_changes(instrumented=None):
will then call ``apply_tracked_changes()`` that resides in this module as well.
At that point, the changes will actually be written back to the database.
:param model: the model storage. it should hold a mapi for each model. the session of each mapi
is needed to setup events
:param instrumented: A dict from model columns to their python native type
:return: The instrumentation context
"""
return _Instrumentation(instrumented or _INSTRUMENTED)
return _Instrumentation(model, instrumented or _INSTRUMENTED)


class _Instrumentation(object):

def __init__(self, instrumented):
def __init__(self, model, instrumented):
self.tracked_changes = {}
self.new_instances = {}
self.listeners = []
self._instances_to_expunge = []
self._model = model
self._track_changes(instrumented)

@property
def _new_instance_id(self):
return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE,
index=len(self._instances_to_expunge))

def expunge_session(self):
for new_instance in self._instances_to_expunge:
self._get_session_from_model(new_instance.__tablename__).expunge(new_instance)

def _get_session_from_model(self, tablename):
mapi = getattr(self._model, tablename, None)
if mapi:
return mapi._session
raise StorageError("Could not retrieve session for {0}".format(tablename))

def _track_changes(self, instrumented):
instrumented_classes = {}
for instrumented_attribute, attribute_type in instrumented.items():
instrumented_attribute_classes = {}
# Track any newly-set attributes.
for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
self._register_set_attribute_listener(
instrumented_attribute=instrumented_attribute,
attribute_type=attribute_type)
instrumented_class = instrumented_attribute.parent.entity
instrumented_class_attributes = instrumented_classes.setdefault(instrumented_class, {})
instrumented_class_attributes = instrumented_attribute_classes.setdefault(
instrumented_class, {})
instrumented_class_attributes[instrumented_attribute.key] = attribute_type
for instrumented_class, instrumented_attributes in instrumented_classes.items():
self._register_instance_listeners(
instrumented_class=instrumented_class,
instrumented_attributes=instrumented_attributes)

# Track any global instance update such as 'refresh' or 'load'
for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
self._register_instance_listeners(instrumented_class=instrumented_class,
instrumented_attributes=instrumented_attributes)

# Track any newly created instances.
for instrumented_class in instrumented.get('new', {}):
self._register_new_instance_listener(instrumented_class)

def _register_new_instance_listener(self, instrumented_class):
if self._model is None:
raise StorageError("In order to keep track of new instances, a ctx is needed")

def listener(_, instance):
if not isinstance(instance, instrumented_class):
return
self._instances_to_expunge.append(instance)
tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
instance_as_dict = instance.to_dict()
instance_as_dict.update((k, getattr(instance, k))
for k in getattr(instance, '__private_fields__', []))
tracked_attributes.update(instance_as_dict)
session = self._get_session_from_model(instrumented_class.__tablename__)
listener_args = (session, 'after_attach', listener)
sqlalchemy.event.listen(*listener_args)
self.listeners.append(listener_args)

def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
def listener(target, value, *_):
@@ -125,6 +179,9 @@ def clear(self, target=None):
else:
self.tracked_changes.clear()

self.new_instances.clear()
self._instances_to_expunge = []

def restore(self):
"""Remove all listeners registered by this instrumentation"""
for listener_args in self.listeners:
@@ -160,7 +217,7 @@ def dict(self):
return {'initial': self.initial, 'current': self.current}.copy()


def apply_tracked_changes(tracked_changes, model):
def apply_tracked_changes(tracked_changes, new_instances, model):
"""Write tracked changes back to the database using provided model storage
:param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context
@@ -169,6 +226,7 @@ def apply_tracked_changes(tracked_changes, model):
"""
successfully_updated_changes = dict()
try:
# handle instance updates
for mapi_name, tracked_instances in tracked_changes.items():
successfully_updated_changes[mapi_name] = dict()
mapi = getattr(model, mapi_name)
@@ -177,18 +235,27 @@ def apply_tracked_changes(tracked_changes, model):
instance = None
for attribute_name, value in tracked_attributes.items():
if value.initial != value.current:
if not instance:
instance = mapi.get(instance_id)
instance = instance or mapi.get(instance_id)
setattr(instance, attribute_name, value.current)
if instance:
_validate_version_id(instance, mapi)
mapi.update(instance)
successfully_updated_changes[mapi_name][instance_id] = [
v.dict for v in tracked_attributes.values()]

# Handle new instances
for mapi_name, new_instance in new_instances.items():
successfully_updated_changes[mapi_name] = dict()
mapi = getattr(model, mapi_name)
for new_instance_kwargs in new_instance.values():
instance = mapi.model_cls(**new_instance_kwargs)
mapi.put(instance)
successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
except BaseException:
for key, value in successfully_updated_changes.items():
if not value:
del successfully_updated_changes[key]
# TODO: if the successful has _STUB, the logging fails because it can't serialize the object
model.logger.error(
'Registering all the changes to the storage has failed. {0}'
'The successful updates were: {0} '

0 comments on commit 2ee06b8

Please sign in to comment.