
Improve memcache pipelining code

Summary: Memcache pipeline is now efficient, and probably works.

Test Plan: HOLY SHIT THIS MAKES NO SENSE

Reviewers: jeff, ted, tail

Reviewed By: tail

Differential Revision: http://phabricator.local.disqus.net/D3815
dcramer committed Nov 1, 2012
1 parent 93c2a9c commit 0d7615f253d89555304c344847226367bcd439ab
Showing with 391 additions and 85 deletions.
  1. +153 −28 nydus/db/backends/memcache.py
  2. +3 −2 nydus/db/backends/redis.py
  3. +25 −24 nydus/db/map.py
  4. +2 −2 nydus/db/promise.py
  5. +9 −0 nydus/utils.py
  6. +199 −29 tests/nydus/db/backends/memcache/tests.py
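
For context, the practical effect of this change is that consecutive, signature-compatible cache calls issued through a ``map()`` block are coalesced into a single multi round-trip. A minimal usage sketch (the cluster settings below are illustrative placeholders, not part of this commit)::

    from nydus.db import create_cluster

    mc = create_cluster({
        'backend': 'nydus.db.backends.memcache.Memcache',
        'hosts': {0: {'host': 'localhost', 'port': 11211}},
    })

    with mc.map() as conn:
        foo = conn.get('foo')
        bar = conn.get('bar')
        baz = conn.get('baz')
    # With this commit, the pipeline resolves the three gets as one
    # get_multi(['foo', 'bar', 'baz']) call instead of three round-trips.
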
@@ -1,6 +1,6 @@
"""
nydus.db.backends.memcache
-~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~~~~~~~
:copyright: (c) 2012 DISQUS.
:license: Apache License 2.0, see LICENSE for more details.
@@ -10,8 +10,10 @@
import pylibmc
+from itertools import izip
from nydus.db.backends import BaseConnection, BasePipeline
from nydus.db.promise import EventualCommand
+from nydus.utils import peek
class Memcache(BaseConnection):
@@ -44,35 +46,158 @@ def get_pipeline(self, *args, **kwargs):
class MemcachePipeline(BasePipeline):
- def __init__(self, connection):
- self.pending = []
- self.connection = connection
-
- def add(self, command):
- # A feature of Memcache is a 'get_multi' command. Therefore we can merge
- # consecutive 'get' commands into one 'get_multi' command.
-
- # Need to merge this into one command
- name, args, kwargs = command.get_command()
- if name == 'get':
- if self.pending and self.pending[-1].get_name() == 'get_multi':
- prev_command = self.pending[-1]
- args = prev_command.get_args()
- args[0].append(command.get_args()[0])
- prev_command.set_args(args)
+ def execute(self):
+ grouped = regroup_commands(self.pending)
+ results = resolve_grouped_commands(grouped, self.connection)
+ return results
- else:
- key = command.get_args()[0]
- multi_command = EventualCommand('get_multi')
- multi_command([key])
- self.pending.append(multi_command)
+def grouped_args_for_command(command):
+ """
+ Returns a list of arguments that are shared for this command.
+
+ When comparing similar commands, these arguments represent the
+ groupable signature for said commands.
+ """
+ if command.get_name() == 'set':
+ return command.get_args()[2:]
+ return command.get_args()[1:]
+
+
+def grouped_command(commands):
+ """
+ Given a list of commands (which are assumed groupable), return
+ a new command which is a batch (multi) command.
+
+ For ``set`` commands the outcome will be::
+
+ set_multi({key: value}, **kwargs)
+
+ For ``get`` and ``delete`` commands, the outcome will be::
+
+ get_multi(list_of_keys, **kwargs)
+
+ (Or respectively ``delete_multi``)
+ """
+ base = commands[0]
+ name = base.get_name()
+ multi_command = EventualCommand('%s_multi' % name)
+ if name in ('get', 'delete'):
+ args = [c.get_args()[0] for c in commands]
+ elif base.get_name() == 'set':
+ args = dict(c.get_args()[0:2] for c in commands)
+ else:
+ raise ValueError('Command not supported: %r' % (base.get_name(),))
+
+ multi_command(args, *grouped_args_for_command(base), **base.get_kwargs())
+
+ return multi_command
+
+
+def can_group_commands(command, next_command):
+ """
+ Returns a boolean representing whether these commands can be
+ grouped together or not.
+
+ A few things are taken into account for this decision:
+
+ For ``set`` commands:
+
+ - Are all arguments other than the key/value the same?
+
+ For ``delete`` and ``get`` commands:
+
+ - Are all arguments other than the key the same?
+ """
+ multi_capable_commands = ('get', 'set', 'delete')
+
+ if next_command is None:
+ return False
+
+ name = command.get_name()
+
+ # TODO: support multi commands
+ if name not in multi_capable_commands:
+ return False
+
+ if name != next_command.get_name():
+ return False
+
+ # if the shared args (key, or key/value) do not match, we cannot group
+ if grouped_args_for_command(command) != grouped_args_for_command(next_command):
+ return False
+
+ # If the keyword arguments do not match (e.g. key_prefix, or timeout on set)
+ # then we cannot group
+ if command.get_kwargs() != next_command.get_kwargs():
+ return False
+
+ return True
+
+
+def regroup_commands(commands):
+ """
+ Returns a list of tuples:
+
+ [(command_to_run, [list, of, commands])]
+
+ If the list of commands has a single item, the command was not grouped.
+ """
+ grouped = []
+ pending = []
+
+ def group_pending():
+ if not pending:
+ return
+
+ new_command = grouped_command(pending)
+ result = []
+ while pending:
+ result.append(pending.pop(0))
+ grouped.append((new_command, result))
+
+ for command, next_command in peek(commands):
+ # if the previous command was a get and this is a set, we must execute
+ # any pending commands
+ # TODO: unless this command is a get_multi and it matches the same option
+ # signature
+ if can_group_commands(command, next_command):
+ # if previous command does not match this command
+ if pending and not can_group_commands(pending[0], command):
+ group_pending()
+
+ pending.append(command)
else:
- self.pending.append(command)
+ # if pending exists for this command, group it
+ if pending and can_group_commands(pending[0], command):
+ pending.append(command)
+ else:
+ grouped.append((command.clone(), [command]))
- def execute(self):
- ret = []
- for command in self.pending:
- ret.append(command.resolve(self.connection))
+ # We couldn't group with previous command, so ensure we bubble up
+ group_pending()
+
+ group_pending()
+
+ return grouped
+
+
+def resolve_grouped_commands(grouped, connection):
+ results = {}
+
+ for master_command, grouped_commands in grouped:
+ result = master_command.resolve(connection)
+
+ # this command was not grouped
+ if len(grouped_commands) == 1:
+ results[grouped_commands[0]] = result
+ else:
+ if isinstance(result, dict):
+ # XXX: assume first arg is key
+ for command in grouped_commands:
+ results[command] = result.get(command.get_args()[0])
+ else:
+ for command, value in izip(grouped_commands, result):
+ results[command] = value
- return ret
+ return results
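
To make the grouping rules above concrete, here is a minimal, self-contained sketch of the same logic, using a simplified ``Command`` stand-in instead of ``EventualCommand`` (all names and values here are illustrative, not part of nydus)::

    class Command(object):
        def __init__(self, name, *args, **kwargs):
            self.name, self.args, self.kwargs = name, list(args), kwargs

        def __repr__(self):
            return '%s%r' % (self.name, tuple(self.args))

    MULTI_CAPABLE = ('get', 'set', 'delete')

    def can_group(a, b):
        # same multi-capable command, and everything other than the key
        # (or key/value for set) must match
        if a.name != b.name or a.name not in MULTI_CAPABLE:
            return False
        skip = 2 if a.name == 'set' else 1
        return a.args[skip:] == b.args[skip:] and a.kwargs == b.kwargs

    def regroup(commands):
        # batch *consecutive* compatible commands, as regroup_commands does
        groups, pending = [], []
        for cmd in commands:
            if pending and can_group(pending[0], cmd):
                pending.append(cmd)
            else:
                if pending:
                    groups.append(pending)
                pending = [cmd]
        if pending:
            groups.append(pending)
        return groups

    commands = [
        Command('get', 'a'),
        Command('get', 'b'),
        Command('set', 'c', '1', 30),  # third argument: timeout
        Command('set', 'd', '2', 30),
        Command('set', 'e', '3', 60),  # different timeout: starts a new group
        Command('incr', 'f'),          # not multi-capable: runs alone
    ]
    for group in regroup(commands):
        print(group)
    # [get('a',), get('b',)]
    # [set('c', '1', 30), set('d', '2', 30)]
    # [set('e', '3', 60)]
    # [incr('f',)]

The committed ``regroup_commands`` additionally wraps each group in a ``get_multi``/``set_multi``/``delete_multi`` ``EventualCommand`` via ``grouped_command``, so each group costs a single network round-trip.
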
@@ -8,6 +8,7 @@
from __future__ import absolute_import
+from itertools import izip
from redis import Redis as RedisClient
from redis import RedisError
@@ -21,13 +22,13 @@ def __init__(self, connection):
self.pipe = connection.pipeline()
def add(self, command):
- self.pending.append(command)
name, args, kwargs = command.get_command()
+ self.pending.append(command)
# ensure the command is executed in the pipeline
getattr(self.pipe, name)(*args, **kwargs)
def execute(self):
- return self.pipe.execute()
+ return dict(izip(self.pending, self.pipe.execute()))
class Redis(BaseConnection):
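
The Redis pipeline now returns results keyed by the originating command rather than as a bare list, matching the new memcache behavior so ``map.py`` can consolidate results per command. A toy illustration of the shape change (string stand-ins in place of real ``EventualCommand`` objects)::

    pending = ['set:foo', 'get:foo', 'incr:hits']  # stand-ins for commands
    raw = ['OK', 'bar', 1]                         # what redis-py's pipeline returns
    results = dict(zip(pending, raw))              # zip here == izip in the diff
    # {'set:foo': 'OK', 'get:foo': 'bar', 'incr:hits': 1}

This only works because ``EventualCommand`` is hashable with a content-stable identity, which is what the ``promise.py`` change below provides.
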
@@ -6,7 +6,6 @@
:license: Apache License 2.0, see LICENSE for more details.
"""
-from itertools import izip
from collections import defaultdict
from nydus.utils import ThreadPool
from nydus.db.exceptions import CommandError
@@ -57,25 +56,25 @@ def resolve(self):
pending_commands = self._build_pending_commands()
num_commands = sum(len(v) for v in pending_commands.itervalues())
- if num_commands == 0:
- self._commands = []
-
# Don't bother with the pooling if we only need to do one operation on a single machine
- elif num_commands == 1:
+ if num_commands == 1:
db_num, (command,) = pending_commands.items()[0]
self._commands = [command.resolve(self._cluster[db_num])]
- else:
+ elif num_commands > 1:
results = self.execute(self._cluster, pending_commands)
- for command, result in results.iteritems():
- for value in result:
- if isinstance(value, Exception):
- self._errors.append((command.get_name(), value))
+ for command in self._commands:
+ result = results.get(command)
+
+ if result:
+ for value in result:
+ if isinstance(value, Exception):
+ self._errors.append((command.get_name(), value))
- # XXX: single path routing (implicit) doesnt return a list
- if len(result) == 1:
- result = result[0]
+ # XXX: single path routing (implicit) doesn't return a list
+ if len(result) == 1:
+ result = result[0]
change_resolution(command, result)
@@ -153,22 +152,24 @@ def execute(self, cluster, commands):
pool.add(db_num, pipe.execute, (), {})
# Consolidate commands with their appropriate results
- result_map = pool.join()
+ db_result_map = pool.join()
- results = defaultdict(list)
# Results get grouped by their command signature, so we have to separate the logic
- for db_num, result in result_map.iteritems():
- # Pipelines always execute on a single database
- assert len(result) == 1
+ results = defaultdict(list)
- result = result[0]
+ for db_num, db_results in db_result_map.iteritems():
+ # Pipelines always execute on a single database
+ assert len(db_results) == 1
+ db_results = db_results[0]
- if isinstance(result, Exception):
+ # if pipe.execute (within nydus) fails, this will be an exception object
+ if isinstance(db_results, Exception):
for command in commands[db_num]:
- results[command].append(result)
- else:
- for command, value in izip(commands[db_num], result):
- results[command].append(value)
+ results[command].append(db_results)
+ continue
+
+ for command, result in db_results.iteritems():
+ results[command].append(result)
return results
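
The consolidation loop above can be exercised in isolation; here is a runnable sketch with illustrative stand-in values (real keys are ``EventualCommand`` objects)::

    from collections import defaultdict

    # one thread-pool result per node; each pipeline returns {command: value}
    db_result_map = {
        0: [{'get:a': 1, 'get:b': 2}],       # node 0 succeeded
        1: [IOError('connection refused')],  # node 1's pipe.execute blew up
    }
    commands = {0: ['get:a', 'get:b'], 1: ['get:a']}

    results = defaultdict(list)
    for db_num, db_results in db_result_map.items():
        assert len(db_results) == 1          # pipelines run on a single database
        db_results = db_results[0]
        if isinstance(db_results, Exception):
            # whole-pipeline failure: attach the error to every command
            for command in commands[db_num]:
                results[command].append(db_results)
            continue
        for command, result in db_results.items():
            results[command].append(result)
    # results -> {'get:a': [1, IOError(...)], 'get:b': [2]}
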
@@ -45,12 +45,12 @@ def __init__(self, attr, args=None, kwargs=None):
self.__resolved = False
self.__args = args or []
self.__kwargs = kwargs or {}
- self.__ident = ':'.join(map(str, [id(self.__attr), id(self.__args), id(self.__kwargs)]))
+ self.__ident = ':'.join(map(lambda x: str(hash(str(x))), [self.__attr, self.__args, self.__kwargs]))
def __call__(self, *args, **kwargs):
self.__args = args
self.__kwargs = kwargs
- self.__ident = ':'.join(map(str, [id(self.__attr), id(self.__args), id(self.__kwargs)]))
+ self.__ident = ':'.join(map(lambda x: str(hash(str(x))), [self.__attr, self.__args, self.__kwargs]))
return self
def __hash__(self):
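
Why the ``__ident`` change matters: the old identity was built from ``id()`` of the attribute/args/kwargs objects, so two commands with identical contents (e.g. a command and its ``clone()``) hashed differently, which would break the new dict-keyed result passing. Hashing the stringified contents makes identity stable across equal commands. A toy comparison (illustrative only)::

    args_a, args_b = ['foo'], ['foo']   # equal contents, distinct objects
    old_a, old_b = str(id(args_a)), str(id(args_b))          # old scheme
    new_a, new_b = str(hash(str(args_a))), str(hash(str(args_b)))  # new scheme
    assert old_a != old_b  # old idents diverge for logically equal commands
    assert new_a == new_b  # new idents agree, so dict lookups by command work
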
@@ -37,6 +37,15 @@ def apply_defaults(host, defaults):
return host
+def peek(value):
+ generator = iter(value)
+ prev = generator.next()
+ for item in generator:
+ yield prev, item
+ prev = item
+ yield prev, None
+
+
class Worker(Thread):
def __init__(self, queue):
Thread.__init__(self)
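
The new ``peek`` helper is what lets ``regroup_commands`` look one element ahead: it yields each item paired with its successor, with ``None`` marking the end. Shown here with the portable ``next(generator)`` spelling (the committed code uses Python 2's ``generator.next()``)::

    def peek(value):
        generator = iter(value)
        prev = next(generator)  # note: an empty iterable would raise here
        for item in generator:
            yield prev, item
            prev = item
        yield prev, None

    print(list(peek(['a', 'b', 'c'])))
    # [('a', 'b'), ('b', 'c'), ('c', None)]
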