Merge pull request #99 from IlyaSkriblovsky/implicit-pipelining
Implicit pipelining of all commands except blocking ones
IlyaSkriblovsky committed Jan 18, 2016
2 parents fe88916 + 578c117 commit b30aa2d
Showing 3 changed files with 103 additions and 14 deletions.
26 changes: 20 additions & 6 deletions README.md
@@ -538,11 +538,25 @@ normally return just an ``OK``.

### Pipelining ###

Redis supports [pipelining](http://redis.io/topics/pipelining) multiple commands
at once to improve performance. Currently, pipelining is NOT supported
on sharded connections.

To execute commands in a pipeline:
txredisapi automatically [pipelines](http://redis.io/topics/pipelining) all commands
by sending the next command without waiting for the reply to the previous one. This
works even on a single connection and improves performance by reducing the number of
round-trip delays (a short sketch follows the list below). There are two exceptions, though:
- no commands are sent after a blocking `blpop`, `brpop` or `brpoplpush` until its
response is received;
- transactions started with `multi`/`commit` also block the connection, making all
other commands wait until the transaction is executed.

When you need to load a large amount of data into Redis, it may be more efficient to send
commands in batches, grouping them together offline to save on TCP packets and network
stack overhead. You can do this with the `pipeline` method, which explicitly accumulates
commands and sends them to the server in a single batch. Be careful not to accumulate too
many commands: an unreasonably large batch may eat up an unexpected amount of memory on
both the client and the server side. Group commands in batches of, for example, 10k
commands instead of sending all your data at once (a chunked sketch follows the example
below). The speed will be nearly the same, but the additional memory used will be at most
the amount needed to queue those 10k commands.

To send commands in a batch:

#!/usr/bin/env python
# coding: utf-8
@@ -556,7 +570,7 @@ To execute commands in a pipeline:
def main():
    rc = yield redis.ConnectionPool()

    # Start pipeline
    # Start grouping commands
    pipeline = yield rc.pipeline()

    pipeline.set("foo", 123)
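
Building on the example above, a bulk load can be chunked along these lines (a
sketch only: `bulk_load` and `_flush` are hypothetical helpers, and it assumes
the batch is sent with `execute_pipeline()` as in the full README example):

    from twisted.internet import defer


    @defer.inlineCallbacks
    def _flush(rc, batch):
        # Accumulate one batch on an explicit pipeline and send it in one go.
        pipeline = yield rc.pipeline()
        for key, value in batch:
            pipeline.set(key, value)
        yield pipeline.execute_pipeline()


    @defer.inlineCallbacks
    def bulk_load(rc, items, chunk_size=10000):
        # Feed (key, value) pairs to Redis roughly 10k commands at a time, so
        # the client-side queue never grows beyond one chunk.
        batch = []
        for key, value in items:
            batch.append((key, value))
            if len(batch) >= chunk_size:
                yield _flush(rc, batch)
                batch = []
        if batch:
            yield _flush(rc, batch)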
68 changes: 68 additions & 0 deletions tests/test_implicit_pipelining.py
@@ -0,0 +1,68 @@
# coding: utf-8
# Copyright 2015 Ilya Skriblovsky
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer
from twisted.trial import unittest

import txredisapi as redis

from tests.mixins import REDIS_HOST, REDIS_PORT

cnt_LineReceiver = 0
cnt_HiredisProtocol = 0


class _CallCounter(object):
    def __init__(self, original):
        self.call_count = 0
        self.original = original

    def get_callee(self):
        def callee(this, *args, **kwargs):
            self.call_count += 1
            return self.original(this, *args, **kwargs)
        return callee


class TestImplicitPipelining(unittest.TestCase):
    KEY = 'txredisapi:key'
    VALUE = 'txredisapi:value'

    @defer.inlineCallbacks
    def testImplicitPipelining(self):
        """
        Calling a query method several times in a row without waiting should
        do implicit pipelining, so all requests are sent immediately and
        all replies are received in one chunk with high probability
        """
        db = yield redis.Connection(REDIS_HOST, REDIS_PORT, reconnect=False)

        cnt_LineReceiver = _CallCounter(redis.LineReceiver.dataReceived)
        self.patch(redis.LineReceiver, 'dataReceived',
                   cnt_LineReceiver.get_callee())
        cnt_HiredisProtocol = _CallCounter(redis.HiredisProtocol.dataReceived)
        self.patch(redis.HiredisProtocol, 'dataReceived',
                   cnt_HiredisProtocol.get_callee())

        for i in range(5):
            db.set(self.KEY, self.VALUE)

        yield db.get(self.KEY)

        total_data_chunks = cnt_LineReceiver.call_count + \
            cnt_HiredisProtocol.call_count
        self.assertEqual(total_data_chunks, 1)

        yield db.disconnect()
23 changes: 15 additions & 8 deletions txredisapi.py
@@ -221,6 +221,11 @@ def _cancelGet(self, d):
        self.waiting[i] = defer.Deferred()


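# Marks a command method as blocking: the connection it runs on is handed back
# to the pool only after the reply arrives, so no further commands are
# pipelined onto it in the meantime.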
def _blocking_command(method):
    method._blocking = True
    return method


class BaseRedisProtocol(LineReceiver, policies.TimeoutMixin):
    """
    Redis client protocol.
@@ -945,6 +950,7 @@ def rpop(self, key):
"""
return self.execute_command("RPOP", key)

@_blocking_command
def blpop(self, keys, timeout=0):
"""
Blocking LPOP
@@ -957,6 +963,7 @@ def blpop(self, keys, timeout=0):
        keys.append(timeout)
        return self.execute_command("BLPOP", *keys)

    @_blocking_command
    def brpop(self, keys, timeout=0):
        """
        Blocking RPOP
@@ -969,6 +976,7 @@ def brpop(self, keys, timeout=0):
        keys.append(timeout)
        return self.execute_command("BRPOP", *keys)

    @_blocking_command
    def brpoplpush(self, source, destination, timeout=0):
        """
        Pop a value from a list, push it to another list and return
@@ -1843,25 +1851,24 @@ def wrapper(*args, **kwargs):

            def callback(connection):
                protocol_method = getattr(connection, method)
                blocking = getattr(protocol_method, '_blocking', False)
                try:
                    d = protocol_method(*args, **kwargs)
                except:
                    self._factory.connectionQueue.put(connection)
                    raise

                def put_back(reply):
                def put_back(reply=None):
                    if not connection.inTransaction and \
                            not connection.pipelining:
                        self._factory.connectionQueue.put(connection)
                    return reply

                def switch_to_errback(reply):
                    if isinstance(reply, Exception):
                        raise reply
                    return reply

                d.addBoth(put_back)
                d.addCallback(switch_to_errback)
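                # Ordinary commands release the connection immediately so that
                # further commands can be pipelined onto it; blocking commands,
                # transactions and explicit pipelines release it through the
                # reply deferred instead.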
                if connection.inTransaction or connection.pipelining or \
                        blocking:
                    d.addBoth(put_back)
                else:
                    put_back()
                return d
            d.addCallback(callback)
            return d
