
Minor refactoring and fixes to fix broken "cluster-require-full-coverage" option.
Grokzen committed Mar 28, 2016
1 parent 33c866b commit bede29c
Showing 2 changed files with 74 additions and 18 deletions.
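
The functional change in nodemanager.py below: cluster_require_full_coverage() opens a link to every known node and issues CONFIG GET cluster-require-full-coverage, and the old initialize() called it inside the loop over all 16384 hash slots, once per uncovered slot. The refactor hoists the call above the loop so it runs exactly once. A side effect is that the CONFIG GET now happens on every initialize(), unconditionally, which is why each mocked execute_command in the test file gains a branch answering that command. A minimal before/after sketch, paraphrasing the diff (validate_before/validate_after are illustrative names, not methods of the real NodeManager):

    # Minimal sketch paraphrasing the diff; these are illustrative
    # stand-ins, not the library's exact code.

    REDIS_CLUSTER_HASH_SLOTS = 16384

    def validate_before(self, tmp_slots, nodes_cache):
        all_slots_covered = True
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in tmp_slots:
                # Old shape: a CONFIG GET round-trip to every node for every
                # uncovered slot, up to 16384 * len(nodes) network calls.
                if self.cluster_require_full_coverage(nodes_cache):
                    all_slots_covered = False
        return all_slots_covered

    def validate_after(self, tmp_slots, nodes_cache):
        # New shape: ask the cluster once, reuse the answer inside the loop.
        need_full_slots_coverage = self.cluster_require_full_coverage(nodes_cache)
        all_slots_covered = True
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in tmp_slots:
                if need_full_slots_coverage:
                    all_slots_covered = False
        return all_slots_covered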
rediscluster/nodemanager.py (13 changes: 6 additions & 7 deletions)
@@ -172,10 +172,12 @@ def initialize(self):
         self.populate_startup_nodes()
         self.refresh_table_asap = False

+        need_full_slots_coverage = self.cluster_require_full_coverage(nodes_cache)
+
         # Validate if all slots are covered or if we should try next startup node
         for i in range(0, self.RedisClusterHashSlots):
             if i not in tmp_slots:
-                if self.cluster_require_full_coverage(nodes_cache):
+                if need_full_slots_coverage:
                     all_slots_covered = False

         if all_slots_covered:
@@ -205,14 +207,11 @@ def cluster_require_full_coverage(self, nodes_cache):
         nodes = self.nodes or nodes_cache

         def node_require_full_coverage(node):
-            r_node = self.get_redis_link(host=node["host"], port=node["port"],
-                                         decode_responses=True)
-            return "yes" in r_node.config_get(
-                "cluster-require-full-coverage").values()
+            r_node = self.get_redis_link(host=node["host"], port=node["port"], decode_responses=True)
+            return "yes" in r_node.config_get("cluster-require-full-coverage").values()

         # at least one node should have cluster-require-full-coverage yes
-        return any(
-            node_require_full_coverage(node) for node in nodes.values())
+        return any(node_require_full_coverage(node) for node in nodes.values())

     def determine_pubsub_node(self):
         """
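For context on the helper above: in redis-py, config_get() returns a dict mapping parameter names to their values, so the "yes" membership test inspects the configured value; and since any() short-circuits, polling stops at the first node that reports full coverage, which matches the inline comment. A tiny illustration (the response dict is hypothetical):

    # Illustration only: the shape of the r_node.config_get(...) result
    # that node_require_full_coverage() inspects.
    resp = {'cluster-require-full-coverage': 'yes'}
    assert "yes" in resp.values()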
tests/test_node_manager.py (79 changes: 68 additions & 11 deletions)
@@ -49,15 +49,21 @@ def test_init_slots_cache_not_all_slots(s):
     def get_redis_link_wrapper(host, port, decode_responses=False):
         link = StrictRedis(host="127.0.0.1", port=7000, decode_responses=True)

-        # Missing slot 5460
-        bad_slots_resp = [
-            [0, 5459, [b'127.0.0.1', 7000], [b'127.0.0.1', 7003]],
-            [5461, 10922, [b'127.0.0.1', 7001], [b'127.0.0.1', 7004]],
-            [10923, 16383, [b'127.0.0.1', 7002], [b'127.0.0.1', 7005]],
-        ]
+        orig_exec_method = link.execute_command
+
+        def patch_execute_command(*args, **kwargs):
+            if args == ('cluster', 'slots'):
+                # Missing slot 5460
+                return [
+                    [0, 5459, [b'127.0.0.1', 7000], [b'127.0.0.1', 7003]],
+                    [5461, 10922, [b'127.0.0.1', 7001], [b'127.0.0.1', 7004]],
+                    [10923, 16383, [b'127.0.0.1', 7002], [b'127.0.0.1', 7005]],
+                ]
+
+            return orig_exec_method(*args, **kwargs)

-        # Missing slot 5460
-        link.execute_command = lambda *args: bad_slots_resp
+        link.execute_command = patch_execute_command

         return link

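Note the reason for the change above: the old blanket lambda returned the slots payload for every command on the link, so once initialize() started issuing CONFIG GET, the mock had to dispatch on the command tuple and fall through to the real execute_command for anything else.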
@@ -69,6 +75,41 @@ def get_redis_link_wrapper(host, port, decode_responses=False):
     assert unicode(ex.value).startswith("All slots are not covered after query all startup_nodes.")
+
+
+def test_init_slots_cache_not_all_slots_not_require_full_coverage(s):
+    """
+    Test that initialize() succeeds, and slot 5460 simply stays uncovered,
+    when the nodes report cluster-require-full-coverage=no.
+    """
+    # Create wrapper function so we can inject custom 'CLUSTER SLOTS' command result
+    def get_redis_link_wrapper(host, port, decode_responses=False):
+        link = StrictRedis(host="127.0.0.1", port=7000, decode_responses=True)
+
+        orig_exec_method = link.execute_command
+
+        def patch_execute_command(*args, **kwargs):
+            if args == ('cluster', 'slots'):
+                # Missing slot 5460
+                return [
+                    [0, 5459, [b'127.0.0.1', 7000], [b'127.0.0.1', 7003]],
+                    [5461, 10922, [b'127.0.0.1', 7001], [b'127.0.0.1', 7004]],
+                    [10923, 16383, [b'127.0.0.1', 7002], [b'127.0.0.1', 7005]],
+                ]
+            elif args == ('CONFIG GET', 'cluster-require-full-coverage'):
+                return {'cluster-require-full-coverage': 'no'}
+            else:
+                return orig_exec_method(*args, **kwargs)
+
+        link.execute_command = patch_execute_command
+
+        return link
+
+    s.connection_pool.nodes.get_redis_link = get_redis_link_wrapper
+
+    s.connection_pool.nodes.initialize()
+
+    assert 5460 not in s.connection_pool.nodes.slots
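With cluster-require-full-coverage answered 'no', need_full_slots_coverage is False inside initialize(), so the missing slot never flips all_slots_covered and no RedisClusterException is raised; the final assert confirms slot 5460 is simply absent from the cache.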

 def test_init_slots_cache(s):
     """
     Test that slots cache can be initialized and all slots are covered
@@ -80,7 +121,13 @@ def test_init_slots_cache(s):
     ]

     with patch.object(StrictRedis, 'execute_command') as execute_command_mock:
-        execute_command_mock.side_effect = lambda *args: good_slots_resp
+        def patch_execute_command(*args, **kwargs):
+            if args == ('CONFIG GET', 'cluster-require-full-coverage'):
+                return {'cluster-require-full-coverage': 'yes'}
+            else:
+                return good_slots_resp
+
+        execute_command_mock.side_effect = patch_execute_command

         s.connection_pool.nodes.initialize()
         assert len(s.connection_pool.nodes.slots) == NodeManager.RedisClusterHashSlots
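The same mocking pattern recurs through the rest of this file: patch StrictRedis.execute_command and install a side_effect that dispatches on the positional argument tuple. A self-contained sketch of the pattern (FakeRedis and the payloads are stand-ins; tests of this era may import the standalone mock package rather than unittest.mock):

    # Self-contained sketch of the side_effect dispatch pattern used here.
    from unittest.mock import patch

    class FakeRedis(object):
        def execute_command(self, *args):
            raise NotImplementedError("replaced by the mock below")

    def dispatch(*args, **kwargs):
        # Route on the command tuple, exactly as the tests in this diff do.
        if args == ('CONFIG GET', 'cluster-require-full-coverage'):
            return {'cluster-require-full-coverage': 'yes'}
        return [[0, 16383, [b'127.0.0.1', 7000]]]  # stand-in CLUSTER SLOTS reply

    with patch.object(FakeRedis, 'execute_command') as mock_exec:
        mock_exec.side_effect = dispatch
        r = FakeRedis()
        result = r.execute_command('CONFIG GET', 'cluster-require-full-coverage')
        assert result == {'cluster-require-full-coverage': 'yes'}
        assert r.execute_command('cluster', 'slots')[0][1] == 16383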
@@ -151,13 +198,15 @@ def monkey_link(host=None, port=None, decode_responses=False):
         def execute_command(*args, **kwargs):
             if args == ("cluster", "slots"):
                 return result
-            return orig_execute_command(*args, **kwargs)
+            elif args == ('CONFIG GET', 'cluster-require-full-coverage'):
+                return {'cluster-require-full-coverage': 'yes'}
+            else:
+                return orig_execute_command(*args, **kwargs)

         r.execute_command = execute_command
         return r

     n.get_redis_link = monkey_link

     with pytest.raises(RedisClusterException) as ex:
         n.initialize()
     assert unicode(ex.value).startswith("startup_nodes could not agree on a valid slots cache."), unicode(ex.value)
@@ -281,7 +330,15 @@ def test_cluster_one_instance():
"""
with patch.object(StrictRedis, 'execute_command') as mock_execute_command:
return_data = [[0, 16383, ['', 7006]]]
mock_execute_command.return_value = return_data

def patch_execute_command(*args, **kwargs):
if args == ('CONFIG GET', 'cluster-require-full-coverage'):
return {'cluster-require-full-coverage': 'yes'}
else:
return return_data

# mock_execute_command.return_value = return_data
mock_execute_command.side_effect = patch_execute_command

n = NodeManager(startup_nodes=[{"host": "127.0.0.1", "port": 7006}])
n.initialize()
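Two details in this last hunk: the mocked CLUSTER SLOTS reply uses an empty host string (['', 7006]), which a lone node may report for itself, so NodeManager presumably substitutes the startup node's address; and even this single-instance test needs the CONFIG GET branch now, because the refactored initialize() calls cluster_require_full_coverage() unconditionally before validating slot coverage.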
