Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Add watch, unwatch, mul method to pipeline #201

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 26 additions & 7 deletions rediscluster/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from redis import StrictRedis
from redis.exceptions import ConnectionError, RedisError, TimeoutError
from redis._compat import imap, unicode
from .utils import parse_del


ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError)
Expand All @@ -34,7 +35,7 @@ def __init__(self, connection_pool, result_callbacks=None,
self.startup_nodes = startup_nodes if startup_nodes else []
self.nodes_flags = self.__class__.NODES_FLAGS.copy()
self.response_callbacks = dict_merge(response_callbacks or self.__class__.RESPONSE_CALLBACKS.copy(),
self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS)
self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS, {'DEL': parse_del})
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this? I do not see a reason for it elsewhere in the code.


def __repr__(self):
"""
Expand Down Expand Up @@ -162,6 +163,10 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T

# now that we know the name of the node ( it's just a string in the form of host:port )
# we can build a list of commands for each node.

if c.args[0] in ['MULTI', 'UNWATCH']:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments that is above this belongs to the line below node_name = node['name']. Move the comments down or move this new if case up above the comments.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should commands on a single node or all commands be unwatched if exception raised?

c.args = (c.args[0],)

node_name = node['name']
if node_name not in nodes:
nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
Expand Down Expand Up @@ -193,13 +198,19 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
# a mismatched result. (not just theoretical, I saw this happen on production x.x).
for n in nodes.values():
self.connection_pool.release(n.connection)
if n.connection in self.connection_pool._available_connections.get(node_name, []):
self.connection_pool._available_connections[node_name].remove(n.connection)

if self.connection_pool._created_connections_per_node.get(node_name, 0):
self.connection_pool._created_connections_per_node[node_name] -= 1
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, but should a connection be discarded directly? The connection pool then seems to be meaningless


# if the response isn't an exception it is a valid response from the node
# we're all done with that command, YAY!
# if we have more commands to attempt, we've run into problems.
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)

if attempt and allow_redirections:
# RETRY MAGIC HAPPENS HERE!
# send these remaing comamnds one at a time using `execute_command`
Expand Down Expand Up @@ -239,10 +250,13 @@ def _fail_on_redirect(self, allow_redirections):
if not allow_redirections:
raise RedisClusterException("ASK & MOVED redirection not allowed in this pipeline")

def multi(self):
def multi(self, *names):
"""
"""
raise RedisClusterException("method multi() is not implemented")
if not names:
raise RedisClusterException("MULTI command needs at least on name in this pipeline")

return self.execute_command('MULTI', *names)

def immediate_execute_command(self, *args, **options):
"""
Expand All @@ -262,12 +276,18 @@ def load_scripts(self):
def watch(self, *names):
"""
"""
raise RedisClusterException("method watch() is not implemented")
if not names:
raise RedisClusterException("WATCH command needs at least on name in this pipeline")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly change the error message to WATCH command needs at least one name when used in a pipeline


return self.execute_command('WATCH', *names)

def unwatch(self):
def unwatch(self, *names):
"""
"""
raise RedisClusterException("method unwatch() is not implemented")
if not names:
raise RedisClusterException("UNWATCH command needs at least on name in this pipeline")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly change the error message to UNWATCH command needs at least one name when used in a pipeline


return self.execute_command('UNWATCH', *names)

def script_load_for_pipeline(self, *args, **kwargs):
"""
Expand Down Expand Up @@ -413,7 +433,6 @@ def read(self):
"""
connection = self.connection
for c in self.commands:

# if there is a result on this command, it means we ran into an exception
# like a connection error. Trying to parse a response on a connection that
# is no longer open will result in a connection error raised by redis-py.
Expand Down
10 changes: 10 additions & 0 deletions rediscluster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,13 @@ def parse_pubsub_numsub(command, res, **options):
for channel, numsub in numsub_d.items():
ret_numsub.append((channel, numsub))
return ret_numsub


def parse_del(response, *args, **kwargs):

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No blank line here. Remove blank line or add empty docstring

s = '{}'.format(response)

if s.isdigit():
return int(s)
else:
return s