From d7a8462785ca3d9f1581e6de3573aff0d6271106 Mon Sep 17 00:00:00 2001 From: pylover Date: Tue, 22 May 2018 21:30:53 +0430 Subject: [PATCH] Push and Pull --- easyq/client.py | 10 ++++----- easyq/configuration.py | 2 +- easyq/queuemanager.py | 32 ++++++++++++++++++++------- easyq/server.py | 46 ++++++++++++++++++++++----------------- easyq/tests/test_login.py | 3 +++ easyq/tests/test_pull.py | 15 +++++++++++-- easyq/tests/test_push.py | 16 +++++++++----- practice/taskcancel.py | 1 + 8 files changed, 83 insertions(+), 42 deletions(-) diff --git a/easyq/client.py b/easyq/client.py index 5db8b8a..8a7984c 100644 --- a/easyq/client.py +++ b/easyq/client.py @@ -6,6 +6,7 @@ class ClientProtocol(asyncio.Protocol): identity = None chunk = None + onerror = None class Patterns: regex = functools.partial(re.compile, flags=re.DOTALL) @@ -53,12 +54,10 @@ def data_received(self, data): asyncio.ensure_future(self.process_response(response)) def connection_lost(self, exc): - print('The server closed the connection') self.logged_in.set_result(False) async def process_response(self, data): - print(b'Data from server: ' + data) - m = self.Patterns.message.match(data) + m = self.Patterns.incomming.match(data) if m is not None: return await self.dispatch(**m.groupdict()) @@ -76,12 +75,13 @@ async def dispatch(self, message, queue): handlers = self.handlers.get(queue) if handlers: await asyncio.gather( - *(handler(message, queue) for handler in handlers), + *(handler(queue, message) for handler in handlers), return_exceptions=True ) async def error(self, err): - print(f'Error from server: {err}') + if self.onerror: + await self.onerror(self, err) class EasyQClientError(Exception): diff --git a/easyq/configuration.py b/easyq/configuration.py index 132f317..bf7cc19 100644 --- a/easyq/configuration.py +++ b/easyq/configuration.py @@ -24,7 +24,7 @@ dispatchers: 1 dispatcher: messages_per_queue: 5 - intervals: .5 + intervals: .3 ''' diff --git a/easyq/queuemanager.py b/easyq/queuemanager.py index 26cae7f..28874f4 100644 --- a/easyq/queuemanager.py +++ b/easyq/queuemanager.py @@ -8,6 +8,10 @@ queues = {} +class AlreadySubscribedError(Exception): + pass + + class Queue: def __init__(self, name): @@ -19,21 +23,31 @@ def __init__(self, name): def push(self, message): self._queue.put_nowait(message) + def get(self): + return self._queue.get_nowait() + def subscribe(self, protocol): + if protocol in self.subscriptors: + raise AlreadySubscribedError() + logger.info(f'Queue {self.name.decode()} was subscribed by {protocol.identity}') self.subscriptors.append(protocol) def unsubscribe(self, protocol): + logger.info(f'Queue {self.name.decode()} was ignored by {protocol.identity}') self.subscriptors.remove(protocol) async def dispatch(self, message): for protocol in self.subscriptors: - await protocol.dispatch(queue, message) + logger.debug( + f'Dispatching message {message} from queue {self.name.decode()} to {protocol.identity}' + ) + await protocol.dispatch(self.name, message) def getqueue(name) -> Queue: if name not in queues: queues[name] = Queue(name) - logger.info(f'Queue {name} just created.') + logger.info(f'Queue {name.decode()} just created.') return queues[name] @@ -42,15 +56,17 @@ async def dispatcher(name, intervals=.5, messages_per_queue=5): cycle = 0 try: while True: - logger.debug(f'Cycle: {cycle}') - for queue in queues: + if cycle % 100 == 0: + logger.debug(f'Cycle: {cycle}') + for queue in queues.values(): try: - for i in range(message_per_queue): - message = queue.get_nowait() + for i in range(messages_per_queue): + message = queue.get() + logger.debug(f'Dispatching {message}') await queue.dispatch(message) - except EmptyQueue: - logger.info(f'Queue {queue.decode()} is empty') + except asyncio.QueueEmpty: + pass cycle += 1 await asyncio.sleep(intervals) diff --git a/easyq/server.py b/easyq/server.py index 3e1ed87..a6bf73f 100644 --- a/easyq/server.py +++ b/easyq/server.py @@ -5,16 +5,12 @@ from .authentication import authenticate, initialize as initialize_authentication from .configuration import settings from .logging import getlogger -from .queuemanager import getqueue, dispatcher +from .queuemanager import getqueue, dispatcher, AlreadySubscribedError """ -> PULL FROM queue1 -> IGNORE queue1 -<- MESSAGE FROM queue1 [ID 122] -<- MESSAGE 122 IS DELIVERED TO user1 - """ @@ -30,7 +26,7 @@ class ServerProtocol(asyncio.Protocol): class Patterns: regex = functools.partial(re.compile, flags=re.DOTALL + re.IGNORECASE) login = regex(b'^LOGIN (?P.+)$') - push = regex(b'^PUSH (?P.+) INTO (?P[0-9a-zA-Z\._:-]+)$') + push = regex(b'^PUSH (?P.+)(?:\s|\n)INTO (?P[0-9a-zA-Z\._:-]+)$') pull = regex(b'^PULL FROM (?P[0-9a-zA-Z\._:-]+)$') def connection_made(self, transport): @@ -62,7 +58,7 @@ def data_received(self, data): # Scheduling a login task, if everything went ok, then the resume_reading will be # called in the future. - asyncio.ensure_future(self.login(credentials)) + asyncio.ensure_future(self.login(credentials.strip())) return # Splitting the received data with \n and adding buffered chunk if available @@ -109,7 +105,13 @@ async def push(self, message, queue): getqueue(queue).push(message) except asyncio.QueueFull: self.logger.warning(f'Queue is full: {self.name}') - self.transport.write(b'ERROR: QUEUE %s IS FULL;\n' % queue) + self.transport.write(b'ERROR: QUEUE %s IS FULL;\n' % queue.decode()) + + async def pull(self, queue): + try: + getqueue(queue).subscribe(self) + except AlreadySubscribedError: + self.transport.write(b'ERROR: QUEUE %s IS ALREASY SUBSCRIBED;\n' % queue) async def process_command(self, command): logger.debug(f'Processing command: {command.decode()} by {self.identity}') @@ -117,6 +119,10 @@ async def process_command(self, command): if m is not None: return await self.push(**m.groupdict()) + m = self.Patterns.pull.match(command) + if m is not None: + return await self.pull(**m.groupdict()) + logger.debug(f'Invalid command: {command}') self.transport.write(b'ERROR: Invalid command: %s;\n' % command) @@ -131,8 +137,11 @@ def create_dispatchers(workers=1, **kwargs): class Server: _server = None + _dispatchers_task = None + def __init__(self, bind=None, loop=None): self.loop = loop or asyncio.get_event_loop() + self.logger = getlogger('SERVER') # Host and Port to listen bind = bind or settings.bind @@ -141,23 +150,20 @@ def __init__(self, bind=None, loop=None): # Configuring the authenticator initialize_authentication() - self.server_coro = self.loop.create_server(ServerProtocol, self.host, self.port) - self._dispatchers = [] - async def start(self): - self._server = await self.server_coro - - for i in range(settings.dispatchers): - self._dispatchers.append( - self.loop.create_task(dispatcher('WORKER %d' % i, **settings.dispatcher)) - ) + self._server = await self.loop.create_server(ServerProtocol, self.host, self.port) + self._dispatchers_task = self.loop.create_task(asyncio.gather( + *(dispatcher('WORKER %d' % i, **settings.dispatcher) + for i in range(settings.dispatchers)) + )) async def close(self): - for dispatcher in self._dispatchers: - dispatcher.cancel() + self.logger.info('Shutting down...') + self._dispatchers_task.cancel() self._server.close() await self._server.wait_closed() - await asyncio.wait(self._dispatchers) + while self._dispatchers_task.cancelled(): + await asyncio.sleep(.2) @property def address(self): diff --git a/easyq/tests/test_login.py b/easyq/tests/test_login.py index 3244e77..828025c 100644 --- a/easyq/tests/test_login.py +++ b/easyq/tests/test_login.py @@ -41,6 +41,9 @@ async def test_trust(self): with self.assertRaises(AuthenticationError): await connect('colon:not:allowed') + client = await connect('testuser\n') + self.assertEqual('testuser', client.identity) + if __name__ == '__main__': unittest.main() diff --git a/easyq/tests/test_pull.py b/easyq/tests/test_pull.py index 0d7f84a..1b48d34 100644 --- a/easyq/tests/test_pull.py +++ b/easyq/tests/test_pull.py @@ -3,7 +3,8 @@ from easyq.server import ServerProtocol from easyq.tests.helpers import EasyQTestServer, TestCase -from easyq.client import AuthenticationError +from easyq.client import AuthenticationError, ClientProtocol +from easyq.queuemanager import AlreadySubscribedError class PullTestCase(TestCase): @@ -32,7 +33,7 @@ async def test_incomming_message_parser(self): (b'MESSAGE Hello\nDear FROM q1', b'Hello\nDear', b'q1'), ] - for command, expected_massage, expected_queue in whitelist: + for command, expected_message, expected_queue in whitelist: m = ClientProtocol.Patterns.incomming.match(command) self.assertIsNotNone(m) message, queue = m.groups() @@ -51,6 +52,7 @@ async def test_pull(self): async with self.server() as connect: client = await connect('testuser') messages = [] + errors = [] async def message_received(queue, message): messages.append(message) @@ -60,6 +62,15 @@ async def message_received(queue, message): await asyncio.sleep(1.1) self.assertEqual([b'Hello'], messages) + # pulling twice! + async def error(client_, error): + errors.append(error) + + client.onerror = error + await client.pull(b'q1', message_received) + await asyncio.sleep(1.1) + self.assertEqual([b'ERROR: QUEUE q1 IS ALREASY SUBSCRIBED'], errors) + if __name__ == '__main__': unittest.main() diff --git a/easyq/tests/test_push.py b/easyq/tests/test_push.py index 254a433..05cfd34 100644 --- a/easyq/tests/test_push.py +++ b/easyq/tests/test_push.py @@ -16,16 +16,20 @@ async def test_parser(self): (b'PUSH Hello dear INTO bad INTO myq', b'Hello dear INTO bad', b'myq'), (b'PUSH Hello INTO myq:a.b_c', b'Hello', b'myq:a.b_c'), (b'push hello into myq:a.b_c', b'hello', b'myq:a.b_c'), + (b'push hello\ninto q1', b'hello', b'q1'), ] for command, expected_message, expected_queue in whitelist: - m = ServerProtocol.Patterns.push.match(command) - if m is None: + try: + m = ServerProtocol.Patterns.push.match(command) + self.assertIsNotNone(m) + print(m.groups()) + message, queue, = m.groups() + self.assertEqual(expected_queue, queue) + self.assertEqual(expected_message, message) + except AssertionError: print(command) - self.assertIsNotNone(m) - message, queue = m.groups() - self.assertEqual(expected_queue, queue) - self.assertEqual(expected_message, message) + raise blacklist = [ b'PUSH Hello INTO my\nq', diff --git a/practice/taskcancel.py b/practice/taskcancel.py index c9a7640..281f257 100644 --- a/practice/taskcancel.py +++ b/practice/taskcancel.py @@ -7,6 +7,7 @@ async def worker(name): c = 0 running = True try: + raise Exception('di dada doo da') while True: print(f'worker: {name} working cycle: {c}') await asyncio.sleep(1)