Permalink
Browse files

adding back (mist) http work in a new branch. misitfy was too stale. …

…this is WIP - trying to support both SAPI 3.0 and Jetty Continuations at once
  • Loading branch information...
1 parent a4d25da commit 10f2fcc4866653b80935c284b86b6ee19124655b @buka buka committed with Garrick Evans Nov 8, 2010
@@ -0,0 +1,109 @@
+/**
+ * Copyright 2010 Autodesk, Inc. All rights reserved.
+ * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
+ */
+
+package akka.http
+
+import akka.util.Logging
+import javax.servlet.http.{HttpServletResponse, HttpServlet}
+
+/**
+ * @author Garrick Evans
+ */
+class AsyncHttpServlet extends HttpServlet with Logging
+{
+ import java.util. {Date, TimeZone}
+ import java.text.SimpleDateFormat
+ import javax.servlet.ServletConfig
+ import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+ import akka.actor.ActorRegistry
+ import Types._
+
+ //
+ // the root endpoint for this servlet will have been booted already
+ // use the system property to find out the actor id and cache him
+ // TODO: currently this is hardcoded but really use a property
+ //
+ protected val _root = ActorRegistry.actorsFor("DefaultGridRoot").head
+
+ /**
+ * Handles the HTTP request method on the servlet, suspends the connection and sends the asynchronous context
+ * along to the root endpoint in a SuspendedRequest message
+ */
+ protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder: (()=>Option[tAsyncContext]) => SuspendedRequest) =
+ {
+ def suspend:Option[tAsyncContext] =
+ {
+ //
+ // set to effectively "already expired"
+ //
+ val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
+ gmt.setTimeZone(TimeZone.getTimeZone("GMT"))
+
+ response.setHeader("Expires", gmt.format(new Date))
+ response.setHeader("Cache-Control", "no-cache, must-revalidate")
+ response.setHeader("Connection","close")
+
+ Some(request.asInstanceOf[tAsyncRequest].startAsync)
+ }
+
+ //
+ // shoot the message to the root endpoint for processing
+ // IMPORTANT: the suspend method is invoked on the jetty thread not in the actor
+ //
+ val msg = builder(suspend _)
+ if (msg.context ne None) {_root ! msg}
+ }
+
+ /**
+ * Subclasses can choose to have the servlet listen to the async context events
+ * @return A type of either AsyncListener or ContinuationListener
+ */
+ def hook:Option[AnyRef] = None
+
+
+ //
+ // HttpServlet API
+ //
+
+ final val Jetty7Server = "Jetty(7"
+
+ override def init(config: ServletConfig) =
+ {
+ super.init(config)
+
+ val context = config.getServletContext
+ val server = context.getServerInfo
+ val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
+
+ log.debug("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor)
+
+ (major, minor) match {
+
+ case (3,0) => {
+ log.debug("Supporting Java asynchronous contexts.")
+ }
+
+ case (2,5) if (server startsWith Jetty7Server) => {
+ log.debug("Supporting Jetty asynchronous continuations.")
+
+ }
+
+ case _ => {
+ log.error("No asynchronous request handling can be supported.")
+ }
+ }
+ }
+
+
+ protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Delete(f, hook _))
+ protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Get(f, hook _))
+ protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Head(f, hook _))
+ protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Options(f, hook _))
+ protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Post(f, hook _))
+ protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Put(f, hook _))
+ protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Trace(f, hook _))
+}
+
@@ -0,0 +1,134 @@
+/**
+ * Copyright 2010 Autodesk, Inc. All rights reserved.
+ * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
+ */
+
+package akka.http
+
+import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
+import akka.actor.{ActorRegistry, ActorRef, Actor}
+
+/**
+ * @author Garrick Evans
+ */
+trait Endpoint
+{
+ this: Actor =>
+
+ import Endpoint._
+
+ type Hook = Function[String, Boolean]
+ type Provider = Function[String, ActorRef]
+
+ /**
+ * A convenience method to get the actor ref
+ */
+ def actor: ActorRef = this.self
+
+ /**
+ * The list of connected endpoints to which this one should/could forward the request.
+ * If the hook func returns true, the message will be sent to the actor returned from provider.
+ */
+ protected var _attachments = List[Tuple2[Hook, Provider]]()
+
+ /**
+ *
+ */
+ protected def _attach(hook:Hook, provider:Provider) =
+ {
+ _attachments = (hook, provider) :: _attachments
+ }
+
+ /**
+ * Message handling common to all endpoints, must be chained
+ */
+ protected def _recv: Receive =
+ {
+ //
+ // add the endpoint - the if the uri hook matches,
+ // the message will be sent to the actor returned by the provider func
+ //
+ case Attach(hook, provider) => _attach(hook, provider)
+
+
+ //
+ // dispatch the suspended requests
+ //
+ case msg if msg.isInstanceOf[SuspendedRequest] =>
+ {
+ val req = msg.asInstanceOf[SuspendedRequest]
+ val uri = req.request.getRequestURI
+ val endpoints = _attachments.filter {_._1(uri)}
+
+ if (endpoints.size > 0)
+ endpoints.foreach {_._2(uri) ! req}
+ else
+ {
+ self.sender match
+ {
+ case Some(s) => s reply NoneAvailable(uri, req)
+ case None => _na(uri, req)
+ }
+ }
+ }
+
+ }
+
+ /**
+ * no endpoint available - completes the request with a 404
+ */
+ protected def _na(uri: String, req: SuspendedRequest) =
+ {
+ req.NotFound("No endpoint available for [" + uri + "]")
+ log.debug("No endpoint available for [" + uri + "]")
+ }
+}
+
+
+class RootEndpoint extends Actor with Endpoint
+{
+ import Endpoint._
+
+ final val Root = "/"
+
+ //
+ // use the configurable dispatcher
+ //
+ self.dispatcher = Endpoint.Dispatcher
+
+ override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
+
+ def recv: Receive =
+ {
+ case NoneAvailable(uri, req) => _na(uri, req)
+ case unknown =>
+ {
+ log.error("Unexpected message sent to root endpoint. [" + unknown.toString + "]")
+ }
+ }
+
+ /**
+ * Note that root is a little different, other endpoints should chain their own recv first
+ */
+ def receive = {_recv orElse recv}
+}
+
+
+
+object Endpoint
+{
+ import akka.dispatch.Dispatchers
+
+
+ /**
+ * leverage the akka config to tweak the dispatcher for our endpoints
+ */
+ final val Dispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
+
+ type Hook = Function[String, Boolean]
+ type Provider = Function[String, ActorRef]
+
+ case class Attach(hook: Hook, provider: Provider)
+ case class NoneAvailable(uri: String, req: SuspendedRequest)
+}
Oops, something went wrong.

0 comments on commit 10f2fcc

Please sign in to comment.