Skip to content

Commit

Permalink
feature: redis cluster as broker
Browse files Browse the repository at this point in the history
Support new transport: redis-cluster

Basicly use redis transport code and replace the redis part into
alauda-redis-py-cluster which is a patch and waiting to be merged into
main branch.

Signed-off-by: Lei Gong <xue177125184@gmail.com>
  • Loading branch information
Lei Gong authored and max8899 committed Jun 22, 2017
1 parent cf2ca7c commit 395c0f1
Show file tree
Hide file tree
Showing 8 changed files with 2,489 additions and 1 deletion.
2 changes: 2 additions & 0 deletions examples/complete_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def handle_message(body, message):
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.


with Connection('amqp://guest:guest@localhost:5672//') as connection:

#: Create consumer using our callback and queue.
Expand Down
1 change: 1 addition & 0 deletions examples/experimental/async_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def send_message(conn):
def on_message(message):
print('RECEIVED: %r' % (message.body, ))
message.ack()
print "hub.stop"
hub.stop() # <-- exit after one message


Expand Down
115 changes: 115 additions & 0 deletions examples/hello_poll.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
""" Poll prototype for redis-cluster
"""

import time
import sys

from redis import Redis
from redis.connection import ConnectionPool

from rediscluster.connection import ClusterConnectionPool
from rediscluster.exceptions import MovedError
from rediscluster import RedisCluster

from kombu.utils.eventio import poll, READ, ERR

eventflags = READ | ERR

keys = [
'asynt',
'asynt1',
'asynt2',
'asynt3',
'asynt4',
'asynt5',
'asynt6',
'asynt7'
]


def register(conns, poller):
for _, conn in conns:
conn.connect()
poller.register(conn._sock, eventflags)


def unregister(conns, poller):
print "unregister .."
for _, conn in conns:
poller.unregister(conn._sock)
conn.disconnect()


def start_poll(conns, poller, cli):
while 1:
_m = {}

for key, conn in conns:
_m.setdefault(conn.port, []).append(key)

for _, conn in conns:
conn.send_command('BRPOP', *_m[conn.port]+[1])

start = time.time()
events = poller.poll(10)
print time.time() - start
if events:
for fileno, event in events or []:
if event | READ:
for key, conn in conns:
if fileno == conn._sock.fileno():
try:
print(
"key: ", key, "resp: ",
cli.parse_response(conn, 'BRPOP', **{})
)
except MovedError as e:
print "MovedError: ", e
break


def cluster_poll():
startup = [
{'host': '127.0.0.1', 'port': 30001},
{'host': '127.0.0.1', 'port': 30002},
{'host': '127.0.0.1', 'port': 30003},
{'host': '127.0.0.1', 'port': 30004},
{'host': '127.0.0.1', 'port': 30005},
{'host': '127.0.0.1', 'port': 30006},
]
pool = ClusterConnectionPool(startup_nodes=startup)
cli = RedisCluster(connection_pool=pool)
poller = poll()
conns = [
(key, cli.connection_pool.get_connection_by_key(key))
for key in keys
]
register(conns, poller)
try:
start_poll(conns, poller, cli)
except KeyboardInterrupt:
unregister(conns, poller)


def normal_poll():
pool = ConnectionPool(host='127.0.0.1', port=6379, db=0)
cli = Redis(connection_pool=pool)
poller = poll()
conns = [
(key, cli.connection_pool.get_connection('_'))
for key in keys
]
register(conns, poller)
try:
start_poll(conns, poller, cli)
except KeyboardInterrupt:
unregister(conns, poller)


if __name__ == '__main__':

if sys.argv[1] == 'cluster':
print "Start cluster mode, press Ctrl+C to stop"
cluster_poll()
else:
normal_poll()
6 changes: 6 additions & 0 deletions examples/simple_task_queue/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@


def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):

""" send_as_task
"""

payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
routing_key = priority_to_routing_key[priority]

Expand All @@ -19,7 +23,9 @@ def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
declare=[task_exchange],
routing_key=routing_key)


if __name__ == '__main__':

from kombu import Connection
from .tasks import hello_task

Expand Down
2 changes: 1 addition & 1 deletion kombu/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ def default_channel(self):
"""
# make sure we're still connected, and if not refresh.
self.connection
self.ensure_connection()
if self._default_channel is None:
self._default_channel = self.channel()
return self._default_channel
Expand Down
Loading

0 comments on commit 395c0f1

Please sign in to comment.