Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added project for testing performance of HTTP & Thrift protocols #112

Merged
merged 1 commit into from Nov 11, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Nest.Connection.Thrift/ThriftConnection.cs
Expand Up @@ -9,7 +9,9 @@

namespace Nest.Thrift
{
internal class ThriftConnection : IConnection, IDisposable
// TODO: Cocowalla
// Changed from internal to public for performance testing
public class ThriftConnection : IConnection, IDisposable
{
private readonly Rest.Client _client;
private readonly TProtocol _protocol;
Expand Down
12 changes: 12 additions & 0 deletions src/Nest.sln
Expand Up @@ -34,6 +34,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{E5457A
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nest.Tests.Unit", "Nest.Tests.Unit\Nest.Tests.Unit.csproj", "{97408393-78AC-45DF-BE6E-4C219A2E456D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProtocolLoadTest", "ProtocolLoadTest\ProtocolLoadTest.csproj", "{B9FE4875-0171-40F7-A357-064A93BE09C6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -122,6 +124,16 @@ Global
{97408393-78AC-45DF-BE6E-4C219A2E456D}.Release|Mixed Platforms.Build.0 = Release|x86
{97408393-78AC-45DF-BE6E-4C219A2E456D}.Release|x86.ActiveCfg = Release|x86
{97408393-78AC-45DF-BE6E-4C219A2E456D}.Release|x86.Build.0 = Release|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Debug|Any CPU.ActiveCfg = Debug|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Debug|Mixed Platforms.ActiveCfg = Debug|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Debug|Mixed Platforms.Build.0 = Debug|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Debug|x86.ActiveCfg = Debug|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Debug|x86.Build.0 = Debug|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Release|Any CPU.ActiveCfg = Release|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Release|Mixed Platforms.ActiveCfg = Release|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Release|Mixed Platforms.Build.0 = Release|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Release|x86.ActiveCfg = Release|x86
{B9FE4875-0171-40F7-A357-064A93BE09C6}.Release|x86.Build.0 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
7 changes: 7 additions & 0 deletions src/Nest/ElasticClient.cs
Expand Up @@ -133,6 +133,13 @@ private ConnectionStatus GetNodeInfo()
response = this.Connection.GetSync("");
if (response.Success)
{
// TODO: Cocowalla
// Next 3 lines are just there to workaround Thrift connection issue,
// whereby only the first call to GetNodeInfo works
this._IsValid = true;
this._gotNodeInfo = true;
return response;

JObject o = JObject.Parse(response.Result);
if (o["ok"] == null)
{
Expand Down
49 changes: 49 additions & 0 deletions src/ProtocolLoadTest/HttpTester.cs
@@ -0,0 +1,49 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Nest;

namespace ProtocolLoadTest
{
internal class HttpTester : Tester, ITester
{
// Number of messages sent by all HttpTester instances
private static int NumSent;

public void Run(string indexName, int port, int numMessages, int bufferSize)
{
var bulkParms = new SimpleBulkParameters() { Refresh = false };

var settings = new ConnectionSettings("localhost", port)
.SetDefaultIndex(indexName);

var client = new ElasticClient(settings);

Connect(client, settings);

IList<Message> msgBuffer = new List<Message>(bufferSize);

var msgGenerator = new MessageGenerator();

foreach (var msg in msgGenerator.Generate(numMessages))
{
msgBuffer.Add(msg);

// Flush buffer once max size reached
if (msgBuffer.Count >= bufferSize)
{
client.IndexMany(msgBuffer, indexName, bulkParms);
msgBuffer.Clear();

Interlocked.Add(ref NumSent, bufferSize);

// Output how many messages sent so far
if (NumSent % 10000 == 0)
{
Console.WriteLine("Sent {0:0,0} messages over HTTP", NumSent);
}
}
}
}
}
}
8 changes: 8 additions & 0 deletions src/ProtocolLoadTest/ITester.cs
@@ -0,0 +1,8 @@

namespace ProtocolLoadTest
{
internal interface ITester
{
void Run(string indexName, int port, int numMessages, int bufferSize);
}
}
16 changes: 16 additions & 0 deletions src/ProtocolLoadTest/Message.cs
@@ -0,0 +1,16 @@
using System;
using System.Runtime.Serialization;

namespace ProtocolLoadTest
{
public class Message
{
// Already included by NEST as ID property, so don't store it again as a seperate field in the
// index
[IgnoreDataMember]
public Guid Id { get; set; }

public DateTime Timestamp { get; set; }
public string Body { get; set; }
}
}
42 changes: 42 additions & 0 deletions src/ProtocolLoadTest/MessageGenerator.cs
@@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace ProtocolLoadTest
{
/// <summary>
/// Generates random messages
/// </summary>
internal class MessageGenerator
{
private static Random Rng = new Random(15842);
private static string[] WORDS = { "molestie", "vel", "metus", "neque", "dui", "volutpat", "sollicitudin", "sociis", "ac", "imperdiet", "tristique", "et", "nascetur", "ad", "rhoncus", "viverra", "ornare", "consectetur", "ultrices", "orci", "parturient", "lorem", "massa", "quis", "platea", "aenean", "fermentum", "augue", "placerat", "auctor", "natoque", "habitasse", "pharetra", "ridiculus", "leo", "sit", "cras", "est", "venenatis", "aptent", "nibh", "magnis", "sodales", "malesuada", "praesent", "potenti", "lobortis", "justo", "quam", "cubilia", "pellentesque", "porttitor", "pretium", "adipiscing", "phasellus", "lectus", "vivamus", "id", "mi", "bibendum", "feugiat", "odio", "rutrum", "vestibulum", "posuere", "elementum", "suscipit", "purus", "accumsan", "egestas", "mus", "varius", "a", "arcu", "commodo", "dis", "lacinia", "tellus", "cursus", "aliquet", "interdum", "turpis", "maecenas", "dapibus", "cum", "fames", "montes", "iaculis", "erat", "euismod", "hac", "faucibus", "mauris", "tempus", "primis", "velit", "sem", "duis", "luctus", "penatibus", "sapien", "blandit", "eros", "suspendisse", "urna", "ipsum", "congue", "nulla", "taciti", "mollis", "facilisis", "at", "amet", "laoreet", "dignissim", "fringilla", "in", "nostra", "quisque", "donec", "enim", "eleifend", "nisl", "morbi", "felis", "torquent", "eget", "convallis", "etiam", "tincidunt", "facilisi", "pulvinar", "vulputate", "integre", "himenaeos", "netus", "senectus", "non", "litora", "per", "curae", "ultricies", "nec", "nam", "eu", "ante", "mattis", "vehicula", "sociosqu", "nunc", "semper", "lacus", "proin", "risus", "condimentum", "scelerisque", "conubia", "consequat", "dolor", "libero", "diam", "ut", "inceptos", "porta", "nullam", "dictumst", "magna", "tempor", "fusce", "vitae", "aliquam", "curabitur", "ligula", "habitant", "class", "hendrerit", "sagittis", "gravida", "nisi", "tortor", "ullamcorper", "dictum", "elit", "sed" };

public IEnumerable<Message> Generate(int numToGenerate)
{
for (int i = 0; i < numToGenerate; i++)
{
yield return new Message()
{
Id = Guid.NewGuid(),
Timestamp = DateTime.UtcNow,
Body = GetMessageText()
};
}
}

private static string GetMessageText()
{
int numWords = Rng.Next(1, 20);

var sb = new StringBuilder();

for (int i = 0; i < numWords; i++)
{
sb.Append(WORDS[Rng.Next(0, WORDS.Length)]).Append(" ");
}

return sb.ToString().TrimEnd();
}
}
}
123 changes: 123 additions & 0 deletions src/ProtocolLoadTest/Program.cs
@@ -0,0 +1,123 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nest;

namespace ProtocolLoadTest
{
class Program
{
const string INDEX_PREFIX = "proto-load-test-";
const int HTTP_PORT = 9200;
const int THRIFT_PORT = 9500;

// Total number of messages to send to ElasticSearch
const int NUM_MESSAGES = 250000;

// Number of ElasticSearch clients to use in parallel
const int NUM_CLIENTS = 4;

// Number of messages to buffer before sending via bulk API
const int BUFFER_SIZE = 100;

static void Main(string[] args)
{
double httpRate = RunTest<HttpTester>(HTTP_PORT);
double thriftRate = RunTest<ThriftTester>(THRIFT_PORT);

Console.WriteLine();
Console.WriteLine("HTTP: {0:0,0}/s", httpRate);
Console.WriteLine("Thrift: {0:0,0}/s", thriftRate);

Console.ReadLine();
}

private static double RunTest<T>(int port) where T : ITester
{
string type = typeof(T).Name.ToLower();
Console.WriteLine("Starting {0} test", type);

// Recreate index up-front, so this process doesn't interfere with perf figures
RecreateIndex(type);

Stopwatch sw = new Stopwatch();
sw.Start();

Task[] clients = new Task[NUM_CLIENTS];

for (int i = 0; i < NUM_CLIENTS; i++)
{
clients[i] = Task.Factory.StartNew(() =>
{
var tester = Activator.CreateInstance<T>();
tester.Run(INDEX_PREFIX + type, port, NUM_MESSAGES / NUM_CLIENTS, BUFFER_SIZE);
});
}

Task.WaitAll(clients);

sw.Stop();
double rate = NUM_MESSAGES / ((double)sw.ElapsedMilliseconds / 1000);

Console.WriteLine("{0} test completed in {1}ms ({2:0,0}/s)", type, sw.ElapsedMilliseconds, rate);

// Close the index so we don't interfere with the next test
CloseIndex(type);

return rate;
}

private static void RecreateIndex(string suffix)
{
string indexName = INDEX_PREFIX + suffix;

var connSettings = new ConnectionSettings("localhost", 9200)
.SetDefaultIndex(indexName);

var client = new ElasticClient(connSettings);

ConnectionStatus connStatus;

if (!client.TryConnect(out connStatus))
{
Console.Error.WriteLine("Could not connect to {0}:\r\n{1}",
connSettings.Host, connStatus.Error.OriginalException.Message);
Console.Read();
return;
}

client.DeleteIndex(indexName);

var indexSettings = new IndexSettings();
indexSettings.NumberOfReplicas = 1;
indexSettings.NumberOfShards = 5;
indexSettings.Add("index.refresh_interval", "10s");

var createResponse = client.CreateIndex(indexName, indexSettings);
client.Map<Message>();
}

private static void CloseIndex(string suffix)
{
string indexName = INDEX_PREFIX + suffix;

var connSettings = new ConnectionSettings("localhost", 9200)
.SetDefaultIndex(indexName);

var client = new ElasticClient(connSettings);

ConnectionStatus connStatus;

if (!client.TryConnect(out connStatus))
{
Console.Error.WriteLine("Could not connect to {0}:\r\n{1}",
connSettings.Host, connStatus.Error.OriginalException.Message);
Console.Read();
return;
}

client.CloseIndex(indexName);
}
}
}
36 changes: 36 additions & 0 deletions src/ProtocolLoadTest/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("ProtocolLoadTest")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("ProtocolLoadTest")]
[assembly: AssemblyCopyright("Copyright © 2012")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("cca96fde-8ee3-4b78-9ed9-b8675776d4bc")]

// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]