Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert events into MSSQL in deterministic order #20

Merged
merged 1 commit into from
Mar 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<Compile Include="..\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="EventStoreAcceptanceTests.ReadLongStreamInPages.cs" />
<Compile Include="ExploratoryTests.cs" />
<Compile Include="MsSqlEventStoreFixture.cs" />
<Compile Include="MsSqlEventStoreTests.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
namespace Cedar.EventStore
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using Cedar.EventStore.Streams;
using Shouldly;
using Xunit;

public partial class EventStoreAcceptanceTests
{
[Fact]
public async Task Given_large_event_stream_can_be_read_back_in_pages()
{
using (var fixture = GetFixture())
{
using (var eventStore = await fixture.GetEventStore())
{
var eventsToWrite = CreateNewStreamEvents();

await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, eventsToWrite);


var readEvents = await new PagedEventStore(eventStore).GetAsync("stream-1");

readEvents.Count().ShouldBe(eventsToWrite.Length);
}
}
}

private static NewStreamEvent[] CreateNewStreamEvents()
{
var eventsToWrite = new List<NewStreamEvent>();
var largeStreamCount = 7500;
for (int i = 0; i < largeStreamCount; i++)
{
var envelope = new NewStreamEvent(Guid.NewGuid(), $"event{i}", "{}", $"{i}");

eventsToWrite.Add(envelope);
}

return eventsToWrite.ToArray();
}
}

public class PagedEventStore
{
private readonly IEventStore _eventStore;

public PagedEventStore(IEventStore eventStore)
{
_eventStore = eventStore;
}

public async Task<IEnumerable<StreamEvent>> GetAsync(string streamName)
{
var start = 0;
const int BatchSize = 500;

StreamEventsPage eventsPage;
var events = new List<StreamEvent>();

do
{
eventsPage = await _eventStore.ReadStreamForwards(streamName, start, BatchSize);

if (eventsPage.Status == PageReadStatus.StreamDeleted)
{
throw new Exception("Stream deleted");
}

if (eventsPage.Status == PageReadStatus.StreamNotFound)
{
throw new Exception("Stream not found");
}

events.AddRange(
eventsPage.Events);

start = eventsPage.NextStreamVersion;
}
while (!eventsPage.IsEndOfStream);

return events;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ INSERT INTO dbo.Events (StreamIdInternal, StreamVersion, Id, Created, [Type], Js
[Type],
JsonData,
JsonMetadata
FROM @newEvents;
FROM @newEvents
ORDER BY StreamVersion;

COMMIT TRANSACTION AppendStream;
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ BEGIN TRANSACTION AppendStream;
[Type],
JsonData,
JsonMetadata
FROM @newEvents;
FROM @newEvents
ORDER BY StreamVersion;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yreynhout looks simple enough fix :)

END
ELSE
BEGIN
Expand All @@ -38,6 +39,7 @@ BEGIN TRANSACTION AppendStream;
[Type],
JsonData,
JsonMetadata
FROM @newEvents;
FROM @newEvents
ORDER BY StreamVersion;
END
COMMIT TRANSACTION AppendStream;
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ BEGIN TRANSACTION CreateStream;
[Type],
JsonData,
JsonMetadata
FROM @newEvents;
FROM @newEvents
ORDER BY StreamVersion;

END;
SELECT @streamIdInternal;
Expand Down