Skip to content

Commit

Permalink
Push and Pull
Browse files Browse the repository at this point in the history
  • Loading branch information
pylover committed May 22, 2018
1 parent b5f42a2 commit d7a8462
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 42 deletions.
10 changes: 5 additions & 5 deletions easyq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class ClientProtocol(asyncio.Protocol):
identity = None
chunk = None
onerror = None

class Patterns:
regex = functools.partial(re.compile, flags=re.DOTALL)
Expand Down Expand Up @@ -53,12 +54,10 @@ def data_received(self, data):
asyncio.ensure_future(self.process_response(response))

def connection_lost(self, exc):
print('The server closed the connection')
self.logged_in.set_result(False)

async def process_response(self, data):
print(b'Data from server: ' + data)
m = self.Patterns.message.match(data)
m = self.Patterns.incomming.match(data)
if m is not None:
return await self.dispatch(**m.groupdict())

Expand All @@ -76,12 +75,13 @@ async def dispatch(self, message, queue):
handlers = self.handlers.get(queue)
if handlers:
await asyncio.gather(
*(handler(message, queue) for handler in handlers),
*(handler(queue, message) for handler in handlers),
return_exceptions=True
)

async def error(self, err):
print(f'Error from server: {err}')
if self.onerror:
await self.onerror(self, err)


class EasyQClientError(Exception):
Expand Down
2 changes: 1 addition & 1 deletion easyq/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
dispatchers: 1
dispatcher:
messages_per_queue: 5
intervals: .5
intervals: .3
'''

Expand Down
32 changes: 24 additions & 8 deletions easyq/queuemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
queues = {}


class AlreadySubscribedError(Exception):
pass


class Queue:

def __init__(self, name):
Expand All @@ -19,21 +23,31 @@ def __init__(self, name):
def push(self, message):
self._queue.put_nowait(message)

def get(self):
return self._queue.get_nowait()

def subscribe(self, protocol):
if protocol in self.subscriptors:
raise AlreadySubscribedError()
logger.info(f'Queue {self.name.decode()} was subscribed by {protocol.identity}')
self.subscriptors.append(protocol)

def unsubscribe(self, protocol):
logger.info(f'Queue {self.name.decode()} was ignored by {protocol.identity}')
self.subscriptors.remove(protocol)

async def dispatch(self, message):
for protocol in self.subscriptors:
await protocol.dispatch(queue, message)
logger.debug(
f'Dispatching message {message} from queue {self.name.decode()} to {protocol.identity}'
)
await protocol.dispatch(self.name, message)


def getqueue(name) -> Queue:
if name not in queues:
queues[name] = Queue(name)
logger.info(f'Queue {name} just created.')
logger.info(f'Queue {name.decode()} just created.')
return queues[name]


Expand All @@ -42,15 +56,17 @@ async def dispatcher(name, intervals=.5, messages_per_queue=5):
cycle = 0
try:
while True:
logger.debug(f'Cycle: {cycle}')
for queue in queues:
if cycle % 100 == 0:
logger.debug(f'Cycle: {cycle}')
for queue in queues.values():
try:
for i in range(message_per_queue):
message = queue.get_nowait()
for i in range(messages_per_queue):
message = queue.get()
logger.debug(f'Dispatching {message}')
await queue.dispatch(message)

except EmptyQueue:
logger.info(f'Queue {queue.decode()} is empty')
except asyncio.QueueEmpty:
pass

cycle += 1
await asyncio.sleep(intervals)
Expand Down
46 changes: 26 additions & 20 deletions easyq/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@
from .authentication import authenticate, initialize as initialize_authentication
from .configuration import settings
from .logging import getlogger
from .queuemanager import getqueue, dispatcher
from .queuemanager import getqueue, dispatcher, AlreadySubscribedError


"""
> PULL FROM queue1
-> IGNORE queue1
<- MESSAGE FROM queue1 [ID 122]
<- MESSAGE 122 IS DELIVERED TO user1
"""


Expand All @@ -30,7 +26,7 @@ class ServerProtocol(asyncio.Protocol):
class Patterns:
regex = functools.partial(re.compile, flags=re.DOTALL + re.IGNORECASE)
login = regex(b'^LOGIN (?P<credentials>.+)$')
push = regex(b'^PUSH (?P<message>.+) INTO (?P<queue>[0-9a-zA-Z\._:-]+)$')
push = regex(b'^PUSH (?P<message>.+)(?:\s|\n)INTO (?P<queue>[0-9a-zA-Z\._:-]+)$')
pull = regex(b'^PULL FROM (?P<queue>[0-9a-zA-Z\._:-]+)$')

def connection_made(self, transport):
Expand Down Expand Up @@ -62,7 +58,7 @@ def data_received(self, data):

# Scheduling a login task, if everything went ok, then the resume_reading will be
# called in the future.
asyncio.ensure_future(self.login(credentials))
asyncio.ensure_future(self.login(credentials.strip()))
return

# Splitting the received data with \n and adding buffered chunk if available
Expand Down Expand Up @@ -109,14 +105,24 @@ async def push(self, message, queue):
getqueue(queue).push(message)
except asyncio.QueueFull:
self.logger.warning(f'Queue is full: {self.name}')
self.transport.write(b'ERROR: QUEUE %s IS FULL;\n' % queue)
self.transport.write(b'ERROR: QUEUE %s IS FULL;\n' % queue.decode())

async def pull(self, queue):
try:
getqueue(queue).subscribe(self)
except AlreadySubscribedError:
self.transport.write(b'ERROR: QUEUE %s IS ALREASY SUBSCRIBED;\n' % queue)

async def process_command(self, command):
logger.debug(f'Processing command: {command.decode()} by {self.identity}')
m = self.Patterns.push.match(command)
if m is not None:
return await self.push(**m.groupdict())

m = self.Patterns.pull.match(command)
if m is not None:
return await self.pull(**m.groupdict())

logger.debug(f'Invalid command: {command}')
self.transport.write(b'ERROR: Invalid command: %s;\n' % command)

Expand All @@ -131,8 +137,11 @@ def create_dispatchers(workers=1, **kwargs):

class Server:
_server = None
_dispatchers_task = None

def __init__(self, bind=None, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.logger = getlogger('SERVER')

# Host and Port to listen
bind = bind or settings.bind
Expand All @@ -141,23 +150,20 @@ def __init__(self, bind=None, loop=None):
# Configuring the authenticator
initialize_authentication()

self.server_coro = self.loop.create_server(ServerProtocol, self.host, self.port)
self._dispatchers = []

async def start(self):
self._server = await self.server_coro

for i in range(settings.dispatchers):
self._dispatchers.append(
self.loop.create_task(dispatcher('WORKER %d' % i, **settings.dispatcher))
)
self._server = await self.loop.create_server(ServerProtocol, self.host, self.port)
self._dispatchers_task = self.loop.create_task(asyncio.gather(
*(dispatcher('WORKER %d' % i, **settings.dispatcher)
for i in range(settings.dispatchers))
))

async def close(self):
for dispatcher in self._dispatchers:
dispatcher.cancel()
self.logger.info('Shutting down...')
self._dispatchers_task.cancel()
self._server.close()
await self._server.wait_closed()
await asyncio.wait(self._dispatchers)
while self._dispatchers_task.cancelled():
await asyncio.sleep(.2)

@property
def address(self):
Expand Down
3 changes: 3 additions & 0 deletions easyq/tests/test_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ async def test_trust(self):
with self.assertRaises(AuthenticationError):
await connect('colon:not:allowed')

client = await connect('testuser\n')
self.assertEqual('testuser', client.identity)


if __name__ == '__main__':
unittest.main()
Expand Down
15 changes: 13 additions & 2 deletions easyq/tests/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from easyq.server import ServerProtocol
from easyq.tests.helpers import EasyQTestServer, TestCase
from easyq.client import AuthenticationError
from easyq.client import AuthenticationError, ClientProtocol
from easyq.queuemanager import AlreadySubscribedError


class PullTestCase(TestCase):
Expand Down Expand Up @@ -32,7 +33,7 @@ async def test_incomming_message_parser(self):
(b'MESSAGE Hello\nDear FROM q1', b'Hello\nDear', b'q1'),
]

for command, expected_massage, expected_queue in whitelist:
for command, expected_message, expected_queue in whitelist:
m = ClientProtocol.Patterns.incomming.match(command)
self.assertIsNotNone(m)
message, queue = m.groups()
Expand All @@ -51,6 +52,7 @@ async def test_pull(self):
async with self.server() as connect:
client = await connect('testuser')
messages = []
errors = []

async def message_received(queue, message):
messages.append(message)
Expand All @@ -60,6 +62,15 @@ async def message_received(queue, message):
await asyncio.sleep(1.1)
self.assertEqual([b'Hello'], messages)

# pulling twice!
async def error(client_, error):
errors.append(error)

client.onerror = error
await client.pull(b'q1', message_received)
await asyncio.sleep(1.1)
self.assertEqual([b'ERROR: QUEUE q1 IS ALREASY SUBSCRIBED'], errors)


if __name__ == '__main__':
unittest.main()
Expand Down
16 changes: 10 additions & 6 deletions easyq/tests/test_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ async def test_parser(self):
(b'PUSH Hello dear INTO bad INTO myq', b'Hello dear INTO bad', b'myq'),
(b'PUSH Hello INTO myq:a.b_c', b'Hello', b'myq:a.b_c'),
(b'push hello into myq:a.b_c', b'hello', b'myq:a.b_c'),
(b'push hello\ninto q1', b'hello', b'q1'),
]

for command, expected_message, expected_queue in whitelist:
m = ServerProtocol.Patterns.push.match(command)
if m is None:
try:
m = ServerProtocol.Patterns.push.match(command)
self.assertIsNotNone(m)
print(m.groups())
message, queue, = m.groups()
self.assertEqual(expected_queue, queue)
self.assertEqual(expected_message, message)
except AssertionError:
print(command)
self.assertIsNotNone(m)
message, queue = m.groups()
self.assertEqual(expected_queue, queue)
self.assertEqual(expected_message, message)
raise

blacklist = [
b'PUSH Hello INTO my\nq',
Expand Down
1 change: 1 addition & 0 deletions practice/taskcancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ async def worker(name):
c = 0
running = True
try:
raise Exception('di dada doo da')
while True:
print(f'worker: {name} working cycle: {c}')
await asyncio.sleep(1)
Expand Down

0 comments on commit d7a8462

Please sign in to comment.