Permalink
Browse files

bugfix for twisted LineReceiver recursion

  • Loading branch information...
1 parent 41bbafa commit 5f9e76e71462d84943dd9ee9892b31d54a07b422 @fiorix committed Aug 8, 2012
Showing with 72 additions and 4 deletions.
  1. +72 −4 txredisapi.py
View
@@ -110,7 +110,75 @@ def append(self, item):
self.items.append(item)
-class RedisProtocol(basic.LineReceiver, policies.TimeoutMixin):
+class LineReceiver(protocol.Protocol, basic._PauseableMixin):
+ line_mode = 1
+ __buffer = ''
+ delimiter = '\r\n'
+ MAX_LENGTH = 16384
+
+ def clearLineBuffer(self):
+ b = self.__buffer
+ self.__buffer = ""
+ return b
+
+ def dataReceived(self, data, unpause=False):
+ if unpause is True:
+ if self.__buffer:
+ self.__buffer = data + self.__buffer
+ else:
+ self.__buffer += data
+
+ self.resumeProducing()
+ else:
+ self.__buffer = self.__buffer + data
+
+ while self.line_mode and not self.paused:
+ try:
+ line, self.__buffer = self.__buffer.split(self.delimiter, 1)
+ except ValueError:
+ if len(self.__buffer) > self.MAX_LENGTH:
+ line, self.__buffer = self.__buffer, ''
+ return self.lineLengthExceeded(line)
+ break
+ else:
+ linelength = len(line)
+ if linelength > self.MAX_LENGTH:
+ exceeded = line + self.__buffer
+ self.__buffer = ''
+ return self.lineLengthExceeded(exceeded)
+ why = self.lineReceived(line)
+ if why or self.transport and self.transport.disconnecting:
+ return why
+ else:
+ if not self.paused:
+ data = self.__buffer
+ self.__buffer = ''
+ if data:
+ return self.rawDataReceived(data)
+
+ def setLineMode(self, extra=''):
+ self.line_mode = 1
+ if extra:
+ self.pauseProducing()
+ reactor.callLater(0, self.dataReceived, extra, True)
+
+ def setRawMode(self):
+ self.line_mode = 0
+
+ def rawDataReceived(self, data):
+ raise NotImplementedError
+
+ def lineReceived(self, line):
+ raise NotImplementedError
+
+ def sendLine(self, line):
+ return self.transport.write(line + self.delimiter)
+
+ def lineLengthExceeded(self, line):
+ return self.transport.loseConnection()
+
+
+class RedisProtocol(LineReceiver, policies.TimeoutMixin):
"""
Redis client protocol.
"""
@@ -154,7 +222,7 @@ def connectionMade(self):
def connectionLost(self, why):
self.connected = 0
self.factory.delConnection(self)
- basic.LineReceiver.connectionLost(self, why)
+ LineReceiver.connectionLost(self, why)
while self.replyQueue.pending:
self.replyReceived(ConnectionError("Lost connection"))
@@ -244,7 +312,7 @@ def rawDataReceived(self, data):
bulk_buffer = "".join(self.bulk_buffer)[:-2]
self.bulk_buffer = []
self.bulkDataReceived(bulk_buffer)
- reactor.callLater(0, self.setLineMode, extra=rest)
+ self.setLineMode(extra=rest)
def bulkDataReceived(self, data):
"""
@@ -1767,4 +1835,4 @@ def lazyShardedUnixConnectionPool(paths, dbid=None, poolsize=10,
]
__author__ = "Alexandre Fiori"
-__version__ = version = "0.8"
+__version__ = version = "0.9"

0 comments on commit 5f9e76e

Please sign in to comment.