Skip to content
This repository has been archived by the owner on Apr 24, 2024. It is now read-only.

Commit

Permalink
! can: introduce dedicated exceptions for connection failure and requ…
Browse files Browse the repository at this point in the history
…est timeout for host-level API
  • Loading branch information
Ian Forsey committed Aug 21, 2013
1 parent bfc23ce commit 4b48875
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
Expand Up @@ -203,7 +203,7 @@ class SprayCanClientSpec extends Specification {
val probe = TestProbe()
probe.send(IO(Http), Get("/abc") ~> Host(hostname, port))
acceptConnection()
probe.expectMsgType[Status.Failure].cause.getMessage must startWith("Request timeout")
probe.expectMsgType[Status.Failure].cause must beAnInstanceOf[Http.RequestTimeoutException]
}
}

Expand Down
10 changes: 9 additions & 1 deletion spray-can/src/main/scala/spray/can/Http.scala
Expand Up @@ -24,7 +24,7 @@ import akka.actor._
import spray.can.server.ServerSettings
import spray.can.client.{ HostConnectorSettings, ClientConnectionSettings }
import spray.io.{ ConnectionTimeouts, ClientSSLEngineProvider, ServerSSLEngineProvider }
import spray.http.{ HttpResponse, HttpRequest, HttpMessagePart, HttpMessagePartWrapper }
import spray.http._
import spray.util.actorSystem

object Http extends ExtensionKey[HttpExt] {
Expand Down Expand Up @@ -122,6 +122,14 @@ object Http extends ExtensionKey[HttpExt] {
case class MessageEvent(ev: HttpMessagePart) extends Event

case class HostConnectorInfo(hostConnector: ActorRef, setup: HostConnectorSetup) extends Event

// exceptions
class ConnectionException(message: String) extends RuntimeException(message)

class ConnectionAttemptFailedException(val host: String, val port: Int) extends ConnectionException(s"Connection attempt to $host:$port failed")

class RequestTimeoutException(val request: HttpRequestPart with HttpMessageStart, message: String)
extends ConnectionException(message)
}

class HttpExt(system: ExtendedActorSystem) extends akka.io.IO.Extension {
Expand Down
Expand Up @@ -82,7 +82,8 @@ private[client] class HttpHostConnectionSlot(host: String, port: Int,

case _: Http.CommandFailed
log.debug("Connection attempt failed")
openRequests foreach clear("Connection attempt failed", retry = false)
val error = new Http.ConnectionAttemptFailedException(host, port)
openRequests foreach clear(error, retry = false)
if (aborted.isEmpty) {
context.parent ! Disconnected(openRequests.size)
context.become(unconnected)
Expand Down Expand Up @@ -126,7 +127,7 @@ private[client] class HttpHostConnectionSlot(host: String, port: Int,

case ev @ Timedout(part)
log.debug("{} timed out, closing connection", format(part))
context.become(closing(httpConnection, openRequests, "Request timeout", retry = true))
context.become(closing(httpConnection, openRequests, new Http.RequestTimeoutException(part, format(part) + " timed out"), retry = true))

case cmd: Http.CloseCommand
httpConnection ! cmd
Expand All @@ -149,12 +150,15 @@ private[client] class HttpHostConnectionSlot(host: String, port: Int,
context.become(unconnected)
}

def closing(httpConnection: ActorRef, openRequests: Queue[RequestContext], errorMsg: String,
def closing(httpConnection: ActorRef, openRequests: Queue[RequestContext], error: String, retry: Boolean): Receive =
closing(httpConnection, openRequests, new Http.ConnectionException(error), retry)

def closing(httpConnection: ActorRef, openRequests: Queue[RequestContext], error: Http.ConnectionException,
retry: Boolean): Receive = {

case ev @ (_: Http.ConnectionClosed | Terminated(`httpConnection`))
context.parent ! Disconnected(openRequests.size)
openRequests foreach clear(errorMsg, retry)
openRequests foreach clear(error, retry)
context.unwatch(httpConnection)
context.become(unconnected)
}
Expand All @@ -164,14 +168,16 @@ private[client] class HttpHostConnectionSlot(host: String, port: Int,
case Terminated(`httpConnection`) context.stop(self)
}

def clear(errorMsg: String, retry: Boolean): RequestContext Unit = {
def clear(error: String, retry: Boolean): RequestContext Unit = clear(new Http.ConnectionException(error), retry)

def clear(error: Http.ConnectionException, retry: Boolean): RequestContext Unit = {
case ctx @ RequestContext(request, retriesLeft, _) if retry && request.canBeRetried && retriesLeft > 0
log.warning("{} in response to {} with {} retries left, retrying...", errorMsg, format(request), retriesLeft)
log.warning("{} in response to {} with {} retries left, retrying...", error.getMessage, format(request), retriesLeft)
context.parent ! ctx.copy(retriesLeft = retriesLeft - 1)

case RequestContext(request, _, commander)
log.warning("{} in response to {} with no retries left, dispatching error...", errorMsg, format(request))
commander ! Status.Failure(new RuntimeException(errorMsg))
log.warning("{} in response to {} with no retries left, dispatching error...", error.getMessage, format(request))
commander ! Status.Failure(error)
}

def dispatchToServer(httpConnection: ActorRef)(ctx: RequestContext): Unit = {
Expand Down

0 comments on commit 4b48875

Please sign in to comment.