Skip to content

Commit

Permalink
fix kestrel regression from memcache decoding changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusae committed Mar 18, 2011
1 parent 77dd110 commit a03db42
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import com.twitter.finagle.memcached.util.ParserUtils
import com.twitter.finagle.memcached.util.ChannelBufferUtils._

object AbstractDecoder {
private val DELIMITER = ChannelBuffers.wrappedBuffer("\r\n".getBytes)
private val SKIP_SPACE = 1
private val FIND_CRLF = new ChannelBufferIndexFinder() {
private val Delimiter = ChannelBuffers.wrappedBuffer("\r\n".getBytes)
private val DelimiterLength = Delimiter.capacity
private val FindCRLF = new ChannelBufferIndexFinder() {
def find(buffer: ChannelBuffer, guessedIndex: Int): Boolean = {
val enoughBytesForDelimeter = guessedIndex + DELIMITER.readableBytes
val enoughBytesForDelimeter = guessedIndex + Delimiter.readableBytes
if (buffer.writerIndex < enoughBytesForDelimeter) return false

val cr = buffer.getByte(guessedIndex)
val lf = buffer.getByte(guessedIndex + 1)
cr == '\r' && lf == '\n'
}
}
}
}

abstract class AbstractDecoder extends FrameDecoder {
Expand All @@ -38,12 +38,12 @@ abstract class AbstractDecoder extends FrameDecoder {
}

protected def decodeLine(buffer: ChannelBuffer, needsData: Seq[ChannelBuffer] => Option[Int])(continue: Seq[ChannelBuffer] => Decoding): Decoding = {
val frameLength = buffer.bytesBefore(FIND_CRLF)
val frameLength = buffer.bytesBefore(FindCRLF)
if (frameLength < 0) {
needMoreData
} else {
val frame = buffer.slice(buffer.readerIndex, frameLength)
buffer.skipBytes(frameLength + DELIMITER.capacity)
buffer.skipBytes(frameLength + DelimiterLength)

val tokens = frame.split
val bytesNeeded = needsData(tokens)
Expand All @@ -58,14 +58,14 @@ abstract class AbstractDecoder extends FrameDecoder {
}

protected def decodeData(bytesNeeded: Int, buffer: ChannelBuffer)(continue: ChannelBuffer => Decoding): Decoding = {
if (buffer.readableBytes < (bytesNeeded + DELIMITER.capacity))
if (buffer.readableBytes < (bytesNeeded + DelimiterLength))
needMoreData
else {
val lastTwoBytesInFrame = buffer.slice(bytesNeeded + buffer.readerIndex, DELIMITER.capacity)
if (!lastTwoBytesInFrame.equals(DELIMITER)) throw new ClientError("Missing delimiter")
val lastTwoBytesInFrame = buffer.slice(bytesNeeded + buffer.readerIndex, DelimiterLength)
if (!lastTwoBytesInFrame.equals(Delimiter)) throw new ClientError("Missing delimiter")

val data = buffer.slice(buffer.readerIndex, bytesNeeded)
buffer.skipBytes(bytesNeeded + DELIMITER.capacity)
buffer.skipBytes(bytesNeeded + DelimiterLength)

start()
// Copied rather than wrapped to avoid caching data outside the reader/writer mark.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,31 @@ object ChannelBufferUtils {
override def toString = buffer.toString(CharsetUtil.UTF_8)
def size = buffer.writerIndex() - buffer.readerIndex()

def split = {
def split: Seq[ChannelBuffer] =
split(FIND_SPACE, 1)

def split(delimiter: String): Seq[ChannelBuffer] =
split(stringToChannelBufferIndexFinder(delimiter), delimiter.size)

def split(indexFinder: ChannelBufferIndexFinder, delimiterLength: Int): Seq[ChannelBuffer] = {
val tokens = new ArrayBuffer[ChannelBuffer]
val skipDelimiter = 1
var scratch = buffer
while (scratch.capacity > 0) {
val tokenLength = scratch.bytesBefore(FIND_SPACE)
val tokenLength = scratch.bytesBefore(indexFinder)

if (tokenLength < 0) {
tokens += scratch.copy
scratch = scratch.slice(0, 0)
} else {
tokens += scratch.slice(0, tokenLength).copy
scratch = scratch.slice(
tokenLength + skipDelimiter,
scratch.capacity - tokenLength - skipDelimiter)
tokenLength + delimiterLength,
scratch.capacity - tokenLength - delimiterLength)
}
}
tokens
}

}

implicit def channelBufferToRichChannelBuffer(buffer: ChannelBuffer) =
Expand All @@ -54,4 +60,18 @@ object ChannelBufferUtils {

implicit def stringToByteArray(string: String) =
string.getBytes

implicit def stringToChannelBufferIndexFinder(string: String): ChannelBufferIndexFinder =
new ChannelBufferIndexFinder {
def find(buffer: ChannelBuffer, guessedIndex: Int): Boolean = {
val array = string.toArray
var i: Int = 0
while (i < string.size) {
if (buffer.getByte(guessedIndex + i) != array(i).toByte)
return false
i += 1
}
return true
}
}
}

0 comments on commit a03db42

Please sign in to comment.