Skip to content

Commit

Permalink
Added: basic $created support without any integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ysw committed Feb 24, 2014
1 parent bd283f8 commit 4c8fd15
Show file tree
Hide file tree
Showing 19 changed files with 269 additions and 12 deletions.
Expand Up @@ -198,6 +198,7 @@
<Compile Include="Services\core_projection\query_by_stream\when_receiving_a_committed_event_the_projection_with_partitioned_state_by_custom_rule_should.cs" /> <Compile Include="Services\core_projection\query_by_stream\when_receiving_a_committed_event_the_projection_with_partitioned_state_by_custom_rule_should.cs" />
<Compile Include="Services\core_projection\TestFixtureUtils.cs" /> <Compile Include="Services\core_projection\TestFixtureUtils.cs" />
<Compile Include="Services\core_projection\TestFixtureWithReadWriteDispatchers.cs" /> <Compile Include="Services\core_projection\TestFixtureWithReadWriteDispatchers.cs" />
<Compile Include="Services\core_projection\when_creating_a_new_partitiion_the_projection_should.cs" />
<Compile Include="Services\core_projection\when_receiving_committed_events_the_projection_without_when.cs" /> <Compile Include="Services\core_projection\when_receiving_committed_events_the_projection_without_when.cs" />
<Compile Include="Services\core_projection\when_receiving_committed_event_the_projection_with_existing_partitioned_state_should.cs" /> <Compile Include="Services\core_projection\when_receiving_committed_event_the_projection_with_existing_partitioned_state_should.cs" />
<Compile Include="Services\core_projection\when_the_projection_with_pending_checkpoint_is_stopped.cs" /> <Compile Include="Services\core_projection\when_the_projection_with_pending_checkpoint_is_stopped.cs" />
Expand Down
Expand Up @@ -40,6 +40,7 @@ public class FakeProjectionStateHandler : IProjectionStateHandler
public int _initializeSharedCalled = 0; public int _initializeSharedCalled = 0;
public int _loadCalled = 0; public int _loadCalled = 0;
public int _eventsProcessed = 0; public int _eventsProcessed = 0;
public int _partitionCreatedProcessed = 0;
public string _loadedState = null; public string _loadedState = null;
public string _lastProcessedStreamId; public string _lastProcessedStreamId;
public string _lastProcessedEventType; public string _lastProcessedEventType;
Expand Down Expand Up @@ -234,6 +235,14 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
} }
} }


public bool ProcessPartitionCreated(
string partition, CheckpointTag createPosition, out EmittedEventEnvelope[] emittedEvents)
{
_partitionCreatedProcessed++;
emittedEvents = null;
return true;
}

public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState) public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
Expand Down
@@ -0,0 +1,79 @@
// Copyright (c) 2012, Event Store LLP
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// Neither the name of the Event Store LLP nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

using System;
using System.Linq;
using EventStore.Common.Utils;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services.Processing;
using NUnit.Framework;
using ResolvedEvent = EventStore.Projections.Core.Services.Processing.ResolvedEvent;

namespace EventStore.Projections.Core.Tests.Services.core_projection
{
[TestFixture]
public class when_creating_a_new_partitiion_the_projection_should : TestFixtureWithCoreProjectionStarted
{
private Guid _eventId;

protected override void Given()
{
_configureBuilderByQuerySource = source =>
{
source.FromAll();
source.AllEvents();
source.SetByStream();
source.SetDefinesStateTransform();
};
TicksAreHandledImmediately();
AllWritesSucceed();
NoOtherStreams();
}

protected override void When()
{
//projection subscribes here
_eventId = Guid.NewGuid();
_consumer.HandledMessages.Clear();
_bus.Publish(
EventReaderSubscriptionMessage.CommittedEventReceived.Sample(
new ResolvedEvent(
"account-01", -1, "account-01", -1, false, new TFPos(120, 110), _eventId, "handle_this_type",
false, "data", "metadata"), _subscriptionId, 0));
}

[Test]
public void passes_partition_created_notification_to_the_handler()
{
Assert.AreEqual(1, _stateHandler._partitionCreatedProcessed);
Assert.Inconclusive();
}
}
}
Expand Up @@ -102,6 +102,14 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true; return true;
} }


public bool ProcessPartitionCreated(
string partition, CheckpointTag createPosition, out EmittedEventEnvelope[] emittedEvents)
{
_logger("ProcessPartitionCreated");
emittedEvents = null;
return false;
}

public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState) public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
Expand Down
Expand Up @@ -105,6 +105,14 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true; return true;
} }


public bool ProcessPartitionCreated(
string partition, CheckpointTag createPosition, out EmittedEventEnvelope[] emittedEvents)
{
_logger("Process ProcessPartitionCreated");
emittedEvents = null;
return false;
}

public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState) public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
Expand Down
Expand Up @@ -104,6 +104,12 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true; return true;
} }


public bool ProcessPartitionCreated(
string partition, CheckpointTag createPosition, out EmittedEventEnvelope[] emittedEvents)
{
throw new NotImplementedException();
}

public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState) public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
Expand Down
Expand Up @@ -100,6 +100,14 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true; return true;
} }


public bool ProcessPartitionCreated(
string partition, CheckpointTag createPosition, out EmittedEventEnvelope[] emittedEvents)
{
_logger("Process ProcessPartitionCreated");
emittedEvents = null;
return false;
}

public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState) public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
Expand Down
14 changes: 6 additions & 8 deletions src/EventStore/EventStore.Projections.Core/Prelude/1Prelude.js
Expand Up @@ -71,17 +71,15 @@ function scope($on, $notify) {
for (var name in handlers) { for (var name in handlers) {
if (name == 0 || name === "$init") { if (name == 0 || name === "$init") {
eventProcessor.on_init_state(handlers[name]); eventProcessor.on_init_state(handlers[name]);
} } else if (name === "$initShared") {
else if (name === "$initShared") {
eventProcessor.on_init_shared_state(handlers[name]); eventProcessor.on_init_shared_state(handlers[name]);
} } else if (name === "$any") {
else if (name === "$any") {
eventProcessor.on_any(handlers[name]); eventProcessor.on_any(handlers[name]);
} } else if (name === "$deleted") {
else if (name === "$deleted") {
eventProcessor.on_deleted_notification(handlers[name]); eventProcessor.on_deleted_notification(handlers[name]);
} } else if (name === "$created") {
else { eventProcessor.on_created_notification(handlers[name]);
} else {
eventProcessor.on_event(name, handlers[name]); eventProcessor.on_event(name, handlers[name]);
} }
} }
Expand Down
41 changes: 41 additions & 0 deletions src/EventStore/EventStore.Projections.Core/Prelude/Projections.js
Expand Up @@ -34,6 +34,7 @@ var $projections = {
var eventHandlers = {}; var eventHandlers = {};
var anyEventHandlers = []; var anyEventHandlers = [];
var deletedNotificationHandlers = []; var deletedNotificationHandlers = [];
var createdNotificationHandlers = [];
var rawEventHandlers = []; var rawEventHandlers = [];
var transformers = []; var transformers = [];
var getStatePartitionHandler = function () { var getStatePartitionHandler = function () {
Expand Down Expand Up @@ -142,6 +143,13 @@ var $projections = {
} }
}, },


process_created_notification: function (event, isJson, streamId, eventType, category, sequenceNumber, metadata, linkMetadata, partition) {
processCreatedNotification(event, isJson, streamId, eventType, category, sequenceNumber, metadata, linkMetadata, partition);
var stateJson;
stateJson = JSON.stringify(projectionState);
return stateJson;
},

transform_state_to_result: function () { transform_state_to_result: function () {
var result = projectionState; var result = projectionState;
for (var i = 0; i < transformers.length; i++) { for (var i = 0; i < transformers.length; i++) {
Expand Down Expand Up @@ -213,6 +221,12 @@ var $projections = {
sources.options.definesFold = true; sources.options.definesFold = true;
} }


function on_created_notification(eventHandler) {
createdNotificationHandlers.push(eventHandler);
sources.options.handlesCreatedNotifications = true;
sources.options.definesFold = true;
}

function on_raw(eventHandler) { function on_raw(eventHandler) {
runDefaultHandler = false; runDefaultHandler = false;
sources.allEvents = true; sources.allEvents = true;
Expand Down Expand Up @@ -388,6 +402,32 @@ var $projections = {
} }
} }


function processCreatedNotification(eventRaw, isJson, streamId, eventType, category, sequenceNumber, metadataRaw, linkMetadataRaw, partition, streamMetadataRaw) {

var eventHandler;
var state = !sources.options.biState ? projectionState : [projectionState, projectionSharedState];

var index;

var eventEnvelope = new envelope(null, eventRaw, eventType, streamId, sequenceNumber, metadataRaw, linkMetadataRaw, partition, streamMetadataRaw);

if (isJson) {
tryDeserializeBody(eventEnvelope);
}

for (index = 0; index < createdNotificationHandlers.length; index++) {
eventHandler = createdNotificationHandlers[index];
state = callHandler(eventHandler, state, eventEnvelope);
}

if (!sources.options.biState) {
projectionState = state;
} else {
projectionState = state[0];
projectionSharedState = state[1];
}
}

function fromStream(sourceStream) { function fromStream(sourceStream) {
sources.streams.push(sourceStream); sources.streams.push(sourceStream);
} }
Expand Down Expand Up @@ -460,6 +500,7 @@ var $projections = {
on_any: on_any, on_any: on_any,
on_raw: on_raw, on_raw: on_raw,
on_deleted_notification: on_deleted_notification, on_deleted_notification: on_deleted_notification,
on_created_notification: on_created_notification,


fromAll: fromAll, fromAll: fromAll,
fromCategory: fromCategory, fromCategory: fromCategory,
Expand Down
Expand Up @@ -70,6 +70,16 @@ public interface IProjectionStateHandler : IDisposable, ISourceDefinitionSource
string partition, CheckpointTag eventPosition, string category, ResolvedEvent data, out string newState, string partition, CheckpointTag eventPosition, string category, ResolvedEvent data, out string newState,
out string newSharedState, out EmittedEventEnvelope[] emittedEvents); out string newSharedState, out EmittedEventEnvelope[] emittedEvents);


/// <summary>
/// Processes partition created notificatiion and updates internal state if necessary.
/// </summary>
/// <param name="partition"></param>
/// <param name="createPosition"></param>
/// <param name="emittedEvents"></param>
/// <returns>true - if notification was processed (new state must be returned)</returns>
bool ProcessPartitionCreated(
string partition, CheckpointTag createPosition, out EmittedEventEnvelope[] emittedEvents);

/// <summary> /// <summary>
/// Processes partition deleted notification and updates internal state if necessary. /// Processes partition deleted notification and updates internal state if necessary.
/// </summary> /// </summary>
Expand Down
Expand Up @@ -27,6 +27,7 @@
// //
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Linq;
using EventStore.Common.Log; using EventStore.Common.Log;
using EventStore.Core.Bus; using EventStore.Core.Bus;
using EventStore.Projections.Core.Messages; using EventStore.Projections.Core.Messages;
Expand Down Expand Up @@ -310,8 +311,15 @@ private string GetHandlerTypeName()
out string newSharedState, out string projectionResult, out EmittedEventEnvelope[] emittedEvents) out string newSharedState, out string projectionResult, out EmittedEventEnvelope[] emittedEvents)
{ {
projectionResult = null; projectionResult = null;
SetHandlerState(partition); var newPatitionInitialized = InitOrLoadHandlerState(partition);
_stopwatch.Start(); _stopwatch.Start();
EmittedEventEnvelope[] eventsEmittedOnInitialization = null;
if (newPatitionInitialized)
{
_projectionStateHandler.ProcessPartitionCreated(
partition, message.CheckpointTag, out eventsEmittedOnInitialization);
}

var result = _projectionStateHandler.ProcessEvent( var result = _projectionStateHandler.ProcessEvent(
partition, message.CheckpointTag, message.EventCategory, message.Data, out newState, out newSharedState, partition, message.CheckpointTag, message.EventCategory, message.Data, out newState, out newSharedState,
out emittedEvents); out emittedEvents);
Expand All @@ -336,6 +344,14 @@ private string GetHandlerTypeName()
} }
} }
_stopwatch.Stop(); _stopwatch.Stop();
if (eventsEmittedOnInitialization != null)
{
if (emittedEvents == null || emittedEvents.Length == 0)
emittedEvents = eventsEmittedOnInitialization;
else
emittedEvents = eventsEmittedOnInitialization.Concat(emittedEvents).ToArray();

}
return result; return result;
} }


Expand All @@ -344,7 +360,7 @@ private string GetHandlerTypeName()
out string projectionResult) out string projectionResult)
{ {
projectionResult = null; projectionResult = null;
SetHandlerState(partition); InitOrLoadHandlerState(partition);
_stopwatch.Start(); _stopwatch.Start();
var result = _projectionStateHandler.ProcessPartitionDeleted( var result = _projectionStateHandler.ProcessPartitionDeleted(
partition, deletePosition, out newState); partition, deletePosition, out newState);
Expand Down Expand Up @@ -380,17 +396,24 @@ private string TransformCatalogEventByHandler(EventReaderSubscriptionMessage.Com
return result; return result;
} }


private void SetHandlerState(string partition) /// <summary>
/// initializes or loads existing partition state
/// </summary>
/// <param name="partition"></param>
/// <returns>true - if new partition state was initialized</returns>
private bool InitOrLoadHandlerState(string partition)
{ {
if (_handlerPartition == partition) if (_handlerPartition == partition)
return; return false;


var newState = _partitionStateCache.GetLockedPartitionState(partition); var newState = _partitionStateCache.GetLockedPartitionState(partition);
_handlerPartition = partition; _handlerPartition = partition;
var initialized = false;
if (newState != null && !String.IsNullOrEmpty(newState.State)) if (newState != null && !String.IsNullOrEmpty(newState.State))
_projectionStateHandler.Load(newState.State); _projectionStateHandler.Load(newState.State);
else else
{ {
initialized = true;
_projectionStateHandler.Initialize(); _projectionStateHandler.Initialize();
} }


Expand All @@ -403,6 +426,7 @@ private void SetHandlerState(string partition)
else else
_projectionStateHandler.InitializeShared(); _projectionStateHandler.InitializeShared();
} }
return initialized;
} }


public override void NewCheckpointStarted(CheckpointTag at) public override void NewCheckpointStarted(CheckpointTag at)
Expand Down
Expand Up @@ -219,6 +219,19 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
return true; return true;
} }


public bool ProcessPartitionCreated(
string partition, CheckpointTag createPosition, out EmittedEventEnvelope[] emittedEvents)
{
CheckDisposed();
_eventPosition = createPosition;
_emittedEvents = null;
var newStates = _query.NotifyCreated(
"", // trimming data passed to a JS
new[] {partition, "" /* isSoftDedleted */});
emittedEvents = _emittedEvents == null ? null : _emittedEvents.ToArray();
return true;
}

public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState) public bool ProcessPartitionDeleted(string partition, CheckpointTag deletePosition, out string newState)
{ {
CheckDisposed(); CheckDisposed();
Expand Down

0 comments on commit 4c8fd15

Please sign in to comment.