Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-115: Log model should have an Execution field
  • Loading branch information
mxmrlv committed Mar 6, 2017
1 parent 63b157c commit c0d76adaf37935a10a9f4d3b3fe4b508595192ab
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 12 deletions.
@@ -98,13 +98,17 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None):
return console


def create_sqla_log_handler(session, engine, log_cls, execution_id, level=logging.DEBUG):
    """Create a logging handler that persists log records through SQLAlchemy.

    :param session: SQLAlchemy session used to write log entries
    :param engine: SQLAlchemy engine bound to the target database
    :param log_cls: declarative model class representing a single log record
    :param execution_id: id of the execution every emitted record is linked to
    :param level: minimum logging level the handler accepts
    :return: a configured ``_SQLAlchemyHandler``
    """
    # This is needed since the engine and session are entirely new; we need to
    # reflect the db schema of the logging model into the engine and session.
    log_cls.__table__.create(bind=engine, checkfirst=True)

    return _SQLAlchemyHandler(session=session,
                              engine=engine,
                              log_cls=log_cls,
                              execution_id=execution_id,
                              level=level)


class _DefaultConsoleFormat(logging.Formatter):
@@ -148,16 +152,18 @@ def create_file_log_handler(

class _SQLAlchemyHandler(logging.Handler):

def __init__(self, session, engine, log_cls, execution_id, **kwargs):
    """Initialize the handler with the storage objects it writes through.

    :param session: SQLAlchemy session used to persist emitted records
    :param engine: SQLAlchemy engine the session is bound to
    :param log_cls: declarative model class instantiated once per log record
    :param execution_id: execution id stamped onto every emitted record
    :param kwargs: forwarded to ``logging.Handler.__init__`` (e.g. ``level``)
    """
    logging.Handler.__init__(self, **kwargs)
    self._session = session
    self._engine = engine
    self._cls = log_cls
    self._execution_id = execution_id

def emit(self, record):
created_at = datetime.strptime(logging.Formatter('%(asctime)s').formatTime(record),
'%Y-%m-%d %H:%M:%S,%f')
log = self._cls(
execution_fk=self._execution_id,
actor=record.prefix,
level=record.levelname,
msg=record.msg,
@@ -17,6 +17,7 @@
"""
import logging
from contextlib import contextmanager
from datetime import datetime
from functools import partial
from uuid import uuid4

@@ -62,6 +63,17 @@ def __init__(
self._workdir = workdir
self.logger = None

def _create_execution(self):
    """Persist a new execution row for this workflow run.

    Builds an execution model from the context's service instance, workflow
    name and parameters, stores it, and returns the new row's id.
    """
    execution_cls = self.model.execution.model_cls
    execution = execution_cls(
        service_instance=self.service_instance,
        workflow_name=self._workflow_name,
        created_at=datetime.utcnow(),
        parameters=self.parameters,
    )
    self.model.execution.put(execution)
    return execution.id

def _register_logger(self, logger_name=None, level=None):
self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
self.logging_id)
@@ -74,7 +86,9 @@ def _get_sqla_handler(self):
if self._model._initiator:
api_kwargs.update(self._model._initiator(**self._model._initiator_kwargs))
api_kwargs.update(**self._model._api_kwargs)
return aria_logger.create_sqla_log_handler(log_cls=modeling.model.Log, **api_kwargs)
return aria_logger.create_sqla_log_handler(log_cls=modeling.model.Log,
execution_id=self._execution_id,
**api_kwargs)

def __repr__(self):
return (
@@ -34,6 +34,7 @@ def __init__(self,
service_instance_id,
task_id,
actor_id,
execution_id,
**kwargs):
super(BaseOperationContext, self).__init__(
name=name,
@@ -44,6 +45,7 @@ def __init__(self,
self._task_id = task_id
self._actor_id = actor_id
self._task = None
self._execution_id = execution_id
self._register_logger()

def __repr__(self):
@@ -89,7 +91,8 @@ def serialization_dict(self):
'actor_id': self._actor_id,
'workdir': self._workdir,
'model_storage': self.model.serialization_dict if self.model else None,
'resource_storage': self.resource.serialization_dict if self.resource else None
'resource_storage': self.resource.serialization_dict if self.resource else None,
'execution_id': self._execution_id
}
return {
'context_cls': context_cls,
@@ -32,10 +32,10 @@ class WorkflowContext(BaseContext):
def __init__(self,
workflow_name,
parameters=None,
execution_id=None,
task_max_attempts=1,
task_retry_interval=0,
task_ignore_failure=False,
execution_id=None,
*args, **kwargs):
super(WorkflowContext, self).__init__(*args, **kwargs)
self._workflow_name = workflow_name
@@ -45,7 +45,7 @@ def __init__(self,
self._task_ignore_failure = task_ignore_failure
# TODO: execution creation should happen somewhere else
# should be moved there, when such logical place exists
self._execution_id = self._create_execution() if execution_id is None else execution_id
self._execution_id = execution_id or self._create_execution()
self._register_logger()

def __repr__(self):
@@ -146,6 +146,7 @@ def __init__(self, api_task, *args, **kwargs):
service_instance_id=self._workflow_context._service_instance_id,
task_id=operation_task.id,
actor_id=api_task.actor.id,
execution_id=self._workflow_context._execution_id,
workdir=self._workflow_context._workdir)
self._task_id = operation_task.id
self._update_fields = None
@@ -471,6 +471,14 @@ def retry(message=None, retry_interval=None):
class LogBase(ModelMixin):
__tablename__ = 'log'

@declared_attr
def execution_fk(cls):
    """Foreign-key column referencing the ``execution`` table."""
    foreign_key_column = cls.foreign_key('execution')
    return foreign_key_column

@declared_attr
def execution(cls):
    """Many-to-one relationship from a log record to its ``execution`` row."""
    relationship = cls.many_to_one_relationship('execution')
    return relationship

level = Column(String)
msg = Column(String)
created_at = Column(DateTime, index=True)
@@ -213,7 +213,7 @@ def basic_workflow(graph, **_):


@pytest.fixture(params=[
(thread.ThreadExecutor, dict()),
# (thread.ThreadExecutor, dict()),
(process.ProcessExecutor, dict(python_path=tests.ROOT_DIR))
])
def executor(request):
@@ -252,7 +252,7 @@ def basic_workflow(graph, **_):
)

execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
_assert_loggins(ctx.model.log.list(), inputs)
_assert_loggins(ctx, inputs)


def test_relationship_operation_logging(ctx, executor):
@@ -283,14 +283,22 @@ def basic_workflow(graph, **_):
)

execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
_assert_loggins(ctx.model.log.list(), inputs)
_assert_loggins(ctx, inputs)


def _assert_loggins(logs, inputs):
def _assert_loggins(ctx, inputs):

# The logs should contain the following: Workflow Start, Operation Start, custom operation
# log string (op_start), custom operation log string (op_end), Operation End, Workflow End.
assert len(logs) == 6

executions = ctx.model.execution.list()
assert len(executions) == 1
execution = executions[0]

logs = ctx.model.log.list()
assert len(logs) == execution.logs.count() == 6
assert all(l in logs for l in execution.logs)
assert all(l.execution == execution for l in logs)

op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
assert len(op_start_log) == 1
@@ -22,6 +22,7 @@
LoggerMixin,
_DefaultConsoleFormat)


def test_create_logger():

logger = create_logger()

0 comments on commit c0d76ad

Please sign in to comment.