Skip to content

Commit

Permalink
main script: command line args and profiling
Browse files Browse the repository at this point in the history
The profiling makes use of the pyprof2calltree converter to
files expected by kcachegrind.

For now CLI args are in a separate argparse instance (to be better
integrated later with Anyblok's Configuration)
  • Loading branch information
gracinet committed Apr 26, 2018
1 parent ba782f9 commit 961b8bd
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 43 deletions.
2 changes: 1 addition & 1 deletion anyblok_wms_examples/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@


def init_config(**kw):
from .basic import config # flake8: noqa
from .launcher import config # flake8: noqa
37 changes: 37 additions & 0 deletions anyblok_wms_examples/launcher/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
# This file is a part of the AnyBlok / WMS Examples project
#
# Copyright (C) 2018 Georges Racinet <gracinet@anybox.fr>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.

from anyblok.config import Configuration


Configuration.add_application_properties(
'wms-bench', ['logging', 'database', 'wms-bench'],
prog='AnyBlok WMS Example and Bench',
description="Example Anyblok WMS application without human interaction, "
"for demonstration and performance analysis."""
)


@Configuration.add('wms-bench', label="WMS Benchmarking")
def bench_options(group):
group.add_argument("--timeslices", type=int, default=10,
help="Number of time slices to run")
group.add_argument("--planner-workers", type=int, default=2,
help="Number of planner worker processes to run")
group.add_argument("--regular-workers", type=int, default=4,
help="Number of regular worker processes to run. "
"in a normal application, these would be the ones "
"reacting to external events (bus, HTTP requests)")
group.add_argument("--with-profile", action='store_true',
help="If set, activates profiling")
group.add_argument("--profile-file", default='wms.stats',
help="Base filename for profile files. "
"if pyprof2calltree is installed, they'll "
"be converted in .kgrind files, to visualize "
"with kcachegrind")
114 changes: 72 additions & 42 deletions anyblok_wms_examples/launcher/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import os
import sys
import anyblok
from anyblok.config import Configuration
import logging
import time
from cProfile import Profile
from multiprocessing import Process
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from sqlalchemy import func


Expand All @@ -21,10 +22,39 @@
DEFAULT_ISOLATION = 'REPEATABLE READ' # 'SERIALIZABLE'


def regular_worker(arguments):
registry = anyblok.start('basic', configuration_groups=[],
loadwithoutmigration=True,
isolation_level=DEFAULT_ISOLATION)
def dump_profile(profile, path_template, wtype='regular'):
"""Dump profile statistics in file
The final file path is derived from path_template, wtype and the pid
"""
base_path, ext = os.path.splitext(path_template)
path = '%s_%s_%s' % (base_path, wtype, os.getpid()) + ext
try:
from pyprof2calltree import convert
except ImportError:
profile.dump_stats(path)
else:
convert(profile.getstats(), path + '.kgrind')


def start_registry(isolation_level):
"""Analog of anyblok.start(), with Configuration already loaded.
Calling ``anyblok.start()`` from a worker process while the main
process has already consumed command line arguments before forking
results in an empty configuration, which can't work at all
"""

from anyblok.blok import BlokManager
from anyblok.registry import RegistryManager
BlokManager.load()
return RegistryManager.get(Configuration.get('db_name'),
isolation_level=isolation_level,
loadwithoutmigration=True)


def regular_worker():
registry = start_registry(DEFAULT_ISOLATION)
if registry is None:
logging.critical("regular_worker: couldn't init registry")
sys.exit(1)
Expand All @@ -35,14 +65,19 @@ def regular_worker(arguments):
if previous_run_timeslice is None:
previous_run_timeslice = 0

timeslices = arguments.timeslices
timeslices = Configuration.get('timeslices')
process = Worker.insert(
pid=os.getpid(),
active=True,
done_timeslice=previous_run_timeslice,
max_timeslice=previous_run_timeslice + timeslices,
)

with_profile = Configuration.get('with_profile')

if with_profile:
profile = Profile()
profile.enable()
for i in range(1, 1 + timeslices):
registry.commit()
try:
Expand All @@ -52,19 +87,20 @@ def regular_worker(arguments):
break
process.wait_others(i)
registry.commit()
if with_profile:
profile.disable()
dump_profile(profile, Configuration.get('profile_file'))


def continuous(wtype, arguments,
def continuous(wtype,
isolation_level=DEFAULT_ISOLATION, cleanup=False):
"""Start a continuous worker.
:param bool cleanup: if ``True`` remove all existing records of
the same worker type. They are considered stale
from previous runs.
"""
registry = anyblok.start('basic', configuration_groups=[],
loadwithoutmigration=True,
isolation_level=DEFAULT_ISOLATION)
registry = start_registry(isolation_level)
if registry is None:
logging.critical("continuous worker(type=%s): couldn't init registry",
wtype)
Expand All @@ -83,51 +119,45 @@ def continuous(wtype, arguments,
logger.info("Regular workers not yet running. Waiting a bit")
time.sleep(0.1)
registry.rollback()

with_profile = Configuration.get('with_profile')
if with_profile:
profile = Profile()
profile.enable()
process.run()
if with_profile:
profile.disable()
dump_profile(profile, Configuration.get('profile_file'), wtype=wtype)


def reserver(arguments):
return continuous('Reserver', arguments, cleanup=True)

def reserver():
return continuous('Reserver', cleanup=True)

def planner(number, arguments):
return continuous('Planner', arguments, cleanup=(number == 0))


class Arguments:
pass
def planner(number):
return continuous('Planner', cleanup=(number == 0))


def run():
parser = ArgumentParser(
description="Run the application in pure batch mode",
formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--timeslices", type=int, default=10,
help="Number of time slices to run")
parser.add_argument("--planner-workers", type=int, default=2,
help="Number of planner worker processes to run")
parser.add_argument("--regular-workers", type=int, default=4,
help="Number of regular worker processes to run. "
"in a normal application, these would be the ones "
"reacting to external events (bus, HTTP requests)")

logging.basicConfig(level=logging.INFO)
# arguments = parser.parse_args()
arguments = Arguments()
arguments.regular_workers = 4
arguments.timeslices = 10
arguments.planner_workers = 2
anyblok.load_init_function_from_entry_points()
Configuration.load('wms-bench')

regular = Configuration.get('regular_workers')
planners = Configuration.get('planner_workers')
print("Starting example/bench for {timeslices} time slices, "
"with {regular} regular workers and "
"{planners} planners\n\n".format(
regular=regular, planners=planners,
timeslices=Configuration.get('timeslices')))

# starting regular workers right away, otherwise continuous workers
# would believe the test/bench run is already finished.
for process in (Process(target=regular_worker, args=(arguments, ))
for i in range(arguments.regular_workers)):
for process in (Process(target=regular_worker, args=())
for i in range(regular)):
process.start()

Process(target=reserver, args=(arguments, )).start()
Process(target=reserver, args=()).start()

planners = [Process(target=planner, args=(i, arguments, ))
for i in range(arguments.planner_workers)]
planners = [Process(target=planner, args=(i, ))
for i in range(planners)]
for process in planners:
process.start()
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
'wms-example-launcher=anyblok_wms_examples.launcher:Launcher',
'wms-example-basic-seller=anyblok_wms_examples.basic:Seller',
],
'anyblok.init': 'wms_examples=anyblok_wms_examples:init_config',
'test_bloks': [
],
'console_scripts': [
Expand Down

0 comments on commit 961b8bd

Please sign in to comment.