Skip to content

Commit

Permalink
replaced TPLDataFlow on Channels due to performance and memory alloca…
Browse files Browse the repository at this point in the history
…tions

fixed bug with DataTransfer calculation for Global Info
  • Loading branch information
AntyaDev committed Nov 22, 2022
1 parent 6ba83c9 commit cc2cce7
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 46 deletions.
4 changes: 2 additions & 2 deletions examples/CSharpProd/HTTP/HttpResponseValidation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace CSharpProd.HTTP;

public class JsonResponse
{
public string Fact { get; set; }
public string? Fact { get; set; }
public int Length { get; set; }
}

Expand All @@ -29,7 +29,7 @@ public void Run()
// var body = await response.Payload.Value.Content.ReadAsStringAsync();
var body = await response.Payload.Value.Content.ReadFromJsonAsync<JsonResponse>();
if (body.Length > 11111)
if (body?.Length > 11111)
return Response.Fail(statusCode: "small length", sizeBytes: response.SizeBytes);
return response;
Expand Down
62 changes: 35 additions & 27 deletions src/NBomber/Domain/Stats/ScenarioStatsActor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
open System
open System.Collections.Generic
open System.Runtime.CompilerServices
open System.Threading.Channels
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

open FSharp.UMX
open Serilog
Expand Down Expand Up @@ -82,9 +82,11 @@ let updateIntervalStats (state: State) (measurement: Measurement) =
RawMeasurementStats.addMeasurement rawStats measurement

let updateGlobalInfoDataSize (state: State) (measurement: Measurement) =
if state.GlobalInfoDataSize.Count > 0 && measurement.Name = Constants.ScenarioGlobalInfo then
measurement.ClientResponse.SizeBytes <- (state.GlobalInfoDataSize |> Seq.sum) + measurement.ClientResponse.SizeBytes
state.GlobalInfoDataSize.Clear()
if measurement.Name = Constants.ScenarioGlobalInfo then

if state.GlobalInfoDataSize.Count > 0 then
measurement.ClientResponse.SizeBytes <- (state.GlobalInfoDataSize |> Seq.sum) + measurement.ClientResponse.SizeBytes
state.GlobalInfoDataSize.Clear()

elif measurement.ClientResponse.SizeBytes > 0 then
state.GlobalInfoDataSize.Add measurement.ClientResponse.SizeBytes
Expand Down Expand Up @@ -198,48 +200,54 @@ type ScenarioStatsActor(logger: ILogger,
?mergeStatsFn: LoadSimulationStats -> ScenarioStats seq -> ScenarioStats) =

let mutable _state = createState logger scenario reportingInterval mergeStatsFn
let mutable _stop = false
let _channel = Channel.CreateUnbounded<ActorMessage>()

let _actor = ActionBlock(fun msg ->
let loop () = backgroundTask {
try
match msg with
| AddMeasurement result ->
addMeasurement _state result
while not _stop do
match! _channel.Reader.ReadAsync() with
| AddMeasurement result ->
addMeasurement _state result

| AddFromAgent agentStats ->
addStatsFromAgent _state agentStats
| AddFromAgent agentStats ->
addStatsFromAgent _state agentStats

| StartUseTempBuffer ->
_state.UseTempBuffer <- true
| StartUseTempBuffer ->
_state.UseTempBuffer <- true

| FlushTempBuffer ->
flushTempBuffer _state
| FlushTempBuffer ->
flushTempBuffer _state

| BuildReportingStats (reply, simulationStats, duration) ->
let isFinalStats = false
let reportingStats = _state.IntervalStepsResults.Values |> Seq.toArray
| BuildReportingStats (reply, simulationStats, duration) ->
let isFinalStats = false
let reportingStats = _state.IntervalStepsResults.Values |> Seq.toArray

let stats = buildStats _state reportingStats _state.IntervalAgentsStats simulationStats duration isFinalStats
let stats = buildStats _state reportingStats _state.IntervalAgentsStats simulationStats duration isFinalStats

addReportingStats _state stats
reply.TrySetResult(stats) |> ignore
addReportingStats _state stats
reply.TrySetResult(stats) |> ignore

| GetFinalStats (reply, simulationStats, duration) ->
let isFinalStats = true
| GetFinalStats (reply, simulationStats, duration) ->
let isFinalStats = true

let cordStats = _state.CoordinatorStepsResults.Values |> Seq.toArray
let cordStats = _state.CoordinatorStepsResults.Values |> Seq.toArray

let stats = buildStats _state cordStats _state.FinalAgentsStats simulationStats duration isFinalStats
reply.TrySetResult(stats) |> ignore
let stats = buildStats _state cordStats _state.FinalAgentsStats simulationStats duration isFinalStats
reply.TrySetResult(stats) |> ignore
with
| ex -> _state.Logger.Fatal $"Unhandled exception: {nameof ScenarioStatsActor} failed: {ex.ToString()}"
)
}

do
loop() |> ignore

member _.ScenarioFailCount = _state.ScenarioFailCount
member _.AllRealtimeStats = _state.ReportingStatsCache
member _.ConsoleScenarioStats = _state.ConsoleScenarioStats

[<MethodImpl(MethodImplOptions.AggressiveInlining)>]
member _.Publish(msg) = _actor.Post(msg) |> ignore
member _.Publish(msg) = _channel.Writer.TryWrite(msg) |> ignore

let createDefault (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
ScenarioStatsActor(logger, scenario, reportingInterval)
Expand Down
2 changes: 1 addition & 1 deletion src/NBomber/DomainServices/NBomberRunner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@ let run (context: NBomberContext) =
error |> AppError.toString |> Serilog.Log.Error
error
)
|> fun task -> task.Result
|> fun task -> task.GetAwaiter().GetResult()
4 changes: 4 additions & 0 deletions src/NBomber/DomainServices/TestHost/TestHost.fs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ type internal TestHost(dep: IGlobalDependency, regScenarios: Scenario list) as t

do! dep.WorkerPlugins |> WorkerPlugins.stop _log
do! dep.ReportingSinks |> ReportingSinks.stop _log

if isWarmUp then
GC.Collect()
do! Task.Delay 1_000
}

let startInit (consoleStatus: StatusContext option) (sessionArgs: SessionArgs) = taskResult {
Expand Down
8 changes: 4 additions & 4 deletions src/NBomber/NBomber.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="NBomber.Contracts" Version="[4.0.0-beta9]" />
<PackageReference Include="NBomber.Contracts" Version="[4.0.0-beta10]" />
<PackageReference Include="CommandLineParser" Version="2.8.0" />
<PackageReference Include="CsvHelper" Version="27.2.1" />
<PackageReference Include="FSharp.UMX" Version="1.1.0" />
Expand All @@ -80,9 +80,9 @@
<PackageReference Include="Serilog.Settings.Configuration" Version="3.3.0" />
<PackageReference Include="Serilog.Sinks.File" Version="5.0.0" />
<PackageReference Include="ConsoleTables" Version="2.4.2" />
<PackageReference Include="Serilog.Sinks.SpectreConsole" Version="[0.3.3]" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="6.0.0" />
<PackageReference Update="FSharp.Core" Version="7.0.0" />
<PackageReference Include="Serilog.Sinks.SpectreConsole" Version="[0.3.3]" />
<PackageReference Update="FSharp.Core" Version="7.0.0" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Resources\HtmlReport\assets\js\index.js" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
<Compile Include="StepTests\BasicStepTests.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="Ply" Version="0.3.1" />
<PackageReference Include="Serilog.Sinks.InMemory" Version="0.6.0" />
<PackageReference Include="Serilog.Sinks.InMemory.Assertions" Version="0.6.0" />
<PackageReference Include="Unquote" Version="6.1.0" />
Expand Down
1 change: 0 additions & 1 deletion tests/NBomber.IntegrationTests/ScenarioStatsActorTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ open Xunit
open Swensen.Unquote

open NBomber
open NBomber.Contracts
open NBomber.Contracts.Stats
open NBomber.Contracts.Internal
open NBomber.Extensions.Internal
Expand Down
21 changes: 11 additions & 10 deletions tests/NBomber.IntegrationTests/StepTests/BasicStepTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,27 @@ let ``Response Ok and Fail should be properly count`` () =
[<Trait("CI", "disable")>]
let ``Min/Mean/Max/RPS/DataTransfer should be properly count`` () =

Scenario.create("latency count test", fun ctx -> task {
Scenario.create("latency count test", fun ctx -> backgroundTask {
do! Task.Delay(milliseconds 100)
return Response.ok(sizeBytes = 1000)
})
|> Scenario.withWarmUpDuration(TimeSpan.FromSeconds 1.0)
|> Scenario.withWarmUpDuration(seconds 2)
|> Scenario.withLoadSimulations [KeepConstant(copies = 1, during = seconds 10)]
|> NBomberRunner.registerScenario
|> NBomberRunner.withoutReports
|> NBomberRunner.run
|> Result.getOk
|> fun nodeStats ->
let stats = nodeStats.ScenarioStats[0]

test <@ stats.Ok.Request.RPS >= 8.0 @>
test <@ stats.Ok.Request.RPS <= 10.0 @>
test <@ stats.Ok.Latency.MinMs <= 103.0 @>
test <@ stats.Ok.Latency.MeanMs <= 110.0 @>
test <@ stats.Ok.Latency.MaxMs <= 120.0 @>
test <@ stats.Ok.DataTransfer.MinBytes = 1000 @>
test <@ stats.Ok.DataTransfer.AllBytes >= 90_000L && stats.Ok.DataTransfer.AllBytes <= 100_000L @>
let ok = stats.Ok

test <@ ok.Request.RPS >= 8.0 @>
test <@ ok.Request.RPS <= 10.0 @>
test <@ ok.Latency.MinMs <= 103.0 @>
test <@ ok.Latency.MeanMs <= 111.0 @>
test <@ ok.Latency.MaxMs <= 125.0 @>
test <@ ok.DataTransfer.MinBytes = 1000 @>
test <@ ok.DataTransfer.AllBytes >= 90_000L && ok.DataTransfer.AllBytes <= 100_000L @>

// [<Fact>]
// let ``can be duplicated to introduce repeatable behaviour`` () =
Expand Down

0 comments on commit cc2cce7

Please sign in to comment.