Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added: projection abort pubic command

Added: basic test on projection recovery with deleted streams
  • Loading branch information...
commit 5ee38f20e334d055ca364a789a95714d8dcc7d23 1 parent efddca1
@ysw ysw authored
Showing with 289 additions and 6 deletions.
  1. +5 −0 src/EventStore/EventStore.ClientAPI/ProjectionsClient.cs
  2. +24 −1 src/EventStore/EventStore.ClientAPI/ProjectionsManager.cs
  3. +85 −0 .../ClientAPI/when_handling_delete/recovery/with_from_all_foreach_projection_running_and_events_are_indexed.cs
  4. +2 −0  ...ore.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed.cs
  5. +1 −1  ...ing_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs
  6. +80 −0 ...ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_no_indexing_and_other_events.cs
  7. +2 −0  src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj
  8. +1 −0  ...e.Projections.Core.Tests/Services/projections_manager/TestFixtureWithProjectionCoreAndManagementServices.cs
  9. +1 −0  ...ore.Projections.Core.Tests/Services/projections_manager/specification_with_projection_management_service.cs
  10. +19 −0 src/EventStore/EventStore.Projections.Core/Messages/ProjectionManagementMessage.cs
  11. +1 −0  src/EventStore/EventStore.Projections.Core/ProjectionManagerNode.cs
  12. +12 −0 src/EventStore/EventStore.Projections.Core/Services/Http/ProjectionsController.cs
  13. +42 −4 src/EventStore/EventStore.Projections.Core/Services/Management/ManagedProjection.cs
  14. +14 −0 src/EventStore/EventStore.Projections.Core/Services/Management/ProjectionManager.cs
View
5 src/EventStore/EventStore.ClientAPI/ProjectionsClient.cs
@@ -55,6 +55,11 @@ public Task Disable(IPEndPoint endPoint, string name, UserCredentials userCreden
return SendPost(endPoint.ToHttpUrl("/projection/{0}/command/disable", name), string.Empty, userCredentials, HttpStatusCode.OK);
}
+ public Task Abort(IPEndPoint endPoint, string name, UserCredentials userCredentials = null)
+ {
+ return SendPost(endPoint.ToHttpUrl("/projection/{0}/command/abort", name), string.Empty, userCredentials, HttpStatusCode.OK);
+ }
+
public Task CreateOneTime(IPEndPoint endPoint, string query, UserCredentials userCredentials = null)
{
return SendPost(endPoint.ToHttpUrl("/projections/onetime?type=JS"), query, userCredentials, HttpStatusCode.Created);
View
25 src/EventStore/EventStore.ClientAPI/ProjectionsManager.cs
@@ -90,7 +90,7 @@ public void Disable(string name, UserCredentials userCredentials = null)
}
/// <summary>
- /// Asynchronously disables a projection.
+ /// Asynchronously aborts and disables a projection without writing a checkpoint.
/// </summary>
/// <param name="name">The name of the projection.</param>
/// <param name="userCredentials">Credentials for a user with permission to disable a projection.</param>
@@ -102,6 +102,29 @@ public Task DisableAsync(string name, UserCredentials userCredentials = null)
}
/// <summary>
+ /// Synchronously avborts and disables a projection without writing a checkpoint.
+ /// </summary>
+ /// <param name="name">The name of the projection.</param>
+ /// <param name="userCredentials">Credentials for a user with permission to disable a projection.</param>
+ public void Abort(string name, UserCredentials userCredentials = null)
+ {
+ Ensure.NotNullOrEmpty(name, "name");
+ AbortAsync(name, userCredentials).Wait();
+ }
+
+ /// <summary>
+ /// Asynchronously disables a projection.
+ /// </summary>
+ /// <param name="name">The name of the projection.</param>
+ /// <param name="userCredentials">Credentials for a user with permission to disable a projection.</param>
+ /// <returns>A task representing the operation.</returns>
+ public Task AbortAsync(string name, UserCredentials userCredentials = null)
+ {
+ Ensure.NotNullOrEmpty(name, "name");
+ return _client.Abort(_httpEndPoint, name, userCredentials);
+ }
+
+ /// <summary>
/// Synchronously creates a one-time query.
/// </summary>
/// <param name="query">The JavaScript source code for the query.</param>
View
85 ...ndling_delete/recovery/with_from_all_foreach_projection_running_and_events_are_indexed.cs
@@ -0,0 +1,85 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using NUnit.Framework;
+
+namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete.recovery
+{
+ [TestFixture]
+ public class with_from_all_foreach_projection_running_and_events_are_indexed :
+ specification_with_standard_projections_runnning
+ {
+ protected override bool GivenStandardProjectionsRunning()
+ {
+ return false;
+ }
+
+ protected override void Given()
+ {
+ base.Given();
+ PostEvent("stream1", "type1", "{}");
+ PostEvent("stream1", "type2", "{}");
+ PostEvent("stream2", "type1", "{}");
+ PostEvent("stream2", "type2", "{}");
+ WaitIdle();
+ EnableStandardProjections();
+ WaitIdle();
+ PostProjection(@"
+fromAll().foreachStream().when({
+ $init: function(){return {a:0}},
+ type1: function(s,e){s.a++},
+ type2: function(s,e){s.a++},
+ $deleted: function(s,e){s.deleted=1},
+}).outputState();
+");
+ WaitIdle();
+ HardDeleteStream("stream1");
+ WaitIdle();
+ DisableStandardProjections();
+ WaitIdle();
+ EnableStandardProjections();
+ WaitIdle();
+ }
+
+ protected override void When()
+ {
+ base.When();
+ _manager.Abort("test-projection", _admin);
+ WaitIdle();
+ _manager.Enable("test-projection", _admin);
+ WaitIdle();
+ }
+
+ [Test, Category("Network")]
+ public void receives_deleted_notification()
+ {
+ AssertStreamTail("$projections-test-projection-stream1-result", "Result:{\"a\":2,\"deleted\":1}");
+ AssertStreamTail("$projections-test-projection-stream2-result", "Result:{\"a\":2}");
+ }
+ }
+}
View
2  ...I/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed.cs
@@ -54,6 +54,8 @@ public void receives_deleted_notification()
{
AssertStreamTail(
"$projections-test-projection-stream1-result", "Result:{\"deleted\":1}");
+ AssertStreamTail(
+ "$projections-test-projection-stream2-result", "Result:{\"deleted\":1}");
}
}
}
View
2  ...om_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs
@@ -49,7 +49,7 @@ protected override void Given()
EnableStandardProjections();
WaitIdle();
DisableStandardProjections();
- WaitIdle();
+ ; WaitIdle();
// required to flush index checkpoint
{
View
80 ...dling_delete/with_from_all_foreach_projection_running_and_no_indexing_and_other_events.cs
@@ -0,0 +1,80 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using NUnit.Framework;
+
+namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete
+{
+ [TestFixture]
+ public class with_from_all_foreach_projection_running_and_no_indexing_and_other_events :
+ specification_with_standard_projections_runnning
+ {
+ protected override bool GivenStandardProjectionsRunning()
+ {
+ return false;
+ }
+
+ protected override void Given()
+ {
+ base.Given();
+ PostEvent("stream1", "type1", "{}");
+ PostEvent("stream1", "type2", "{}");
+ PostEvent("stream2", "type1", "{}");
+ PostEvent("stream2", "type2", "{}");
+ WaitIdle();
+ PostProjection(@"
+fromAll().foreachStream().when({
+ $init: function(){return {a:0}},
+ type1: function(s,e){s.a++},
+ type2: function(s,e){s.a++},
+ $deleted: function(s,e){s.deleted=1;},
+}).outputState();
+");
+ }
+
+ protected override void When()
+ {
+ base.When();
+ this.HardDeleteStream("stream1");
+ WaitIdle();
+ PostEvent("stream2", "type1", "{}");
+ PostEvent("stream2", "type2", "{}");
+ PostEvent("stream3", "type1", "{}");
+ WaitIdle();
+ }
+
+ [Test, Category("Network")]
+ public void receives_deleted_notification()
+ {
+ AssertStreamTail(
+ "$projections-test-projection-stream1-result", "Result:{\"a\":2}", "Result:{\"a\":2,\"deleted\":1}");
+ AssertStreamTail("$projections-test-projection-stream2-result", "Result:{\"a\":4}");
+ AssertStreamTail("$projections-test-projection-stream3-result", "Result:{\"a\":1}");
+ }
+ }
+}
View
2  src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj
@@ -108,11 +108,13 @@
<ItemGroup>
<Compile Include="ClientAPI\event_by_type_index.cs" />
<Compile Include="ClientAPI\specification_with_standard_projections_runnning.cs" />
+ <Compile Include="ClientAPI\when_handling_delete\recovery\with_from_all_foreach_projection_running_and_events_are_indexed.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_a_stream_and_tombstone.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs" />
+ <Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_no_indexing_and_other_events.cs" />
<Compile Include="ClientAPI\with_standard_projections_running.cs" />
<Compile Include="Integration\from_streams_matching\when_running_without_stream_metadata.cs" />
<Compile Include="Integration\from_streams_matching\when_running_wit_stream_metadata.cs" />
View
1  ....Tests/Services/projections_manager/TestFixtureWithProjectionCoreAndManagementServices.cs
@@ -104,6 +104,7 @@ public void Setup()
_bus.Subscribe<ProjectionManagementMessage.GetResult>(_manager);
_bus.Subscribe<ProjectionManagementMessage.Disable>(_manager);
_bus.Subscribe<ProjectionManagementMessage.Enable>(_manager);
+ _bus.Subscribe<ProjectionManagementMessage.Abort>(_manager);
_bus.Subscribe<ProjectionManagementMessage.SetRunAs>(_manager);
_bus.Subscribe<ProjectionManagementMessage.Reset>(_manager);
_bus.Subscribe<ProjectionManagementMessage.StartSlaveProjections>(_manager);
View
1  ...re.Tests/Services/projections_manager/specification_with_projection_management_service.cs
@@ -99,6 +99,7 @@ public void Setup()
_bus.Subscribe<ProjectionManagementMessage.GetResult>(_manager);
_bus.Subscribe<ProjectionManagementMessage.Disable>(_manager);
_bus.Subscribe<ProjectionManagementMessage.Enable>(_manager);
+ _bus.Subscribe<ProjectionManagementMessage.Abort>(_manager);
_bus.Subscribe<ProjectionManagementMessage.SetRunAs>(_manager);
_bus.Subscribe<ProjectionManagementMessage.Reset>(_manager);
_bus.Subscribe<ProjectionManagementMessage.StartSlaveProjections>(_manager);
View
19 src/EventStore/EventStore.Projections.Core/Messages/ProjectionManagementMessage.cs
@@ -297,6 +297,25 @@ public string Name
}
}
+ public class Abort : ControlMessage
+ {
+ private static readonly int TypeId = System.Threading.Interlocked.Increment(ref NextMsgId);
+ public override int MsgTypeId { get { return TypeId; } }
+
+ private readonly string _name;
+
+ public Abort(IEnvelope envelope, string name, RunAs runAs)
+ : base(envelope, runAs)
+ {
+ _name = name;
+ }
+
+ public string Name
+ {
+ get { return _name; }
+ }
+ }
+
public class SetRunAs : ControlMessage
{
private static readonly int TypeId = System.Threading.Interlocked.Increment(ref NextMsgId);
View
1  src/EventStore/EventStore.Projections.Core/ProjectionManagerNode.cs
@@ -44,6 +44,7 @@ public void SetupMessaging(ISubscriber mainBus)
mainBus.Subscribe<ProjectionManagementMessage.GetResult>(_projectionManager);
mainBus.Subscribe<ProjectionManagementMessage.Disable>(_projectionManager);
mainBus.Subscribe<ProjectionManagementMessage.Enable>(_projectionManager);
+ mainBus.Subscribe<ProjectionManagementMessage.Abort>(_projectionManager);
mainBus.Subscribe<ProjectionManagementMessage.SetRunAs>(_projectionManager);
mainBus.Subscribe<ProjectionManagementMessage.Reset>(_projectionManager);
mainBus.Subscribe<ProjectionManagementMessage.StartSlaveProjections>(_projectionManager);
View
12 src/EventStore/EventStore.Projections.Core/Services/Http/ProjectionsController.cs
@@ -124,6 +124,8 @@ protected override void SubscribeCore(IHttpService service)
HttpMethod.Post, OnProjectionCommandEnable, Codec.NoCodecs, SupportedCodecs);
Register(service, "/projection/{name}/command/reset?enableRunAs={enableRunAs}",
HttpMethod.Post, OnProjectionCommandReset, Codec.NoCodecs, SupportedCodecs);
+ Register(service, "/projection/{name}/command/abort?enableRunAs={enableRunAs}",
+ HttpMethod.Post, OnProjectionCommandAbort, Codec.NoCodecs, SupportedCodecs);
}
private void OnProjections(HttpEntityManager http, UriTemplateMatch match)
@@ -241,6 +243,16 @@ private void OnProjectionCommandReset(HttpEntityManager http, UriTemplateMatch m
Publish(new ProjectionManagementMessage.Reset(envelope, match.BoundVariables["name"], GetRunAs(http, match)));
}
+ private void OnProjectionCommandAbort(HttpEntityManager http, UriTemplateMatch match)
+ {
+ if (_httpForwarder.ForwardRequest(http))
+ return;
+
+ var envelope = new SendToHttpEnvelope<ProjectionManagementMessage.Updated>(
+ _networkSendQueue, http, DefaultFormatter, OkResponseConfigurator, ErrorsEnvelope(http));
+ Publish(new ProjectionManagementMessage.Abort(envelope, match.BoundVariables["name"], GetRunAs(http, match)));
+ }
+
private void OnProjectionStatusGet(HttpEntityManager http, UriTemplateMatch match)
{
if (_httpForwarder.ForwardRequest(http))
View
46 src/EventStore/EventStore.Projections.Core/Services/Management/ManagedProjection.cs
@@ -288,7 +288,14 @@ public void Handle(ProjectionManagementMessage.Disable message)
{
_lastAccessed = _timeProvider.Now;
if (!ProjectionManagementMessage.RunAs.ValidateRunAs(Mode, ReadWrite.Write, _runAs, message)) return;
- Stop(() => DoDisable(message));
+ Stop(() => DoDisable(message.Envelope, message.Name));
+ }
+
+ public void Handle(ProjectionManagementMessage.Abort message)
+ {
+ _lastAccessed = _timeProvider.Now;
+ if (!ProjectionManagementMessage.RunAs.ValidateRunAs(Mode, ReadWrite.Write, _runAs, message)) return;
+ Abort(() => DoDisable(message.Envelope, message.Name));
}
public void Handle(ProjectionManagementMessage.Enable message)
@@ -856,6 +863,37 @@ private void Stop(Action completed)
}
}
+ private void Abort(Action completed)
+ {
+ switch (_state)
+ {
+ case ManagedProjectionState.Stopped:
+ case ManagedProjectionState.Completed:
+ case ManagedProjectionState.Faulted:
+ case ManagedProjectionState.Loaded:
+ if (completed != null) completed();
+ return;
+ case ManagedProjectionState.Loading:
+ case ManagedProjectionState.Creating:
+ throw new InvalidOperationException(
+ string.Format(
+ "Cannot stop a projection in the '{0}' state",
+ Enum.GetName(typeof (ManagedProjectionState), _state)));
+ case ManagedProjectionState.Stopping:
+ _onStopped = completed;
+ _coreQueue.Publish(new CoreProjectionManagementMessage.Kill(Id));
+ return;
+ case ManagedProjectionState.Running:
+ case ManagedProjectionState.Starting:
+ _state = ManagedProjectionState.Stopping;
+ _onStopped = completed;
+ _coreQueue.Publish(new CoreProjectionManagementMessage.Kill(Id));
+ break;
+ default:
+ throw new NotSupportedException();
+ }
+ }
+
private void SetFaulted(string reason, Exception ex = null)
{
if (ex != null)
@@ -916,15 +954,15 @@ private void DoUpdateQuery(ProjectionManagementMessage.UpdateQuery message)
Prepare(() => BeginWrite(completed));
}
- private void DoDisable(ProjectionManagementMessage.Disable message)
+ private void DoDisable(IEnvelope envelope, string name)
{
if (!Enabled)
{
- message.Envelope.ReplyWith(new ProjectionManagementMessage.OperationFailed("Not enabled"));
+ envelope.ReplyWith(new ProjectionManagementMessage.OperationFailed("Not enabled"));
return;
}
Disable();
- Action completed = () => message.Envelope.ReplyWith(new ProjectionManagementMessage.Updated(message.Name));
+ Action completed = () => envelope.ReplyWith(new ProjectionManagementMessage.Updated(name));
UpdateProjectionVersion();
if (Enabled)
Prepare(() => BeginWrite(completed));
View
14 src/EventStore/EventStore.Projections.Core/Services/Management/ProjectionManager.cs
@@ -62,6 +62,7 @@ public class ProjectionManager : IDisposable,
IHandle<ProjectionManagementMessage.GetResult>,
IHandle<ProjectionManagementMessage.Disable>,
IHandle<ProjectionManagementMessage.Enable>,
+ IHandle<ProjectionManagementMessage.Abort>,
IHandle<ProjectionManagementMessage.SetRunAs>,
IHandle<ProjectionManagementMessage.Reset>,
IHandle<ProjectionManagementMessage.StartSlaveProjections>,
@@ -301,6 +302,19 @@ public void Handle(ProjectionManagementMessage.Enable message)
projection.Handle(message);
}
+ public void Handle(ProjectionManagementMessage.Abort message)
+ {
+ if (!_started)
+ return;
+ _logger.Info("Aborting '{0}' projection", message.Name);
+
+ var projection = GetProjection(message.Name);
+ if (projection == null)
+ message.Envelope.ReplyWith(new ProjectionManagementMessage.NotFound());
+ else
+ projection.Handle(message);
+ }
+
public void Handle(ProjectionManagementMessage.SetRunAs message)
{
if (!_started)
Please sign in to comment.
Something went wrong with that request. Please try again.