-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Jean-Sebastien Dieu
committed
Feb 21, 2020
1 parent
c339896
commit 88727c2
Showing
21 changed files
with
988 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
include LICENSE | ||
include README.rst | ||
|
||
recursive-exclude * __pycache__ | ||
recursive-exclude * *.py[co] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import pytest | ||
import time | ||
|
||
|
||
def test_sleep1(): | ||
time.sleep(1) | ||
|
||
|
||
@pytest.mark.monitor_skip_test | ||
def test_sleep2(): | ||
time.sleep(2) | ||
|
||
|
||
@pytest.mark.parametrize(('range_max', 'other'), [(10, "10"), (100, "100"), (1000, "1000"), (10000, "10000")]) | ||
def test_heavy(range_max, other): | ||
assert len(['a' * i for i in range(range_max)]) == range_max |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
import time | ||
|
||
def test_sleep_400ms(): | ||
time.sleep(0.4) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
import time | ||
|
||
|
||
def test_master_sleep(): | ||
t_a = time.time() | ||
b_continue = True | ||
while b_continue: | ||
t_delta = time.time() - t_a | ||
b_continue = t_delta < 5 | ||
|
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
class TestClass: | ||
def setup_method(self, test_method): | ||
self.__value = test_method.__name__ | ||
|
||
def test_method1(self): | ||
assert self.__value == "test_method1" |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import time | ||
import pytest | ||
|
||
pytestmark = pytest.mark.monitor_skip_test | ||
|
||
pytest_monitor_component = 'test' | ||
|
||
def test_not_monitored(): | ||
t_a = time.time() | ||
b_continue = True | ||
while b_continue: | ||
t_delta = time.time() - t_a | ||
b_continue = t_delta < 5 | ||
|
||
|
||
@pytest.mark.monitor_test | ||
def test_force_monitor(): | ||
t_a = time.time() | ||
b_continue = True | ||
while b_continue: | ||
t_delta = time.time() - t_a | ||
b_continue = t_delta < 5 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
__version__ = "1.0.0" | ||
__author__ = "Jean-Sebastien Dieu" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import sqlite3 | ||
|
||
|
||
class DBHandler: | ||
def __init__(self, db_path): | ||
self.__db = db_path | ||
self.__cnx = sqlite3.connect(self.__db) if db_path else None | ||
|
||
def query(self, what, bind_to, many=False): | ||
cursor = self.__cnx.cursor() | ||
cursor.execute(what, bind_to) | ||
return cursor.fetchall() if many else cursor.fetchone() | ||
|
||
def insert_metric(self, run_date, item_start_date, env_id, scm_id, item, kind, component, | ||
total_time, user_time, kernel_time, cpu_usage, mem_usage): | ||
with self.__cnx: | ||
self.__cnx.execute(f'insert into TEST_METRICS(RUN_DATE,ITEM_START_TIME,ENV_H,SCM_ID,ITEM,KIND,' | ||
f'COMPONENT,TOTAL_TIME,USER_TIME,KERNEL_TIME,CPU_USAGE,MEM_USAGE) ' | ||
f'values (?,?,?,?,?,?,?,?,?,?,?,?)', | ||
(run_date, item_start_date, env_id, scm_id, item, kind, component, | ||
total_time, user_time, kernel_time, cpu_usage, mem_usage)) | ||
|
||
def insert_execution_context(self, exc_context): | ||
with self.__cnx: | ||
self.__cnx.execute(f'insert into EXECUTION_CONTEXTS(CPU_COUNT,CPU_FREQUENCY_MHZ,CPU_TYPE,CPU_VENDOR,' | ||
f'RAM_TOTAL_MB,MACHINE_NODE,MACHINE_TYPE,MACHINE_ARCH,SYSTEM_INFO,' | ||
f'PYTHON_INFO,ENV_H) values (?,?,?,?,?,?,?,?,?,?,?)', | ||
(exc_context.cpu_count, exc_context.cpu_frequency, exc_context.cpu_type, | ||
exc_context.cpu_vendor, exc_context.ram_total, exc_context.fqdn, exc_context.machine, | ||
exc_context.architecture, exc_context.system_info, exc_context.python_info, | ||
exc_context.hash())) | ||
|
||
def prepare(self): | ||
cursor = self.__cnx.cursor() | ||
cursor.execute(''' | ||
CREATE TABLE IF NOT EXISTS TEST_METRICS ( | ||
RUN_DATE varchar(64), -- Date of test run | ||
ENV_H varchar(64), -- Environment description identifier | ||
SCM_ID varchar(128), | ||
ITEM_START_TIME varchar(64), -- Effective start time of the test | ||
ITEM varchar(4096), -- Name of the item | ||
KIND varchar(64), -- Package, Module or function | ||
COMPONENT varchar(512) NULL, -- Tested component if any | ||
TOTAL_TIME float, -- Total time spent running the item | ||
USER_TIME float, -- time spent in user space | ||
KERNEL_TIME float, -- time spent in kernel space | ||
CPU_USAGE float, -- cpu usage | ||
MEM_USAGE float, -- Max resident memory used. | ||
FOREIGN KEY (ENV_H) REFERENCES EXECUTION_CONTEXTS(ENV_H) | ||
);''') | ||
cursor.execute(''' | ||
CREATE TABLE IF NOT EXISTS EXECUTION_CONTEXTS ( | ||
ENV_H varchar(64) primary key not null unique, | ||
CPU_COUNT integer, | ||
CPU_FREQUENCY_MHZ integer, | ||
CPU_TYPE varchar(64), | ||
CPU_VENDOR varchar(256), | ||
RAM_TOTAL_MB integer, | ||
MACHINE_NODE varchar(512), | ||
MACHINE_TYPE varchar(32), | ||
MACHINE_ARCH varchar(16), | ||
SYSTEM_INFO varchar(256), | ||
PYTHON_INFO varchar(512) | ||
); | ||
''') | ||
self.__cnx.commit() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
import os | ||
# -*- coding: utf-8 -*- | ||
import memory_profiler | ||
import psutil | ||
import pytest | ||
import time | ||
import warnings | ||
|
||
from pytest_monitor.sys_utils import ExecutionContext | ||
from pytest_monitor.session import PyTestMonitorSession | ||
|
||
PYTEST_MONITOR_SESSION = None | ||
# These dictionaries are used to compute members set on each items. | ||
# KEY is the marker set on a test function | ||
# value is a tuple: | ||
# expect_args: boolean | ||
# internal marker attribute name: str | ||
# callable that set member's value | ||
# default value | ||
PYTEST_MONITOR_VALID_MARKERS = {'monitor_skip_test': (False, 'monitor_skip_test', lambda x: True, False), | ||
'monitor_skip_test_if': (True, 'monitor_skip_test', lambda x: bool(x), False), | ||
'monitor_test': (False, 'monitor_force_test', lambda x: True, False), | ||
'monitor_test_if': (True, 'monitor_force_test', lambda x: bool(x), False)} | ||
PYTEST_MONITOR_DEPRECATED_MARKERS = {} | ||
|
||
|
||
def pytest_addoption(parser): | ||
group = parser.getgroup('monitor') | ||
group.addoption('--restrict-scope-to', dest='mtr_scope', default='function,module', | ||
help='Select the scope to monitor. By default, only function is monitored.' | ||
'Values are function, class, module, session. You can set one or more of these' | ||
'by listing them using a comma separated list') | ||
group.addoption('--parametrization-explicit', dest='want_explicit_ids', action='store_true', | ||
help='Set this option to distinguish parametrized tests given their values.' | ||
' This requires the parameters to be stringifiable.') | ||
group.addoption('--no-monitor', action='store_true', dest='mtr_none', help='Disable all traces') | ||
group.addoption('--remote', action='store', dest='remote', | ||
help='Remote server to send the results to. Format is <ADRESS>:<PORT>') | ||
group.addoption('--db', action='store', dest='mtr_db_out', default='.pymon', | ||
help='Use the given sqlite database for storing results.') | ||
group.addoption('--no-db', action='store_true', help='Do not store results in local db.') | ||
group.addoption('--force-component', action='store', | ||
help='Force the component to be set at the given value for the all tests run' | ||
' in this session.') | ||
group.addoption('--component-prefix', action='store', | ||
help='Prefix each found components with the given value (applies to all tests' | ||
' run in this session).') | ||
|
||
|
||
def pytest_configure(config): | ||
config.addinivalue_line("markers", "monitor_skip_test: mark test to be executed but not monitored.") | ||
config.addinivalue_line("markers", "monitor_skip_test_if(cond): mark test to be executed but " | ||
"not monitored if cond is verified.") | ||
config.addinivalue_line("markers", "monitor_test: mark test to be monitored (default behaviour)." | ||
" This can turn handy to whitelist some test when you have disabled" | ||
" monitoring on a whole module.") | ||
config.addinivalue_line("markers", "monitor_test_if(cond): mark test to be monitored if and only if cond" | ||
" is verified. This can help you in whitelisting tests to be monitored" | ||
" depending on some external conditions.") | ||
|
||
|
||
def pytest_runtest_setup(item): | ||
""" | ||
Validate marker setup and print warnings if usage of deprecated marker is identified. | ||
Setting marker attribute to the discovered item is done after the above described verification. | ||
:param item: Test item | ||
""" | ||
item_markers = {mark.name: mark for mark in item.iter_markers() if mark and mark.name.startswith('monitor_')} | ||
mark_to_del = [] | ||
for set_marker in item_markers.keys(): | ||
if set_marker not in PYTEST_MONITOR_VALID_MARKERS: | ||
warnings.warn(f"Nothing known about marker {set_marker}. Marker will be dropped.") | ||
mark_to_del.append(set_marker) | ||
if set_marker in PYTEST_MONITOR_DEPRECATED_MARKERS: | ||
warnings.warn(f'Marker {set_marker} is deprecated. Consider upgrading your tests') | ||
|
||
for marker in mark_to_del: | ||
del item_markers[marker] | ||
|
||
all_valid_markers = PYTEST_MONITOR_VALID_MARKERS | ||
all_valid_markers.update(PYTEST_MONITOR_DEPRECATED_MARKERS) | ||
# Setting instantiated markers | ||
for marker, _ in item_markers.items(): | ||
with_args, attr, fun_val, _ = all_valid_markers[marker] | ||
attr_val = fun_val(item_markers[marker].args[0]) if with_args else fun_val(None) | ||
setattr(item, attr, attr_val) | ||
|
||
# Setting other markers to default values | ||
for marker, marker_value in all_valid_markers.items(): | ||
with_args, attr, _, default = marker_value | ||
if not hasattr(item, attr): | ||
setattr(item, attr, default) | ||
|
||
# Finalize marker processing by enforcing some marker's value | ||
if item.monitor_force_test: | ||
# This test has been explicitly flagged as 'to be monitored'. | ||
item.monitor_skip_test = False | ||
|
||
|
||
@pytest.hookimpl(tryfirst=True, hookwrapper=True) | ||
def pytest_runtest_makereport(item, call): | ||
""" | ||
Used to identify the current call to add times. | ||
:param item: Test item | ||
:param call: call instance associated to the given item | ||
""" | ||
outcome = yield | ||
rep = outcome.get_result() | ||
|
||
if rep.when == 'call': | ||
setattr(item, 'test_run_duration', call.stop - call.start) | ||
setattr(item, 'test_effective_start_time', call.start) | ||
|
||
|
||
def pytest_runtest_call(item): | ||
setattr(item, 'monitor_results', False) | ||
setattr(item, 'monitor_component', getattr(item.module, 'pytest_monitor_component', '')) | ||
|
||
|
||
def pytest_pyfunc_call(pyfuncitem): | ||
""" | ||
Core sniffer logic. We encapsulate the test function in a sniffer function to collect | ||
memory results. | ||
""" | ||
testfunction = pyfuncitem.obj | ||
funcargs = pyfuncitem.funcargs | ||
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames} | ||
|
||
def prof(): | ||
m = memory_profiler.memory_usage((testfunction, (), testargs), max_usage=True) | ||
setattr(pyfuncitem, 'mem_usage', m) | ||
setattr(pyfuncitem, 'monitor_results', True) | ||
prof() | ||
return True | ||
|
||
|
||
def pytest_make_parametrize_id(config, val, argname): | ||
if config.option.want_explicit_ids: | ||
return f'{argname}_{val}' | ||
|
||
|
||
@pytest.hookimpl(hookwrapper=True) | ||
def pytest_sessionstart(session): | ||
""" | ||
Instantiate a monitor session to save collected metrics. | ||
We yield at the end to let pytest pursue the execution. | ||
""" | ||
global PYTEST_MONITOR_SESSION | ||
if session.config.option.force_component and session.config.option.component_prefix: | ||
raise pytest.UsageError('Invalid usage: --force-component and --component-prefix are incompatible options!') | ||
if session.config.option.no_db and not session.config.option.remote and not session.config.option.mtr_none: | ||
warnings.warn('pytest-monitor: No storage specified but monitoring is requested. Disabling monitoring.') | ||
session.config.option.mtr_none = True | ||
component = session.config.option.force_component or session.config.option.component_prefix | ||
if session.config.option.component_prefix: | ||
component += '.{user_component}' | ||
if not component: | ||
component = '{user_component}' | ||
db = None if (session.config.option.mtr_none or session.config.option.no_db) else session.config.option.mtr_db_out | ||
remote = None if session.config.option.mtr_none else session.config.option.remote | ||
PYTEST_MONITOR_SESSION = PyTestMonitorSession(db=db, remote=remote, component=component) | ||
PYTEST_MONITOR_SESSION.set_environment_info(ExecutionContext()) | ||
yield | ||
|
||
|
||
def scoper(scope, monitor_skip_flag, set_scope): | ||
should_skip = monitor_skip_flag | ||
if scope in set_scope and not should_skip: | ||
global PYTEST_MONITOR_SESSION | ||
return PYTEST_MONITOR_SESSION | ||
return None | ||
|
||
|
||
@pytest.fixture(autouse=True, scope='module') | ||
def prf_module_tracer(request): | ||
t_a = time.time() | ||
yield | ||
wrt = scoper('module', False, request.config.option.mtr_scope) | ||
if wrt is not None: | ||
t_z = time.time() | ||
process = psutil.Process(os.getpid()) | ||
rss = process.memory_info().rss / 1024 ** 2 | ||
ptimes = process.cpu_times() | ||
component = getattr(request.module, 'pytest_monitor_component', '') | ||
wrt.add_test_info(request.module.__name__, 'module', component, | ||
t_a, t_z - t_a, ptimes.user, ptimes.system, | ||
rss) | ||
|
||
|
||
@pytest.fixture(autouse=True) | ||
def prf_tracer(request): | ||
yield | ||
wrt = scoper('function', request.node.monitor_skip_test, request.config.option.mtr_scope) | ||
if wrt is not None: | ||
process = psutil.Process(os.getpid()) | ||
ptimes = process.cpu_times() | ||
if request.node.monitor_results: | ||
full_item_name = f'{request.module.__name__}/{request.node.name}' | ||
wrt.add_test_info(full_item_name, 'function', request.node.monitor_component, | ||
request.node.test_effective_start_time, | ||
request.node.test_run_duration, | ||
ptimes.user, ptimes.system, request.node.mem_usage) |
Oops, something went wrong.