Skip to content

Commit

Permalink
Merge pull request #1422 from EventStore/fix-multistreamreader-bug
Browse files Browse the repository at this point in the history
Fix bug in StreamEventReader / MultiStreamEventReader causing projections to get stuck
  • Loading branch information
pgermishuys committed Sep 15, 2017
2 parents c1774ab + 1e2fe3b commit 798febd
Show file tree
Hide file tree
Showing 7 changed files with 503 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@
<Compile Include="Services\projections_manager\managed_projection\when_updating_projection_config.cs" />
<Compile Include="Services\projections_manager\when_starting_the_projection_manager_with_duplicate_projection_created.cs" />
<Compile Include="Services\core_projection\when_killing_a_projection_and_an_event_is_received.cs" />
<Compile Include="Services\event_reader\stream_reader\when_handling_streams_with_deleted_events_and_starting_at_event_zero.cs" />
<Compile Include="Services\event_reader\stream_reader\when_handling_streams_with_deleted_events_and_starting_after_event_zero.cs" />
<Compile Include="Services\event_reader\multi_stream_reader\when_handling_streams_with_deleted_events_and_starting_at_event_zero.cs" />
<Compile Include="Services\event_reader\multi_stream_reader\when_handling_streams_with_deleted_events_and_starting_after_event_zero.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\EventStore.ClientAPI\EventStore.ClientAPI.csproj">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System;
using System.Collections.Generic;
using System.Linq;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Services.AwakeReaderService;
using EventStore.Core.Services.TimerService;
using EventStore.Core.TransactionLog.LogRecords;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services.Processing;
using EventStore.Projections.Core.Tests.Services.core_projection;
using NUnit.Framework;
using ResolvedEvent = EventStore.Core.Data.ResolvedEvent;

namespace EventStore.Projections.Core.Tests.Services.event_reader.multi_stream_reader
{
[TestFixture]
public class when_handling_streams_with_deleted_events_and_starting_after_event_zero : TestFixtureWithExistingEvents
{
private MultiStreamEventReader _edp;
private int _fromSequenceNumber;
private string[] _streamNames;
private Dictionary<string,long> _streamPositions;
private Guid _distibutionPointCorrelationId;

protected override void Given()
{
TicksAreHandledImmediately();
_fromSequenceNumber = 10;
_streamNames = new[] {"stream1","stream2"};
_streamPositions = new Dictionary<string, long> {{"stream1", _fromSequenceNumber}, {"stream2", 100}};
}

[SetUp]
public new void When()
{
_distibutionPointCorrelationId = Guid.NewGuid();

_edp = new MultiStreamEventReader(
_ioDispatcher, _bus, _distibutionPointCorrelationId, null, 0, _streamNames, _streamPositions, false,
new RealTimeProvider());

_edp.Resume();
}

private void HandleEvents(string stream, long[] eventNumbers){
string eventType = "event_type";
List<ResolvedEvent> events = new List<ResolvedEvent>();

foreach(long eventNumber in eventNumbers){
events.Add(
ResolvedEvent.ForUnresolvedEvent(
new EventRecord(
eventNumber, 50*(eventNumber+1), Guid.NewGuid(), Guid.NewGuid(), 50*(eventNumber+1), 0, stream, ExpectedVersion.Any, DateTime.UtcNow,
PrepareFlags.SingleWrite | PrepareFlags.TransactionBegin | PrepareFlags.TransactionEnd,
eventType, new byte[] {0}, new byte[] {0}
)
)
);
}

long start, end;
if(eventNumbers.Length > 0){
start = eventNumbers[0];
end = eventNumbers[eventNumbers.Length-1];
}
else{
start = _fromSequenceNumber;
end = _fromSequenceNumber;
}

var correlationId = _consumer.HandledMessages.OfType<ClientMessage.ReadStreamEventsForward>().Last(x => x.EventStreamId == stream).CorrelationId;

_edp.Handle(
new ClientMessage.ReadStreamEventsForwardCompleted(
correlationId, stream, start, 100, ReadStreamResult.Success,events.ToArray(), null, false, "", start+1, end, true, 200)
);
}

private void HandleEvents(string stream,long start,long end){
List<long> eventNumbers = new List<long>();
for(long i=start;i<=end;i++) eventNumbers.Add(i);
HandleEvents(stream,eventNumbers.ToArray());
}

[Test]
public void allows_first_event_to_be_equal_to_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber;

Assert.DoesNotThrow(() => {
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
}

[Test]
public void allows_first_event_to_be_greater_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber+5;

Assert.DoesNotThrow(() => {
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
}

[Test]
public void does_not_allow_first_event_to_be_less_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber-1;

Assert.Throws<InvalidOperationException>(() => {
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
}

[Test]
public void events_after_first_event_should_be_in_sequence()
{
Assert.Throws<InvalidOperationException>(() => {
//_fromSequenceNumber+2 has been omitted
HandleEvents(_streamNames[0],new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System;
using System.Collections.Generic;
using System.Linq;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Services.AwakeReaderService;
using EventStore.Core.Services.TimerService;
using EventStore.Core.TransactionLog.LogRecords;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services.Processing;
using EventStore.Projections.Core.Tests.Services.core_projection;
using NUnit.Framework;
using ResolvedEvent = EventStore.Core.Data.ResolvedEvent;

namespace EventStore.Projections.Core.Tests.Services.event_reader.multi_stream_reader
{
[TestFixture]
public class when_handling_streams_with_deleted_events_and_starting_at_event_zero : TestFixtureWithExistingEvents
{
private MultiStreamEventReader _edp;
private int _fromSequenceNumber;
private string[] _streamNames;
private Dictionary<string,long> _streamPositions;
private Guid _distibutionPointCorrelationId;

protected override void Given()
{
TicksAreHandledImmediately();
_fromSequenceNumber = 0;
_streamNames = new[] {"stream1","stream2"};
_streamPositions = new Dictionary<string, long> {{"stream1", _fromSequenceNumber}, {"stream2", 100}};
}

[SetUp]
public new void When()
{
_distibutionPointCorrelationId = Guid.NewGuid();

_edp = new MultiStreamEventReader(
_ioDispatcher, _bus, _distibutionPointCorrelationId, null, 0, _streamNames, _streamPositions, false,
new RealTimeProvider());

_edp.Resume();
}

private void HandleEvents(string stream, long[] eventNumbers){
string eventType = "event_type";
List<ResolvedEvent> events = new List<ResolvedEvent>();

foreach(long eventNumber in eventNumbers){
events.Add(
ResolvedEvent.ForUnresolvedEvent(
new EventRecord(
eventNumber, 50*(eventNumber+1), Guid.NewGuid(), Guid.NewGuid(), 50*(eventNumber+1), 0, stream, ExpectedVersion.Any, DateTime.UtcNow,
PrepareFlags.SingleWrite | PrepareFlags.TransactionBegin | PrepareFlags.TransactionEnd,
eventType, new byte[] {0}, new byte[] {0}
)
)
);
}

long start, end;
if(eventNumbers.Length > 0){
start = eventNumbers[0];
end = eventNumbers[eventNumbers.Length-1];
}
else{
start = _fromSequenceNumber;
end = _fromSequenceNumber;
}

var correlationId = _consumer.HandledMessages.OfType<ClientMessage.ReadStreamEventsForward>().Last(x => x.EventStreamId == stream).CorrelationId;

_edp.Handle(
new ClientMessage.ReadStreamEventsForwardCompleted(
correlationId, stream, start, 100, ReadStreamResult.Success,events.ToArray(), null, false, "", start+1, end, true, 200)
);
}

private void HandleEvents(string stream,long start,long end){
List<long> eventNumbers = new List<long>();
for(long i=start;i<=end;i++) eventNumbers.Add(i);
HandleEvents(stream,eventNumbers.ToArray());
}

[Test]
public void allows_first_event_to_be_equal_to_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber;

Assert.DoesNotThrow(() => {
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
}

[Test]
public void allows_first_event_to_be_greater_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber+5;

Assert.DoesNotThrow(() => {
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
}

[Test]
public void events_after_first_event_should_be_in_sequence()
{
Assert.Throws<InvalidOperationException>(() => {
//_fromSequenceNumber+2 has been omitted
HandleEvents(_streamNames[0],new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;
using System.Collections.Generic;
using System.Linq;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Services.AwakeReaderService;
using EventStore.Core.Services.TimerService;
using EventStore.Core.TransactionLog.LogRecords;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services.Processing;
using EventStore.Projections.Core.Tests.Services.core_projection;
using NUnit.Framework;
using ResolvedEvent = EventStore.Core.Data.ResolvedEvent;

namespace EventStore.Projections.Core.Tests.Services.event_reader.stream_reader
{
[TestFixture]
public class when_handling_streams_with_deleted_events_and_starting_after_event_zero : TestFixtureWithExistingEvents
{
private StreamEventReader _edp;
private int _fromSequenceNumber;
const string _streamName = "stream";

protected override void Given()
{
TicksAreHandledImmediately();
_fromSequenceNumber = 10;
}

[SetUp]
public new void When()
{
_edp = new StreamEventReader(_bus, Guid.NewGuid(), null, _streamName, _fromSequenceNumber, new RealTimeProvider(), false,
produceStreamDeletes: false);
_edp.Resume();
}

private void HandleEvents(long[] eventNumbers){
string eventType = "event_type";
List<ResolvedEvent> events = new List<ResolvedEvent>();

foreach(long eventNumber in eventNumbers){
events.Add(
ResolvedEvent.ForUnresolvedEvent(
new EventRecord(
eventNumber, 50*(eventNumber+1), Guid.NewGuid(), Guid.NewGuid(), 50*(eventNumber+1), 0, _streamName, ExpectedVersion.Any, DateTime.UtcNow,
PrepareFlags.SingleWrite | PrepareFlags.TransactionBegin | PrepareFlags.TransactionEnd,
eventType, new byte[] {0}, new byte[] {0}
)
)
);
}

var correlationId = _consumer.HandledMessages.OfType<ClientMessage.ReadStreamEventsForward>().Last().CorrelationId;

long start, end;
if(eventNumbers.Length > 0){
start = eventNumbers[0];
end = eventNumbers[eventNumbers.Length-1];
}
else{
start = _fromSequenceNumber;
end = _fromSequenceNumber;
}

_edp.Handle(
new ClientMessage.ReadStreamEventsForwardCompleted(
correlationId, _streamName, start, 100, ReadStreamResult.Success,events.ToArray(), null, false, "", start+1, end, true, 200)
);
}

private void HandleEvents(long start,long end){
List<long> eventNumbers = new List<long>();
for(long i=start;i<=end;i++) eventNumbers.Add(i);
HandleEvents(eventNumbers.ToArray());
}

[Test]
public void allows_first_event_to_be_equal_to_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber;

Assert.DoesNotThrow(() => {
HandleEvents(eventSequenceNumber,eventSequenceNumber);
});
}

[Test]
public void allows_first_event_to_be_greater_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber+5;

Assert.DoesNotThrow(() => {
HandleEvents(eventSequenceNumber,eventSequenceNumber);
});
}

[Test]
public void does_not_allow_first_event_to_be_less_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber-1;

Assert.Throws<InvalidOperationException>(() => {
HandleEvents(eventSequenceNumber,eventSequenceNumber);
});
}

[Test]
public void events_after_first_event_should_be_in_sequence()
{
Assert.Throws<InvalidOperationException>(() => {
//_fromSequenceNumber+2 has been omitted
HandleEvents(new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
});
}
}
}
Loading

0 comments on commit 798febd

Please sign in to comment.