Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

fix choice combinator, specialize not to only accept matcher

  • Loading branch information...
commit bc5a2ee12389dd1a378f1010e33034f69a57544e 1 parent f6ecd4a
Matt Freels authored
14 src/main/scala/com/twitter/finagle/memcached/Protocol.scala
@@ -36,10 +36,10 @@ object ResponseDecoder {
36 36
37 37 val readValue = {
38 38 val readKey = readTo(" ") // map { decodeUTF8String(_) }
39   - val readFlags = readTo(" ") map { bytes =>
40   - decodeFlags(decodeDecimalInt(bytes))
  39 + val readFlags = readTo(" ") into { bytes =>
  40 + lift(decodeDecimalInt(bytes)) map { decodeFlags(_) }
41 41 }
42   - val readLength = readUntil(" ", "\r\n") map { decodeDecimalInt(_) }
  42 + val readLength = readUntil(" ", "\r\n") into { b => lift(decodeDecimalInt(b)) }
43 43 val readCas = choice(
44 44 " " -> (readLine map { cas => Some(decodeDecimalInt(cas)) }),
45 45 "\r\n" -> success(None)
@@ -55,7 +55,7 @@ object ResponseDecoder {
55 55 }
56 56
57 57 val readValues = {
58   - val readRest = repsep(accept("VALUE ") append readValue, not(guard("END\r\n")))
  58 + val readRest = repsep(accept("VALUE ") append readValue, not("END\r\n"))
59 59
60 60 readValue flatMap { first =>
61 61 readRest map { rest => Values(first :: rest) }
@@ -67,7 +67,7 @@ object ResponseDecoder {
67 67 }
68 68
69 69 val readStats = {
70   - val readRest = repsep(accept("STAT ") append readStat, not(guard("END\r\n")))
  70 + val readRest = repsep(accept("STAT ") append readStat, not("END\r\n"))
71 71
72 72 readStat flatMap { first =>
73 73 readRest map { rest => Stats(first :: rest) }
@@ -96,7 +96,9 @@ object ResponseDecoder {
96 96 "VERSION " -> readVersion
97 97 )
98 98
99   - val readNumber = readLine map { bytes => Number(decodeDecimalInt(bytes)) }
  99 + val readNumber = readLine into { bytes =>
  100 + lift(decodeDecimalInt(bytes)) map { Number(_) }
  101 + }
100 102
101 103 val parser: Parser[Response] = readResponse or readNumber
102 104 }
10 src/main/scala/com/twitter/finagle/parser/incremental/BacktrackingParser.scala
@@ -17,23 +17,23 @@ class BacktrackingParser[+Out](inner: Parser[Out], offset: Int) extends Parser[O
17 17 // live with the warning.
18 18 inner.decode(buffer) match {
19 19 case r: Return[Out] => r
20   - case c: Continue[Out] => {
21   - if (c.next == inner && buffer.readerIndex == (start + offset)) {
  20 + case Continue(next) => {
  21 + if (next == inner && buffer.readerIndex == (start + offset)) {
22 22 buffer.readerIndex(start)
23 23 Continue(this)
24 24 } else {
25 25 val newOffset = buffer.readerIndex - start
26 26 buffer.readerIndex(start)
27   - Continue(new BacktrackingParser(c.next, newOffset))
  27 + Continue(new BacktrackingParser(next, newOffset))
28 28 }
29 29 }
30 30 case e: Fail => {
31 31 buffer.readerIndex(start)
32 32 e
33 33 }
34   - case e: Error => {
  34 + case Error(ex) => {
35 35 buffer.readerIndex(start)
36   - Fail(e.ex)
  36 + Fail(ex)
37 37 }
38 38 }
39 39 }
46 src/main/scala/com/twitter/finagle/parser/incremental/Parser.scala
@@ -7,10 +7,10 @@ import com.twitter.finagle.ParseException
7 7
8 8 // states: continue (wait), return, error
9 9
10   -sealed abstract class ParseResult[+Output]
  10 +sealed trait ParseResult[+Out]
11 11
12   -case class Continue[+T](next: Parser[T]) extends ParseResult[T]
13   -case class Return[+T](ret: T) extends ParseResult[T]
  12 +case class Continue[+Out](next: Parser[Out]) extends ParseResult[Out]
  13 +case class Return[+Out](ret: Out) extends ParseResult[Out]
14 14 case class Fail(ex: ParseException) extends ParseResult[Nothing]
15 15 case class Error(ex: ParseException) extends ParseResult[Nothing]
16 16
@@ -50,6 +50,12 @@ abstract class Parser[+Out] {
50 50 def map[T](f: Out => T): Parser[T] = this into { out => success(f(out)) }
51 51 }
52 52
  53 +
  54 +class LiftParser[+Out](r: ParseResult[Out]) extends Parser[Out] {
  55 + def decode(buffer: ChannelBuffer) = r
  56 +}
  57 +
  58 +
53 59 sealed abstract class CompoundParser[+Out] extends Parser[Out] {
54 60 override def hasNext = true
55 61
@@ -64,22 +70,16 @@ sealed abstract class CompoundParser[+Out] extends Parser[Out] {
64 70 step(this)
65 71 }
66 72
67   - protected[this] def end(r: ParseResult[Out]) = new ConstParser(r)
  73 + protected[this] def end(r: ParseResult[Out]) = new LiftParser(r)
68 74 }
69 75
70   -
71   -class ConstParser[+Out](r: ParseResult[Out]) extends Parser[Out] {
72   - def decode(buffer: ChannelBuffer) = r
73   -}
74   -
75   -
76 76 class AppendParser[+Out](parser: Parser[_], tail: Parser[Out]) extends CompoundParser[Out] {
77 77 override def decodeStep(buffer: ChannelBuffer) = parser.decode(buffer) match {
78   - case r: Return[_] => tail
79   - case c: Continue[_] => if (c.next == parser) {
  78 + case Return(_) => tail
  79 + case Continue(next) => if (next eq parser) {
80 80 end(Continue(this))
81 81 } else {
82   - end(Continue(new AppendParser(c.next, tail)))
  82 + end(Continue(new AppendParser(next, tail)))
83 83 }
84 84 case e: Fail => end(e)
85 85 case e: Error => end(e)
@@ -93,11 +93,11 @@ class AppendParser[+Out](parser: Parser[_], tail: Parser[Out]) extends CompoundP
93 93 class IntoParser[T, +Out](parser: Parser[T], f: T => Parser[Out])
94 94 extends CompoundParser[Out] {
95 95 override def decodeStep(buffer: ChannelBuffer) = parser.decode(buffer) match {
96   - case r: Return[T] => f(r.ret)
97   - case c: Continue[T] => if (c.next == parser) {
  96 + case Return(r) => f(r)
  97 + case Continue(next) => if (next eq parser) {
98 98 end(Continue(this))
99 99 } else {
100   - end(Continue(new IntoParser(c.next, f)))
  100 + end(Continue(new IntoParser(next, f)))
101 101 }
102 102 case e: Fail => end(e)
103 103 case e: Error => end(e)
@@ -113,17 +113,17 @@ extends CompoundParser[Out] {
113 113 val start = buffer.readerIndex
114 114
115 115 choice.decode(buffer) match {
116   - case r: Return[Out] => end(r)
  116 + case r: Return[Out] => end(r)
117 117 case e: Fail => if (committed || buffer.readerIndex != start) {
118 118 end(Error(e.ex))
119 119 } else {
120 120 tail
121 121 }
122   - case c: Continue[Out] => {
123   - if (c.next == choice && buffer.readerIndex == start) {
  122 + case Continue(next) => {
  123 + if ((next eq choice) && buffer.readerIndex == start) {
124 124 end(Continue(this))
125 125 } else {
126   - end(Continue(new OrParser(c.next, tail, committed || buffer.readerIndex != start)))
  126 + end(Continue(new OrParser(next, tail, committed || buffer.readerIndex != start)))
127 127 }
128 128 }
129 129 case e: Error => end(e)
@@ -155,13 +155,13 @@ class NotParser(parser: Parser[_]) extends Parser[Unit] {
155 155 Return(())
156 156 }
157 157 }
158   - case c: Continue[_] => {
  158 + case Continue(next) => {
159 159 if (buffer.readerIndex != start) {
160 160 error()
161   - } else if (c.next == parser) {
  161 + } else if (next == parser) {
162 162 Continue(this)
163 163 } else {
164   - Continue(new NotParser(c.next))
  164 + Continue(new NotParser(next))
165 165 }
166 166 }
167 167 case e: Error => e
117 src/main/scala/com/twitter/finagle/parser/incremental/Parsers.scala
@@ -6,69 +6,29 @@ import com.twitter.finagle.ParseException
6 6
7 7
8 8 object Parsers {
9   - def readTo(choices: String*) = {
10   - new ConsumingDelimiterParser(AlternateMatcher(choices))
11   - }
12   -
13   - def readUntil(choices: String*) = {
14   - new DelimiterParser(AlternateMatcher(choices))
15   - }
16   -
17   - val readLine = readTo("\r\n", "\n")
18   -
19   - def fail(ex: ParseException) = new ConstParser(Fail(ex))
20 9
21   - def error(ex: ParseException) = new ConstParser(Error(ex))
  10 + // lifting values
22 11
23   - def success[T](t: T) = new ConstParser(Return(t))
  12 + def fail(ex: ParseException) = new LiftParser(Fail(ex))
24 13
25   - def lift[T](f: => T): Parser[T] = {
26   - try {
27   - success(f)
28   - } catch {
29   - case e: ParseException => fail(e)
30   - }
31   - }
  14 + def error(ex: ParseException) = new LiftParser(Error(ex))
32 15
33   - def attempt[T](p: Parser[T]) = new BacktrackingParser(p)
  16 + def success[T](t: T) = new LiftParser(Return(t))
34 17
35 18 val unit = success(())
36 19
37   - def readBytes(size: Int) = new FixedBytesParser(size)
38   -
39   - def skipBytes(size: Int) = readBytes(size) append unit
40   -
41   - def accept(m: Matcher) = new ConsumingMatchParser(m)
42   -
43   - implicit def accept(choice: String): Parser[ChannelBuffer] = {
44   - accept(new DelimiterMatcher(choice))
45   - }
46   -
47   - def accept(choices: String*): Parser[ChannelBuffer] = {
48   - accept(AlternateMatcher(choices))
  20 + def lift[T](o: Option[T]): Parser[T] = o match {
  21 + case Some(r) => success(r)
  22 + case None => fail(new ParseException("Parse failed."))
49 23 }
50 24
51   - def guard(m: Matcher) = new MatchParser(m)
52   -
53   - def guard(choice: String): Parser[ChannelBuffer] = {
54   - guard(new DelimiterMatcher(choice))
55   - }
56 25
57   - def guard(choices: String*): Parser[ChannelBuffer] = {
58   - guard(AlternateMatcher(choices))
59   - }
  26 + // backtrack
60 27
61   - def not(m: Parser[Any]) = new NotParser(m)
  28 + def attempt[T](p: Parser[T]) = new BacktrackingParser(p)
62 29
63   - def choice[T](choices: (String, Parser[T])*) = {
64   - val (m, p) = choices.last
65   - val last: Parser[T] = accept(m) append p
66 30
67   - (choices.tail.reverse foldRight last) { (choice, rest) =>
68   - val (m, p) = choice
69   - (accept(m) append p) or rest
70   - }
71   - }
  31 + // repetition
72 32
73 33 def rep[T](p: Parser[T]): Parser[List[T]] = {
74 34 def go(): Parser[List[T]] = {
@@ -119,6 +79,63 @@ object Parsers {
119 79 }
120 80
121 81
  82 + // matching parsers
  83 +
  84 + def accept(m: Matcher) = new ConsumingMatchParser(m)
  85 +
  86 + implicit def accept(choice: String): Parser[ChannelBuffer] = {
  87 + accept(new DelimiterMatcher(choice))
  88 + }
  89 +
  90 + def accept(choices: String*): Parser[ChannelBuffer] = {
  91 + accept(AlternateMatcher(choices))
  92 + }
  93 +
  94 + def guard(m: Matcher) = new MatchParser(m)
  95 +
  96 + def guard(choice: String): Parser[ChannelBuffer] = {
  97 + guard(new DelimiterMatcher(choice))
  98 + }
  99 +
  100 + def guard(choices: String*): Parser[ChannelBuffer] = {
  101 + guard(AlternateMatcher(choices))
  102 + }
  103 +
  104 + def not(m: Matcher) = new MatchParser(new NotMatcher(m))
  105 +
  106 + def not(choice: String): Parser[ChannelBuffer] = {
  107 + not(new DelimiterMatcher(choice))
  108 + }
  109 +
  110 + def not(choices: String*): Parser[ChannelBuffer] = {
  111 + not(AlternateMatcher(choices))
  112 + }
  113 +
  114 +
  115 +
  116 + def choice[T](choices: (String, Parser[T])*): Parser[T] = {
  117 + val (m, p) = choices.first
  118 + val first: Parser[T] = accept(m) append p
  119 + val rest = choices.tail
  120 +
  121 + if (rest.isEmpty) first else first or choice(rest: _*)
  122 + }
  123 +
  124 + def readTo(choices: String*) = {
  125 + new ConsumingDelimiterParser(AlternateMatcher(choices))
  126 + }
  127 +
  128 + def readUntil(choices: String*) = {
  129 + new DelimiterParser(AlternateMatcher(choices))
  130 + }
  131 +
  132 + val readLine = readTo("\r\n", "\n")
  133 +
  134 +
  135 + // basic reading parsers
  136 +
  137 + def readBytes(size: Int) = new FixedBytesParser(size)
  138 +
122 139 private[parser] abstract class PrimitiveParser[Out] extends Parser[Out] {
123 140 protected val continue = Continue(this)
124 141 }
10 src/main/scala/com/twitter/finagle/parser/util/DecimalIntCodec.scala
@@ -44,11 +44,11 @@ object DecimalIntCodec {
44 44 }
45 45 }
46 46
47   - def decode(buf: ChannelBuffer): Int = {
  47 + def decode(buf: ChannelBuffer): Option[Int] = {
48 48 decode(buf, buf.readableBytes)
49 49 }
50 50
51   - def decode(buf: ChannelBuffer, numBytes: Int): Int = {
  51 + def decode(buf: ChannelBuffer, numBytes: Int): Option[Int] = {
52 52 val last = numBytes - 1
53 53 var i = last
54 54 var rv = 0
@@ -65,19 +65,19 @@ object DecimalIntCodec {
65 65 while (i >= lower) {
66 66 val c = buf.getByte(buf.readerIndex + i) - AsciiZero
67 67
68   - if (c < 0 || c > 9) throw new ParseException("byte out of bounds")
  68 + if (c < 0 || c > 9) return None
69 69 rv = rv + c * pow(10, last - i)
70 70 i = i - 1
71 71 }
72 72
73   - if (isNegative) rv * -1 else rv
  73 + if (isNegative) Some(rv * -1) else Some(rv)
74 74 }
75 75
76 76 // helpers
77 77
78 78 private def pow(x: Int, p: Int) = {
79 79 var rv = 1
80   - var j = 0
  80 + var j = 0
81 81
82 82 while (j < p) {
83 83 rv = rv * x
12 src/main/scala/com/twitter/finagle/parser/util/Matcher.scala
@@ -42,6 +42,8 @@ class DelimiterMatcher(delimiter: Array[Byte]) extends Matcher {
42 42
43 43 delimiter.length
44 44 }
  45 +
  46 + override def toString = "DelimiterMatcher("+ (new String(delimiter, "US-ASCII")) +")"
45 47 }
46 48
47 49 object AlternateMatcher {
@@ -73,3 +75,13 @@ class AlternateMatcher(delimiters: Array[Array[Byte]]) extends Matcher {
73 75 -1
74 76 }
75 77 }
  78 +
  79 +class NotMatcher(inner: Matcher) extends Matcher {
  80 + val bytesNeeded = inner.bytesNeeded
  81 +
  82 + def bytesMatching(buffer: ChannelBuffer, offset: Int): Int = {
  83 + if (buffer.writerIndex < offset + bytesNeeded) return -1
  84 +
  85 + if (inner.bytesMatching(buffer, offset) == -1) 0 else -1
  86 + }
  87 +}
6 src/main/scala/com/twitter/finagle/redis/protocol/Reply.scala
@@ -19,7 +19,7 @@ object ReplyDecoder {
19 19 import Reply._
20 20 import Parsers._
21 21
22   - private val readDecimalInt = readLine into { bytes => lift(decodeDecimalInt(bytes)) }
  22 + private val readDecimalInt = readLine into { b => lift(decodeDecimalInt(b)) }
23 23
24 24 private val readStatusReply = readLine map { Status(_) }
25 25
@@ -32,9 +32,7 @@ object ReplyDecoder {
32 32 success(Bulk(None))
33 33 } else {
34 34 readBytes(size) flatMap { bytes =>
35   - skipBytes(2) map { _ =>
36   - Bulk(Some(bytes))
37   - }
  35 + readBytes(2) append success(Bulk(Some(bytes)))
38 36 }
39 37 }
40 38 }
60 src/test/scala/com/twitter/finagle/parser/IncrementalParserSpec.scala
@@ -97,11 +97,6 @@ object ParserSpec extends ParserSpecification {
97 97 readBytes(7) map asString mustParse "aaaaaa" andContinue()
98 98 }
99 99
100   - "skipBytes" in {
101   - skipBytes(2) mustParse "abc" andReturn () readingBytes(2)
102   - skipBytes(4) mustParse "abc" andContinue()
103   - }
104   -
105 100 "accept" in {
106 101 val parser = Parsers.accept("$") append (readBytes(1) map asString)
107 102
@@ -232,37 +227,38 @@ object ParserSpec extends ParserSpecification {
232 227 }
233 228 }
234 229
235   - def time[T](f: => T) = {
236   - val s = System.currentTimeMillis
237   - f
238   - val e = System.currentTimeMillis
239   - e - s
240   - }
241 230
242   - "performance" in {
243   - val readInt = readLine map { bytes => decodeDecimalInt(bytes) }
244   - val readBulk = Parsers.accept("$") append (readInt into { length =>
245   - readBytes(length) into { bytes =>
246   - readBytes(2) append success(bytes)
247   - }
248   - })
  231 + // "performance" in {
  232 + // def time[T](f: => T) = {
  233 + // val s = System.currentTimeMillis
  234 + // f
  235 + // val e = System.currentTimeMillis
  236 + // e - s
  237 + // }
249 238
250   - val test1 = Parsers.accept("*") append (readInt into { count =>
251   - repN(count, readBulk)
252   - })
  239 + // val readInt = readLine map { bytes => decodeDecimalInt(bytes) }
  240 + // val readBulk = Parsers.accept("$") append (readInt into { length =>
  241 + // readBytes(length) into { bytes =>
  242 + // readBytes(2) append success(bytes)
  243 + // }
  244 + // })
253 245
254   - val count = 100
255   - val buf1 = ChannelBuffers.wrappedBuffer(("*"+count+"\r\n" + ("$6\r\nfoobar\r\n" * count)).getBytes)
  246 + // val test1 = Parsers.accept("*") append (readInt into { count =>
  247 + // repN(count, readBulk)
  248 + // })
256 249
257   - println(test1.decode(buf1))
  250 + // val count = 100
  251 + // val buf1 = ChannelBuffers.wrappedBuffer(("*"+count+"\r\n" + ("$6\r\nfoobar\r\n" * count)).getBytes)
258 252
259   - for (x <- 1 to 100) {
260   - val rv = time { for (i <- 1 to 100000) {
261   - buf1.resetReaderIndex
262   - test1.decode(buf1)
263   - } }
  253 + // println(test1.decode(buf1))
264 254
265   - println("test 1: "+ rv +" ("+ (rv / 100000.0) +")")
266   - }
267   - }
  255 + // for (x <- 1 to 100) {
  256 + // val rv = time { for (i <- 1 to 100000) {
  257 + // buf1.resetReaderIndex
  258 + // test1.decode(buf1)
  259 + // } }
  260 +
  261 + // println("test 1: "+ rv +" ("+ (rv / 100000.0) +")")
  262 + // }
  263 + // }
268 264 }

0 comments on commit bc5a2ee

Please sign in to comment.
Something went wrong with that request. Please try again.