Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Progress: partition deleted notification

  • Loading branch information...
commit 161b2597eaecbe08b78f4c7d3d543e7b0cfb75be 1 parent 427e334
@ysw ysw authored
Showing with 429 additions and 28 deletions.
  1. +5 −0 src/EventStore/EventStore.Projections.Core.Tests/Services/core_projection/FakeProjectionHandler.cs
  2. +5 −0 src/EventStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeBiStateProjection.cs
  3. +5 −0 src/EventStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeForeachStreamProjection.cs
  4. +5 −0 ...ventStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeFromCatalogStreamProjection.cs
  5. +5 −0 src/EventStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeProjection.cs
  6. +1 −0  src/EventStore/EventStore.Projections.Core/EventStore.Projections.Core.csproj
  7. +3 −2 src/EventStore/EventStore.Projections.Core/Messages/EventReaderSubscriptionMessage.cs
  8. +6 −0 src/EventStore/EventStore.Projections.Core/Services/IProjectionStateHandler.cs
  9. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/ByHandleStatePartitionSelector.cs
  10. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/ByPositionStreamStatePartitionSelector.cs
  11. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/ByStreamStatePartitionSelector.cs
  12. +2 −2 src/EventStore/EventStore.Projections.Core/Services/Processing/CoreProjectionQueue.cs
  13. +3 −1 src/EventStore/EventStore.Projections.Core/Services/Processing/EventProcessedResult.cs
  14. +96 −6 src/EventStore/EventStore.Projections.Core/Services/Processing/EventProcessingProjectionProcessingPhase.cs
  15. +2 −0  src/EventStore/EventStore.Projections.Core/Services/Processing/EventReaderCoreService.cs
  16. +23 −0 ...entStore/EventStore.Projections.Core/Services/Processing/EventSubscriptionBasedProjectionProcessingPhase.cs
  17. +106 −12 src/EventStore/EventStore.Projections.Core/Services/Processing/HeadingEventReader.cs
  18. +2 −0  src/EventStore/EventStore.Projections.Core/Services/Processing/IEventProcessingPhase.cs
  19. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/NoopStatePartitionSelector.cs
  20. +96 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/PartitionDeletedWorkItem.cs
  21. +1 −0  src/EventStore/EventStore.Projections.Core/Services/Processing/PositionTracker.cs
  22. +2 −1  src/EventStore/EventStore.Projections.Core/Services/Processing/ReaderSubscription.cs
  23. +4 −4 src/EventStore/EventStore.Projections.Core/Services/Processing/ReaderSubscriptionBase.cs
  24. +1 −0  src/EventStore/EventStore.Projections.Core/Services/Processing/StatePartitionSelector.cs
  25. +5 −0 src/EventStore/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs
  26. +5 −0 src/EventStore/EventStore.Projections.Core/Standard/CategorizeEventsByStreamPath.cs
  27. +5 −0 src/EventStore/EventStore.Projections.Core/Standard/CategorizeStreamByPath.cs
  28. +5 −0 src/EventStore/EventStore.Projections.Core/Standard/IndexEventsByEventType.cs
  29. +5 −0 src/EventStore/EventStore.Projections.Core/Standard/IndexStreams.cs
  30. +5 −0 src/EventStore/EventStore.Projections.Core/Standard/StubHandler.cs
  31. +5 −0 src/EventStore/EventStore.Web/Users/IndexUsersProjectionHandler.cs
  32. +1 −0  src/ReformatOptions.DotSettings
View
5 src/EventStore/EventStore.Projections.Core.Tests/Services/core_projection/FakeProjectionHandler.cs
@@ -234,6 +234,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
}
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
return _loadedState;
View
5 src/EventStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeBiStateProjection.cs
@@ -102,6 +102,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
5 ...EventStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeForeachStreamProjection.cs
@@ -105,6 +105,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
return _state;
View
5 ...tStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeFromCatalogStreamProjection.cs
@@ -104,6 +104,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
return _state;
View
5 src/EventStore/EventStore.Projections.Core.Tests/Services/projections_manager/FakeProjection.cs
@@ -100,6 +100,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
1  src/EventStore/EventStore.Projections.Core/EventStore.Projections.Core.csproj
@@ -245,6 +245,7 @@
<Compile Include="Services\Processing\ParallelQueryMasterReaderStrategy.cs" />
<Compile Include="Services\Processing\ParallelQueryProcessingStrategy.cs" />
<Compile Include="Services\Processing\PartitionCompletedWorkItem.cs" />
+ <Compile Include="Services\Processing\PartitionDeletedWorkItem.cs" />
<Compile Include="Services\Processing\PartitionState.cs" />
<Compile Include="Services\Processing\PartitionStateCache.cs" />
<Compile Include="Services\Processing\PartitionStateUpdateManager.cs" />
View
5 src/EventStore/EventStore.Projections.Core/Messages/EventReaderSubscriptionMessage.cs
@@ -198,8 +198,9 @@ public string Partition
}
public PartitionDeleted(
- Guid subscriptionId, string partition, long subscriptionMessageSequenceNumber, object source = null)
- : base(subscriptionId, null, 100.0f, subscriptionMessageSequenceNumber, source)
+ Guid subscriptionId, CheckpointTag checkpointTag, string partition,
+ long subscriptionMessageSequenceNumber, object source = null)
+ : base(subscriptionId, checkpointTag, 100.0f, subscriptionMessageSequenceNumber, source)
{
_partition = partition;
}
View
6 src/EventStore/EventStore.Projections.Core/Services/IProjectionStateHandler.cs
@@ -71,6 +71,12 @@ public interface IProjectionStateHandler : IDisposable, ISourceDefinitionSource
out string newSharedState, out EmittedEventEnvelope[] emittedEvents);
/// <summary>
+ /// Processes partition deleted notification and updates internal state if necessary.
+ /// </summary>
+ /// <returns>true - if event was processed (new state must be returned) </returns>
+ bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState);
+
+ /// <summary>
/// Transforms current state into a projection result. Should not call any emit/linkTo etc
/// </summary>
/// <returns>result JSON or NULL if current state has been skipped</returns>
View
5 src/EventStore/EventStore.Projections.Core/Services/Processing/ByHandleStatePartitionSelector.cs
@@ -44,5 +44,10 @@ public override string GetStatePartition(EventReaderSubscriptionMessage.Committe
{
return _handler.GetStatePartition(@event.CheckpointTag, @event.EventCategory, @event.Data);
}
+
+ public override bool EventReaderBasePartitionDeletedIsSupported()
+ {
+ return false;
+ }
}
}
View
5 src/EventStore/EventStore.Projections.Core/Services/Processing/ByPositionStreamStatePartitionSelector.cs
@@ -53,5 +53,10 @@ public override string GetStatePartition(EventReaderSubscriptionMessage.Committe
? eventStreamId.Substring("$$".Length)
: eventStreamId;
}
+
+ public override bool EventReaderBasePartitionDeletedIsSupported()
+ {
+ return true;
+ }
}
}
View
5 src/EventStore/EventStore.Projections.Core/Services/Processing/ByStreamStatePartitionSelector.cs
@@ -54,5 +54,10 @@ public override string GetStatePartition(EventReaderSubscriptionMessage.Committe
? eventStreamId.Substring("$$".Length)
: eventStreamId;
}
+
+ public override bool EventReaderBasePartitionDeletedIsSupported()
+ {
+ return true;
+ }
}
}
View
4 src/EventStore/EventStore.Projections.Core/Services/Processing/CoreProjectionQueue.cs
@@ -111,7 +111,7 @@ public void EnqueueOutOfOrderTask(WorkItem workItem)
_queuePendingEvents.Enqueue(workItem);
}
- public void InitializeQueue(CheckpointTag zeroCheckpointTag)
+ public void InitializeQueue(CheckpointTag startingPosition)
{
_subscriptionPaused = false;
_unsubscribed = false;
@@ -121,7 +121,7 @@ public void InitializeQueue(CheckpointTag zeroCheckpointTag)
_queuePendingEvents.Initialize();
- _lastEnqueuedEventTag = zeroCheckpointTag;
+ _lastEnqueuedEventTag = startingPosition;
_justInitialized = true;
}
View
4 src/EventStore/EventStore.Projections.Core/Services/Processing/EventProcessedResult.cs
@@ -41,17 +41,19 @@ public class EventProcessedResult
private readonly CheckpointTag _checkpointTag;
private readonly Guid _causedBy;
private readonly string _correlationId;
+ private readonly bool _isPartitionTombstone;
public EventProcessedResult(
string partition, CheckpointTag checkpointTag, PartitionState oldState, PartitionState newState,
PartitionState oldSharedState, PartitionState newSharedState, EmittedEventEnvelope[] emittedEvents,
- Guid causedBy, string correlationId)
+ Guid causedBy, string correlationId, bool isPartitionTombstone = false)
{
if (partition == null) throw new ArgumentNullException("partition");
if (checkpointTag == null) throw new ArgumentNullException("checkpointTag");
_emittedEvents = emittedEvents;
_causedBy = causedBy;
_correlationId = correlationId;
+ _isPartitionTombstone = isPartitionTombstone;
_oldState = oldState;
_newState = newState;
_oldSharedState = oldSharedState;
View
102 src/EventStore/EventStore.Projections.Core/Services/Processing/EventProcessingProjectionProcessingPhase.cs
@@ -94,18 +94,19 @@ public void Handle(EventReaderSubscriptionMessage.CommittedEventReceived message
public void Handle(EventReaderSubscriptionMessage.PartitionDeleted message)
{
- throw new NotImplementedException();
//TODO: make sure this is no longer required : if (_state != State.StateLoaded)
if (IsOutOfOrderSubscriptionMessage(message))
return;
RegisterSubscriptionMessage(message);
try
{
- CheckpointTag eventTag = message.CheckpointTag;
-// var committedEventWorkItem = new CommittedEventWorkItem(this, message, _statePartitionSelector);
- //_processingQueue.EnqueueTask(committedEventWorkItem, eventTag);
- if (_state == PhaseState.Running) // prevent processing mostly one projection
- EnsureTickPending();
+ if (_statePartitionSelector.EventReaderBasePartitionDeletedIsSupported())
+ {
+ var partitionDeletedWorkItem = new PartitionDeletedWorkItem(this, message);
+ _processingQueue.EnqueueOutOfOrderTask(partitionDeletedWorkItem);
+ if (_state == PhaseState.Running) // prevent processing mostly one projection
+ EnsureTickPending();
+ }
}
catch (Exception ex)
{
@@ -164,6 +165,21 @@ public string TransformCatalogEvent(EventReaderSubscriptionMessage.CommittedEven
}
}
+ public EventProcessedResult ProcessPartitionDeleted(string partition, CheckpointTag deletedPosition)
+ {
+ switch (_state)
+ {
+ case PhaseState.Running:
+ var result = InternalProcessPartitionDeleted(partition, deletedPosition);
+ return result;
+ case PhaseState.Stopped:
+ _logger.Error("Ignoring committed event in stopped state");
+ return null;
+ default:
+ throw new NotSupportedException();
+ }
+ }
+
private EventProcessedResult InternalProcessCommittedEvent(
string partition, EventReaderSubscriptionMessage.CommittedEventReceived message)
{
@@ -187,6 +203,22 @@ public string TransformCatalogEvent(EventReaderSubscriptionMessage.CommittedEven
return null;
}
+ private EventProcessedResult InternalProcessPartitionDeleted(
+ string partition, CheckpointTag deletedPosition)
+ {
+ string newState;
+ string projectionResult;
+ var hasBeenProcessed = SafeProcessPartitionDeletedByHandler(
+ partition, deletedPosition, out newState, out projectionResult);
+ if (hasBeenProcessed)
+ {
+ var newPartitionState = new PartitionState(newState, projectionResult, deletedPosition);
+
+ return InternalPartitionDeletedProcessed(partition, deletedPosition, newPartitionState);
+ }
+ return null;
+ }
+
private string InternalTransformCatalogEvent(
EventReaderSubscriptionMessage.CommittedEventReceived message)
{
@@ -223,6 +255,31 @@ public string TransformCatalogEvent(EventReaderSubscriptionMessage.CommittedEven
return hasBeenProcessed;
}
+ private bool SafeProcessPartitionDeletedByHandler(
+ string partition, CheckpointTag deletedPosition, out string newState,
+ out string projectionResult)
+ {
+ projectionResult = null;
+ //TODO: not emitting (optimized) projection handlers can skip serializing state on each processed event
+ bool hasBeenProcessed;
+ try
+ {
+ hasBeenProcessed = ProcessPartitionDeletedByHandler(
+ partition, deletedPosition, out newState, out projectionResult);
+ }
+ catch (Exception ex)
+ {
+ SetFaulting(
+ String.Format(
+ "The {0} projection failed to process a delete partition notification.\r\nHandler: {1}\r\nEvent Position: {2}\r\n\r\nMessage:\r\n\r\n{3}",
+ _projectionName, GetHandlerTypeName(), deletedPosition, ex.Message), ex);
+ newState = null;
+ hasBeenProcessed = false;
+ }
+ newState = newState ?? "";
+ return hasBeenProcessed;
+ }
+
private string SafeTransformCatalogEventByHandler(EventReaderSubscriptionMessage.CommittedEventReceived message)
{
string result;
@@ -282,6 +339,39 @@ private string GetHandlerTypeName()
return result;
}
+ private bool ProcessPartitionDeletedByHandler(
+ string partition, CheckpointTag deletePosition, out string newState,
+ out string projectionResult)
+ {
+ projectionResult = null;
+ SetHandlerState(partition);
+ _stopwatch.Start();
+ var result = _projectionStateHandler.ProcessPartitionDeleted(
+ partition, deletePosition, out newState);
+ if (result)
+ {
+ var oldState = _partitionStateCache.GetLockedPartitionState(partition);
+ //TODO: depending on query processing final state to result transformation should happen either here (if EOF) on while writing results
+ if ( /*_producesRunningResults && */oldState.State != newState)
+ {
+ if (_definesStateTransform)
+ {
+ projectionResult = _projectionStateHandler.TransformStateToResult();
+ }
+ else
+ {
+ projectionResult = newState;
+ }
+ }
+ else
+ {
+ projectionResult = oldState.Result;
+ }
+ }
+ _stopwatch.Stop();
+ return result;
+ }
+
private string TransformCatalogEventByHandler(EventReaderSubscriptionMessage.CommittedEventReceived message)
{
_stopwatch.Start();
View
2  src/EventStore/EventStore.Projections.Core/Services/Processing/EventReaderCoreService.cs
@@ -236,6 +236,8 @@ public void Handle(ReaderSubscriptionMessage.EventReaderPartitionDeleted message
Guid projectionId;
if (_stopped)
return;
+ if (_runHeadingReader && _headingEventReader.Handle(message))
+ return;
if (!_eventReaderSubscriptions.TryGetValue(message.CorrelationId, out projectionId))
return; // unsubscribed
_subscriptions[projectionId].Handle(message);
View
23 ...Store/EventStore.Projections.Core/Services/Processing/EventSubscriptionBasedProjectionProcessingPhase.cs
@@ -453,6 +453,29 @@ public string GetStatus()
else return null;
}
+ protected EventProcessedResult InternalPartitionDeletedProcessed(
+ string partition, CheckpointTag deletePosition,
+ PartitionState newPartitionState
+ )
+ {
+ var oldState = _partitionStateCache.GetLockedPartitionState(partition);
+ var oldSharedState = _isBiState ? _partitionStateCache.GetLockedPartitionState("") : null;
+ bool changed = oldState.IsChanged(newPartitionState);
+
+
+ PartitionState partitionState = null;
+ // NOTE: projectionResult cannot change independently unless projection definition has changed
+ if (changed)
+ {
+ var lockPartitionStateAt = partition != "" ? deletePosition : null;
+ partitionState = newPartitionState;
+ _partitionStateCache.CacheAndLockPartitionState(partition, partitionState, lockPartitionStateAt);
+ }
+ return new EventProcessedResult(
+ partition, deletePosition, oldState, partitionState, oldSharedState, null, null, Guid.Empty, null,
+ isPartitionTombstone: true);
+ }
+
public void BeginGetPartitionStateAt(
string statePartition, CheckpointTag at, Action<PartitionState> loadCompleted, bool lockLoaded)
{
View
118 src/EventStore/EventStore.Projections.Core/Services/Processing/HeadingEventReader.cs
@@ -31,6 +31,7 @@
using EventStore.Common.Log;
using EventStore.Core.Bus;
using EventStore.Core.Data;
+using EventStore.Core.Messaging;
using EventStore.Projections.Core.Messages;
namespace EventStore.Projections.Core.Services.Processing
@@ -41,8 +42,51 @@ public class HeadingEventReader
private IEventReader _headEventReader;
private TFPos _subscribeFromPosition = new TFPos(long.MaxValue, long.MaxValue);
- private readonly Queue<ReaderSubscriptionMessage.CommittedEventDistributed> _lastMessages =
- new Queue<ReaderSubscriptionMessage.CommittedEventDistributed>();
+ private abstract class Item
+ {
+ public readonly TFPos Position;
+
+ public Item(TFPos position)
+ {
+ Position = position;
+ }
+
+ public abstract void Handle(IReaderSubscription subscription);
+ }
+
+ private class CommittedEventItem : Item
+ {
+ public readonly ReaderSubscriptionMessage.CommittedEventDistributed Message;
+
+ public CommittedEventItem(ReaderSubscriptionMessage.CommittedEventDistributed message)
+ : base(message.Data.Position)
+ {
+ Message = message;
+ }
+
+ public override void Handle(IReaderSubscription subscription)
+ {
+ subscription.Handle(Message);
+ }
+ }
+
+ private class PartitionDeletedItem : Item
+ {
+ public readonly ReaderSubscriptionMessage.EventReaderPartitionDeleted Message;
+
+ public PartitionDeletedItem(ReaderSubscriptionMessage.EventReaderPartitionDeleted message)
+ : base(message.DeleteEventPosition.Value)
+ {
+ Message = message;
+ }
+
+ public override void Handle(IReaderSubscription subscription)
+ {
+ subscription.Handle(Message);
+ }
+ }
+
+ private readonly Queue<Item> _lastMessages = new Queue<Item>();
private readonly int _eventCacheSize;
@@ -50,9 +94,13 @@ public class HeadingEventReader
new Dictionary<Guid, IReaderSubscription>();
private bool _headEventReaderPaused;
+
private Guid _eventReaderId;
+
private bool _started;
+
private TFPos _lastEventPosition = new TFPos(0, -1);
+ private TFPos _lastDeletePosition = new TFPos(0, -1);
public HeadingEventReader(int eventCacheSize)
{
@@ -78,6 +126,25 @@ public bool Handle(ReaderSubscriptionMessage.CommittedEventDistributed message)
return true;
}
+ public bool Handle(ReaderSubscriptionMessage.EventReaderPartitionDeleted message)
+ {
+ EnsureStarted();
+ if (message.CorrelationId != _eventReaderId)
+ return false;
+
+ ValidateEventOrder(message);
+
+
+ CacheRecentMessage(message);
+ DistributeMessage(message);
+ if (_headSubscribers.Count == 0 && !_headEventReaderPaused)
+ {
+ // _headEventReader.Pause();
+ // _headEventReaderPaused = true;
+ }
+ return true;
+ }
+
public bool Handle(ReaderSubscriptionMessage.EventReaderIdle message)
{
EnsureStarted();
@@ -89,14 +156,25 @@ public bool Handle(ReaderSubscriptionMessage.EventReaderIdle message)
private void ValidateEventOrder(ReaderSubscriptionMessage.CommittedEventDistributed message)
{
- if (_lastEventPosition >= message.Data.Position)
+ if (_lastEventPosition >= message.Data.Position || _lastDeletePosition > message.Data.Position)
throw new InvalidOperationException(
string.Format(
- "Invalid committed event order. Last: '{0}' Received: '{1}'", _lastEventPosition,
- message.Data.Position));
+ "Invalid committed event order. Last: '{0}' Received: '{1}' LastDelete: '{2}'",
+ _lastEventPosition, message.Data.Position, _lastEventPosition));
_lastEventPosition = message.Data.Position;
}
+ private void ValidateEventOrder(ReaderSubscriptionMessage.EventReaderPartitionDeleted message)
+ {
+ if (_lastEventPosition > message.DeleteEventPosition.Value
+ || _lastDeletePosition >= message.DeleteEventPosition.Value)
+ throw new InvalidOperationException(
+ string.Format(
+ "Invalid partition deleted event order. Last: '{0}' Received: '{1}' LastDelete: '{2}'",
+ _lastEventPosition, message.DeleteEventPosition.Value, _lastEventPosition));
+ _lastDeletePosition = message.DeleteEventPosition.Value;
+ }
+
public void Start(Guid eventReaderId, IEventReader eventReader)
{
if (_started)
@@ -148,13 +226,11 @@ public void Unsubscribe(Guid projectionId)
_headSubscribers.Remove(projectionId);
}
- private void DispatchRecentMessagesTo(
- IHandle<ReaderSubscriptionMessage.CommittedEventDistributed> subscription,
- long fromTransactionFilePosition)
+ private void DispatchRecentMessagesTo(IReaderSubscription subscription, long fromTransactionFilePosition)
{
foreach (var m in _lastMessages)
- if (m.Data.Position.CommitPosition >= fromTransactionFilePosition)
- subscription.Handle(m);
+ if (m.Position.CommitPosition >= fromTransactionFilePosition)
+ m.Handle(subscription);
}
private void DistributeMessage(ReaderSubscriptionMessage.CommittedEventDistributed message)
@@ -163,6 +239,12 @@ private void DistributeMessage(ReaderSubscriptionMessage.CommittedEventDistribut
subscriber.Handle(message);
}
+ private void DistributeMessage(ReaderSubscriptionMessage.EventReaderPartitionDeleted message)
+ {
+ foreach (var subscriber in _headSubscribers.Values)
+ subscriber.Handle(message);
+ }
+
private void DistributeMessage(ReaderSubscriptionMessage.EventReaderIdle message)
{
foreach (var subscriber in _headSubscribers.Values)
@@ -171,13 +253,24 @@ private void DistributeMessage(ReaderSubscriptionMessage.EventReaderIdle message
private void CacheRecentMessage(ReaderSubscriptionMessage.CommittedEventDistributed message)
{
- _lastMessages.Enqueue(message);
+ _lastMessages.Enqueue(new CommittedEventItem(message));
if (_lastMessages.Count > _eventCacheSize)
{
_lastMessages.Dequeue();
}
var lastAvailableCommittedevent = _lastMessages.Peek();
- _subscribeFromPosition = lastAvailableCommittedevent.Data.Position;
+ _subscribeFromPosition = lastAvailableCommittedevent.Position;
+ }
+
+ private void CacheRecentMessage(ReaderSubscriptionMessage.EventReaderPartitionDeleted message)
+ {
+ _lastMessages.Enqueue(new PartitionDeletedItem(message));
+ if (_lastMessages.Count > _eventCacheSize)
+ {
+ _lastMessages.Dequeue();
+ }
+ var lastAvailableCommittedevent = _lastMessages.Peek();
+ _subscribeFromPosition = lastAvailableCommittedevent.Position;
}
private void AddSubscriber(Guid publishWithCorrelationId, IReaderSubscription subscription)
@@ -198,5 +291,6 @@ private void EnsureStarted()
if (!_started)
throw new InvalidOperationException("Not started");
}
+
}
}
View
2  src/EventStore/EventStore.Projections.Core/Services/Processing/IEventProcessingPhase.cs
@@ -65,5 +65,7 @@ public interface IEventProcessingProjectionPhase : IProjectionPhaseStateManager
void EmitEofResult(
string partition, string resultBody, CheckpointTag causedBy, Guid causedByGuid, string correlationId);
+
+ EventProcessedResult ProcessPartitionDeleted(string partition, CheckpointTag deletedPosition);
}
}
View
5 src/EventStore/EventStore.Projections.Core/Services/Processing/NoopStatePartitionSelector.cs
@@ -35,5 +35,10 @@ public override string GetStatePartition(EventReaderSubscriptionMessage.Committe
{
return "";
}
+
+ public override bool EventReaderBasePartitionDeletedIsSupported()
+ {
+ return false;
+ }
}
}
View
96 src/EventStore/EventStore.Projections.Core/Services/Processing/PartitionDeletedWorkItem.cs
@@ -0,0 +1,96 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using System;
+using EventStore.Projections.Core.Messages;
+
+namespace EventStore.Projections.Core.Services.Processing
+{
+ class PartitionDeletedWorkItem : WorkItem
+ {
+ private readonly EventReaderSubscriptionMessage.PartitionDeleted _message;
+ private readonly string _partition;
+ private readonly IEventProcessingProjectionPhase _projection;
+ private EventProcessedResult _eventProcessedResult;
+
+ public PartitionDeletedWorkItem(
+ IEventProcessingProjectionPhase projection, EventReaderSubscriptionMessage.PartitionDeleted message)
+ : base(null)
+ {
+ _partition = message.Partition;
+ _projection = projection;
+ _message = message;
+ _requiresRunning = true;
+ }
+
+ protected override void GetStatePartition()
+ {
+ NextStage(_partition);
+ }
+
+ protected override void Load(CheckpointTag checkpointTag)
+ {
+ // we load partition state even if stopping etc. should we skip?
+ _projection.BeginGetPartitionStateAt(_partition, _message.CheckpointTag, LoadCompleted, lockLoaded: true);
+ }
+
+ private void LoadCompleted(PartitionState state)
+ {
+ NextStage();
+ }
+
+ protected override void ProcessEvent()
+ {
+ if (_partition == null)
+ {
+ NextStage();
+ return;
+ }
+ var eventProcessedResult = _projection.ProcessPartitionDeleted(_partition, _message.CheckpointTag);
+ if (eventProcessedResult != null)
+ SetEventProcessedResult(eventProcessedResult);
+ NextStage();
+ }
+
+ protected override void WriteOutput()
+ {
+ if (_partition == null)
+ {
+ NextStage();
+ return;
+ }
+ _projection.FinalizeEventProcessing(_eventProcessedResult, _message.CheckpointTag, _message.Progress);
+ NextStage();
+ }
+
+ private void SetEventProcessedResult(EventProcessedResult eventProcessedResult)
+ {
+ _eventProcessedResult = eventProcessedResult;
+ }
+ }
+}
View
1  src/EventStore/EventStore.Projections.Core/Services/Processing/PositionTracker.cs
@@ -72,5 +72,6 @@ public void Initialize()
{
_lastTag = null;
}
+
}
}
View
3  src/EventStore/EventStore.Projections.Core/Services/Processing/ReaderSubscription.cs
@@ -56,7 +56,8 @@ public void Handle(ReaderSubscriptionMessage.EventReaderIdle message)
public void Handle(ReaderSubscriptionMessage.EventReaderPartitionDeleted message)
{
- PublishPartitionDeleted(message.Partition);
+ var deletePosition = _positionTagger.MakeCheckpointTag(_positionTracker.LastTag, message);
+ PublishPartitionDeleted(message.Partition, deletePosition);
}
}
}
View
8 src/EventStore/EventStore.Projections.Core/Services/Processing/ReaderSubscriptionBase.cs
@@ -44,8 +44,8 @@ public class ReaderSubscriptionBase
private readonly bool _stopOnEof;
private readonly int? _stopAfterNEvents;
private readonly EventFilter _eventFilter;
- private readonly PositionTagger _positionTagger;
- private readonly PositionTracker _positionTracker;
+ protected readonly PositionTagger _positionTagger;
+ protected readonly PositionTracker _positionTracker;
private long? _lastPassedOrCheckpointedEventPosition;
private float _progress = -1;
private long _subscriptionMessageSequenceNumber;
@@ -152,11 +152,11 @@ private void PublishProgress()
_subscriptionId, _positionTracker.LastTag, _progress, _subscriptionMessageSequenceNumber++));
}
- protected void PublishPartitionDeleted(string partition)
+ protected void PublishPartitionDeleted(string partition, CheckpointTag deletePosition)
{
_publisher.Publish(
new EventReaderSubscriptionMessage.PartitionDeleted(
- _subscriptionId, partition, _subscriptionMessageSequenceNumber++));
+ _subscriptionId, deletePosition, partition, _subscriptionMessageSequenceNumber++));
}
protected void PublishStartingAt(long startingLastCommitPosition)
View
1  src/EventStore/EventStore.Projections.Core/Services/Processing/StatePartitionSelector.cs
@@ -32,5 +32,6 @@ namespace EventStore.Projections.Core.Services.Processing
public abstract class StatePartitionSelector
{
public abstract string GetStatePartition(EventReaderSubscriptionMessage.CommittedEventReceived @event);
+ public abstract bool EventReaderBasePartitionDeletedIsSupported();
}
}
View
5 src/EventStore/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs
@@ -219,6 +219,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
CheckDisposed();
View
5 src/EventStore/EventStore.Projections.Core/Standard/CategorizeEventsByStreamPath.cs
@@ -115,6 +115,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
5 src/EventStore/EventStore.Projections.Core/Standard/CategorizeStreamByPath.cs
@@ -104,6 +104,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
5 src/EventStore/EventStore.Projections.Core/Standard/IndexEventsByEventType.cs
@@ -120,6 +120,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
5 src/EventStore/EventStore.Projections.Core/Standard/IndexStreams.cs
@@ -102,6 +102,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
5 src/EventStore/EventStore.Projections.Core/Standard/StubHandler.cs
@@ -88,6 +88,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
5 src/EventStore/EventStore.Web/Users/IndexUsersProjectionHandler.cs
@@ -113,6 +113,11 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true;
}
+ public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
+ {
+ throw new NotImplementedException();
+ }
+
public string TransformStateToResult()
{
throw new NotImplementedException();
View
1  src/ReformatOptions.DotSettings
@@ -2,4 +2,5 @@
<s:String x:Key="/Default/CodeStyle/CodeCleanup/Profiles/=header/@EntryIndexedValue">&lt;?xml version="1.0" encoding="utf-16"?&gt;&lt;Profile name="header"&gt;&lt;CSUpdateFileHeader&gt;True&lt;/CSUpdateFileHeader&gt;&lt;/Profile&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/CodeCleanup/Profiles/=removecast/@EntryIndexedValue">&lt;?xml version="1.0" encoding="utf-16"?&gt;&lt;Profile name="removecast"&gt;&lt;CSRemoveCodeRedundancies&gt;True&lt;/CSRemoveCodeRedundancies&gt;&lt;/Profile&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/CodeCleanup/Profiles/=removeredudant/@EntryIndexedValue">&lt;?xml version="1.0" encoding="utf-16"?&gt;&lt;Profile name="removeredudant"&gt;&lt;CSOptimizeUsings&gt;&lt;OptimizeUsings&gt;True&lt;/OptimizeUsings&gt;&lt;EmbraceInRegion&gt;False&lt;/EmbraceInRegion&gt;&lt;RegionName&gt;&lt;/RegionName&gt;&lt;/CSOptimizeUsings&gt;&lt;CSShortenReferences&gt;True&lt;/CSShortenReferences&gt;&lt;/Profile&gt;</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeCleanup/Profiles/=SW_0020main/@EntryIndexedValue">&lt;?xml version="1.0" encoding="utf-16"?&gt;&lt;Profile name="SW main"&gt;&lt;CSOptimizeUsings&gt;&lt;OptimizeUsings&gt;True&lt;/OptimizeUsings&gt;&lt;EmbraceInRegion&gt;False&lt;/EmbraceInRegion&gt;&lt;RegionName&gt;&lt;/RegionName&gt;&lt;/CSOptimizeUsings&gt;&lt;CSUpdateFileHeader&gt;True&lt;/CSUpdateFileHeader&gt;&lt;CSShortenReferences&gt;True&lt;/CSShortenReferences&gt;&lt;CSReformatCode&gt;True&lt;/CSReformatCode&gt;&lt;/Profile&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/CodeCleanup/RecentlyUsedProfile/@EntryValue">Default: Reformat Code</s:String></wpf:ResourceDictionary>
Please sign in to comment.
Something went wrong with that request. Please try again.