Skip to content

Commit

Permalink
Add tests for Client.Batcher.
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-stripe committed Mar 28, 2018
1 parent 98f917c commit bfc7d7d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 6 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ resolvers ++= Seq("snapshots", "releases").map(Resolver.sonatypeRepo)
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.3" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.3" % "test"
libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime" % "0.7.0-rc6"
libraryDependencies += "org.scalacheck" %% "scalacheck" % "1.13.4"

releasePublishArtifactsAction := PgpKeys.publishSigned.value

Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/github/gphat/censorinus/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Client(

private[this] val batcher: Client.Batcher = maxBatchSize match {
case Some(bufSize) if maxQueueSize.isDefined =>
new Client.Batched(bufSize)
Client.Batched(bufSize)
case Some(bufSize) =>
log.warning(s"maxBatchSize=$bufSize ignored, because maxQueueSize is None")
Client.Unbatched
Expand Down Expand Up @@ -201,7 +201,7 @@ object Client {
}
}

final class Batched(bufferSize: Int) extends Batcher {
final case class Batched(bufferSize: Int) extends Batcher {
private[this] val buffer: ByteBuffer = ByteBuffer.allocate(bufferSize)
private[this] val encoder: CharsetEncoder = StandardCharsets.UTF_8.newEncoder()

Expand All @@ -221,15 +221,15 @@ object Client {
buffer.put('\n'.toByte)
encoder.encode(CharBuffer.wrap(lines.head), buffer, true) match {
case CoderResult.OVERFLOW =>
// On overflow, reset the buffer back to the last metric and
// flush it.
// On overflow, reset the buffer back to the last metric.
buffer.reset()
buffer.rewind()
cont = false
case CoderResult.UNDERFLOW =>
lines.next
}
}

buffer.flip()
f(buffer)
}
}
Expand Down
73 changes: 72 additions & 1 deletion src/test/scala/github/gphat/censorinus/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package github.gphat.censorinus
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }
import org.scalacheck.{Arbitrary, Gen}
import org.scalacheck.Arbitrary.arbitrary
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest._
import org.scalatest.concurrent.Eventually
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -39,7 +42,7 @@ class TestSender(val maxMessages: Int = Int.MaxValue) extends MetricSender {
def shutdown: Unit = ()
}

class ClientSpec extends FlatSpec with Matchers with Eventually {
class ClientSpec extends FlatSpec with Matchers with Eventually with GeneratorDrivenPropertyChecks {

"ClientSpec" should "deal with gauges" in {
val sender = new TestSender()
Expand Down Expand Up @@ -83,4 +86,72 @@ class ClientSpec extends FlatSpec with Matchers with Eventually {

client.queue.size should be (1)
}

/** Runs `batcher` over `lines`, decoding every flushed ByteBuffer back into a
  * String so tests can compare against the expected batched output.
  */
private def batchAndDecode(batcher: Client.Batcher, lines: Iterator[String]): Vector[String] = {
  val decoded = Vector.newBuilder[String]
  batcher.batch(lines) { buf =>
    decoded += StandardCharsets.UTF_8.newDecoder().decode(buf).toString
  }
  decoded.result()
}

val maxMetricLength: Int = 30

/** Test wrapper around a batch of generated metric lines; `iterator` feeds
  * `Client.Batcher.batch`, which consumes an `Iterator[String]`.
  *
  * Marked `final`: case classes should not be extended (equals/copy break
  * under inheritance).
  */
final case class MetricLines(lines: Vector[String]) {
  def iterator: Iterator[String] = lines.iterator
}

// Generates strings whose UTF-8 encoding is strictly shorter than
// maxMetricLength. We shrink each candidate by successively larger divisors
// of its length until the encoded size fits; once k > s.length the candidate
// is "" (0 bytes < maxMetricLength), so the search always terminates.
// NOTE(review): truncating at a char index may split a surrogate pair;
// `getBytes` then substitutes a replacement byte so the size check still
// holds, but the generated string can contain a lone surrogate — confirm the
// batcher's CharsetEncoder tolerates malformed input.
val shortLines: Gen[String] = arbitrary[String]
  .map { s =>
    Iterator.from(1)
      .map { k =>
        val len = s.length / k
        s.substring(0, math.min(s.length, len))
      }
      .find(_.getBytes("utf-8").size < maxMetricLength)
      .getOrElse("") // unreachable fallback; avoids partial Option.get
  }

// Arbitrary instance so `forAll { (lines: MetricLines) => ... }` resolves.
implicit val arbMetricLines: Arbitrary[MetricLines] =
  Arbitrary(Gen.listOf(shortLines).map(ls => MetricLines(ls.toVector)))

/** Shared behavior for any Batcher implementation: decoded batches, re-joined
  * with newlines, must round-trip the original input lines.
  *
  * `newBatcher` is by-name so every property evaluation gets a fresh batcher
  * instance (Client.Batched carries mutable buffer state).
  *
  * Note: the deprecated procedure syntax (`def f(...) { }`) is replaced with
  * an explicit `: Unit =` result type.
  */
def batcher(newBatcher: => Client.Batcher): Unit = {
  it should "batch metrics with new lines" in {
    forAll { (lines: MetricLines) =>
      assert(batchAndDecode(newBatcher, lines.iterator).mkString("\n") == lines.iterator.mkString("\n"))
    }
  }

  it should "batch a single metric" in {
    forAll(shortLines) { (line: String) =>
      assert(batchAndDecode(newBatcher, Iterator.single(line)) == Vector(line))
    }
  }
}

"Client.Batched" should "batch multiple metrics into byte buffers" in {
val batcher = Client.Batched(10)
assert(batchAndDecode(batcher, Iterator("abc", "def", "ghi")) == Vector("abc\ndef", "ghi"))
assert(batchAndDecode(batcher, Iterator("abc", "de", "ghi")) == Vector("abc\nde\nghi"))
assert(batchAndDecode(batcher, Iterator("abc", "def", "ghi", "jk", "lm", "no")) == Vector("abc\ndef", "ghi\njk\nlm", "no"))
}

it should "truncate the first metric if it is too long" in {
val batcher = Client.Batched(11)
assert(batchAndDecode(batcher, Iterator("123456789011")) == Vector("12345678901"))
assert(batchAndDecode(batcher, Iterator("123456789011121314")) == Vector("12345678901"))
}

it should behave like batcher(Client.Batched(maxMetricLength))

"Client.Unbatched" should "not batch metrics" in {
forAll { (lines: Vector[String]) =>
val batcher = Client.Unbatched
assert(batchAndDecode(batcher, lines.iterator) == lines)
}
}

it should behave like batcher(Client.Batched(maxMetricLength))
}

0 comments on commit bfc7d7d

Please sign in to comment.