Skip to content

Commit

Permalink
Merge pull request #1262 from jasonmitchell/awaiting-query-result
Browse files Browse the repository at this point in the history
Client API for executing queries and awaiting results
  • Loading branch information
pgermishuys authored and hayley-jean committed Jul 31, 2017
1 parent 5e84507 commit d243452
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/EventStore.ClientAPI/Common/Utils/Threading/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Threading.Tasks;
using EventStore.ClientAPI.Exceptions;

namespace EventStore.ClientAPI.Common.Utils.Threading
{
internal static class TaskExtensions
{
public static async Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
if (await Task.WhenAny(task, Task.Delay(timeout)) != task)
{
throw new OperationTimedOutException(string.Format("The operation did not complete within the specified time of {0}", timeout));
}

return await task;
}
}
}
2 changes: 2 additions & 0 deletions src/EventStore.ClientAPI/EventStore.ClientAPI.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<Compile Include="ClientOperations\VolatileSubscriptionOperation.cs" />
<Compile Include="Common\Log\FileLogger.cs" />
<Compile Include="Common\Utils\Threading\ManualResetEventSlimExtensions.cs" />
<Compile Include="Common\Utils\Threading\TaskExtensions.cs" />
<Compile Include="ConditionalWriteFailureReason.cs" />
<Compile Include="ConditionalWriteResult.cs" />
<Compile Include="ConnectionString.cs" />
Expand Down Expand Up @@ -92,6 +93,7 @@
<Compile Include="PersistentSubscriptionSettings.cs" />
<Compile Include="PersistentSubscriptionSettingsBuilder.cs" />
<Compile Include="Projections\ProjectionDetails.cs" />
<Compile Include="Projections\QueryManager.cs" />
<Compile Include="StreamCheckpoint.cs" />
<Compile Include="ClientAuthenticationFailedEventArgs.cs" />
<Compile Include="ClientErrorEventArgs.cs" />
Expand Down
83 changes: 83 additions & 0 deletions src/EventStore.ClientAPI/Projections/QueryManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System;
using System.Net;
using System.Threading.Tasks;
using EventStore.ClientAPI.Common.Utils;
using EventStore.ClientAPI.Common.Utils.Threading;
using EventStore.ClientAPI.SystemData;
using Newtonsoft.Json.Linq;

namespace EventStore.ClientAPI.Projections
{
/// <summary>
/// API for executinmg queries in the Event Store through C# code. Communicates
/// with the Event Store over the RESTful API.
/// </summary>
public class QueryManager
{
private readonly TimeSpan _queryTimeout;
private readonly ProjectionsManager _projectionsManager;

/// <summary>
/// Creates a new instance of <see cref="QueryManager"/>.
/// </summary>
/// <param name="log">An instance of <see cref="ILogger"/> to use for logging.</param>
/// <param name="httpEndPoint">HTTP endpoint of an Event Store server.</param>
/// <param name="projectionOperationTimeout">Timeout of projection API operations</param>
/// <param name="queryTimeout">Timeout of query execution</param>
public QueryManager(ILogger log, IPEndPoint httpEndPoint, TimeSpan projectionOperationTimeout, TimeSpan queryTimeout)
{
_queryTimeout = queryTimeout;
_projectionsManager = new ProjectionsManager(log, httpEndPoint, projectionOperationTimeout);
}

/// <summary>
/// Asynchronously executes a query.
/// </summary>
/// <remarks>
/// Creates a new transient projection and polls its status until it is Completed.
/// </remarks>
/// <param name="name">A name for the query.</param>
/// <param name="query">The JavaScript source code for the query.</param>
/// <param name="initialPollingDelay">Initial time to wait between polling for projection status.</param>
/// <param name="maximumPollingDelay">Maximum time to wait between polling for projection status.</param>
/// <param name="userCredentials">Credentials for a user with permission to create a query.</param>
/// <returns>String of JSON containing query result.</returns>
public async Task<string> ExecuteAsync(string name, string query, TimeSpan initialPollingDelay, TimeSpan maximumPollingDelay, UserCredentials userCredentials = null)
{
return await Task.Run(async () =>
{
await _projectionsManager.CreateTransientAsync(name, query, userCredentials);
await WaitForCompletedAsync(name, initialPollingDelay, maximumPollingDelay, userCredentials);
return await _projectionsManager.GetStateAsync(name, userCredentials);
}).WithTimeout(_queryTimeout).ConfigureAwait(false);
}

private async Task WaitForCompletedAsync(string name, TimeSpan initialPollingDelay, TimeSpan maximumPollingDelay, UserCredentials userCredentials)
{
var attempts = 0;
var status = await GetStatusAsync(name, userCredentials);

while (!status.Contains("Completed"))
{
attempts++;

await DelayPollingAsync(attempts, initialPollingDelay, maximumPollingDelay);
status = await GetStatusAsync(name, userCredentials);
}
}

private static Task DelayPollingAsync(int attempts, TimeSpan initialPollingDelay, TimeSpan maximumPollingDelay)
{
var delayInMilliseconds = initialPollingDelay.TotalMilliseconds * (Math.Pow(2, attempts) - 1);
delayInMilliseconds = Math.Min(delayInMilliseconds, maximumPollingDelay.TotalMilliseconds);

return Task.Delay(TimeSpan.FromMilliseconds(delayInMilliseconds));
}

private async Task<string> GetStatusAsync(string name, UserCredentials userCredentials)
{
var projectionStatus = await _projectionsManager.GetStatusAsync(name, userCredentials);
return projectionStatus.ParseJson<JObject>()["status"].ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class specification_with_standard_projections_runnning : SpecificationWit
protected ProjectionsSubsystem _projections;
protected UserCredentials _admin = DefaultData.AdminCredentials;
protected ProjectionsManager _manager;
protected QueryManager _queryManager;

[OneTimeSetUp]
public override void TestFixtureSetUp()
Expand All @@ -44,9 +45,18 @@ public override void TestFixtureSetUp()
new ConsoleLogger(),
_node.ExtHttpEndPoint,
TimeSpan.FromMilliseconds(10000));

_queryManager = new QueryManager(
new ConsoleLogger(),
_node.ExtHttpEndPoint,
TimeSpan.FromMilliseconds(10000),
TimeSpan.FromMilliseconds(10000));

WaitIdle();

if (GivenStandardProjectionsRunning())
EnableStandardProjections();

QueueStatsCollector.WaitIdle();
Given();
When();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using NUnit.Framework;

namespace EventStore.Projections.Core.Tests.ClientAPI.query_result.with_long_from_all_query
{
[TestFixture]
public class when_getting_result : specification_with_standard_projections_runnning
{
protected override void Given()
{
base.Given();

PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");

WaitIdle();
}

[Test, Category("Network")]
public void waits_for_results()
{
const string query = @"
fromAll().when({
$init: function(){return {count:0}},
type1: function(s,e){
var start = new Date();
while(new Date()-start < 500){}
s.count++;
},
});
";

var result = _queryManager.ExecuteAsync("query", query, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(5000), _admin).GetAwaiter().GetResult();
Assert.AreEqual("{\"count\":3}", result);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using EventStore.ClientAPI.Exceptions;
using NUnit.Framework;

namespace EventStore.Projections.Core.Tests.ClientAPI.when_executing_query.with_long_from_all_query
{
[TestFixture]
public class when_getting_result_and_timeout_exceeded : specification_with_standard_projections_runnning
{
protected override void Given()
{
base.Given();

PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");

WaitIdle();
}

[Test, Category("Network")]
public void throws_exception()
{
const string query = @"
fromAll().when({
$init: function(){return {count:0}},
type1: function(s,e){
var start = new Date();
while(new Date()-start < 5000){}
s.count++;
},
});
";
Assert.ThrowsAsync<OperationTimedOutException>(() => _queryManager.ExecuteAsync("query", query, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(5000), _admin));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
<Compile Include="ClientAPI\event_by_type_index.cs" />
<Compile Include="ClientAPI\event_by_type_index\when_reverting_after_index_catches_up.cs" />
<Compile Include="ClientAPI\list_projections.cs" />
<Compile Include="ClientAPI\when_executing_query\with_long_from_all_query\when_getting_result.cs" />
<Compile Include="ClientAPI\when_executing_query\with_long_from_all_query\when_getting_result_and_timeout_exceeded.cs" />
<Compile Include="ClientAPI\specification_with_standard_projections_runnning.cs" />
<Compile Include="ClientAPI\when_handling_created\with_from_all_any_foreach_projection\when_running_and_events_are_posted.cs" />
<Compile Include="ClientAPI\when_handling_created\with_from_all_foreach_projection\when_running_and_events_are_indexed.cs" />
Expand Down

0 comments on commit d243452

Please sign in to comment.