Skip to content

Commit

Permalink
fixed issue with displaying reporting on console for cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
AntyaDev committed Sep 26, 2022
1 parent 9f0ef08 commit 2647ffa
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 101 deletions.
16 changes: 6 additions & 10 deletions src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,18 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
let buildRealtimeStats (duration: TimeSpan) =
let simulationStats = getCurrentSimulationStats()
let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(BuildRealtimeStats(reply, simulationStats, duration))
_scnDep.ScenarioStatsActor.Publish(BuildReportingStats(reply, simulationStats, duration))
reply.Task

let commitRealtimeStats (duration) =
let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(BuildRealtimeStats(reply, _cachedSimulationStats, duration))
_scnDep.ScenarioStatsActor.Publish(BuildReportingStats(reply, _cachedSimulationStats, duration))
_scnDep.ScenarioStatsActor.Publish FlushTempBuffer
reply.Task

let getStats (isFinal: bool) =
let getFinalStats () =
let simulationStats = getCurrentSimulationStats()

let duration =
if isFinal then Scenario.getExecutedDuration _scenario
else _scnDep.ScenarioTimer.Elapsed

let duration = Scenario.getExecutedDuration _scenario
let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(GetFinalStats(reply, simulationStats, duration))
reply.Task
Expand Down Expand Up @@ -168,6 +164,7 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
member _.Working = _isWorking
member _.Scenario = _scenario
member _.AllRealtimeStats = _scnDep.ScenarioStatsActor.AllRealtimeStats
member _.MergedReportingStats = _scnDep.ScenarioStatsActor.MergedReportingStats

member _.Start() = start()
member _.Stop() = stop()
Expand All @@ -177,8 +174,7 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
member _.PrepareForRealtimeStats() = prepareForRealtimeStats()
member _.CommitRealtimeStats(duration) = commitRealtimeStats duration
member _.BuildRealtimeStats(duration) = buildRealtimeStats duration
member _.GetFinalStats() = getStats true
member _.GetCurrentStats() = getStats false
member _.GetFinalStats() = getFinalStats()

interface IDisposable with
member _.Dispose() =
Expand Down
1 change: 0 additions & 1 deletion src/NBomber/Domain/LoadTimeLine.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
module internal NBomber.Domain.LoadTimeLine

open System
open System.Globalization

open FsToolkit.ErrorHandling

Expand Down
209 changes: 135 additions & 74 deletions src/NBomber/Domain/Stats/ScenarioStatsActor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,109 +12,170 @@ open NBomber.Contracts.Stats
open NBomber.Contracts.Internal
open NBomber.Domain.DomainTypes
open NBomber.Domain.Stats.Statistics
open NBomber.Domain.Stats.StepStatsRawData

type ActorMessage =
| AddResponse of StepResponse
| AddFromAgent of ScenarioStats
| StartUseTempBuffer
| FlushTempBuffer
| BuildRealtimeStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| BuildReportingStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan

type State = {
Logger: ILogger
Scenario: Scenario
ReportingInterval: TimeSpan
MergeStatsFn: (LoadSimulationStats -> ScenarioStats list -> ScenarioStats) option
AllStepsData: StepStatsRawData[]

mutable AllReportingStats: Map<TimeSpan,ScenarioStats>
mutable ReportingStepsData: StepStatsRawData[]
mutable ReportingAgentsStats: ScenarioStats list
mutable ReportingTempBuffer: StepResponse list
mutable UseReportingTempBuffer: bool
mutable FinalAgentsStats: ScenarioStats list
mutable MergedReportingStats: ScenarioStats // need to display on console (we merge absolute request counts)

mutable FailCount: int
}

let createState logger scenario reportingInterval mergeStatsFn = {
Logger = logger
Scenario = scenario
ReportingInterval = reportingInterval
MergeStatsFn = mergeStatsFn
AllStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
ReportingStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
AllReportingStats = Map.empty
ReportingAgentsStats = List.empty
ReportingTempBuffer = List.empty
UseReportingTempBuffer = false
FinalAgentsStats = List.empty
MergedReportingStats = ScenarioStats.empty scenario
FailCount = 0
}

let addResponse (state: State) (resp: StepResponse) =
if state.UseReportingTempBuffer then
state.ReportingTempBuffer <- resp :: state.ReportingTempBuffer
else
let allStData = state.AllStepsData[resp.StepIndex]
let intervalStData = state.ReportingStepsData[resp.StepIndex]
state.AllStepsData[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp
state.ReportingStepsData[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp

if resp.ClientResponse.IsError then
state.FailCount <- state.FailCount + 1
state

let flushTempBuffer (state: State) =
state.UseReportingTempBuffer <- false
state.ReportingTempBuffer |> List.iter(addResponse state >> ignore)
state.ReportingTempBuffer <- List.empty
state

let createReportingStats (state: State) (simulationStats) (duration) (stepsData) =
ScenarioStats.create state.Scenario stepsData simulationStats OperationType.Bombing %duration state.ReportingInterval

let createFinalStats (state: State) (simulationStats) (duration) (stepsData) =
ScenarioStats.create state.Scenario stepsData simulationStats OperationType.Complete %duration duration

let buildStats (state: State)
(stepsData: StepStatsRawData[])
(agentStats: ScenarioStats list)
(simulationStats: LoadSimulationStats)
(duration: TimeSpan)
(isFinalStats: bool) =

let cordStats =
if isFinalStats then
stepsData |> createFinalStats state simulationStats duration
else
stepsData |> createReportingStats state simulationStats duration

let allStats =
if state.Scenario.IsEnabled then cordStats :: agentStats
else agentStats

if allStats.Length > 0 then
state.MergeStatsFn
|> Option.map(fun merge -> merge simulationStats allStats)
|> Option.defaultValue cordStats
else
cordStats

let mergeReportingStats (latestReportingStats: ScenarioStats) (reportingStats: ScenarioStats) =
let mergedStepSteps =
reportingStats.StepStats
|> Array.mapi(fun i x ->
let latestStep = latestReportingStats.StepStats[i]
let ok = { x.Ok.Request with Count = x.Ok.Request.Count + latestStep.Ok.Request.Count }
let fail = { x.Fail.Request with Count = x.Fail.Request.Count + latestStep.Fail.Request.Count }
let allBytes = { x.Ok.DataTransfer with AllBytes = x.Ok.DataTransfer.AllBytes + latestStep.Ok.DataTransfer.AllBytes }

let okData = { x.Ok with Request = ok; DataTransfer = allBytes }
let failData = { x.Fail with Request = fail }

{ x with Ok = okData; Fail = failData }
)

{ reportingStats with StepStats = mergedStepSteps } |> ScenarioStats.round

let addReportingStats (state: State) (reportingStats: ScenarioStats) =
state.AllReportingStats <- Map.add reportingStats.Duration reportingStats state.AllReportingStats
// reset reporting interval steps data
state.ReportingStepsData <- Array.init state.Scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
state.ReportingAgentsStats <- List.empty
state.MergedReportingStats <- mergeReportingStats state.MergedReportingStats reportingStats
state

let addStatsFromAgent (state: State) (agentStats: ScenarioStats) =
if agentStats.CurrentOperation = OperationType.Bombing then
state.ReportingAgentsStats <- agentStats :: state.ReportingAgentsStats
else
state.FinalAgentsStats <- agentStats :: state.FinalAgentsStats
state

type ScenarioStatsActor(logger: ILogger,
scenario: Scenario,
reportingInterval: TimeSpan,
?mergeStatsFn: LoadSimulationStats -> ScenarioStats list -> ScenarioStats) =

let _log = logger.ForContext<ScenarioStatsActor>()
let _allStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
let mutable _intervalStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
let mutable _allRealtimeStats = Map.empty<TimeSpan,ScenarioStats>
let mutable _agentsStats = List.empty<ScenarioStats>
let mutable _tempBuffer = List.empty<StepResponse>
let mutable _useTempBuffer = false
let mutable _failCount = 0

let addResponse (resp: StepResponse) =
let allStData = _allStepsData.[resp.StepIndex]
let intervalStData = _intervalStepsData.[resp.StepIndex]
_allStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp
_intervalStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp

if resp.ClientResponse.IsError then
_failCount <- _failCount + 1

let createRealtimeStats (simulationStats) (duration) (stepsData) =
ScenarioStats.create scenario stepsData simulationStats OperationType.Bombing %duration reportingInterval

let createFinalStats (simulationStats) (duration) (stepsData) =
ScenarioStats.create scenario stepsData simulationStats OperationType.Complete %duration duration

let addToCacheAndReset (realtimeStats: ScenarioStats) =
_allRealtimeStats <- _allRealtimeStats.Add(realtimeStats.Duration, realtimeStats)
// reset interval steps data
_intervalStepsData <- Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
_agentsStats <- List.empty
let mutable _state = createState logger scenario reportingInterval mergeStatsFn

let _actor = ActionBlock(fun msg ->
try
match msg with
| AddResponse response ->
if _useTempBuffer then _tempBuffer <- response :: _tempBuffer
else addResponse response
_state <- addResponse _state response

| AddFromAgent agentStats ->
_agentsStats <- agentStats :: _agentsStats
_state <- addStatsFromAgent _state agentStats

| StartUseTempBuffer ->
_useTempBuffer <- true
_state.UseReportingTempBuffer <- true

| FlushTempBuffer ->
_useTempBuffer <- false
_tempBuffer |> List.iter addResponse
_tempBuffer <- List.empty
_state <- flushTempBuffer _state

| BuildRealtimeStats (reply, simulationStats, duration) ->
let cordStats = _intervalStepsData |> createRealtimeStats simulationStats duration

let allStats =
if scenario.IsEnabled then cordStats :: _agentsStats
else _agentsStats

if allStats.Length > 0 then
let merged =
mergeStatsFn
|> Option.map(fun merge -> merge simulationStats allStats)
|> Option.defaultValue cordStats

addToCacheAndReset merged
reply.TrySetResult(merged) |> ignore
else
addToCacheAndReset cordStats
reply.TrySetResult(cordStats) |> ignore
| BuildReportingStats (reply, simulationStats, duration) ->
let isFinalStats = false
let stats = buildStats _state _state.ReportingStepsData _state.ReportingAgentsStats simulationStats duration isFinalStats
_state <- addReportingStats _state stats
reply.TrySetResult(stats) |> ignore

| GetFinalStats (reply, simulationStats, duration) ->
let cordStats = _allStepsData |> createFinalStats simulationStats duration

let allStats =
if scenario.IsEnabled then cordStats :: _agentsStats
else _agentsStats

if allStats.Length > 0 then
let merged =
mergeStatsFn
|> Option.map(fun merge -> merge simulationStats allStats)
|> Option.defaultValue cordStats

reply.TrySetResult(merged) |> ignore
else
reply.TrySetResult(cordStats) |> ignore
let isFinalStats = true
let stats = buildStats _state _state.AllStepsData _state.FinalAgentsStats simulationStats duration isFinalStats
reply.TrySetResult(stats) |> ignore
with
| ex -> logger.Error $"{nameof ScenarioStatsActor} failed: {ex.ToString()}"
| ex -> _state.Logger.Error $"{nameof ScenarioStatsActor} failed: {ex.ToString()}"
)

member _.FailCount = _failCount
member _.AllRealtimeStats = _allRealtimeStats
member _.FailCount = _state.FailCount
member _.AllRealtimeStats = _state.AllReportingStats
member _.MergedReportingStats = _state.MergedReportingStats

[<MethodImpl(MethodImplOptions.AggressiveInlining)>]
member _.Publish(msg) = _actor.Post(msg) |> ignore
Expand Down
34 changes: 33 additions & 1 deletion src/NBomber/Domain/Stats/Statistics.fs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,37 @@ module StepStats =

module ScenarioStats =

let empty (scenario: Scenario) =

let simulation = scenario.LoadTimeLine.Head.LoadSimulation
let simulationStats = LoadTimeLine.createSimulationStats(simulation, 0, 0)

let stepStats =
scenario.Steps
|> List.mapi(fun i st ->
if st.DoNotTrack then None
else
let clName = st.ClientFactory |> Option.map(fun x -> x.FactoryName |> ClientFactory.getOriginalName) |> Option.defaultValue "none"
let clCount = st.ClientFactory |> Option.map(fun x -> x.ClientCount) |> Option.defaultValue 0
let fdName = st.Feed |> Option.map(fun x -> x.FeedName) |> Option.defaultValue "none"
let stepData = StepStatsRawData.createEmpty()
Some (StepStats.create st.StepName stepData st.Timeout clName clCount fdName (TimeSpan.FromSeconds 5))
)
|> List.choose id
|> List.toArray

{ ScenarioName = scenario.ScenarioName
RequestCount = 0
OkCount = 0
FailCount = 0
AllBytes = 0
StepStats = stepStats
LatencyCount = { LessOrEq800 = 0; More800Less1200 = 0; MoreOrEq1200 = 0 }
LoadSimulationStats = simulationStats
StatusCodes = Array.empty
CurrentOperation = OperationType.None
Duration = TimeSpan.Zero }

let create (scenario: Scenario)
(allStepsData: StepStatsRawData[])
(simulationStats: LoadSimulationStats)
Expand All @@ -176,7 +207,8 @@ module ScenarioStats =
let clName = st.ClientFactory |> Option.map(fun x -> x.FactoryName |> ClientFactory.getOriginalName) |> Option.defaultValue "none"
let clCount = st.ClientFactory |> Option.map(fun x -> x.ClientCount) |> Option.defaultValue 0
let fdName = st.Feed |> Option.map(fun x -> x.FeedName) |> Option.defaultValue "none"
Some(StepStats.create st.StepName allStepsData[i] st.Timeout clName clCount fdName reportingInterval))
Some (StepStats.create st.StepName allStepsData[i] st.Timeout clName clCount fdName reportingInterval)
)
|> List.choose id
|> List.toArray

Expand Down

0 comments on commit 2647ffa

Please sign in to comment.