Skip to content

Commit

Permalink
Add a failing concurrency test
Browse files Browse the repository at this point in the history
Add a test to AcceptanceTests.AppendStream that fails for postgres but passes for all other DBs. This is to reproduce a bug with concurrent appends in postgres, see: #478

Co-authored-by: Rasmus Larsson <stoft@users.noreply.github.com>
  • Loading branch information
2 people authored and Till committed Nov 26, 2020
1 parent ade40d2 commit dc18132
Showing 1 changed file with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Shouldly;
using SqlStreamStore.Streams;
using Xunit;
using System.Linq;

public partial class AcceptanceTests
{
Expand All @@ -28,7 +29,7 @@ public async Task

[Fact, Trait("Category", "AppendStream")]
public async Task
When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_then_should_be_idempotent()
When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_be_idempotent()
{
// Idempotency
const string streamId = "stream-1";
Expand All @@ -42,7 +43,7 @@ await Store
}

[Fact, Trait("Category", "AppendStream")]
public async Task When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_then_should_have_expected_result()
public async Task When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_have_expected_result()
{
// Idempotency
const string streamId = "stream-1";
Expand Down Expand Up @@ -665,7 +666,7 @@ await Store
exception.ShouldBeOfType<WrongExpectedVersionException>(
ErrorMessages.AppendFailedWrongExpectedVersion(streamId, 10));
}

[Theory, Trait("Category", "AppendStream")]
[InlineData("stream/id")]
[InlineData("stream%id")]
Expand All @@ -676,7 +677,7 @@ public async Task When_append_to_stream_with_url_encodable_characters_and_expect
result.CurrentVersion.ShouldBe(2);
result.CurrentPosition.ShouldBeGreaterThanOrEqualTo(Fixture.MinPosition + 2L);
}

[Theory, Trait("Category", "AppendStream")]
[InlineData("stream/id")]
[InlineData("stream%id")]
Expand All @@ -699,7 +700,7 @@ public async Task When_append_to_stream_with_url_encodable_characters_and_expect
result.CurrentVersion.ShouldBe(2);
result.CurrentPosition.ShouldBeGreaterThanOrEqualTo(Fixture.MinPosition + 2L);
}

[Theory, Trait("Category", "AppendStream")]
[InlineData("stream/id")]
[InlineData("stream%id")]
Expand All @@ -713,40 +714,56 @@ public async Task When_append_to_stream_with_url_encodable_characters_and_expect
}

[Fact, Trait("Category", "AppendStream")]
public async Task When_append_stream_concurrently_with_no_stream_expected_and_same_messages_then_should_then_should_have_expected_result()
public async Task When_append_stream_concurrently_with_no_stream_expected_and_same_messages_then_should_have_expected_result()
{
// Idempotency
const string streamId = "stream-1";

var messages = CreateNewStreamMessages(1, 2);
var tasks = new List<Task<AppendResult>>();
for(var index = 0; index < 10; index++)
for (var index = 0; index < 10; index++)
{
tasks.Add(Store.AppendToStream(streamId, ExpectedVersion.NoStream, messages));
}

var results = await Task.WhenAll(tasks);

Assert.All(results, result => result.CurrentVersion.ShouldBe(1));
Assert.All(results, result => result.CurrentPosition.ShouldBe(results[0].CurrentPosition));
}

[Fact, Trait("Category", "AppendStream")]
public async Task When_append_to_different_streams_concurrently_with_no_stream_expected_and_same_messages_then_should_then_should_have_expected_result()
public async Task When_append_to_different_streams_concurrently_with_no_stream_expected_and_same_messages_then_should_have_expected_result()
{
// Idempotency
const string streamPrefix = "stream-";

var messages = CreateNewStreamMessages(1, 2);
var tasks = new List<Task<AppendResult>>();
for(var index = 0; index < 10; index++)
for (var index = 0; index < 10; index++)
{
tasks.Add(Store.AppendToStream(streamPrefix + index, ExpectedVersion.NoStream, messages));
}

var results = await Task.WhenAll(tasks);

Assert.All(results, result => result.CurrentVersion.ShouldBe(1));
}

[Fact, Trait("Category", "AppendStream")]
public async Task When_append_to_same_stream_concurrently_with_expected_version_any_and_different_messages_then_should_not_throw()
{
const string streamName = "stream";

var messages = CreateNewStreamMessages(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
var tasks = new List<Task<AppendResult>>();
foreach (var message in messages)
{
tasks.Add(Store.AppendToStream(streamName, ExpectedVersion.Any, new []{message}));
}

var results = await Task.WhenAll(tasks);
results.Select(r => r.CurrentVersion).Max().ShouldBe(9);
}
}
}

0 comments on commit dc18132

Please sign in to comment.