Skip to content

Commit

Permalink
tcp reconnection tests
Browse files Browse the repository at this point in the history
  • Loading branch information
benmur committed Jan 4, 2013
1 parent f31dcda commit 98c43cd
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -121,9 +121,9 @@ Care has been taken to be as reliable as possible, because sending metrics shoul
- All API-visible data structures are immutable and concurrency-friendly
- Network writes are serialized through Akka actors
- Exceptions are ignored silently (only logged to the akka event bus)
- Failed connections are retried at most twice per second

Remaining items include :
- Add explicit unit tests for TCP reconnections (which already work thanks to Akka automatically respawning failed actors).
- Hybrid tcp/udp connection mode
- Retrying failed Writes after reconnecting (with a counter)
- Shutdown/closing
Expand Down
19 changes: 9 additions & 10 deletions src/main/scala/net/benmur/riemann/client/ReliableIO.scala
Expand Up @@ -3,24 +3,22 @@ package net.benmur.riemann.client
import java.io.{ DataInputStream, DataOutputStream }
import java.net.{ Socket, SocketAddress, SocketException }
import java.util.concurrent.atomic.AtomicLong

import scala.annotation.implicitNotFound

import com.aphyr.riemann.Proto

import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props }
import akka.actor.SupervisorStrategy.Restart
import akka.actor.SupervisorStrategy._
import akka.actor.actorRef2Scala
import akka.dispatch.{ Future, Promise }
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration.intToDurationInt
import akka.actor.ActorInitializationException

trait ReliableIO {
private val nClients = new AtomicLong(0L) // FIXME this should be more global

private[this] class ReliableConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory, dispatcherId: Option[String])(implicit system: ActorSystem) extends Actor {
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 36000, withinTimeRange = 1 hour) { // This needs to be more reasonable
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second) { // This needs to be more reasonable
case _ => Restart
}

Expand Down Expand Up @@ -71,10 +69,10 @@ trait ReliableIO {
}

class TcpConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory) extends Actor with ActorLogging {
val connection = factory(where)
val outputStream = new DataOutputStream(connection.outputStream)
val inputStream = new DataInputStream(connection.inputStream)
println("actor init")
lazy val connection = factory(where)
lazy val outputStream = new DataOutputStream(connection.outputStream)
lazy val inputStream = new DataInputStream(connection.inputStream)

def receive = {
case WriteBinary(ab) =>
try {
Expand All @@ -88,7 +86,8 @@ trait ReliableIO {
case e: SocketException => throw e
case exception =>
log.error(exception, "could not send or receive data")
sender ! Proto.Msg.newBuilder.setError(exception.getMessage).setOk(false).build
val message = Option(exception.getMessage) getOrElse "(no message)"
sender ! Proto.Msg.newBuilder.setError(message).setOk(false).build
}
}
}
Expand Down
67 changes: 64 additions & 3 deletions src/test/scala/net/benmur/riemann/client/ReliableIOTest.scala
Expand Up @@ -2,18 +2,17 @@ package net.benmur.riemann.client

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream }
import java.net.SocketAddress

import org.scalamock.ProxyMockFactory
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

import com.aphyr.riemann.Proto

import akka.actor.ActorSystem
import akka.dispatch.Await
import akka.testkit.CallingThreadDispatcher
import akka.util.duration.intToDurationInt
import java.io.InputStream
import java.net.SocketException

class ReliableIOTest extends FunSuite
with testingsupport.ImplicitActorSystem
Expand Down Expand Up @@ -104,4 +103,66 @@ class ReliableIOTest extends FunSuite
val resp = Await.result(respFuture, 1 second)
resp should be === Seq(event, event2)
}

test("reconnect in case of SocketException while reading") {
val inputStream = new InputStream {
override def read = throw new SocketException
}

val os = new ByteArrayOutputStream

val wrapper = mock[ConnectedSocketWrapper]
wrapper expects 'inputStream returning inputStream twice;
wrapper expects 'outputStream returning os twice

val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]
socketFactory expects address returning wrapper twice

val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))

// TODO need to test that automatic resending works (two messages should be sent instead of one)
implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true")))

implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true")))

val queryData = protoMsgQuery.toByteArray
val out = os.toByteArray
val is = new ByteArrayInputStream(out)
val dis = new DataInputStream(is)

dis.readInt should be === queryData.length
val msg1 = Array.ofDim[Byte](queryData.length)
dis.readFully(msg1)
msg1 should be === queryData

// 2 messages were written because it crashed during the 1st response read, after writing
dis.readInt should be === queryData.length
val msg2 = Array.ofDim[Byte](queryData.length)
dis.readFully(msg2)
msg2 should be === queryData
}

test("reconnect in case of SocketException while connecting") {
val wrapper = mock[ConnectedSocketWrapper]
val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]

val os = new ByteArrayOutputStream

socketFactory expects address throwing new SocketException once;
socketFactory expects address returning wrapper once;
wrapper expects 'inputStream returning new ByteArrayInputStream(Array.ofDim[Byte](0)) once;
wrapper expects 'outputStream returning os once

val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))

// TODO need to test that automatic resending works (two messages should be sent instead of one)
implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true")))

implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true")))

val out = os.toByteArray
val queryData = protoMsgQuery.toByteArray
new DataInputStream(new ByteArrayInputStream(out)).readInt should be === queryData.length
out.slice(4, out.length) should be === queryData
}
}

0 comments on commit 98c43cd

Please sign in to comment.