! can, io: use util.Timestamp instead of longs for timeout checking
The breaking part is that the ConnectionTimeouts pipeline stage will now
reset the timeout when it receives a `SetIdleTimeout` message.
jrudolph committed Sep 25, 2013
1 parent 87720e8 commit ab17f00
Showing 5 changed files with 74 additions and 55 deletions.
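
Before the per-file diffs, a rough, self-contained sketch of the kind of API this commit switches to may help. It is only an approximation of `spray.util.Timestamp` for illustration (the real spray-util class may use a different internal representation and overflow handling); the two helpers at the bottom mirror the shape of the checks changed below.

import scala.concurrent.duration._

// Illustrative approximation of the spray.util.Timestamp API exercised in this commit
// (Timestamp.now, Timestamp.never, timestamp + timeout, isPast, and timestamp difference).
case class Timestamp(nanos: Long) extends AnyVal {
  def +(timeout: Duration): Timestamp =
    if (nanos == Long.MaxValue || !timeout.isFinite) Timestamp.never
    else Timestamp(nanos + timeout.toNanos)
  def -(other: Timestamp): FiniteDuration = (nanos - other.nanos).nanos
  def isPast: Boolean = nanos <= System.nanoTime()
}
object Timestamp {
  def now: Timestamp = Timestamp(System.nanoTime())
  def never: Timestamp = Timestamp(Long.MaxValue) // a deadline that is never `isPast`
}

// The shape of the change: sentinel-laden Longs vs. a deadline value that can be
// asked directly whether it has expired.
object TimeoutCheckSketch {
  def overdueOld(timestampNanos: Long, timeout: FiniteDuration): Boolean =
    timestampNanos > 0 && timestampNanos + timeout.toNanos < System.nanoTime()

  def overdueNew(timestamp: Timestamp, timeout: Duration): Boolean =
    (timestamp + timeout).isPast
}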
34 changes: 24 additions & 10 deletions spray-can/src/main/scala/spray/can/client/ClientFrontend.scala
@@ -24,7 +24,7 @@ import spray.can.rendering.RequestPartRenderingContext
import spray.can.Http
import spray.http._
import spray.io._
import System.{ nanoTime ⇒ now }
import spray.util.Timestamp

object ClientFrontend {

@@ -37,30 +37,32 @@ object ClientFrontend {
var requestTimeout = initialRequestTimeout
var closeCommanders = Set.empty[ActorRef]

def lastRequestComplete = openRequests.isEmpty || openRequests.last.state.isComplete

val commandPipeline: CPL = {
case Http.MessageCommand(HttpMessagePartWrapper(x: HttpRequest, ack)) if closeCommanders.isEmpty ⇒
if (openRequests.isEmpty || openRequests.last.timestamp > 0) {
if (lastRequestComplete) {
render(x, ack)
openRequests = openRequests enqueue new RequestRecord(x, context.sender, timestamp = now)
openRequests = openRequests enqueue new RequestRecord(x, context.sender, state = Complete(Timestamp.now))
} else log.warning("Received new HttpRequest before previous chunking request was " +
"finished, ignoring...")

case Http.MessageCommand(HttpMessagePartWrapper(x: ChunkedRequestStart, ack)) if closeCommanders.isEmpty ⇒
if (openRequests.isEmpty || openRequests.last.timestamp > 0) {
if (lastRequestComplete) {
render(x, ack)
openRequests = openRequests enqueue new RequestRecord(x, context.sender, timestamp = 0)
openRequests = openRequests enqueue new RequestRecord(x, context.sender, state = WaitingForChunkedEnd)
} else log.warning("Received new ChunkedRequestStart before previous chunking " +
"request was finished, ignoring...")

case Http.MessageCommand(HttpMessagePartWrapper(x: MessageChunk, ack)) if closeCommanders.isEmpty ⇒
if (!openRequests.isEmpty && openRequests.last.timestamp == 0) {
if (!lastRequestComplete) {
render(x, ack)
} else log.warning("Received MessageChunk outside of chunking request context, ignoring...")

case Http.MessageCommand(HttpMessagePartWrapper(x: ChunkedMessageEnd, ack)) if closeCommanders.isEmpty ⇒
if (!openRequests.isEmpty && openRequests.last.timestamp == 0) {
if (!lastRequestComplete) {
render(x, ack)
openRequests.last.timestamp = now // only start timer once the request is completed
openRequests.last.state = Complete(Timestamp.now) // only start timer once the request is completed
} else log.warning("Received ChunkedMessageEnd outside of chunking request " +
"context, ignoring...")

@@ -127,7 +129,7 @@ object ClientFrontend {
def checkForTimeout(): Unit =
if (!openRequests.isEmpty && requestTimeout.isFinite) {
val rec = openRequests.head
if (rec.timestamp > 0 && rec.timestamp + requestTimeout.toNanos < now) {
if (rec.state.isOverdue(requestTimeout)) {
log.warning("Request timed out after {}, closing connection", requestTimeout)
dispatch(rec.sender, Timedout(rec.request))
commandPL(Http.Close)
@@ -140,7 +142,19 @@ object ClientFrontend {
}
}

private class RequestRecord(val request: HttpRequestPart with HttpMessageStart, val sender: ActorRef, var timestamp: Long)
sealed trait RequestState {
def isComplete: Boolean
def isOverdue(timeout: Duration): Boolean
}
case object WaitingForChunkedEnd extends RequestState {
def isComplete: Boolean = false
def isOverdue(timeout: Duration): Boolean = false
}
case class Complete(timestamp: Timestamp) extends RequestState {
def isComplete: Boolean = true
def isOverdue(timeout: Duration): Boolean = (timestamp + timeout).isPast
}
private class RequestRecord(val request: HttpRequestPart with HttpMessageStart, val sender: ActorRef, var state: RequestState)

private case class PartAndSender(part: HttpRequestPart, sender: ActorRef)
}
54 changes: 32 additions & 22 deletions spray-can/src/main/scala/spray/can/server/OpenRequest.scala
@@ -26,6 +26,7 @@ import spray.http._
import spray.can.Http
import spray.can.server.ServerFrontend.Context
import akka.io.Tcp
import spray.util.Timestamp

sealed trait OpenRequest {
def context: ServerFrontend.Context
@@ -34,7 +35,7 @@ sealed trait OpenRequest {
def appendToEndOfChain(openRequest: OpenRequest): OpenRequest
def dispatchInitialRequestPartToHandler()
def dispatchNextQueuedResponse()
def checkForTimeout(now: Long)
def checkForTimeout(now: Timestamp)
def nextIfNoAcksPending: OpenRequest

// commands
@@ -58,7 +59,7 @@ trait OpenRequestComponent { component ⇒

class DefaultOpenRequest(val request: HttpRequest,
private[this] val closeAfterResponseCompletion: Boolean,
private[this] var timestamp: Long) extends OpenRequest {
private[this] var state: RequestState) extends OpenRequest {
private[this] val receiverRef = new ResponseReceiverRef(this)
private[this] var handler = context.handler
private[this] var nextInChain: OpenRequest = EmptyOpenRequest
@@ -79,7 +80,7 @@ trait OpenRequestComponent { component ⇒
request.copy(method = HttpMethods.GET)
else request
val partToDispatch: HttpRequestPart =
if (timestamp == 0L) ChunkedRequestStart(requestToDispatch)
if (state == WaitingForChunkedEnd) ChunkedRequestStart(requestToDispatch)
else requestToDispatch
if (context.log.isDebugEnabled)
context.log.debug("Dispatching {} to handler {}", format(partToDispatch), handler)
@@ -93,22 +94,26 @@ trait OpenRequestComponent { component ⇒
}
}

def checkForTimeout(now: Long): Unit = {
if (timestamp > 0) {
if (timestamp + requestTimeout.toNanos < now) {
val timeoutHandler =
if (settings.timeoutHandler.isEmpty) handler
else context.actorContext.actorFor(settings.timeoutHandler)
if (RefUtils.isLocal(timeoutHandler))
downstreamCommandPL(Pipeline.Tell(timeoutHandler, Timedout(request), receiverRef))
else context.log.warning("The TimeoutHandler '{}' is not a local actor and thus cannot be used as a " +
"timeout handler", timeoutHandler)
timestamp = -now // we record the time of the Timeout dispatch as negative timestamp value
}
} else if (timestamp < -1 && timeoutTimeout.isFinite() && (-timestamp + timeoutTimeout.toNanos < now)) {
val response = timeoutResponse(request)
// we always close the connection after a timeout-timeout
sendPart(response.withHeaders(HttpHeaders.Connection("close") :: response.headers))
def checkForTimeout(now: Timestamp): Unit = {
state match {
case WaitingForChunkedEnd ⇒
case WaitingForResponse(timestamp) ⇒
if ((timestamp + requestTimeout).isPast) {
val timeoutHandler =
if (settings.timeoutHandler.isEmpty) handler
else context.actorContext.actorFor(settings.timeoutHandler)
if (RefUtils.isLocal(timeoutHandler))
downstreamCommandPL(Pipeline.Tell(timeoutHandler, Timedout(request), receiverRef))
else context.log.warning("The TimeoutHandler '{}' is not a local actor and thus cannot be used as a " +
"timeout handler", timeoutHandler)
state = WaitingForTimeoutResponse()
}
case WaitingForTimeoutResponse(timestamp) ⇒
if ((timestamp + timeoutTimeout).isPast) {
val response = timeoutResponse(request)
// we always close the connection after a timeout-timeout
sendPart(response.withHeaders(HttpHeaders.Connection("close") :: response.headers))
}
}
nextInChain checkForTimeout now // we accept non-tail recursion since HTTP pipeline depth is limited (and small)
}
@@ -131,7 +136,7 @@ trait OpenRequestComponent { component ⇒
}

def handleResponsePart(part: HttpMessagePartWrapper): Unit = {
timestamp = 0L // disable request timeout checking once the first response part has come in
state = WaitingForChunkedEnd // disable request timeout checking once the first response part has come in
handler = context.actorContext.sender // remember who to send Closed events to
sendPart(part)
dispatchNextQueuedResponse()
@@ -154,7 +159,7 @@ trait OpenRequestComponent { component ⇒
def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit = {
if (nextInChain.isEmpty) {
// only start request timeout checking after request has been completed
timestamp = System.nanoTime()
state = WaitingForResponse()
downstreamCommandPL(Pipeline.Tell(handler, part, receiverRef))
} else
// we accept non-tail recursion since HTTP pipeline depth is limited (and small)
@@ -206,7 +211,7 @@ trait OpenRequestComponent { component ⇒
def request = throw new IllegalStateException
def dispatchInitialRequestPartToHandler(): Unit = { throw new IllegalStateException }
def dispatchNextQueuedResponse(): Unit = {}
def checkForTimeout(now: Long): Unit = {}
def checkForTimeout(now: Timestamp): Unit = {}
def nextIfNoAcksPending = throw new IllegalStateException

// commands
@@ -234,3 +239,8 @@ trait OpenRequestComponent { component ⇒

private[server] case class AckEventWithReceiver(ack: Any, receiver: ActorRef) extends Event
private[server] case class PartAndSender(part: HttpResponsePart, sender: ActorRef)

private[server] sealed trait RequestState
private[server] case object WaitingForChunkedEnd extends RequestState
private[server] case class WaitingForResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
private[server] case class WaitingForTimeoutResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
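
For orientation, the server side now moves through three explicit states instead of the old positive/zero/negative Long convention. The sketch below is a hypothetical, simplified driver of those transitions (the real logic is DefaultOpenRequest.checkForTimeout above, which also dispatches Timedout to the handler and renders the fallback response); it assumes the Timestamp sketch from the introduction is in scope.

import scala.concurrent.duration._

object ServerTimeoutSketch {
  sealed trait RequestState
  case object WaitingForChunkedEnd extends RequestState
  case class WaitingForResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
  case class WaitingForTimeoutResponse(timestamp: Timestamp = Timestamp.now) extends RequestState

  // One tick of timeout checking for a single open request.
  def onTick(state: RequestState, requestTimeout: Duration, timeoutTimeout: Duration): RequestState =
    state match {
      case WaitingForChunkedEnd ⇒
        state // request body still streaming in, timer not armed yet
      case WaitingForResponse(ts) if (ts + requestTimeout).isPast ⇒
        WaitingForTimeoutResponse() // the real code dispatches Timedout(request) here
      case WaitingForTimeoutResponse(ts) if (ts + timeoutTimeout).isPast ⇒
        state // the real code sends the timeout response and closes the connection
      case other ⇒ other
    }
}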
11 changes: 6 additions & 5 deletions spray-can/src/main/scala/spray/can/server/ServerFrontend.scala
@@ -25,6 +25,7 @@ import spray.can.rendering.ResponsePartRenderingContext
import spray.can.Http
import spray.http._
import spray.io._
import spray.util.Timestamp

object ServerFrontend {

@@ -109,10 +110,10 @@
}
else throw new NotImplementedError("fastPath is not yet supported with pipelining enabled")

} else openNewRequest(request, closeAfterResponseCompletion, System.nanoTime())
} else openNewRequest(request, closeAfterResponseCompletion, WaitingForResponse())

case HttpMessageStartEvent(ChunkedRequestStart(request), closeAfterResponseCompletion) ⇒
openNewRequest(request, closeAfterResponseCompletion, 0L)
openNewRequest(request, closeAfterResponseCompletion, WaitingForChunkedEnd)

case Http.MessageEvent(x: MessageChunk) ⇒
firstOpenRequest handleMessageChunk x
@@ -137,7 +138,7 @@

case TickGenerator.Tick ⇒
if (requestTimeout.isFinite())
firstOpenRequest checkForTimeout System.nanoTime()
firstOpenRequest checkForTimeout Timestamp.now
eventPL(TickGenerator.Tick)

case Pipeline.ActorDeath(actor) if actor == context.handler ⇒
@@ -147,8 +148,8 @@
case ev ⇒ eventPL(ev)
}

def openNewRequest(request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {
val nextOpenRequest = new DefaultOpenRequest(request, closeAfterResponseCompletion, timestamp)
def openNewRequest(request: HttpRequest, closeAfterResponseCompletion: Boolean, state: RequestState): Unit = {
val nextOpenRequest = new DefaultOpenRequest(request, closeAfterResponseCompletion, state)
firstOpenRequest = firstOpenRequest appendToEndOfChain nextOpenRequest
nextOpenRequest.dispatchInitialRequestPartToHandler()
if (firstUnconfirmed.isEmpty) firstUnconfirmed = firstOpenRequest
6 changes: 3 additions & 3 deletions spray-can/src/main/scala/spray/can/server/StatsSupport.scala
@@ -22,12 +22,12 @@ import spray.can.rendering.ResponsePartRenderingContext
import spray.can.server.RequestParsing.HttpMessageStartEvent
import spray.io._
import spray.can.Http
import spray.util.PaddedAtomicLong
import spray.util.{ Timestamp, PaddedAtomicLong }

object StatsSupport {

class StatsHolder {
val startTimestamp = System.nanoTime()
val startTimestamp = Timestamp.now
val requestStarts = new PaddedAtomicLong
val responseStarts = new PaddedAtomicLong
val maxOpenRequests = new PaddedAtomicLong
@@ -57,7 +57,7 @@
}

def toStats = Stats(
uptime = (System.nanoTime() - startTimestamp).nanos,
uptime = (Timestamp.now - startTimestamp).asInstanceOf[FiniteDuration],
totalRequests = requestStarts.get,
openRequests = requestStarts.get - responseStarts.get,
maxOpenRequests = maxOpenRequests.get,
24 changes: 9 additions & 15 deletions spray-io/src/main/scala/spray/io/ConnectionTimeouts.scala
@@ -16,10 +16,9 @@

package spray.io

import scala.concurrent.duration.{ FiniteDuration, Deadline, Duration }
import scala.concurrent.duration.Duration
import akka.io.Tcp
import spray.util.requirePositive
import System.{ nanoTime ⇒ now }
import spray.util.{ Timestamp, requirePositive }

object ConnectionTimeouts {

@@ -29,25 +28,20 @@ object ConnectionTimeouts {
new PipelineStage {
def apply(context: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines = new Pipelines {
var timeout = idleTimeout
var lastActivity = now
var idleDeadline = Timestamp.never
def refreshDeadline() = idleDeadline = Timestamp.now + timeout
refreshDeadline()

val commandPipeline: CPL = {
case x: Tcp.Write ⇒
commandPL(x)
lastActivity = now

case SetIdleTimeout(x) ⇒ timeout = x

case x: Tcp.Write ⇒ commandPL(x); refreshDeadline()
case SetIdleTimeout(x) ⇒ timeout = x; refreshDeadline()
case cmd ⇒ commandPL(cmd)
}

val eventPipeline: EPL = {
case x: Tcp.Received ⇒
lastActivity = now
eventPL(x)

case x: Tcp.Received ⇒ refreshDeadline(); eventPL(x)
case tick @ TickGenerator.Tick ⇒
if (timeout.isFinite && (lastActivity + timeout.toNanos < System.nanoTime())) {
if (idleDeadline.isPast) {
context.log.debug("Closing connection due to idle timeout...")
commandPL(Tcp.Close)
}
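
The behavioral change called out in the commit message is visible here: `SetIdleTimeout` now refreshes the idle deadline instead of only swapping the timeout duration. Below is a standalone approximation of that deadline bookkeeping (the actual pipeline stage also forwards commands and events and closes via Tcp.Close); `IdleDeadline` is an illustrative name, not part of spray, and it reuses the Timestamp sketch from the introduction.

import scala.concurrent.duration._

class IdleDeadline(initialTimeout: Duration) {
  private var timeout = initialTimeout
  private var deadline = Timestamp.now + timeout

  // Called for Tcp.Write and Tcp.Received activity.
  def onActivity(): Unit = deadline = Timestamp.now + timeout

  // The breaking part: changing the timeout also resets the deadline.
  def onSetIdleTimeout(newTimeout: Duration): Unit = {
    timeout = newTimeout
    deadline = Timestamp.now + timeout
  }

  // On each TickGenerator.Tick: true means the connection should be closed.
  def onTick(): Boolean = deadline.isPast
}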
