Skip to content

Commit

Permalink
added support of step interception into NBomber Community version
Browse files Browse the repository at this point in the history
  • Loading branch information
AntyaDev committed Oct 5, 2022
1 parent 35ec16e commit 583a490
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 155 deletions.
25 changes: 9 additions & 16 deletions src/NBomber/Domain/Concurrency/ScenarioActor.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module internal NBomber.Domain.Concurrency.ScenarioActor

open System.Collections.Generic
open System.Threading.Tasks

open NBomber
open NBomber.Contracts
Expand All @@ -11,20 +10,14 @@ open NBomber.Domain.DomainTypes
open NBomber.Domain.Step
open NBomber.Domain.Stats.ScenarioStatsActor

type ActorDep = {
ScenarioDep: ScenarioDep
ExecSteps: StepDep -> RunningStep[] -> int[] -> Task<unit> // stepDep steps stepsOrder
}
type ScenarioActor(scnCtx: ScenarioExecContext, scenarioInfo: ScenarioInfo) =

type ScenarioActor(dep: ActorDep, scenarioInfo: ScenarioInfo) =

let _logger = dep.ScenarioDep.Logger.ForContext<ScenarioActor>()
let _scnDep = dep.ScenarioDep
let _scenario = dep.ScenarioDep.Scenario
let _logger = scnCtx.Logger.ForContext<ScenarioActor>()
let _scenario = scnCtx.Scenario
let mutable _actorWorking = false

let _stepDep = {
ScenarioDep = _scnDep
ScenarioExecContext = scnCtx
ScenarioInfo = scenarioInfo
Data = Dictionary<string,obj>()
}
Expand All @@ -41,20 +34,20 @@ type ScenarioActor(dep: ActorDep, scenarioInfo: ScenarioInfo) =
_actorWorking <- true

while shouldRun && _actorWorking
&& not _scnDep.ScenarioCancellationToken.IsCancellationRequested
&& _scenario.PlanedDuration.TotalMilliseconds > _scnDep.ScenarioTimer.Elapsed.TotalMilliseconds do
&& not scnCtx.ScenarioCancellationToken.IsCancellationRequested
&& _scenario.PlanedDuration.TotalMilliseconds > scnCtx.ScenarioTimer.Elapsed.TotalMilliseconds do

_stepDep.Data.Clear()

try
let stepOrder = Scenario.getStepOrder _scenario
do! dep.ExecSteps _stepDep _steps stepOrder
do! RunningStep.execSteps _stepDep _steps stepOrder
with
| ex ->
_logger.Error(ex, $"Unhandled exception for Scenario: {_scenario.ScenarioName}")
let response = Response.fail(statusCode = Constants.StepInternalClientErrorCode, error = ex.Message)
let resp = { StepIndex = 0; ClientResponse = response; EndTimeMs = 0; LatencyMs = 0 }
_scnDep.ScenarioStatsActor.Publish(AddResponse resp)
scnCtx.ScenarioStatsActor.Publish(AddResponse resp)

shouldRun <- runInfinite
else
Expand All @@ -63,7 +56,7 @@ type ScenarioActor(dep: ActorDep, scenarioInfo: ScenarioInfo) =
_actorWorking <- false
}

member _.ScenarioStatsActor = _scnDep.ScenarioStatsActor
member _.ScenarioStatsActor = scnCtx.ScenarioStatsActor
member _.ScenarioInfo = scenarioInfo
member _.Working = _actorWorking

Expand Down
9 changes: 5 additions & 4 deletions src/NBomber/Domain/Concurrency/ScenarioActorPool.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ module internal NBomber.Domain.Concurrency.ScenarioActorPool

open NBomber.Domain
open NBomber.Domain.Concurrency.ScenarioActor
open NBomber.Domain.Step

[<Struct>]
type ActorPoolResult = {
ActorsFromPool: ScenarioActor list
NewActors: ScenarioActor list
}

let createActors (dep: ActorDep) count fromIndex =
let scenario = dep.ScenarioDep.Scenario
let createActors (scnCtx: ScenarioExecContext) count fromIndex =
let scenario = scnCtx.Scenario
List.init count (fun i ->
let actorIndex = fromIndex + i
let scenarioInfo = Scenario.createScenarioInfo(scenario.ScenarioName, scenario.PlanedDuration, actorIndex, dep.ScenarioDep.ScenarioOperation)
ScenarioActor(dep, scenarioInfo)
let scenarioInfo = Scenario.createScenarioInfo(scenario.ScenarioName, scenario.PlanedDuration, actorIndex, scnCtx.ScenarioOperation)
ScenarioActor(scnCtx, scenarioInfo)
)

// todo: add tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open System

open NBomber.Domain.Concurrency
open NBomber.Domain.Concurrency.ScenarioActor
open NBomber.Domain.Step

[<Struct>]
type SchedulerCommand =
Expand All @@ -12,8 +13,8 @@ type SchedulerCommand =
| RemoveActor of removeCount:int
| StopScheduler

// dep * actorPool * scheduledActorCount
type SchedulerExec = ActorDep -> ScenarioActor list -> int -> ScenarioActor list
// scnCtx * actorPool * scheduledActorCount
type SchedulerExec = ScenarioExecContext -> ScenarioActor list -> int -> ScenarioActor list

let removeFromScheduler scheduledActorsCount removeCount =
let actorsCount = scheduledActorsCount - removeCount
Expand All @@ -27,14 +28,14 @@ let schedule workingActorCount scheduledActorCount =
elif workingActorCount < scheduledActorCount then AddActors(scheduledActorCount - workingActorCount)
else RemoveActor(workingActorCount - scheduledActorCount)

let exec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCount: int) =
let exec (scnCtx: ScenarioExecContext) (actorPool: ScenarioActor list) (scheduledActorCount: int) =
let workingActors = ScenarioActorPool.getWorkingActors actorPool
match schedule workingActors.Length scheduledActorCount with
| KeepWorking ->
actorPool

| AddActors count ->
let result = ScenarioActorPool.rentActors (ScenarioActorPool.createActors dep) actorPool count
let result = ScenarioActorPool.rentActors (ScenarioActorPool.createActors scnCtx) actorPool count
let newActorPool = ScenarioActorPool.updatePool actorPool result.NewActors
result.ActorsFromPool |> List.iter(fun x -> x.RunInfinite() |> ignore)
result.NewActors|> List.iter(fun x -> x.RunInfinite() |> ignore)
Expand All @@ -50,7 +51,7 @@ let exec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCount: i
ScenarioActorPool.stopActors actorPool
actorPool

type ConstantActorScheduler(dep: ActorDep, exec: SchedulerExec) =
type ConstantActorScheduler(scnCtx: ScenarioExecContext, exec: SchedulerExec) =

let mutable _actorPool = List.empty<ScenarioActor>
let mutable _scheduledActorCount = 0
Expand All @@ -62,11 +63,11 @@ type ConstantActorScheduler(dep: ActorDep, exec: SchedulerExec) =

member _.AddActors(count) =
_scheduledActorCount <- _scheduledActorCount + count
_actorPool <- exec dep _actorPool _scheduledActorCount
_actorPool <- exec scnCtx _actorPool _scheduledActorCount

member _.RemoveActors(count) =
_scheduledActorCount <- removeFromScheduler _scheduledActorCount count
_actorPool <- exec dep _actorPool _scheduledActorCount
_actorPool <- exec scnCtx _actorPool _scheduledActorCount

member _.Stop() = stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ open System

open NBomber.Domain.Concurrency
open NBomber.Domain.Concurrency.ScenarioActor
open NBomber.Domain.Step

[<Struct>]
type SchedulerCommand =
| StartActors of actors:ScenarioActor list
| RentActors of actorCount:int

// dep * actorPool * scheduledActorCount
type SchedulerExec = ActorDep -> ScenarioActor list -> int -> ScenarioActor list
// scnCtx * actorPool * scheduledActorCount
type SchedulerExec = ScenarioExecContext -> ScenarioActor list -> int -> ScenarioActor list

// todo: add tests
let schedule (actorPool: ScenarioActor list) (actorCount: int) =
Expand All @@ -25,7 +26,7 @@ let schedule (actorPool: ScenarioActor list) (actorCount: int) =
else
StartActors freeActors

let exec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCount: int) =
let exec (scnCtx: ScenarioExecContext) (actorPool: ScenarioActor list) (scheduledActorCount: int) =

let execSteps (actors: ScenarioActor list) =
actors |> List.iter(fun x -> x.ExecSteps() |> ignore)
Expand All @@ -36,12 +37,12 @@ let exec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCount: i
actorPool

| RentActors actorCount ->
let result = ScenarioActorPool.rentActors (ScenarioActorPool.createActors dep) actorPool actorCount
let result = ScenarioActorPool.rentActors (ScenarioActorPool.createActors scnCtx) actorPool actorCount
execSteps result.ActorsFromPool
execSteps result.NewActors
ScenarioActorPool.updatePool actorPool result.NewActors

type OneTimeActorScheduler(dep: ActorDep, exec: SchedulerExec) =
type OneTimeActorScheduler(scnCtx: ScenarioExecContext, exec: SchedulerExec) =

let _lockObj = obj()
let mutable _actorPool = List.empty<ScenarioActor>
Expand All @@ -55,7 +56,7 @@ type OneTimeActorScheduler(dep: ActorDep, exec: SchedulerExec) =
member _.InjectActors(count) =
lock _lockObj (fun _ ->
_scheduledActorCount <- count
_actorPool <- exec dep _actorPool _scheduledActorCount
_actorPool <- exec scnCtx _actorPool _scheduledActorCount
)

member _.Stop() = stop()
Expand Down
50 changes: 25 additions & 25 deletions src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ open NBomber.Domain.Stats.ScenarioStatsActor
open NBomber.Domain.Concurrency.ScenarioActor
open NBomber.Domain.Concurrency.Scheduler.ConstantActorScheduler
open NBomber.Domain.Concurrency.Scheduler.OneTimeActorScheduler
open NBomber.Domain.Step

[<Struct>]
type SchedulerCommand =
Expand Down Expand Up @@ -59,24 +60,23 @@ let schedule (getRandomValue: int -> int -> int) // min -> max -> result
if constWorkingActorCount > 0 then [RemoveConstantActors(constWorkingActorCount); command]
else [command]

let emptyExec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCount: int) = actorPool
let emptyExec (scnCtx: ScenarioExecContext) (actorPool: ScenarioActor list) (scheduledActorCount: int) = actorPool

type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
type ScenarioScheduler(scnCtx: ScenarioExecContext, scenarioClusterCount: int) =

let _log = dep.ScenarioDep.Logger.ForContext<ScenarioScheduler>()
let _scnDep = dep.ScenarioDep
let mutable _scenario = _scnDep.Scenario
let _log = scnCtx.Logger.ForContext<ScenarioScheduler>()
let mutable _scenario = scnCtx.Scenario
let mutable _currentSimulation = _scenario.LoadTimeLine.Head.LoadSimulation
let mutable _cachedSimulationStats = Unchecked.defaultof<LoadSimulationStats>
let mutable _isWorking = false

let _constantScheduler =
if _scenario.IsEnabled then new ConstantActorScheduler(dep, ConstantActorScheduler.exec)
else new ConstantActorScheduler(dep, emptyExec)
if _scenario.IsEnabled then new ConstantActorScheduler(scnCtx, ConstantActorScheduler.exec)
else new ConstantActorScheduler(scnCtx, emptyExec)

let _oneTimeScheduler =
if _scenario.IsEnabled then new OneTimeActorScheduler(dep, OneTimeActorScheduler.exec)
else new OneTimeActorScheduler(dep, emptyExec)
if _scenario.IsEnabled then new OneTimeActorScheduler(scnCtx, OneTimeActorScheduler.exec)
else new OneTimeActorScheduler(scnCtx, emptyExec)

let _tcs = TaskCompletionSource()
let _randomGen = Random()
Expand All @@ -89,25 +89,25 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =

let prepareForRealtimeStats () =
_cachedSimulationStats <- getCurrentSimulationStats()
_scnDep.ScenarioStatsActor.Publish StartUseTempBuffer
scnCtx.ScenarioStatsActor.Publish StartUseTempBuffer

let buildRealtimeStats (duration: TimeSpan) =
let simulationStats = getCurrentSimulationStats()
let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(BuildReportingStats(reply, simulationStats, duration))
scnCtx.ScenarioStatsActor.Publish(BuildReportingStats(reply, simulationStats, duration))
reply.Task

let commitRealtimeStats (duration) =
let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(BuildReportingStats(reply, _cachedSimulationStats, duration))
_scnDep.ScenarioStatsActor.Publish FlushTempBuffer
scnCtx.ScenarioStatsActor.Publish(BuildReportingStats(reply, _cachedSimulationStats, duration))
scnCtx.ScenarioStatsActor.Publish FlushTempBuffer
reply.Task

let getFinalStats () =
let simulationStats = getCurrentSimulationStats()
let duration = Scenario.getExecutedDuration _scenario
let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(GetFinalStats(reply, simulationStats, duration))
scnCtx.ScenarioStatsActor.Publish(GetFinalStats(reply, simulationStats, duration))
reply.Task

let getRandomValue minRate maxRate =
Expand All @@ -117,21 +117,21 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
if _isWorking then
_isWorking <- false
_tcs.TrySetResult() |> ignore
_scnDep.ScenarioCancellationToken.Cancel()
_scenario <- Scenario.setExecutedDuration(_scenario, _scnDep.ScenarioTimer.Elapsed)
_scnDep.ScenarioTimer.Stop()
scnCtx.ScenarioCancellationToken.Cancel()
_scenario <- Scenario.setExecutedDuration(_scenario, scnCtx.ScenarioTimer.Elapsed)
scnCtx.ScenarioTimer.Stop()
_constantScheduler.Stop()
_oneTimeScheduler.Stop()

let execScheduler () =
if _isWorking && _scnDep.ScenarioStatsActor.FailCount > _scnDep.MaxFailCount then
if _isWorking && scnCtx.ScenarioStatsActor.FailCount > scnCtx.MaxFailCount then
stop()
_scnDep.ExecStopCommand(StopCommand.StopTest $"Stopping test because of too many fails. Scenario '{_scenario.ScenarioName}' contains '{_scnDep.ScenarioStatsActor.FailCount}' fails.")
scnCtx.ExecStopCommand(StopCommand.StopTest $"Stopping test because of too many fails. Scenario '{_scenario.ScenarioName}' contains '{scnCtx.ScenarioStatsActor.FailCount}' fails.")

elif _isWorking then
let currentTime = _scnDep.ScenarioTimer.Elapsed
let currentTime = scnCtx.ScenarioTimer.Elapsed

if _scnDep.ScenarioOperation = ScenarioOperation.WarmUp
if scnCtx.ScenarioOperation = ScenarioOperation.WarmUp
&& _scenario.WarmUpDuration.IsSome && _scenario.WarmUpDuration.Value <= currentTime then
stop()
else
Expand All @@ -157,20 +157,20 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =

let start () =
_isWorking <- true
_scnDep.ScenarioTimer.Restart()
scnCtx.ScenarioTimer.Restart()
execScheduler()
_tcs.Task :> Task

member _.Working = _isWorking
member _.Scenario = _scenario
member _.AllRealtimeStats = _scnDep.ScenarioStatsActor.AllRealtimeStats
member _.MergedReportingStats = _scnDep.ScenarioStatsActor.MergedReportingStats
member _.AllRealtimeStats = scnCtx.ScenarioStatsActor.AllRealtimeStats
member _.MergedReportingStats = scnCtx.ScenarioStatsActor.MergedReportingStats

member _.Start() = start()
member _.Stop() = stop()
member _.ExecScheduler() = execScheduler()

member _.AddStatsFromAgent(stats) = _scnDep.ScenarioStatsActor.Publish(AddFromAgent stats)
member _.AddStatsFromAgent(stats) = scnCtx.ScenarioStatsActor.Publish(AddFromAgent stats)
member _.PrepareForRealtimeStats() = prepareForRealtimeStats()
member _.CommitRealtimeStats(duration) = commitRealtimeStats duration
member _.BuildRealtimeStats(duration) = buildRealtimeStats duration
Expand Down

0 comments on commit 583a490

Please sign in to comment.