Skip to content

Commit

Permalink
Merge 61a813a into 90e65e1
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarst committed Dec 7, 2015
2 parents 90e65e1 + 61a813a commit 9d90d06
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 35 deletions.
27 changes: 27 additions & 0 deletions docs/source/generating/threads.rst
Expand Up @@ -8,6 +8,33 @@ In many applications we are interested in tasks that exist in more than just a s
For example, one server may send a request to another server over a network and we would like to trace the combined operation across both servers' logs.
To make this as easy as possible Eliot supports serializing task identifiers for transfer over the network (or between threads), allowing tasks to span multiple processes.

.. _cross thread tasks:

Cross-Thread Tasks
------------------

To trace actions across threads Eliot provides the ``eliot.preserve_context`` API.
It takes a callable that is about to be passed to a thread constructor and preserves the current Eliot context, returning a new callable.
This new callable should only be used, in the thread where it will run; it will restore the Eliot context and run the original function inside of it.
For example:

.. literalinclude:: ../../../examples/cross_thread.py

Here's what the result is when run:

.. code-block:: shell
$ python examples/cross_thread.py | eliot-tree
11a85c42-a13f-491c-ad44-c48b2efad0e3
+-- main_thread@1/started
+-- eliot:remote_task@2,1/started
+-- in_thread@2,2,1/started
|-- x: 3
`-- y: 4
+-- in_thread@2,2,2/succeeded
|-- result: 7
+-- eliot:remote_task@2,3/succeeded
+-- main_thread@3/succeeded
.. _cross process tasks:
Expand Down
9 changes: 9 additions & 0 deletions docs/source/news.rst
@@ -1,6 +1,15 @@
What's New
==========

0.11.0
^^^^^^

Features:

* Eliot tasks can now more easily :ref:`span multiple threads <cross thread tasks>` using the new ``eliot.preserve_context`` API.
* ``eliot-prettyprint`` command line tool now pretty prints field values in a more informative manner.


0.10.1
^^^^^^

Expand Down
4 changes: 2 additions & 2 deletions eliot/__init__.py
Expand Up @@ -3,7 +3,7 @@
"""
# Expose the public API:
from ._message import Message
from ._action import startAction, startTask, Action
from ._action import startAction, startTask, Action, preserve_context
from ._output import (
ILogger, Logger, MemoryLogger, to_file, FileDestination,
)
Expand All @@ -26,7 +26,7 @@


__all__ = ["Message", "writeTraceback", "writeFailure",
"startAction", "startTask", "Action",
"startAction", "startTask", "Action", "preserve_context",
"Field", "fields", "MessageType", "ActionType",
"ILogger", "Logger", "MemoryLogger", "addDestination",
"removeDestination", "addGlobalFields", "FileDestination",
Expand Down
42 changes: 42 additions & 0 deletions eliot/_action.py
Expand Up @@ -808,3 +808,45 @@ def startTask(logger=None, action_type=u"", _serializers=None, **fields):
_serializers)
action._start(fields)
return action


class TooManyCalls(Exception):
"""
The callable was called more than once.
This typically indicates a coding bug: the result of
C{preserve_context} should only be called once, and
C{preserve_context} should therefore be called each time you want to
pass the callable to a thread.
"""


def preserve_context(f):
"""
Package up the given function with the current Eliot context, and then
restore context and call given function when the resulting callable is
run. This allows continuing the action context within a different thread.
The result should only be used once, since it relies on
L{Action.serialize_task_id} whose results should only be deserialized
once.
@param f: A callable.
@return: One-time use callable that calls given function in context of
a child of current Eliot action.
"""
action = currentAction()
if action is None:
return f
task_id = action.serialize_task_id()
called = threading.Lock()

def restore_eliot_context(*args, **kwargs):
# Make sure the function has not already been called:
if not called.acquire(False):
raise TooManyCalls(f)

with Action.continue_task(task_id=task_id):
return f(*args, **kwargs)
return restore_eliot_context
12 changes: 2 additions & 10 deletions eliot/_traceback.py
Expand Up @@ -5,15 +5,12 @@

from __future__ import unicode_literals

import types
import traceback
import sys
from warnings import warn

from six import exec_

from ._message import EXCEPTION_FIELD, REASON_FIELD
from ._util import safeunicode
from ._util import safeunicode, load_module
from ._validation import MessageType, Field
from ._errors import _error_extraction

Expand Down Expand Up @@ -57,12 +54,7 @@ def _get_traceback_no_io():
"""
Return a version of L{traceback} that doesn't do I/O.
"""
module = types.ModuleType(str("_traceback_no_io"))
path = traceback.__file__
if path.endswith(".pyc") or path.endswith(".pyo"):
path = path[:-1]
with open(path) as f:
exec_(f.read(), module.__dict__, module.__dict__)
module = load_module(str("_traceback_no_io"), traceback)
class FakeLineCache(object):
def checkcache(self, *args, **kwargs):
None
Expand Down
23 changes: 22 additions & 1 deletion eliot/_util.py
Expand Up @@ -4,7 +4,9 @@

from __future__ import unicode_literals

from six import text_type as unicode
from types import ModuleType

from six import exec_, text_type as unicode


def safeunicode(o):
Expand Down Expand Up @@ -37,3 +39,22 @@ def saferepr(o):
except:
# Not much we can do about this...
return "eliot: unknown, unicode() raised exception"


def load_module(name, original_module):
"""
Load a copy of a module, distinct from what you'd get if you imported
it directly.
@param str name: The name of the new module.
@param original_module: The original module we're recreating.
@return: A new, distinct module.
"""
module = ModuleType(name)
path = original_module.__file__
if path.endswith(".pyc") or path.endswith(".pyo"):
path = path[:-1]
with open(path) as f:
exec_(f.read(), module.__dict__, module.__dict__)
return module
21 changes: 19 additions & 2 deletions eliot/prettyprint.py
Expand Up @@ -4,13 +4,16 @@

from __future__ import unicode_literals

import pprint
from datetime import datetime
from sys import stdin, stdout, argv

from ._bytesjson import loads
from ._message import (
TIMESTAMP_FIELD, TASK_UUID_FIELD, TASK_LEVEL_FIELD, MESSAGE_TYPE_FIELD,
)
from ._action import ACTION_TYPE_FIELD, ACTION_STATUS_FIELD
from ._util import load_module

from six import text_type as unicode, PY2, PY3
if PY3:
Expand All @@ -19,6 +22,18 @@
stdin = stdin.buffer


# On Python 2 pprint formats unicode with u'' prefix, which is inconsistent
# with Python 3 and not very nice to read. So we modify a copy to omit the u''.
if PY2:
def _nicer_unicode_repr(o, original_repr=repr):
if isinstance(o, unicode):
return original_repr(o.encode("utf-8"))
else:
return original_repr(o)
pprint = load_module(b"unicode_pprint", pprint)
pprint.repr = _nicer_unicode_repr


def pretty_format(message):
"""
Convert a message dictionary into a human-readable string.
Expand All @@ -31,11 +46,13 @@ def pretty_format(message):
MESSAGE_TYPE_FIELD, ACTION_TYPE_FIELD, ACTION_STATUS_FIELD}

def add_field(previous, key, value):
value = unicode(value).rstrip("\n")
value = unicode(pprint.pformat(value, width=40)).replace(
"\\n", "\n ").replace("\\t", "\t")
# Reindent second line and later to match up with first line's
# indentation:
lines = value.split("\n")
indent = " " * (2 + len(key) + 2) # lines are " <key>: <value>"
# indent lines are " <key length>| <value>"
indent = "{}| ".format(" " * (2 + len(key)))
value = "\n".join([lines[0]] + [indent + l for l in lines[1:]])
return " %s: %s\n" % (key, value)

Expand Down
67 changes: 65 additions & 2 deletions eliot/tests/test_action.py
Expand Up @@ -25,15 +25,19 @@
Action, _ExecutionContext, currentAction, startTask, startAction,
ACTION_STATUS_FIELD, ACTION_TYPE_FIELD, FAILED_STATUS, STARTED_STATUS,
SUCCEEDED_STATUS, DuplicateChild, InvalidStartMessage, InvalidStatus,
TaskLevel, WrittenAction, WrongActionType, WrongTask, WrongTaskLevel)
TaskLevel, WrittenAction, WrongActionType, WrongTask, WrongTaskLevel,
TooManyCalls)
from .._message import (
EXCEPTION_FIELD, REASON_FIELD, TASK_LEVEL_FIELD, TASK_UUID_FIELD,
MESSAGE_TYPE_FIELD, Message,
)
from .._output import MemoryLogger
from .._validation import ActionType, Field, _ActionSerializers
from ..testing import assertContainsFields
from ..testing import assertContainsFields, capture_logging
from .._parse import Parser
from .. import (
_action, add_destination, remove_destination, register_exception_extractor,
preserve_context,
)

from .strategies import (
Expand Down Expand Up @@ -1389,3 +1393,62 @@ class MyException(Exception):
"reason": "because",
"exception":
"eliot.tests.test_action.MyException"})



class PreserveContextTests(TestCase):
"""
Tests for L{preserve_context}.
"""
def add(self, x, y):
"""
Add two inputs.
"""
Message.log(message_type="child")
return x + y

def test_no_context(self):
"""
If C{preserve_context} is run outside an action context it just
returns the same function.
"""
wrapped = preserve_context(self.add)
self.assertEqual(wrapped(2, 3), 5)

def test_with_context_calls_underlying(self):
"""
If run inside an Eliot context, the result of C{preserve_context} is
the result of calling the underlying function.
"""
with startAction(action_type="parent"):
wrapped = preserve_context(self.add)
self.assertEqual(wrapped(3, y=4), 7)

@capture_logging(None)
def test_with_context_preserves_context(self, logger):
"""
If run inside an Eliot context, the result of C{preserve_context} runs
the wrapped function within a C{eliot:task} which is a child of
the original action.
"""
with startAction(action_type="parent"):
wrapped = preserve_context(lambda: self.add(3, 4))
thread = Thread(target=wrapped)
thread.start()
thread.join()
[tree] = Parser.parse_stream(logger.messages)
root = tree.root()
self.assertEqual((root.action_type,
root.children[0].action_type,
root.children[0].children[0].contents[
MESSAGE_TYPE_FIELD]),
("parent", "eliot:remote_task", "child"))

def test_callable_only_once(self):
"""
The result of C{preserve_context} can only be called once.
"""
with startAction(action_type="parent"):
wrapped = preserve_context(self.add)
wrapped(1, 2)
self.assertRaises(TooManyCalls, wrapped, 3, 4)

0 comments on commit 9d90d06

Please sign in to comment.