Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bycorrid #1622

Merged
merged 6 commits into from Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,338 @@
using System;
using System.Text;
using EventStore.Core.Data;
using EventStore.Projections.Core.Services.Processing;
using EventStore.Projections.Core.Standard;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using ResolvedEvent = EventStore.Projections.Core.Services.Processing.ResolvedEvent;

namespace EventStore.Projections.Core.Tests.Services.handlers
{
public static class categorize_events_by_correlation_id
{
[TestFixture]
public class when_handling_simple_event
{
private ByCorrelationId _handler;
private string _state;
private EmittedEventEnvelope[] _emittedEvents;
private bool _result;
private DateTime _dateTime;

[SetUp]
public void when()
{
_handler = new ByCorrelationId("", Console.WriteLine);
_handler.Initialize();
_dateTime = DateTime.UtcNow;
var dataBytes = Encoding.ASCII.GetBytes("{}");
var metadataBytes = Encoding.ASCII.GetBytes("{\"$correlationId\":\"testing1\"}");

string sharedState;
_result = _handler.ProcessEvent(
"", CheckpointTag.FromPosition(0, 200, 150), null,
new ResolvedEvent(
"cat1-stream1",10, "cat1-stream1", 10, false, new TFPos(200, 150), new TFPos(200,150), Guid.NewGuid(),
"event_type", true, dataBytes, metadataBytes ,null,null,_dateTime), out _state, out sharedState, out _emittedEvents);
}

[Test]
public void result_is_true()
{
Assert.IsTrue(_result);
}

[Test]
public void state_stays_null()
{
Assert.IsNull(_state);
}

[Test]
public void emits_correct_link()
{
Assert.NotNull(_emittedEvents);
Assert.AreEqual(1, _emittedEvents.Length);
var @event = _emittedEvents[0].Event;
Assert.AreEqual("$>", @event.EventType);
Assert.AreEqual("$bc-testing1", @event.StreamId);
Assert.AreEqual("10@cat1-stream1", @event.Data);

string eventTimestampJson = null;
var extraMetadata = @event.ExtraMetaData();
foreach(var kvp in extraMetadata){
if(kvp.Key.Equals("$eventTimestamp")){
eventTimestampJson = kvp.Value;
}
}
Assert.NotNull(eventTimestampJson);
Assert.AreEqual("\""+_dateTime.ToString("yyyy-MM-ddTHH:mm:ss.ffffffZ")+"\"", eventTimestampJson);
}

}

[TestFixture]
public class when_handling_link_to_event
{
private ByCorrelationId _handler;
private string _state;
private EmittedEventEnvelope[] _emittedEvents;
private bool _result;
private DateTime _dateTime;
private Guid _eventId;

[SetUp]
public void when()
{
_handler = new ByCorrelationId("", Console.WriteLine);
_handler.Initialize();
_dateTime = DateTime.UtcNow;
string sharedState;

var dataBytes = Encoding.ASCII.GetBytes("10@cat1-stream1");
var metadataBytes = Encoding.ASCII.GetBytes("{\"$correlationId\":\"testing2\", \"$whatever\":\"hello\"}");
var myEvent = new ResolvedEvent("cat2-stream2", 20, "cat2-stream2", 20, true, new TFPos(200, 150), new TFPos(200,150), Guid.NewGuid(),"$>", true, dataBytes, metadataBytes,null,null,_dateTime);

_eventId = myEvent.EventId;
_result = _handler.ProcessEvent(
"", CheckpointTag.FromPosition(0, 200, 150), null,
myEvent, out _state, out sharedState, out _emittedEvents);
}

[Test]
public void result_is_true()
{
Assert.IsTrue(_result);
}

[Test]
public void state_stays_null()
{
Assert.IsNull(_state);
}

[Test]
public void emits_correct_link()
{
Assert.NotNull(_emittedEvents);
Assert.AreEqual(1, _emittedEvents.Length);
var @event = _emittedEvents[0].Event;
Assert.AreEqual("$>", @event.EventType);
Assert.AreEqual("$bc-testing2", @event.StreamId);
Assert.AreEqual("10@cat1-stream1", @event.Data);

string eventTimestampJson = null;
string linkJson = null;
var extraMetadata = @event.ExtraMetaData();
foreach(var kvp in extraMetadata){
switch(kvp.Key){
case "$eventTimestamp":
eventTimestampJson = kvp.Value;
break;
case "$link":
linkJson = kvp.Value;
break;
}
}
Assert.NotNull(eventTimestampJson);
Assert.AreEqual("\""+_dateTime.ToString("yyyy-MM-ddTHH:mm:ss.ffffffZ")+"\"", eventTimestampJson);

//the link's metadata should be copied to $link.metadata and id to $link.eventId
Assert.NotNull(linkJson);
var link = JObject.Parse(linkJson);
Assert.AreEqual(link.GetValue("eventId").ToObject<string>(), _eventId.ToString());

var linkMetadata = (JObject)link.GetValue("metadata");
Assert.AreEqual(linkMetadata.GetValue("$correlationId").ToObject<string>(), "testing2");
Assert.AreEqual(linkMetadata.GetValue("$whatever").ToObject<string>(), "hello");
}
}

[TestFixture]
public class when_handling_non_json_event
{
private ByCorrelationId _handler;
private string _state;
private EmittedEventEnvelope[] _emittedEvents;
private bool _result;

[SetUp]
public void when()
{
_handler = new ByCorrelationId("", Console.WriteLine);
_handler.Initialize();
string sharedState;
_result = _handler.ProcessEvent(
"", CheckpointTag.FromPosition(0, 200, 150), null,
new ResolvedEvent(
"cat1-stream1", 10, "cat1-stream1", 10, false, new TFPos(200, 150), Guid.NewGuid(),
"event_type", false, "non_json_data", "non_json_metadata"), out _state, out sharedState, out _emittedEvents);
}

[Test]
public void result_is_false()
{
Assert.IsFalse(_result);
}

[Test]
public void state_stays_null()
{
Assert.IsNull(_state);
}

[Test]
public void does_not_emit_link()
{
Assert.IsNull(_emittedEvents);
}
}

[TestFixture]
public class when_handling_json_event_with_no_correlation_id
{
private ByCorrelationId _handler;
private string _state;
private EmittedEventEnvelope[] _emittedEvents;
private bool _result;

[SetUp]
public void when()
{
_handler = new ByCorrelationId("", Console.WriteLine);
_handler.Initialize();
string sharedState;
_result = _handler.ProcessEvent(
"", CheckpointTag.FromPosition(0, 200, 150), null,
new ResolvedEvent(
"cat1-stream1", 10, "cat1-stream1", 10, false, new TFPos(200, 150), Guid.NewGuid(),
"event_type", true, "{}", "{}"), out _state, out sharedState, out _emittedEvents);
}

[Test]
public void result_is_false()
{
Assert.IsFalse(_result);
}

[Test]
public void state_stays_null()
{
Assert.IsNull(_state);
}

[Test]
public void does_not_emit_link()
{
Assert.IsNull(_emittedEvents);
}
}

[TestFixture]
public class when_handling_json_event_with_non_json_metadata
{
private ByCorrelationId _handler;
private string _state;
private EmittedEventEnvelope[] _emittedEvents;
private bool _result;

[SetUp]
public void when()
{
_handler = new ByCorrelationId("", Console.WriteLine);
_handler.Initialize();
string sharedState;
_result = _handler.ProcessEvent(
"", CheckpointTag.FromPosition(0, 200, 150), null,
new ResolvedEvent(
"cat1-stream1", 10, "cat1-stream1", 10, false, new TFPos(200, 150), Guid.NewGuid(),
"event_type", true, "{}", "non_json_metadata"), out _state, out sharedState, out _emittedEvents);
}

[Test]
public void result_is_false()
{
Assert.IsFalse(_result);
}

[Test]
public void state_stays_null()
{
Assert.IsNull(_state);
}

[Test]
public void does_not_emit_link()
{
Assert.IsNull(_emittedEvents);
}
}

[TestFixture]
public class with_custom_valid_correlation_id_property
{
private ByCorrelationId _handler;
private string _state;
private EmittedEventEnvelope[] _emittedEvents;
private bool _result;
private string source = "{\"correlationIdProperty\":\"$myCorrelationId\"}";

[SetUp]
public void when()
{
_handler = new ByCorrelationId(source, Console.WriteLine);
_handler.Initialize();
string sharedState;
_result = _handler.ProcessEvent(
"", CheckpointTag.FromPosition(0, 200, 150), null,
new ResolvedEvent(
"cat1-stream1", 10, "cat1-stream1", 10, false, new TFPos(200, 150), Guid.NewGuid(),
"event_type", true, "{}", "{\"$myCorrelationId\":\"testing1\"}"), out _state, out sharedState, out _emittedEvents);
}

[Test]
public void result_is_true()
{
Assert.IsTrue(_result);
}

[Test]
public void state_stays_null()
{
Assert.IsNull(_state);
}

[Test]
public void emits_correct_link()
{
Assert.NotNull(_emittedEvents);
Assert.AreEqual(1, _emittedEvents.Length);
var @event = _emittedEvents[0].Event;
Assert.AreEqual("$>", @event.EventType);
Assert.AreEqual("$bc-testing1", @event.StreamId);
Assert.AreEqual("10@cat1-stream1", @event.Data);
}

}

[TestFixture]
public class with_custom_invalid_correlation_id_property
{
private string source = "{\"thisisnotvalid\":\"$myCorrelationId\"}";

[SetUp]
public void when()
{
}

[Test]
public void should_throw_invalid_operation_exception()
{
Assert.Throws<InvalidOperationException>(()=>{
new ByCorrelationId(source, Console.WriteLine);
});
}
}
}
}
Expand Up @@ -27,7 +27,7 @@ public void system_projections_are_registered()
{
var statistics = HandledMessages.OfType<ProjectionManagementMessage.Statistics>().LastOrDefault();
Assert.NotNull(statistics);
Assert.AreEqual(4, statistics.Projections.Length);
Assert.AreEqual(5, statistics.Projections.Length);
}

[Test]
Expand Down
3 changes: 2 additions & 1 deletion src/EventStore.Projections.Core/ProjectionsSubsystem.cs
Expand Up @@ -102,7 +102,8 @@ public void Stop()
"$by_category",
"$stream_by_category",
"$streams",
"$by_event_type"
"$by_event_type",
"$by_correlation_id"
};

public void Handle(CoreProjectionStatusMessage.Stopped message)
Expand Down