Skip to content

Commit

Permalink
bpo-40275: Avoid importing logging in test.support (pythonGH-19601)
Browse files Browse the repository at this point in the history
Import logging lazily in assertLogs() in unittest.
Move TestHandler from test.support to logging_helper.
  • Loading branch information
serhiy-storchaka committed Apr 25, 2020
1 parent 1699491 commit 515fce4
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 109 deletions.
5 changes: 0 additions & 5 deletions Doc/library/test.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1410,11 +1410,6 @@ The :mod:`test.support` module defines the following classes:
Run *test* and return the result.


.. class:: TestHandler(logging.handlers.BufferingHandler)

Class for logging support.


.. class:: FakePath(path)

Simple :term:`path-like object`. It implements the :meth:`__fspath__`
Expand Down
34 changes: 0 additions & 34 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import importlib
import importlib.util
import locale
import logging.handlers
import os
import platform
import re
Expand Down Expand Up @@ -99,8 +98,6 @@
"open_urlresource",
# processes
'temp_umask', "reap_children",
# logging
"TestHandler",
# threads
"threading_setup", "threading_cleanup", "reap_threads", "start_threads",
# miscellaneous
Expand Down Expand Up @@ -2368,37 +2365,6 @@ def optim_args_from_interpreter_flags():
optimization settings in sys.flags."""
return subprocess._optim_args_from_interpreter_flags()

#============================================================
# Support for assertions about logging.
#============================================================

class TestHandler(logging.handlers.BufferingHandler):
def __init__(self, matcher):
# BufferingHandler takes a "capacity" argument
# so as to know when to flush. As we're overriding
# shouldFlush anyway, we can set a capacity of zero.
# You can call flush() manually to clear out the
# buffer.
logging.handlers.BufferingHandler.__init__(self, 0)
self.matcher = matcher

def shouldFlush(self):
return False

def emit(self, record):
self.format(record)
self.buffer.append(record.__dict__)

def matches(self, **kwargs):
"""
Look for a saved dict whose keys/values match the supplied arguments.
"""
result = False
for d in self.buffer:
if self.matcher.matches(d, **kwargs):
result = True
break
return result

class Matcher(object):

Expand Down
29 changes: 29 additions & 0 deletions Lib/test/support/logging_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging.handlers

class TestHandler(logging.handlers.BufferingHandler):
def __init__(self, matcher):
# BufferingHandler takes a "capacity" argument
# so as to know when to flush. As we're overriding
# shouldFlush anyway, we can set a capacity of zero.
# You can call flush() manually to clear out the
# buffer.
logging.handlers.BufferingHandler.__init__(self, 0)
self.matcher = matcher

def shouldFlush(self):
return False

def emit(self, record):
self.format(record)
self.buffer.append(record.__dict__)

def matches(self, **kwargs):
"""
Look for a saved dict whose keys/values match the supplied arguments.
"""
result = False
for d in self.buffer:
if self.matcher.matches(d, **kwargs):
result = True
break
return result
5 changes: 3 additions & 2 deletions Lib/test/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from test.support.script_helper import assert_python_ok, assert_python_failure
from test import support
from test.support import socket_helper
from test.support.logging_helper import TestHandler
import textwrap
import threading
import time
Expand Down Expand Up @@ -3524,7 +3525,7 @@ def test_formatting(self):
@unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
'logging.handlers.QueueListener required for this test')
def test_queue_listener(self):
handler = support.TestHandler(support.Matcher())
handler = TestHandler(support.Matcher())
listener = logging.handlers.QueueListener(self.queue, handler)
listener.start()
try:
Expand All @@ -3540,7 +3541,7 @@ def test_queue_listener(self):

# Now test with respect_handler_level set

handler = support.TestHandler(support.Matcher())
handler = TestHandler(support.Matcher())
handler.setLevel(logging.CRITICAL)
listener = logging.handlers.QueueListener(self.queue, handler,
respect_handler_level=True)
Expand Down
69 changes: 69 additions & 0 deletions Lib/unittest/_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
import collections

from .case import _BaseTestCaseContext


_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
["records", "output"])

class _CapturingHandler(logging.Handler):
"""
A logging handler capturing all (raw and formatted) logging output.
"""

def __init__(self):
logging.Handler.__init__(self)
self.watcher = _LoggingWatcher([], [])

def flush(self):
pass

def emit(self, record):
self.watcher.records.append(record)
msg = self.format(record)
self.watcher.output.append(msg)


class _AssertLogsContext(_BaseTestCaseContext):
"""A context manager used to implement TestCase.assertLogs()."""

LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"

def __init__(self, test_case, logger_name, level):
_BaseTestCaseContext.__init__(self, test_case)
self.logger_name = logger_name
if level:
self.level = logging._nameToLevel.get(level, level)
else:
self.level = logging.INFO
self.msg = None

def __enter__(self):
if isinstance(self.logger_name, logging.Logger):
logger = self.logger = self.logger_name
else:
logger = self.logger = logging.getLogger(self.logger_name)
formatter = logging.Formatter(self.LOGGING_FORMAT)
handler = _CapturingHandler()
handler.setFormatter(formatter)
self.watcher = handler.watcher
self.old_handlers = logger.handlers[:]
self.old_level = logger.level
self.old_propagate = logger.propagate
logger.handlers = [handler]
logger.setLevel(self.level)
logger.propagate = False
return handler.watcher

def __exit__(self, exc_type, exc_value, tb):
self.logger.handlers = self.old_handlers
self.logger.propagate = self.old_propagate
self.logger.setLevel(self.old_level)
if exc_type is not None:
# let unexpected exceptions pass through
return False
if len(self.watcher.records) == 0:
self._raiseFailure(
"no logs of level {} or higher triggered on {}"
.format(logging.getLevelName(self.level), self.logger.name))
70 changes: 2 additions & 68 deletions Lib/unittest/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
import functools
import difflib
import logging
import pprint
import re
import warnings
Expand Down Expand Up @@ -297,73 +296,6 @@ def __exit__(self, exc_type, exc_value, tb):



_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
["records", "output"])


class _CapturingHandler(logging.Handler):
"""
A logging handler capturing all (raw and formatted) logging output.
"""

def __init__(self):
logging.Handler.__init__(self)
self.watcher = _LoggingWatcher([], [])

def flush(self):
pass

def emit(self, record):
self.watcher.records.append(record)
msg = self.format(record)
self.watcher.output.append(msg)



class _AssertLogsContext(_BaseTestCaseContext):
"""A context manager used to implement TestCase.assertLogs()."""

LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"

def __init__(self, test_case, logger_name, level):
_BaseTestCaseContext.__init__(self, test_case)
self.logger_name = logger_name
if level:
self.level = logging._nameToLevel.get(level, level)
else:
self.level = logging.INFO
self.msg = None

def __enter__(self):
if isinstance(self.logger_name, logging.Logger):
logger = self.logger = self.logger_name
else:
logger = self.logger = logging.getLogger(self.logger_name)
formatter = logging.Formatter(self.LOGGING_FORMAT)
handler = _CapturingHandler()
handler.setFormatter(formatter)
self.watcher = handler.watcher
self.old_handlers = logger.handlers[:]
self.old_level = logger.level
self.old_propagate = logger.propagate
logger.handlers = [handler]
logger.setLevel(self.level)
logger.propagate = False
return handler.watcher

def __exit__(self, exc_type, exc_value, tb):
self.logger.handlers = self.old_handlers
self.logger.propagate = self.old_propagate
self.logger.setLevel(self.old_level)
if exc_type is not None:
# let unexpected exceptions pass through
return False
if len(self.watcher.records) == 0:
self._raiseFailure(
"no logs of level {} or higher triggered on {}"
.format(logging.getLevelName(self.level), self.logger.name))


class _OrderedChainMap(collections.ChainMap):
def __iter__(self):
seen = set()
Expand Down Expand Up @@ -854,6 +786,8 @@ def assertLogs(self, logger=None, level=None):
self.assertEqual(cm.output, ['INFO:foo:first message',
'ERROR:foo.bar:second message'])
"""
# Lazy import to avoid importing logging if it is not needed.
from ._log import _AssertLogsContext
return _AssertLogsContext(self, logger, level)

def _getAssertEqualityFunc(self, first, second):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The :mod:`logging` package is now imported lazily in :mod:`unittest` only
when the :meth:`~unittest.TestCase.assertLogs` assertion is used.

0 comments on commit 515fce4

Please sign in to comment.