Skip to content

Commit

Permalink
Create system projection if it does not exist
Browse files Browse the repository at this point in the history
Add $by_correlation_id to _standardProjections list
Metadata parsing bugfixes
Fix test
  • Loading branch information
shaan1337 committed May 28, 2018
1 parent 1f56416 commit 450cb45
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 7 deletions.
Expand Up @@ -27,7 +27,7 @@ public void system_projections_are_registered()
{
var statistics = HandledMessages.OfType<ProjectionManagementMessage.Statistics>().LastOrDefault();
Assert.NotNull(statistics);
Assert.AreEqual(4, statistics.Projections.Length);
Assert.AreEqual(5, statistics.Projections.Length);
}

[Test]
Expand Down
3 changes: 2 additions & 1 deletion src/EventStore.Projections.Core/ProjectionsSubsystem.cs
Expand Up @@ -99,7 +99,8 @@ public void Stop()
"$by_category",
"$stream_by_category",
"$streams",
"$by_event_type"
"$by_event_type",
"$by_correlation_id"
};

public void Handle(CoreProjectionStatusMessage.Stopped message)
Expand Down
Expand Up @@ -807,6 +807,10 @@ private void StartRegisteredProjections(IDictionary<string, long> registeredProj
String.Join(", ", registeredProjections
.Where(x => x.Key != ProjectionEventTypes.ProjectionsInitialized)
.Select(x => x.Key)));

//create any missing system projections
CreateSystemProjections(registeredProjections.Select(x => x.Key).ToList());

foreach (var projectionRegistration in registeredProjections.Where(x => x.Key != ProjectionEventTypes.ProjectionsInitialized))
{
int queueIndex = GetNextWorkerIndex();
Expand Down Expand Up @@ -862,24 +866,37 @@ private void WriteProjectionsInitializedCompleted(ClientMessage.WriteEventsCompl

private void CreateSystemProjections()
{
CreateSystemProjections(new List<String>());
}

private void CreateSystemProjections(List<String> existingSystemProjections){
if (_initializeSystemProjections)
{
if(!existingSystemProjections.Contains(ProjectionNamesBuilder.StandardProjections.StreamsStandardProjection))
CreateSystemProjection(
ProjectionNamesBuilder.StandardProjections.StreamsStandardProjection,
typeof(IndexStreams),
"");

if(!existingSystemProjections.Contains(ProjectionNamesBuilder.StandardProjections.StreamByCategoryStandardProjection))
CreateSystemProjection(
ProjectionNamesBuilder.StandardProjections.StreamByCategoryStandardProjection,
typeof(CategorizeStreamByPath),
"first\r\n-");

if(!existingSystemProjections.Contains(ProjectionNamesBuilder.StandardProjections.EventByCategoryStandardProjection))
CreateSystemProjection(
ProjectionNamesBuilder.StandardProjections.EventByCategoryStandardProjection,
typeof(CategorizeEventsByStreamPath),
"first\r\n-");

if(!existingSystemProjections.Contains(ProjectionNamesBuilder.StandardProjections.EventByTypeStandardProjection))
CreateSystemProjection(
ProjectionNamesBuilder.StandardProjections.EventByTypeStandardProjection,
typeof(IndexEventsByEventType),
"");

if(!existingSystemProjections.Contains(ProjectionNamesBuilder.StandardProjections.EventByCorrIdStandardProjection))
CreateSystemProjection(
ProjectionNamesBuilder.StandardProjections.EventByCorrIdStandardProjection,
typeof(ByCorrelationId),
Expand Down
19 changes: 14 additions & 5 deletions src/EventStore.Projections.Core/Standard/ByCorrelationId.cs
Expand Up @@ -69,13 +69,22 @@ public string TransformCatalogEvent(CheckpointTag eventPosition, ResolvedEvent d
newState = null;
if (data.EventStreamId != data.PositionStreamId)
return false;
if (!data.IsJson)
if(!data.IsJson)
return false;
var metadata = JObject.Parse(data.Metadata);
var indexedEventType = data.EventType;


JObject metadata = null;

try{
metadata = JObject.Parse(data.Metadata);
}
catch(JsonReaderException){
return false;
}

if(metadata["$correlationId"] == null)
return false;

string correlationId = metadata["$correlationId"].Value<string>();

if (correlationId == null)
return false;
string positionStreamId = data.PositionStreamId;
Expand Down

0 comments on commit 450cb45

Please sign in to comment.