Permalink
Browse files

split the client into two pieces -- the normal client with no logging…

…, and a debug client with logging.
  • Loading branch information...
1 parent f26de8e commit b25b205f8e4ac612613be9f4a3d97f9353fd242d @andymccurdy committed Jan 10, 2011
Showing with 111 additions and 64 deletions.
  1. +9 −0 redis/client/__init__.py
  2. +34 −63 redis/{client.py → client/base.py}
  3. +66 −0 redis/client/debug.py
  4. +2 −1 tests/server_commands.py
@@ -0,0 +1,9 @@
+import logging
+
+from redis.client.base import *
+
+log = logging.getLogger("redis")
+if log.isEnabledFor(logging.DEBUG):
+ from redis.client.debug import DebugClient as Redis
+ from redis.client.debug import DebugConnection as Connection
+ from redis.client.debug import DebugPipline as Pipeline
@@ -1,6 +1,5 @@
import datetime
import errno
-import logging
import socket
import threading
import time
@@ -9,39 +8,6 @@
from redis.exceptions import ConnectionError, ResponseError, InvalidResponse, WatchError
from redis.exceptions import RedisError, AuthenticationError
-try:
- NullHandler = logging.NullHandler
-except AttributeError:
- class NullHandler(logging.Handler):
- def emit(self, record): pass
-
-log = logging.getLogger("redis")
-# Add a no-op handler to avoid error messages if the importing module doesn't
-# configure logging.
-log.addHandler(NullHandler())
-
-class ConnectionPool(threading.local):
- "Manages a list of connections on the local thread"
- def __init__(self):
- self.connections = {}
-
- def make_connection_key(self, host, port, db):
- "Create a unique key for the specified host, port and db"
- return '%s:%s:%s' % (host, port, db)
-
- def get_connection(self, host, port, db, password, socket_timeout):
- "Return a specific connection for the specified host, port and db"
- key = self.make_connection_key(host, port, db)
- if key not in self.connections:
- self.connections[key] = Connection(
- host, port, db, password, socket_timeout)
- return self.connections[key]
-
- def get_all_connections(self):
- "Return a list of all connection objects the manager knows about"
- return self.connections.values()
-
-
class Connection(object):
"Manages TCP communication to and from a Redis server"
def __init__(self, host='localhost', port=6379, db=0, password=None,
@@ -56,10 +22,11 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
def connect(self, redis_instance):
"Connects to the Redis server if not already connected"
- if self._sock:
- return
- if log_enabled(log):
- log.debug("connecting to %s:%d/%d", self.host, self.port, self.db)
+ if not self._sock:
+ self._connect(redis_instance)
+
+ def _connect(self, redis_instance):
+ "Connects to the Redis server if not already connected"
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.socket_timeout)
@@ -81,10 +48,11 @@ def connect(self, redis_instance):
def disconnect(self):
"Disconnects from the Redis server"
- if self._sock is None:
- return
- if log_enabled(log):
- log.debug("disconnecting from %s:%d/%d", self.host, self.port, self.db)
+ if self._sock is not None:
+ self._disconnect()
+
+ def _disconnect(self):
+ "Disconnects from the Redis server"
try:
self._sock.close()
except socket.error:
@@ -123,6 +91,30 @@ def read(self, length=None):
e.args[1])
return ''
+class ConnectionPool(threading.local):
+ "Manages a list of connections on the local thread"
+ def __init__(self, connection_class=Connection):
+ self.connection_class = connection_class
+ self.connections = {}
+
+ def make_connection_key(self, host, port, db):
+ "Create a unique key for the specified host, port and db"
+ return '%s:%s:%s' % (host, port, db)
+
+ def get_connection(self, host, port, db, password, socket_timeout):
+ "Return a specific connection for the specified host, port and db"
+ key = self.make_connection_key(host, port, db)
+ if key not in self.connections:
+ self.connections[key] = self.connection_class(
+ host, port, db, password, socket_timeout)
+ return self.connections[key]
+
+ def get_all_connections(self):
+ "Return a list of all connection objects the manager knows about"
+ return self.connections.values()
+
+
+
def list_or_args(command, keys, args):
# returns a single list combining keys and args
# if keys is not a list or args has items, issue a
@@ -163,17 +155,6 @@ def dict_merge(*dicts):
[merged.update(d) for d in dicts]
return merged
-def log_enabled(log, level=logging.DEBUG):
- return log.isEnabledFor(log, level)
-
-def repr_command(args):
- "Represents a command as a string."
- command = [args[0]]
- if len(args) > 1:
- command.extend(repr(x) for x in args[1:])
-
- return ' '.join(command)
-
def parse_info(response):
"Parse the result of Redis's INFO command into a Python dict"
info = {}
@@ -343,8 +324,6 @@ def _execute_command(self, command_name, command, **options):
if self.subscribed and not subscription_command:
raise RedisError("Cannot issue commands other than SUBSCRIBE and "
"UNSUBSCRIBE while channels are open")
- if log_enabled(log):
- log.debug(repr_command(command))
command = self._encode_command(command)
try:
self.connection.send(command, self)
@@ -1444,11 +1423,6 @@ def _execute_transaction(self, commands):
commands,
(('', ('EXEC',), ''),)
)])
- if log_enabled(log):
- log.debug("MULTI")
- for command in commands:
- log.debug("TRANSACTION> "+ repr_command(command[1]))
- log.debug("EXEC")
self.connection.send(all_cmds, self)
# parse off the response for MULTI and all commands prior to EXEC
for i in range(len(commands)+1):
@@ -1474,9 +1448,6 @@ def _execute_transaction(self, commands):
def _execute_pipeline(self, commands):
# build up all commands into a single request to increase network perf
all_cmds = ''.join([self._encode_command(c) for _1, c, _2 in commands])
- if log_enabled(log):
- for command in commands:
- log.debug("PIPELINE> " + repr_command(command[1]))
self.connection.send(all_cmds, self)
data = []
for command_name, _, options in commands:
View
@@ -0,0 +1,66 @@
+import logging
+from redis.client.base import Connection, ConnectionPool, Redis, Pipeline
+
+log = logging.getLogger("redis")
+
+def repr_command(args):
+ "Represents a command as a string."
+ command = [args[0]]
+ if len(args) > 1:
+ command.extend(repr(x) for x in args[1:])
+ return ' '.join(command)
+
+class DebugConnection(Connection):
+ def _connect(self, redis_instance):
+ log.debug("connecting to %s:%d/%d", self.host, self.port, self.db)
+ super(DebugConnection, self)._connect(redis_instance)
+
+ def _disconnect(self):
+ log.debug("disconnecting from %s:%d/%d", self.host, self.port, self.db)
+ super(DebugConnection, self)._disconnect()
+
+
+class DebugClient(Redis):
+ def __init__(self, *args, **kwargs):
+ pool = kwargs.pop('connection_pool', None)
+ if not pool:
+ pool = ConnectionPool(connection_class=DebugConnection)
+ kwargs['connection_pool'] = pool
+ super(DebugClient, self).__init__(*args, **kwargs)
+
+ def _execute_command(self, command_name, command, **options):
+ log.debug(repr_command(command))
+ return super(DebugClient, self)._execute_command(
+ command_name, command, **options
+ )
+
+ def pipeline(self, transaction=True):
+ """
+ Return a new pipeline object that can queue multiple commands for
+ later execution. ``transaction`` indicates whether all commands
+ should be executed atomically. Apart from multiple atomic operations,
+ pipelines are useful for batch loading of data as they reduce the
+ number of back and forth network operations between client and server.
+ """
+ return DebugPipeline(
+ self.connection,
+ transaction,
+ self.encoding,
+ self.errors
+ )
+
+
+class DebugPipeline(Pipeline):
+ def _execute_transaction(self, commands):
+ log.debug("MULTI")
+ for command in commands:
+ log.debug("TRANSACTION> "+ repr_command(command[1]))
+ log.debug("EXEC")
+ return super(DebugPipeline, self)._execute_transaction(commands)
+
+ def _execute_pipeline(self, commands):
+ for command in commands:
+ log.debug("PIPELINE> " + repr_command(command[1]))
+ return super(DebugPipeline, self)._execute_pipeline(commands)
+
+
@@ -1,4 +1,5 @@
import redis
+from redis.client.debug import DebugClient
import unittest
import datetime
import threading
@@ -1272,7 +1273,7 @@ def shouldFlush(self, record):
class LoggingTestCase(unittest.TestCase):
def get_client(self):
- return redis.Redis(host='localhost', port=6379, db=9)
+ return DebugClient(host='localhost', port=6379, db=9)
def setUp(self):
self.client = self.get_client()

0 comments on commit b25b205

Please sign in to comment.