Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Commit

Permalink
Implement/port all cluster commands from redis-py and make them clust…
Browse files Browse the repository at this point in the history
…er compatible.

Rework how the nodes for a commands is determined.
  • Loading branch information
Grokzen committed Mar 28, 2016
1 parent 47de998 commit 5e827eb
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Allow passing in a custom connection pool
* Provide default max_connections value for ClusterConnectionPool (2**31)
* Travis now tests both redis 3.0.x and 3.2.x
* Implement all "CLUSTER ..." commands as methods in the client class

* 1.1.0
* Refactored exception handling and exception classes.
Expand Down
191 changes: 161 additions & 30 deletions rediscluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
merge_result,
first_key,
clusterdown_wrapper,
parse_cluster_slots,
parse_cluster_nodes,
)
# 3rd party imports
from redis import StrictRedis
from redis.client import list_or_args
from redis.client import list_or_args, bool_ok, parse_info
from redis.connection import Token
from redis._compat import iteritems, basestring, b, izip, nativestr
from redis.exceptions import RedisError, ResponseError, TimeoutError, DataError, ConnectionError, BusyLoadingError

Expand All @@ -35,45 +38,40 @@ class StrictRedisCluster(StrictRedis):
"""
RedisClusterRequestTTL = 16

NODES_CALLBACKS = dict_merge(
NODES_FLAGS = dict_merge(
string_keys_to_dict([
"CLIENT SETNAME", "SENTINEL GET-MASTER-ADDR-BY-NAME", 'SENTINEL MASTER', 'SENTINEL MASTERS',
'SENTINEL MONITOR', 'SENTINEL REMOVE', 'SENTINEL SENTINELS', 'SENTINEL SET',
'SENTINEL SLAVES', 'SHUTDOWN', 'SLAVEOF', 'SCRIPT KILL',
'MOVE', 'BITOP',
], blocked_command),
], 'blocked'),
string_keys_to_dict([
"ECHO", "CONFIG GET", "CONFIG SET", "SLOWLOG GET", "CLIENT KILL", "INFO",
"BGREWRITEAOF", "BGSAVE", "CLIENT LIST", "CLIENT GETNAME", "CONFIG RESETSTAT",
"CONFIG REWRITE", "DBSIZE", "LASTSAVE", "PING", "SAVE", "SLOWLOG LEN", "SLOWLOG RESET",
"TIME", "SCAN",
], lambda self, command: self.connection_pool.nodes.all_nodes()),
"TIME", "SCAN", "KEYS", "CLUSTER INFO",
], 'all-nodes'),
string_keys_to_dict([
"FLUSHALL", "FLUSHDB",
], lambda self, command: self.connection_pool.nodes.all_masters()),
"FLUSHALL", "FLUSHDB", "SCRIPT LOAD", "SCRIPT FLUSH", "SCRIPT EXISTS",
], 'all-masters'),
string_keys_to_dict([
"SCRIPT LOAD", "SCRIPT FLUSH", "SCRIPT EXISTS",
], lambda self, command: self.connection_pool.nodes.all_masters()),
"RANDOMKEY", "CLUSTER NODES", 'CLUSTER SLOTS',
], 'random'),
string_keys_to_dict([
"KEYS",
], lambda self, command: self.connection_pool.nodes.all_nodes()),
string_keys_to_dict([
"PUBLISH", "SUBSCRIBE",
], lambda self, command: [self.connection_pool.nodes.pubsub_node]),
"CLUSTER COUNTKEYSINSLOT",
], 'slot-id'),
string_keys_to_dict([
"RANDOMKEY",
], lambda self, command: [self.connection_pool.nodes.random_node()]),
'PUBLISH', 'SUBSCRIBE',
], 'pubsub')
)

RESULT_CALLBACKS = dict_merge(
string_keys_to_dict([
"ECHO", "CONFIG GET", "CONFIG SET", "SLOWLOG GET", "CLIENT KILL", "INFO",
"BGREWRITEAOF", "BGSAVE", "CLIENT LIST", "CLIENT GETNAME", "CONFIG RESETSTAT",
"CONFIG REWRITE", "DBSIZE", "LASTSAVE", "PING", "SAVE", "SLOWLOG LEN", "SLOWLOG RESET",
"TIME", "SCAN",
], lambda command, res: res),
string_keys_to_dict([
"FLUSHALL", "FLUSHDB",
"TIME", "SCAN", "CLUSTER INFO", 'CLUSTER ADDSLOTS', 'CLUSTER COUNT-FAILURE-REPORTS',
'CLUSTER DELSLOTS', 'CLUSTER FAILOVER', 'CLUSTER FORGET', "FLUSHALL", "FLUSHDB",
], lambda command, res: res),
string_keys_to_dict([
"SCRIPT LOAD",
Expand All @@ -88,13 +86,34 @@ class StrictRedisCluster(StrictRedis):
"KEYS",
], merge_result),
string_keys_to_dict([
"SSCAN", "HSCAN", "ZSCAN",
], first_key),
string_keys_to_dict([
"RANDOMKEY",
"SSCAN", "HSCAN", "ZSCAN", "RANDOMKEY",
], first_key),
)

CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
'CLUSTER ADDSLOTS': bool_ok,
'CLUSTER COUNT-FAILURE-REPORTS': int,
'CLUSTER COUNTKEYSINSLOT': int,
'CLUSTER DELSLOTS': bool_ok,
'CLUSTER FAILOVER': bool_ok,
'CLUSTER FORGET': bool_ok,
'CLUSTER GETKEYSINSLOT': int,
'CLUSTER INFO': parse_info,
'CLUSTER KEYSLOT': int,
'CLUSTER MEET': bool_ok,
'CLUSTER NODES': parse_cluster_nodes,
'CLUSTER REPLICATE': bool_ok,
'CLUSTER RESET': bool_ok,
'CLUSTER SAVECONFIG': bool_ok,
'CLUSTER SET-CONFIG-EPOCH': bool_ok,
'CLUSTER SETSLOT': bool_ok,
'CLUSTER SLAVES': parse_cluster_nodes,
'CLUSTER SLOTS': parse_cluster_slots,
'ASKING': bool_ok,
'READONLY': bool_ok,
'READWRITE': bool_ok,
}

def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32, init_slot_cache=True,
readonly_mode=False, reinitialize_steps=None, **kwargs):
"""
Expand Down Expand Up @@ -143,11 +162,12 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32,
super(StrictRedisCluster, self).__init__(connection_pool=pool, **kwargs)

self.refresh_table_asap = False
self.nodes_callbacks = self.__class__.NODES_CALLBACKS.copy()
self.nodes_flags = self.__class__.NODES_FLAGS.copy()
self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy()
self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
self.reinitialize_counter = 0
self.reinitialize_steps = reinitialize_steps or 25
self.response_callbacks = dict_merge(self.response_callbacks, self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS)

def __repr__(self):
servers = list(set(['{}:{}'.format(nativestr(info['host']), info['port']) for info in self.connection_pool.nodes.startup_nodes]))
Expand All @@ -174,7 +194,6 @@ def pipeline(self, transaction=None, shard_hint=None):
connection_pool=self.connection_pool,
startup_nodes=self.connection_pool.nodes.startup_nodes,
refresh_table_asap=self.refresh_table_asap,
nodes_callbacks=self.nodes_callbacks,
result_callbacks=self.result_callbacks,
response_callbacks=self.response_callbacks,
reinitialize_steps=self.reinitialize_steps
Expand Down Expand Up @@ -218,6 +237,25 @@ def _merge_result(self, command, res):
# Default way to handle result
return first_key(command, res)

def determine_node(self, *args, **kwargs):
command = args[0]
node_flag = self.nodes_flags.get(command)

if node_flag == 'blocked':
return blocked_command(self, command)
elif node_flag == 'random':
return [self.connection_pool.nodes.random_node()]
elif node_flag == 'all-masters':
return self.connection_pool.nodes.all_masters()
elif node_flag == 'all-nodes':
return self.connection_pool.nodes.all_nodes()
elif node_flag == 'slot-id':
return [self.connection_pool.nodes.node_from_slot(args[1])]
elif node_flag == 'pubsub':
return [self.connection_pool.nodes.pubsub_node]
else:
return None

@clusterdown_wrapper
def execute_command(self, *args, **kwargs):
"""
Expand All @@ -228,8 +266,9 @@ def execute_command(self, *args, **kwargs):

command = args[0]

if command in self.nodes_callbacks:
return self._execute_command_on_nodes(self.nodes_callbacks[command](self, command), *args, **kwargs)
node = self.determine_node(*args, **kwargs)
if node:
return self._execute_command_on_nodes(node, *args, **kwargs)

# If set externally we must update it before calling any commands
if self.refresh_table_asap:
Expand All @@ -239,7 +278,6 @@ def execute_command(self, *args, **kwargs):
redirect_addr = None
asking = False

command = args[0]
try_random_node = False
slot = self._determine_slot(*args)
ttl = int(self.RedisClusterRequestTTL)
Expand Down Expand Up @@ -330,6 +368,100 @@ def _execute_command_on_nodes(self, nodes, *args, **kwargs):

return self._merge_result(command, res)

##########
# Cluster management commands

# Send to specefied node
def cluster_addslots(self, node_id, *slots):
"""Assign new hash slots to receiving node"""
return self.execute_command('CLUSTER ADDSLOTS', *slots, node_id=node_id)

# Send to node based on slot_id
def cluster_countkeysinslot(self, slot_id):
"""Return the number of local keys in the specified hash slot"""
return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id)

# Send to specefied node
def cluster_count_failure_report(self, node_id):
"""Return the number of failure reports active for a given node"""
return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id=node_id)

# Send to specefied node
def cluster_delslots(self, node_id, *slots):
"""Set hash slots as unbound in receiving node"""
return self.execute_command('CLUSTER DELSLOTS', *slots, node_id=node_id)

# Send to specefied node
def cluster_failover(self, node_id, option):
"""Forces a slave to perform a manual failover of its master."""
assert option.upper() in ('FORCE', 'TAKEOVER') # TODO: change this option handling
return self.execute_command('CLUSTER FAILOVER', Token(option))

# Send to random node
def cluster_info(self):
"""Provides info about Redis Cluster node state"""
return self.execute_command('CLUSTER INFO')

# Send to random node
def cluster_keyslot(self, name):
"""Returns the hash slot of the specified key"""
return self.execute_command('CLUSTER KEYSLOT', name)

# Send to specefied node
def cluster_meet(self, node_id, host, port):
"""Force a node cluster to handshake with another node"""
return self.execute_command('CLUSTER MEET', host, port, node_id=node_id)

# Send to random node
def cluster_nodes(self):
"""Force a node cluster to handshake with another node"""
return self.execute_command('CLUSTER NODES')

# Send to specefied node
def cluster_replicate(self, target_node_id):
"""Reconfigure a node as a slave of the specified master node"""
return self.execute_command('CLUSTER REPLICATE', target_node_id)

# Send to specific node
def cluster_reset(self, node_id, soft=True):
"""
Reset a Redis Cluster node.
If 'soft' is True then it will send 'SOFT' argument
If 'soft' is False then it will send 'HARD' argument
"""
return self.execute_command('CLUSTER RESET', Token('SOFT' if soft else 'HARD'), node_id=node_id)

# Send to all nodes
def cluster_save_config(self):
"""Forces the node to save cluster state on disk"""
return self.execute_command('CLUSTER SAVECONFIG')

# Send to specefied node_id
def cluster_set_config_epoch(self, node_id, epoch):
"""Set the configuration epoch in a new node"""
return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch, node_id=node_id)

# Send to specefied node_id
def cluster_setslot(self, node_id, slot_id, state, bind_to_node_id=None):
"""Bind an hash slot to a specific node"""
if state.upper() in ('IMPORTING', 'MIGRATING', 'NODE'):
if node_id is not None:
return self.execute_command('CLUSTER SETSLOT', slot_id, Token(state), node_id)
elif state.upper() == 'STABLE':
return self.execute_command('CLUSTER SETSLOT', slot_id, Token('STABLE'))
else:
raise RedisError('Invalid slot state: %s' % state)

# Specefied node
def cluster_slaves(self, target_node_id):
"""Force a node cluster to handshake with another node"""
return self.execute_command('CLUSTER SLAVES', target_node_id)

# Random node
def cluster_slots(self):
"""Get array of Cluster slot to node mappings"""
return self.execute_command('CLUSTER SLOTS')

##########
# All methods that must have custom implementation

Expand Down Expand Up @@ -888,7 +1020,6 @@ def pipeline(self, transaction=True, shard_hint=None):
startup_nodes=self.connection_pool.nodes.startup_nodes,
refresh_table_asap=self.refresh_table_asap,
nodes_callbacks=self.nodes_callbacks,
result_callbacks=self.result_callbacks,
response_callbacks=self.response_callbacks
)

Expand Down
5 changes: 5 additions & 0 deletions rediscluster/nodemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ def keyslot(self, key):

return crc16(k) % self.RedisClusterHashSlots

def node_from_slot(self, slot):
for node in self.slots[slot]:
if node['server_type'] == 'master':
return node

def all_nodes(self):
for node in self.nodes.values():
yield node
Expand Down
6 changes: 2 additions & 4 deletions rediscluster/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ class StrictClusterPipeline(StrictRedisCluster):
"""
"""

def __init__(self, connection_pool, nodes_callbacks=None, result_callbacks=None,
response_callbacks=None, startup_nodes=None, refresh_table_asap=False,
reinitialize_steps=None):
def __init__(self, connection_pool, result_callbacks=None, reinitialize_steps=None,
response_callbacks=None, startup_nodes=None, refresh_table_asap=False):
self.command_stack = []
self.connection_pool = connection_pool
self.nodes_callbacks = nodes_callbacks
self.refresh_table_asap = refresh_table_asap
self.reinitialize_counter = 0
self.reinitialize_steps = reinitialize_steps or 25
Expand Down

0 comments on commit 5e827eb

Please sign in to comment.