Merge pull request #2 from Captricity/test_framework
Test framework
yorinasub17 committed Feb 6, 2017
2 parents b35bc4b + 0aa08ea commit dce2b36
Showing 7 changed files with 244 additions and 1 deletion.
10 changes: 10 additions & 0 deletions .travis.yml
@@ -0,0 +1,10 @@
language: python
python:
- "2.7"
- "3.4"
before_script:
- pip install python-coveralls coverage
# command to run tests
script: coverage run --source kombu_redis_priority setup.py test
after_success:
- coveralls
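
The same checks can be reproduced locally with roughly the following commands (assuming `coverage` is installed; the `coveralls` upload step only makes sense inside CI):

    pip install coverage
    coverage run --source kombu_redis_priority setup.py test
    coverage report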
7 changes: 7 additions & 0 deletions README.md
@@ -1,2 +1,9 @@
# kombu-redis-priority

[![Build Status](https://travis-ci.org/Captricity/kombu-redis-priority.svg?branch=master)](https://travis-ci.org/Captricity/kombu-redis-priority) [![Coverage Status](https://coveralls.io/repos/Captricity/kombu-redis-priority/badge.png?branch=master)](https://coveralls.io/r/Captricity/kombu-redis-priority?branch=master)

Kombu Transport using Redis SortedSets

## Running tests

    python setup.py test
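
For context, a minimal usage sketch based on how the test suite below constructs a connection (the queue name and payload are illustrative, not from this commit):

    from kombu import Connection
    from kombu_redis_priority.transport.redis_priority_async import Transport

    # Select the sorted-set transport explicitly rather than via a broker URL.
    with Connection(transport=Transport) as conn:
        queue = conn.SimpleQueue('foo')
        # priority is carried in the message properties and, per the tests
        # below, becomes the sorted-set score
        queue.put({'hello': 'world'}, priority=5)
        queue.close()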
9 changes: 8 additions & 1 deletion setup.py
@@ -14,5 +14,12 @@
     classifiers=[],
     install_requires=[
         'kombu'
-    ]
+    ],
+    tests_require=[
+        'six',
+        'mock==1.0.1',
+        'freezegun',
+        'fakeredis'
+    ],
+    test_suite='tests'
 )
Empty file added tests/__init__.py
Empty file.
97 changes: 97 additions & 0 deletions tests/test_sortedset_transport.py
@@ -0,0 +1,97 @@
from __future__ import unicode_literals

import unittest
import mock
import freezegun
import json
import time
import six

from .utils.fakeredis_ext import FakeStrictRedisWithConnection
from kombu import Connection
from kombu_redis_priority.transport.redis_priority_async import redis, Transport


class TestSortedSetTransport(unittest.TestCase):
    def setUp(self):
        self.faker = FakeStrictRedisWithConnection()
        with mock.patch.object(redis, 'StrictRedis', FakeStrictRedisWithConnection):
            self.connection = self.create_connection()
            self.channel = self.connection.default_channel

    def tearDown(self):
        self.faker.flushall()

    def create_connection(self):
        return Connection(transport=Transport)

    def _prefixed_message(self, time_, msg_obj):
        return six.b('{:011d}:'.format(int(time_)) + json.dumps(msg_obj))

    def test_default_message_add(self):
        raw_db = self.faker._db

        # assert no queues exist
        self.assertEqual(len(raw_db), 0)

        # put a blank message, locking the time
        with freezegun.freeze_time('1985-10-12'):
            faketime = time.time()
            self.channel._put('foo', {})

        # verify queue is created
        self.assertEqual(len(raw_db), 1)

        # ... and verify queue has a message
        raw_queue = raw_db['foo']
        self.assertEqual(len(raw_queue), 1)

        # verify message:
        # - a time prefix is prepended to the message
        # - has default priority (0)
        enqueued_msg, priority = next(six.iteritems(raw_queue))
        self.assertEqual(enqueued_msg, self._prefixed_message(faketime, {}))
        self.assertEqual(priority, 0.0)

    def test_prioritized_message_add(self):
        raw_db = self.faker._db
        msg = {'properties': {'priority': 5}}

        # assert no queues exist
        self.assertEqual(len(raw_db), 0)

        # put a prioritized message, locking the time
        with freezegun.freeze_time('1985-10-12'):
            faketime = time.time()
            self.channel._put('foo', msg)

        # verify queue is created
        self.assertEqual(len(raw_db), 1)

        # ... and verify queue has a message
        raw_queue = raw_db['foo']
        self.assertEqual(len(raw_queue), 1)

        # verify message:
        # - a time prefix is prepended to the message
        # - has the specified priority (5)
        enqueued_msg, priority = next(six.iteritems(raw_queue))
        self.assertEqual(enqueued_msg, self._prefixed_message(faketime, msg))
        self.assertEqual(priority, 5.0)

    def test_zrem_read(self):
        # Add an item to create a queue
        msg = {
            'properties': {'delivery_tag': 'abcd'}
        }
        self.faker.zadd('foo', 1, self._prefixed_message(time.time(), msg))

        # Make the channel pull off the foo queue
        self.channel._active_queues.append('foo')
        self.channel._update_queue_cycle()

        # And then try the zrem pipeline
        self.channel._zrem_start()
        with mock.patch.object(self.channel.connection, '_deliver') as mock_deliver:
            self.channel._zrem_read()
            mock_deliver.assert_called_once_with(msg, 'foo')
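
A quick note on the `_prefixed_message` helper above: the zero-padded timestamp prefix presumably keeps equal-priority messages in FIFO order, since Redis sorts equal-score sorted-set members lexicographically. With time frozen at 1985-10-12 (epoch 497923200 UTC), for example:

    >>> import json, six
    >>> six.b('{:011d}:'.format(497923200) + json.dumps({}))
    b'00497923200:{}'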
Empty file added tests/utils/__init__.py
Empty file.
122 changes: 122 additions & 0 deletions tests/utils/fakeredis_ext.py
@@ -0,0 +1,122 @@
""" Extensions for fakeredis, namely adding connection interface that is used by kombu """

from collections import deque
from itertools import count
from fakeredis import FakeStrictRedis, FakePipeline


class FakeStrictRedisWithConnection(FakeStrictRedis):
"""
An extension of FakeStrictRedis to implement some of the low level interfaces of StrictRedis from redis-py. Kombu
uses these internal features to simulate an async event based request response cycle so that it can be hooked into
its chain.
You can learn more about it in the kombu source for the redis transport.
"""
def __init__(self, *args, **kwargs):
super(FakeStrictRedisWithConnection, self).__init__(*args, **kwargs)
self._connection = None
self.connection = self._sconnection(self)
self._response_queue = deque()

def parse_response(self, connection, type, **options):
# If there are any responses queued up, pop and return that
if self._response_queue:
return self._response_queue.pop()

# TODO: this is actually wrong - we need to determine if it is a pipeline response based on what is on the
# datagram.
if type == '_':
return self._parse_pipeline_response_from_connection(connection)
else:
return self._parse_command_response_from_connection(connection, type)

def _parse_pipeline_response_from_connection(self, connection):
"""
A pipeline response consists of several responses:
- OK : acknowledges a transaction
- QUEUED : acknowledges a command has been queued. There will be one per command sent.
- LIST : list of responses
"""
# pop off the first command, which should be MULTI to signal start of transaction
cmd = self.connection._sock.data.pop(0)
assert cmd[0] == 'MULTI'

# Now extract all the commands until transaction ends
cmds_to_execute = []
cmd = self.connection._sock.data.pop(0)
while cmd[0] != 'EXEC':
cmds_to_execute.append(cmd)
cmd = self.connection._sock.data.pop(0)

# It is a bug, if the command stack is NOT empty at this point
assert len(self.connection._sock.data) == 0

# execute those collected commands and construct response list
responses = [self._parse_command_response(cmd, args) for cmd, args in cmds_to_execute]

# Now append the expected pipeline responses to the deque and return the first response, which is 'OK'
for i in range(len(responses)):
self._response_queue.appendleft('QUEUED')
self._response_queue.appendleft(responses)
return 'OK'

def _parse_command_response_from_connection(self, connection, type):
cmd, args = self.connection._sock.data.pop()
assert cmd == type
assert len(self.connection._sock.data) == 0
return self._parse_command_response(cmd, args)

def _parse_command_response(self, cmd, args):
cmd_func = getattr(self, cmd.lower())
return cmd_func(*args)

def pipeline(self, transaction=True):
return FakePipelineWithStack(self, transaction)

class _sconnection(object):
disconnected = False

class _socket(object):
blocking = True
filenos = count(30)

def __init__(self, *args):
self._fileno = next(self.filenos)
self.data = []

def fileno(self):
return self._fileno

def setblocking(self, blocking):
self.blocking = blocking

def __init__(self, client):
self.client = client
self._sock = self._socket()

def disconnect(self):
self.disconnected = True

def send_command(self, cmd, *args):
self._sock.data.append((cmd, args))

def pack_commands(self, cmds):
return cmds # do nothing

def send_packed_command(self, all_cmds):
# Input command format is: tuple(tuple(cmd, arg0, arg1, ...), options)
# The injected command format has to be equivalent to `send_command`: tuple(cmd, args)
def normalize_command(raw_cmd):
return (raw_cmd[0], raw_cmd[1:])
self._sock.data.extend([normalize_command(cmd) for cmd in all_cmds])


class FakePipelineWithStack(FakePipeline):
@property
def command_stack(self):
def normalize_command(raw_cmd):
cmd, args, kwargs = raw_cmd
return ((cmd,) + args, kwargs)

return [normalize_command(cmd) for cmd in self.commands]
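
To make the simulated wire protocol above concrete, here is a rough illustration of the request/response flow (the queue name and payloads are illustrative; `zadd` follows the old fakeredis `(name, score, member)` call style used in the tests):

    fake = FakeStrictRedisWithConnection()
    conn = fake.connection

    # Single command: kombu "sends" it, then parse_response executes it locally.
    conn.send_command('ZADD', 'queue', 1, b'payload')
    fake.parse_response(conn, 'ZADD')  # dispatches to fake.zadd(...), returns 1

    # Transaction: 'OK' acks the MULTI, one 'QUEUED' per command, then the result list.
    conn.send_command('MULTI')
    conn.send_command('ZADD', 'queue', 2, b'other')
    conn.send_command('EXEC')
    fake.parse_response(conn, '_')  # -> 'OK'
    fake.parse_response(conn, '_')  # -> 'QUEUED'
    fake.parse_response(conn, '_')  # -> [1], the responses of the queued commands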
