Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: andymccurdy/redis-py
base: master
...
head fork: andymccurdy/redis-py
compare: logging
Checking mergeability… Don't worry, you can still create the pull request.
  • 9 commits
  • 5 files changed
  • 0 commit comments
  • 2 contributors
View
9 redis/client/__init__.py
@@ -0,0 +1,9 @@
+import logging
+
+from redis.client.base import *
+
+log = logging.getLogger("redis")
+if log.isEnabledFor(logging.DEBUG):
+ from redis.client.debug import DebugClient as Redis
+ from redis.client.debug import DebugConnection as Connection
+ from redis.client.debug import DebugPipline as Pipeline
View
79 redis/client.py → redis/client/base.py
@@ -8,29 +8,6 @@
from redis.exceptions import ConnectionError, ResponseError, InvalidResponse, WatchError
from redis.exceptions import RedisError, AuthenticationError
-
-class ConnectionPool(threading.local):
- "Manages a list of connections on the local thread"
- def __init__(self):
- self.connections = {}
-
- def make_connection_key(self, host, port, db):
- "Create a unique key for the specified host, port and db"
- return '%s:%s:%s' % (host, port, db)
-
- def get_connection(self, host, port, db, password, socket_timeout):
- "Return a specific connection for the specified host, port and db"
- key = self.make_connection_key(host, port, db)
- if key not in self.connections:
- self.connections[key] = Connection(
- host, port, db, password, socket_timeout)
- return self.connections[key]
-
- def get_all_connections(self):
- "Return a list of all connection objects the manager knows about"
- return self.connections.values()
-
-
class Connection(object):
"Manages TCP communication to and from a Redis server"
def __init__(self, host='localhost', port=6379, db=0, password=None,
@@ -45,8 +22,11 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
def connect(self, redis_instance):
"Connects to the Redis server if not already connected"
- if self._sock:
- return
+ if not self._sock:
+ self._connect(redis_instance)
+
+ def _connect(self, redis_instance):
+ "Connects to the Redis server if not already connected"
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.socket_timeout)
@@ -68,8 +48,11 @@ def connect(self, redis_instance):
def disconnect(self):
"Disconnects from the Redis server"
- if self._sock is None:
- return
+ if self._sock is not None:
+ self._disconnect()
+
+ def _disconnect(self):
+ "Disconnects from the Redis server"
try:
self._sock.close()
except socket.error:
@@ -108,6 +91,30 @@ def read(self, length=None):
e.args[1])
return ''
+class ConnectionPool(threading.local):
+ "Manages a list of connections on the local thread"
+ def __init__(self, connection_class=Connection):
+ self.connection_class = connection_class
+ self.connections = {}
+
+ def make_connection_key(self, host, port, db):
+ "Create a unique key for the specified host, port and db"
+ return '%s:%s:%s' % (host, port, db)
+
+ def get_connection(self, host, port, db, password, socket_timeout):
+ "Return a specific connection for the specified host, port and db"
+ key = self.make_connection_key(host, port, db)
+ if key not in self.connections:
+ self.connections[key] = self.connection_class(
+ host, port, db, password, socket_timeout)
+ return self.connections[key]
+
+ def get_all_connections(self):
+ "Return a list of all connection objects the manager knows about"
+ return self.connections.values()
+
+
+
def list_or_args(command, keys, args):
# returns a single list combining keys and args
# if keys is not a list or args has items, issue a
@@ -317,6 +324,7 @@ def _execute_command(self, command_name, command, **options):
if self.subscribed and not subscription_command:
raise RedisError("Cannot issue commands other than SUBSCRIBE and "
"UNSUBSCRIBE while channels are open")
+ command = self._encode_command(command)
try:
self.connection.send(command, self)
if subscription_command:
@@ -328,14 +336,17 @@ def _execute_command(self, command_name, command, **options):
if subscription_command:
return None
return self.parse_response(command_name, **options)
+
+ def _encode_command(self, args):
+ cmds = ['$%s\r\n%s\r\n' % (len(enc_value), enc_value)
+ for enc_value in imap(self.encode, args)]
+ return '*%s\r\n%s' % (len(cmds), ''.join(cmds))
def execute_command(self, *args, **options):
"Sends the command to the redis server and returns it's response"
- cmds = ['$%s\r\n%s\r\n' % (len(enc_value), enc_value)
- for enc_value in imap(self.encode, args)]
return self._execute_command(
args[0],
- '*%s\r\n%s' % (len(cmds), ''.join(cmds)),
+ args,
**options
)
@@ -1407,10 +1418,10 @@ def _execute_command(self, command_name, command, **options):
def _execute_transaction(self, commands):
# wrap the commands in MULTI ... EXEC statements to indicate an
# atomic operation
- all_cmds = ''.join([c for _1, c, _2 in chain(
- (('', 'MULTI\r\n', ''),),
+ all_cmds = ''.join([self._encode_command(c) for _1, c, _2 in chain(
+ (('', ('MULTI',), ''),),
commands,
- (('', 'EXEC\r\n', ''),)
+ (('', ('EXEC',), ''),)
)])
self.connection.send(all_cmds, self)
# parse off the response for MULTI and all commands prior to EXEC
@@ -1436,7 +1447,7 @@ def _execute_transaction(self, commands):
def _execute_pipeline(self, commands):
# build up all commands into a single request to increase network perf
- all_cmds = ''.join([c for _1, c, _2 in commands])
+ all_cmds = ''.join([self._encode_command(c) for _1, c, _2 in commands])
self.connection.send(all_cmds, self)
data = []
for command_name, _, options in commands:
View
66 redis/client/debug.py
@@ -0,0 +1,66 @@
+import logging
+from redis.client.base import Connection, ConnectionPool, Redis, Pipeline
+
+log = logging.getLogger("redis")
+
+def repr_command(args):
+ "Represents a command as a string."
+ command = [args[0]]
+ if len(args) > 1:
+ command.extend(repr(x) for x in args[1:])
+ return ' '.join(command)
+
+class DebugConnection(Connection):
+ def _connect(self, redis_instance):
+ log.debug("connecting to %s:%d/%d", self.host, self.port, self.db)
+ super(DebugConnection, self)._connect(redis_instance)
+
+ def _disconnect(self):
+ log.debug("disconnecting from %s:%d/%d", self.host, self.port, self.db)
+ super(DebugConnection, self)._disconnect()
+
+
+class DebugClient(Redis):
+ def __init__(self, *args, **kwargs):
+ pool = kwargs.pop('connection_pool', None)
+ if not pool:
+ pool = ConnectionPool(connection_class=DebugConnection)
+ kwargs['connection_pool'] = pool
+ super(DebugClient, self).__init__(*args, **kwargs)
+
+ def _execute_command(self, command_name, command, **options):
+ log.debug(repr_command(command))
+ return super(DebugClient, self)._execute_command(
+ command_name, command, **options
+ )
+
+ def pipeline(self, transaction=True):
+ """
+ Return a new pipeline object that can queue multiple commands for
+ later execution. ``transaction`` indicates whether all commands
+ should be executed atomically. Apart from multiple atomic operations,
+ pipelines are useful for batch loading of data as they reduce the
+ number of back and forth network operations between client and server.
+ """
+ return DebugPipeline(
+ self.connection,
+ transaction,
+ self.encoding,
+ self.errors
+ )
+
+
+class DebugPipeline(Pipeline):
+ def _execute_transaction(self, commands):
+ log.debug("MULTI")
+ for command in commands:
+ log.debug("TRANSACTION> "+ repr_command(command[1]))
+ log.debug("EXEC")
+ return super(DebugPipeline, self)._execute_transaction(commands)
+
+ def _execute_pipeline(self, commands):
+ for command in commands:
+ log.debug("PIPELINE> " + repr_command(command[1]))
+ return super(DebugPipeline, self)._execute_pipeline(commands)
+
+
View
3  tests/__init__.py
@@ -1,5 +1,5 @@
import unittest
-from server_commands import ServerCommandsTestCase
+from server_commands import ServerCommandsTestCase, LoggingTestCase
from connection_pool import ConnectionPoolTestCase
from pipeline import PipelineTestCase
from lock import LockTestCase
@@ -10,4 +10,5 @@ def all_tests():
suite.addTest(unittest.makeSuite(ConnectionPoolTestCase))
suite.addTest(unittest.makeSuite(PipelineTestCase))
suite.addTest(unittest.makeSuite(LockTestCase))
+ suite.addTest(unittest.makeSuite(LoggingTestCase))
return suite
View
55 tests/server_commands.py
@@ -1,8 +1,11 @@
import redis
+from redis.client.debug import DebugClient
import unittest
import datetime
import threading
import time
+import logging
+import logging.handlers
from distutils.version import StrictVersion
class ServerCommandsTestCase(unittest.TestCase):
@@ -1258,3 +1261,55 @@ def test_binary_lists(self):
# check that it is possible to get list content by key name
for key in mapping.keys():
self.assertEqual(self.client.lrange(key, 0, -1), list(mapping[key]))
+
+class BufferingHandler(logging.handlers.BufferingHandler):
+
+ def __init__(self):
+ logging.handlers.BufferingHandler.__init__(self, None)
+
+ def shouldFlush(self, record):
+ return False
+
+class LoggingTestCase(unittest.TestCase):
+
+ def get_client(self):
+ return DebugClient(host='localhost', port=6379, db=9)
+
+ def setUp(self):
+ self.client = self.get_client()
+ self.client.flushdb()
+
+ self.log = logging.getLogger("redis")
+ self.log.setLevel(logging.DEBUG)
+ self.handler = BufferingHandler()
+ self.log.addHandler(self.handler)
+ self.buffer = self.handler.buffer
+
+ def tearDown(self):
+ self.client.flushdb()
+ for c in self.client.connection_pool.get_all_connections():
+ c.disconnect()
+
+ def test_command_logging(self):
+ self.client.get("foo")
+
+ self.assertEqual(len(self.buffer), 1)
+ self.assertEqual(self.buffer[0].msg, "GET 'foo'")
+
+ def test_command_logging_pipeline(self):
+ pipe = self.client.pipeline(transaction=False)
+ pipe.get("foo")
+ pipe.execute()
+
+ self.assertEqual(len(self.buffer), 1)
+ self.assertEqual(self.buffer[0].msg, "PIPELINE> GET 'foo'")
+
+ def test_command_logging_transaction(self):
+ txn = self.client.pipeline(transaction=True)
+ txn.get("foo")
+ txn.execute()
+
+ self.assertEqual(len(self.buffer), 3)
+ messages = [x.msg for x in self.buffer]
+ self.assertEqual(messages,
+ ["MULTI", "TRANSACTION> GET 'foo'", "EXEC"])

No commit comments for this range

Something went wrong with that request. Please try again.