diff --git a/README.md b/README.md index 1040bf2..e666cad 100644 --- a/README.md +++ b/README.md @@ -121,9 +121,9 @@ Care has been taken to be as reliable as possible, because sending metrics shoul - All API-visible data structures are immutable and concurrency-friendly - Network writes are serialized through Akka actors - Exceptions are ignored silently (only logged to the akka event bus) +- Failed connections are retried at most twice per second Remaining items include : -- Add explicit unit tests for TCP reconnections (which already work thanks to Akka automatically respawning failed actors). - Hybrid tcp/udp connection mode - Retrying failed Writes after reconnecting (with a counter) - Shutdown/closing diff --git a/src/main/scala/net/benmur/riemann/client/ReliableIO.scala b/src/main/scala/net/benmur/riemann/client/ReliableIO.scala index 1ae9cbf..6c0f541 100644 --- a/src/main/scala/net/benmur/riemann/client/ReliableIO.scala +++ b/src/main/scala/net/benmur/riemann/client/ReliableIO.scala @@ -3,24 +3,22 @@ package net.benmur.riemann.client import java.io.{ DataInputStream, DataOutputStream } import java.net.{ Socket, SocketAddress, SocketException } import java.util.concurrent.atomic.AtomicLong - import scala.annotation.implicitNotFound - import com.aphyr.riemann.Proto - import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props } -import akka.actor.SupervisorStrategy.Restart +import akka.actor.SupervisorStrategy._ import akka.actor.actorRef2Scala import akka.dispatch.{ Future, Promise } import akka.pattern.ask import akka.util.Timeout import akka.util.duration.intToDurationInt +import akka.actor.ActorInitializationException trait ReliableIO { private val nClients = new AtomicLong(0L) // FIXME this should be more global private[this] class ReliableConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory, dispatcherId: Option[String])(implicit system: ActorSystem) extends Actor { - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 36000, withinTimeRange = 1 hour) { // This needs to be more reasonable + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second) { // This needs to be more reasonable case _ => Restart } @@ -71,10 +69,10 @@ trait ReliableIO { } class TcpConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory) extends Actor with ActorLogging { - val connection = factory(where) - val outputStream = new DataOutputStream(connection.outputStream) - val inputStream = new DataInputStream(connection.inputStream) - println("actor init") + lazy val connection = factory(where) + lazy val outputStream = new DataOutputStream(connection.outputStream) + lazy val inputStream = new DataInputStream(connection.inputStream) + def receive = { case WriteBinary(ab) => try { @@ -88,7 +86,8 @@ trait ReliableIO { case e: SocketException => throw e case exception => log.error(exception, "could not send or receive data") - sender ! Proto.Msg.newBuilder.setError(exception.getMessage).setOk(false).build + val message = Option(exception.getMessage) getOrElse "(no message)" + sender ! Proto.Msg.newBuilder.setError(message).setOk(false).build } } } diff --git a/src/test/scala/net/benmur/riemann/client/ReliableIOTest.scala b/src/test/scala/net/benmur/riemann/client/ReliableIOTest.scala index 5dab22b..3d77f44 100644 --- a/src/test/scala/net/benmur/riemann/client/ReliableIOTest.scala +++ b/src/test/scala/net/benmur/riemann/client/ReliableIOTest.scala @@ -2,18 +2,17 @@ package net.benmur.riemann.client import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream } import java.net.SocketAddress - import org.scalamock.ProxyMockFactory import org.scalamock.scalatest.MockFactory import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers - import com.aphyr.riemann.Proto - import akka.actor.ActorSystem import akka.dispatch.Await import akka.testkit.CallingThreadDispatcher import akka.util.duration.intToDurationInt +import java.io.InputStream +import java.net.SocketException class ReliableIOTest extends FunSuite with testingsupport.ImplicitActorSystem @@ -104,4 +103,66 @@ class ReliableIOTest extends FunSuite val resp = Await.result(respFuture, 1 second) resp should be === Seq(event, event2) } + + test("reconnect in case of SocketException while reading") { + val inputStream = new InputStream { + override def read = throw new SocketException + } + + val os = new ByteArrayOutputStream + + val wrapper = mock[ConnectedSocketWrapper] + wrapper expects 'inputStream returning inputStream twice; + wrapper expects 'outputStream returning os twice + + val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper] + socketFactory expects address returning wrapper twice + + val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id)) + + // TODO need to test that automatic resending works (two messages should be sent instead of one) + implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true"))) + + implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true"))) + + val queryData = protoMsgQuery.toByteArray + val out = os.toByteArray + val is = new ByteArrayInputStream(out) + val dis = new DataInputStream(is) + + dis.readInt should be === queryData.length + val msg1 = Array.ofDim[Byte](queryData.length) + dis.readFully(msg1) + msg1 should be === queryData + + // 2 messages were written because it crashed during the 1st response read, after writing + dis.readInt should be === queryData.length + val msg2 = Array.ofDim[Byte](queryData.length) + dis.readFully(msg2) + msg2 should be === queryData + } + + test("reconnect in case of SocketException while connecting") { + val wrapper = mock[ConnectedSocketWrapper] + val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper] + + val os = new ByteArrayOutputStream + + socketFactory expects address throwing new SocketException once; + socketFactory expects address returning wrapper once; + wrapper expects 'inputStream returning new ByteArrayInputStream(Array.ofDim[Byte](0)) once; + wrapper expects 'outputStream returning os once + + val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id)) + + // TODO need to test that automatic resending works (two messages should be sent instead of one) + implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true"))) + + implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true"))) + + val out = os.toByteArray + val queryData = protoMsgQuery.toByteArray + new DataInputStream(new ByteArrayInputStream(out)).readInt should be === queryData.length + out.slice(4, out.length) should be === queryData + } } \ No newline at end of file