Issue #6: configures and writes HTML (part two)
Thomas Lehmann committed Jan 20, 2018
1 parent 4f677fe commit 8ee6914
Showing 17 changed files with 511 additions and 185 deletions.
169 changes: 169 additions & 0 deletions docs/notebooks/Multiprocessing.ipynb
@@ -0,0 +1,169 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import multiprocessing\n",
"from contextlib import closing\n",
"\n",
"def parallel(worker, data):\n",
" with closing(multiprocessing.Pool(multiprocessing.cpu_count())) as pool:\n",
" for result in pool.map(worker, data):\n",
" yield result"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sleep 9 seconds\n",
"Sleep 1 seconds\n",
"Sleep 10 seconds\n",
"Sleep 7 seconds\n",
"Sleep 9 seconds\n",
"Sleep 3 seconds\n",
"Sleep 3 seconds\n",
"Sleep 10 seconds\n",
"Sleep 2 seconds\n",
"Sleep 8 seconds\n"
]
},
{
"data": {
"text/plain": [
"[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import time\n",
"import random\n",
"\n",
"def simple_worker(data):\n",
" wait_seconds = random.randint(1, 10)\n",
" print(\"Sleep %d seconds\" % wait_seconds)\n",
" time.sleep(wait_seconds)\n",
" return data\n",
" \n",
"list(parallel(simple_worker, list(range(10))))"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sleep 4 seconds\n",
"Sleep 7 seconds\n",
"Sleep 8 seconds\n",
"Sleep 8 seconds\n",
"Sleep 5 seconds\n",
"Sleep 2 seconds\n",
"Sleep 10 seconds\n",
"Sleep 4 seconds\n",
"Sleep 9 seconds\n",
"Sleep 9 seconds\n",
"Consumer receives 9\n",
"Consumer receives 1\n",
"Consumer receives 5\n",
"Consumer receives 7\n",
"Consumer receives 0\n",
"Consumer receives 2\n",
"Consumer receives 3\n",
"Consumer receives 8\n",
"Consumer receives 6\n",
"Consumer receives 4\n",
"Waiting for consumer\n",
"done\n"
]
}
],
"source": [
"class Consumer(multiprocessing.Process):\n",
" def __init__(self, result_queue):\n",
" multiprocessing.Process.__init__(self)\n",
" self.result_queue = result_queue\n",
" self.keep_running = True\n",
"\n",
" def run(self):\n",
" while self.keep_running:\n",
" result = self.result_queue.get()\n",
" print(\"Consumer receives %s\" % result)\n",
" print(\"consumer stops now\")\n",
"\n",
"class Producer(multiprocessing.Process):\n",
" def __init__(self, result_queue, value):\n",
" multiprocessing.Process.__init__(self)\n",
" self.result_queue = result_queue\n",
" self.value = value\n",
" self.start()\n",
"\n",
" def run(self):\n",
" wait_seconds = random.randint(1, 10)\n",
" print(\"Sleep %d seconds\" % wait_seconds)\n",
" time.sleep(wait_seconds)\n",
" self.result_queue.put(self.value)\n",
"\n",
"\n",
"# Establish communication queues\n",
"result_queue = multiprocessing.Queue()\n",
"consumer = Consumer(result_queue)\n",
"consumer.start()\n",
"\n",
"tasks = [Producer(result_queue, idx) for idx in range(10)]\n",
"for task in tasks:\n",
" task.join()\n",
"\n",
"print(\"Waiting for consumer\")\n",
"consumer.keep_running = False\n",
"result_queue.close()\n",
"consumer.terminate()\n",
"print(\"done\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
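
The second notebook cell prototypes the queue-based producer/consumer pattern that the collector changes below build on. Setting consumer.keep_running = False happens in the parent process and never reaches the already running child, which is why the cell falls back to terminate(); the application code below avoids this by pushing a None sentinel through the queue instead. A minimal standalone sketch of that sentinel-based shutdown (illustrative names, not part of the commit):

import multiprocessing

class Consumer(multiprocessing.Process):
    """Consume items from a queue until the None sentinel arrives."""

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:        # sentinel: no more results
                break
            print("Consumer receives %s" % item)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    consumer = Consumer(queue)
    consumer.start()

    for value in range(10):
        queue.put(value)

    queue.put(None)     # ask the consumer to stop ...
    consumer.join()     # ... and wait until it has processed everything before the sentinel
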
38 changes: 15 additions & 23 deletions spline/application.py
@@ -30,6 +30,7 @@
from spline.components.hooks import Hooks
from spline.components.config import ApplicationOptions
from spline.tools.logger import Logger
from spline.tools.filters import find_matrix
from spline.tools.event import Event
from spline.tools.report.collector import Collector
from spline.validation import Validator
@@ -91,26 +92,6 @@ def validate_document(self, definition):
self.logger.info("Schema validation for '%s' succeeded", definition)
return document

@staticmethod
def find_matrix(document):
"""
Find X{matrix} in document.
The spline syntax allows following definitions:
- B{'matrix'} - ordered execution of each pipeline (short form)
- B{'matrix(ordered)'} - ordered execution of each pipeline (more readable form)
- B{'matrix(parallel)'} - parallel execution of each pipeline
@type document: dict
@param document: validated spline document loaded from a yaml file.
@rtype: list
@return: matrix as a part of the spline document or an empty list if not given.
"""
return document['matrix'] if 'matrix' in document \
else document['matrix(ordered)'] if 'matrix(ordered)' in document \
else document['matrix(parallel)'] if 'matrix(parallel)' in document \
else []

def run_matrix(self, matrix_definition, document):
"""
Running pipeline via a matrix.
@@ -142,9 +123,8 @@ def run(self, definition):
self.logger.info("Stopping after validation as requested!")
return

Collector().configure(document)

matrix = Application.find_matrix(document)
collector = Application.create_and_run_collector(document)
matrix = find_matrix(document)
if len(matrix) == 0:
model = {} if 'model' not in document else document['model']
pipeline = Pipeline(model=model, options=self.options)
@@ -158,6 +138,18 @@ def run(self, definition):
                sys.exit(1)

        self.event.succeeded()
        # shutdown of collector
        collector.queue.put(None)
        collector.join()

    @staticmethod
    def create_and_run_collector(document):
        """Create and run collector process for report data."""
        collector = Collector()
        collector.store.configure(document)
        Event.configure(collector_queue=collector.queue)
        collector.start()
        return collector


@click.command()
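
find_matrix is no longer a static method of Application; application.py now imports it from spline.tools.filters (see the new import above), but the relocated module itself is not part of this commit view. Assuming the logic was simply moved, the helper in spline/tools/filters.py would look roughly like this sketch:

def find_matrix(document):
    """Return the matrix part of a validated spline document, or an empty list.

    The spline syntax accepts 'matrix' (short form), 'matrix(ordered)' and
    'matrix(parallel)' as alternative keys.
    """
    # same precedence as the removed Application.find_matrix above
    for key in ('matrix', 'matrix(ordered)', 'matrix(parallel)'):
        if key in document:
            return document[key]
    return []
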
8 changes: 5 additions & 3 deletions spline/components/stage.py
@@ -31,10 +31,12 @@ class Stage(object):

    def __init__(self, pipeline, title):
        """Initializing with reference to pipeline main object."""
        # providing title of stage and name of matrix to the event
        self.event = Event.create(__name__, stage=title)
        matrix = 'default'
        if 'PIPELINE_MATRIX' in pipeline.data.env_list[0]:
            self.event.information.update({'matrix': pipeline.data.env_list[0]['PIPELINE_MATRIX']})
            matrix = pipeline.data.env_list[0]['PIPELINE_MATRIX']

        # providing title of stage and name of matrix to the event
        self.event = Event.create(__name__, matrix=matrix, stage=title)

        self.logger = Logger.get_logger(__name__)
        self.pipeline = pipeline
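
With this change the matrix name is resolved once (falling back to 'default') and handed to Event.create together with the stage title, so both values land in the event's information from the start instead of being patched in afterwards. Roughly, for a pipeline whose first env block carries PIPELINE_MATRIX (a simplified, hypothetical illustration):

from spline.tools.event import Event

env = {'PIPELINE_MATRIX': 'one', 'mode': 'test'}   # stands in for pipeline.data.env_list[0]

matrix = 'default'
if 'PIPELINE_MATRIX' in env:
    matrix = env['PIPELINE_MATRIX']

event = Event.create(__name__, matrix=matrix, stage='Build')
# event.information should now include both 'matrix' and 'stage'
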
6 changes: 5 additions & 1 deletion spline/matrix.py
@@ -34,7 +34,11 @@ def matrix_worker(data):
    matrix = data['matrix']
    Logger.get_logger(__name__ + '.worker').info(
        "Processing pipeline for matrix entry '%s'", matrix['name'])
    pipeline = Pipeline(model=data['model'], env=matrix['env'], options=data['options'])

    env = matrix['env'].copy()
    env.update({'PIPELINE_MATRIX': matrix['name']})

    pipeline = Pipeline(model=data['model'], env=env, options=data['options'])
    pipeline.hooks = data['hooks']
    return pipeline.process(data['pipeline'])

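
The worker now injects the matrix name into the environment before building the Pipeline, and it does so on a copy so the original matrix definition stays untouched. The idiom in isolation (hypothetical values):

matrix = {'name': 'one', 'env': {'mode': 'test'}}

env = matrix['env'].copy()                      # do not mutate the definition itself
env.update({'PIPELINE_MATRIX': matrix['name']})

assert matrix['env'] == {'mode': 'test'}
assert env == {'mode': 'test', 'PIPELINE_MATRIX': 'one'}

Stage.__init__ (see stage.py above) then reads PIPELINE_MATRIX back out of env_list[0] to tag its events with the matrix name.
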
20 changes: 14 additions & 6 deletions spline/tools/event.py
@@ -18,13 +18,14 @@
import time
from datetime import datetime
from spline.tools.logger import Logger
from spline.tools.report.collector import Collector, CollectorUpdate
from spline.tools.report.collector import CollectorUpdate


class Event(object):
"""Event mechanism for pipeline."""

is_logging_enabled = False
collector_queue = None

def __init__(self, context, timestamp, **kwargs):
"""Initialize event with optional additional information."""
@@ -42,9 +43,16 @@ def __init__(self, context, timestamp, **kwargs):
        self.logger = Logger.get_logger(None)

    @staticmethod
    def configure(is_logging_enabled):
    def configure(**kwargs):
        """Global configuration for event handling."""
        Event.is_logging_enabled = is_logging_enabled
        for key in kwargs:
            if key == 'is_logging_enabled':
                Event.is_logging_enabled = kwargs[key]
            elif key == 'collector_queue':
                Event.collector_queue = kwargs[key]
            else:
                Logger.get_logger(__name__).error("Unknown key %s in configure or bad type %s",
                                                  key, type(kwargs[key]))

    @staticmethod
    def create(context, **kwargs):
@@ -80,9 +88,9 @@ def duration(self):

    def update_report_collector(self, timestamp):
        """Updating report collector for pipeline details."""
        if 'stage' in self.information:
            Collector().update(CollectorUpdate(
                matrix=self.information['matrix'] if 'marix' in self.information else 'default',
        if 'stage' in self.information and Event.collector_queue is not None:
            Event.collector_queue.put(CollectorUpdate(
                matrix=self.information['matrix'] if 'matrix' in self.information else 'default',
                stage=self.information['stage'],
                status=self.status,
                timestamp=timestamp,
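
Taken together with the application.py changes above, events no longer import or call the Collector directly: Application wires the collector's queue into the Event class via Event.configure, events that carry a 'stage' put CollectorUpdate objects on that queue, and at the end of the run a None sentinel is put on the queue before the collector process is joined. A condensed sketch of that wiring, assuming document is the validated spline document returned by validate_document:

from spline.tools.event import Event
from spline.tools.report.collector import Collector

# start-up (Application.create_and_run_collector in the diff above)
collector = Collector()
collector.store.configure(document)
Event.configure(collector_queue=collector.queue)   # events publish here from now on
collector.start()

# during the run, Event.update_report_collector puts CollectorUpdate
# objects on Event.collector_queue whenever an event carries a 'stage'.

# shutdown (end of Application.run)
collector.queue.put(None)   # sentinel: no further updates
collector.join()
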
