Skip to content

Commit

Permalink
Merge pull request #378 from logary/fix-pipe-compose
Browse files Browse the repository at this point in the history
fix pipe compose and rethink the log error model
  • Loading branch information
haf committed Dec 20, 2018
2 parents b1382e1 + a35ebe7 commit 86401e9
Show file tree
Hide file tree
Showing 45 changed files with 588 additions and 614 deletions.
90 changes: 40 additions & 50 deletions examples/Libryy/LoggingV4.fs
Expand Up @@ -299,29 +299,14 @@ module Alt =
>>-. x)
Job.start markNack >>-. altCommit)

/// Why was the message/metric/event not logged?
[<Struct>]
type LogError =
/// The buffer of the target was full, so the message was not logged.
| BufferFull of target:string
/// The target, or the processing step before the targets, rejected the message.
| Rejected

type internal LogResult = Alt<Result<Promise<unit>, LogError>>
type internal LogResult = Alt<Result<Promise<unit>, string>>

module internal Promise =
let unit: Promise<unit> = Promise (())

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module internal LogError =
let rejected: LogError = Rejected
let bufferFull target: LogError = BufferFull target

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module internal LogResult =
let success: Alt<Result<Promise<unit>, LogError>> = Alt.always (Result.Ok Promise.unit)
let bufferFull target: Alt<Result<Promise<unit>, LogError>> = Alt.always (Result.Error (BufferFull target))
let rejected: Alt<Result<Promise<unit>, LogError>> = Alt.always (Result.Error Rejected)
let success: LogResult = Alt.always (Result.Ok Promise.unit)

module internal H =
/// Finds all exceptions
Expand Down Expand Up @@ -360,6 +345,12 @@ type Message =
|> Seq.map (fun (KeyValue (k, v)) -> k.Substring(Literals.FieldsPrefix.Length), v)
|> Map.ofSeq

member x.getContext(): Map<string, obj> =
x.context
|> Map.filter (fun k _ ->
not (k.StartsWith Literals.FieldsPrefix)
&& not (k.StartsWith Literals.LogaryPrefix))

/// If you're looking for how to transform the Message's fields, then use the
/// module methods rather than instance methods, since you'll be creating new
/// values rather than changing an existing value.
Expand All @@ -378,9 +369,7 @@ module Logger =
logger.logWithAck (false, logLevel) messageFactory ^-> function
| Ok _ ->
true
| Result.Error Rejected ->
true
| Result.Error (BufferFull _) ->
| Result.Error error ->
false

let private printDotOnOverflow accepted =
Expand All @@ -396,10 +385,7 @@ module Logger =
logger.logWithAck (true, logLevel) messageFactory ^=> function
| Ok _ ->
Job.result ()
| Result.Error Rejected ->
Job.result ()
| Result.Error (BufferFull target) ->
//Job.raises (exn (sprintf "logWithAck (true, _) should have waited for the RingBuffer(s) to accept the Message. Target(%s)" target))
| Result.Error error ->
Job.result ()

/// Special case: e.g. Fatal messages.
Expand All @@ -409,19 +395,20 @@ module Logger =
logger.logWithAck (true, level) messageFactory ^=> function
| Ok promise ->
Job.start (promise ^=> IVar.fill ack)
| Result.Error Rejected ->
IVar.fill ack ()
| Result.Error (BufferFull target) ->
//let e = exn (sprintf "logWithAck (true, _) should have waited for the RingBuffer(s) to accept the Message. Target(%s)" target)
//IVar.fillFailure ack e
| Result.Error error ->
IVar.fill ack ()
start inner
ack :> Promise<_>

let private ensureName name =
fun (m: Message) ->
if m.name.Length = 0 then { m with name = name } else m

let apply (transform: Message -> Message) (logger: Logger): Logger =
let ensureName = ensureName logger.name
{ new Logger with
member x.logWithAck (waitForBuffers, logLevel) messageFactory =
logger.logWithAck (waitForBuffers, logLevel) (messageFactory >> transform)
logger.logWithAck (waitForBuffers, logLevel) (messageFactory >> ensureName >> transform)
member x.name =
logger.name }

Expand Down Expand Up @@ -675,12 +662,12 @@ module internal LiterateTokenisation =
| _ ->
OtherSymbol

let tokeniseValue (options: LiterateOptions) (fields: Map<string, obj>) (template: string) =
let tokeniseValue (options: LiterateOptions) (fields: Map<string, obj>) (context: Map<string, obj>) (template: string) =
let themedParts = ResizeArray<TokenisedPart>()
let matchedFields = ResizeArray<string>()
let foundText (text: string) = themedParts.Add (text, Text)
let foundProp (prop: FsMtParser.Property) =
match Map.tryFind prop.name fields with
match fields |> Map.tryFind prop.name |> Option.orElseWith (fun () -> context |> Map.tryFind prop.name) with
| Some propValue ->
// render using string.Format, so the formatting is applied
let stringFormatTemplate = prop.AppendPropertyString(StringBuilder(), "0").ToString()
Expand Down Expand Up @@ -736,8 +723,8 @@ module internal LiterateTokenisation =
.ToString("HH:mm:ss", options.formatProvider),
Subtext

let fields = message.getFields()
let _, themedMessageParts = message.value |> tokeniseValue options fields
let fields, context = message.getFields(), message.getContext()
let _, themedMessageParts = message.value |> tokeniseValue options fields context
let themedExceptionParts = tokeniseExns options message

[ yield "[", Punctuation
Expand All @@ -758,9 +745,9 @@ module internal Formatting =
open Literate
open System.Text

let formatValue (fields: Map<string, obj>) value =
let formatValue (fields: Map<string, obj>) (context: Map<string, obj>) value =
let matchedFields, themedParts =
LiterateTokenisation.tokeniseValue (LiterateOptions.createInvariant()) fields value
LiterateTokenisation.tokeniseValue (LiterateOptions.createInvariant()) fields context value
matchedFields, System.String.Concat(themedParts |> Seq.map fst)

let formatLevel (level: LogLevel) =
Expand Down Expand Up @@ -793,8 +780,8 @@ module internal Formatting =

/// let the ISO8601 love flow
let defaultFormatter (message: Message) =
let fields = message.getFields()
let matchedFields, valueString = formatValue fields message.value
let fields, context = message.getFields(), message.getContext()
let matchedFields, valueString = formatValue fields context message.value

// [I] 2014-04-05T12:34:56Z: Hello World! [my.sample.app]
formatLevel message.level +
Expand Down Expand Up @@ -891,23 +878,23 @@ module internal LiterateFormatting =
let tokeniserForOutputTemplate template: LiterateTokeniser =
let tokens = parseTemplate template
fun options message ->
let fields = message.getFields()
let fields, context = message.getFields(), message.getContext()
// render the message template first so we have the template-matched fields available
let matchedFields, messageParts =
tokeniseValue options fields message.value
tokeniseValue options fields context message.value

let tokeniseOutputTemplateField fieldName format = seq {
match fieldName with
| "timestamp" -> yield! tokeniseTimestamp format options message
| "timestampUtc" -> yield! tokeniseTimestampUtc format options message
| "level" -> yield! tokeniseLogLevel options message
| "source" -> yield! tokeniseSource options message
| "newline" -> yield! tokeniseNewline options message
| "tab" -> yield! tokeniseTab options message
| "message" -> yield! messageParts
| "properties" -> yield! tokeniseExtraFields options message matchedFields
| "exceptions" -> yield! tokeniseExns options message
| _ -> yield! tokeniseMissingField fieldName format
| "timestamp" -> yield! tokeniseTimestamp format options message
| "timestampUtc" -> yield! tokeniseTimestampUtc format options message
| "level" -> yield! tokeniseLogLevel options message
| "source" -> yield! tokeniseSource options message
| "newline" -> yield! tokeniseNewline options message
| "tab" -> yield! tokeniseTab options message
| "message" -> yield! messageParts
| "properties" -> yield! tokeniseExtraFields options message matchedFields
| "exceptions" -> yield! tokeniseExns options message
| _ -> yield! tokeniseMissingField fieldName format
}

seq {
Expand Down Expand Up @@ -1122,6 +1109,9 @@ module Message =
let setField name value (message: Message): Message =
{ message with context = message.context |> Map.add (Literals.FieldsPrefix + name) (box value) }

let setFields (fields: Map<string, obj>) (message: Message): Message =
fields |> Seq.fold (fun m (KeyValue (k, vO)) -> m |> setField k vO) message

let tryGetField name (message: Message): 'a option =
tryGetContext (Literals.FieldsPrefix + name) message

Expand Down
9 changes: 5 additions & 4 deletions examples/Logary.ConsoleApp/Program.fs
Expand Up @@ -16,6 +16,7 @@ open Logary.Configuration
open Logary.Targets
open Logary.Configuration
open Logary.Configuration.Transformers
open NodaTime

module RandomWalk =

Expand Down Expand Up @@ -115,20 +116,20 @@ let main argv =

let randomWalkPipe =
Events.events
|> Pipe.tickTimer (randomness) (TimeSpan.FromMilliseconds 500.)
|> Pipe.tickTimer (randomness) (Duration.FromMilliseconds 500.)

let processing =
Events.compose [
Events.events |> Events.minLevel LogLevel.Fatal |> Events.sink ["fatal"]

//Events.events
//|> Pipe.tickTimer (WinPerfCounters.appMetrics (PointName.ofSingle "app")) (TimeSpan.FromMilliseconds 5000.)
//|> Pipe.tickTimer (WinPerfCounters.appMetrics (PointName.ofSingle "app")) (Duration.FromMilliseconds 5000.)
//|> Pipe.map Array.toSeq
//|> Events.flattenToProcessing
//|> Events.sink ["console"; "influxdb"]

//Events.events
//|> Pipe.tickTimer (WinPerfCounters.systemMetrics (PointName.ofSingle "system")) (TimeSpan.FromMilliseconds 5000.)
//|> Pipe.tickTimer (WinPerfCounters.systemMetrics (PointName.ofSingle "system")) (Duration.FromMilliseconds 5000.)
//|> Pipe.map Array.toSeq
//|> Events.flattenToProcessing
//|> Events.sink ["console"; "influxdb"]
Expand All @@ -138,7 +139,7 @@ let main argv =

randomWalkPipe
|> Pipe.choose (Message.tryGetGauge "Logary.ConsoleApp.randomWalk")
|> Pipe.tickTimer timing (TimeSpan.FromSeconds 10.)
|> Pipe.tickTimer timing (Duration.FromSeconds 10.)
|> Pipe.map Array.toSeq
|> Events.flattenSeq
|> Events.sink ["console"]
Expand Down
1 change: 1 addition & 0 deletions examples/Logary.ConsoleApp/paket.references
Expand Up @@ -2,3 +2,4 @@ group Examples
RabbitMQ.Client
FSharp.Core
Argu
NodaTime
8 changes: 4 additions & 4 deletions examples/Logary.MetricsWriter/Program.fs
Expand Up @@ -43,23 +43,23 @@ let main argv =
let clock = SystemClock.Instance
let tenSecondsEWMATicker = EWMATicker (Duration.FromSeconds 1L, Duration.FromSeconds 10L, clock)
let randomWalk = Sample.randomWalk "randomWalk"
let walkPipe = Events.events |> Pipe.tickTimer randomWalk (TimeSpan.FromMilliseconds 500.)
let systemMetrics = Events.events |> Pipe.tickTimer (systemMetrics (PointName.parse "sys")) (TimeSpan.FromSeconds 10.)
let walkPipe = Events.events |> Pipe.tickTimer randomWalk (Duration.FromMilliseconds 500.)
let systemMetrics = Events.events |> Pipe.tickTimer (systemMetrics (PointName.parse "sys")) (Duration.FromSeconds 10.)
let processing =
Events.compose [
walkPipe
|> Events.sink ["WalkFile";]

walkPipe
|> Pipe.choose (Message.tryGetGauge "randomWalk")
|> Pipe.counter (fun _ -> 1L) (TimeSpan.FromSeconds 2.)
|> Pipe.counter (fun _ -> 1L) (Duration.FromSeconds 2.)
|> Pipe.map (fun counted -> Message.eventFormat (Info, "There are {totalNumbers} randomWalk within 2s", [|counted|]))
|> Events.sink ["Console";]

walkPipe
|> Pipe.choose (Message.tryGetGauge "randomWalk")
|> Pipe.map (fun _ -> 1L) // think of randomWalk as an event, mapping to 1
|> Pipe.tickTimer tenSecondsEWMATicker (TimeSpan.FromSeconds 5.)
|> Pipe.tickTimer tenSecondsEWMATicker (Duration.FromSeconds 5.)
|> Pipe.map (fun rate -> Message.eventFormat (Info, "tenSecondsEWMA of randomWalk's rate is {rateInSec}", [|rate|]))
|> Events.sink ["Console";]

Expand Down
5 changes: 3 additions & 2 deletions paket.dependencies
Expand Up @@ -52,7 +52,7 @@ nuget protobuf-net
github eiriktsarpalis/TypeShape src/TypeShape/TypeShape.fs
github eiriktsarpalis/TypeShape src/TypeShape/Utils.fs
github haf/YoLo:bd91bbe94a183aa9dc9c13e885c73b20516b01e7 YoLo.fs
github logary/RingBuffer:014138cdfc9dff76e74dc154e68c929655bcbd86 RingBuffer.fs
github logary/RingBuffer RingBuffer.fs
github logary/logary src/Logary.CSharp.Facade/Facade.cs
github logary/logary src/Logary.Facade/Facade.fs
github messagetemplates/messagetemplates-fsharp src/FsMtParser/FsMtParserFull.fs
Expand Down Expand Up @@ -88,4 +88,5 @@ group Benchmarks
nuget Hopac
nuget NodaTime
nuget Expecto
nuget Expecto.BenchmarkDotNet
nuget Expecto.BenchmarkDotNet == 8.6.5
nuget BenchmarkDotNet == 0.10.14

0 comments on commit 86401e9

Please sign in to comment.