Permalink
Browse files

Merge branch 'projections' into dev

Conflicts:
	src/EventStore/EventStore.Projections.Core/Messages/EventReaderSubscriptionMessage.cs
	src/EventStore/EventStore.Projections.Core/Services/Processing/EventProcessingProjectionProcessingPhase.cs
  • Loading branch information...
2 parents 3253d2e + 2139d44 commit a6a9ede4c4f4b00dc6ab3649dbc57fc188092d84 @ysw ysw committed Feb 24, 2014
Showing with 246 additions and 78 deletions.
  1. +1 −1 src/EventStore/EventStore.ClientAPI/ClientOperations/SubscriptionOperation.cs
  2. +1 −0 src/EventStore/EventStore.Projections.Core/EventStore.Projections.Core.csproj
  3. +84 −17 src/EventStore/EventStore.Projections.Core/Messages/ParallelQueryProcessingMessages.cs
  4. +2 −0 src/EventStore/EventStore.Projections.Core/ProjectionWorkerNode.cs
  5. +0 −8 src/EventStore/EventStore.Projections.Core/Services/Processing/CoreProjection.cs
  6. +16 −0 ...Store/EventStore.Projections.Core/Services/Processing/EventProcessingProjectionProcessingPhase.cs
  7. +22 −17 ...ventStore.Projections.Core/Services/Processing/EventSubscriptionBasedProjectionProcessingPhase.cs
  8. +0 −1 src/EventStore/EventStore.Projections.Core/Services/Processing/ExternallyFedByStreamEventReader.cs
  9. +37 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/IProgressResultWriter.cs
  10. +3 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/IResultWriter.cs
  11. +15 −15 src/EventStore/EventStore.Projections.Core/Services/Processing/ParallelProcessingLoadBalancer.cs
  12. +7 −3 ...e/EventStore.Projections.Core/Services/Processing/ParallelQueryMasterProjectionProcessingPhase.cs
  13. +0 −1 src/EventStore/EventStore.Projections.Core/Services/Processing/ProcessingStrategySelector.cs
  14. +4 −1 src/EventStore/EventStore.Projections.Core/Services/Processing/ProgressWorkItem.cs
  15. +10 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/ResultWriter.cs
  16. +11 −1 src/EventStore/EventStore.Projections.Core/Services/Processing/SlaveResultWriter.cs
  17. +15 −3 src/EventStore/EventStore.Projections.Core/Services/Processing/SpoolStreamProcessingWorkItem.cs
  18. +1 −1 src/EventStore/EventStore.Projections.Core/Services/ReaderSubscriptionDispatcher.cs
  19. +7 −2 src/EventStore/EventStore.Projections.v8Integration/CompiledScript.cpp
  20. +1 −0 src/EventStore/EventStore.Projections.v8Integration/CompiledScript.h
  21. +1 −1 src/EventStore/EventStore.Projections.v8Integration/ModuleScript.cpp
  22. +4 −4 src/EventStore/EventStore.Projections.v8Integration/PreludeScript.cpp
  23. +4 −2 src/EventStore/EventStore.Projections.v8Integration/QueryScript.cpp
@@ -127,7 +127,7 @@ public InspectionResult InspectPackage(TcpPackage package)
return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation");
}
- case TcpCommand.StreamEventAppeared:
+ case TcpCommand.StreamEventAppeared:
{
var dto = package.Data.Deserialize<ClientMessage.StreamEventAppeared>();
EventAppeared(new ResolvedEvent(dto.Event));
@@ -222,6 +222,7 @@
<Compile Include="Services\Processing\IEventReader.cs" />
<Compile Include="Services\Processing\IEventWriter.cs" />
<Compile Include="Services\Processing\IEventProcessingPhase.cs" />
+ <Compile Include="Services\Processing\IProgressResultWriter.cs" />
<Compile Include="Services\Processing\IProjectionProcessingPhase.cs" />
<Compile Include="Services\Processing\IReaderStrategy.cs" />
<Compile Include="Services\Processing\IResultEventEmitter.cs" />
@@ -6,35 +6,65 @@ namespace EventStore.Projections.Core.Messages
{
namespace ParallelQueryProcessingMessages
{
- public sealed class PartitionProcessingResult : Message
+ public abstract class PartitionProcessingResultBase : Message
{
private static readonly int TypeId = System.Threading.Interlocked.Increment(ref NextMsgId);
- public override int MsgTypeId { get { return TypeId; } }
- private readonly Guid _correlationId;
- private readonly Guid _subscriptionId;
- private readonly string _partition;
+ public override int MsgTypeId
+ {
+ get { return TypeId; }
+ }
+
+ protected readonly Guid _correlationId;
+ protected readonly Guid _subscriptionId;
+ protected readonly string _partition;
+
+ protected PartitionProcessingResultBase(Guid correlationId, Guid subscriptionId, string partition)
+ {
+ _correlationId = correlationId;
+ _subscriptionId = subscriptionId;
+ _partition = partition;
+ }
+
+ public string Partition
+ {
+ get { return _partition; }
+ }
+
+ public Guid CorrelationId
+ {
+ get { return _correlationId; }
+ }
+
+ public Guid SubscriptionId
+ {
+ get { return _subscriptionId; }
+ }
+ }
+
+ public sealed class PartitionProcessingResult : PartitionProcessingResultBase
+ {
+ private static readonly int TypeId = System.Threading.Interlocked.Increment(ref NextMsgId);
+
+ public override int MsgTypeId
+ {
+ get { return TypeId; }
+ }
+
private readonly string _result;
private readonly Guid _causedByGuid;
private readonly CheckpointTag _position;
public PartitionProcessingResult(
Guid correlationId, Guid subscriptionId, string partition, Guid causedByGuid, CheckpointTag position,
string result)
+ : base(correlationId, subscriptionId, partition)
{
- _correlationId = correlationId;
- _subscriptionId = subscriptionId;
- _partition = partition;
_causedByGuid = causedByGuid;
_position = position;
_result = result;
}
- public string Partition
- {
- get { return _partition; }
- }
-
public string Result
{
get { return _result; }
@@ -50,16 +80,53 @@ public CheckpointTag Position
get { return _position; }
}
- public Guid CorrelationId
+ }
+
+ public sealed class PartitionMeasured : PartitionProcessingResultBase
+ {
+ private static readonly int TypeId = System.Threading.Interlocked.Increment(ref NextMsgId);
+
+ public override int MsgTypeId
{
- get { return _correlationId; }
+ get { return TypeId; }
}
- public Guid SubscriptionId
+ private readonly int _size;
+
+ public PartitionMeasured(Guid correlationId, Guid subscriptionId, string partition, int size)
+ : base(correlationId, subscriptionId, partition)
{
- get { return _subscriptionId; }
+ _size = size;
+ }
+
+
+ public int Size
+ {
+ get { return _size; }
}
}
+ public sealed class PartitionProcessingProgress : PartitionProcessingResultBase
+ {
+ private static readonly int TypeId = System.Threading.Interlocked.Increment(ref NextMsgId);
+
+ public override int MsgTypeId
+ {
+ get { return TypeId; }
+ }
+
+ private readonly float _progress;
+
+ public PartitionProcessingProgress(Guid correlationId, Guid subscriptionId, float progress)
+ : base(correlationId, subscriptionId, null)
+ {
+ _progress = progress;
+ }
+
+ public float Progress
+ {
+ get { return _progress; }
+ }
+ }
}
}
@@ -99,6 +99,8 @@ public void SetupMessaging(IBus coreInputBus)
coreInputBus.Subscribe(_subscriptionDispatcher.CreateSubscriber<EventReaderSubscriptionMessage.NotAuthorized>());
coreInputBus.Subscribe(_subscriptionDispatcher.CreateSubscriber<EventReaderSubscriptionMessage.ReaderAssignedReader>());
coreInputBus.Subscribe(_spoolProcessingResponseDispatcher.CreateSubscriber<PartitionProcessingResult>());
+ coreInputBus.Subscribe(_spoolProcessingResponseDispatcher.CreateSubscriber<PartitionMeasured>());
+ coreInputBus.Subscribe(_spoolProcessingResponseDispatcher.CreateSubscriber<PartitionProcessingProgress>());
coreInputBus.Subscribe(_feedReaderService);
@@ -27,14 +27,12 @@
//
using System;
-using System.Runtime.InteropServices;
using System.Security.Principal;
using EventStore.Common.Log;
using EventStore.Core.Bus;
using EventStore.Core.Helpers;
using EventStore.Core.Messaging;
using EventStore.Core.Services.TimerService;
-using EventStore.Core.Services.UserManagement;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Messages.ParallelQueryProcessingMessages;
using EventStore.Projections.Core.Utils;
@@ -49,7 +47,6 @@ public class CoreProjection : IDisposable,
ICoreProjectionForProcessingPhase,
IHandle<CoreProjectionManagementMessage.GetState>,
IHandle<CoreProjectionManagementMessage.GetResult>,
- IHandle<PartitionProcessingResult>,
IHandle<ProjectionManagementMessage.SlaveProjectionsStarted>
{
[Flags]
@@ -719,11 +716,6 @@ public void Subscribed()
GoToState(State.Subscribed);
}
- public void Handle(PartitionProcessingResult message)
- {
- throw new NotImplementedException();
- }
-
public void Handle(ProjectionManagementMessage.SlaveProjectionsStarted message)
{
_slaveProjections = message.SlaveProjections;
@@ -38,6 +38,7 @@ public class EventProcessingProjectionProcessingPhase : EventSubscriptionBasedPr
IHandle<EventReaderSubscriptionMessage.CommittedEventReceived>,
IHandle<EventReaderSubscriptionMessage.PartitionEofReached>,
IHandle<EventReaderSubscriptionMessage.PartitionDeleted>,
+ IHandle<EventReaderSubscriptionMessage.PartitionMeasured>,
IEventProcessingProjectionPhase
{
private readonly IProjectionStateHandler _projectionStateHandler;
@@ -476,5 +477,20 @@ public override void Dispose()
if (_projectionStateHandler != null)
_projectionStateHandler.Dispose();
}
+
+ public void Handle(EventReaderSubscriptionMessage.PartitionMeasured message)
+ {
+ if (IsOutOfOrderSubscriptionMessage(message))
+ return;
+ RegisterSubscriptionMessage(message);
+ try
+ {
+ _resultWriter.WritePartitionMeasured(message.SubscriptionId, message.Partition, message.Size);
+ }
+ catch (Exception ex)
+ {
+ _coreProjection.SetFaulted(ex);
+ }
+ }
}
}
@@ -42,14 +42,14 @@ public abstract class EventSubscriptionBasedProjectionProcessingPhase : IProject
IHandle<EventReaderSubscriptionMessage.EofReached>,
IHandle<EventReaderSubscriptionMessage.CheckpointSuggested>,
IHandle<EventReaderSubscriptionMessage.ReaderAssignedReader>,
- IHandle<EventReaderSubscriptionMessage.PartitionMeasured>,
IProjectionProcessingPhase,
IProjectionPhaseStateManager
{
protected readonly IPublisher _publisher;
protected readonly ICoreProjectionForProcessingPhase _coreProjection;
protected readonly Guid _projectionCorrelationId;
protected readonly ICoreProjectionCheckpointManager _checkpointManager;
+ protected readonly IProgressResultWriter _progressResultWriter;
protected readonly ProjectionConfig _projectionConfig;
protected readonly string _projectionName;
protected readonly ILogger _logger;
@@ -94,6 +94,7 @@ public abstract class EventSubscriptionBasedProjectionProcessingPhase : IProject
_useCheckpoints = useCheckpoints;
_stopOnEof = stopOnEof;
_isBiState = isBiState;
+ _progressResultWriter = new ProgressResultWriter(this, _resultWriter);
}
public void UnlockAndForgetBefore(CheckpointTag checkpointTag)
@@ -147,7 +148,7 @@ public void Handle(EventReaderSubscriptionMessage.ProgressChanged message)
RegisterSubscriptionMessage(message);
try
{
- var progressWorkItem = new ProgressWorkItem(_checkpointManager, message.Progress);
+ var progressWorkItem = new ProgressWorkItem(_checkpointManager, _progressResultWriter, message.Progress);
_processingQueue.EnqueueTask(progressWorkItem, message.CheckpointTag, allowCurrentPosition: true);
ProcessEvent();
}
@@ -214,21 +215,6 @@ public void Handle(EventReaderSubscriptionMessage.EofReached message)
}
}
- public void Handle(EventReaderSubscriptionMessage.PartitionMeasured message)
- {
- if (IsOutOfOrderSubscriptionMessage(message))
- return;
- RegisterSubscriptionMessage(message);
- try
- {
- //TODO: handle
- }
- catch (Exception ex)
- {
- _coreProjection.SetFaulted(ex);
- }
- }
-
public void Handle(EventReaderSubscriptionMessage.CheckpointSuggested message)
{
if (IsOutOfOrderSubscriptionMessage(message))
@@ -537,6 +523,7 @@ PartitionState newPartitionState
}
}
_checkpointManager.EventProcessed(eventCheckpointTag, progress);
+ _progressResultWriter.WriteProgress(progress);
}
}
@@ -625,5 +612,23 @@ public void SetProjectionState(PhaseState state)
if (starting)
NewCheckpointStarted(LastProcessedEventPosition);
}
+
+ class ProgressResultWriter : IProgressResultWriter
+ {
+ private readonly EventSubscriptionBasedProjectionProcessingPhase _phase;
+ private readonly IResultWriter _resultWriter;
+
+ public ProgressResultWriter(EventSubscriptionBasedProjectionProcessingPhase phase, IResultWriter resultWriter)
+ {
+ _phase = phase;
+ _resultWriter = resultWriter;
+ }
+
+ public void WriteProgress(float progress)
+ {
+ _resultWriter.WriteProgress(_phase._currentSubscriptionId, progress);
+ }
+ }
}
+
}
@@ -33,7 +33,6 @@
using EventStore.Core.Data;
using EventStore.Core.Helpers;
using EventStore.Core.Messages;
-using EventStore.Core.Messaging;
using EventStore.Core.Services.TimerService;
using EventStore.Projections.Core.Messages;
@@ -0,0 +1,37 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using System;
+
+namespace EventStore.Projections.Core.Services.Processing
+{
+ public interface IProgressResultWriter
+ {
+ void WriteProgress(float progress);
+ }
+}
@@ -37,11 +37,14 @@ public interface IResultWriter
Guid subscriptionId, string partition, string resultBody, CheckpointTag causedBy, Guid causedByGuid,
string correlationId);
+ void WritePartitionMeasured(Guid subscriptionId, string partition, int size);
+
void WriteRunningResult(EventProcessedResult result);
void AccountPartition(EventProcessedResult result);
void EventsEmitted(EmittedEventEnvelope[] scheduledWrites, Guid causedBy, string correlationId);
+ void WriteProgress(Guid subscriptionId, float progress);
}
}
Oops, something went wrong.

0 comments on commit a6a9ede

Please sign in to comment.