Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Commit

Permalink
Refactor exception handling
Browse files Browse the repository at this point in the history
 - Simplified some exception handling and logic in client.py
 - New exception classes. They are in connection.ClusterParser.
   They are used in the code and can in some cases be raised to user code.
  • Loading branch information
Grokzen committed Jul 14, 2015
1 parent e709021 commit 44640c5
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 183 deletions.
3 changes: 3 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* Next release
* Refactored exception handling and exception classes.

* 1.0.0
* No change to anything just a bump to 1.0.0 because the lib is now considered stable/production ready.

Expand Down
99 changes: 24 additions & 75 deletions rediscluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import time

# rediscluster imports
from .connection import ClusterConnectionPool
from .exceptions import RedisClusterException, ClusterDownException
from .connection import ClusterConnectionPool, ClusterParser
from .exceptions import RedisClusterException
from .pubsub import ClusterPubSub
from .utils import (
string_keys_to_dict,
Expand Down Expand Up @@ -126,70 +126,6 @@ def __repr__(self):
servers.sort()
return "{}<{}>".format(type(self).__name__, ', '.join(servers))

def handle_cluster_command_exception(self, e):
"""
Given a exception object this method will look at the exception
message/args and determine what error was returned from redis.
Handles:
- CLUSTERDOWN: Disconnects the connection_pool and sets
refresh_table_asap to True.
- MOVED: Updates the slots cache with the new ip:port
- ASK: Returns a dict with ip:port where to connect to try again
"""
errv = StrictRedisCluster._exception_message(e)
if errv is None:
raise e

if errv.startswith('CLUSTERDOWN'):
# Drop all connection and reset the cluster slots/nodes
# on next attempt/command.
self.connection_pool.disconnect()
self.connection_pool.reset()
self.refresh_table_asap = True
return {'method': 'clusterdown'}

info = self.parse_redirection_exception_msg(errv)

if not info:
raise e

if info['action'] == "MOVED":
self.refresh_table_asap = True
node = self.connection_pool.nodes.set_node(info['host'], info['port'], server_type='master')
self.connection_pool.nodes.slots[info['slot']] = node
elif info['action'] == "ASK":
node = self.connection_pool.nodes.set_node(info['host'], info['port'], server_type='master')
return {'name': node['name'], 'method': 'ask'}

return {}

@staticmethod
def _exception_message(e):
"""
Returns the exception message from a exception object.
They are stored in different attributes depending on what
python version is used.
"""
errv = getattr(e, "args", None)
if not errv:
return getattr(e, "message", None)
return errv[0]

def parse_redirection_exception_msg(self, errv):
"""
Parse `errv` exception object for MOVED or ASK error and returns
a dict with host, port and slot data about what node to talk to.
"""
errv = errv.split(" ")

if errv[0] != "MOVED" and errv[0] != "ASK":
return None

a = errv[2].split(":")
return {"action": errv[0], "slot": int(errv[1]), "host": a[0], "port": int(a[1])}

def pubsub(self, **kwargs):
return ClusterPubSub(self.connection_pool, **kwargs)

Expand Down Expand Up @@ -270,18 +206,21 @@ def execute_command(self, *args, **kwargs):
self.connection_pool.nodes.initialize()
self.refresh_table_asap = False

action = {}
redirect_addr = None
asking = False

command = args[0]
try_random_node = False
slot = self._determine_slot(*args)
ttl = int(self.RedisClusterRequestTTL)

while ttl > 0:
ttl -= 1
if action.get("method", "") == "ask":
node = self.connection_pool.nodes.nodes[action['name']]

if asking:
node = self.connection_pool.nodes.nodes[redirect_addr]
r = self.connection_pool.get_connection_by_node(node)
elif try_random_node:
# TODO: Untested
r = self.connection_pool.get_random_connection()
try_random_node = False
else:
Expand All @@ -291,17 +230,27 @@ def execute_command(self, *args, **kwargs):
try:
r.send_command(*args)
return self.parse_response(r, command, **kwargs)

except (RedisClusterException, BusyLoadingError):
raise
except (ConnectionError, TimeoutError):
try_random_node = True
if ttl < self.RedisClusterRequestTTL / 2:
time.sleep(0.1)
except ResponseError as e:
action = self.handle_cluster_command_exception(e)
if action.get("method", "") == "clusterdown":
raise ClusterDownException()
except ClusterParser.ClusterDownError as e:
self.connection_pool.disconnect()
self.connection_pool.reset()
self.refresh_table_asap = True
raise e
except ClusterParser.MovedError as e:
self.refresh_table_asap = True
node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.slots[e.slot_id] = node
redirect_addr = "%s:%s" % (e.host, e.port)
except ClusterParser.TryAgainError as e:
print("TODO: TRY AGAIN ERROR...")
except ClusterParser.AskError as e:
node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
redirect_addr, asking = "%s:%s" % (e.host, e.port), True
finally:
self.connection_pool.release(r)

Expand Down
58 changes: 57 additions & 1 deletion rediscluster/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,69 @@
from .exceptions import RedisClusterException

# 3rd party imports
from redis.connection import ConnectionPool, Connection
from redis.client import dict_merge
from redis.connection import ConnectionPool, Connection, DefaultParser
from redis.exceptions import (
ResponseError, RedisError,
)


class ClusterParser(DefaultParser):
class ClusterError(RedisError):
pass

class ClusterCrossSlotError(ResponseError):
message = "Keys in request don't hash to the same slot"

class ClusterDownError(ClusterError, ResponseError):
def __init__(self, resp):
self.args = (resp, )
self.message = resp

class AskError(ResponseError):
"""
src node: MIGRATING to dst node
get > ASK error
ask dst node > ASKING command
dst node: IMPORTING from src node
asking command only affects next command
any op will be allowed after asking command
"""

def __init__(self, resp):
"""should only redirect to master node"""
self.args = (resp, )
self.message = resp
_, slot_id, new_node = resp.split(' ')
host, port = new_node.rsplit(':', 1)
self.slot_id = int(slot_id)
self.node_addr = self.host, self.port = host, int(port)

class TryAgainError(ResponseError):
def __init__(self, resp):
pass

class MovedError(AskError):
pass

EXCEPTION_CLASSES = dict_merge(
DefaultParser.EXCEPTION_CLASSES, {
'ASK': AskError,
'TRYAGAIN': TryAgainError,
'MOVED': MovedError,
'CLUSTERDOWN': ClusterDownError,
'CROSSSLOT': ClusterCrossSlotError,
})


class ClusterConnection(Connection):
"Manages TCP communication to and from a Redis server"
description_format = "ClusterConnection<host=%(host)s,port=%(port)s>"

def __init__(self, *args, **kwargs):
kwargs['parser_class'] = ClusterParser
super(ClusterConnection, self).__init__(*args, **kwargs)


class UnixDomainSocketConnection(Connection):
description_format = "ClusterUnixDomainSocketConnection<path=%(path)s>"
Expand Down
31 changes: 12 additions & 19 deletions rediscluster/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

# rediscluster imports
from .client import StrictRedisCluster
from .connection import by_node_context
from .exceptions import RedisClusterException, ClusterDownException
from .connection import by_node_context, ClusterParser
from .exceptions import RedisClusterException
from .utils import clusterdown_wrapper

# 3rd party imports
Expand Down Expand Up @@ -193,30 +193,23 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
time.sleep(0.1)
continue

errv = StrictRedisCluster._exception_message(v)
if errv is None:
continue

if errv.startswith('CLUSTERDOWN'):
# If cluster is down it should be raised and bubble up to
# utils.clusterdown_wrapper()
if isinstance(v, ClusterParser.ClusterDownError):
self.connection_pool.disconnect()
self.connection_pool.reset()
self.refresh_table_asap = True
raise ClusterDownException()

redir = self.parse_redirection_exception_msg(errv)

if not redir:
continue
raise v

if redir['action'] == "MOVED":
if isinstance(v, ClusterParser.MovedError):
self.refresh_table_asap = True
node = self.connection_pool.nodes.set_node(redir['host'], redir['port'], server_type='master')
self.connection_pool.nodes.slots[redir['slot']] = node
node = self.connection_pool.nodes.set_node(v.host, v.port, server_type='master')
self.connection_pool.nodes.slots[v.slot_id] = node
attempt.append(i)
self._fail_on_redirect(allow_redirections)
elif redir['action'] == "ASK":
node = self.connection_pool.nodes.set_node(redir['host'], redir['port'], server_type='master')
ask_slots[redir['slot']] = node
elif isinstance(v, ClusterParser.AskError):
node = self.connection_pool.nodes.set_node(v.host, v.port, server_type='master')
ask_slots[v.slot_id] = node
attempt.append(i)
self._fail_on_redirect(allow_redirections)

Expand Down
7 changes: 4 additions & 3 deletions rediscluster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from socket import gethostbyaddr

# rediscluster imports
from .exceptions import RedisClusterException, ClusterDownException
from .exceptions import RedisClusterException
from .connection import ClusterParser


def is_dict(d):
Expand Down Expand Up @@ -83,13 +84,13 @@ def inner(*args, **kwargs):
for _ in range(0, 3):
try:
return func(*args, **kwargs)
except ClusterDownException:
except ClusterParser.ClusterDownError:
# Try again with the new cluster setup. All other errors
# should be raised.
pass

# If it fails 3 times then raise exception back to caller
raise ClusterDownException("CLUSTERDOWN error. Unable to rebuild the cluster")
raise ClusterParser.ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
return inner


Expand Down

0 comments on commit 44640c5

Please sign in to comment.