Skip to content

Commit

Permalink
New asyncio thread execution, added message queue for messaging betwe…
Browse files Browse the repository at this point in the history
…en threads
  • Loading branch information
cerealkill committed Mar 27, 2019
1 parent 5cf48e9 commit e6604a4
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 86 deletions.
1 change: 0 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pylint = "*"
colorlog = "*"
web3 = "<5.0.0,>=4.8.0"
base58 = "*"
websockets = "*"

[requires]
python_version = "3.7"
3 changes: 1 addition & 2 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions energyweb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
Bond - Your favorite library for logging energy data on the blockchain
"""

import energyweb.config as config

from energyweb.interfaces import Serializable, ExternalData, IntegrationPoint, BlockchainClient
from energyweb.log import Logger
from energyweb.dispatcher import App, EventLoop, Task
from energyweb.dispatcher import App, Task
from energyweb.carbonemission import CarbonEmissionData
from energyweb.eds.interfaces import EnergyUnit, EnergyData, EnergyDevice
from energyweb.smart_contract.origin_v1 import ConsumedEnergy, ProducedEnergy, OriginV1
Expand All @@ -22,7 +24,5 @@
__author__ = 'Paul Depraz <github.com/cerealkill>'
__repository__ = 'github.com/energywebfoundation/ew-link-bond'
__status__ = "pre-alpha"
__version__ = "0.3.7"
__version__ = "0.4.0"
__date__ = "14 December 2018"

modules = {'Eumelv1': {'energyweb.eds', 'Eumelv1'}}
121 changes: 74 additions & 47 deletions energyweb/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,100 +3,127 @@
"""
import asyncio
import datetime
import time
import concurrent


class Task:
"""
Tasks are routines that run from time to time respecting an interval and spawn coroutines.
These routines may only execute if a trigger condition is fired.
"""
def __init__(self, polling_interval: datetime.timedelta, eager: bool = False):
def __init__(self, queue: {str: asyncio.Queue}, polling_interval: datetime.timedelta = None, eager: bool = False, run_forever: bool = True):
"""
:param polling_interval: in seconds
:param queue: app asyncio queues for messages exchange between threads
:param eager: if main waits polling time first or is eager to start
"""
self.polling_interval = polling_interval
self.queue = queue
self.run_forever = run_forever
self.eager = eager

def prepare(self):
async def _prepare(self):
"""
Perform steps required prior to running the main task
run exactly once
"""
raise NotImplementedError

def main(self) -> bool:
async def _main(self, *args):
"""
The main task
run until it returns False
"""
raise NotImplementedError

def finish(self) -> None:
async def _finish(self):
"""
Perform steps required after running the main task
run exactly once
"""
raise NotImplementedError

def _handle_exception(self, e: Exception):
"""
Handle exceptions when they occur
"""
raise NotImplementedError

def run(self, *args):
async def run(self, *args):
"""
run all steps of the task
"""
self.prepare()
while self.main(*args) is True:
# sleep yields to the thread scheduler
time.sleep(self.polling_interval.total_seconds())
self.finish()
async def main_loop():
if self.polling_interval and not self.eager:
await asyncio.sleep(self.polling_interval.total_seconds())
await self._main(*args)
if self.polling_interval and self.eager:
await asyncio.sleep(self.polling_interval.total_seconds())
try:
await self._prepare()
try:
await main_loop()
while self.run_forever:
await asyncio.ensure_future(main_loop())
except Exception as e:
self._handle_exception(e)
finally:
await self._finish()
if self.run_forever:
await self.run(*args)
except Exception as e:
self._handle_exception(e)


class EventLoop:
class App:
"""
Delegate and run a set of Tasks in loop that handles I/O or blocking with async calls.
https://en.wikipedia.org/wiki/Event_loop
https://docs.python.org/3/library/asyncio.html
General application abstraction
"""
def __init__(self):
self.task_list = []
self.started_tasks = []
self.tasks: [asyncio.tasks] = []
self.queue: {str: asyncio.Queue} = {}
self.loop = asyncio.get_event_loop()
self._configure()

def add_task(self, task: Task, *args):
def _configure(self):
"""
add task to event task list
Configuration phase of the app.
"""
if not task:
raise Exception('Please add a Task type with callable task named method.')
self.task_list.append((task, args))
raise NotImplementedError

async def run(self):
def _clean_up(self):
"""
execute all tasks in task list in their own thread
Overwrite this method to implement clean up logic like closing sockets and streams.
"""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
for (task, args) in self.task_list:
started_task = loop.run_in_executor(pool, task.run, *args)
self.started_tasks.append(started_task)
for task in self.started_tasks:
await task
raise NotImplementedError

def _handle_exception(self, e: Exception):
"""
Handle exceptions when they occur
"""
raise NotImplementedError

class App(EventLoop):
"""
General application abstraction
"""
def _register_task(self, task: Task, *args):
"""
Add task to be executed in run time
"""
if not task:
raise Exception('Please add a Task type with callable task named method.')
self.tasks.append((task, args))

def configure(self):
def _register_queue(self, queue_id: str, max_size: int = 0):
"""
Overwrite this method to implement application logic
add tasks here
Creates a new asyncio Queue for messages exchange between threads
:param queue_id: Name of queue index
:param max_size: Maximum number of messages
"""
raise NotImplementedError
self.queue[queue_id] = asyncio.Queue(maxsize=max_size)

def run(self):
"""
Call the overwritten configure and run the eventLoop
execute all tasks in task list in their own thread
"""
self.configure()
# python 3.6
#loop = asyncio.get_event_loop()
#loop.run_until_complete(super().run())
asyncio.run(super().run())
try:
self.loop.run_until_complete(asyncio.gather(*[task.run(*args) for task, args in self.tasks]))
except Exception as e:
self._handle_exception(e)
finally:
self._clean_up()
134 changes: 102 additions & 32 deletions test/threadtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,120 @@
import energyweb
import urllib

class MyTask(energyweb.dispatcher.Task):
def main(self, duration, character):

class PrintTask(energyweb.dispatcher.Task):

async def _finish(self):
print(f'Task {self.__class__.__name__} finished')

async def _prepare(self):
print(f'Task {self.__class__.__name__} prepared')

def _handle_exception(self, e: Exception):
print(f'Task {self.__class__.__name__} failed because {e.with_traceback(e.__traceback__)}')

async def _main(self, duration, character):
for _ in range(duration):
print(character, end='', flush=True)
time.sleep(1)
return False
print('\n')


class PostManTask(energyweb.dispatcher.Task):

def __init__(self, queue, send_interval, messages: [str]):
self.messages = messages
super().__init__(queue, send_interval, run_forever=False)

async def _finish(self):
print(f'Task {self.__class__.__name__} finished')

async def _prepare(self):
import random
random.shuffle(self.messages, random.random)
print(f'Task {self.__class__.__name__} prepared')

def _handle_exception(self, e: Exception):
print(f'Task {self.__class__.__name__} failed because {e.with_traceback(e.__traceback__)}')

async def _main(self):
for msg in self.messages:
await self.queue['mail_box'].put(msg)
await asyncio.sleep(self.polling_interval.total_seconds())
raise AttributeError(f'Task {self.__class__.__name__} ended delivering messages.')


class MailCheckerTask(energyweb.dispatcher.Task):

async def _finish(self):
print(f'Task {self.__class__.__name__} finished')

async def _prepare(self):
print(f'Task {self.__class__.__name__} prepared')

def _handle_exception(self, e: Exception):
print(f'Task {self.__class__.__name__} failed because {e.with_traceback(e.__traceback__)}')

async def _main(self):
messages = []
while not self.queue['mail_box'].empty():
messages.append(self.queue['mail_box'].get_nowait())
if len(messages) > 0:
[print(msg) for msg in messages]


class NetworkTask(energyweb.dispatcher.Task):
"""
Example Task reading and writing network
"""
def prepare(self):
print('Net try open')
return super().prepare()

def main(self, number):
def __init__(self, queue, polling_interval):
self.net = None
super().__init__(queue, polling_interval, eager=True, run_forever=False)

def _handle_exception(self, e: Exception):
print(f'Task {self.__class__.__name__} failed because {e.with_traceback(e.__traceback__)}')

async def _prepare(self):
print('Net try open')
try:
net = urllib.request.urlopen(f'http://localhost:8000/{number}')
self.net = urllib.request.urlopen(f'http://localhost:8000')
except urllib.error.URLError:
print('Net unavailable')
return True

response = net.read().decode().strip()
if response == 'ja':
print('Here we go', end='')
for _ in range(3):
print('.', end='', flush=True)
time.sleep(1)
print('')
elif response == 'stop':
return False
return True

def finish(self):

async def _main(self):
if self.net:
response = self.net.read().decode().strip()
print(response)
self.queue['network_status'].put({'online': True})
else:
raise urllib.error.URLError('Net Unavailable.')

async def _finish(self):
print('Net close')
return super().finish()
await self.queue['network_status'].put({'online': False})

class MyApp(energyweb.dispatcher.App):
def configure(self):
mytask = MyTask(polling_interval=datetime.timedelta(seconds=1))
networktask = NetworkTask(polling_interval=datetime.timedelta(seconds=5))
self.add_task(mytask, 5, '>')
self.add_task(mytask, 8, '<')
self.add_task(networktask, 1)

myapp = MyApp()
myapp.run()

def _handle_exception(self, e: Exception):
print('==== APP ERROR ====')
print(f'{e.with_traceback(e.__traceback__)}')

def _clean_up(self):
print('==== App finished, cleaning up. ====')

def _configure(self):
print('==== App reading configuration ====')
self._register_queue('mail_box')
self._register_queue('network_status', 1)
self._register_task(NetworkTask(self.queue, datetime.timedelta(seconds=20)))
messages = ['Hello Mike', 'Don\'t forget my bday', 'Have a nice day']
self._register_task(PostManTask(self.queue, datetime.timedelta(seconds=10), messages))
self._register_task(MailCheckerTask(self.queue, datetime.timedelta(seconds=20)))
self._register_task(PrintTask(self.queue, datetime.timedelta(minutes=2)), 3, '>')


if __name__ == '__main__':
app = MyApp()
app.run()

0 comments on commit e6604a4

Please sign in to comment.