Skip to content

Commit

Permalink
Resolve problem with repeated request when persistent connections are…
Browse files Browse the repository at this point in the history
… open and change the default status code to 404, which highlighted a problem with the env.ResponseStatusCode setter.
  • Loading branch information
panesofglass committed Oct 8, 2013
1 parent 7ef36df commit 1acf30f
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 176 deletions.
12 changes: 7 additions & 5 deletions samples/TestServer/Program.fs
Expand Up @@ -29,11 +29,13 @@ System.AppDomain.CurrentDomain.UnhandledException |> Observable.add debug
let shortdate = DateTime.UtcNow.ToShortDateString

let server = new HttpServer (fun env -> async {
env.ResponseStatusCode <- 200
env.ResponseHeaders.Add("Content-Type", [|"text/plain"|])
env.ResponseHeaders.Add("Content-Length", [| "13" |])
env.ResponseHeaders.Add("Server", [| "Fracture" |])
do! env.ResponseBody.AsyncWrite("Hello, world!"B, 0, 13)
let context = Microsoft.Owin.OwinContext(env)
let response = context.Response
response.StatusCode <- 200
response.Headers.Add("Content-Type", [|"text/plain"|])
response.Headers.Add("Content-Length", [| "13" |])
response.Headers.Add("Server", [| "Fracture" |])
response.Write("Hello, world!"B)
})

server.Start(6667)
Expand Down
8 changes: 8 additions & 0 deletions samples/TestServer/TestServer.fsproj
Expand Up @@ -47,10 +47,18 @@
<Reference Include="HttpMachine">
<HintPath>..\..\packages\HttpMachine.0.9.0.0\lib\HttpMachine.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Owin">
<HintPath>..\..\packages\Microsoft.Owin.2.0.0-rc1\lib\net40\Microsoft.Owin.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="mscorlib" />
<Reference Include="FSharp.Core">
<HintPath>..\..\lib\FSharp\v4.0\bin\FSharp.Core.dll</HintPath>
</Reference>
<Reference Include="Owin">
<HintPath>..\..\packages\Owin.1.0\lib\net40\Owin.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
</ItemGroup>
Expand Down
2 changes: 2 additions & 0 deletions samples/TestServer/packages.config
@@ -1,4 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="HttpMachine" version="0.9.0.0" />
<package id="Microsoft.Owin" version="2.0.0-rc1" targetFramework="net40" />
<package id="Owin" version="1.0" targetFramework="net40" />
</packages>
203 changes: 74 additions & 129 deletions src/http/HttpParser.fs
Expand Up @@ -21,87 +21,54 @@ open System
open System.Collections.Generic
open System.Net
open System.Text
open System.Threading
open Fracture
open HttpMachine
open Owin

type Env = Owin.Environment
/// An Environment dictionary to store OWIN request and response values.
type internal ServerEnvironment() as x =
inherit Environment(new Dictionary<_,_>())

//namespace Frack
//
//open System
//open System.Collections.Generic
//open System.IO
//open System.Net.Sockets
//open System.Text
//open System.Threading
//open System.Threading.Tasks
//open Microsoft.FSharp.Core
//open Fracture
//
//type Env = Owin.Environment
//
//[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
//module Request =
// [<CompiledName("ParseStartLine")>]
// let private parseStartLine (line: string, env: Env) =
// let arr = line.Split([|' '|], 3)
// env.[Owin.Constants.requestMethod] <- arr.[0]
//
// let uri = Uri(arr.[1], UriKind.RelativeOrAbsolute)
//
// // TODO: Fix this so that the path can be determined correctly.
// env.Add(Owin.Constants.requestPathBase, "")
//
// if uri.IsAbsoluteUri then
// env.[Owin.Constants.requestPath] <- uri.AbsolutePath
// env.[Owin.Constants.requestQueryString] <- uri.Query
// env.[Owin.Constants.requestScheme] <- uri.Scheme
// env.RequestHeaders.["Host"] <- [|uri.Host|]
// else
// env.[Owin.Constants.requestPath] <- uri.OriginalString
//
// env.[Owin.Constants.requestProtocol] <- arr.[2].Trim()
//
// [<CompiledName("ParseHeader")>]
// let private parseHeader (header: string, env: Env) =
// // TODO: Proper header parsing and aggregation, including linear white space.
// let pair = header.Split([|':'|], 2)
// if pair.Length > 1 then
// env.RequestHeaders.[pair.[0]] <- [| pair.[1].TrimStart(' ') |]
//
// [<CompiledName("ShouldKeepAlive")>]
// let shouldKeepAlive (env: Env) =
// let requestHeaders = env.RequestHeaders
// let connection =
// if requestHeaders <> null && requestHeaders.Count > 0 && requestHeaders.ContainsKey("Connection") then
// requestHeaders.["Connection"]
// else Array.empty
// match string env.[Owin.Constants.requestProtocol] with
// | "HTTP/1.1" -> Array.isEmpty connection || not (connection |> Array.exists ((=) "Close"))
// | "HTTP/1.0" -> not (Array.isEmpty connection) && connection |> Array.exists ((=) "Keep-Alive")
// | _ -> false
//
// [<CompiledName("Parse")>]
// let parse (readStream: SocketReadStream) =
// async {
// let env = new Env()
// // Do the parsing manually, as the reader is likely less efficient.
// use reader = new StreamReader(readStream, encoding = System.Text.Encoding.ASCII, detectEncodingFromByteOrderMarks = false, bufferSize = 4096, leaveOpen = true)
// let! requestLine = Async.AwaitTask <| reader.ReadLineAsync()
// parseStartLine(requestLine, env)
// let parsingRequestHeaders = ref true
// while !parsingRequestHeaders do
// if reader.EndOfStream then parsingRequestHeaders := false else
// // If not at the end of the stream, read the next line.
// // TODO: Account for linear white space.
// let! line = Async.AwaitTask <| reader.ReadLineAsync()
// if line = "" then
// parsingRequestHeaders := false
// else parseHeader(line, env)
// env.Add(Owin.Constants.requestBody, readStream :> Stream)
// return env
// }
(* Set environment settings *)

// Set a per-request cancellation token
// TODO: Determine if this can use the token from the Async block.
let cts = new CancellationTokenSource()
do x.Add(Constants.callCancelled, cts.Token)

do x.Add(Constants.owinVersion, "1.0")

(* Set request defaults *)

// Add the request headers dictionary
let requestHeaders = new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase)
do x.Add(Constants.requestHeaders, requestHeaders)

let requestBody = new IO.MemoryStream()
do x.Add(Constants.requestBody, requestBody)

(* Set response defaults *)
do x.Add(Constants.responseStatusCode, 404)

// Add the response headers dictionary
let responseHeaders = new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase)
do x.Add(Constants.responseHeaders, responseHeaders)

let responseBody = new IO.MemoryStream()
do x.Add(Constants.responseBody, responseBody)

/// Gets the request headers dictionary for the current request.
override x.RequestHeaders = requestHeaders :> _

/// Gets the request body for the current request.
override x.RequestBody = requestBody :> _

/// Gets the response headers dictionary for the current response.
override x.ResponseHeaders = responseHeaders :> _

/// Gets the response body stream.
override x.ResponseBody = responseBody :> _

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Response =
Expand Down Expand Up @@ -177,56 +144,30 @@ module Response =
| 511 -> BS"HTTP/1.1 511 Network Authentication Required\r\n"B
| _ -> BS"HTTP/1.1 500 Internal Server Error\r\n"B

[<CompiledName("ToBytes")>]
let toBytes (env: Env) =
let headers =
String.Join(
"\r\n",
[| yield sprintf "HTTP/1.1 %i %A" env.ResponseStatusCode (Enum.ToObject(typeof<HttpStatusCode>, env.ResponseStatusCode))
for KeyValue(header, values) in env.ResponseHeaders do
// TODO: Fix handling of certain headers where this approach is invalid, e.g. Set-Cookie
yield sprintf "%s: %s" header <| String.Join(",", values)
// Add the body separator.
yield "\r\n"
|])
|> Encoding.ASCII.GetBytes

[| yield! headers; yield! env.ResponseBody.ToArray() |]

// [<CompiledName("Send")>]
// let send (env: Env, writeStream: SocketWriteStream) = async {
// // TODO: Aggregate the response pieces and send as a whole chunk.
// // Write the status line
// let statusLine = getStatusLine <| Convert.ToInt32(env.[Owin.Constants.responseStatusCode])
// do! writeStream.AsyncWrite(statusLine.Array, statusLine.Offset, statusLine.Count)
//
// // Write the headers
// for (KeyValue(header, values)) in env.ResponseHeaders do
// for value in values do
// let headerBytes = BS(Encoding.ASCII.GetBytes(sprintf "%s: %s\r\n" header value))
// do! writeStream.AsyncWrite(headerBytes.Array, headerBytes.Offset, headerBytes.Count)
//
// // Write the body separator
// do! writeStream.AsyncWrite("\r\n"B, 0, 2)
//
// // Write the response body
// // TODO: Set a default timeout
// let body = env.ResponseBody.ToArray()
// let! _ = Async.AwaitIAsyncResult <| writeStream.WriteAsync(body, 0, body.Length)
// return env.Dispose()
// }

[<CompiledName("HeadersToBytes")>]
let headersToBytes (env: #Environment) =
String.Join(
"\r\n",
[| yield sprintf "HTTP/1.1 %i %A" env.ResponseStatusCode (Enum.ToObject(typeof<HttpStatusCode>, env.ResponseStatusCode))
for KeyValue(header, values) in env.ResponseHeaders do
// TODO: Fix handling of certain headers where this approach is invalid, e.g. Set-Cookie
yield sprintf "%s: %s" header <| String.Join(",", values)
// Add the body separator.
yield "\r\n"
|])
|> Encoding.ASCII.GetBytes

type ParserDelegate(app, send) as p =
[<DefaultValue>] val mutable httpMethod : string
[<DefaultValue>] val mutable headerName : string
[<DefaultValue>] val mutable headerValue : string
[<DefaultValue>] val mutable env : Env
[<DefaultValue>] val mutable finished : bool
[<DefaultValue>] val mutable private httpMethod : string
[<DefaultValue>] val mutable private headerName : string
[<DefaultValue>] val mutable private headerValue : string
[<DefaultValue>] val mutable private env : ServerEnvironment
[<DefaultValue>] val mutable private finished : bool

let app env keepAlive = async {
do! app env
send keepAlive <| Response.toBytes env
do! app (env :> IDictionary<_,_>)
send keepAlive <| Response.headersToBytes env
send keepAlive <| (env.ResponseBody :?> IO.MemoryStream).ToArray()
send keepAlive [||]
}

Expand All @@ -237,9 +178,9 @@ type ParserDelegate(app, send) as p =

let toHttpStatusCode (i:int) = Enum.ToObject(typeof<HttpStatusCode>, i)

let onHeadersEnd = Event<Env>()
let onHeadersEnd = Event<Environment>()
let onDataReceived = Event<ArraySegment<byte>>()
let onMessageEnd = Event<Env>()
let onMessageEnd = Event<Environment>()

[<CLIEvent>]
member p.OnHeadersEnd = onHeadersEnd.Publish
Expand All @@ -254,7 +195,7 @@ type ParserDelegate(app, send) as p =
this.httpMethod <- Unchecked.defaultof<_>
this.headerName <- Unchecked.defaultof<_>
this.headerValue <- Unchecked.defaultof<_>
this.env <- new Env()
this.env <- new ServerEnvironment()

member this.OnMethod( parser, m) =
this.httpMethod <- m
Expand Down Expand Up @@ -309,11 +250,15 @@ type ParserDelegate(app, send) as p =
onDataReceived.Trigger(data)

member this.OnMessageEnd(parser) =
if not this.finished then
this.finished <- true
onMessageEnd.Trigger(this.env)

let keepAlive = parser.ShouldKeepAlive

// Execute the application.
let keepAlive = parser.ShouldKeepAlive
Async.StartImmediate <| app this.env keepAlive

// Reset the parser in the event of keep alives.
this.finished <- false
this.httpMethod <- Unchecked.defaultof<_>
this.headerName <- Unchecked.defaultof<_>
this.headerValue <- Unchecked.defaultof<_>
this.env <- new ServerEnvironment()
9 changes: 8 additions & 1 deletion src/http/HttpServer.fs
Expand Up @@ -32,7 +32,7 @@ open HttpMachine
open Owin

[<Sealed>]
type HttpServer(app) as this =
type HttpServer(app) =
let parserCache = new ConcurrentDictionary<_,HttpParser>()
let tcp = new TcpServer()
let send client keepAlive data = tcp.Send(client, data, keepAlive)
Expand All @@ -51,6 +51,13 @@ type HttpServer(app) as this =
let removed, parser = parserCache.TryRemove(sd.RemoteEndPoint)
if removed then
parser.Execute(ArraySegment<_>()) |> ignore)

new (app: Func<_,Task>) =
let inner env = async {
let _ = Async.AwaitIAsyncResult(app.Invoke env)
return ()
}
new HttpServer(inner)

member this.Start(port) =
tcp.Listen(IPAddress.Loopback, port)
Expand Down

0 comments on commit 1acf30f

Please sign in to comment.