New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client API for executing queries and awaiting results #1262

Merged
merged 1 commit into from May 25, 2017
Jump to file or symbol
Failed to load files and symbols.
+193 −0
Diff settings

Always

Just for now

@@ -0,0 +1,19 @@
using System;
using System.Threading.Tasks;
using EventStore.ClientAPI.Exceptions;
namespace EventStore.ClientAPI.Common.Utils.Threading
{
internal static class TaskExtensions
{
public static async Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
if (await Task.WhenAny(task, Task.Delay(timeout)) != task)
{
throw new OperationTimedOutException(string.Format("The operation did not complete within the specified time of {0}", timeout));
}
return await task;
}
}
}
@@ -65,6 +65,7 @@
<Compile Include="ClientOperations\VolatileSubscriptionOperation.cs" />
<Compile Include="Common\Log\FileLogger.cs" />
<Compile Include="Common\Utils\Threading\ManualResetEventSlimExtensions.cs" />
<Compile Include="Common\Utils\Threading\TaskExtensions.cs" />
<Compile Include="ConditionalWriteFailureReason.cs" />
<Compile Include="ConditionalWriteResult.cs" />
<Compile Include="ConnectionString.cs" />
@@ -92,6 +93,7 @@
<Compile Include="PersistentSubscriptionSettings.cs" />
<Compile Include="PersistentSubscriptionSettingsBuilder.cs" />
<Compile Include="Projections\ProjectionDetails.cs" />
<Compile Include="Projections\QueryManager.cs" />
<Compile Include="StreamCheckpoint.cs" />
<Compile Include="ClientAuthenticationFailedEventArgs.cs" />
<Compile Include="ClientErrorEventArgs.cs" />
@@ -0,0 +1,83 @@
using System;
using System.Net;
using System.Threading.Tasks;
using EventStore.ClientAPI.Common.Utils;
using EventStore.ClientAPI.Common.Utils.Threading;
using EventStore.ClientAPI.SystemData;
using Newtonsoft.Json.Linq;
namespace EventStore.ClientAPI.Projections
{
/// <summary>
/// API for executinmg queries in the Event Store through C# code. Communicates
/// with the Event Store over the RESTful API.
/// </summary>
public class QueryManager
{
private readonly TimeSpan _queryTimeout;
private readonly ProjectionsManager _projectionsManager;
/// <summary>
/// Creates a new instance of <see cref="QueryManager"/>.
/// </summary>
/// <param name="log">An instance of <see cref="ILogger"/> to use for logging.</param>
/// <param name="httpEndPoint">HTTP endpoint of an Event Store server.</param>
/// <param name="projectionOperationTimeout">Timeout of projection API operations</param>
/// <param name="queryTimeout">Timeout of query execution</param>
public QueryManager(ILogger log, IPEndPoint httpEndPoint, TimeSpan projectionOperationTimeout, TimeSpan queryTimeout)
{
_queryTimeout = queryTimeout;
_projectionsManager = new ProjectionsManager(log, httpEndPoint, projectionOperationTimeout);
}
/// <summary>
/// Asynchronously executes a query.
/// </summary>
/// <remarks>
/// Creates a new transient projection and polls its status until it is Completed.
/// </remarks>
/// <param name="name">A name for the query.</param>
/// <param name="query">The JavaScript source code for the query.</param>
/// <param name="initialPollingDelay">Initial time to wait between polling for projection status.</param>
/// <param name="maximumPollingDelay">Maximum time to wait between polling for projection status.</param>
/// <param name="userCredentials">Credentials for a user with permission to create a query.</param>
/// <returns>String of JSON containing query result.</returns>
public async Task<string> ExecuteAsync(string name, string query, TimeSpan initialPollingDelay, TimeSpan maximumPollingDelay, UserCredentials userCredentials = null)
{
return await Task.Run(async () =>
{
await _projectionsManager.CreateTransientAsync(name, query, userCredentials);
await WaitForCompletedAsync(name, initialPollingDelay, maximumPollingDelay, userCredentials);
return await _projectionsManager.GetStateAsync(name, userCredentials);
}).WithTimeout(_queryTimeout).ConfigureAwait(false);
}
private async Task WaitForCompletedAsync(string name, TimeSpan initialPollingDelay, TimeSpan maximumPollingDelay, UserCredentials userCredentials)
{
var attempts = 0;
var status = await GetStatusAsync(name, userCredentials);
while (!status.Contains("Completed"))
{
attempts++;
await DelayPollingAsync(attempts, initialPollingDelay, maximumPollingDelay);
status = await GetStatusAsync(name, userCredentials);
}
}
private static Task DelayPollingAsync(int attempts, TimeSpan initialPollingDelay, TimeSpan maximumPollingDelay)
{
var delayInMilliseconds = initialPollingDelay.TotalMilliseconds * (Math.Pow(2, attempts) - 1);
delayInMilliseconds = Math.Min(delayInMilliseconds, maximumPollingDelay.TotalMilliseconds);
return Task.Delay(TimeSpan.FromMilliseconds(delayInMilliseconds));
}
private async Task<string> GetStatusAsync(string name, UserCredentials userCredentials)
{
var projectionStatus = await _projectionsManager.GetStatusAsync(name, userCredentials);
return projectionStatus.ParseJson<JObject>()["status"].ToString();

This comment has been minimized.

@gregoryyoung

gregoryyoung Apr 3, 2017

Member

We probably want these passed in as timespans

@gregoryyoung

gregoryyoung Apr 3, 2017

Member

We probably want these passed in as timespans

This comment has been minimized.

@jasonmitchell

jasonmitchell Apr 4, 2017

Contributor

Makes sense, I'll make this change.

@jasonmitchell

jasonmitchell Apr 4, 2017

Contributor

Makes sense, I'll make this change.

}
}
}
@@ -25,6 +25,7 @@ public class specification_with_standard_projections_runnning : SpecificationWit
protected ProjectionsSubsystem _projections;
protected UserCredentials _admin = DefaultData.AdminCredentials;
protected ProjectionsManager _manager;
protected QueryManager _queryManager;
[OneTimeSetUp]
public override void TestFixtureSetUp()
@@ -44,9 +45,18 @@ public override void TestFixtureSetUp()
new ConsoleLogger(),
_node.ExtHttpEndPoint,
TimeSpan.FromMilliseconds(10000));
_queryManager = new QueryManager(
new ConsoleLogger(),
_node.ExtHttpEndPoint,
TimeSpan.FromMilliseconds(10000),
TimeSpan.FromMilliseconds(10000));
WaitIdle();
if (GivenStandardProjectionsRunning())
EnableStandardProjections();
QueueStatsCollector.WaitIdle();
Given();
When();
@@ -0,0 +1,39 @@
using System;
using NUnit.Framework;
namespace EventStore.Projections.Core.Tests.ClientAPI.query_result.with_long_from_all_query
{
[TestFixture]
public class when_getting_result : specification_with_standard_projections_runnning
{
protected override void Given()
{
base.Given();
PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");
WaitIdle();
}
[Test, Category("Network")]
public void waits_for_results()
{
const string query = @"
fromAll().when({
$init: function(){return {count:0}},
type1: function(s,e){
var start = new Date();
while(new Date()-start < 500){}
s.count++;
},
});
";
var result = _queryManager.ExecuteAsync("query", query, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(5000), _admin).GetAwaiter().GetResult();
Assert.AreEqual("{\"count\":3}", result);
}
}
}
@@ -0,0 +1,38 @@
using System;
using EventStore.ClientAPI.Exceptions;
using NUnit.Framework;
namespace EventStore.Projections.Core.Tests.ClientAPI.when_executing_query.with_long_from_all_query
{
[TestFixture]
public class when_getting_result_and_timeout_exceeded : specification_with_standard_projections_runnning
{
protected override void Given()
{
base.Given();
PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");
PostEvent("stream-1", "type1", "{}");
WaitIdle();
}
[Test, Category("Network")]
public void throws_exception()
{
const string query = @"
fromAll().when({
$init: function(){return {count:0}},
type1: function(s,e){
var start = new Date();
while(new Date()-start < 5000){}
s.count++;
},
});
";
Assert.ThrowsAsync<OperationTimedOutException>(() => _queryManager.ExecuteAsync("query", query, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(5000), _admin));
}
}
}
@@ -66,6 +66,8 @@
<Compile Include="ClientAPI\event_by_type_index.cs" />
<Compile Include="ClientAPI\event_by_type_index\when_reverting_after_index_catches_up.cs" />
<Compile Include="ClientAPI\list_projections.cs" />
<Compile Include="ClientAPI\when_executing_query\with_long_from_all_query\when_getting_result.cs" />
<Compile Include="ClientAPI\when_executing_query\with_long_from_all_query\when_getting_result_and_timeout_exceeded.cs" />
<Compile Include="ClientAPI\specification_with_standard_projections_runnning.cs" />
<Compile Include="ClientAPI\when_handling_created\with_from_all_any_foreach_projection\when_running_and_events_are_posted.cs" />
<Compile Include="ClientAPI\when_handling_created\with_from_all_foreach_projection\when_running_and_events_are_indexed.cs" />
ProTip! Use n and p to navigate between commits in a pull request.