Permalink
Browse files

Added project to test stressing the message bus.

  • Loading branch information...
1 parent 84b4b41 commit 9c51ca7f2d8a2ab277722b04e5a8ec05a77bbe07 @davidfowl davidfowl committed Jan 22, 2012
@@ -0,0 +1,194 @@
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace SignalR.Stress
+{
+ class Program
+ {
+ private static Timer _rateTimer;
+ private static bool _measuringRate;
+ private static Stopwatch _sw = Stopwatch.StartNew();
+ private static double _receivesPerSecond;
+ private static double _peakReceivesPerSecond;
+ private static double _avgReceivesPerSecond;
+ private static long _received;
+ private static long _lastReceivedCount;
+ private static long _avgLastReceivedCount;
+ private static DateTime _avgCalcStart;
+ private static long _rate = 10;
+ private static int _runs = 1000;
+ private static int _clients = 100;
+ private static Exception _exception;
+
+ public static long TotalRate
+ {
+ get
+ {
+ return _rate * _clients;
+ }
+ }
+
+ static void Main(string[] args)
+ {
+ var bus = new InProcessMessageBus();
+ var eventKeys = new[] { "a", "b", "c" };
+ string payload = GetPayload();
+
+ TaskScheduler.UnobservedTaskException += OnUnobservedTaskException;
+
+ for (int i = 0; i < _clients; i++)
+ {
+ Task.Factory.StartNew(() => StartClientLoop(bus, eventKeys), TaskCreationOptions.LongRunning);
+ }
+
+ Task.Factory.StartNew(() =>
+ {
+ while (_exception == null)
+ {
+ long old = _rate;
+ var interval = TimeSpan.FromMilliseconds(1000.0 / _rate);
+ while (Interlocked.Read(ref _rate) == old && _exception == null)
+ {
+ try
+ {
+ bus.Send("a", payload).Wait();
+ Thread.Sleep(interval);
+ }
+ catch (Exception ex)
+ {
+ Interlocked.Exchange(ref _exception, ex);
+ }
+ }
+ }
+ },
+ TaskCreationOptions.LongRunning);
+
+ MeasureStats();
+
+ Console.ReadLine();
+ }
+
+ private static string GetPayload(int n = 32)
+ {
+ return Encoding.UTF8.GetString(Enumerable.Range(0, n).Select(i => (byte)i).ToArray());
+ }
+
+ static void OnUnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
+ {
+ Interlocked.Exchange(ref _exception, e.Exception);
+ e.SetObserved();
+ }
+
+ private static void StartClientLoop(InProcessMessageBus bus, string[] eventKeys)
+ {
+ ReceiveLoop(bus, eventKeys, null);
+ }
+
+ private static void ReceiveLoop(InProcessMessageBus bus, string[] eventKeys, ulong? id)
+ {
+ try
+ {
+ bus.GetMessagesSince(eventKeys, id).ContinueWith(task =>
+ {
+ if (task.IsFaulted)
+ {
+ Interlocked.Exchange(ref _exception, task.Exception);
+ }
+ else
+ {
+ var list = task.Result.ToList();
+ id = list[list.Count - 1].Id;
+ Interlocked.Increment(ref _received);
+ Interlocked.Increment(ref _avgLastReceivedCount);
+
+ ReceiveLoop(bus, eventKeys, id);
+ }
+ });
+ }
+ catch (Exception ex)
+ {
+ Interlocked.Exchange(ref _exception, ex);
+ }
+ }
+
+ public static void MeasureStats()
+ {
+ _sw.Start();
+ _avgCalcStart = DateTime.UtcNow;
+
+ _rateTimer = new Timer(_ =>
+ {
+ if (_measuringRate)
+ {
+ return;
+ }
+ _measuringRate = true;
+
+ try
+ {
+ var now = DateTime.UtcNow;
+ var timeDiffSecs = _sw.Elapsed.TotalSeconds;
+
+ _sw.Restart();
+
+ if (timeDiffSecs <= 0)
+ {
+ return;
+ }
+
+ if (_exception != null)
+ {
+ Console.WriteLine("Failed With:\r\n {0}", _exception.GetBaseException());
+ _rateTimer.Change(-1, -1);
+ _rateTimer.Dispose();
+ _rateTimer = null;
+ return;
+ }
+
+ var recv = Interlocked.Read(ref _received);
+ var recvDiff = recv - _lastReceivedCount;
+ var recvPerSec = recvDiff / timeDiffSecs;
+ _receivesPerSecond = recvPerSec;
+
+ _lastReceivedCount = recv;
+
+ Console.Clear();
+ Console.WriteLine("Total Rate: {0:0.000} (mps) = {1:0.000} (mps) * {2} (clients)", TotalRate, _rate, _clients);
+ var d1 = Math.Max(0, TotalRate - _receivesPerSecond);
+ Console.WriteLine("RPS: {0:0.000} (diff: {1:0.000}, {2:0.00}%)", Math.Min(TotalRate, _receivesPerSecond), d1, d1 * 100.0 / TotalRate);
+ var d2 = Math.Max(0, TotalRate - _peakReceivesPerSecond);
+ Console.WriteLine("Peak RPS: {0:0.000} (diff: {1:0.000} {2:0.00}%)", Math.Min(TotalRate, _peakReceivesPerSecond), d2, d2 * 100.0 / TotalRate);
+ var d3 = Math.Max(0, TotalRate - _avgReceivesPerSecond);
+ Console.WriteLine("Avg RPS: {0:0.000} (diff: {1:0.000} {2:0.00}%)", Math.Min(TotalRate, _avgReceivesPerSecond), d3, d3 * 100.0 / TotalRate);
+
+ if (recvPerSec < long.MaxValue && recvPerSec > _peakReceivesPerSecond)
+ {
+ Interlocked.Exchange(ref _peakReceivesPerSecond, recvPerSec);
+ }
+
+ _avgReceivesPerSecond = _avgLastReceivedCount / (now - _avgCalcStart).TotalSeconds;
+
+ if (_runs > 0 && _runs % 30 == 0)
+ {
+ _avgCalcStart = DateTime.UtcNow;
+ _avgLastReceivedCount = 0;
+ long old = _rate;
+ long @new = old + 10;
+ while (Interlocked.Exchange(ref _rate, @new) == old) { }
+ }
+
+ _runs++;
+
+ }
+ finally
+ {
+ _measuringRate = false;
+ }
+ }, null, 1000, 1000);
+ }
+ }
+}
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("SignalR.Stress")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("SignalR.Stress")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2012")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("7b897397-3992-47f4-abd0-68a0f5a7b736")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">x86</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{758D93C8-7985-4F55-82DA-8BA5198CF4EF}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>SignalR.Stress</RootNamespace>
+ <AssemblyName>SignalR.Stress</AssemblyName>
+ <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+ <TargetFrameworkProfile>
+ </TargetFrameworkProfile>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
+ <PlatformTarget>x86</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
+ <PlatformTarget>x86</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Program.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\SignalR\SignalR.csproj">
+ <Project>{1B9A82C4-BCA1-4834-A33E-226F17BE070B}</Project>
+ <Name>SignalR</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="app.config" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
@@ -0,0 +1,3 @@
+<?xml version="1.0"?>
+<configuration>
+<startup><supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/></startup></configuration>
View
@@ -29,6 +29,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SignalR.Hosting.Owin", "Sig
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SignalR.Hosting.Owin.Samples", "SignalrKayakGateDemo\SignalR.Hosting.Owin.Samples.csproj", "{8BF43A5B-4C96-4490-A9AF-5A057AC37C24}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SignalR.Stress", "SignalR.Stress\SignalR.Stress.csproj", "{758D93C8-7985-4F55-82DA-8BA5198CF4EF}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -149,6 +151,16 @@ Global
{8BF43A5B-4C96-4490-A9AF-5A057AC37C24}.Release|Mixed Platforms.Build.0 = Release|x86
{8BF43A5B-4C96-4490-A9AF-5A057AC37C24}.Release|x86.ActiveCfg = Release|x86
{8BF43A5B-4C96-4490-A9AF-5A057AC37C24}.Release|x86.Build.0 = Release|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Debug|Any CPU.ActiveCfg = Debug|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Debug|Mixed Platforms.ActiveCfg = Debug|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Debug|Mixed Platforms.Build.0 = Debug|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Debug|x86.ActiveCfg = Debug|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Debug|x86.Build.0 = Debug|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Release|Any CPU.ActiveCfg = Release|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Release|Mixed Platforms.ActiveCfg = Release|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Release|Mixed Platforms.Build.0 = Release|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Release|x86.ActiveCfg = Release|x86
+ {758D93C8-7985-4F55-82DA-8BA5198CF4EF}.Release|x86.Build.0 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

0 comments on commit 9c51ca7

Please sign in to comment.