diff --git a/easyq/client.py b/easyq/client.py index 8a7984c..48d25f5 100644 --- a/easyq/client.py +++ b/easyq/client.py @@ -71,6 +71,14 @@ async def pull(self, queue, callback): handlers = self.handlers.setdefault(queue, set()) handlers.add(callback) + async def ignore(self, queue, callback): + handlers = self.handlers.setdefault(queue, set()) + if callback not in handlers: + raise ValueError(f'Invalid callback: {callback}') + + handlers.remove(callback) + self.transport.write(b'IGNORE %s;\n' % queue) + async def dispatch(self, message, queue): handlers = self.handlers.get(queue) if handlers: diff --git a/easyq/queuemanager.py b/easyq/queuemanager.py index 28874f4..8f96056 100644 --- a/easyq/queuemanager.py +++ b/easyq/queuemanager.py @@ -12,6 +12,10 @@ class AlreadySubscribedError(Exception): pass +class NotSubscribedError(Exception): + pass + + class Queue: def __init__(self, name): @@ -33,6 +37,9 @@ def subscribe(self, protocol): self.subscriptors.append(protocol) def unsubscribe(self, protocol): + if protocol not in self.subscriptors: + raise NotSubscribedError() + logger.info(f'Queue {self.name.decode()} was ignored by {protocol.identity}') self.subscriptors.remove(protocol) diff --git a/easyq/server.py b/easyq/server.py index a6bf73f..97e1089 100644 --- a/easyq/server.py +++ b/easyq/server.py @@ -5,13 +5,7 @@ from .authentication import authenticate, initialize as initialize_authentication from .configuration import settings from .logging import getlogger -from .queuemanager import getqueue, dispatcher, AlreadySubscribedError - - -""" --> IGNORE queue1 - -""" +from .queuemanager import getqueue, dispatcher, AlreadySubscribedError, NotSubscribedError logger = getlogger('PROTO') @@ -28,6 +22,7 @@ class Patterns: login = regex(b'^LOGIN (?P.+)$') push = regex(b'^PUSH (?P.+)(?:\s|\n)INTO (?P[0-9a-zA-Z\._:-]+)$') pull = regex(b'^PULL FROM (?P[0-9a-zA-Z\._:-]+)$') + ignore = regex(b'^IGNORE (?P[0-9a-zA-Z\._:-]+)$') def connection_made(self, transport): self.peername = transport.get_extra_info('peername') @@ -111,7 +106,15 @@ async def pull(self, queue): try: getqueue(queue).subscribe(self) except AlreadySubscribedError: - self.transport.write(b'ERROR: QUEUE %s IS ALREASY SUBSCRIBED;\n' % queue) + self.transport.write(b'ERROR: QUEUE %s IS ALREADY SUBSCRIBED;\n' % queue) + + async def ignore(self, queue): + try: + getqueue(queue).unsubscribe(self) + except NotSubscribedError: + self.transport.write(b'ERROR: QUEUE %s IS NOT SUBSCRIBED;\n' % queue) + + async def process_command(self, command): logger.debug(f'Processing command: {command.decode()} by {self.identity}') @@ -123,6 +126,10 @@ async def process_command(self, command): if m is not None: return await self.pull(**m.groupdict()) + m = self.Patterns.ignore.match(command) + if m is not None: + return await self.ignore(**m.groupdict()) + logger.debug(f'Invalid command: {command}') self.transport.write(b'ERROR: Invalid command: %s;\n' % command) diff --git a/easyq/tests/test_pull.py b/easyq/tests/test_pull.py index 1b48d34..6928643 100644 --- a/easyq/tests/test_pull.py +++ b/easyq/tests/test_pull.py @@ -57,19 +57,38 @@ async def test_pull(self): async def message_received(queue, message): messages.append(message) + async def error(client_, error): + errors.append(error) + + client.onerror = error + await client.pull(b'q1', message_received) await client.push(b'q1', b'Hello') await asyncio.sleep(1.1) self.assertEqual([b'Hello'], messages) + self.assertEqual([], errors) # pulling twice! - async def error(client_, error): - errors.append(error) - - client.onerror = error await client.pull(b'q1', message_received) await asyncio.sleep(1.1) - self.assertEqual([b'ERROR: QUEUE q1 IS ALREASY SUBSCRIBED'], errors) + self.assertEqual([b'ERROR: QUEUE q1 IS ALREADY SUBSCRIBED'], errors) + + # Unsunscribing + await client.ignore(b'q1', message_received) + messages = [] + errors = [] + await client.push(b'q1', b'Hello') + await asyncio.sleep(1.1) + self.assertEqual([], messages) + self.assertEqual([], errors) + + # Ignoring twice! + client.handlers[b'q1'] = [message_received] + await client.ignore(b'q1', message_received) + await asyncio.sleep(1.1) + self.assertEqual([b'ERROR: QUEUE q1 IS NOT SUBSCRIBED'], errors) + + if __name__ == '__main__':