Skip to content

Commit

Permalink
Initial commit for distributed tests using crank (#7323)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminpetit committed Dec 14, 2021
1 parent 43cce69 commit 1ae2434
Show file tree
Hide file tree
Showing 32 changed files with 1,515 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -30,6 +30,7 @@ x86/
bld/
[Bb]in/
[Oo]bj/
Artifacts/

# Roslyn cache directories
*.ide/
Expand Down
8 changes: 8 additions & 0 deletions Directory.Build.props
Expand Up @@ -100,12 +100,15 @@
<SystemCodeDomVersion>5.0.0</SystemCodeDomVersion>
<MicrosoftNETFrameworkReferenceAssembliesVersion>1.0.0</MicrosoftNETFrameworkReferenceAssembliesVersion>
<AzureIdentityVersion>1.3.0</AzureIdentityVersion>
<AzureKeyVaultVersion>4.2.0</AzureKeyVaultVersion>
<FSharpCoreVersion>4.7.0</FSharpCoreVersion>
<NSubstituteVersion>4.2.2</NSubstituteVersion>
<NSubstituteAnalyzersCSharpVersion>1.0.14</NSubstituteAnalyzersCSharpVersion>
<CoverletVersion>3.0.3</CoverletVersion>
<CsCheckVersion>1.1.3</CsCheckVersion>
<DotnetReportGeneratorCliVersion>4.3.0</DotnetReportGeneratorCliVersion>
<SystemCommandLineVersion>2.0.0-beta1.21308.1</SystemCommandLineVersion>
<CrankVersion>0.2.0-alpha.21457.1</CrankVersion>

<!-- Tooling related packages -->
<SourceLinkVersion>2.8.3</SourceLinkVersion>
Expand All @@ -130,6 +133,11 @@
<PackageOutputPath Condition=" '$(PackageOutputPath)'=='' ">$(SourceRoot)/Artifacts/$(Configuration)</PackageOutputPath>
</PropertyGroup>

<!-- Set output folder for distributed test apps -->
<PropertyGroup>
<DistributedTestsOutputPath Condition=" '$(DistributedODistributedTestsOutputPathutputPath)'=='' ">$(SourceRoot)/Artifacts/DistributedTests</DistributedTestsOutputPath>
</PropertyGroup>

<Choose>
<When Condition="'$(OfficialBuild)' != 'true'">
<!-- On non-official builds we don't burn in a git sha. In large part because it
Expand Down
31 changes: 31 additions & 0 deletions Orleans.sln
Expand Up @@ -195,6 +195,16 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.editorconfig = .editorconfig
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DistributedTests", "DistributedTests", "{FFEC9FEE-FEDF-4510-B7D2-0B0B3374ED2F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DistributedTests.Common", "test\DistributedTests\DistributedTests.Common\DistributedTests.Common.csproj", "{E8FFBF2E-80FE-4D80-B79A-2097180D203A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DistributedTests.Grains", "test\DistributedTests\DistributedTests.Grains\DistributedTests.Grains.csproj", "{7F6A75BB-72AE-4509-8E82-55C764D0C070}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DistributedTests.Client", "test\DistributedTests\DistributedTests.Client\DistributedTests.Client.csproj", "{25D20278-8901-47CC-AD1D-F3C4BEB845BF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DistributedTests.Server", "test\DistributedTests\DistributedTests.Server\DistributedTests.Server.csproj", "{E8335DC9-9A7F-45C1-AFA3-0AA93ABD4FA5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -525,6 +535,22 @@ Global
{0ECCC344-21E8-4AD7-9CB9-933054200000}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0ECCC344-21E8-4AD7-9CB9-933054200000}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0ECCC344-21E8-4AD7-9CB9-933054200000}.Release|Any CPU.Build.0 = Release|Any CPU
{E8FFBF2E-80FE-4D80-B79A-2097180D203A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E8FFBF2E-80FE-4D80-B79A-2097180D203A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8FFBF2E-80FE-4D80-B79A-2097180D203A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8FFBF2E-80FE-4D80-B79A-2097180D203A}.Release|Any CPU.Build.0 = Release|Any CPU
{7F6A75BB-72AE-4509-8E82-55C764D0C070}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7F6A75BB-72AE-4509-8E82-55C764D0C070}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7F6A75BB-72AE-4509-8E82-55C764D0C070}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7F6A75BB-72AE-4509-8E82-55C764D0C070}.Release|Any CPU.Build.0 = Release|Any CPU
{25D20278-8901-47CC-AD1D-F3C4BEB845BF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{25D20278-8901-47CC-AD1D-F3C4BEB845BF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{25D20278-8901-47CC-AD1D-F3C4BEB845BF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{25D20278-8901-47CC-AD1D-F3C4BEB845BF}.Release|Any CPU.Build.0 = Release|Any CPU
{E8335DC9-9A7F-45C1-AFA3-0AA93ABD4FA5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E8335DC9-9A7F-45C1-AFA3-0AA93ABD4FA5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8335DC9-9A7F-45C1-AFA3-0AA93ABD4FA5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8335DC9-9A7F-45C1-AFA3-0AA93ABD4FA5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -622,6 +648,11 @@ Global
{EA49BFD9-6946-4858-AF4B-4138E871447F} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{2A9C009B-5219-47DB-A540-106DADF9FC91} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{0ECCC344-21E8-4AD7-9CB9-933054200000} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{FFEC9FEE-FEDF-4510-B7D2-0B0B3374ED2F} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{E8FFBF2E-80FE-4D80-B79A-2097180D203A} = {FFEC9FEE-FEDF-4510-B7D2-0B0B3374ED2F}
{7F6A75BB-72AE-4509-8E82-55C764D0C070} = {FFEC9FEE-FEDF-4510-B7D2-0B0B3374ED2F}
{25D20278-8901-47CC-AD1D-F3C4BEB845BF} = {FFEC9FEE-FEDF-4510-B7D2-0B0B3374ED2F}
{E8335DC9-9A7F-45C1-AFA3-0AA93ABD4FA5} = {FFEC9FEE-FEDF-4510-B7D2-0B0B3374ED2F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
114 changes: 114 additions & 0 deletions distributed-tests.yml
@@ -0,0 +1,114 @@
variables:
clusterId: '{{ "now" | date: "%s" }}'
serviceId: '{{ "now" | date: "%s" }}'
secretSource: KeyVault
framework: net5.0

jobs:
server:
source:
localFolder: Artifacts/DistributedTests/DistributedTests.Server/{{framework}}
executable: DistributedTests.Server.exe
readyStateText: Orleans Silo started.
framework: net5.0
arguments: "{{configurator}} --clusterId {{clusterId}} --serviceId {{serviceId}} --secretSource {{secretSource}} {{configuratorOptions}}"
onConfigure:
- if (job.endpoints.Count > 0) {
job.endpoints.Reverse();
job.endpoints.RemoveRange(job.variables.instances, job.endpoints.Count - job.variables.instances);
}
client:
source:
localFolder: Artifacts/DistributedTests/DistributedTests.Client/{{framework}}
executable: DistributedTests.Client.exe
waitForExit: true
framework: net5.0
arguments: "{{command}} --clusterId {{clusterId}} --serviceId {{serviceId}} --secretSource {{secretSource}} {{commandOptions}}"
onConfigure:
- if (job.endpoints.Count > 0) {
job.endpoints.Reverse();
job.endpoints.RemoveRange(job.variables.instances, job.endpoints.Count - job.variables.instances);
}

scenarios:
ping:
server:
job: server
variables:
instances: 10
configurator: SimpleSilo
client:
job: client
variables:
command: ping
instances: 1
numWorkers: 250
blocksPerWorker: 0
requestsPerBlock: 500
duration: 120
commandOptions: "--numWorkers {{numWorkers}} --blocksPerWorker {{blocksPerWorker}} --requestsPerBlock {{requestsPerBlock}} --duration {{duration}}"
streaming:
server:
job: server
variables:
instances: 10
configurator: EventGeneratorStreamingSilo
duration: 300
type: ExplicitGrainBasedAndImplicit
streamsPerQueue: 1000
queueCount: 8
wait: 20
configuratorOptions: "--type {{type}} --streamsPerQueue {{streamsPerQueue}} --queueCount {{queueCount}} --wait {{wait}} --duration {{duration}}"
client:
job: client
variables:
command: counter
instances: 1
commandOptions: "requests errors"
reliability:
server:
job: server
variables:
instances: 10
configurator: SimpleSilo
chaosagent:
job: client
waitForExit: false
variables:
command: chaosagent
instances: 1
wait: 30
serversPerRound: 1
rounds: 4
roundDelay: 30
graceful: true
restart: true
commandOptions: "--wait {{wait}} --serversPerRound {{serversPerRound}} --rounds {{rounds}} --roundDelay {{roundDelay}} --graceful {{graceful}} --restart {{restart}}"
client:
job: client
waitForExit: true
variables:
command: ping
instances: 1
numWorkers: 250
blocksPerWorker: 0
requestsPerBlock: 500
duration: 180
commandOptions: "--numWorkers {{numWorkers}} --blocksPerWorker {{blocksPerWorker}} --requestsPerBlock {{requestsPerBlock}} --duration {{duration}}"

profiles:
local:
variables:
secretSource: File
jobs:
server:
endpoints:
- http://localhost:5010
variables:
instances: 1
client:
endpoints:
- http://localhost:5010
chaosagent:
endpoints:
- http://localhost:5010
Expand Up @@ -8,7 +8,7 @@
namespace Orleans.Providers.Streams.Generator
{
[GenerateSerializer]
internal class GeneratedBatchContainer : IBatchContainer
public class GeneratedBatchContainer : IBatchContainer
{
[Id(0)]
public StreamId StreamId { get; }
Expand Down
@@ -0,0 +1,77 @@
using System;
using System.Collections.Generic;
using System.CommandLine;
using System.CommandLine.Invocation;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DistributedTests.Common.MessageChannel;
using Microsoft.Extensions.Logging;

namespace DistributedTests.Client.Commands
{
public class ChaosAgentCommand : Command
{
private readonly ILogger _logger;

private class Parameters
{
public string ServiceId { get; set; }
public string ClusterId { get; set; }
public SecretConfiguration.SecretSource SecretSource { get; set; }
public int Wait { get; set; }
public int ServersPerRound { get; set; }
public int Rounds { get; set; }
public int RoundDelay { get; set; }
public bool Graceful { get; set; }
public bool Restart { get; set; }
}

public ChaosAgentCommand(ILogger logger)
: base("chaosagent", "Shutdown/restart servers gracefully or not")
{
AddOption(OptionHelper.CreateOption<string>("--serviceId", isRequired: true));
AddOption(OptionHelper.CreateOption<string>("--clusterId", isRequired: true));
AddOption(OptionHelper.CreateOption("--secretSource", defaultValue: SecretConfiguration.SecretSource.File));
AddOption(OptionHelper.CreateOption<int>("--wait", defaultValue: 30));
AddOption(OptionHelper.CreateOption<int>("--serversPerRound", defaultValue: 1));
AddOption(OptionHelper.CreateOption<int>("--rounds", defaultValue: 5));
AddOption(OptionHelper.CreateOption<int>("--roundDelay", defaultValue: 60));
AddOption(OptionHelper.CreateOption<bool>("--graceful", defaultValue: false));
AddOption(OptionHelper.CreateOption<bool>("--restart", defaultValue: false));

Handler = CommandHandler.Create<Parameters>(RunAsync);
_logger = logger;
}

private async Task RunAsync(Parameters parameters)
{
var secrets = SecretConfiguration.Load(parameters.SecretSource);
var channel = await Channels.CreateSendChannel(parameters.ClusterId, secrets);

_logger.LogInformation($"Waiting {parameters.Wait} seconds before starting...");
await Task.Delay(TimeSpan.FromSeconds(parameters.Wait));

for (var i=0; i<parameters.Rounds; i++)
{
_logger.LogInformation($"Round #{i + 1}: sending {parameters.ServersPerRound} orders [Restart: {parameters.Restart}, Graceful: {parameters.Graceful}]");
var responses = await channel.SendMessages(
GetMessages(),
new CancellationTokenSource(TimeSpan.FromSeconds(parameters.RoundDelay)).Token);
_logger.LogInformation($"Round #{i + 1}: silos {string.Join(",", responses.Select(r => r.ServerName))} acked");
_logger.LogInformation($"Round #{i + 1}: waiting {parameters.RoundDelay}");
await Task.Delay(TimeSpan.FromSeconds(parameters.RoundDelay));
}

List<ServerMessage> GetMessages()
{
var msgs = new List<ServerMessage>();
for (var i = 0; i < parameters.ServersPerRound; i++)
{
msgs.Add(new ServerMessage(parameters.Graceful, parameters.Restart));
}
return msgs;
}
}
}
}
@@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;
using System.CommandLine;
using System.CommandLine.Invocation;
using System.Threading.Tasks;
using DistributedTests.GrainInterfaces;
using Microsoft.Crank.EventSources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Configuration;
using Orleans.Hosting;

namespace DistributedTests.Client.Commands
{
public class CounterCaptureCommand : Command
{
private readonly ILogger _logger;

private class Parameters
{
public string ServiceId { get; set; }
public string ClusterId { get; set; }
public SecretConfiguration.SecretSource SecretSource { get; set; }
public string CounterKey { get; set; }
public List<string> Counters { get; set; }
}

public CounterCaptureCommand(ILogger logger)
: base("counter", "capture the counters in parameter")
{
AddOption(OptionHelper.CreateOption<string>("--serviceId", isRequired: true));
AddOption(OptionHelper.CreateOption<string>("--clusterId", isRequired: true));
AddOption(OptionHelper.CreateOption("--secretSource", defaultValue: SecretConfiguration.SecretSource.File));
AddOption(OptionHelper.CreateOption<string>("--counterKey", defaultValue: StreamingConstants.DefaultCounterGrain));
AddArgument(new Argument<List<string>>("Counters") { Arity = ArgumentArity.OneOrMore });

Handler = CommandHandler.Create<Parameters>(RunAsync);
_logger = logger;
}

private async Task RunAsync(Parameters parameters)
{
_logger.LogInformation("Connecting to cluster...");
var secrets = SecretConfiguration.Load(parameters.SecretSource);
var hostBuilder = new HostBuilder()
.UseOrleansClient(builder => {
builder
.Configure<ClusterOptions>(options => { options.ClusterId = parameters.ClusterId; options.ServiceId = parameters.ServiceId; })
.UseAzureStorageClustering(options => options.ConfigureTableServiceClient(secrets.ClusteringConnectionString));
});
using var host = hostBuilder.Build();
await host.StartAsync();

var client = host.Services.GetService<IClusterClient>();

var counterGrain = client.GetGrain<ICounterGrain>(parameters.CounterKey);

var duration = await counterGrain.GetRunDuration();
BenchmarksEventSource.Register("duration", Operations.First, Operations.Last, "duration", "duration", "n0");
BenchmarksEventSource.Measure("duration", duration.TotalSeconds);

var initialWait = await counterGrain.WaitTimeForReport();

_logger.LogInformation($"Counters should be ready in {initialWait}");
await Task.Delay(initialWait);

_logger.LogInformation($"Counters ready");
foreach (var counter in parameters.Counters)
{
var value = await counterGrain.GetTotalCounterValue(counter);
_logger.LogInformation($"{counter}: {value}");
BenchmarksEventSource.Register(counter, Operations.First, Operations.Sum, counter, counter, "n0");
BenchmarksEventSource.Measure(counter, value);
if (string.Equals(counter, "requests", StringComparison.InvariantCultureIgnoreCase))
{
var rps = (float) value / duration.TotalSeconds;
BenchmarksEventSource.Register("rps", Operations.First, Operations.Last, "rps", "Requests per second", "n0");
BenchmarksEventSource.Measure("rps", rps);
}
}

await host.StopAsync();
}
}
}

0 comments on commit 1ae2434

Please sign in to comment.