New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If a gap is detected in the allstream page sequence number, reissue query #39

Merged
merged 3 commits into from Sep 29, 2016
Jump to file or symbol
Failed to load files and symbols.
+45 −7
Diff settings

Always

Just for now

@@ -12,6 +12,7 @@ namespace SqlStreamStore.Infrastructure
public abstract class ReadonlyStreamStoreBase : IReadonlyStreamStore
{
private const int DefaultReloadInterval = 3000;
protected readonly GetUtcNow GetUtcNow;
protected readonly ILog Logger;
private bool _isDisposed;
@@ -46,8 +47,44 @@ public abstract class ReadonlyStreamStoreBase : IReadonlyStreamStore
"{maxCount}.", fromPositionInclusive, maxCount);
}
var page = await ReadAllForwardsInternal(fromPositionInclusive, maxCount, cancellationToken);
return await FilterExpired(page, cancellationToken);
var page = await ReadAllForwardsInternal(fromPositionInclusive, maxCount, cancellationToken)
.NotOnCapturedContext();
// https://github.com/damianh/SqlStreamStore/issues/31
// Under heavy parallel load, gaps may appear in the position sequence due to sequence
// number reservation of in-flight transactions.
// Here we check if there are any gaps, and in the unlikely event there is, we delay a little bit
// and re-issue the read. This is expected
if(!page.IsEnd || page.Messages.Length <= 1)
{
return await FilterExpired(page, cancellationToken).NotOnCapturedContext();
}
Func<CancellationToken, Task<AllMessagesPage>> reload = async ct =>
{
Logger.InfoFormat($"ReadAllForwards: gap detected in position, reloading after {DefaultReloadInterval}ms");
await Task.Delay(DefaultReloadInterval, cancellationToken);
var reloadedPage = await ReadAllForwardsInternal(fromPositionInclusive, maxCount, cancellationToken)
.NotOnCapturedContext();
return await FilterExpired(reloadedPage, cancellationToken).NotOnCapturedContext();
};
// Check for gap between last page and this.
if (page.Messages[0].Position != fromPositionInclusive)
{
return await reload(cancellationToken);
}
// check for gap in messages collection
for(int i = 0; i < page.Messages.Length - 1; i++)
{
if(page.Messages[i].Position + 1 != page.Messages[i + 1].Position)
{
return await reload(cancellationToken);
}
}
return await FilterExpired(page, cancellationToken).NotOnCapturedContext();
}
public async Task<AllMessagesPage> ReadAllBackwards(
@@ -5,19 +5,20 @@
public static class TaskExtensions
{
/// <summary>
/// ConfigureAwait(false)
/// </summary>
public static ConfiguredTaskAwaitable NotOnCapturedContext(this Task task)
{
return task.ConfigureAwait(false);
}
/// <summary>
/// ConfigureAwait(false)
/// </summary>
public static ConfiguredTaskAwaitable<T> NotOnCapturedContext<T>(this Task<T> task)
{
return task.ConfigureAwait(false);
}
public static void SwallowException(this Task task)
{
task.ContinueWith(_ => { });
}
}
}
ProTip! Use n and p to navigate between commits in a pull request.