Skip to content

Commit

Permalink
[rutta] extract Router's State type
Browse files Browse the repository at this point in the history
  • Loading branch information
haf committed Feb 10, 2016
1 parent c4011bf commit 16cda96
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 34 deletions.
6 changes: 3 additions & 3 deletions src/Logary/Configuration_Config.fs
Expand Up @@ -203,7 +203,7 @@ let configure serviceName targets pollPeriod metrics rules (internalLevel, inter
/// configured the configuration. This will call the `validateLogary` function
/// too. The un-primed version of the function `withLogary` doesn't return a
/// `LogManager` but the F#-oriented LogaryInstance.
[<CompiledName "WithLogaryInstance">]
[<CompiledName "WithLogary">]
let withLogary serviceName fConf =
fConf (confLogary serviceName)
|> validate
Expand All @@ -213,8 +213,8 @@ let withLogary serviceName fConf =
/// configured the configuration. This will call the `validateLogary` function
/// too. The un-primed version of the function `withLogary` doesn't return a
/// `LogManager` but the F#-oriented LogaryInstance.
[<CompiledName "WithLogary">]
let withLogary' serviceName fConf =
[<CompiledName "WithLogaryManager">]
let withLogaryManager serviceName fConf =
fConf (confLogary serviceName)
|> validate
|> runLogary
Expand Down
5 changes: 0 additions & 5 deletions src/Logary/Targets_Noop.fs
Expand Up @@ -43,11 +43,6 @@ module internal Impl =
/// Create a new Noop target
let create conf = TargetUtils.stdNamedTarget (Impl.loop conf)

/// C# Interop: Create a new Noop target
[<CompiledName "Create">]
let createInterop (conf, name) =
create conf name

/// Use with LogaryFactory.New( s => s.Target<Noop.Builder>() )
type Builder(conf, callParent : FactoryApi.ParentCallback<Builder>) =
member x.IsYes(yes : bool) =
Expand Down
58 changes: 41 additions & 17 deletions src/services/Logary.Services.Rutta/Rutta.fs
Expand Up @@ -244,27 +244,53 @@ module Router =
open Hopac
open System
open Logary
open Logary.Configuration
open Logary.Targets
open fszmq

let private logger = Logging.getCurrentLogger ()
type State =
{ zmqCtx : Context
receiver : Socket
forwarder : LogManager
logger : Logger }
interface IDisposable with
member x.Dispose() =
(x.zmqCtx :> IDisposable).Dispose()
(x.receiver :> IDisposable).Dispose()
(x.forwarder :> IDisposable).Dispose()

let private init binding createSocket mode : State =
let context = new Context()
let receiver = createSocket context
Socket.bind receiver binding

let forwarder =
withLogaryManager (sprintf "Logary Rutta[%s]" mode) (
withTarget (Noop.create Noop.empty (PointName.ofSingle "influxdb"))
>> withRule (Rule.createForTarget (PointName.ofSingle "influxdb"))
)
|> run
let targetLogger = forwarder.getLogger (PointName.parse "Logary.Services.Rutta.Router")
{ zmqCtx = context
receiver = receiver
forwarder = forwarder
logger = targetLogger }

// FOCUS:
let pullFrom binding = function
| Router_Target ep ->
use context = new Context()
use receiver = Context.pull context
Socket.bind receiver binding
| Router_Target ep:: _ ->
use state = init binding Context.pull "PULL"

let rec outer () =
try // TODO: remove and use non cancelling
// TODO: handle cancellation so that we don't block on recv
let data = Socket.tryRecv receiver
let data = Socket.tryRecv state.receiver

// TODO: deserialise
global.Logary.Message.debug "TODO: deserialise data above"
|> Logger.logWithAck logger
|> run // TODO: can we do this non-blocking?
|> ignore // TODO: await ack
|> Logger.logWithAck state.logger
|> Job.Ignore
|> queue

outer ()

Expand All @@ -275,23 +301,21 @@ module Router =
outer ()

| x ->
Choice2Of2 (sprintf "unknown parameter %A" x)
Choice2Of2 (sprintf "unknown parameter(s) %A" x)

let xsubBind binding pars =
use context = new Context()
use receiver = Context.xsub context
Socket.bind receiver binding
use state = init binding Context.xsub "XSUB"

let rec outer () =
try // TODO: remove and use non cancelling
// TODO: handle cancellation so that we don't block on recv
let data = Socket.tryRecv receiver
let data = Socket.tryRecv state.receiver

// TODO: deserialise
global.Logary.Message.debug "TODO: deserialise data above"
|> Logger.logWithAck logger
|> run // TODO: can we do this non-blocking?
|> ignore // TODO: await ack
|> Logger.logWithAck state.logger
|> Job.Ignore
|> queue

outer ()

Expand Down
22 changes: 13 additions & 9 deletions src/services/Logary.Services.SuaveReporter/SuaveReporter.fs
Expand Up @@ -40,26 +40,30 @@ module Impl =
open Impl
open Hopac
open Suave
open Suave.Types
open Suave.Model
open Suave.Http
open Suave.Http.Applicatives
open Suave.Http.RequestErrors
open Suave.Http.Successful
open Suave.Filters
open Suave.RequestErrors
open Suave.Successful
open Suave.Operators

let api (logger : Logger) (verbatimPath : string option) : WebPart =
let verbatimPath = defaultArg verbatimPath "/i/logary/loglines"
let getMsg = sprintf "You can post a JSON structure to: %s" verbatimPath

let readJson =
Lens.get HttpRequest.rawForm_
>> UTF8.toString
>> Json.tryParse
>> Choice.bind Json.tryDeserialize

let jsonMsg msg =
{ message = msg; id = r.NextUInt64() }
|> Json.serialize
|> Json.format

path verbatimPath >>= choose [
GET >>= OK (jsonMsg getMsg)
POST >>= Binding.bindReq
(Lens.get HttpRequest.rawForm_ >> UTF8.toString >> Json.tryParse >> Choice.bind Json.tryDeserialize)
path verbatimPath >=> choose [
GET >=> OK (jsonMsg getMsg)
POST >=> Binding.bindReq readJson
(fun msg ->
Logger.log logger msg |> queue
CREATED (jsonMsg "Created"))
Expand Down

0 comments on commit 16cda96

Please sign in to comment.