diff --git a/Raven.Server/FromMono/Options.cs b/Raven.Server/FromMono/Options.cs index d8e3533ed1d8..38f5c82c110f 100644 --- a/Raven.Server/FromMono/Options.cs +++ b/Raven.Server/FromMono/Options.cs @@ -1258,6 +1258,9 @@ public enum OptionCategory [Description("Import Database/FileSystem")] SmugglerImportDatabaseFileSystem = 1 << 13, + + [Description("Record/Replay Options")] + TrafficRecordReplay = 1 << 14, } } diff --git a/Raven.Tests.Core/ChangesApi/WebsocketsTests.cs b/Raven.Tests.Core/ChangesApi/WebsocketsTests.cs index 14f21d9af310..90b8c05cddbe 100644 --- a/Raven.Tests.Core/ChangesApi/WebsocketsTests.cs +++ b/Raven.Tests.Core/ChangesApi/WebsocketsTests.cs @@ -26,7 +26,7 @@ public async Task Can_connect_via_websockets_and_receive_heartbeat() using (var clientWebSocket = TryCreateClientWebSocket()) { string url = store.Url.Replace("http:", "ws:"); - url = url + "/changes/websocket?id=" + Guid.NewGuid(); + url = url + "/traffic-watch/events?" + Guid.NewGuid(); await clientWebSocket.ConnectAsync(new Uri(url), CancellationToken.None); var buffer = new byte[1024]; diff --git a/RavenDB.sln b/RavenDB.sln index f3776e4d3180..1f99052035e5 100644 --- a/RavenDB.sln +++ b/RavenDB.sln @@ -101,6 +101,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sparrow.Tryout", "Raven.Spa EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Raven.Tests.Web", "Raven.Tests.Web\Raven.Tests.Web.csproj", "{551BAFD7-082C-4ABF-8415-55E178CCA2AD}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Raven.Traffic", "Tools\Raven.Traffic\Raven.Traffic.csproj", "{5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -330,6 +332,12 @@ Global {551BAFD7-082C-4ABF-8415-55E178CCA2AD}.Profiling|Any CPU.Build.0 = Release|Any CPU {551BAFD7-082C-4ABF-8415-55E178CCA2AD}.Release|Any CPU.ActiveCfg = Release|Any CPU {551BAFD7-082C-4ABF-8415-55E178CCA2AD}.Release|Any CPU.Build.0 = Release|Any CPU + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F}.Profiling|Any CPU.ActiveCfg = Release|Any CPU + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F}.Profiling|Any CPU.Build.0 = Release|Any CPU + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -365,6 +373,7 @@ Global {794CC625-31A4-4DA2-968E-5D33CF1CADA1} = {DB0F0968-FB58-4A29-8BEA-F73E55D92665} {15CDA168-1BF1-4DE6-AD59-DC37494E5C12} = {39EAEB7D-1638-48FA-9BAB-E4FA8F7DB7A4} {551BAFD7-082C-4ABF-8415-55E178CCA2AD} = {DB0F0968-FB58-4A29-8BEA-F73E55D92665} + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F} = {702CE58F-C8D5-4C3E-8138-98D20B0E26F9} EndGlobalSection GlobalSection(CodealikeProperties) = postSolution SolutionGuid = 6944aade-3195-477c-8aed-9ebe2902fc57 diff --git a/Tools/Raven.Traffic/App.config b/Tools/Raven.Traffic/App.config new file mode 100644 index 000000000000..8e15646352ec --- /dev/null +++ b/Tools/Raven.Traffic/App.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Tools/Raven.Traffic/Program.cs b/Tools/Raven.Traffic/Program.cs new file mode 100644 index 000000000000..6433e7da4501 --- /dev/null +++ b/Tools/Raven.Traffic/Program.cs @@ -0,0 +1,91 @@ +using System; +using Raven.Abstractions; +using Raven.Client; +using Raven.Client.Document; +using Raven.Traffic; + +namespace TrafficRecorder +{ + + public class Program + { + private static void Main(string[] args) + { + TrafficToolConfiguration config; + var parseStatus = TrafficToolConfiguration.ProcessArgs(args, out config); + switch (parseStatus) + { + case TrafficToolConfiguration.TrafficArgsProcessStatus.InvalidMode: + PrintUsage(); + break; + case TrafficToolConfiguration.TrafficArgsProcessStatus.NoArguments: + PrintUsage(); + break; + case TrafficToolConfiguration.TrafficArgsProcessStatus.NotEnoughArguments: + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine("Illegal arguments amount, see usage instructions:"); + Console.ForegroundColor = ConsoleColor.White; + break; + case TrafficToolConfiguration.TrafficArgsProcessStatus.ValidConfig: + IDocumentStore store; + try + { + store = new DocumentStore + { + Url = config.ConnectionString.Url, + DefaultDatabase = config.ResourceName, + Credentials = config.ConnectionString.Credentials, + }.Initialize(); + } + catch (Exception e) + { + Console.WriteLine("Could not connect to server. Exception: {0}",e); + return; + } + + using (store) + { + try + { + store.DatabaseCommands.GetStatistics(); + } + catch (Exception e) + { + Console.WriteLine("Database does not exist"); + return; + } + new TrafficRec(store, config).ExecuteTrafficCommand(); + } + break; + } + + + } + + private static void PrintUsage() + { + Console.ForegroundColor = ConsoleColor.DarkMagenta; + Console.WriteLine(@" +Traffic Recording and Replaying utility for RavenDB +---------------------------------------------- +Copyright (C) 2008 - {0} - Hibernating Rhinos +----------------------------------------------", SystemTime.UtcNow.Year); + Console.ForegroundColor = ConsoleColor.White; + Console.WriteLine(@" +Usage: + Raven.Traffic [Mode(rec/play)] [Url] [resource name] [recordingFile] [[option1], [option2] ...] + +Examples: + - Record 'Northwind' database found on specified server: + Raven.Traffic rec http://localhost:8080/ Northwind recording.json + - Replay 'Northwind' database from specified server to the dump.raven file: + Raven.Traffic play http://localhost:8080/ Northwind recording.json "); + + Console.ForegroundColor = ConsoleColor.Green; + TrafficToolConfiguration.InitOptionsSetObject().WriteOptionDescriptions(Console.Out); + Console.ForegroundColor = ConsoleColor.White; + } + + + } +} \ No newline at end of file diff --git a/Tools/Raven.Traffic/Properties/AssemblyInfo.cs b/Tools/Raven.Traffic/Properties/AssemblyInfo.cs new file mode 100644 index 000000000000..dfa080b75478 --- /dev/null +++ b/Tools/Raven.Traffic/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("TrafficRecorder")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("TrafficRecorder")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("1a0bd55c-571b-43c9-908c-b560b833ef12")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Tools/Raven.Traffic/Raven.Traffic b/Tools/Raven.Traffic/Raven.Traffic new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/Tools/Raven.Traffic/Raven.Traffic.csproj b/Tools/Raven.Traffic/Raven.Traffic.csproj new file mode 100644 index 000000000000..ae9a7afe7475 --- /dev/null +++ b/Tools/Raven.Traffic/Raven.Traffic.csproj @@ -0,0 +1,78 @@ + + + + + Debug + AnyCPU + {5BB7F2B6-CE06-4F1F-9D12-8E25E60FF06F} + Exe + Properties + Raven.Traffic + Raven.Traffic + v4.5 + 512 + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + FromMono\Options.cs + + + + + + + + + + + + + {41AC479E-1EB2-4D23-AAF2-E4C8DF1BC2BA} + Raven.Abstractions + + + {4E087ECB-E7CA-4891-AC3C-3C76702715B6} + Raven.Client.Lightweight + + + {212823CD-25E1-41AC-92D1-D6DF4D53FC85} + Raven.Database + + + + + \ No newline at end of file diff --git a/Tools/Raven.Traffic/TrafficRec.cs b/Tools/Raven.Traffic/TrafficRec.cs new file mode 100644 index 000000000000..f005dca2e0c3 --- /dev/null +++ b/Tools/Raven.Traffic/TrafficRec.cs @@ -0,0 +1,316 @@ +// ----------------------------------------------------------------------- +// +// Copyright (c) Hibernating Rhinos LTD. All rights reserved. +// +// ----------------------------------------------------------------------- +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Raven.Abstractions.Data; +using Raven.Client; +using Raven.Database.Server; +using Raven.Imports.Newtonsoft.Json; +using Raven.Json.Linq; +using TrafficRecorder; + +namespace Raven.Traffic +{ + public class TrafficRec + { + private readonly IDocumentStore store; + private readonly TrafficToolConfiguration config; + + public TrafficRec(IDocumentStore store, TrafficToolConfiguration config) + { + this.store = store; + this.config = config; + } + + public void ExecuteTrafficCommand() + { + switch (config.Mode) + { + case TrafficToolConfiguration.TrafficToolMode.Record: + RecordRequests(config, store); + break; + case TrafficToolConfiguration.TrafficToolMode.Replay: + ReplayRequests(config, store); + break; + } + } + + private void ReplayRequests(TrafficToolConfiguration config, IDocumentStore store) + { + Stream finalStream; + var requestsCounter = 0; + var skippedRequestsCounter = 0; + var totalCountOfLogicRequests = 0; + var totalSp = Stopwatch.StartNew(); + using (var stream = File.Open(config.RecordFilePath, FileMode.Open)) + { + if (config.IsCompressed) + { + finalStream = new GZipStream(stream, CompressionMode.Decompress, leaveOpen: true); + } + else + { + finalStream = stream; + } + var trafficLogs = + JsonSerializer.Create().Deserialize(new JsonTextReader(new StreamReader(finalStream))); + + ConcurrentQueue queue = null; + var cts = new CancellationTokenSource(); + var ct = cts.Token; + Task outputTask = null; + if (config.PrintOutput) + { + + queue = new ConcurrentQueue(); + outputTask = Task.Run(() => + { + while (!ct.IsCancellationRequested || queue.Count != 0) + { + string message; + if (queue.TryDequeue(out message)) + { + Console.WriteLine(message); + } + else + { + Thread.Sleep(10); + } + } + }); + } + + + + const string postLineSeparatorRegex = "\\t\\d: databases\\/[\\w\\.]+"; + const string endOfPostLineString = "\t\t\tQuery:"; + const string uriCleanRegex = "http://[\\w\\.]+(:\\d*)?(\\/databases\\/[\\w\\.]+)?"; + + Parallel.ForEach(trafficLogs, new ParallelOptions + { + MaxDegreeOfParallelism = Environment.ProcessorCount + }, trafficLog => + { + var sp = Stopwatch.StartNew(); + GetRequest[] requestsArray = null; + + var uriString = Regex.Replace(trafficLog.RequestUri, uriCleanRegex, string.Empty); + string trafficUrlPart; + string trafficQueryPart; + trafficUrlPart = ExtractUrlAndQuery(uriString, out trafficQueryPart); + + var curCount = Interlocked.Increment(ref requestsCounter); + if (ValidateUrlString(trafficUrlPart)) + { + Interlocked.Increment(ref skippedRequestsCounter); + if (queue != null) + { + queue.Enqueue(string.Format("{0} out of {1}, skipped whole message", + curCount, trafficLogs.Length)); + } + return; + } + Interlocked.Increment(ref totalCountOfLogicRequests); + if (trafficLog.HttpMethod.Equals("get", StringComparison.CurrentCultureIgnoreCase)) + { + requestsArray = new[] + { + new GetRequest + { + Url = trafficUrlPart, + Query = trafficQueryPart + } + }; + } + else if (trafficLog.CustomInfo != null) + { + var subArray = Regex.Split(trafficLog.CustomInfo.Replace("\r", string.Empty), postLineSeparatorRegex).Where(x => !String.IsNullOrEmpty(x)).Select(x => + { + var endOfPostLastIndex = x.IndexOf(endOfPostLineString); + if (endOfPostLastIndex < 0) + return x; + return x.Remove(endOfPostLastIndex); + }).ToArray(); + requestsArray = + subArray.Select(customInfoLine => + { + + trafficUrlPart = ExtractUrlAndQuery(customInfoLine, out trafficQueryPart); + + if (ValidateUrlString(trafficUrlPart)) + { + if (queue != null) + { + queue.Enqueue(string.Format("{0} out of {1}, skipped inner message", + curCount, trafficLogs.Length)); + } + return null; + } + + return new GetRequest + { + Url = trafficUrlPart, + Query = trafficQueryPart, + }; + }).Where(x => x != null).ToArray(); + } + Interlocked.Add(ref totalCountOfLogicRequests, requestsArray.Length); + try + { + store.DatabaseCommands.MultiGet(requestsArray); + if (queue != null) + { + queue.Enqueue(string.Format("{0} out of {1}, took {2} ms. Total Time: {3} ms", + curCount, trafficLogs.Length, sp.ElapsedMilliseconds, totalSp.ElapsedMilliseconds)); + } + } + catch (Exception e) + { + Interlocked.Increment(ref skippedRequestsCounter); + if (queue != null) + { + queue.Enqueue(string.Format("{0} out of {1}, failed", + curCount, trafficLogs.Length, sp.ElapsedMilliseconds, totalSp.ElapsedMilliseconds)); + } + } + }); + + if (outputTask != null) + { + cts.Cancel(); + outputTask.Wait(); + } + } + + Console.WriteLine(@"Summary: +Requests sent: {0} +Requests skipped: {1} +Nested and non nested request: {2} +Total Time: {3}", requestsCounter, skippedRequestsCounter, totalCountOfLogicRequests, totalSp.ElapsedMilliseconds); + + } + + private static bool ValidateUrlString(string trafficUrlPart) + { + return (trafficUrlPart.StartsWith("/") || trafficUrlPart.StartsWith("\\")) && ValidateUrlFirstPathSegment(trafficUrlPart.Substring(1, trafficUrlPart.Length - 1)) + || ValidateUrlFirstPathSegment(trafficUrlPart); + } + + private string ExtractUrlAndQuery(string uriString, out string trafficQueryPart) + { + string trafficUrlPart; + var queryStartIndex = uriString.IndexOf("?"); + + if (queryStartIndex <= 0) + { + trafficUrlPart = uriString; + trafficQueryPart = uriString; + } + else + { + trafficUrlPart = uriString.Substring(0, queryStartIndex); + trafficQueryPart = uriString.Substring(queryStartIndex); + } + return trafficUrlPart; + } + + private static bool ValidateUrlFirstPathSegment(string trafficUrlPart) + { + return trafficUrlPart.StartsWith("bulk_docs") || + trafficUrlPart.StartsWith("static") || + trafficUrlPart.StartsWith("bulkInsert") || + trafficUrlPart.StartsWith("changes") || + trafficUrlPart.StartsWith("traffic-watch"); + } + + + /// + /// Connects to raven traffic event source and registers all the requests to the file defined in the config + /// + /// configuration conatining the connection, the file to write to, etc. + /// the store to work with + private void RecordRequests(TrafficToolConfiguration config, IDocumentStore store) + { + var requestsQueue = new ConcurrentQueue(); + var messagesCount = 0; + var mre = new ManualResetEvent(false); + var trafficWatchObserver = new TrafficWatchObserver(store,config.ResourceName, mre, config.Timeout, x => + { + if (config.AmountConstraint.HasValue && Interlocked.Increment(ref messagesCount) > config.AmountConstraint.Value) + mre.Set(); + else + requestsQueue.Enqueue(x); + }); + + trafficWatchObserver.EstablishConnection(); + + var writingTask = Task.Run(() => + { + WriteRequestsFromQueueToFile(requestsQueue, config.RecordFilePath, config.IsCompressed,config.PrintOutput, mre); + }); + + + + if (config.DurationConstraint.HasValue) + mre.WaitOne(config.DurationConstraint.Value); + else + mre.WaitOne(); + + mre.Set(); + + writingTask.Wait(); + trafficWatchObserver.OnCompleted(); + } + + private void WriteRequestsFromQueueToFile(ConcurrentQueue messages, string filePath, bool isCompressed, bool printOutput, ManualResetEvent mre) + { + RavenJObject notification; + var requestsCounter = 0; + using (var stream = File.Create(filePath)) + { + Stream finalStream = stream; + if (isCompressed) + finalStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true); + + using (var streamWriter = new StreamWriter(finalStream)) + { + var jsonWriter = new JsonTextWriter(streamWriter) + { + Formatting = Formatting.Indented + }; + jsonWriter.WriteStartArray(); + + while (messages.TryDequeue(out notification) || mre.WaitOne(0) == false) + { + if (notification == null) + { + Thread.Sleep(100); + continue; + } + requestsCounter++; + if (printOutput) + { + Console.WriteLine("Request #{0} Stored", requestsCounter); + } + notification.WriteTo(jsonWriter); + } + jsonWriter.WriteEndArray(); + streamWriter.Flush(); + } + + if (isCompressed) + finalStream.Dispose(); + } + } + } +} \ No newline at end of file diff --git a/Tools/Raven.Traffic/TrafficRecorderConfiguration.cs b/Tools/Raven.Traffic/TrafficRecorderConfiguration.cs new file mode 100644 index 000000000000..fcd3e63039c5 --- /dev/null +++ b/Tools/Raven.Traffic/TrafficRecorderConfiguration.cs @@ -0,0 +1,141 @@ +// ----------------------------------------------------------------------- +// +// Copyright (c) Hibernating Rhinos LTD. All rights reserved. +// +// ----------------------------------------------------------------------- +using System; +using System.Net; +using System.Security.Policy; +using NDesk.Options; +using Raven.Abstractions.Data; + +namespace TrafficRecorder +{ + public class TrafficToolConfiguration + { + public RavenConnectionStringOptions ConnectionString { get; private set; } + public string ResourceName { get; set; } + public TrafficToolMode Mode { get; set; } + public string RecordFilePath { get; set; } + public TimeSpan Timeout { get; set; } + public string ApiKey { get; set; } + public bool IsCompressed { get; set; } + public bool PrintOutput { get; set; } + public int? AmountConstraint { get; set; } + public TimeSpan? DurationConstraint { get; set; } + + public TrafficToolConfiguration() + { + ConnectionString = new RavenConnectionStringOptions(); + IsCompressed = false; + Timeout = TimeSpan.MinValue; + PrintOutput = true; + } + + public enum TrafficToolMode + { + Record, + Replay + } + + public class RecordConstraint + { + public enum ConstraintType + { + Time, + Amount + } + + public ConstraintType Type { get; set; } + public int Amount { get; set; } + public TimeSpan Length { get; set; } + + } + + public enum TrafficArgsProcessStatus + { + NoArguments, + NotEnoughArguments, + InvalidMode, + ValidConfig + } + + private static NetworkCredential GetCredentials(RavenConnectionStringOptions connectionStringOptions) + { + var cred = connectionStringOptions.Credentials as NetworkCredential; + if (cred != null) + return cred; + cred = new NetworkCredential(); + connectionStringOptions.Credentials = cred; + return cred; + } + + public static OptionSet InitOptionsSetObject(TrafficToolConfiguration config = null) + { + var options = new OptionSet(); + + options.Add("traceSeconds:", OptionCategory.TrafficRecordReplay, "Time to perform the traffic watch(seconds)", x => + { + var durationConstraint = Int32.Parse(x); + config.DurationConstraint = TimeSpan.FromSeconds(durationConstraint); + }); + + options.Add("traceRequests:", OptionCategory.TrafficRecordReplay, "Time to perform the traffic watch", x => + { + var amountConstraint = Int32.Parse(x); + config.AmountConstraint = amountConstraint; + }); + options.Add("compressed", OptionCategory.TrafficRecordReplay, "Time to perform the traffic watch", x => { config.IsCompressed = true; }); + options.Add("noOutput", OptionCategory.TrafficRecordReplay, "Suppress output", value => config.PrintOutput = false); + options.Add("timeout:", OptionCategory.TrafficRecordReplay, "The timeout to use for requests(seconds)", s => config.Timeout = TimeSpan.FromSeconds(int.Parse(s))); + options.Add("u|user|username:", OptionCategory.TrafficRecordReplay, "The username to use when the database requires the client to authenticate.", value => GetCredentials(config.ConnectionString).UserName = value); + options.Add("p|pass|password:", OptionCategory.TrafficRecordReplay, "The password to use when the database requires the client to authenticate.", value => GetCredentials(config.ConnectionString).Password = value); + options.Add("domain:", OptionCategory.TrafficRecordReplay, "The domain to use when the database requires the client to authenticate.", value => GetCredentials(config.ConnectionString).Domain = value); + options.Add("key|api-key|apikey:", OptionCategory.TrafficRecordReplay, "The API-key to use, when using OAuth.", value => config.ApiKey = value); + return options; + } + + public static TrafficArgsProcessStatus ProcessArgs(string[] args, out TrafficToolConfiguration config) + { + if (args.Length == 0) + { + config = null; + return TrafficArgsProcessStatus.NoArguments; + } + if (args.Length < 4) + { + config = null; + return TrafficArgsProcessStatus.NoArguments; + } + new Url(args[1]); + config = new TrafficToolConfiguration(); + + switch (args[0]) + { + case "rec": + config.Mode = TrafficToolMode.Record; + break; + case "play": + config.Mode = TrafficToolMode.Replay; + break; + default: + config = null; + return TrafficArgsProcessStatus.InvalidMode; + } + + config.ConnectionString.Url = args[1]; + config.ConnectionString.DefaultDatabase = args[2]; + config.ResourceName = args[2]; + config.RecordFilePath = args[3]; + InitOptionsSetObject(config).Parse(args); + + if (config.AmountConstraint.HasValue == false && config.DurationConstraint.HasValue == false) + { + config.AmountConstraint = 1000; + config.DurationConstraint = TimeSpan.FromSeconds(60); + } + + return TrafficArgsProcessStatus.ValidConfig; + } + } +} \ No newline at end of file diff --git a/Tools/Raven.Traffic/TrafficWatchObserver.cs b/Tools/Raven.Traffic/TrafficWatchObserver.cs new file mode 100644 index 000000000000..96e3fcd7d2d6 --- /dev/null +++ b/Tools/Raven.Traffic/TrafficWatchObserver.cs @@ -0,0 +1,154 @@ +using System; +using System.Threading; +using Raven.Abstractions; +using Raven.Client; +using Raven.Client.Connection; +using Raven.Client.Connection.Implementation; +using Raven.Client.Document; +using Raven.Json.Linq; + +namespace Raven.Traffic +{ + public class TrafficWatchObserver : IObserver + { + private readonly IDocumentStore store; + private string databaseName; + private ManualResetEvent _mre; + private DateTime _lastHeartbeat; + private Action _onRequestReceived; + private readonly DocumentConvention _conventions; + private Timer timeoutTimer; + + + public TrafficWatchObserver(IDocumentStore store, string databaseName, ManualResetEvent mre, TimeSpan timeout, Action onRequestReceived) + { + this.store = store; + this.databaseName = databaseName; + _mre = mre; + _onRequestReceived = onRequestReceived; + + if (timeout != TimeSpan.MinValue && mre.WaitOne(0) == false) + { + timeoutTimer = new Timer(x => + { + if (SystemTime.UtcNow - _lastHeartbeat > timeout) + { + Console.WriteLine("Timeout Reached"); + mre.Set(); + return; + } + timeoutTimer.Change(timeout, TimeSpan.FromDays(7)); + + }, null, timeout, TimeSpan.FromDays(7)); + } + } + + public void OnNext(string dataFromConnection) + { + try + { + _lastHeartbeat = SystemTime.UtcNow; + + var ravenJObject = RavenJObject.Parse(dataFromConnection); + var type = ravenJObject.Value("Type"); + + switch (type) + { + case "Disconnect": + EstablishConnection(); + break; + case "Initialized": + case "Heartbeat": + break; + default: + var value = ravenJObject.Value("Value"); + _onRequestReceived(value); + break; + } + } + catch (Exception e) + { + Console.WriteLine(e); + } + } + + public void OnError(Exception error) + { + DisposeSubscriptionAndRequest(); + EstablishConnection(); + } + + public void OnCompleted() + { + DisposeSubscriptionAndRequest(); + _mre.Set(); + } + + private void DisposeSubscriptionAndRequest() + { + if (currentSubscription != null) + { + currentSubscription.Dispose(); + currentSubscription = null; + } + if (currentRequest != null) + { + currentRequest.Dispose(); + currentRequest = null; + } + } + + /// + /// Subscribes received ovserver to the given request, intended to be used for EventSource connections + /// + /// EventSource request + /// Observer that will treat events + private void SubscribeToServerEvents(HttpJsonRequest request, IObserver observer) + { + var serverPullTask = request.ServerPullAsync(); + serverPullTask.Wait(); + var serverEvents = serverPullTask.Result; + currentSubscription = serverEvents.Subscribe(observer); + } + + private HttpJsonRequest currentRequest; + private IDisposable currentSubscription; + + public void EstablishConnection() + { + currentRequest = GetTrafficWatchRequest(); + + SubscribeToServerEvents(currentRequest, + this); + + } + + private string GetAuthToken() + { + string authToken; + using (var request = store.JsonRequestFactory.CreateHttpJsonRequest( + new CreateHttpJsonRequestParams(null, + store.Url + "//databases//" + databaseName + "/singleAuthToken", + "GET", store.DatabaseCommands.PrimaryCredentials, + store.Conventions))) + { + authToken = request.ReadResponseJson().Value("Token"); + } + return authToken; + } + + private HttpJsonRequest GetTrafficWatchRequest() + { + return store.JsonRequestFactory.CreateHttpJsonRequest( + new CreateHttpJsonRequestParams(null, + store.Url + "//databases//" + databaseName + "/traffic-watch/events?" + "singleUseAuthToken=" + GetAuthToken() + "&id=" + Guid.NewGuid(), + "GET", + store.DatabaseCommands.PrimaryCredentials, + store.Conventions) + { + AvoidCachingRequest = true, + DisableRequestCompression = true + }); + } + } +}