Rebuild how max connections is handled to be more similar to how redis-py works

 - max_connections_per_node is now a direct option to StrictRedisCluster and no longer part of the kwargs option
 - pubsub connections are no longer tracked in separate variables; they now work the same way redis-py handles pubsub connections
 - Counting and enforcing of max_connections and max_connections_per_node is no longer done with _in_use_connections
   but with a tracker of how many connections have been created so far (see the sketch after this list)
 - Added threading test method 'test_pubsub_thread_publish' to more easily verify that hammering publish commands works as expected
   even in a threaded environment.
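
For illustration, a minimal, hypothetical sketch of the counting approach described above. The class and names here are simplified stand-ins, not the library's actual code: connections are counted per node as they are created, and the cap is enforced at creation time.

# Hypothetical, simplified sketch of created-connections tracking; not the real pool.
class TinyPool(object):
    def __init__(self, max_connections=32, max_connections_per_node=False):
        self.max_connections = max_connections
        self.max_connections_per_node = max_connections_per_node
        self._created_connections_per_node = {}  # node name -> created count

    def count_all_num_connections(self, node_name):
        # Per-node mode counts only the given node; global mode counts all nodes.
        if self.max_connections_per_node:
            return self._created_connections_per_node.get(node_name, 0)
        return sum(self._created_connections_per_node.values())

    def make_connection(self, node_name):
        # The cap is checked against connections created so far, not in-use ones.
        if self.count_all_num_connections(node_name) >= self.max_connections:
            raise Exception("Too many connections ({0}) for node: {1}".format(
                self.count_all_num_connections(node_name), node_name))
        count = self._created_connections_per_node.get(node_name, 0)
        self._created_connections_per_node[node_name] = count + 1
        return object()  # stand-in for a real connection object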
Grokzen committed Apr 3, 2016
1 parent 8c5bbc3 commit 88a86d3
Showing 4 changed files with 82 additions and 38 deletions.
2 changes: 2 additions & 0 deletions docs/release-notes.rst
@@ -25,6 +25,8 @@ Next release (??? ?, 2016)
   a node in the cluster. Other clients that do not use this pattern will not be fully compatible with this client. A known limitation is pattern
   subscription, which does not work properly because a pattern can't know all the possible channel names in advance.
 * Convert all docs to ReadTheDocs
+* Rework connection pool logic to be more similar to redis-py. This also fixes an issue with pubsub where connections
+  were never released back to the pool of available connections.
 
 1.1.0 (??? ?, ????)
 -------------------
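
The connection pool note above boils down to this life cycle; a hedged sketch, where pool stands for a ClusterConnectionPool instance and the get_connection calling convention is assumed, not taken from this commit:

conn = pool.get_connection('pubsub', channel='foo')  # checked out: tracked as in-use
pool.release(conn)  # returned: back in pool._available_connections, not leaked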
3 changes: 2 additions & 1 deletion rediscluster/client.py
@@ -111,7 +111,7 @@ class StrictRedisCluster(StrictRedis):
         'READWRITE': bool_ok,
     }
 
-    def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32, init_slot_cache=True,
+    def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32, max_connections_per_node=False, init_slot_cache=True,
                  readonly_mode=False, reinitialize_steps=None, **kwargs):
         """
         :startup_nodes:
@@ -153,6 +153,7 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32,
             startup_nodes=startup_nodes,
             init_slot_cache=init_slot_cache,
             max_connections=max_connections,
+            max_connections_per_node=max_connections_per_node,
             **kwargs
         )
 
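
A hedged usage sketch of the new option (assumes a cluster node listening on 127.0.0.1:7000; the import path matches the test added later in this commit):

from rediscluster.client import StrictRedisCluster

startup_nodes = [{"host": "127.0.0.1", "port": "7000"}]

# With max_connections_per_node enabled, max_connections is enforced per node
# instead of across the whole pool.
rc = StrictRedisCluster(
    startup_nodes=startup_nodes,
    max_connections=16,
    max_connections_per_node=True,
    decode_responses=True,
)
rc.set('foo', 'bar')
print(rc.get('foo'))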
66 changes: 29 additions & 37 deletions rediscluster/connection.py
@@ -100,10 +100,9 @@ def reset(self):
         """
         self.pid = os.getpid()
         self._created_connections = 0
+        self._created_connections_per_node = {}  # Dict(Node, Int)
         self._available_connections = {}  # Dict(Node, List)
         self._in_use_connections = {}  # Dict(Node, Set)
-        self._available_pubsub_connections = []
-        self._in_use_pubsub_connections = set([])
         self._check_lock = threading.Lock()
 
     def _checkpid(self):
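
To make the per-node bookkeeping concrete, an illustrative snapshot (all values invented) of what reset()'s trackers hold after a few connections to two nodes:

# Illustrative values only; 'conn' stands in for real connection objects.
conn = object()
created_connections = 3
created_connections_per_node = {'127.0.0.1:7000': 2, '127.0.0.1:7001': 1}
available_connections = {'127.0.0.1:7000': [conn], '127.0.0.1:7001': []}
in_use_connections = {'127.0.0.1:7000': set([conn]), '127.0.0.1:7001': set()}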
@@ -131,21 +130,31 @@ def get_connection(self, command_name, *keys, **options):
             return self.get_random_connection()
 
         slot = self.nodes.keyslot(channel)
+        node = self.get_master_node_by_slot(slot)
 
-        # TODO: Pop existing connection if it exists
-        connection = self.make_connection(self.get_master_node_by_slot(slot))
-        self._in_use_pubsub_connections.add(connection)
+        self._checkpid()
+
+        try:
+            connection = self._available_connections.get(node["name"], []).pop()
+        except IndexError:
+            connection = self.make_connection(node)
+
+        self._in_use_connections[node['name']].add(connection)
 
         return connection
 
     def make_connection(self, node):
         """
         Create a new connection
         """
-        if self.count_num_connections(node) >= self.max_connections:
+        if self.count_all_num_connections(node) >= self.max_connections:
+            if self.max_connections_per_node:
+                raise RedisClusterException("Too many connections ({0}) for node: {1}".format(self.count_all_num_connections(node), node['name']))
+
             raise RedisClusterException("Too many connections")
 
         self._created_connections += 1
+        self._created_connections_per_node.setdefault(node['name'], 0)
+        self._created_connections_per_node[node['name']] += 1
         connection = self.connection_class(host=node["host"], port=node["port"], **self.connection_kwargs)
 
         # Must store node in the connection to make it easier to track
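
Given a live cluster and a ClusterConnectionPool, the enforced cap would surface roughly like this. A hedged sketch: pool and node are assumed to exist already, get_connection_by_node is an assumed pool helper, and rediscluster.exceptions is the assumed home of RedisClusterException.

from rediscluster.exceptions import RedisClusterException  # assumed import path

try:
    for _ in range(17):  # one past a max_connections=16 budget in per-node mode
        pool.get_connection_by_node(node)  # assumed pool helper
except RedisClusterException as exc:
    print(exc)  # e.g. "Too many connections (16) for node: 127.0.0.1:7000"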
@@ -161,22 +170,18 @@ def release(self, connection):
         if connection.pid != self.pid:
             return
 
-        if connection in self._in_use_pubsub_connections:
-            self._in_use_pubsub_connections.remove(connection)
-            self._available_pubsub_connections.append(connection)
-        else:
-            # Remove the current connection from _in_use_connections and add it back to the available pool.
-            # There are cases where the connection to remove no longer exists, so there
-            # must be a safe way to remove it.
-            i_c = self._in_use_connections.get(connection._node["name"], set())
+        # Remove the current connection from _in_use_connections and add it back to the available pool.
+        # There are cases where the connection to remove no longer exists, so there
+        # must be a safe way to remove it.
+        i_c = self._in_use_connections.get(connection._node["name"], set())
 
-            if connection in i_c:
-                i_c.remove(connection)
-            else:
-                pass
-                # TODO: log.warning("Tried to release a connection that no longer exists: {0}".format(connection))
+        if connection in i_c:
+            i_c.remove(connection)
+        else:
+            pass
+            # TODO: log.warning("Tried to release a connection that no longer exists: {0}".format(connection))
 
-            self._available_connections.setdefault(connection._node["name"], []).append(connection)
+        self._available_connections.setdefault(connection._node["name"], []).append(connection)
 
     def disconnect(self):
         """
@@ -191,24 +196,11 @@ def disconnect(self):
             for connection in node_connections:
                 connection.disconnect()
 
-        all_pubsub_conns = chain(
-            self._available_pubsub_connections,
-            self._in_use_pubsub_connections,
-        )
-
-        for connection in all_pubsub_conns:
-            connection.disconnect()
-
-    def count_num_connections(self, node):
+    def count_all_num_connections(self, node):
         if self.max_connections_per_node:
-            return len(self._in_use_connections.get(node['name'], []))
-
-        i = 0
-
-        for _, connections in self._in_use_connections.items():
-            i += len(connections)
-
-        return i
+            return self._created_connections_per_node.get(node['name'], 0)
+
+        return sum([i for i in self._created_connections_per_node.values()])
 
     def get_random_connection(self):
         """
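
To illustrate the two counting modes of count_all_num_connections side by side (node names and counts invented for the example):

# Per-node mode asks about a single node; global mode sums every node.
created_per_node = {'127.0.0.1:7000': 3, '127.0.0.1:7001': 2}

print(created_per_node.get('127.0.0.1:7000', 0))  # per-node mode -> 3
print(sum(created_per_node.values()))             # global mode   -> 5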
49 changes: 49 additions & 0 deletions tests/test_pubsub.py
@@ -2,8 +2,12 @@
 
 # python std lib
 from __future__ import with_statement
+import threading
+import time
 
+# rediscluster imports
+from rediscluster.client import StrictRedisCluster
+
 # 3rd party imports
 import pytest
 
@@ -434,3 +438,48 @@ def test_channel_subscribe(self, r):
         p = r.pubsub()
         with pytest.raises(ConnectionError):
             p.subscribe('foo')
+
+
+class TestPubSubThreadedPublish(object):
+
+    def test_pubsub_thread_publish(self):
+        """
+        This test can never fail, but it is still useful for exercising
+        the threading capability of the connection pool and the publish
+        mechanism.
+        """
+        startup_nodes = [{"host": "127.0.0.1", "port": "7000"}]
+
+        r = StrictRedisCluster(
+            startup_nodes=startup_nodes,
+            decode_responses=True,
+            max_connections=16,
+            max_connections_per_node=16,
+        )
+
+        p = r.pubsub()
+
+        def t_run(rc, pub):
+            for i in range(0, 50):
+                rc.publish('foo', 'bar')
+                rc.publish('bar', 'foo')
+                rc.publish('asd', 'dsa')
+                rc.publish('dsa', 'asd')
+                rc.publish('qwe', 'bar')
+                rc.publish('ewq', 'foo')
+                rc.publish('wer', 'dsa')
+                rc.publish('rew', 'asd')
+
+            # Use this for debugging
+            # print(rc.connection_pool._available_connections)
+            # print(rc.connection_pool._in_use_connections)
+            # print(rc.connection_pool._created_connections)
+
+        try:
+            threads = []
+            for i in range(10):
+                t = threading.Thread(target=t_run, args=(r, p))
+                threads.append(t)
+                t.start()
+        except Exception:
+            print("Error: unable to start thread")
+
+        # Wait for all publisher threads to finish before the test returns.
+        for t in threads:
+            t.join()
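
For completeness, a hedged sketch of a subscriber-side loop that could consume the published messages; get_message() follows redis-py's pubsub polling API, the helper name t_listen is invented for the example, and time is imported by the diff above:

def t_listen(pubsub):
    # Poll for messages; get_message() returns None when nothing is queued.
    pubsub.subscribe('foo')
    for _ in range(200):
        message = pubsub.get_message()
        if message and message['type'] == 'message':
            print(message['channel'], message['data'])
        time.sleep(0.01)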